mirror of
https://github.com/HugeFrog24/go-telegram-bot.git
synced 2026-06-29 22:07:12 +00:00
MCP error logging
This commit is contained in:
+272
-90
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/anthropics/anthropic-sdk-go"
|
||||
@@ -17,75 +18,68 @@ import (
|
||||
// actionable message to admins/owners while keeping the response vague for regular users.
|
||||
var ErrModelNotFound = errors.New("model not found or deprecated")
|
||||
|
||||
func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int) (string, error) {
|
||||
// Use prompts from config
|
||||
var systemMessage string
|
||||
if isNewChat {
|
||||
systemMessage = b.config.SystemPrompts["new_chat"]
|
||||
} else {
|
||||
systemMessage = b.config.SystemPrompts["continue_conversation"]
|
||||
}
|
||||
// maxFileNotFoundRetries caps the runtime 404 self-heal loop. If multiple
|
||||
// referenced file_ids are gone from Anthropic simultaneously (admin purge, AUP
|
||||
// enforcement, etc.), we strip them one at a time and retry. Three attempts
|
||||
// covers all realistic cascades without leaving the call hanging indefinitely.
|
||||
const maxFileNotFoundRetries = 3
|
||||
|
||||
// Combine default prompt with custom instructions
|
||||
systemMessage = b.config.SystemPrompts["default"] + " " + b.config.SystemPrompts["custom_instructions"] + " " + systemMessage
|
||||
// mcpUnsupportedSentinel is the placeholder text Anthropic's server-side MCP
|
||||
// connector substitutes when a tool result can't be serialized into a supported
|
||||
// content block. It arrives inside a normal mcp_tool_result with is_error=false,
|
||||
// so it is otherwise indistinguishable from success — every empty-result issue
|
||||
// across the ecosystem (strands #2122, openai-agents #1035, opencode #15371)
|
||||
// shows the same is_error=false on these, so we substring-match the text rather
|
||||
// than rely on the error flag. Observed trigger: an MCP server returns an empty
|
||||
// content array for a zero-result query (e.g. Outline list_documents with no
|
||||
// match), which the connector can't serialize. Substring (not the full
|
||||
// sentence) so a minor wording change upstream doesn't silently break detection.
|
||||
//
|
||||
// Scope: this catches the SOFT variant only — a streamed mcp_tool_result whose
|
||||
// content is the sentinel. The HARD variant (e.g. an unsupported image media
|
||||
// type) is a 400 that aborts the whole stream and never produces a result
|
||||
// block, so it surfaces via streamMessages' error return, not here.
|
||||
const mcpUnsupportedSentinel = "format not currently supported by the Anthropic API"
|
||||
|
||||
// Handle username placeholder
|
||||
usernameValue := username
|
||||
if username == "" {
|
||||
usernameValue = "unknown" // Use "unknown" when username is not available
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{username}", usernameValue)
|
||||
// mcpUnsupportedCount tallies sentinel hits across the whole process lifetime
|
||||
// (all bots, all MCP servers) so the true rate is visible — the chat masks most
|
||||
// of them because the model often degrades gracefully. The per-hit ERROR line
|
||||
// carries the bot ID and server name for attribution; this is just the running
|
||||
// total.
|
||||
var mcpUnsupportedCount atomic.Uint64
|
||||
|
||||
// Handle firstname placeholder
|
||||
firstnameValue := firstName
|
||||
if firstName == "" {
|
||||
firstnameValue = "unknown" // Use "unknown" when first name is not available
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{firstname}", firstnameValue)
|
||||
// mcpCall pairs a tool_use's server+name+input so a later unsupported result —
|
||||
// which arrives in a SEPARATE block linked only by tool_use_id — can name the
|
||||
// server and query that triggered it. server matters once a bot configures more
|
||||
// than one MCP server (the config supports a slice), otherwise the log can't say
|
||||
// which server choked.
|
||||
type mcpCall struct{ server, name, input string }
|
||||
|
||||
// Handle lastname placeholder
|
||||
lastnameValue := lastName
|
||||
if lastName == "" {
|
||||
lastnameValue = "" // Empty string when last name is not available
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{lastname}", lastnameValue)
|
||||
|
||||
// Handle language code placeholder
|
||||
langValue := languageCode
|
||||
if languageCode == "" {
|
||||
langValue = "en" // Default to English when language code is not available
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{language}", langValue)
|
||||
|
||||
// Handle premium status
|
||||
premiumStatus := "regular user"
|
||||
if isPremium {
|
||||
premiumStatus = "premium user"
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{premium_status}", premiumStatus)
|
||||
|
||||
// Handle time awareness
|
||||
timeObj := time.Unix(int64(messageTime), 0)
|
||||
hour := timeObj.Hour()
|
||||
var timeContext string
|
||||
if hour >= 5 && hour < 12 {
|
||||
timeContext = "morning"
|
||||
} else if hour >= 12 && hour < 18 {
|
||||
timeContext = "afternoon"
|
||||
} else if hour >= 18 && hour < 22 {
|
||||
timeContext = "evening"
|
||||
} else {
|
||||
timeContext = "night"
|
||||
}
|
||||
systemMessage = strings.ReplaceAll(systemMessage, "{time_context}", timeContext)
|
||||
|
||||
if !isOwner {
|
||||
systemMessage += " " + b.config.SystemPrompts["avoid_sensitive"]
|
||||
}
|
||||
|
||||
if isEmojiOnly {
|
||||
systemMessage += " " + b.config.SystemPrompts["respond_with_emojis"]
|
||||
}
|
||||
// getAnthropicResponse streams the model's response. Each completed text block
|
||||
// is delivered to onSegment as soon as the model finishes writing it — so the
|
||||
// caller can send segments to Telegram with natural rhythm around tool calls,
|
||||
// rather than batched at the very end of the turn. onSegment may be nil for
|
||||
// callers that only want the joined text (voice TTS, sticker reactions, etc.).
|
||||
// The returned string is every text segment joined by blank lines.
|
||||
//
|
||||
// chatID is required for the runtime 404 self-heal: when Anthropic returns
|
||||
// "File not found:" for a referenced file_id, the dead file_id is stripped
|
||||
// from this chat's in-memory ChatMemory and the corresponding DB rows are
|
||||
// stamped FilesCleanedAt so a reconciliation job can finish the cleanup.
|
||||
func (b *Bot) getAnthropicResponse(ctx context.Context, chatID int64, messages []anthropic.BetaMessageParam, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int, onSegment func(string) error) (string, error) {
|
||||
// The system prompt is the single authored behavior driver. It is assembled
|
||||
// as a cached static block (custom_instructions) followed by a per-turn
|
||||
// dynamic tail. Prompt caching keys on a byte-identical prefix, so the static
|
||||
// block must not contain anything that changes between requests — all
|
||||
// per-turn data (who we're talking to, the time of day, the emoji-only rule)
|
||||
// lives in the trailing block, AFTER the cache breakpoint.
|
||||
//
|
||||
// An empty custom_instructions means no system prompt at all: the System
|
||||
// field is omitted entirely (not sent as a blank block), giving the model's
|
||||
// unmodified "vanilla" behavior. This matters because the Anthropic API
|
||||
// rejects a system array containing an empty/whitespace-only text block, so
|
||||
// omission is the only correct way to express "no system prompt".
|
||||
staticPrompt := strings.TrimSpace(b.config.SystemPrompts["custom_instructions"])
|
||||
|
||||
// Debug logging
|
||||
InfoLogger.Printf("Sending %d messages to Anthropic", len(messages))
|
||||
@@ -94,7 +88,30 @@ func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.Bet
|
||||
Model: b.config.Model,
|
||||
MaxTokens: 1000,
|
||||
Messages: messages,
|
||||
System: []anthropic.BetaTextBlockParam{{Text: systemMessage}},
|
||||
// Files API beta is always on: replayed conversation history may carry
|
||||
// image content blocks that reference file_ids uploaded on prior turns.
|
||||
Betas: []anthropic.AnthropicBeta{anthropic.AnthropicBetaFilesAPI2025_04_14},
|
||||
}
|
||||
|
||||
if staticPrompt != "" {
|
||||
// Block 1 — static persona/instructions, marked for caching. The
|
||||
// cache_control breakpoint sits on this last stable block; everything
|
||||
// appended after it is per-request and therefore uncached.
|
||||
blocks := []anthropic.BetaTextBlockParam{
|
||||
{Text: staticPrompt, CacheControl: anthropic.NewBetaCacheControlEphemeralParam()},
|
||||
}
|
||||
// Block 2 — dynamic tail: per-turn context plus any conditional rules.
|
||||
// Kept out of the cached block because it changes every request.
|
||||
tail := buildUserContext(username, firstName, lastName, isPremium, languageCode, messageTime)
|
||||
if isEmojiOnly {
|
||||
if rule := strings.TrimSpace(b.config.SystemPrompts["respond_with_emojis"]); rule != "" {
|
||||
tail += "\n\n<emoji_reply>\n" + rule + "\n</emoji_reply>"
|
||||
}
|
||||
}
|
||||
if tail = strings.TrimSpace(tail); tail != "" {
|
||||
blocks = append(blocks, anthropic.BetaTextBlockParam{Text: tail})
|
||||
}
|
||||
params.System = blocks
|
||||
}
|
||||
|
||||
// Apply temperature if set in config
|
||||
@@ -136,40 +153,205 @@ func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.Bet
|
||||
}
|
||||
params.MCPServers = mcpServers
|
||||
params.Tools = tools
|
||||
params.Betas = []anthropic.AnthropicBeta{anthropic.AnthropicBetaMCPClient2025_11_20}
|
||||
params.Betas = append(params.Betas, anthropic.AnthropicBetaMCPClient2025_11_20)
|
||||
}
|
||||
|
||||
resp, err := b.anthropicClient.Beta.Messages.New(ctx, params)
|
||||
if err != nil {
|
||||
// Streaming + 404 self-heal loop. A "File not found:" 404 from Anthropic
|
||||
// (admin purge, AUP enforcement, accidental delete elsewhere) is caught
|
||||
// here: the offending file_id is stripped from in-memory ChatMemory + the
|
||||
// affected DB rows are stamped for the reconciliation job, and the call is
|
||||
// re-issued. The loop caps at maxFileNotFoundRetries so cascading deletions
|
||||
// can't pin the call indefinitely.
|
||||
for attempt := 0; attempt < maxFileNotFoundRetries; attempt++ {
|
||||
joined, streamErr := b.streamMessages(ctx, params, onSegment)
|
||||
if streamErr == nil {
|
||||
return joined, nil
|
||||
}
|
||||
var apiErr *anthropic.Error
|
||||
if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound {
|
||||
if !errors.As(streamErr, &apiErr) || apiErr.StatusCode != http.StatusNotFound {
|
||||
return "", fmt.Errorf("error creating Anthropic message: %w", streamErr)
|
||||
}
|
||||
missingFileID := extractMissingFileID(streamErr)
|
||||
if missingFileID == "" {
|
||||
// 404 without a "File not found:" body — interpret as model-not-found,
|
||||
// matching the legacy behavior pre-Files-API.
|
||||
return "", fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model)
|
||||
}
|
||||
return "", fmt.Errorf("error creating Anthropic message: %w", err)
|
||||
ErrorLogger.Printf("[%s] self-heal: stripping dead file_id %s from chat %d (attempt %d/%d)",
|
||||
b.config.ID, missingFileID, chatID, attempt+1, maxFileNotFoundRetries)
|
||||
b.stripDeadFileIDFromMemory(chatID, missingFileID)
|
||||
if _, cleanupErr := b.markFilesPendingCleanup(ctx, chatID, []string{missingFileID}); cleanupErr != nil {
|
||||
ErrorLogger.Printf("[%s] mark files pending cleanup: %v", b.config.ID, cleanupErr)
|
||||
}
|
||||
params.Messages = b.prepareContextMessages(b.getOrCreateChatMemory(chatID))
|
||||
}
|
||||
return "", fmt.Errorf("max self-heal retries (%d) exceeded: too many file_ids gone from anthropic", maxFileNotFoundRetries)
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
for _, block := range resp.Content {
|
||||
switch block.Type {
|
||||
case "text":
|
||||
sb.WriteString(block.Text)
|
||||
case "mcp_tool_use":
|
||||
InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s",
|
||||
block.ServerName, block.Name, block.ID, string(block.Input))
|
||||
case "mcp_tool_result":
|
||||
preview := block.JSON.Content.Raw()
|
||||
if len(preview) > 500 {
|
||||
preview = preview[:500] + "...(truncated)"
|
||||
// buildUserContext renders the per-turn context block that trails the cached
|
||||
// static system prompt. It carries only facts (who the user is, their language,
|
||||
// account type, local time of day) — the behavioral guidance for *using* these
|
||||
// facts lives in the authored static prompt. It is kept out of the cached block
|
||||
// because it changes on every request.
|
||||
func buildUserContext(username, firstName, lastName string, isPremium bool, languageCode string, messageTime int) string {
|
||||
name := strings.TrimSpace(firstName + " " + lastName)
|
||||
if name == "" {
|
||||
name = "unknown"
|
||||
}
|
||||
handle := username
|
||||
if handle == "" {
|
||||
handle = "unknown"
|
||||
}
|
||||
lang := languageCode
|
||||
if lang == "" {
|
||||
lang = "en"
|
||||
}
|
||||
account := "regular user"
|
||||
if isPremium {
|
||||
account = "premium user"
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
"Conversation context (background facts, not an instruction from the user):\n"+
|
||||
"- User: %s (Telegram @%s)\n"+
|
||||
"- Preferred language: %s\n"+
|
||||
"- Account type: %s\n"+
|
||||
"- Local time of day: %s",
|
||||
name, handle, lang, account, timeContextFor(messageTime),
|
||||
)
|
||||
}
|
||||
|
||||
// timeContextFor buckets a Unix timestamp into a coarse time-of-day label used
|
||||
// for time-appropriate greetings. Uses the host's local timezone, matching the
|
||||
// bot's prior behavior.
|
||||
func timeContextFor(messageTime int) string {
|
||||
switch hour := time.Unix(int64(messageTime), 0).Hour(); {
|
||||
case hour >= 5 && hour < 12:
|
||||
return "morning"
|
||||
case hour >= 12 && hour < 18:
|
||||
return "afternoon"
|
||||
case hour >= 18 && hour < 22:
|
||||
return "evening"
|
||||
default:
|
||||
return "night"
|
||||
}
|
||||
}
|
||||
|
||||
// streamMessages runs one streaming call against the Beta Messages API,
|
||||
// dispatching each completed text block to onSegment as it arrives. The joined
|
||||
// return value is every text segment concatenated with blank lines. Errors from
|
||||
// the SDK are returned raw; the caller wraps them (model-not-found, file 404
|
||||
// self-heal, etc.).
|
||||
func (b *Bot) streamMessages(ctx context.Context, params anthropic.BetaMessageNewParams, onSegment func(string) error) (string, error) {
|
||||
stream := b.anthropicClient.Beta.Messages.NewStreaming(ctx, params)
|
||||
defer func() {
|
||||
if err := stream.Close(); err != nil {
|
||||
ErrorLogger.Printf("[stream] close failed: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Per-block accumulators. Reset on content_block_start, consumed on
|
||||
// content_block_stop. Only one block is active at a time per the SSE
|
||||
// contract; SDK guarantees deltas arrive between matching start/stop.
|
||||
var (
|
||||
allSegments []string
|
||||
currentKind string
|
||||
currentText strings.Builder
|
||||
currentInputJSON strings.Builder
|
||||
currentTUseName, currentTUseServer, currentTUseID string
|
||||
currentTResultUseID, currentTResultServer string
|
||||
currentTResultIsError bool
|
||||
currentTResultContent string
|
||||
// tool_use_id -> {server, name, input}. Lives for one request; bounded
|
||||
// by the number of tool calls in the stream, so no eviction needed.
|
||||
mcpCalls = map[string]mcpCall{}
|
||||
)
|
||||
|
||||
for stream.Next() {
|
||||
e := stream.Current()
|
||||
switch e.Type {
|
||||
case "content_block_start":
|
||||
cbs := e.AsContentBlockStart()
|
||||
currentKind = cbs.ContentBlock.Type
|
||||
currentText.Reset()
|
||||
currentInputJSON.Reset()
|
||||
switch currentKind {
|
||||
case "mcp_tool_use":
|
||||
currentTUseName = cbs.ContentBlock.Name
|
||||
currentTUseServer = cbs.ContentBlock.ServerName
|
||||
currentTUseID = cbs.ContentBlock.ID
|
||||
case "mcp_tool_result":
|
||||
currentTResultUseID = cbs.ContentBlock.ToolUseID
|
||||
currentTResultServer = cbs.ContentBlock.ServerName
|
||||
currentTResultIsError = cbs.ContentBlock.IsError
|
||||
// Tool-result content arrives populated on start (server-side
|
||||
// pre-assembled), not via subsequent deltas like text/JSON.
|
||||
currentTResultContent = cbs.ContentBlock.JSON.Content.Raw()
|
||||
}
|
||||
InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s",
|
||||
block.ToolUseID, block.ServerName, block.IsError, preview)
|
||||
default:
|
||||
InfoLogger.Printf("[mcp] block type=%q (unhandled)", block.Type)
|
||||
|
||||
case "content_block_delta":
|
||||
cbd := e.AsContentBlockDelta()
|
||||
switch cbd.Delta.Type {
|
||||
case "text_delta":
|
||||
if currentKind == "text" {
|
||||
currentText.WriteString(cbd.Delta.Text)
|
||||
}
|
||||
case "input_json_delta":
|
||||
if currentKind == "mcp_tool_use" {
|
||||
currentInputJSON.WriteString(cbd.Delta.PartialJSON)
|
||||
}
|
||||
}
|
||||
|
||||
case "content_block_stop":
|
||||
switch currentKind {
|
||||
case "text":
|
||||
seg := strings.TrimSpace(currentText.String())
|
||||
if seg != "" {
|
||||
allSegments = append(allSegments, seg)
|
||||
if onSegment != nil {
|
||||
if cbErr := onSegment(seg); cbErr != nil {
|
||||
// Log but keep streaming — the model's response
|
||||
// is still inbound; we want it recorded even if
|
||||
// one Telegram send failed.
|
||||
ErrorLogger.Printf("[stream] onSegment failed: %v", cbErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
case "mcp_tool_use":
|
||||
mcpCalls[currentTUseID] = mcpCall{
|
||||
server: currentTUseServer,
|
||||
name: currentTUseName,
|
||||
input: currentInputJSON.String(),
|
||||
}
|
||||
InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s",
|
||||
currentTUseServer, currentTUseName, currentTUseID, currentInputJSON.String())
|
||||
case "mcp_tool_result":
|
||||
preview := currentTResultContent
|
||||
if len(preview) > 500 {
|
||||
preview = preview[:500] + "...(truncated)"
|
||||
}
|
||||
InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s",
|
||||
currentTResultUseID, currentTResultServer, currentTResultIsError, preview)
|
||||
if strings.Contains(currentTResultContent, mcpUnsupportedSentinel) {
|
||||
total := mcpUnsupportedCount.Add(1)
|
||||
call := mcpCalls[currentTResultUseID]
|
||||
ErrorLogger.Printf("[%s][mcp][unsupported] connector could not serialize result "+
|
||||
"(total=%d): server=%q tool=%q input=%s tool_use_id=%q",
|
||||
b.config.ID, total, call.server, call.name, call.input, currentTResultUseID)
|
||||
}
|
||||
default:
|
||||
if currentKind != "" {
|
||||
InfoLogger.Printf("[mcp] block type=%q (unhandled)", currentKind)
|
||||
}
|
||||
}
|
||||
currentKind = ""
|
||||
}
|
||||
}
|
||||
out := sb.String()
|
||||
if out == "" {
|
||||
|
||||
if err := stream.Err(); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(allSegments) == 0 {
|
||||
return "", fmt.Errorf("unexpected response format from Anthropic")
|
||||
}
|
||||
return out, nil
|
||||
return strings.Join(allSegments, "\n\n"), nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user