This commit is contained in:
HugeFrog24
2026-05-25 23:39:40 +02:00
parent 7c74d91bbb
commit 4fc9d8a5c5
6 changed files with 144 additions and 101 deletions
+103 -34
View File
@@ -17,7 +17,13 @@ import (
// actionable message to admins/owners while keeping the response vague for regular users.
var ErrModelNotFound = errors.New("model not found or deprecated")
func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int) ([]string, error) {
// getAnthropicResponse streams the model's response. Each completed text block
// is delivered to onSegment as soon as the model finishes writing it — so the
// caller can send segments to Telegram with natural rhythm around tool calls,
// rather than batched at the very end of the turn. onSegment may be nil for
// callers that only want the joined text (voice TTS, sticker reactions, etc.).
// The returned string is every text segment joined by blank lines.
func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.BetaMessageParam, isNewChat, isOwner, isEmojiOnly bool, username string, firstName string, lastName string, isPremium bool, languageCode string, messageTime int, onSegment func(string) error) (string, error) {
// Use prompts from config
var systemMessage string
if isNewChat {
@@ -139,43 +145,106 @@ func (b *Bot) getAnthropicResponse(ctx context.Context, messages []anthropic.Bet
params.Betas = []anthropic.AnthropicBeta{anthropic.AnthropicBetaMCPClient2025_11_20}
}
resp, err := b.anthropicClient.Beta.Messages.New(ctx, params)
if err != nil {
var apiErr *anthropic.Error
if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model)
stream := b.anthropicClient.Beta.Messages.NewStreaming(ctx, params)
defer func() {
if err := stream.Close(); err != nil {
ErrorLogger.Printf("[stream] close failed: %v", err)
}
}()
// Per-block accumulators. Reset on content_block_start, consumed on
// content_block_stop. Only one block is active at a time per the SSE
// contract; SDK guarantees deltas arrive between matching start/stop.
var (
allSegments []string
currentKind string
currentText strings.Builder
currentInputJSON strings.Builder
currentTUseName, currentTUseServer, currentTUseID string
currentTResultUseID, currentTResultServer string
currentTResultIsError bool
currentTResultContent string
)
for stream.Next() {
e := stream.Current()
switch e.Type {
case "content_block_start":
cbs := e.AsContentBlockStart()
currentKind = cbs.ContentBlock.Type
currentText.Reset()
currentInputJSON.Reset()
switch currentKind {
case "mcp_tool_use":
currentTUseName = cbs.ContentBlock.Name
currentTUseServer = cbs.ContentBlock.ServerName
currentTUseID = cbs.ContentBlock.ID
case "mcp_tool_result":
currentTResultUseID = cbs.ContentBlock.ToolUseID
currentTResultServer = cbs.ContentBlock.ServerName
currentTResultIsError = cbs.ContentBlock.IsError
// Tool-result content arrives populated on start (server-side
// pre-assembled), not via subsequent deltas like text/JSON.
currentTResultContent = cbs.ContentBlock.JSON.Content.Raw()
}
case "content_block_delta":
cbd := e.AsContentBlockDelta()
switch cbd.Delta.Type {
case "text_delta":
if currentKind == "text" {
currentText.WriteString(cbd.Delta.Text)
}
case "input_json_delta":
if currentKind == "mcp_tool_use" {
currentInputJSON.WriteString(cbd.Delta.PartialJSON)
}
}
case "content_block_stop":
switch currentKind {
case "text":
seg := strings.TrimSpace(currentText.String())
if seg != "" {
allSegments = append(allSegments, seg)
if onSegment != nil {
if cbErr := onSegment(seg); cbErr != nil {
// Log but keep streaming — the model's response
// is still inbound; we want it recorded even if
// one Telegram send failed.
ErrorLogger.Printf("[stream] onSegment failed: %v", cbErr)
}
}
}
case "mcp_tool_use":
InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s",
currentTUseServer, currentTUseName, currentTUseID, currentInputJSON.String())
case "mcp_tool_result":
preview := currentTResultContent
if len(preview) > 500 {
preview = preview[:500] + "...(truncated)"
}
InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s",
currentTResultUseID, currentTResultServer, currentTResultIsError, preview)
default:
if currentKind != "" {
InfoLogger.Printf("[mcp] block type=%q (unhandled)", currentKind)
}
}
currentKind = ""
}
return nil, fmt.Errorf("error creating Anthropic message: %w", err)
}
// Collect text blocks as separate segments so the Telegram delivery layer
// can render each as its own message (matches the conversational rhythm
// Claude uses around tool calls). Callers that want one joined string
// (voice TTS, stickers) do strings.Join themselves.
var segments []string
for _, block := range resp.Content {
switch block.Type {
case "text":
t := strings.TrimSpace(block.Text)
if t != "" {
segments = append(segments, t)
}
case "mcp_tool_use":
InfoLogger.Printf("[mcp] tool_use server=%q name=%q id=%q input=%s",
block.ServerName, block.Name, block.ID, string(block.Input))
case "mcp_tool_result":
preview := block.JSON.Content.Raw()
if len(preview) > 500 {
preview = preview[:500] + "...(truncated)"
}
InfoLogger.Printf("[mcp] tool_result tool_use_id=%q server=%q is_error=%v content=%s",
block.ToolUseID, block.ServerName, block.IsError, preview)
default:
InfoLogger.Printf("[mcp] block type=%q (unhandled)", block.Type)
if err := stream.Err(); err != nil {
var apiErr *anthropic.Error
if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound {
return "", fmt.Errorf("%w: %s", ErrModelNotFound, b.config.Model)
}
return "", fmt.Errorf("error creating Anthropic message: %w", err)
}
if len(segments) == 0 {
return nil, fmt.Errorf("unexpected response format from Anthropic")
if len(allSegments) == 0 {
return "", fmt.Errorf("unexpected response format from Anthropic")
}
return segments, nil
return strings.Join(allSegments, "\n\n"), nil
}