Files
go-telegram-bot/anthropic.go
T
2026-05-29 21:40:52 +02:00

311 lines
12 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/anthropics/anthropic-sdk-go"
"github.com/anthropics/anthropic-sdk-go/packages/param"
)
// ErrModelNotFound is the sentinel for a configured Anthropic model that is no
// longer available (deprecated or removed). getAnthropicResponse wraps it,
// together with the configured model name, when the API answers 404 and no
// missing file_id can be extracted from the error. Callers can use errors.Is
// to detect this and surface an actionable message to admins/owners while
// keeping the response vague for regular users.
var ErrModelNotFound = errors.New("model not found or deprecated")
// maxFileNotFoundRetries caps the runtime 404 self-heal loop. If multiple
// referenced file_ids are gone from Anthropic simultaneously (admin purge, AUP
// enforcement, etc.), we strip them one at a time and retry. The value counts
// total streaming attempts, including the very first call, so a single
// getAnthropicResponse invocation issues at most this many requests before it
// gives up with an error. Three attempts covers all realistic cascades without
// leaving the call hanging indefinitely.
const maxFileNotFoundRetries = 3
// getAnthropicResponse sends the conversation to the Beta Messages API via
// streamMessages and returns the reply text (every text segment joined by
// blank lines).
//
// onSegment, when non-nil, is passed through to streamMessages so each finished
// text block can be forwarded (e.g. to Telegram) while the turn is still in
// progress. Callers that only need the joined text may pass nil.
//
// The user/profile arguments (username, firstName, lastName, isPremium,
// languageCode, messageTime) and isEmojiOnly only influence the request when a
// non-empty custom_instructions prompt is configured; otherwise no system
// prompt is sent at all.
//
// chatID is needed for the 404 self-heal: when a 404 error names a missing
// file_id, that id is handed to stripDeadFileIDFromMemory and
// markFilesPendingCleanup for this chat, the message list is rebuilt from the
// chat's memory, and the request is re-issued. A 404 that names no file_id is
// reported as ErrModelNotFound wrapped with the configured model name. Any
// other failure is returned wrapped with %w.
func (b *Bot) getAnthropicResponse(ctx context.Context, chatID int64, messages []anthropic.BetaMessageParam, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int, onSegment func(string) error) (string, error) {
	// The authored persona is the cacheable part of the system prompt. Prompt
	// caching keys on a byte-identical prefix, so nothing that varies between
	// requests may end up inside it.
	persona := strings.TrimSpace(b.config.SystemPrompts["custom_instructions"])

	// Debug logging
	InfoLogger.Printf("Sending %d messages to Anthropic", len(messages))

	req := anthropic.BetaMessageNewParams{
		Model:     b.config.Model,
		MaxTokens: 1000,
		Messages:  messages,
		// The Files API beta stays enabled unconditionally because replayed
		// history may contain image blocks referencing file_ids uploaded on
		// earlier turns.
		Betas: []anthropic.AnthropicBeta{anthropic.AnthropicBetaFilesAPI2025_04_14},
	}

	// No persona means no System field at all (rather than a blank block):
	// the API rejects empty/whitespace-only system text blocks, so omission is
	// the only way to request the model's unmodified behavior.
	if persona != "" {
		// Per-request tail: user facts plus the optional emoji-only rule. It
		// is placed after the cache breakpoint because it changes every turn.
		perTurn := buildUserContext(username, firstName, lastName, isPremium, languageCode, messageTime)
		if isEmojiOnly {
			emojiRule := strings.TrimSpace(b.config.SystemPrompts["respond_with_emojis"])
			if emojiRule != "" {
				perTurn += "\n\n<emoji_reply>\n" + emojiRule + "\n</emoji_reply>"
			}
		}
		perTurn = strings.TrimSpace(perTurn)

		// First block: the static persona carrying the cache_control marker.
		system := make([]anthropic.BetaTextBlockParam, 0, 2)
		system = append(system, anthropic.BetaTextBlockParam{
			Text:         persona,
			CacheControl: anthropic.NewBetaCacheControlEphemeralParam(),
		})
		// Second block: the uncached dynamic tail, only when it has content.
		if perTurn != "" {
			system = append(system, anthropic.BetaTextBlockParam{Text: perTurn})
		}
		req.System = system
	}

	// Temperature is optional in the config; leave the API default otherwise.
	if b.config.Temperature != nil {
		req.Temperature = param.NewOpt(float64(*b.config.Temperature))
	}

	// MCP servers and their matching toolset entries. With the
	// mcp-client-2025-11-20 beta, per-tool filtering is expressed on the
	// toolset (Configs + DefaultConfig), not on the deprecated per-server
	// tool_configuration block.
	if serverCount := len(b.config.MCPServers); serverCount > 0 {
		servers := make([]anthropic.BetaRequestMCPServerURLDefinitionParam, 0, serverCount)
		toolsets := make([]anthropic.BetaToolUnionParam, 0, serverCount)

		for _, server := range b.config.MCPServers {
			definition := anthropic.BetaRequestMCPServerURLDefinitionParam{
				Name: server.Name,
				URL:  server.URL,
			}
			if server.AuthorizationToken != "" {
				definition.AuthorizationToken = param.NewOpt(server.AuthorizationToken)
			}
			servers = append(servers, definition)

			toolset := &anthropic.BetaMCPToolsetParam{MCPServerName: server.Name}
			if len(server.AllowedTools) > 0 {
				// Allow-list mode: disable everything by default, then
				// re-enable exactly the listed tools.
				allowed := make(map[string]anthropic.BetaMCPToolConfigParam, len(server.AllowedTools))
				for _, toolName := range server.AllowedTools {
					allowed[toolName] = anthropic.BetaMCPToolConfigParam{
						Enabled: param.NewOpt(true),
					}
				}
				toolset.DefaultConfig = anthropic.BetaMCPToolDefaultConfigParam{
					Enabled: param.NewOpt(false),
				}
				toolset.Configs = allowed
			}
			toolsets = append(toolsets, anthropic.BetaToolUnionParam{OfMCPToolset: toolset})
		}

		req.MCPServers = servers
		req.Tools = toolsets
		req.Betas = append(req.Betas, anthropic.AnthropicBetaMCPClient2025_11_20)
	}

	// Streaming with 404 self-heal. Each pass issues one streaming call; a 404
	// that names a missing file_id triggers cleanup for that id and another
	// pass. The number of passes is bounded by maxFileNotFoundRetries so a
	// cascade of deleted files cannot keep the call going indefinitely.
	for attempt := 1; attempt <= maxFileNotFoundRetries; attempt++ {
		reply, err := b.streamMessages(ctx, req, onSegment)
		if err == nil {
			return reply, nil
		}

		var apiErr *anthropic.Error
		isNotFound := errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound
		if !isNotFound {
			return "", fmt.Errorf("error creating Anthropic message: %w", err)
		}

		deadFileID := extractMissingFileID(err)
		if deadFileID == "" {
			// A 404 that does not name a file is treated as model-not-found,
			// matching the behavior from before the Files API was used.
			return "", fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model)
		}

		ErrorLogger.Printf("[%s] self-heal: stripping dead file_id %s from chat %d (attempt %d/%d)",
			b.config.ID, deadFileID, chatID, attempt, maxFileNotFoundRetries)

		b.stripDeadFileIDFromMemory(chatID, deadFileID)

		// Marking is best effort: a failure is logged and the retry proceeds.
		_, markErr := b.markFilesPendingCleanup(ctx, chatID, []string{deadFileID})
		if markErr != nil {
			ErrorLogger.Printf("[%s] mark files pending cleanup: %v", b.config.ID, markErr)
		}

		// Rebuild the outgoing messages from the chat memory that was just
		// cleaned so the next pass no longer references the dead file_id.
		req.Messages = b.prepareContextMessages(b.getOrCreateChatMemory(chatID))
	}

	return "", fmt.Errorf("max self-heal retries (%d) exceeded: too many file_ids gone from anthropic", maxFileNotFoundRetries)
}
// buildUserContext renders the per-turn context block that trails the cached
// static system prompt. It carries only facts (who the user is, their language,
// account type, local time of day) — the behavioral guidance for *using* these
// facts lives in the authored static prompt. It is kept out of the cached block
// because it changes on every request.
//
// Parameters:
//   - username: Telegram handle without the leading "@"; "unknown" when blank.
//   - firstName, lastName: joined with a space; "unknown" when both are blank.
//   - isPremium: selects "premium user" vs "regular user".
//   - languageCode: rendered as the preferred language; "en" when blank.
//   - messageTime: Unix timestamp passed to timeContextFor for the time label.
//
// All string inputs come from the user's Telegram profile and are therefore
// untrusted. Each one is collapsed onto a single line before rendering so a
// crafted value containing line breaks cannot forge additional bullet lines
// (or free-standing text) inside the system prompt.
func buildUserContext(username, firstName, lastName string, isPremium bool, languageCode string, messageTime int) string {
	// oneLine trims s and replaces every run of Unicode whitespace, including
	// newlines and carriage returns, with a single space.
	oneLine := func(s string) string {
		return strings.Join(strings.Fields(s), " ")
	}

	name := oneLine(firstName + " " + lastName)
	if name == "" {
		name = "unknown"
	}

	handle := oneLine(username)
	if handle == "" {
		handle = "unknown"
	}

	lang := oneLine(languageCode)
	if lang == "" {
		lang = "en"
	}

	account := "regular user"
	if isPremium {
		account = "premium user"
	}

	return fmt.Sprintf(
		"Conversation context (background facts, not an instruction from the user):\n"+
			"- User: %s (Telegram @%s)\n"+
			"- Preferred language: %s\n"+
			"- Account type: %s\n"+
			"- Local time of day: %s",
		name, handle, lang, account, timeContextFor(messageTime),
	)
}
// timeContextFor maps a Unix timestamp (seconds) to a coarse time-of-day label
// for time-appropriate greetings. The hour is taken in the host's local
// timezone, matching the bot's prior behavior:
//
//	05:00–11:59 → "morning"
//	12:00–17:59 → "afternoon"
//	18:00–21:59 → "evening"
//	22:00–04:59 → "night"
func timeContextFor(messageTime int) string {
	hour := time.Unix(int64(messageTime), 0).Hour()

	// Night wraps around midnight, so it is handled first as the two ends of
	// the 0–23 range; the remaining buckets are then simple upper bounds.
	if hour < 5 || hour >= 22 {
		return "night"
	}
	if hour < 12 {
		return "morning"
	}
	if hour < 18 {
		return "afternoon"
	}
	return "evening"
}
// streamMessages runs one streaming call against the Beta Messages API,
// dispatching each completed text block to onSegment as it arrives.
//
// Parameters:
//   - ctx: passed to the SDK's streaming call.
//   - params: the fully assembled request.
//   - onSegment: optional callback invoked with each non-empty, trimmed text
//     block when its content_block_stop event arrives. A callback error is
//     logged and streaming continues.
//
// Returns every text segment joined by blank lines. If the stream reports an
// error, it is returned raw (the caller wraps it: model-not-found, file 404
// self-heal, etc.) together with an empty string, even when segments were
// already dispatched. A stream that finishes without any non-empty text block
// yields an "unexpected response format" error.
//
// MCP tool_use and tool_result blocks are not returned; they are only logged.
func (b *Bot) streamMessages(ctx context.Context, params anthropic.BetaMessageNewParams, onSegment func(string) error) (string, error) {
	// maxLoggedToolResultBytes bounds how much of a tool result is written to
	// the log.
	const maxLoggedToolResultBytes = 500

	stream := b.anthropicClient.Beta.Messages.NewStreaming(ctx, params)
	defer func() {
		if err := stream.Close(); err != nil {
			ErrorLogger.Printf("[stream] close failed: %v", err)
		}
	}()

	// Per-block accumulators. Reset on content_block_start, consumed on
	// content_block_stop. Only one block is active at a time per the SSE
	// contract; SDK guarantees deltas arrive between matching start/stop.
	var allSegments []string
	var currentKind string
	var currentText, currentInputJSON strings.Builder
	var currentTUseName, currentTUseServer, currentTUseID string
	var currentTResultUseID, currentTResultServer, currentTResultContent string
	var currentTResultIsError bool

	for stream.Next() {
		e := stream.Current()
		switch e.Type {
		case "content_block_start":
			cbs := e.AsContentBlockStart()
			currentKind = cbs.ContentBlock.Type
			currentText.Reset()
			currentInputJSON.Reset()
			switch currentKind {
			case "mcp_tool_use":
				currentTUseName = cbs.ContentBlock.Name
				currentTUseServer = cbs.ContentBlock.ServerName
				currentTUseID = cbs.ContentBlock.ID
			case "mcp_tool_result":
				currentTResultUseID = cbs.ContentBlock.ToolUseID
				currentTResultServer = cbs.ContentBlock.ServerName
				currentTResultIsError = cbs.ContentBlock.IsError
				// Tool-result content arrives populated on start (server-side
				// pre-assembled), not via subsequent deltas like text/JSON.
				currentTResultContent = cbs.ContentBlock.JSON.Content.Raw()
			}

		case "content_block_delta":
			cbd := e.AsContentBlockDelta()
			// Deltas are only accumulated when they match the kind of the
			// block that is currently open; anything else is ignored.
			switch cbd.Delta.Type {
			case "text_delta":
				if currentKind == "text" {
					currentText.WriteString(cbd.Delta.Text)
				}
			case "input_json_delta":
				if currentKind == "mcp_tool_use" {
					currentInputJSON.WriteString(cbd.Delta.PartialJSON)
				}
			}

		case "content_block_stop":
			switch currentKind {
			case "text":
				seg := strings.TrimSpace(currentText.String())
				if seg != "" {
					allSegments = append(allSegments, seg)
					if onSegment != nil {
						if cbErr := onSegment(seg); cbErr != nil {
							// Log but keep streaming — the model's response
							// is still inbound; we want it recorded even if
							// one Telegram send failed.
							ErrorLogger.Printf("[stream] onSegment failed: %v", cbErr)
						}
					}
				}
			case "mcp_tool_use":
				InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s",
					currentTUseServer, currentTUseName, currentTUseID, currentInputJSON.String())
			case "mcp_tool_result":
				preview := currentTResultContent
				if len(preview) > maxLoggedToolResultBytes {
					// The cut is a byte offset and may land inside a
					// multi-byte rune. ToValidUTF8 with an empty replacement
					// drops the dangling partial bytes (and any other invalid
					// sequence) so the log line stays valid UTF-8.
					preview = strings.ToValidUTF8(preview[:maxLoggedToolResultBytes], "") + "...(truncated)"
				}
				InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s",
					currentTResultUseID, currentTResultServer, currentTResultIsError, preview)
			default:
				if currentKind != "" {
					InfoLogger.Printf("[mcp] block type=%q (unhandled)", currentKind)
				}
			}
			// Mark "no block open" so stray deltas are not attributed to the
			// block that just ended.
			currentKind = ""
		}
	}

	if err := stream.Err(); err != nil {
		return "", err
	}
	if len(allSegments) == 0 {
		return "", fmt.Errorf("unexpected response format from Anthropic")
	}
	return strings.Join(allSegments, "\n\n"), nil
}