package main import ( "context" "errors" "fmt" "net/http" "strings" "time" "github.com/anthropics/anthropic-sdk-go" "github.com/anthropics/anthropic-sdk-go/packages/param" ) // ErrModelNotFound is returned when the configured Anthropic model is no longer available // (deprecated or removed). Callers can use errors.Is to detect this and surface an // actionable message to admins/owners while keeping the response vague for regular users. var ErrModelNotFound = errors.New("model not found or deprecated") // maxFileNotFoundRetries caps the runtime 404 self-heal loop. If multiple // referenced file_ids are gone from Anthropic simultaneously (admin purge, AUP // enforcement, etc.), we strip them one at a time and retry. Three attempts // covers all realistic cascades without leaving the call hanging indefinitely. const maxFileNotFoundRetries = 3 // getAnthropicResponse streams the model's response. Each completed text block // is delivered to onSegment as soon as the model finishes writing it — so the // caller can send segments to Telegram with natural rhythm around tool calls, // rather than batched at the very end of the turn. onSegment may be nil for // callers that only want the joined text (voice TTS, sticker reactions, etc.). // The returned string is every text segment joined by blank lines. // // chatID is required for the runtime 404 self-heal: when Anthropic returns // "File not found:" for a referenced file_id, the dead file_id is stripped // from this chat's in-memory ChatMemory and the corresponding DB rows are // stamped FilesCleanedAt so a reconciliation job can finish the cleanup. func (b *Bot) getAnthropicResponse(ctx context.Context, chatID int64, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int, onSegment func(string) error) (string, error) { // Use prompts from config var systemMessage string if isNewChat { systemMessage = b.config.SystemPrompts["new_chat"] } else { systemMessage = b.config.SystemPrompts["continue_conversation"] } // Combine default prompt with custom instructions systemMessage = b.config.SystemPrompts["default"] + " " + b.config.SystemPrompts["custom_instructions"] + " " + systemMessage // Handle username placeholder usernameValue := username if username == "" { usernameValue = "unknown" // Use "unknown" when username is not available } systemMessage = strings.ReplaceAll(systemMessage, "{username}", usernameValue) // Handle firstname placeholder firstnameValue := firstName if firstName == "" { firstnameValue = "unknown" // Use "unknown" when first name is not available } systemMessage = strings.ReplaceAll(systemMessage, "{firstname}", firstnameValue) // Handle lastname placeholder lastnameValue := lastName if lastName == "" { lastnameValue = "" // Empty string when last name is not available } systemMessage = strings.ReplaceAll(systemMessage, "{lastname}", lastnameValue) // Handle language code placeholder langValue := languageCode if languageCode == "" { langValue = "en" // Default to English when language code is not available } systemMessage = strings.ReplaceAll(systemMessage, "{language}", langValue) // Handle premium status premiumStatus := "regular user" if isPremium { premiumStatus = "premium user" } systemMessage = strings.ReplaceAll(systemMessage, "{premium_status}", premiumStatus) // Handle time awareness timeObj := time.Unix(int64(messageTime), 0) hour := timeObj.Hour() var timeContext string if hour >= 5 && hour < 12 { timeContext = "morning" } else if hour >= 12 && hour < 18 { timeContext = "afternoon" } else if hour >= 18 && hour < 22 { timeContext = "evening" } else { timeContext = "night" } systemMessage = strings.ReplaceAll(systemMessage, "{time_context}", timeContext) if !isOwner { systemMessage += " " + b.config.SystemPrompts["avoid_sensitive"] } if isEmojiOnly { systemMessage += " " + b.config.SystemPrompts["respond_with_emojis"] } // Debug logging InfoLogger.Printf("Sending %d messages to Anthropic", len(messages)) params := anthropic.BetaMessageNewParams{ Model: b.config.Model, MaxTokens: 1000, Messages: messages, System: []anthropic.BetaTextBlockParam{{Text: systemMessage}}, // Files API beta is always on: replayed conversation history may carry // image content blocks that reference file_ids uploaded on prior turns. Betas: []anthropic.AnthropicBeta{anthropic.AnthropicBetaFilesAPI2025_04_14}, } // Apply temperature if set in config if b.config.Temperature != nil { params.Temperature = param.NewOpt(float64(*b.config.Temperature)) } // MCP servers + matching toolset entries. The mcp-client-2025-11-20 beta // requires per-tool filtering on the toolset (Configs + DefaultConfig), // NOT the deprecated per-server tool_configuration block. if len(b.config.MCPServers) > 0 { mcpServers := make([]anthropic.BetaRequestMCPServerURLDefinitionParam, 0, len(b.config.MCPServers)) tools := make([]anthropic.BetaToolUnionParam, 0, len(b.config.MCPServers)) for _, s := range b.config.MCPServers { srv := anthropic.BetaRequestMCPServerURLDefinitionParam{ Name: s.Name, URL: s.URL, } if s.AuthorizationToken != "" { srv.AuthorizationToken = param.NewOpt(s.AuthorizationToken) } mcpServers = append(mcpServers, srv) toolset := &anthropic.BetaMCPToolsetParam{ MCPServerName: s.Name, } if len(s.AllowedTools) > 0 { toolset.DefaultConfig = anthropic.BetaMCPToolDefaultConfigParam{ Enabled: param.NewOpt(false), } toolset.Configs = make(map[string]anthropic.BetaMCPToolConfigParam, len(s.AllowedTools)) for _, tool := range s.AllowedTools { toolset.Configs[tool] = anthropic.BetaMCPToolConfigParam{ Enabled: param.NewOpt(true), } } } tools = append(tools, anthropic.BetaToolUnionParam{OfMCPToolset: toolset}) } params.MCPServers = mcpServers params.Tools = tools params.Betas = append(params.Betas, anthropic.AnthropicBetaMCPClient2025_11_20) } // Streaming + 404 self-heal loop. A "File not found:" 404 from Anthropic // (admin purge, AUP enforcement, accidental delete elsewhere) is caught // here: the offending file_id is stripped from in-memory ChatMemory + the // affected DB rows are stamped for the reconciliation job, and the call is // re-issued. The loop caps at maxFileNotFoundRetries so cascading deletions // can't pin the call indefinitely. for attempt := 0; attempt < maxFileNotFoundRetries; attempt++ { joined, streamErr := b.streamMessages(ctx, params, onSegment) if streamErr == nil { return joined, nil } var apiErr *anthropic.Error if !errors.As(streamErr, &apiErr) || apiErr.StatusCode != http.StatusNotFound { return "", fmt.Errorf("error creating Anthropic message: %w", streamErr) } missingFileID := extractMissingFileID(streamErr) if missingFileID == "" { // 404 without a "File not found:" body — interpret as model-not-found, // matching the legacy behavior pre-Files-API. return "", fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model) } ErrorLogger.Printf("[%s] self-heal: stripping dead file_id %s from chat %d (attempt %d/%d)", b.config.ID, missingFileID, chatID, attempt+1, maxFileNotFoundRetries) b.stripDeadFileIDFromMemory(chatID, missingFileID) if _, cleanupErr := b.markFilesPendingCleanup(ctx, chatID, []string{missingFileID}); cleanupErr != nil { ErrorLogger.Printf("[%s] mark files pending cleanup: %v", b.config.ID, cleanupErr) } params.Messages = b.prepareContextMessages(b.getOrCreateChatMemory(chatID)) } return "", fmt.Errorf("max self-heal retries (%d) exceeded: too many file_ids gone from anthropic", maxFileNotFoundRetries) } // streamMessages runs one streaming call against the Beta Messages API, // dispatching each completed text block to onSegment as it arrives. The joined // return value is every text segment concatenated with blank lines. Errors from // the SDK are returned raw; the caller wraps them (model-not-found, file 404 // self-heal, etc.). func (b *Bot) streamMessages(ctx context.Context, params anthropic.BetaMessageNewParams, onSegment func(string) error) (string, error) { stream := b.anthropicClient.Beta.Messages.NewStreaming(ctx, params) defer func() { if err := stream.Close(); err != nil { ErrorLogger.Printf("[stream] close failed: %v", err) } }() // Per-block accumulators. Reset on content_block_start, consumed on // content_block_stop. Only one block is active at a time per the SSE // contract; SDK guarantees deltas arrive between matching start/stop. var ( allSegments []string currentKind string currentText strings.Builder currentInputJSON strings.Builder currentTUseName, currentTUseServer, currentTUseID string currentTResultUseID, currentTResultServer string currentTResultIsError bool currentTResultContent string ) for stream.Next() { e := stream.Current() switch e.Type { case "content_block_start": cbs := e.AsContentBlockStart() currentKind = cbs.ContentBlock.Type currentText.Reset() currentInputJSON.Reset() switch currentKind { case "mcp_tool_use": currentTUseName = cbs.ContentBlock.Name currentTUseServer = cbs.ContentBlock.ServerName currentTUseID = cbs.ContentBlock.ID case "mcp_tool_result": currentTResultUseID = cbs.ContentBlock.ToolUseID currentTResultServer = cbs.ContentBlock.ServerName currentTResultIsError = cbs.ContentBlock.IsError // Tool-result content arrives populated on start (server-side // pre-assembled), not via subsequent deltas like text/JSON. currentTResultContent = cbs.ContentBlock.JSON.Content.Raw() } case "content_block_delta": cbd := e.AsContentBlockDelta() switch cbd.Delta.Type { case "text_delta": if currentKind == "text" { currentText.WriteString(cbd.Delta.Text) } case "input_json_delta": if currentKind == "mcp_tool_use" { currentInputJSON.WriteString(cbd.Delta.PartialJSON) } } case "content_block_stop": switch currentKind { case "text": seg := strings.TrimSpace(currentText.String()) if seg != "" { allSegments = append(allSegments, seg) if onSegment != nil { if cbErr := onSegment(seg); cbErr != nil { // Log but keep streaming — the model's response // is still inbound; we want it recorded even if // one Telegram send failed. ErrorLogger.Printf("[stream] onSegment failed: %v", cbErr) } } } case "mcp_tool_use": InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s", currentTUseServer, currentTUseName, currentTUseID, currentInputJSON.String()) case "mcp_tool_result": preview := currentTResultContent if len(preview) > 500 { preview = preview[:500] + "...(truncated)" } InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s", currentTResultUseID, currentTResultServer, currentTResultIsError, preview) default: if currentKind != "" { InfoLogger.Printf("[mcp] block type=%q (unhandled)", currentKind) } } currentKind = "" } } if err := stream.Err(); err != nil { return "", err } if len(allSegments) == 0 { return "", fmt.Errorf("unexpected response format from Anthropic") } return strings.Join(allSegments, "\n\n"), nil }