1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "charm.land/catwalk/pkg/catwalk"
27 "charm.land/fantasy"
28 "charm.land/fantasy/providers/anthropic"
29 "charm.land/fantasy/providers/bedrock"
30 "charm.land/fantasy/providers/google"
31 "charm.land/fantasy/providers/openai"
32 "charm.land/fantasy/providers/openrouter"
33 "charm.land/fantasy/providers/vercel"
34 "charm.land/lipgloss/v2"
35 "github.com/charmbracelet/crush/internal/agent/hyper"
36 "github.com/charmbracelet/crush/internal/agent/notify"
37 "github.com/charmbracelet/crush/internal/agent/tools"
38 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
39 "github.com/charmbracelet/crush/internal/config"
40 "github.com/charmbracelet/crush/internal/csync"
41 "github.com/charmbracelet/crush/internal/message"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
const (
	// DefaultSessionName is the literal session name "Untitled Session".
	// NOTE(review): its use is outside this part of the file; presumably it is
	// the placeholder title before one is generated — confirm.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	//
	// Run stops the agent loop and summarizes once the tokens remaining in the
	// model's context window drop to a threshold or below. Context windows
	// strictly larger than largeContextWindowThreshold use the fixed
	// largeContextWindowBuffer as that threshold; all others use
	// smallContextWindowRatio times the context window size.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
57
// userAgent is the User-Agent string handed to every fantasy agent created in
// this file (via fantasy.WithUserAgent). It embeds the Crush version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
59
// titlePrompt holds the embedded contents of templates/title.md.
// NOTE(review): presumably the system prompt for title generation; the
// consumer is outside this part of the file — confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the embedded contents of templates/summary.md. Summarize
// uses it as the system prompt of the summarization agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
65
// Used to remove <think> tags from generated titles.
var (
	// thinkTagRegex matches a complete <think>...</think> block, including
	// its contents. The (?s) flag lets "." span newlines and ".*?" keeps the
	// match non-greedy so separate blocks are matched individually.
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	// orphanThinkTagRegex matches a lone opening or closing think tag.
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
71
// SessionAgentCall describes one prompt submitted to a SessionAgent for a
// specific session, together with optional per-call generation settings.
type SessionAgentCall struct {
	// SessionID identifies the target session. Run rejects an empty value
	// with ErrSessionMissing.
	SessionID string
	// Prompt is the user's text. Run rejects an empty prompt with
	// ErrEmptyPrompt unless Attachments contains a text attachment.
	Prompt string
	// ProviderOptions is forwarded unchanged to the model call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored on the user message; non-text attachments are
	// also sent to the model as files.
	Attachments []message.Attachment
	// MaxOutputTokens is only sent to the provider when it is greater than
	// zero.
	MaxOutputTokens int64
	// Sampling parameters, forwarded to the model call as-is.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the "agent finished" notification that Run
	// otherwise publishes at the end of a turn.
	NonInteractive bool
}
85
// SessionAgent is the session-scoped agent API: it runs prompts against a
// session, exposes busy/queue state, and can summarize a session's history.
//
// NOTE(review): only Run and Summarize are implemented in the visible part of
// this file; the remaining method descriptions are derived from signatures
// and call sites and should be confirmed against their implementations.
type SessionAgent interface {
	// Run executes the call, or queues it and returns (nil, nil) when the
	// session is already busy.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels sets the large and small models.
	SetModels(large Model, small Model)
	// SetTools sets the tool list. Run re-reads the tools before every step.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt sets the system prompt.
	SetSystemPrompt(systemPrompt string)
	// Cancel targets a single session; presumably it cancels that session's
	// in-flight request — confirm.
	Cancel(sessionID string)
	// CancelAll presumably cancels every in-flight request — confirm.
	CancelAll()
	// IsSessionBusy reports whether the session is busy. Run queues calls
	// and Summarize returns ErrSessionBusy when it reports true.
	IsSessionBusy(sessionID string) bool
	// IsBusy presumably reports whether any session is busy — confirm.
	IsBusy() bool
	// QueuedPrompts presumably returns the number of queued calls for the
	// session — confirm.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList presumably returns the queued prompt texts for the
	// session — confirm.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue presumably drops the session's queued calls — confirm.
	ClearQueue(sessionID string)
	// Summarize replaces the session's working context with a generated
	// summary message; the string argument is the session ID.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns a Model; presumably the current large model — confirm.
	Model() Model
}
101
// Model bundles a language model with the catalog and user configuration
// that describe it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalog entry for the model. This file reads its
	// SupportsImages, ContextWindow and Name fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration. Its Model and Provider
	// fields are recorded on assistant messages created by Run.
	ModelCfg config.SelectedModel
	// FlatRate is not read in the visible part of this file.
	// NOTE(review): presumably marks flat-rate billing — confirm.
	FlatRate bool
}
108
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// largeModel is the model used by Run and Summarize.
	largeModel *csync.Value[Model]
	// smallModel is not read in the visible part of this file.
	// NOTE(review): presumably used for lightweight tasks — confirm.
	smallModel *csync.Value[Model]
	// systemPromptPrefix, when non-empty, is prepended as its own system
	// message to the messages of every step.
	systemPromptPrefix *csync.Value[string]
	// systemPrompt is the agent's system prompt; Run appends instructions
	// from connected MCP servers to it.
	systemPrompt *csync.Value[string]
	// tools is the current tool list; Run copies it before every step.
	tools *csync.Slice[fantasy.AgentTool]

	// isSubAgent disables the todo-list reminder that preparePrompt otherwise
	// puts at the start of the history.
	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize turns off the context-window stop condition in
	// Run.
	disableAutoSummarize bool
	// isYolo is not read in the visible part of this file.
	isYolo bool
	// notify receives agent notifications; it may be nil, in which case
	// nothing is published.
	notify pubsub.Publisher[notify.Notification]

	// messageQueue holds, per session ID, calls received while that session
	// was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests holds, per session ID, the cancel function of the
	// request currently in flight.
	activeRequests *csync.Map[string, context.CancelFunc]
}
126
// SessionAgentOptions carries the dependencies and settings NewSessionAgent
// copies into a new agent. Each field maps onto the sessionAgent field of the
// same (lower-cased) name.
type SessionAgentOptions struct {
	// LargeModel is the model used by Run and Summarize.
	LargeModel Model
	// SmallModel is stored on the agent; its use is not visible here.
	SmallModel Model
	// SystemPromptPrefix, when non-empty, is sent as a separate leading
	// system message on every step.
	SystemPromptPrefix string
	// SystemPrompt is the agent's system prompt.
	SystemPrompt string
	// IsSubAgent disables the todo-list reminder in the prepared history.
	IsSubAgent bool
	// DisableAutoSummarize turns off automatic summarization in Run.
	DisableAutoSummarize bool
	// IsYolo is stored on the agent; its use is not visible here.
	IsYolo bool
	// Sessions and Messages are the persistence services the agent uses.
	Sessions session.Service
	Messages message.Service
	// Tools is the initial tool list.
	Tools []fantasy.AgentTool
	// Notify receives agent notifications; it may be nil.
	Notify pubsub.Publisher[notify.Notification]
}
140
141func NewSessionAgent(
142 opts SessionAgentOptions,
143) SessionAgent {
144 return &sessionAgent{
145 largeModel: csync.NewValue(opts.LargeModel),
146 smallModel: csync.NewValue(opts.SmallModel),
147 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
148 systemPrompt: csync.NewValue(opts.SystemPrompt),
149 isSubAgent: opts.IsSubAgent,
150 sessions: opts.Sessions,
151 messages: opts.Messages,
152 disableAutoSummarize: opts.DisableAutoSummarize,
153 tools: csync.NewSliceFrom(opts.Tools),
154 isYolo: opts.IsYolo,
155 notify: opts.Notify,
156 messageQueue: csync.NewMap[string, []SessionAgentCall](),
157 activeRequests: csync.NewMap[string, context.CancelFunc](),
158 }
159}
160
// Run submits call to the agent and streams the model's response into the
// session's message history.
//
// It returns ErrEmptyPrompt when the call has neither a prompt nor a text
// attachment, and ErrSessionMissing when the session ID is empty. When the
// session is already busy the call is appended to the session's queue and Run
// returns (nil, nil) right away; queued calls are consumed either by the next
// step of the running request or once that request has finished.
//
// Otherwise Run:
//  1. snapshots tools, model and prompts, and appends the instructions of all
//     connected MCP servers to the system prompt;
//  2. loads the session and its messages, starting title generation in the
//     background when the session has no messages yet;
//  3. stores the user message and registers a cancel function for the
//     session in activeRequests;
//  4. streams the response, persisting assistant and tool messages from the
//     stream callbacks;
//  5. on error, closes unfinished tool calls, adds error tool results for
//     calls without one, records the finish reason, and returns the error;
//  6. summarizes the session when the context-window stop condition fired,
//     re-queuing the call if the assistant still had tool calls;
//  7. publishes the "agent finished" notification and, if calls are queued,
//     runs the first of them recursively.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock serializes the session read-modify-write in OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Run does not return before title generation has completed.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context of this request; its cancel function
	// is registered under the session ID in activeRequests.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the step in progress. It
	// is (re)assigned by PrepareStep and mutated by the stream callbacks.
	var currentAssistant *message.Message
	// shouldSummarize is set by the context-window stop condition below.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			// Start from the step's messages with all provider options
			// cleared; cache-control options are re-applied below.
			prepared.Messages = options.Messages
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Fold calls that were queued while this request was running
			// into the conversation as additional user messages.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Add cache control to the most recent system message seen
				// before the first non-system message. This happens once; if
				// no system message was seen, index 0 receives the options.
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Create the (initially empty) assistant message that the stream
			// callbacks of this step will fill in.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the message ID and model capabilities to tools through
			// the step context.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			// handle google thought signature
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			// handle openai responses reasoning metadata
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Map the provider finish reason onto the message finish reason;
			// anything not listed stays FinishReasonUnknown.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session before adding this step's usage so the
			// save starts from the stored state rather than the local copy.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop (and request summarization) when the context window is
			// nearly exhausted.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps repeat the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created, so there is nothing to finalize.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		for _, tc := range toolCalls {
			// Close tool calls whose input never finished streaming, giving
			// them an empty JSON object as input.
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			// Look for an already stored result for this tool call.
			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			// No result exists: store an error result so the tool call is
			// not left without one.
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason on the assistant message that matches the
		// kind of error, most specific case first.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Unregister this request first; Summarize returns ErrSessionBusy
		// when IsSessionBusy reports true for the session.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			// ...re-queue the original request so work resumes after the
			// summary.
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
636
637func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
638 if a.IsSessionBusy(sessionID) {
639 return ErrSessionBusy
640 }
641
642 // Copy mutable fields under lock to avoid races with SetModels.
643 largeModel := a.largeModel.Get()
644 systemPromptPrefix := a.systemPromptPrefix.Get()
645
646 currentSession, err := a.sessions.Get(ctx, sessionID)
647 if err != nil {
648 return fmt.Errorf("failed to get session: %w", err)
649 }
650 msgs, err := a.getSessionMessages(ctx, currentSession)
651 if err != nil {
652 return err
653 }
654 if len(msgs) == 0 {
655 // Nothing to summarize.
656 return nil
657 }
658
659 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
660
661 genCtx, cancel := context.WithCancel(ctx)
662 a.activeRequests.Set(sessionID, cancel)
663 defer a.activeRequests.Del(sessionID)
664 defer cancel()
665 defer func() {
666 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
667 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
668 }
669 }()
670
671 agent := fantasy.NewAgent(largeModel.Model,
672 fantasy.WithSystemPrompt(string(summaryPrompt)),
673 fantasy.WithUserAgent(userAgent),
674 )
675 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
676 Role: message.Assistant,
677 Model: largeModel.Model.Model(),
678 Provider: largeModel.Model.Provider(),
679 IsSummaryMessage: true,
680 })
681 if err != nil {
682 return err
683 }
684
685 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
686
687 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
688 Prompt: summaryPromptText,
689 Messages: aiMsgs,
690 ProviderOptions: opts,
691 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
692 prepared.Messages = options.Messages
693 if systemPromptPrefix != "" {
694 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
695 }
696 return callContext, prepared, nil
697 },
698 OnReasoningDelta: func(id string, text string) error {
699 summaryMessage.AppendReasoningContent(text)
700 return a.messages.Update(genCtx, summaryMessage)
701 },
702 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
703 // Handle anthropic signature.
704 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
705 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
706 summaryMessage.AppendReasoningSignature(signature.Signature)
707 }
708 }
709 summaryMessage.FinishThinking()
710 return a.messages.Update(genCtx, summaryMessage)
711 },
712 OnTextDelta: func(id, text string) error {
713 summaryMessage.AppendContent(text)
714 return a.messages.Update(genCtx, summaryMessage)
715 },
716 })
717 if err != nil {
718 isCancelErr := errors.Is(err, context.Canceled)
719 if isCancelErr {
720 // User cancelled summarize we need to remove the summary message.
721 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
722 return deleteErr
723 }
724 // Mark the summary message as finished with an error so the UI
725 // stops spinning.
726 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
727 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
728 return updateErr
729 }
730 return err
731 }
732
733 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
734 err = a.messages.Update(genCtx, summaryMessage)
735 if err != nil {
736 return err
737 }
738
739 var openrouterCost *float64
740 for _, step := range resp.Steps {
741 stepCost := a.openrouterCost(step.ProviderMetadata)
742 if stepCost != nil {
743 newCost := *stepCost
744 if openrouterCost != nil {
745 newCost += *openrouterCost
746 }
747 openrouterCost = &newCost
748 }
749 }
750
751 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost)
752
753 // Just in case, get just the last usage info.
754 usage := resp.Response.Usage
755 currentSession.SummaryMessageID = summaryMessage.ID
756 currentSession.CompletionTokens = usage.OutputTokens
757 currentSession.PromptTokens = 0
758 _, err = a.sessions.Save(genCtx, currentSession)
759 if err != nil {
760 return err
761 }
762
763 // Release the active request before processing queued messages so that
764 // Run() does not see the session as busy.
765 a.activeRequests.Del(sessionID)
766 cancel()
767
768 // Process any messages that were queued while summarizing.
769 queuedMessages, ok := a.messageQueue.Get(sessionID)
770 if !ok || len(queuedMessages) == 0 {
771 return nil
772 }
773 firstQueuedMessage := queuedMessages[0]
774 a.messageQueue.Set(sessionID, queuedMessages[1:])
775 _, qErr := a.Run(ctx, firstQueuedMessage)
776 return qErr
777}
778
779func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
780 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
781 return fantasy.ProviderOptions{}
782 }
783 return fantasy.ProviderOptions{
784 anthropic.Name: &anthropic.ProviderCacheControlOptions{
785 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
786 },
787 bedrock.Name: &anthropic.ProviderCacheControlOptions{
788 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
789 },
790 vercel.Name: &anthropic.ProviderCacheControlOptions{
791 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
792 },
793 }
794}
795
796func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
797 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
798 var attachmentParts []message.ContentPart
799 for _, attachment := range call.Attachments {
800 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
801 }
802 parts = append(parts, attachmentParts...)
803 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
804 Role: message.User,
805 Parts: parts,
806 })
807 if err != nil {
808 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
809 }
810 return msg, nil
811}
812
813func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
814 var history []fantasy.Message
815 if !a.isSubAgent {
816 history = append(history, fantasy.NewUserMessage(
817 fmt.Sprintf("<system_reminder>%s</system_reminder>",
818 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
819If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
820If not, please feel free to ignore. Again do not mention this message to the user.`,
821 ),
822 ))
823 }
824 // Collect all tool call IDs present in assistant messages and all tool
825 // result IDs present in tool messages. This lets us detect both orphaned
826 // tool results (result without a call) and orphaned tool calls (call
827 // without a result).
828 knownToolCallIDs := make(map[string]struct{})
829 knownToolResultIDs := make(map[string]struct{})
830 for _, m := range msgs {
831 switch m.Role {
832 case message.Assistant:
833 for _, tc := range m.ToolCalls() {
834 knownToolCallIDs[tc.ID] = struct{}{}
835 }
836 case message.Tool:
837 for _, tr := range m.ToolResults() {
838 knownToolResultIDs[tr.ToolCallID] = struct{}{}
839 }
840 }
841 }
842
843 for _, m := range msgs {
844 if len(m.Parts) == 0 {
845 continue
846 }
847 // Assistant message without content or tool calls (cancelled before it returned anything).
848 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
849 continue
850 }
851 if m.Role == message.Tool {
852 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
853 history = append(history, msg)
854 }
855 continue
856 }
857 aiMsgs := m.ToAIMessage()
858 if !supportsImages {
859 for i := range aiMsgs {
860 if aiMsgs[i].Role == fantasy.MessageRoleUser {
861 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
862 }
863 }
864 }
865 history = append(history, aiMsgs...)
866
867 if m.Role == message.Assistant {
868 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
869 history = append(history, msg)
870 }
871 }
872 }
873
874 var files []fantasy.FilePart
875 for _, attachment := range attachments {
876 if attachment.IsText() {
877 continue
878 }
879 files = append(files, fantasy.FilePart{
880 Filename: attachment.FileName,
881 Data: attachment.Content,
882 MediaType: attachment.MimeType,
883 })
884 }
885
886 return history, files
887}
888
889// filterFileParts removes fantasy.FilePart entries from a slice of message
890// parts. Used to strip image attachments from historical user messages when
891// the current model does not support them.
892func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
893 filtered := make([]fantasy.MessagePart, 0, len(parts))
894 for _, part := range parts {
895 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
896 continue
897 }
898 filtered = append(filtered, part)
899 }
900 return filtered
901}
902
903// filterOrphanedToolResults converts a tool message to a fantasy.Message,
904// dropping any tool result parts whose tool_call_id has no matching tool call
905// in the known set. An orphaned result causes API validation to fail on every
906// subsequent turn, permanently locking the session. Returns the filtered
907// message and true if at least one valid part remains.
908func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
909 aiMsgs := m.ToAIMessage()
910 if len(aiMsgs) == 0 {
911 return fantasy.Message{}, false
912 }
913 var validParts []fantasy.MessagePart
914 for _, part := range aiMsgs[0].Content {
915 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
916 if !ok {
917 validParts = append(validParts, part)
918 continue
919 }
920 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
921 validParts = append(validParts, part)
922 } else {
923 slog.Warn("Dropping orphaned tool result with no matching tool call",
924 "tool_call_id", tr.ToolCallID,
925 )
926 }
927 }
928 if len(validParts) == 0 {
929 return fantasy.Message{}, false
930 }
931 msg := aiMsgs[0]
932 msg.Content = validParts
933 return msg, true
934}
935
936// syntheticToolResultsForOrphanedCalls returns a tool message containing
937// synthetic tool results for any tool calls in the assistant message that
938// have no matching result in knownToolResultIDs. LLM APIs require every
939// tool_use to be immediately followed by a tool_result; an interrupted
940// session can leave orphaned tool_use blocks that permanently lock the
941// conversation. Returns the message and true if any synthetic results were
942// produced.
943func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
944 var syntheticParts []fantasy.MessagePart
945 for _, tc := range m.ToolCalls() {
946 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
947 continue
948 }
949 slog.Warn("Injecting synthetic tool result for orphaned tool call",
950 "tool_call_id", tc.ID,
951 "tool_name", tc.Name,
952 )
953 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
954 ToolCallID: tc.ID,
955 Output: fantasy.ToolResultOutputContentError{
956 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
957 },
958 })
959 }
960 if len(syntheticParts) == 0 {
961 return fantasy.Message{}, false
962 }
963 return fantasy.Message{
964 Role: fantasy.MessageRoleTool,
965 Content: syntheticParts,
966 }, true
967}
968
969func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
970 msgs, err := a.messages.List(ctx, session.ID)
971 if err != nil {
972 return nil, fmt.Errorf("failed to list messages: %w", err)
973 }
974
975 if session.SummaryMessageID != "" {
976 summaryMsgIndex := -1
977 for i, msg := range msgs {
978 if msg.ID == session.SummaryMessageID {
979 summaryMsgIndex = i
980 break
981 }
982 }
983 if summaryMsgIndex != -1 {
984 msgs = msgs[summaryMsgIndex:]
985 msgs[0].Role = message.User
986 }
987 }
988 return msgs, nil
989}
990
991// generateTitle generates a session titled based on the initial prompt.
992func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
993 if userPrompt == "" {
994 return
995 }
996
997 smallModel := a.smallModel.Get()
998 largeModel := a.largeModel.Get()
999 systemPromptPrefix := a.systemPromptPrefix.Get()
1000
1001 var maxOutputTokens int64 = 40
1002 if smallModel.CatwalkCfg.CanReason {
1003 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1004 }
1005
1006 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1007 return fantasy.NewAgent(m,
1008 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1009 fantasy.WithMaxOutputTokens(tok),
1010 fantasy.WithUserAgent(userAgent),
1011 )
1012 }
1013
1014 streamCall := fantasy.AgentStreamCall{
1015 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1016 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1017 prepared.Messages = opts.Messages
1018 if systemPromptPrefix != "" {
1019 prepared.Messages = append([]fantasy.Message{
1020 fantasy.NewSystemMessage(systemPromptPrefix),
1021 }, prepared.Messages...)
1022 }
1023 return callCtx, prepared, nil
1024 },
1025 }
1026
1027 // Use the small model to generate the title.
1028 model := smallModel
1029 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1030 resp, err := agent.Stream(ctx, streamCall)
1031 if err == nil {
1032 // We successfully generated a title with the small model.
1033 slog.Debug("Generated title with small model")
1034 } else {
1035 // It didn't work. Let's try with the big model.
1036 slog.Error("Error generating title with small model; trying big model", "err", err)
1037 model = largeModel
1038 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1039 resp, err = agent.Stream(ctx, streamCall)
1040 if err == nil {
1041 slog.Debug("Generated title with large model")
1042 } else {
1043 // Welp, the large model didn't work either. Use the default
1044 // session name and return.
1045 slog.Error("Error generating title with large model", "err", err)
1046 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1047 if saveErr != nil {
1048 slog.Error("Failed to save session title", "error", saveErr)
1049 }
1050 return
1051 }
1052 }
1053
1054 if resp == nil {
1055 // Actually, we didn't get a response so we can't. Use the default
1056 // session name and return.
1057 slog.Error("Response is nil; can't generate title")
1058 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1059 if saveErr != nil {
1060 slog.Error("Failed to save session title", "error", saveErr)
1061 }
1062 return
1063 }
1064
1065 // Clean up title.
1066 var title string
1067 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1068
1069 // Remove thinking tags if present.
1070 title = thinkTagRegex.ReplaceAllString(title, "")
1071 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1072
1073 title = strings.TrimSpace(title)
1074 title = cmp.Or(title, DefaultSessionName)
1075
1076 // Calculate usage and cost.
1077 var openrouterCost *float64
1078 for _, step := range resp.Steps {
1079 stepCost := a.openrouterCost(step.ProviderMetadata)
1080 if stepCost != nil {
1081 newCost := *stepCost
1082 if openrouterCost != nil {
1083 newCost += *openrouterCost
1084 }
1085 openrouterCost = &newCost
1086 }
1087 }
1088
1089 modelConfig := model.CatwalkCfg
1090 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1091 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1092 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1093 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1094
1095 // Use override cost if available (e.g., from OpenRouter).
1096 if openrouterCost != nil {
1097 cost = *openrouterCost
1098 }
1099
1100 // Skip cost accumulation
1101 if model.FlatRate {
1102 cost = 0
1103 }
1104
1105 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1106 completionTokens := resp.TotalUsage.OutputTokens
1107
1108 // Atomically update only title and usage fields to avoid overriding other
1109 // concurrent session updates.
1110 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1111 if saveErr != nil {
1112 slog.Error("Failed to save session title and usage", "error", saveErr)
1113 return
1114 }
1115}
1116
1117func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1118 openrouterMetadata, ok := metadata[openrouter.Name]
1119 if !ok {
1120 return nil
1121 }
1122
1123 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1124 if !ok {
1125 return nil
1126 }
1127 return &opts.Usage.Cost
1128}
1129
1130func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
1131 modelConfig := model.CatwalkCfg
1132 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1133 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1134 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1135 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1136
1137 a.eventTokensUsed(session.ID, model, usage, cost)
1138
1139 // Use override cost if available (e.g., from OpenRouter).
1140 if overrideCost != nil {
1141 cost = *overrideCost
1142 }
1143
1144 // Skip cost accumulation
1145 if model.FlatRate {
1146 cost = 0
1147 }
1148
1149 session.Cost += cost
1150 session.CompletionTokens = usage.OutputTokens
1151 session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
1152}
1153
1154func (a *sessionAgent) Cancel(sessionID string) {
1155 // Cancel regular requests. Don't use Take() here - we need the entry to
1156 // remain in activeRequests so IsBusy() returns true until the goroutine
1157 // fully completes (including error handling that may access the DB).
1158 // The defer in processRequest will clean up the entry.
1159 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1160 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1161 cancel()
1162 }
1163
1164 // Also check for summarize requests.
1165 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1166 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1167 cancel()
1168 }
1169
1170 if a.QueuedPrompts(sessionID) > 0 {
1171 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1172 a.messageQueue.Del(sessionID)
1173 }
1174}
1175
1176func (a *sessionAgent) ClearQueue(sessionID string) {
1177 if a.QueuedPrompts(sessionID) > 0 {
1178 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1179 a.messageQueue.Del(sessionID)
1180 }
1181}
1182
1183func (a *sessionAgent) CancelAll() {
1184 if !a.IsBusy() {
1185 return
1186 }
1187 for key := range a.activeRequests.Seq2() {
1188 a.Cancel(key) // key is sessionID
1189 }
1190
1191 timeout := time.After(5 * time.Second)
1192 for a.IsBusy() {
1193 select {
1194 case <-timeout:
1195 return
1196 default:
1197 time.Sleep(200 * time.Millisecond)
1198 }
1199 }
1200}
1201
1202func (a *sessionAgent) IsBusy() bool {
1203 var busy bool
1204 for cancelFunc := range a.activeRequests.Seq() {
1205 if cancelFunc != nil {
1206 busy = true
1207 break
1208 }
1209 }
1210 return busy
1211}
1212
1213func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1214 _, busy := a.activeRequests.Get(sessionID)
1215 return busy
1216}
1217
1218func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1219 l, ok := a.messageQueue.Get(sessionID)
1220 if !ok {
1221 return 0
1222 }
1223 return len(l)
1224}
1225
1226func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1227 l, ok := a.messageQueue.Get(sessionID)
1228 if !ok {
1229 return nil
1230 }
1231 prompts := make([]string, len(l))
1232 for i, call := range l {
1233 prompts[i] = call.Prompt
1234 }
1235 return prompts
1236}
1237
1238func (a *sessionAgent) SetModels(large Model, small Model) {
1239 a.largeModel.Set(large)
1240 a.smallModel.Set(small)
1241}
1242
1243func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1244 a.tools.SetSlice(tools)
1245}
1246
1247func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1248 a.systemPrompt.Set(systemPrompt)
1249}
1250
1251func (a *sessionAgent) Model() Model {
1252 return a.largeModel.Get()
1253}
1254
1255// convertToToolResult converts a fantasy tool result to a message tool result.
1256func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1257 baseResult := message.ToolResult{
1258 ToolCallID: result.ToolCallID,
1259 Name: result.ToolName,
1260 Metadata: result.ClientMetadata,
1261 }
1262
1263 switch result.Result.GetType() {
1264 case fantasy.ToolResultContentTypeText:
1265 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1266 baseResult.Content = r.Text
1267 }
1268 case fantasy.ToolResultContentTypeError:
1269 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1270 baseResult.Content = r.Error.Error()
1271 baseResult.IsError = true
1272 }
1273 case fantasy.ToolResultContentTypeMedia:
1274 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1275 if !stringext.IsValidBase64(r.Data) {
1276 slog.Warn("Tool returned media with invalid base64 data, discarding image",
1277 "tool", result.ToolName,
1278 "tool_call_id", result.ToolCallID,
1279 )
1280 baseResult.Content = "Tool returned image data with invalid encoding"
1281 baseResult.IsError = true
1282 } else {
1283 content := r.Text
1284 if content == "" {
1285 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1286 }
1287 baseResult.Content = content
1288 baseResult.Data = r.Data
1289 baseResult.MIMEType = r.MediaType
1290 }
1291 }
1292 }
1293
1294 return baseResult
1295}
1296
1297// workaroundProviderMediaLimitations converts media content in tool results to
1298// user messages for providers that don't natively support images in tool results.
1299//
1300// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1301// don't support sending images/media in tool result messages - they only accept
1302// text in tool results. However, they DO support images in user messages.
1303//
1304// If we send media in tool results to these providers, the API returns an error.
1305//
1306// Solution: For these providers, we:
1307// 1. Replace the media in the tool result with a text placeholder
1308// 2. Inject a user message immediately after with the image as a file attachment
1309// 3. This maintains the tool execution flow while working around API limitations
1310//
1311// Anthropic and Bedrock support images natively in tool results, so we skip
1312// this workaround for them.
1313//
1314// Example transformation:
1315//
1316// BEFORE: [tool result: image data]
1317// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1318func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1319 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1320 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1321
1322 if providerSupportsMedia {
1323 return messages
1324 }
1325
1326 convertedMessages := make([]fantasy.Message, 0, len(messages))
1327
1328 for _, msg := range messages {
1329 if msg.Role != fantasy.MessageRoleTool {
1330 convertedMessages = append(convertedMessages, msg)
1331 continue
1332 }
1333
1334 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1335 var mediaFiles []fantasy.FilePart
1336
1337 for _, part := range msg.Content {
1338 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1339 if !ok {
1340 textParts = append(textParts, part)
1341 continue
1342 }
1343
1344 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1345 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1346 if err != nil {
1347 slog.Warn("Failed to decode media data", "error", err)
1348 textParts = append(textParts, part)
1349 continue
1350 }
1351
1352 mediaFiles = append(mediaFiles, fantasy.FilePart{
1353 Data: decoded,
1354 MediaType: media.MediaType,
1355 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1356 })
1357
1358 textParts = append(textParts, fantasy.ToolResultPart{
1359 ToolCallID: toolResult.ToolCallID,
1360 Output: fantasy.ToolResultOutputContentText{
1361 Text: "[Image/media content loaded - see attached file]",
1362 },
1363 ProviderOptions: toolResult.ProviderOptions,
1364 })
1365 } else {
1366 textParts = append(textParts, part)
1367 }
1368 }
1369
1370 convertedMessages = append(convertedMessages, fantasy.Message{
1371 Role: fantasy.MessageRoleTool,
1372 Content: textParts,
1373 })
1374
1375 if len(mediaFiles) > 0 {
1376 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1377 "Here is the media content from the tool result:",
1378 mediaFiles...,
1379 ))
1380 }
1381 }
1382
1383 return convertedMessages
1384}
1385
1386// buildSummaryPrompt constructs the prompt text for session summarization.
1387func buildSummaryPrompt(todos []session.Todo) string {
1388 var sb strings.Builder
1389 sb.WriteString("Provide a detailed summary of our conversation above.")
1390 if len(todos) > 0 {
1391 sb.WriteString("\n\n## Current Todo List\n\n")
1392 for _, t := range todos {
1393 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1394 }
1395 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1396 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1397 }
1398 return sb.String()
1399}
1400
1401func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1402 fields := []any{
1403 "retry_delay", delay.String(),
1404 }
1405 if err == nil {
1406 return fields
1407 }
1408 fields = append(fields, "status_code", err.StatusCode)
1409 if err.Title != "" {
1410 fields = append(fields, "title", err.Title)
1411 }
1412 if err.Message != "" {
1413 fields = append(fields, "message", err.Message)
1414 }
1415 return fields
1416}