1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "charm.land/catwalk/pkg/catwalk"
27 "charm.land/fantasy"
28 "charm.land/fantasy/providers/anthropic"
29 "charm.land/fantasy/providers/bedrock"
30 "charm.land/fantasy/providers/google"
31 "charm.land/fantasy/providers/openai"
32 "charm.land/fantasy/providers/openrouter"
33 "charm.land/fantasy/providers/vercel"
34 "charm.land/lipgloss/v2"
35 "github.com/charmbracelet/crush/internal/agent/hyper"
36 "github.com/charmbracelet/crush/internal/agent/notify"
37 "github.com/charmbracelet/crush/internal/agent/tools"
38 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
39 "github.com/charmbracelet/crush/internal/config"
40 "github.com/charmbracelet/crush/internal/csync"
41 "github.com/charmbracelet/crush/internal/message"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
// DefaultSessionName is the default session title ("Untitled Session").
const DefaultSessionName = "Untitled Session"

// Auto-summarization thresholds used by Run's context-window stop condition.
// For context windows larger than largeContextWindowThreshold tokens, the
// turn stops (and the session is summarized) once no more than
// largeContextWindowBuffer tokens remain. For smaller windows it stops once
// the remaining tokens fall to smallContextWindowRatio of the window.
const (
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
57
// userAgent is passed to every fantasy agent via fantasy.WithUserAgent and
// identifies Crush together with its version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds the contents of templates/title.md.
// NOTE(review): not referenced in this part of the file; presumably used by
// generateTitle — confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md; Summarize uses
// it as the system prompt of the summarization agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles.
var (
	// thinkTagRegex matches a complete <think>...</think> span. It is
	// non-greedy, and (?s) lets it cross newlines.
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	// orphanThinkTagRegex matches any single opening or closing think tag
	// left over after complete spans are removed.
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
71
// SessionAgentCall describes a single prompt submitted to a SessionAgent.
type SessionAgentCall struct {
	// SessionID identifies the target session; Run rejects an empty value.
	SessionID string
	// Prompt is the user text. Run rejects an empty Prompt unless one of the
	// attachments is a text attachment.
	Prompt string
	// ProviderOptions are forwarded to the model call (and to Summarize
	// when the turn triggers auto-summarization).
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored on the user message. Text attachments are
	// folded into the prompt text; the others are sent as file parts.
	Attachments []message.Attachment
	// MaxOutputTokens limits the response length; values <= 0 are not sent
	// to the provider at all.
	MaxOutputTokens int64
	// Sampling parameters, forwarded unchanged to the model call.
	// NOTE(review): nil presumably means "use the provider default" — confirm.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the "agent finished" notification that Run
	// publishes at the end of a successful turn.
	NonInteractive bool
}
85
// SessionAgent runs agent turns for chat sessions. A call for a session that
// already has a request in flight is queued instead of run concurrently.
type SessionAgent interface {
	// Run executes a prompt, or queues it when the session is busy. A
	// queued call returns a nil result and a nil error.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels replaces the large and small models.
	SetModels(large Model, small Model)
	// SetTools replaces the tool set. In sessionAgent, a running request
	// picks up the new tools at its next step.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt read at the start of Run.
	SetSystemPrompt(systemPrompt string)
	// Cancel aborts the in-flight request of one session.
	// NOTE(review): implementation not shown here; confirm whether it also
	// clears the session's queue.
	Cancel(sessionID string)
	// CancelAll presumably cancels every in-flight request — confirm.
	CancelAll()
	// IsSessionBusy reports whether sessionID has a request in flight.
	IsSessionBusy(sessionID string) bool
	// IsBusy presumably reports whether any session is busy — confirm.
	IsBusy() bool
	// QueuedPrompts, QueuedPromptsList and ClearQueue inspect and drop the
	// prompts queued for sessionID (presumably the count and the prompt
	// texts, respectively — confirm).
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	// Summarize asks the large model to write a summary of the session's
	// history and records it on the session.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model presumably returns the current large model — confirm.
	Model() Model
}
101
// Model bundles a language model client with its catalog metadata and the
// user's model selection.
type Model struct {
	// Model is the client the fantasy agent streams from.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalog entry. The agent reads its ContextWindow,
	// SupportsImages and Name fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected model configuration. Its Model and Provider
	// are recorded on assistant messages, and Provider is checked for Hyper.
	ModelCfg config.SelectedModel
	// FlatRate is not read in this part of the file.
	// NOTE(review): presumably marks non-metered billing for usage/cost
	// accounting — confirm.
	FlatRate bool
}
108
// sessionAgent is the SessionAgent implementation built by NewSessionAgent.
type sessionAgent struct {
	// Swappable state, wrapped in csync containers. Run and Summarize
	// snapshot these with Get/Copy to avoid races with SetTools/SetModels.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// isSubAgent omits the todo-list reminder from the prompt history.
	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize stops the context-window condition from
	// triggering summarization.
	disableAutoSummarize bool
	isYolo               bool
	// notify may be nil, in which case notifications are skipped.
	notify pubsub.Publisher[notify.Notification]

	// messageQueue holds, per session ID, the calls submitted while that
	// session was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session ID to the cancel func of its in-flight
	// request. An entry's presence is what marks the session busy.
	activeRequests *csync.Map[string, context.CancelFunc]
}
126
// SessionAgentOptions configures NewSessionAgent. Each field initializes the
// sessionAgent field of the same meaning.
type SessionAgentOptions struct {
	// LargeModel runs conversation turns and summaries.
	LargeModel Model
	SmallModel Model
	// SystemPromptPrefix, when non-empty, is sent as an extra system message
	// in front of every step's messages.
	SystemPromptPrefix string
	SystemPrompt       string
	// IsSubAgent omits the todo-list reminder from the prompt history.
	IsSubAgent bool
	// DisableAutoSummarize turns off summarization when the context window
	// is nearly full.
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	// Notify is optional; a nil publisher disables notifications.
	Notify pubsub.Publisher[notify.Notification]
}
140
141func NewSessionAgent(
142 opts SessionAgentOptions,
143) SessionAgent {
144 return &sessionAgent{
145 largeModel: csync.NewValue(opts.LargeModel),
146 smallModel: csync.NewValue(opts.SmallModel),
147 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
148 systemPrompt: csync.NewValue(opts.SystemPrompt),
149 isSubAgent: opts.IsSubAgent,
150 sessions: opts.Sessions,
151 messages: opts.Messages,
152 disableAutoSummarize: opts.DisableAutoSummarize,
153 tools: csync.NewSliceFrom(opts.Tools),
154 isYolo: opts.IsYolo,
155 notify: opts.Notify,
156 messageQueue: csync.NewMap[string, []SessionAgentCall](),
157 activeRequests: csync.NewMap[string, context.CancelFunc](),
158 }
159}
160
// Run executes one agent turn for call.SessionID. It persists the user
// prompt, streams the large model's response (including tool calls and
// results) into assistant/tool messages, and records token usage on the
// session. Afterwards it runs any prompts that were queued for the session
// in the meantime.
//
// If the session already has an active request, the call is appended to
// the session's queue and Run returns (nil, nil) immediately. Queued calls
// are consumed either between model steps (in PrepareStep) or after the
// turn finishes. Run returns ErrEmptyPrompt when there is neither a prompt
// nor a text attachment, and ErrSessionMissing when SessionID is empty.
//
// On a stream error, the current assistant message is closed out so the
// history stays consistent for the next turn:
//   - its unfinished tool calls are marked finished;
//   - tool calls without a stored result get an error result;
//   - a finish reason describing the failure is recorded (cancellation,
//     Hyper auth/credit errors, provider errors).
//
// When the remaining context window falls to the auto-summarization
// threshold, the turn stops early and the session is summarized.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. The queued call is consumed later, either
	// by PrepareStep of the running request or after that request finishes.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Gather the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		// NOTE(review): if Copy is a shallow copy, this also mutates the tool
		// stored in a.tools — confirm this is intended.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards the session read-modify-write in OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Run does not return until title generation has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context of the model stream. Registering its
	// cancel func in activeRequests is what marks the session busy.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the current step. It is
	// created in PrepareStep and mutated by the streaming callbacks.
	var currentAssistant *message.Message
	// shouldSummarize is set by the context-window stop condition below.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep (presumably invoked by fantasy before each model step)
		// does the following, in order:
		//   - refreshes the tools;
		//   - injects queued prompts;
		//   - applies cache-control options;
		//   - creates the assistant message for the step.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Reset per-message provider options; cache control is re-applied
			// below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Append prompts queued while this request was running as new
			// user messages for this step.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Add cache control to the last message of the leading run of
				// system messages. If the first message is not a system
				// message, index 0 gets it.
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Create the (initially empty) assistant message that the
			// streaming callbacks below fill in.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the message ID and model capabilities on the step
			// context (presumably read by tools via these keys).
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			// Google: thought signature, tagged with its tool ID.
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			// OpenAI Responses API reasoning metadata.
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the stored session before applying this step's usage.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			// Keep the local copy current. The StopWhen check and the
			// notifications below read it.
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop once the remaining context window drops to the
			// auto-summarization threshold, and flag the session for
			// summarization.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when hasRepeatedToolCalls detects a tool-call loop within
			// the detection window.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created (PrepareStep never completed),
		// so there is nothing to close out.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// For every tool call of the failed step:
		//   - mark it finished, with "{}" as input if it never completed;
		//   - unless a result is already stored, record an error result.
		// Provider APIs reject a tool call that has no matching result.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Translate the error into a finish reason plus a user-facing title
		// and message on the assistant message.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			// This exact provider message is treated as Copilot reporting a
			// model that is not enabled for the account.
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Release the session first; Summarize returns ErrSessionBusy otherwise.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done (the current assistant message still has
		// tool calls), queue a follow-up prompt that restates the original
		// request.
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages: run the first one now. That recursive Run
	// drains the remainder the same way, and its result replaces this turn's
	// result.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
636
637func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
638 if a.IsSessionBusy(sessionID) {
639 return ErrSessionBusy
640 }
641
642 // Copy mutable fields under lock to avoid races with SetModels.
643 largeModel := a.largeModel.Get()
644 systemPromptPrefix := a.systemPromptPrefix.Get()
645
646 currentSession, err := a.sessions.Get(ctx, sessionID)
647 if err != nil {
648 return fmt.Errorf("failed to get session: %w", err)
649 }
650 msgs, err := a.getSessionMessages(ctx, currentSession)
651 if err != nil {
652 return err
653 }
654 if len(msgs) == 0 {
655 // Nothing to summarize.
656 return nil
657 }
658
659 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
660
661 genCtx, cancel := context.WithCancel(ctx)
662 a.activeRequests.Set(sessionID, cancel)
663 defer a.activeRequests.Del(sessionID)
664 defer cancel()
665 defer func() {
666 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
667 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
668 }
669 }()
670
671 agent := fantasy.NewAgent(
672 largeModel.Model,
673 fantasy.WithSystemPrompt(string(summaryPrompt)),
674 fantasy.WithUserAgent(userAgent),
675 )
676 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
677 Role: message.Assistant,
678 Model: largeModel.Model.Model(),
679 Provider: largeModel.Model.Provider(),
680 IsSummaryMessage: true,
681 })
682 if err != nil {
683 return err
684 }
685
686 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
687
688 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
689 Prompt: summaryPromptText,
690 Messages: aiMsgs,
691 ProviderOptions: opts,
692 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
693 prepared.Messages = options.Messages
694 if systemPromptPrefix != "" {
695 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
696 }
697 return callContext, prepared, nil
698 },
699 OnReasoningDelta: func(id string, text string) error {
700 summaryMessage.AppendReasoningContent(text)
701 return a.messages.Update(genCtx, summaryMessage)
702 },
703 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
704 // Handle anthropic signature.
705 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
706 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
707 summaryMessage.AppendReasoningSignature(signature.Signature)
708 }
709 }
710 summaryMessage.FinishThinking()
711 return a.messages.Update(genCtx, summaryMessage)
712 },
713 OnTextDelta: func(id, text string) error {
714 summaryMessage.AppendContent(text)
715 return a.messages.Update(genCtx, summaryMessage)
716 },
717 })
718 if err != nil {
719 isCancelErr := errors.Is(err, context.Canceled)
720 if isCancelErr {
721 // User cancelled summarize we need to remove the summary message.
722 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
723 return deleteErr
724 }
725 // Mark the summary message as finished with an error so the UI
726 // stops spinning.
727 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
728 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
729 return updateErr
730 }
731 return err
732 }
733
734 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
735 err = a.messages.Update(genCtx, summaryMessage)
736 if err != nil {
737 return err
738 }
739
740 var openrouterCost *float64
741 for _, step := range resp.Steps {
742 stepCost := a.openrouterCost(step.ProviderMetadata)
743 if stepCost != nil {
744 newCost := *stepCost
745 if openrouterCost != nil {
746 newCost += *openrouterCost
747 }
748 openrouterCost = &newCost
749 }
750 }
751
752 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost)
753
754 // Just in case, get just the last usage info.
755 usage := resp.Response.Usage
756 currentSession.SummaryMessageID = summaryMessage.ID
757 currentSession.CompletionTokens = usage.OutputTokens
758 currentSession.PromptTokens = 0
759 _, err = a.sessions.Save(genCtx, currentSession)
760 if err != nil {
761 return err
762 }
763
764 // Release the active request before processing queued messages so that
765 // Run() does not see the session as busy.
766 a.activeRequests.Del(sessionID)
767 cancel()
768
769 // Process any messages that were queued while summarizing.
770 queuedMessages, ok := a.messageQueue.Get(sessionID)
771 if !ok || len(queuedMessages) == 0 {
772 return nil
773 }
774 firstQueuedMessage := queuedMessages[0]
775 a.messageQueue.Set(sessionID, queuedMessages[1:])
776 _, qErr := a.Run(ctx, firstQueuedMessage)
777 return qErr
778}
779
780func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
781 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
782 return fantasy.ProviderOptions{}
783 }
784 return fantasy.ProviderOptions{
785 anthropic.Name: &anthropic.ProviderCacheControlOptions{
786 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
787 },
788 bedrock.Name: &anthropic.ProviderCacheControlOptions{
789 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
790 },
791 vercel.Name: &anthropic.ProviderCacheControlOptions{
792 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
793 },
794 }
795}
796
797func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
798 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
799 var attachmentParts []message.ContentPart
800 for _, attachment := range call.Attachments {
801 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
802 }
803 parts = append(parts, attachmentParts...)
804 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
805 Role: message.User,
806 Parts: parts,
807 })
808 if err != nil {
809 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
810 }
811 return msg, nil
812}
813
814func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
815 var history []fantasy.Message
816 if !a.isSubAgent {
817 history = append(history, fantasy.NewUserMessage(
818 fmt.Sprintf(
819 "<system_reminder>%s</system_reminder>",
820 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
821If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
822If not, please feel free to ignore. Again do not mention this message to the user.`,
823 ),
824 ))
825 }
826 // Collect all tool call IDs present in assistant messages and all tool
827 // result IDs present in tool messages. This lets us detect both orphaned
828 // tool results (result without a call) and orphaned tool calls (call
829 // without a result).
830 knownToolCallIDs := make(map[string]struct{})
831 knownToolResultIDs := make(map[string]struct{})
832 for _, m := range msgs {
833 switch m.Role {
834 case message.Assistant:
835 for _, tc := range m.ToolCalls() {
836 knownToolCallIDs[tc.ID] = struct{}{}
837 }
838 case message.Tool:
839 for _, tr := range m.ToolResults() {
840 knownToolResultIDs[tr.ToolCallID] = struct{}{}
841 }
842 }
843 }
844
845 for _, m := range msgs {
846 if len(m.Parts) == 0 {
847 continue
848 }
849 // Assistant message without content or tool calls (cancelled before it returned anything).
850 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
851 continue
852 }
853 if m.Role == message.Tool {
854 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
855 history = append(history, msg)
856 }
857 continue
858 }
859 aiMsgs := m.ToAIMessage()
860 if !supportsImages {
861 for i := range aiMsgs {
862 if aiMsgs[i].Role == fantasy.MessageRoleUser {
863 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
864 }
865 }
866 }
867 history = append(history, aiMsgs...)
868
869 if m.Role == message.Assistant {
870 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
871 history = append(history, msg)
872 }
873 }
874 }
875
876 var files []fantasy.FilePart
877 for _, attachment := range attachments {
878 if attachment.IsText() {
879 continue
880 }
881 files = append(files, fantasy.FilePart{
882 Filename: attachment.FileName,
883 Data: attachment.Content,
884 MediaType: attachment.MimeType,
885 })
886 }
887
888 return history, files
889}
890
891// filterFileParts removes fantasy.FilePart entries from a slice of message
892// parts. Used to strip image attachments from historical user messages when
893// the current model does not support them.
894func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
895 filtered := make([]fantasy.MessagePart, 0, len(parts))
896 for _, part := range parts {
897 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
898 continue
899 }
900 filtered = append(filtered, part)
901 }
902 return filtered
903}
904
905// filterOrphanedToolResults converts a tool message to a fantasy.Message,
906// dropping any tool result parts whose tool_call_id has no matching tool call
907// in the known set. An orphaned result causes API validation to fail on every
908// subsequent turn, permanently locking the session. Returns the filtered
909// message and true if at least one valid part remains.
910func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
911 aiMsgs := m.ToAIMessage()
912 if len(aiMsgs) == 0 {
913 return fantasy.Message{}, false
914 }
915 var validParts []fantasy.MessagePart
916 for _, part := range aiMsgs[0].Content {
917 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
918 if !ok {
919 validParts = append(validParts, part)
920 continue
921 }
922 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
923 validParts = append(validParts, part)
924 } else {
925 slog.Warn(
926 "Dropping orphaned tool result with no matching tool call",
927 "tool_call_id", tr.ToolCallID,
928 )
929 }
930 }
931 if len(validParts) == 0 {
932 return fantasy.Message{}, false
933 }
934 msg := aiMsgs[0]
935 msg.Content = validParts
936 return msg, true
937}
938
939// syntheticToolResultsForOrphanedCalls returns a tool message containing
940// synthetic tool results for any tool calls in the assistant message that
941// have no matching result in knownToolResultIDs. LLM APIs require every
942// tool_use to be immediately followed by a tool_result; an interrupted
943// session can leave orphaned tool_use blocks that permanently lock the
944// conversation. Returns the message and true if any synthetic results were
945// produced.
946func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
947 var syntheticParts []fantasy.MessagePart
948 for _, tc := range m.ToolCalls() {
949 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
950 continue
951 }
952 slog.Warn(
953 "Injecting synthetic tool result for orphaned tool call",
954 "tool_call_id", tc.ID,
955 "tool_name", tc.Name,
956 )
957 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
958 ToolCallID: tc.ID,
959 Output: fantasy.ToolResultOutputContentError{
960 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
961 },
962 })
963 }
964 if len(syntheticParts) == 0 {
965 return fantasy.Message{}, false
966 }
967 return fantasy.Message{
968 Role: fantasy.MessageRoleTool,
969 Content: syntheticParts,
970 }, true
971}
972
973func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
974 msgs, err := a.messages.List(ctx, session.ID)
975 if err != nil {
976 return nil, fmt.Errorf("failed to list messages: %w", err)
977 }
978
979 if session.SummaryMessageID != "" {
980 summaryMsgIndex := -1
981 for i, msg := range msgs {
982 if msg.ID == session.SummaryMessageID {
983 summaryMsgIndex = i
984 break
985 }
986 }
987 if summaryMsgIndex != -1 {
988 msgs = msgs[summaryMsgIndex:]
989 msgs[0].Role = message.User
990 }
991 }
992 return msgs, nil
993}
994
995// generateTitle generates a session titled based on the initial prompt.
996func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
997 if userPrompt == "" {
998 return
999 }
1000
1001 smallModel := a.smallModel.Get()
1002 largeModel := a.largeModel.Get()
1003 systemPromptPrefix := a.systemPromptPrefix.Get()
1004
1005 var maxOutputTokens int64 = 40
1006 if smallModel.CatwalkCfg.CanReason {
1007 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1008 }
1009
1010 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1011 return fantasy.NewAgent(
1012 m,
1013 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1014 fantasy.WithMaxOutputTokens(tok),
1015 fantasy.WithUserAgent(userAgent),
1016 )
1017 }
1018
1019 streamCall := fantasy.AgentStreamCall{
1020 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1021 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1022 prepared.Messages = opts.Messages
1023 if systemPromptPrefix != "" {
1024 prepared.Messages = append([]fantasy.Message{
1025 fantasy.NewSystemMessage(systemPromptPrefix),
1026 }, prepared.Messages...)
1027 }
1028 return callCtx, prepared, nil
1029 },
1030 }
1031
1032 // Use the small model to generate the title.
1033 model := smallModel
1034 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1035 resp, err := agent.Stream(ctx, streamCall)
1036 if err == nil {
1037 // We successfully generated a title with the small model.
1038 slog.Debug("Generated title with small model")
1039 } else {
1040 // It didn't work. Let's try with the big model.
1041 slog.Error("Error generating title with small model; trying big model", "err", err)
1042 model = largeModel
1043 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1044 resp, err = agent.Stream(ctx, streamCall)
1045 if err == nil {
1046 slog.Debug("Generated title with large model")
1047 } else {
1048 // Welp, the large model didn't work either. Use the default
1049 // session name and return.
1050 slog.Error("Error generating title with large model", "err", err)
1051 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1052 if saveErr != nil {
1053 slog.Error("Failed to save session title", "error", saveErr)
1054 }
1055 return
1056 }
1057 }
1058
1059 if resp == nil {
1060 // Actually, we didn't get a response so we can't. Use the default
1061 // session name and return.
1062 slog.Error("Response is nil; can't generate title")
1063 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1064 if saveErr != nil {
1065 slog.Error("Failed to save session title", "error", saveErr)
1066 }
1067 return
1068 }
1069
1070 // Clean up title.
1071 var title string
1072 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1073
1074 // Remove thinking tags if present.
1075 title = thinkTagRegex.ReplaceAllString(title, "")
1076 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1077
1078 title = strings.TrimSpace(title)
1079 title = cmp.Or(title, DefaultSessionName)
1080
1081 // Calculate usage and cost.
1082 var openrouterCost *float64
1083 for _, step := range resp.Steps {
1084 stepCost := a.openrouterCost(step.ProviderMetadata)
1085 if stepCost != nil {
1086 newCost := *stepCost
1087 if openrouterCost != nil {
1088 newCost += *openrouterCost
1089 }
1090 openrouterCost = &newCost
1091 }
1092 }
1093
1094 modelConfig := model.CatwalkCfg
1095 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1096 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1097 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1098 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1099
1100 // Use override cost if available (e.g., from OpenRouter).
1101 if openrouterCost != nil {
1102 cost = *openrouterCost
1103 }
1104
1105 // Skip cost accumulation
1106 if model.FlatRate {
1107 cost = 0
1108 }
1109
1110 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1111 completionTokens := resp.TotalUsage.OutputTokens
1112
1113 // Atomically update only title and usage fields to avoid overriding other
1114 // concurrent session updates.
1115 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1116 if saveErr != nil {
1117 slog.Error("Failed to save session title and usage", "error", saveErr)
1118 return
1119 }
1120}
1121
1122func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1123 openrouterMetadata, ok := metadata[openrouter.Name]
1124 if !ok {
1125 return nil
1126 }
1127
1128 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1129 if !ok {
1130 return nil
1131 }
1132 return &opts.Usage.Cost
1133}
1134
1135func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
1136 modelConfig := model.CatwalkCfg
1137 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1138 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1139 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1140 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1141
1142 a.eventTokensUsed(session.ID, model, usage, cost)
1143
1144 // Use override cost if available (e.g., from OpenRouter).
1145 if overrideCost != nil {
1146 cost = *overrideCost
1147 }
1148
1149 // Skip cost accumulation
1150 if model.FlatRate {
1151 cost = 0
1152 }
1153
1154 session.Cost += cost
1155 session.CompletionTokens = usage.OutputTokens
1156 session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
1157}
1158
1159func (a *sessionAgent) Cancel(sessionID string) {
1160 // Cancel regular requests. Don't use Take() here - we need the entry to
1161 // remain in activeRequests so IsBusy() returns true until the goroutine
1162 // fully completes (including error handling that may access the DB).
1163 // The defer in processRequest will clean up the entry.
1164 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1165 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1166 cancel()
1167 }
1168
1169 // Also check for summarize requests.
1170 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1171 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1172 cancel()
1173 }
1174
1175 if a.QueuedPrompts(sessionID) > 0 {
1176 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1177 a.messageQueue.Del(sessionID)
1178 }
1179}
1180
1181func (a *sessionAgent) ClearQueue(sessionID string) {
1182 if a.QueuedPrompts(sessionID) > 0 {
1183 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1184 a.messageQueue.Del(sessionID)
1185 }
1186}
1187
1188func (a *sessionAgent) CancelAll() {
1189 if !a.IsBusy() {
1190 return
1191 }
1192 for key := range a.activeRequests.Seq2() {
1193 a.Cancel(key) // key is sessionID
1194 }
1195
1196 timeout := time.After(5 * time.Second)
1197 for a.IsBusy() {
1198 select {
1199 case <-timeout:
1200 return
1201 default:
1202 time.Sleep(200 * time.Millisecond)
1203 }
1204 }
1205}
1206
1207func (a *sessionAgent) IsBusy() bool {
1208 var busy bool
1209 for cancelFunc := range a.activeRequests.Seq() {
1210 if cancelFunc != nil {
1211 busy = true
1212 break
1213 }
1214 }
1215 return busy
1216}
1217
1218func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1219 _, busy := a.activeRequests.Get(sessionID)
1220 return busy
1221}
1222
1223func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1224 l, ok := a.messageQueue.Get(sessionID)
1225 if !ok {
1226 return 0
1227 }
1228 return len(l)
1229}
1230
1231func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1232 l, ok := a.messageQueue.Get(sessionID)
1233 if !ok {
1234 return nil
1235 }
1236 prompts := make([]string, len(l))
1237 for i, call := range l {
1238 prompts[i] = call.Prompt
1239 }
1240 return prompts
1241}
1242
1243func (a *sessionAgent) SetModels(large Model, small Model) {
1244 a.largeModel.Set(large)
1245 a.smallModel.Set(small)
1246}
1247
1248func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1249 a.tools.SetSlice(tools)
1250}
1251
1252func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1253 a.systemPrompt.Set(systemPrompt)
1254}
1255
1256func (a *sessionAgent) Model() Model {
1257 return a.largeModel.Get()
1258}
1259
1260// convertToToolResult converts a fantasy tool result to a message tool result.
1261func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1262 baseResult := message.ToolResult{
1263 ToolCallID: result.ToolCallID,
1264 Name: result.ToolName,
1265 Metadata: result.ClientMetadata,
1266 }
1267
1268 switch result.Result.GetType() {
1269 case fantasy.ToolResultContentTypeText:
1270 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1271 baseResult.Content = r.Text
1272 }
1273 case fantasy.ToolResultContentTypeError:
1274 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1275 baseResult.Content = r.Error.Error()
1276 baseResult.IsError = true
1277 }
1278 case fantasy.ToolResultContentTypeMedia:
1279 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1280 if !stringext.IsValidBase64(r.Data) {
1281 slog.Warn(
1282 "Tool returned media with invalid base64 data, discarding image",
1283 "tool", result.ToolName,
1284 "tool_call_id", result.ToolCallID,
1285 )
1286 baseResult.Content = "Tool returned image data with invalid encoding"
1287 baseResult.IsError = true
1288 } else {
1289 content := r.Text
1290 if content == "" {
1291 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1292 }
1293 baseResult.Content = content
1294 baseResult.Data = r.Data
1295 baseResult.MIMEType = r.MediaType
1296 }
1297 }
1298 }
1299
1300 return baseResult
1301}
1302
1303// workaroundProviderMediaLimitations converts media content in tool results to
1304// user messages for providers that don't natively support images in tool results.
1305//
1306// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1307// don't support sending images/media in tool result messages - they only accept
1308// text in tool results. However, they DO support images in user messages.
1309//
1310// If we send media in tool results to these providers, the API returns an error.
1311//
1312// Solution: For these providers, we:
1313// 1. Replace the media in the tool result with a text placeholder
1314// 2. Inject a user message immediately after with the image as a file attachment
1315// 3. This maintains the tool execution flow while working around API limitations
1316//
1317// Anthropic and Bedrock support images natively in tool results, so we skip
1318// this workaround for them.
1319//
1320// Example transformation:
1321//
1322// BEFORE: [tool result: image data]
1323// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1324func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1325 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1326 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1327
1328 if providerSupportsMedia {
1329 return messages
1330 }
1331
1332 convertedMessages := make([]fantasy.Message, 0, len(messages))
1333
1334 for _, msg := range messages {
1335 if msg.Role != fantasy.MessageRoleTool {
1336 convertedMessages = append(convertedMessages, msg)
1337 continue
1338 }
1339
1340 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1341 var mediaFiles []fantasy.FilePart
1342
1343 for _, part := range msg.Content {
1344 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1345 if !ok {
1346 textParts = append(textParts, part)
1347 continue
1348 }
1349
1350 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1351 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1352 if err != nil {
1353 slog.Warn("Failed to decode media data", "error", err)
1354 textParts = append(textParts, part)
1355 continue
1356 }
1357
1358 mediaFiles = append(mediaFiles, fantasy.FilePart{
1359 Data: decoded,
1360 MediaType: media.MediaType,
1361 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1362 })
1363
1364 textParts = append(textParts, fantasy.ToolResultPart{
1365 ToolCallID: toolResult.ToolCallID,
1366 Output: fantasy.ToolResultOutputContentText{
1367 Text: "[Image/media content loaded - see attached file]",
1368 },
1369 ProviderOptions: toolResult.ProviderOptions,
1370 })
1371 } else {
1372 textParts = append(textParts, part)
1373 }
1374 }
1375
1376 convertedMessages = append(convertedMessages, fantasy.Message{
1377 Role: fantasy.MessageRoleTool,
1378 Content: textParts,
1379 })
1380
1381 if len(mediaFiles) > 0 {
1382 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1383 "Here is the media content from the tool result:",
1384 mediaFiles...,
1385 ))
1386 }
1387 }
1388
1389 return convertedMessages
1390}
1391
1392// buildSummaryPrompt constructs the prompt text for session summarization.
1393func buildSummaryPrompt(todos []session.Todo) string {
1394 var sb strings.Builder
1395 sb.WriteString("Provide a detailed summary of our conversation above.")
1396 if len(todos) > 0 {
1397 sb.WriteString("\n\n## Current Todo List\n\n")
1398 for _, t := range todos {
1399 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1400 }
1401 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1402 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1403 }
1404 return sb.String()
1405}
1406
1407func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1408 fields := []any{
1409 "retry_delay", delay.String(),
1410 }
1411 if err == nil {
1412 return fields
1413 }
1414 fields = append(fields, "status_code", err.StatusCode)
1415 if err.Title != "" {
1416 fields = append(fields, "title", err.Title)
1417 }
1418 if err.Message != "" {
1419 fields = append(fields, "message", err.Message)
1420 }
1421 return fields
1422}