1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "charm.land/catwalk/pkg/catwalk"
27 "charm.land/fantasy"
28 "charm.land/fantasy/providers/anthropic"
29 "charm.land/fantasy/providers/bedrock"
30 "charm.land/fantasy/providers/google"
31 "charm.land/fantasy/providers/openai"
32 "charm.land/fantasy/providers/openrouter"
33 "charm.land/fantasy/providers/vercel"
34 "charm.land/lipgloss/v2"
35 "github.com/charmbracelet/crush/internal/agent/hyper"
36 "github.com/charmbracelet/crush/internal/agent/notify"
37 "github.com/charmbracelet/crush/internal/agent/tools"
38 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
39 "github.com/charmbracelet/crush/internal/config"
40 "github.com/charmbracelet/crush/internal/csync"
41 "github.com/charmbracelet/crush/internal/message"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
// DefaultSessionName is the placeholder title used for a session that has
// not been given a name.
const DefaultSessionName = "Untitled Session"

// Auto-summarization thresholds. A run stops and summarizes once the tokens
// remaining in the model's context window drop to the applicable threshold.
const (
	// largeContextWindowThreshold is the context-window size above which
	// the fixed largeContextWindowBuffer is used as the threshold.
	largeContextWindowThreshold = 200_000
	// largeContextWindowBuffer is the remaining-token threshold for models
	// whose context window exceeds largeContextWindowThreshold.
	largeContextWindowBuffer = 20_000
	// smallContextWindowRatio is the fraction of the context window used as
	// the remaining-token threshold for all other models.
	smallContextWindowRatio = 0.2
)
57
// userAgent is the User-Agent string handed to every fantasy agent created
// in this file (via fantasy.WithUserAgent). It embeds the build version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
59
// titlePrompt is the embedded contents of templates/title.md.
// NOTE(review): presumably the system prompt for title generation; the
// consumer is outside this part of the file — confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the embedded contents of templates/summary.md. Summarize
// uses it as the system prompt of the summarization agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
65
// The two patterns below are used to remove <think> tags from generated
// titles.

// thinkTagRegex matches a complete <think>...</think> section, including
// any newlines inside it. The match is non-greedy, so separate sections are
// matched individually.
var thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)

// orphanThinkTagRegex matches a lone opening or closing think tag.
var orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
71
// SessionAgentCall describes a single user turn submitted to
// SessionAgent.Run.
type SessionAgentCall struct {
	// SessionID identifies the session the turn belongs to. Run rejects
	// an empty value with ErrSessionMissing.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text. It may be empty only when Attachments
	// contains a text attachment; otherwise Run returns ErrEmptyPrompt.
	Prompt string
	// ProviderOptions is forwarded to the model stream call and, when
	// the turn triggers auto-summarization, to Summarize.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are the files submitted together with the prompt.
	Attachments []message.Attachment
	// MaxOutputTokens is forwarded to the provider only when it is
	// greater than zero.
	MaxOutputTokens int64
	// Temperature, TopP, TopK, FrequencyPenalty and PresencePenalty are
	// optional sampling parameters passed through to the stream call
	// unchanged.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the agent-finished notification that Run
	// otherwise publishes at the end of a successful turn.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
}
107
// SessionAgent runs user turns against sessions and exposes the controls
// around them (model/tool configuration, cancellation, queue inspection
// and summarization).
//
// NOTE(review): only Run and Summarize are implemented in the part of the
// file reviewed here; the remaining method comments are limited to what
// those two implementations rely on.
type SessionAgent interface {
	// Run executes one turn for call.SessionID. When the session is
	// already busy the call is queued and Run returns (nil, nil).
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	SetModels(large Model, small Model)
	// SetTools replaces the tool set. Run re-reads the tools before every
	// step, so a change is picked up by a turn that is already running.
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	// IsSessionBusy reports whether the session currently has work in
	// flight; Run queues and Summarize refuses while it returns true.
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	// Summarize replaces the session's history with a generated summary
	// message. It returns ErrSessionBusy when the session is busy.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
123
// Model bundles a language model with the metadata the agent needs to use
// it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalog entry for the model; this file reads its
	// Name, ContextWindow and SupportsImages fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration; its Model and
	// Provider fields are recorded on the assistant messages created by
	// the agent.
	ModelCfg config.SelectedModel
	// FlatRate is not read in the part of the file reviewed here.
	// NOTE(review): presumably marks flat-rate billing — confirm.
	FlatRate bool
}
130
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// The fields in this group are wrapped in csync containers; Run and
	// Summarize snapshot them at the start of a turn.
	largeModel         *csync.Value[Model] // model used by Run and Summarize
	smallModel         *csync.Value[Model] // not read in this part of the file
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize turns off the context-window stop condition in
	// Run.
	disableAutoSummarize bool
	isYolo               bool
	// notify receives re-authentication and agent-finished notifications;
	// it may be nil.
	notify pubsub.Publisher[notify.Notification]
	// runComplete receives the terminal event of each turn unless the call
	// supplies OnComplete; it may be nil.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session ID, the calls submitted while that
	// session was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests holds, per session ID, the cancel function of the
	// turn or summarization currently in flight.
	activeRequests *csync.Map[string, context.CancelFunc]
}
149
// SessionAgentOptions carries the dependencies and initial configuration
// consumed by NewSessionAgent. Each field is copied into the corresponding
// field of the agent.
type SessionAgentOptions struct {
	LargeModel         Model
	SmallModel         Model
	SystemPromptPrefix string
	SystemPrompt       string
	IsSubAgent         bool
	// DisableAutoSummarize turns off the automatic summarization that Run
	// otherwise triggers when the context window is nearly full.
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	// Notify and RunComplete may be nil; Run skips the corresponding
	// publish in that case.
	Notify      pubsub.Publisher[notify.Notification]
	RunComplete pubsub.Publisher[notify.RunComplete]
}
164
165func NewSessionAgent(
166 opts SessionAgentOptions,
167) SessionAgent {
168 return &sessionAgent{
169 largeModel: csync.NewValue(opts.LargeModel),
170 smallModel: csync.NewValue(opts.SmallModel),
171 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
172 systemPrompt: csync.NewValue(opts.SystemPrompt),
173 isSubAgent: opts.IsSubAgent,
174 sessions: opts.Sessions,
175 messages: opts.Messages,
176 disableAutoSummarize: opts.DisableAutoSummarize,
177 tools: csync.NewSliceFrom(opts.Tools),
178 isYolo: opts.IsYolo,
179 notify: opts.Notify,
180 runComplete: opts.RunComplete,
181 messageQueue: csync.NewMap[string, []SessionAgentCall](),
182 activeRequests: csync.NewMap[string, context.CancelFunc](),
183 }
184}
185
// Run executes one user turn for call.SessionID.
//
// It returns ErrEmptyPrompt when the call has neither a prompt nor a text
// attachment and ErrSessionMissing when SessionID is empty. When the
// session is already busy the call is appended to the session queue
// (without its OnComplete hook) and Run returns (nil, nil) immediately.
//
// Otherwise Run stores the user message, streams the model response with
// the large model, and persists the assistant message, tool calls and tool
// results as they arrive. After every step the session usage is updated;
// the turn is stopped early when the context window is nearly exhausted
// (followed by Summarize) or when repeated tool calls are detected.
//
// On a streaming error the assistant message is closed out (unfinished tool
// calls are finished, missing tool results are filled with an error result
// and a finish reason is recorded) before the error is returned.
//
// On exit the deferred block flushes pending message updates and, unless
// the turn hands over to a queued prompt, emits exactly one
// notify.RunComplete through call.OnComplete or the runComplete publisher.
// If prompts are queued when the turn ends, Run calls itself for the first
// of them and returns that call's result.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. Strip OnComplete: the caller that
	// supplied the hook (typically coordinator.Run) has its own
	// retry/coalesce scope that ends when it returns, so by the time
	// the queue drains nobody is left to consume the buffered
	// terminal event. The recursive Run will fall back to the
	// default broker publish, which is what existing subscribers
	// expect for queued turns.
	//
	// NOTE(review): the busy check and the Get/append/Set sequence are
	// separate operations on the map — confirm whether concurrent callers
	// for the same session can interleave here.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		queued := call
		queued.OnComplete = nil
		existing = append(existing, queued)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server
	// and append them to the system prompt inside an <mcp-instructions>
	// section.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards stepMessages and the per-step session update,
	// which are touched from the PrepareStep and OnStepFinish callbacks.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Registered first, so it runs last: Run does not return until title
	// generation has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context of this turn; storing its cancel
	// function in activeRequests is what makes it reachable by session ID.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// skipRunComplete is set just before the queued-recursion path so
	// the outer Run doesn't publish a RunComplete that would race
	// with — and be superseded by — the recursive call's own
	// RunComplete (each queued user prompt is its own turn and
	// publishes exactly one terminal event).
	var skipRunComplete bool
	// currentAssistant is declared here so the deferred RunComplete
	// publish below can capture the pointer that PrepareStep will
	// later (re)assign for each streaming step. The final assistant
	// message of the turn is the value reachable through this
	// pointer when the defer runs.
	var currentAssistant *message.Message
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	//
	// After the flush completes — meaning all per-message
	// Publish(UpdatedEvent) calls have fired and been buffered into
	// every subscriber's channel — publish the authoritative
	// RunComplete event for this turn. The flush-then-publish order
	// gives well-behaved clients the best chance of seeing the final
	// message event before RunComplete; the embedded Text field
	// reconciles for clients that observe the events out of order
	// (the pubsub broker fan-in does not serialize publishes from
	// different upstream brokers).
	defer func() {
		// Use a context detached from the run context: workspace
		// shutdown cancels ctx before this goroutine returns, but the
		// buffered streaming deltas must still land before the DB is
		// closed. A short timeout bounds the flush.
		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		defer flushCancel()
		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
		if skipRunComplete {
			return
		}
		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
		if currentAssistant != nil {
			complete.MessageID = currentAssistant.ID
			complete.Text = currentAssistant.Content().String()
		}
		// retErr is the named result, so it holds whatever error Run is
		// returning at this point.
		if retErr != nil {
			complete.Error = retErr.Error()
			complete.Cancelled = errors.Is(retErr, context.Canceled)
		} else if ctx.Err() != nil {
			complete.Cancelled = true
		}
		// Prefer the per-call hook when supplied so the coordinator
		// can coalesce retries (e.g. unauthorized → re-auth → retry)
		// into a single user-visible terminal event. The fallback
		// must-deliver publish applies bounded-blocking semantics to
		// the authoritative terminal event so a momentarily-full
		// subscriber channel can't silently drop it and hang
		// non-interactive clients waiting on RunComplete.
		if call.OnComplete != nil {
			call.OnComplete(complete)
			return
		}
		if a.runComplete == nil {
			return
		}
		// NOTE(review): this publish uses the run context rather than the
		// detached flushCtx — confirm how PublishMustDeliver behaves when
		// ctx is already cancelled.
		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// stepMessages is the message list sent for the current step; it is
	// written in PrepareStep and read in OnStepFinish. shouldSummarize is
	// set by the context-window stop condition.
	var stepMessages []fantasy.Message
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep runs before each model step: it refreshes the tool
		// set, folds queued prompts into the conversation, applies cache
		// markers and creates the assistant message for the step.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Clear provider options left over from earlier steps; the
			// cache markers are re-applied below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Prompts queued while this turn was running are stored as user
			// messages and appended to this step, then the queue is cleared.
			// NOTE(review): Get and Del are two separate calls — confirm
			// whether a prompt enqueued between them can be lost.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Add cache control to the most recent system message seen
				// when the first non-system message is reached (index 0 if
				// no system message was seen before it).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Keep a copy of what is being sent; OnStepFinish passes it to
			// fallbackStepUsage under the same lock.
			sessionLock.Lock()
			stepMessages = cloneFantasyMessages(prepared.Messages)
			sessionLock.Unlock()

			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the assistant message ID and model details to the
			// tools through the step context.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			// Google and OpenAI attach their own reasoning metadata; record
			// whichever is present.
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Map the provider finish reason onto the message finish
			// reason; anything unrecognised stays FinishReasonUnknown.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session before applying this step's usage, then
			// save it and keep the saved copy as currentSession.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop (and request summarization) once the tokens remaining in
			// the context window fall to the threshold.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps repeat the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created, so there is nothing to close
		// out.
		if currentAssistant == nil {
			return result, err
		}
		// Persist final state with a context detached from the run
		// context. The run context (ctx) is derived from the
		// workspace context, which workspace shutdown cancels before
		// agent goroutines finish; using ctx here would drop the
		// final assistant state. WithoutCancel keeps the values
		// (e.g. session ID) while ignoring cancellation, and a short
		// timeout bounds the cleanup writes.
		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		defer cleanupCancel()
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the cleanup context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Every tool call of the assistant message must end up finished
		// and paired with a tool result.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			// Look for an existing tool result for this call among the
			// session's tool messages.
			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			// No result was recorded: store an error result in its place.
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason that matches the kind of failure.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the cleanup context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize refuses to run while the session is busy, so release
		// the active request first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		// ...queue a follow-up call that restates the original request so
		// the work resumes on top of the summary.
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop. The recursive Run
	// publishes its own RunComplete for the queued prompt, so suppress
	// the outer defer's emit to avoid a duplicate event whose Error
	// field would belong to the recursive turn but whose MessageID/Text
	// would belong to the outer turn.
	skipRunComplete = true
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
745
746func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
747 if a.IsSessionBusy(sessionID) {
748 return ErrSessionBusy
749 }
750
751 // Copy mutable fields under lock to avoid races with SetModels.
752 largeModel := a.largeModel.Get()
753 systemPromptPrefix := a.systemPromptPrefix.Get()
754
755 currentSession, err := a.sessions.Get(ctx, sessionID)
756 if err != nil {
757 return fmt.Errorf("failed to get session: %w", err)
758 }
759 msgs, err := a.getSessionMessages(ctx, currentSession)
760 if err != nil {
761 return err
762 }
763 if len(msgs) == 0 {
764 // Nothing to summarize.
765 return nil
766 }
767
768 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
769
770 genCtx, cancel := context.WithCancel(ctx)
771 a.activeRequests.Set(sessionID, cancel)
772 defer a.activeRequests.Del(sessionID)
773 defer cancel()
774 defer func() {
775 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
776 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
777 }
778 }()
779
780 agent := fantasy.NewAgent(
781 largeModel.Model,
782 fantasy.WithSystemPrompt(string(summaryPrompt)),
783 fantasy.WithUserAgent(userAgent),
784 )
785 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
786 Role: message.Assistant,
787 Model: largeModel.ModelCfg.Model,
788 Provider: largeModel.ModelCfg.Provider,
789 IsSummaryMessage: true,
790 })
791 if err != nil {
792 return err
793 }
794
795 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
796
797 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
798 Prompt: summaryPromptText,
799 Messages: aiMsgs,
800 ProviderOptions: opts,
801 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
802 prepared.Messages = options.Messages
803 if systemPromptPrefix != "" {
804 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
805 }
806 return callContext, prepared, nil
807 },
808 OnReasoningDelta: func(id string, text string) error {
809 summaryMessage.AppendReasoningContent(text)
810 return a.messages.Update(genCtx, summaryMessage)
811 },
812 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
813 // Handle anthropic signature.
814 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
815 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
816 summaryMessage.AppendReasoningSignature(signature.Signature)
817 }
818 }
819 summaryMessage.FinishThinking()
820 return a.messages.Update(genCtx, summaryMessage)
821 },
822 OnTextDelta: func(id, text string) error {
823 summaryMessage.AppendContent(text)
824 return a.messages.Update(genCtx, summaryMessage)
825 },
826 })
827 if err != nil {
828 isCancelErr := errors.Is(err, context.Canceled)
829 if isCancelErr {
830 // User cancelled summarize we need to remove the summary message.
831 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
832 return deleteErr
833 }
834 // Mark the summary message as finished with an error so the UI
835 // stops spinning.
836 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
837 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
838 return updateErr
839 }
840 return err
841 }
842
843 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
844 err = a.messages.Update(genCtx, summaryMessage)
845 if err != nil {
846 return err
847 }
848
849 var openrouterCost *float64
850 for _, step := range resp.Steps {
851 stepCost := a.openrouterCost(step.ProviderMetadata)
852 if stepCost != nil {
853 newCost := *stepCost
854 if openrouterCost != nil {
855 newCost += *openrouterCost
856 }
857 openrouterCost = &newCost
858 }
859 }
860
861 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
862
863 // Just in case, get just the last usage info.
864 usage := resp.Response.Usage
865 currentSession.SummaryMessageID = summaryMessage.ID
866 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
867 currentSession.PromptTokens = 0
868 currentSession.EstimatedUsage = usageIsZero(usage)
869 _, err = a.sessions.Save(genCtx, currentSession)
870 if err != nil {
871 return err
872 }
873
874 // Release the active request before processing queued messages so that
875 // Run() does not see the session as busy.
876 a.activeRequests.Del(sessionID)
877 cancel()
878
879 // Process any messages that were queued while summarizing.
880 queuedMessages, ok := a.messageQueue.Get(sessionID)
881 if !ok || len(queuedMessages) == 0 {
882 return nil
883 }
884 firstQueuedMessage := queuedMessages[0]
885 a.messageQueue.Set(sessionID, queuedMessages[1:])
886 _, qErr := a.Run(ctx, firstQueuedMessage)
887 return qErr
888}
889
890func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
891 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
892 return fantasy.ProviderOptions{}
893 }
894 return fantasy.ProviderOptions{
895 anthropic.Name: &anthropic.ProviderCacheControlOptions{
896 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
897 },
898 bedrock.Name: &anthropic.ProviderCacheControlOptions{
899 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
900 },
901 vercel.Name: &anthropic.ProviderCacheControlOptions{
902 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
903 },
904 }
905}
906
907func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
908 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
909 var attachmentParts []message.ContentPart
910 for _, attachment := range call.Attachments {
911 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
912 }
913 parts = append(parts, attachmentParts...)
914 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
915 Role: message.User,
916 Parts: parts,
917 })
918 if err != nil {
919 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
920 }
921 return msg, nil
922}
923
924func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
925 var history []fantasy.Message
926 if !a.isSubAgent {
927 history = append(history, fantasy.NewUserMessage(
928 fmt.Sprintf(
929 "<system_reminder>%s</system_reminder>",
930 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
931If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
932If not, please feel free to ignore. Again do not mention this message to the user.`,
933 ),
934 ))
935 }
936 // Collect all tool call IDs present in assistant messages and all tool
937 // result IDs present in tool messages. This lets us detect both orphaned
938 // tool results (result without a call) and orphaned tool calls (call
939 // without a result).
940 knownToolCallIDs := make(map[string]struct{})
941 knownToolResultIDs := make(map[string]struct{})
942 for _, m := range msgs {
943 switch m.Role {
944 case message.Assistant:
945 for _, tc := range m.ToolCalls() {
946 knownToolCallIDs[tc.ID] = struct{}{}
947 }
948 case message.Tool:
949 for _, tr := range m.ToolResults() {
950 knownToolResultIDs[tr.ToolCallID] = struct{}{}
951 }
952 }
953 }
954
955 for _, m := range msgs {
956 if len(m.Parts) == 0 {
957 continue
958 }
959 // Assistant message without content or tool calls (cancelled before it returned anything).
960 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
961 continue
962 }
963 if m.Role == message.Tool {
964 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
965 history = append(history, msg)
966 }
967 continue
968 }
969 aiMsgs := m.ToAIMessage()
970 if !supportsImages {
971 for i := range aiMsgs {
972 if aiMsgs[i].Role == fantasy.MessageRoleUser {
973 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
974 }
975 }
976 }
977 history = append(history, aiMsgs...)
978
979 if m.Role == message.Assistant {
980 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
981 history = append(history, msg)
982 }
983 }
984 }
985
986 var files []fantasy.FilePart
987 for _, attachment := range attachments {
988 if attachment.IsText() {
989 continue
990 }
991 files = append(files, fantasy.FilePart{
992 Filename: attachment.FileName,
993 Data: attachment.Content,
994 MediaType: attachment.MimeType,
995 })
996 }
997
998 return history, files
999}
1000
1001// filterFileParts removes fantasy.FilePart entries from a slice of message
1002// parts. Used to strip image attachments from historical user messages when
1003// the current model does not support them.
1004func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1005 filtered := make([]fantasy.MessagePart, 0, len(parts))
1006 for _, part := range parts {
1007 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1008 continue
1009 }
1010 filtered = append(filtered, part)
1011 }
1012 return filtered
1013}
1014
1015// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1016// dropping any tool result parts whose tool_call_id has no matching tool call
1017// in the known set. An orphaned result causes API validation to fail on every
1018// subsequent turn, permanently locking the session. Returns the filtered
1019// message and true if at least one valid part remains.
1020func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1021 aiMsgs := m.ToAIMessage()
1022 if len(aiMsgs) == 0 {
1023 return fantasy.Message{}, false
1024 }
1025 var validParts []fantasy.MessagePart
1026 for _, part := range aiMsgs[0].Content {
1027 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1028 if !ok {
1029 validParts = append(validParts, part)
1030 continue
1031 }
1032 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1033 validParts = append(validParts, part)
1034 } else {
1035 slog.Warn(
1036 "Dropping orphaned tool result with no matching tool call",
1037 "tool_call_id", tr.ToolCallID,
1038 )
1039 }
1040 }
1041 if len(validParts) == 0 {
1042 return fantasy.Message{}, false
1043 }
1044 msg := aiMsgs[0]
1045 msg.Content = validParts
1046 return msg, true
1047}
1048
1049// syntheticToolResultsForOrphanedCalls returns a tool message containing
1050// synthetic tool results for any tool calls in the assistant message that
1051// have no matching result in knownToolResultIDs. LLM APIs require every
1052// tool_use to be immediately followed by a tool_result; an interrupted
1053// session can leave orphaned tool_use blocks that permanently lock the
1054// conversation. Returns the message and true if any synthetic results were
1055// produced.
1056func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1057 var syntheticParts []fantasy.MessagePart
1058 for _, tc := range m.ToolCalls() {
1059 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1060 continue
1061 }
1062 slog.Warn(
1063 "Injecting synthetic tool result for orphaned tool call",
1064 "tool_call_id", tc.ID,
1065 "tool_name", tc.Name,
1066 )
1067 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1068 ToolCallID: tc.ID,
1069 Output: fantasy.ToolResultOutputContentError{
1070 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1071 },
1072 })
1073 }
1074 if len(syntheticParts) == 0 {
1075 return fantasy.Message{}, false
1076 }
1077 return fantasy.Message{
1078 Role: fantasy.MessageRoleTool,
1079 Content: syntheticParts,
1080 }, true
1081}
1082
1083func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1084 msgs, err := a.messages.List(ctx, session.ID)
1085 if err != nil {
1086 return nil, fmt.Errorf("failed to list messages: %w", err)
1087 }
1088
1089 if session.SummaryMessageID != "" {
1090 summaryMsgIndex := -1
1091 for i, msg := range msgs {
1092 if msg.ID == session.SummaryMessageID {
1093 summaryMsgIndex = i
1094 break
1095 }
1096 }
1097 if summaryMsgIndex != -1 {
1098 msgs = msgs[summaryMsgIndex:]
1099 msgs[0].Role = message.User
1100 }
1101 }
1102 return msgs, nil
1103}
1104
// generateTitle generates a session title based on the initial prompt and
// saves it together with the token usage and cost of the title request.
//
// The small model is tried first; if its stream call returns an error, the
// large model is tried with the same call. If both fail, or the response is
// nil, the session is renamed to DefaultSessionName. Nothing is returned:
// every failure is logged via slog. An empty userPrompt is a no-op.
func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
	if userPrompt == "" {
		return
	}

	smallModel := a.smallModel.Get()
	largeModel := a.largeModel.Get()
	systemPromptPrefix := a.systemPromptPrefix.Get()

	// Cap the output at 40 tokens, unless the small model can reason, in
	// which case its default max-token budget is used instead (presumably so
	// reasoning output does not exhaust the cap — confirm).
	var maxOutputTokens int64 = 40
	if smallModel.CatwalkCfg.CanReason {
		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
	}

	// newAgent builds an agent for model m using prompt p (with "/no_think"
	// appended) as the system prompt and tok as the output token limit.
	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
		return fantasy.NewAgent(
			m,
			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
			fantasy.WithMaxOutputTokens(tok),
			fantasy.WithUserAgent(userAgent),
		)
	}

	// The same stream call is used for both the small and the large model.
	streamCall := fantasy.AgentStreamCall{
		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = opts.Messages
			// Put the configured system prompt prefix, if any, ahead of all
			// other messages.
			if systemPromptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{
					fantasy.NewSystemMessage(systemPromptPrefix),
				}, prepared.Messages...)
			}
			return callCtx, prepared, nil
		},
	}

	// Use the small model to generate the title.
	model := smallModel
	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
	resp, err := agent.Stream(ctx, streamCall)
	if err == nil {
		// We successfully generated a title with the small model.
		slog.Debug("Generated title with small model")
	} else {
		// It didn't work. Let's try with the big model. Note that
		// maxOutputTokens was derived from the small model's configuration
		// and is reused unchanged here.
		slog.Error("Error generating title with small model; trying big model", "err", err)
		model = largeModel
		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
		resp, err = agent.Stream(ctx, streamCall)
		if err == nil {
			slog.Debug("Generated title with large model")
		} else {
			// Welp, the large model didn't work either. Use the default
			// session name and return.
			slog.Error("Error generating title with large model", "err", err)
			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
			if saveErr != nil {
				slog.Error("Failed to save session title", "error", saveErr)
			}
			return
		}
	}

	if resp == nil {
		// Actually, we didn't get a response so we can't. Use the default
		// session name and return.
		slog.Error("Response is nil; can't generate title")
		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
		if saveErr != nil {
			slog.Error("Failed to save session title", "error", saveErr)
		}
		return
	}

	// Clean up title: flatten newlines into spaces so it fits on one line.
	var title string
	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")

	// Remove thinking tags if present.
	title = thinkTagRegex.ReplaceAllString(title, "")
	title = orphanThinkTagRegex.ReplaceAllString(title, "")

	// Fall back to the default name when nothing is left after trimming.
	title = strings.TrimSpace(title)
	title = cmp.Or(title, DefaultSessionName)

	// Calculate usage and cost. OpenRouter-reported costs are summed across
	// all steps; the pointer stays nil when no step reported one.
	var openrouterCost *float64
	for _, step := range resp.Steps {
		stepCost := a.openrouterCost(step.ProviderMetadata)
		if stepCost != nil {
			newCost := *stepCost
			if openrouterCost != nil {
				newCost += *openrouterCost
			}
			openrouterCost = &newCost
		}
	}

	// Price the request from the per-million-token rates of whichever model
	// produced the response (small, or large after fallback).
	modelConfig := model.CatwalkCfg
	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)

	// Use override cost if available (e.g., from OpenRouter).
	if openrouterCost != nil {
		cost = *openrouterCost
	}

	// Skip cost accumulation for flat-rate models.
	if model.FlatRate {
		cost = 0
	}

	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
	completionTokens := resp.TotalUsage.OutputTokens

	// Atomically update only title and usage fields to avoid overriding other
	// concurrent session updates.
	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
	if saveErr != nil {
		slog.Error("Failed to save session title and usage", "error", saveErr)
		return
	}
}
1231
1232func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1233 openrouterMetadata, ok := metadata[openrouter.Name]
1234 if !ok {
1235 return nil
1236 }
1237
1238 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1239 if !ok {
1240 return nil
1241 }
1242 return &opts.Usage.Cost
1243}
1244
1245func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1246 if !usageIsZero(usage) {
1247 session.EstimatedUsage = estimated
1248 }
1249
1250 modelConfig := model.CatwalkCfg
1251 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1252 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1253 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1254 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1255
1256 if !estimated {
1257 a.eventTokensUsed(session.ID, model, usage, cost)
1258 }
1259
1260 if estimated {
1261 cost = 0
1262 } else {
1263 // Use override cost if available (e.g., from OpenRouter).
1264 if overrideCost != nil {
1265 cost = *overrideCost
1266 }
1267
1268 // Skip cost accumulation
1269 if model.FlatRate {
1270 cost = 0
1271 }
1272 }
1273
1274 session.Cost += cost
1275 updateSessionTokenCounters(session, usage)
1276}
1277
1278func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1279 if usage.OutputTokens != 0 {
1280 session.CompletionTokens = usage.OutputTokens
1281 }
1282 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1283 session.PromptTokens = promptTokens
1284 }
1285}
1286
1287func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1288 if usage.OutputTokens != 0 {
1289 return usage.OutputTokens
1290 }
1291 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1292}
1293
1294func (a *sessionAgent) Cancel(sessionID string) {
1295 // Cancel regular requests. Don't use Take() here - we need the entry to
1296 // remain in activeRequests so IsBusy() returns true until the goroutine
1297 // fully completes (including error handling that may access the DB).
1298 // The defer in processRequest will clean up the entry.
1299 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1300 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1301 cancel()
1302 }
1303
1304 // Also check for summarize requests.
1305 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1306 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1307 cancel()
1308 }
1309
1310 if a.QueuedPrompts(sessionID) > 0 {
1311 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1312 a.messageQueue.Del(sessionID)
1313 }
1314}
1315
1316func (a *sessionAgent) ClearQueue(sessionID string) {
1317 if a.QueuedPrompts(sessionID) > 0 {
1318 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1319 a.messageQueue.Del(sessionID)
1320 }
1321}
1322
1323func (a *sessionAgent) CancelAll() {
1324 if !a.IsBusy() {
1325 return
1326 }
1327 for key := range a.activeRequests.Seq2() {
1328 a.Cancel(key) // key is sessionID
1329 }
1330
1331 timeout := time.After(5 * time.Second)
1332 for a.IsBusy() {
1333 select {
1334 case <-timeout:
1335 return
1336 default:
1337 time.Sleep(200 * time.Millisecond)
1338 }
1339 }
1340}
1341
1342func (a *sessionAgent) IsBusy() bool {
1343 var busy bool
1344 for cancelFunc := range a.activeRequests.Seq() {
1345 if cancelFunc != nil {
1346 busy = true
1347 break
1348 }
1349 }
1350 return busy
1351}
1352
// IsSessionBusy reports whether activeRequests has an entry stored under
// sessionID. Unlike IsBusy, the stored cancel function is not checked for
// nil, and only the plain sessionID key is consulted (not the
// sessionID + "-summarize" key used for summarize requests).
func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
	_, busy := a.activeRequests.Get(sessionID)
	return busy
}
1357
1358func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1359 l, ok := a.messageQueue.Get(sessionID)
1360 if !ok {
1361 return 0
1362 }
1363 return len(l)
1364}
1365
1366func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1367 l, ok := a.messageQueue.Get(sessionID)
1368 if !ok {
1369 return nil
1370 }
1371 prompts := make([]string, len(l))
1372 for i, call := range l {
1373 prompts[i] = call.Prompt
1374 }
1375 return prompts
1376}
1377
// SetModels stores large and small as the agent's large and small models.
// The two values are set one after the other, large first.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1382
// SetTools hands the given tools to the agent's tool collection via
// SetSlice.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1386
// SetSystemPrompt stores systemPrompt as the agent's system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1390
// Model returns the agent's current large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1394
1395// convertToToolResult converts a fantasy tool result to a message tool result.
1396func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1397 baseResult := message.ToolResult{
1398 ToolCallID: result.ToolCallID,
1399 Name: result.ToolName,
1400 Metadata: result.ClientMetadata,
1401 }
1402
1403 switch result.Result.GetType() {
1404 case fantasy.ToolResultContentTypeText:
1405 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1406 baseResult.Content = r.Text
1407 }
1408 case fantasy.ToolResultContentTypeError:
1409 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1410 baseResult.Content = r.Error.Error()
1411 baseResult.IsError = true
1412 }
1413 case fantasy.ToolResultContentTypeMedia:
1414 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1415 if !stringext.IsValidBase64(r.Data) {
1416 slog.Warn(
1417 "Tool returned media with invalid base64 data, discarding image",
1418 "tool", result.ToolName,
1419 "tool_call_id", result.ToolCallID,
1420 )
1421 baseResult.Content = "Tool returned image data with invalid encoding"
1422 baseResult.IsError = true
1423 } else {
1424 content := r.Text
1425 if content == "" {
1426 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1427 }
1428 baseResult.Content = content
1429 baseResult.Data = r.Data
1430 baseResult.MIMEType = r.MediaType
1431 }
1432 }
1433 }
1434
1435 return baseResult
1436}
1437
1438// workaroundProviderMediaLimitations converts media content in tool results to
1439// user messages for providers that don't natively support images in tool results.
1440//
1441// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1442// don't support sending images/media in tool result messages - they only accept
1443// text in tool results. However, they DO support images in user messages.
1444//
1445// If we send media in tool results to these providers, the API returns an error.
1446//
1447// Solution: For these providers, we:
1448// 1. Replace the media in the tool result with a text placeholder
1449// 2. Inject a user message immediately after with the image as a file attachment
1450// 3. This maintains the tool execution flow while working around API limitations
1451//
1452// Anthropic and Bedrock support images natively in tool results, so we skip
1453// this workaround for them.
1454//
1455// Example transformation:
1456//
1457// BEFORE: [tool result: image data]
1458// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1459func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1460 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1461 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1462 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1463
1464 if providerSupportsMedia {
1465 return messages
1466 }
1467
1468 convertedMessages := make([]fantasy.Message, 0, len(messages))
1469
1470 for _, msg := range messages {
1471 if msg.Role != fantasy.MessageRoleTool {
1472 convertedMessages = append(convertedMessages, msg)
1473 continue
1474 }
1475
1476 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1477 var mediaFiles []fantasy.FilePart
1478
1479 for _, part := range msg.Content {
1480 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1481 if !ok {
1482 textParts = append(textParts, part)
1483 continue
1484 }
1485
1486 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1487 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1488 if err != nil {
1489 slog.Warn("Failed to decode media data", "error", err)
1490 textParts = append(textParts, part)
1491 continue
1492 }
1493
1494 mediaFiles = append(mediaFiles, fantasy.FilePart{
1495 Data: decoded,
1496 MediaType: media.MediaType,
1497 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1498 })
1499
1500 textParts = append(textParts, fantasy.ToolResultPart{
1501 ToolCallID: toolResult.ToolCallID,
1502 Output: fantasy.ToolResultOutputContentText{
1503 Text: "[Image/media content loaded - see attached file]",
1504 },
1505 ProviderOptions: toolResult.ProviderOptions,
1506 })
1507 } else {
1508 textParts = append(textParts, part)
1509 }
1510 }
1511
1512 convertedMessages = append(convertedMessages, fantasy.Message{
1513 Role: fantasy.MessageRoleTool,
1514 Content: textParts,
1515 })
1516
1517 if len(mediaFiles) > 0 {
1518 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1519 "Here is the media content from the tool result:",
1520 mediaFiles...,
1521 ))
1522 }
1523 }
1524
1525 return convertedMessages
1526}
1527
1528// buildSummaryPrompt constructs the prompt text for session summarization.
1529func buildSummaryPrompt(todos []session.Todo) string {
1530 var sb strings.Builder
1531 sb.WriteString("Provide a detailed summary of our conversation above.")
1532 if len(todos) > 0 {
1533 sb.WriteString("\n\n## Current Todo List\n\n")
1534 for _, t := range todos {
1535 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1536 }
1537 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1538 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1539 }
1540 return sb.String()
1541}
1542
1543func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1544 fields := []any{
1545 "retry_delay", delay.String(),
1546 }
1547 if err == nil {
1548 return fields
1549 }
1550 fields = append(fields, "status_code", err.StatusCode)
1551 if err.Title != "" {
1552 fields = append(fields, "title", err.Title)
1553 }
1554 if err.Message != "" {
1555 fields = append(fields, "message", err.Message)
1556 }
1557 return fields
1558}