diff --git a/.cursor/rules/snyk_rules.mdc b/.cursor/rules/snyk_rules.mdc deleted file mode 100644 index eef0634..0000000 --- a/.cursor/rules/snyk_rules.mdc +++ /dev/null @@ -1,14 +0,0 @@ ---- -alwaysApply: true -always_on: true -trigger: always_on -applyTo: "**" -description: Snyk Security At Inception ---- - -# Project security best practices - -- Always run snyk_code_scan tool for new first party code that is generated in a Snyk-supported language. -- If any security issues are found based on newly introduced or modified code or dependencies, attempt to fix the issues using the results context from Snyk. -- Rescan the code after fixing the issues to ensure that the issues were fixed and that there are no newly introduced issues. -- Repeat this process until no new issues are found. diff --git a/.roo/mcp.json b/.roo/mcp.json deleted file mode 100644 index 7001130..0000000 --- a/.roo/mcp.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "mcpServers": {} -} \ No newline at end of file diff --git a/anthropic.go b/anthropic.go index 44f9703..8346856 100644 --- a/anthropic.go +++ b/anthropic.go @@ -17,7 +17,13 @@ import ( // actionable message to admins/owners while keeping the response vague for regular users. var ErrModelNotFound = errors.New("model not found or deprecated") -func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int) ([]string, error) { +// getAnthropicResponse streams the model's response. Each completed text block +// is delivered to onSegment as soon as the model finishes writing it — so the +// caller can send segments to Telegram with natural rhythm around tool calls, +// rather than batched at the very end of the turn. onSegment may be nil for +// callers that only want the joined text (voice TTS, sticker reactions, etc.). +// The returned string is every text segment joined by blank lines. +func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int, onSegment func(string) error) (string, error) { // Use prompts from config var systemMessage string if isNewChat { @@ -139,43 +145,106 @@ func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.Bet params.Betas = []anthropic.AnthropicBeta{anthropic.AnthropicBetaMCPClient2025_11_20} } - resp, err := b.anthropicClient.Beta.Messages.New(ctx, params) - if err != nil { - var apiErr *anthropic.Error - if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound { - return nil, fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model) + stream := b.anthropicClient.Beta.Messages.NewStreaming(ctx, params) + defer func() { + if err := stream.Close(); err != nil { + ErrorLogger.Printf("[stream] close failed: %v", err) + } + }() + + // Per-block accumulators. Reset on content_block_start, consumed on + // content_block_stop. Only one block is active at a time per the SSE + // contract; SDK guarantees deltas arrive between matching start/stop. + var ( + allSegments []string + currentKind string + currentText strings.Builder + currentInputJSON strings.Builder + currentTUseName, currentTUseServer, currentTUseID string + currentTResultUseID, currentTResultServer string + currentTResultIsError bool + currentTResultContent string + ) + + for stream.Next() { + e := stream.Current() + switch e.Type { + case "content_block_start": + cbs := e.AsContentBlockStart() + currentKind = cbs.ContentBlock.Type + currentText.Reset() + currentInputJSON.Reset() + switch currentKind { + case "mcp_tool_use": + currentTUseName = cbs.ContentBlock.Name + currentTUseServer = cbs.ContentBlock.ServerName + currentTUseID = cbs.ContentBlock.ID + case "mcp_tool_result": + currentTResultUseID = cbs.ContentBlock.ToolUseID + currentTResultServer = cbs.ContentBlock.ServerName + currentTResultIsError = cbs.ContentBlock.IsError + // Tool-result content arrives populated on start (server-side + // pre-assembled), not via subsequent deltas like text/JSON. + currentTResultContent = cbs.ContentBlock.JSON.Content.Raw() + } + + case "content_block_delta": + cbd := e.AsContentBlockDelta() + switch cbd.Delta.Type { + case "text_delta": + if currentKind == "text" { + currentText.WriteString(cbd.Delta.Text) + } + case "input_json_delta": + if currentKind == "mcp_tool_use" { + currentInputJSON.WriteString(cbd.Delta.PartialJSON) + } + } + + case "content_block_stop": + switch currentKind { + case "text": + seg := strings.TrimSpace(currentText.String()) + if seg != "" { + allSegments = append(allSegments, seg) + if onSegment != nil { + if cbErr := onSegment(seg); cbErr != nil { + // Log but keep streaming — the model's response + // is still inbound; we want it recorded even if + // one Telegram send failed. + ErrorLogger.Printf("[stream] onSegment failed: %v", cbErr) + } + } + } + case "mcp_tool_use": + InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s", + currentTUseServer, currentTUseName, currentTUseID, currentInputJSON.String()) + case "mcp_tool_result": + preview := currentTResultContent + if len(preview) > 500 { + preview = preview[:500] + "...(truncated)" + } + InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s", + currentTResultUseID, currentTResultServer, currentTResultIsError, preview) + default: + if currentKind != "" { + InfoLogger.Printf("[mcp] block type=%q (unhandled)", currentKind) + } + } + currentKind = "" } - return nil, fmt.Errorf("error creating Anthropic message: %w", err) } - // Collect text blocks as separate segments so the Telegram delivery layer - // can render each as its own message (matches the conversational rhythm - // Claude uses around tool calls). Callers that want one joined string - // (voice TTS, stickers) do strings.Join themselves. - var segments []string - for _, block := range resp.Content { - switch block.Type { - case "text": - t := strings.TrimSpace(block.Text) - if t != "" { - segments = append(segments, t) - } - case "mcp_tool_use": - InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s", - block.ServerName, block.Name, block.ID, string(block.Input)) - case "mcp_tool_result": - preview := block.JSON.Content.Raw() - if len(preview) > 500 { - preview = preview[:500] + "...(truncated)" - } - InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s", - block.ToolUseID, block.ServerName, block.IsError, preview) - default: - InfoLogger.Printf("[mcp] block type=%q (unhandled)", block.Type) + if err := stream.Err(); err != nil { + var apiErr *anthropic.Error + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound { + return "", fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model) } + return "", fmt.Errorf("error creating Anthropic message: %w", err) } - if len(segments) == 0 { - return nil, fmt.Errorf("unexpected response format from Anthropic") + + if len(allSegments) == 0 { + return "", fmt.Errorf("unexpected response format from Anthropic") } - return segments, nil + return strings.Join(allSegments, "\n\n"), nil } diff --git a/bot.go b/bot.go index fc3c89a..2a8f493 100644 --- a/bot.go +++ b/bot.go @@ -449,41 +449,24 @@ func (b *Bot) sendResponse(ctx context.Context, chatID int64, text string, busin return nil } -// sendMultiResponse delivers a multi-block LLM response as separate Telegram -// messages while keeping a single logical assistant turn in storage. The DB -// row and chat memory hold the joined text (segments separated by blank lines), -// so the model's next-turn context sees one assistant turn — matching today's -// 1-reply-per-prompt invariant — even though the user saw N bubbles. -// -// Partial send failures (a later segment fails after earlier ones succeeded) -// are logged but do not abort the remaining sends. The DB record is canonical: -// the model's next turn will reference what it intended to say. -func (b *Bot) sendMultiResponse(ctx context.Context, chatID int64, segments []string, businessConnectionID string) error { - if len(segments) == 0 { - return nil +// sendOneSegment delivers a single Telegram message without touching storage +// or chat memory. Used by the streaming response path: each completed text +// block fires this helper as it arrives, and the full turn is recorded once +// at end-of-stream via screenOutgoingMessage. Keeps the 1-reply-per-prompt +// storage invariant while letting the user see segments with natural rhythm. +func (b *Bot) sendOneSegment(ctx context.Context, chatID int64, text, businessConnectionID string) error { + params := &bot.SendMessageParams{ + ChatID: chatID, + Text: text, } - - fullText := strings.Join(segments, "\n\n") - if _, err := b.screenOutgoingMessage(chatID, fullText); err != nil { - ErrorLogger.Printf("Error storing assistant message: %v", err) + if businessConnectionID != "" { + params.BusinessConnectionID = businessConnectionID + } + if _, err := b.tgBot.SendMessage(ctx, params); err != nil { + ErrorLogger.Printf("[%s] Error sending segment to chat %d with BusinessConnectionID %s: %v", + b.config.ID, chatID, businessConnectionID, err) return err } - - for i, seg := range segments { - params := &bot.SendMessageParams{ - ChatID: chatID, - Text: seg, - } - if businessConnectionID != "" { - params.BusinessConnectionID = businessConnectionID - } - if _, err := b.tgBot.SendMessage(ctx, params); err != nil { - ErrorLogger.Printf("[%s] Error sending segment %d/%d to chat %d with BusinessConnectionID %s: %v", - b.config.ID, i+1, len(segments), chatID, businessConnectionID, err) - // Keep going: earlier segments are already in the user's chat, - // and the DB has the full turn recorded. - } - } return nil } diff --git a/go-telegram-bot.exe b/go-telegram-bot.exe index 365d95d..358a5d4 100644 Binary files a/go-telegram-bot.exe and b/go-telegram-bot.exe differ diff --git a/handlers.go b/handlers.go index dce7998..f2f9534 100644 --- a/handlers.go +++ b/handlers.go @@ -55,7 +55,10 @@ func (b *Bot) handleVoiceMessage(ctx context.Context, message *models.Message, u chatMemory := b.getOrCreateChatMemory(chatID) contextMessages := b.prepareContextMessages(chatMemory) - segments, err := b.getAnthropicResponse(ctx, contextMessages, isNewChat, isOwner, false, username, firstName, lastName, isPremium, languageCode, messageTime) + // Voice path passes nil for onSegment: tool-call narration across multiple + // TTS clips would be jarring, so we accumulate everything and synthesize one + // audio clip from the joined text. + response, err := b.getAnthropicResponse(ctx, contextMessages, isNewChat, isOwner, false, username, firstName, lastName, isPremium, languageCode, messageTime, nil) if err != nil { ErrorLogger.Printf("Error getting Anthropic response for voice: %v", err) if err := b.sendResponse(ctx, chatID, b.anthropicErrorResponse(err, userID), businessConnectionID); err != nil { @@ -64,10 +67,6 @@ func (b *Bot) handleVoiceMessage(ctx context.Context, message *models.Message, u return } - // Voice replies always synthesize as one audio clip — tool-call narration - // across multiple TTS clips would be jarring, so we join here. - response := strings.Join(segments, "\n\n") - audioReader, err := b.generateSpeech(ctx, response) if err != nil { // TTS failed — fall back to text so the user still gets a reply. @@ -365,22 +364,31 @@ func (b *Bot) handleUpdate(ctx context.Context, tgBot *bot.Bot, update *models.U // Determine if the text contains only emojis isEmojiOnly := isOnlyEmojis(text) - // Get response from Anthropic - segments, err := b.getAnthropicResponse(ctx, contextMessages, isNewChatFlag, isOwner, isEmojiOnly, username, firstName, lastName, isPremium, languageCode, messageTime) + // Stream Anthropic's reply, sending each completed text block to Telegram + // as it arrives — gives the conversational rhythm Claude uses around tool + // calls (text → pause for tool → text → pause → text), rather than a long + // upfront wait followed by all bubbles at once. + joined, err := b.getAnthropicResponse( + ctx, contextMessages, isNewChatFlag, isOwner, isEmojiOnly, + username, firstName, lastName, isPremium, languageCode, messageTime, + func(seg string) error { + return b.sendOneSegment(ctx, chatID, seg, businessConnectionID) + }, + ) if err != nil { ErrorLogger.Printf("Error getting Anthropic response: %v", err) // Errors go out as a single message — no need to fan out a one-line error. - if err := b.sendResponse(ctx, chatID, b.anthropicErrorResponse(err, userID), businessConnectionID); err != nil { - ErrorLogger.Printf("Error sending response: %v", err) + if sendErr := b.sendResponse(ctx, chatID, b.anthropicErrorResponse(err, userID), businessConnectionID); sendErr != nil { + ErrorLogger.Printf("Error sending response: %v", sendErr) } return } - // Successful LLM reply: deliver each text block as its own Telegram bubble, - // matching the conversational rhythm Claude uses around tool calls. - if err := b.sendMultiResponse(ctx, chatID, segments, businessConnectionID); err != nil { - ErrorLogger.Printf("Error sending response: %v", err) - return + // Record the full turn once, at end-of-stream. Same 1-reply-per-prompt + // invariant as the non-streaming path: one DB row, one answered_on stamp, + // one chat-memory entry containing the joined segments. + if _, storeErr := b.screenOutgoingMessage(chatID, joined); storeErr != nil { + ErrorLogger.Printf("Error recording assistant turn: %v", storeErr) } } @@ -419,13 +427,13 @@ func (b *Bot) generateStickerResponse(ctx context.Context, message Message, cont // "Sent a sticker: "), so the full conversation history is preserved. if message.StickerFileID != "" { messageTime := int(message.Timestamp.Unix()) - segments, err := b.getAnthropicResponse(ctx, contextMessages, false, false, true, message.Username, "", "", false, "", messageTime) + // Sticker reactions are casual chit-chat; tool use is unusual here, so + // pass nil for onSegment and return the joined text for a single bubble. + response, err := b.getAnthropicResponse(ctx, contextMessages, false, false, true, message.Username, "", "", false, "", messageTime, nil) if err != nil { return "", err } - // Sticker reactions are casual chit-chat; tool use is unusual here, so - // join into one message rather than fanning out as multiple bubbles. - return strings.Join(segments, "\n\n"), nil + return response, nil } return "Hmm, that's interesting!", nil