1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "charm.land/catwalk/pkg/catwalk"
27 "charm.land/fantasy"
28 "charm.land/fantasy/providers/anthropic"
29 "charm.land/fantasy/providers/bedrock"
30 "charm.land/fantasy/providers/google"
31 "charm.land/fantasy/providers/openai"
32 "charm.land/fantasy/providers/openrouter"
33 "charm.land/fantasy/providers/vercel"
34 "charm.land/lipgloss/v2"
35 "github.com/charmbracelet/crush/internal/agent/hyper"
36 "github.com/charmbracelet/crush/internal/agent/notify"
37 "github.com/charmbracelet/crush/internal/agent/tools"
38 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
39 "github.com/charmbracelet/crush/internal/config"
40 "github.com/charmbracelet/crush/internal/csync"
41 "github.com/charmbracelet/crush/internal/message"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
// DefaultSessionName is the literal title "Untitled Session".
// NOTE(review): presumably the placeholder title for sessions that have
// not been named yet — its use sites are outside this section.
const DefaultSessionName = "Untitled Session"

// Auto-summarization thresholds, consumed by the stop condition in Run.
const (
	// largeContextWindowThreshold is the context-window size (in tokens)
	// above which the fixed buffer below is used instead of a ratio.
	largeContextWindowThreshold = 200_000
	// largeContextWindowBuffer is the remaining-token threshold used for
	// models whose context window exceeds largeContextWindowThreshold.
	largeContextWindowBuffer = 20_000
	// smallContextWindowRatio is the fraction of the context window used
	// as the remaining-token threshold for all other models.
	smallContextWindowRatio = 0.2
)
57
// userAgent is the User-Agent string handed to every fantasy agent built
// in this file (see fantasy.WithUserAgent); it embeds the build version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds the embedded contents of templates/title.md.
// NOTE(review): presumably the system prompt for title generation — its
// use site is outside this section.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the embedded contents of templates/summary.md. It
// is used as the system prompt of the summarization agent in Summarize.
//
//go:embed templates/summary.md
var summaryPrompt []byte
65
// The two patterns below are used to remove <think> tags from generated
// titles.

// thinkTagRegex matches a complete <think>...</think> span. The (?s)
// flag lets the span cross newlines and the match is non-greedy, so
// adjacent spans are matched separately.
var thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)

// orphanThinkTagRegex matches a single opening or closing think tag on
// its own.
var orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
71
// SessionAgentCall describes one prompt submitted to SessionAgent.Run:
// the target session, the prompt and its attachments, and the per-call
// generation parameters.
type SessionAgentCall struct {
	// SessionID identifies the session the prompt belongs to. Run
	// rejects an empty value with ErrSessionMissing.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text. Run rejects an empty Prompt with
	// ErrEmptyPrompt unless Attachments contains a text attachment.
	Prompt string
	// ProviderOptions is forwarded unchanged to the model stream call
	// (and to Summarize when Run triggers auto-summarization).
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored with the user message and passed to
	// prompt preparation.
	Attachments []message.Attachment
	// MaxOutputTokens is only sent to the provider when greater than
	// zero; otherwise no limit is passed.
	MaxOutputTokens int64
	// Temperature, TopP, TopK, FrequencyPenalty and PresencePenalty are
	// forwarded as-is to the stream call; nil pointers are passed
	// through unchanged.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the agent-finished notification that
	// Run publishes at the end of a turn.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
}
107
// SessionAgent is the session-scoped agent API. The implementation in
// this file is sessionAgent, created with NewSessionAgent.
//
// NOTE(review): only the Run and Summarize implementations are visible
// in this section; the remaining method notes are inferred from the
// signatures and from how Run uses the same state — confirm against the
// implementations.
type SessionAgent interface {
	// Run executes one turn for the call's session, or queues the call
	// and returns (nil, nil) when that session is already busy.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels replaces the large and small models (presumably for
	// subsequent calls; Run snapshots the large model when it starts).
	SetModels(large Model, small Model)
	// SetTools replaces the tool set. Run re-reads the tools before
	// every step, so updates are picked up mid-turn.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt (presumably for
	// subsequent calls; Run snapshots it when it starts).
	SetSystemPrompt(systemPrompt string)
	// Cancel presumably cancels the in-flight request of one session.
	Cancel(sessionID string)
	// CancelAll presumably cancels the in-flight requests of all
	// sessions.
	CancelAll()
	// IsSessionBusy reports whether the given session is busy; Run
	// queues calls and Summarize returns ErrSessionBusy when it is.
	IsSessionBusy(sessionID string) bool
	// IsBusy presumably reports whether any session is busy.
	IsBusy() bool
	// QueuedPrompts presumably returns the number of calls queued for
	// the session.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList presumably returns the queued prompt texts for
	// the session.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue presumably drops every call queued for the session.
	ClearQueue(sessionID string)
	// Summarize replaces the session's working context with a
	// model-generated summary message; the string argument is the
	// session ID.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model presumably returns the currently configured large model.
	Model() Model
}
123
// Model bundles a language model with the catalogue and configuration
// metadata the agent needs alongside it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalogue entry; this file reads its
	// SupportsImages, ContextWindow and Name fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration; its Model and
	// Provider fields are recorded on created assistant messages.
	ModelCfg config.SelectedModel
	// FlatRate is not read in this section.
	// NOTE(review): presumably marks flat-rate billed models for cost
	// accounting — confirm against updateSessionUsage.
	FlatRate bool
}
130
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Mutable configuration held in csync containers. Run and
	// Summarize take a copy at the start of a call.
	largeModel *csync.Value[Model] // model used by Run and Summarize
	// smallModel is not read in this section.
	// NOTE(review): presumably used for title generation — confirm.
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string] // prepended as an extra system message when non-empty
	systemPrompt       *csync.Value[string] // base system prompt of the agent
	tools              *csync.Slice[fantasy.AgentTool]

	// isSubAgent is consulted by preparePrompt when building history.
	isSubAgent bool
	sessions   session.Service // session persistence
	messages   message.Service // message persistence
	// disableAutoSummarize turns off the context-window stop condition
	// in Run.
	disableAutoSummarize bool
	// isYolo is not read in this section.
	// NOTE(review): meaning not visible here — confirm at its use site.
	isYolo bool
	// notify publishes user-facing notifications (re-authenticate,
	// agent finished). May be nil.
	notify pubsub.Publisher[notify.Notification]
	// runComplete publishes the terminal RunComplete event of a turn
	// when the call has no OnComplete hook. May be nil.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session ID, the calls submitted while
	// that session was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests holds, per session ID, the cancel function of the
	// in-flight Run or Summarize.
	activeRequests *csync.Map[string, context.CancelFunc]
}
149
// SessionAgentOptions carries the dependencies and initial configuration
// consumed by NewSessionAgent. Each field is copied into the
// corresponding sessionAgent field.
type SessionAgentOptions struct {
	// LargeModel and SmallModel are the initial models.
	LargeModel Model
	SmallModel Model
	// SystemPromptPrefix, when non-empty, is prepended as an extra
	// system message to each step's messages.
	SystemPromptPrefix string
	// SystemPrompt is the initial system prompt.
	SystemPrompt string
	// IsSubAgent is consulted by preparePrompt when building history.
	IsSubAgent bool
	// DisableAutoSummarize turns off the automatic summarization stop
	// condition in Run.
	DisableAutoSummarize bool
	// IsYolo is stored on the agent; it is not read in this section.
	IsYolo bool
	// Sessions and Messages are the persistence services.
	Sessions session.Service
	Messages message.Service
	// Tools is the initial tool set.
	Tools []fantasy.AgentTool
	// Notify and RunComplete are the event publishers; Run skips
	// publishing on either one when it is nil.
	Notify      pubsub.Publisher[notify.Notification]
	RunComplete pubsub.Publisher[notify.RunComplete]
}
164
165func NewSessionAgent(
166 opts SessionAgentOptions,
167) SessionAgent {
168 return &sessionAgent{
169 largeModel: csync.NewValue(opts.LargeModel),
170 smallModel: csync.NewValue(opts.SmallModel),
171 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
172 systemPrompt: csync.NewValue(opts.SystemPrompt),
173 isSubAgent: opts.IsSubAgent,
174 sessions: opts.Sessions,
175 messages: opts.Messages,
176 disableAutoSummarize: opts.DisableAutoSummarize,
177 tools: csync.NewSliceFrom(opts.Tools),
178 isYolo: opts.IsYolo,
179 notify: opts.Notify,
180 runComplete: opts.RunComplete,
181 messageQueue: csync.NewMap[string, []SessionAgentCall](),
182 activeRequests: csync.NewMap[string, context.CancelFunc](),
183 }
184}
185
// Run executes one agent turn for call.SessionID.
//
// Validation: an empty Prompt without a text attachment yields
// ErrEmptyPrompt, an empty SessionID yields ErrSessionMissing.
//
// If the session is busy the call is appended to the session's queue
// (with OnComplete cleared) and Run returns (nil, nil) immediately.
//
// Otherwise Run snapshots the tools, large model and prompts, appends
// the instructions of connected MCP servers to the system prompt, stores
// the user message, registers a cancel function in activeRequests and
// streams the model response, persisting the assistant message, tool
// calls and tool results as they arrive. When the session has no prior
// messages a title is generated concurrently.
//
// On a stream error the assistant message is finalised (unfinished tool
// calls are closed, missing tool results are filled with an error
// result, a finish reason is recorded) and the error is returned with a
// nil result. If the error occurred before any assistant message was
// created, the stream's result and error are returned unchanged.
//
// When the context-window stop condition fired, the session is
// summarized, and the prompt is re-queued if the last assistant message
// contained tool calls. After a successful turn the agent-finished
// notification is published (unless call.NonInteractive) and, if calls
// are queued, the first one is run recursively.
//
// A deferred function flushes pending message updates and then emits
// the turn's RunComplete event, either through call.OnComplete or on
// the runComplete publisher.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. Strip OnComplete: the caller that
	// supplied the hook (typically coordinator.Run) has its own
	// retry/coalesce scope that ends when it returns, so by the time
	// the queue drains nobody is left to consume the buffered
	// terminal event. The recursive Run will fall back to the
	// default broker publish, which is what existing subscribers
	// expect for queued turns.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		queued := call
		queued.OnComplete = nil
		existing = append(existing, queued)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	// Append them to the system prompt inside an <mcp-instructions> block.
	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards stepMessages and the session usage update that
	// happen inside the PrepareStep / OnStepFinish callbacks.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Registered first, so it runs last: Run does not return until the
	// title goroutine has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context of this turn; its cancel
	// function is what marks the session as having an active request.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// skipRunComplete is set just before the queued-recursion path so
	// the outer Run doesn't publish a RunComplete that would race
	// with — and be superseded by — the recursive call's own
	// RunComplete (each queued user prompt is its own turn and
	// publishes exactly one terminal event).
	var skipRunComplete bool
	// currentAssistant is declared here so the deferred RunComplete
	// publish below can capture the pointer that PrepareStep will
	// later (re)assign for each streaming step. The final assistant
	// message of the turn is the value reachable through this
	// pointer when the defer runs.
	var currentAssistant *message.Message
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	//
	// After the flush completes — meaning all per-message
	// Publish(UpdatedEvent) calls have fired and been buffered into
	// every subscriber's channel — publish the authoritative
	// RunComplete event for this turn. The flush-then-publish order
	// gives well-behaved clients the best chance of seeing the final
	// message event before RunComplete; the embedded Text field
	// reconciles for clients that observe the events out of order
	// (the pubsub broker fan-in does not serialize publishes from
	// different upstream brokers).
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
		if skipRunComplete {
			return
		}
		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
		if currentAssistant != nil {
			complete.MessageID = currentAssistant.ID
			complete.Text = currentAssistant.Content().String()
		}
		// retErr is the named return value, so this sees the error Run
		// is actually returning.
		if retErr != nil {
			complete.Error = retErr.Error()
			complete.Cancelled = errors.Is(retErr, context.Canceled)
		} else if ctx.Err() != nil {
			complete.Cancelled = true
		}
		// Prefer the per-call hook when supplied so the coordinator
		// can coalesce retries (e.g. unauthorized → re-auth → retry)
		// into a single user-visible terminal event. The fallback
		// must-deliver publish applies bounded-blocking semantics to
		// the authoritative terminal event so a momentarily-full
		// subscriber channel can't silently drop it and hang
		// non-interactive clients waiting on RunComplete.
		if call.OnComplete != nil {
			call.OnComplete(complete)
			return
		}
		if a.runComplete == nil {
			return
		}
		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// stepMessages keeps a copy of the messages sent for the current
	// step; OnStepFinish passes it to fallbackStepUsage.
	var stepMessages []fantasy.Message
	// shouldSummarize is set by the context-window stop condition.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep runs before each model step: it refreshes tools,
		// folds queued prompts into the conversation, applies cache
		// control and creates the assistant message for the step.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Reset per-message provider options; cache control is
			// re-applied below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Prompts queued while this turn was running are stored as
			// user messages and appended to this step's messages.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Track the most recent system message; when the first
				// non-system message is reached, add cache control to
				// that system message (once).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			sessionLock.Lock()
			stepMessages = cloneFantasyMessages(prepared.Messages)
			sessionLock.Unlock()

			// Create the (initially empty) assistant message that the
			// streaming callbacks below fill in.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the message ID and model capabilities to tools
			// through the step context.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// Record the provider-specific reasoning metadata.
			// Handle the Anthropic signature.
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Map the provider finish reason onto the message finish
			// reason; anything unmapped stays FinishReasonUnknown.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session before applying this step's usage and
			// saving it.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop (and request summarization) when the remaining
			// context window drops to the threshold.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				// Large windows use a fixed token buffer, smaller ones
				// a fraction of the window.
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps repeat the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created, so there is nothing to
		// finalise.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		for _, tc := range toolCalls {
			// Close tool calls whose input never finished streaming,
			// replacing the input with an empty JSON object.
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			// Look for an already stored tool result for this call.
			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			// No result exists: store an error result for the call.
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason that matches the kind of error.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Release the active request first: Summarize returns
		// ErrSessionBusy for a busy session.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop. The recursive Run
	// publishes its own RunComplete for the queued prompt, so suppress
	// the outer defer's emit to avoid a duplicate event whose Error
	// field would belong to the recursive turn but whose MessageID/Text
	// would belong to the outer turn.
	skipRunComplete = true
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
730
731func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
732 if a.IsSessionBusy(sessionID) {
733 return ErrSessionBusy
734 }
735
736 // Copy mutable fields under lock to avoid races with SetModels.
737 largeModel := a.largeModel.Get()
738 systemPromptPrefix := a.systemPromptPrefix.Get()
739
740 currentSession, err := a.sessions.Get(ctx, sessionID)
741 if err != nil {
742 return fmt.Errorf("failed to get session: %w", err)
743 }
744 msgs, err := a.getSessionMessages(ctx, currentSession)
745 if err != nil {
746 return err
747 }
748 if len(msgs) == 0 {
749 // Nothing to summarize.
750 return nil
751 }
752
753 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
754
755 genCtx, cancel := context.WithCancel(ctx)
756 a.activeRequests.Set(sessionID, cancel)
757 defer a.activeRequests.Del(sessionID)
758 defer cancel()
759 defer func() {
760 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
761 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
762 }
763 }()
764
765 agent := fantasy.NewAgent(
766 largeModel.Model,
767 fantasy.WithSystemPrompt(string(summaryPrompt)),
768 fantasy.WithUserAgent(userAgent),
769 )
770 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
771 Role: message.Assistant,
772 Model: largeModel.ModelCfg.Model,
773 Provider: largeModel.ModelCfg.Provider,
774 IsSummaryMessage: true,
775 })
776 if err != nil {
777 return err
778 }
779
780 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
781
782 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
783 Prompt: summaryPromptText,
784 Messages: aiMsgs,
785 ProviderOptions: opts,
786 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
787 prepared.Messages = options.Messages
788 if systemPromptPrefix != "" {
789 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
790 }
791 return callContext, prepared, nil
792 },
793 OnReasoningDelta: func(id string, text string) error {
794 summaryMessage.AppendReasoningContent(text)
795 return a.messages.Update(genCtx, summaryMessage)
796 },
797 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
798 // Handle anthropic signature.
799 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
800 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
801 summaryMessage.AppendReasoningSignature(signature.Signature)
802 }
803 }
804 summaryMessage.FinishThinking()
805 return a.messages.Update(genCtx, summaryMessage)
806 },
807 OnTextDelta: func(id, text string) error {
808 summaryMessage.AppendContent(text)
809 return a.messages.Update(genCtx, summaryMessage)
810 },
811 })
812 if err != nil {
813 isCancelErr := errors.Is(err, context.Canceled)
814 if isCancelErr {
815 // User cancelled summarize we need to remove the summary message.
816 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
817 return deleteErr
818 }
819 // Mark the summary message as finished with an error so the UI
820 // stops spinning.
821 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
822 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
823 return updateErr
824 }
825 return err
826 }
827
828 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
829 err = a.messages.Update(genCtx, summaryMessage)
830 if err != nil {
831 return err
832 }
833
834 var openrouterCost *float64
835 for _, step := range resp.Steps {
836 stepCost := a.openrouterCost(step.ProviderMetadata)
837 if stepCost != nil {
838 newCost := *stepCost
839 if openrouterCost != nil {
840 newCost += *openrouterCost
841 }
842 openrouterCost = &newCost
843 }
844 }
845
846 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
847
848 // Just in case, get just the last usage info.
849 usage := resp.Response.Usage
850 currentSession.SummaryMessageID = summaryMessage.ID
851 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
852 currentSession.PromptTokens = 0
853 currentSession.EstimatedUsage = usageIsZero(usage)
854 _, err = a.sessions.Save(genCtx, currentSession)
855 if err != nil {
856 return err
857 }
858
859 // Release the active request before processing queued messages so that
860 // Run() does not see the session as busy.
861 a.activeRequests.Del(sessionID)
862 cancel()
863
864 // Process any messages that were queued while summarizing.
865 queuedMessages, ok := a.messageQueue.Get(sessionID)
866 if !ok || len(queuedMessages) == 0 {
867 return nil
868 }
869 firstQueuedMessage := queuedMessages[0]
870 a.messageQueue.Set(sessionID, queuedMessages[1:])
871 _, qErr := a.Run(ctx, firstQueuedMessage)
872 return qErr
873}
874
875func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
876 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
877 return fantasy.ProviderOptions{}
878 }
879 return fantasy.ProviderOptions{
880 anthropic.Name: &anthropic.ProviderCacheControlOptions{
881 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
882 },
883 bedrock.Name: &anthropic.ProviderCacheControlOptions{
884 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
885 },
886 vercel.Name: &anthropic.ProviderCacheControlOptions{
887 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
888 },
889 }
890}
891
892func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
893 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
894 var attachmentParts []message.ContentPart
895 for _, attachment := range call.Attachments {
896 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
897 }
898 parts = append(parts, attachmentParts...)
899 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
900 Role: message.User,
901 Parts: parts,
902 })
903 if err != nil {
904 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
905 }
906 return msg, nil
907}
908
909func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
910 var history []fantasy.Message
911 if !a.isSubAgent {
912 history = append(history, fantasy.NewUserMessage(
913 fmt.Sprintf(
914 "<system_reminder>%s</system_reminder>",
915 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
916If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
917If not, please feel free to ignore. Again do not mention this message to the user.`,
918 ),
919 ))
920 }
921 // Collect all tool call IDs present in assistant messages and all tool
922 // result IDs present in tool messages. This lets us detect both orphaned
923 // tool results (result without a call) and orphaned tool calls (call
924 // without a result).
925 knownToolCallIDs := make(map[string]struct{})
926 knownToolResultIDs := make(map[string]struct{})
927 for _, m := range msgs {
928 switch m.Role {
929 case message.Assistant:
930 for _, tc := range m.ToolCalls() {
931 knownToolCallIDs[tc.ID] = struct{}{}
932 }
933 case message.Tool:
934 for _, tr := range m.ToolResults() {
935 knownToolResultIDs[tr.ToolCallID] = struct{}{}
936 }
937 }
938 }
939
940 for _, m := range msgs {
941 if len(m.Parts) == 0 {
942 continue
943 }
944 // Assistant message without content or tool calls (cancelled before it returned anything).
945 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
946 continue
947 }
948 if m.Role == message.Tool {
949 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
950 history = append(history, msg)
951 }
952 continue
953 }
954 aiMsgs := m.ToAIMessage()
955 if !supportsImages {
956 for i := range aiMsgs {
957 if aiMsgs[i].Role == fantasy.MessageRoleUser {
958 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
959 }
960 }
961 }
962 history = append(history, aiMsgs...)
963
964 if m.Role == message.Assistant {
965 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
966 history = append(history, msg)
967 }
968 }
969 }
970
971 var files []fantasy.FilePart
972 for _, attachment := range attachments {
973 if attachment.IsText() {
974 continue
975 }
976 files = append(files, fantasy.FilePart{
977 Filename: attachment.FileName,
978 Data: attachment.Content,
979 MediaType: attachment.MimeType,
980 })
981 }
982
983 return history, files
984}
985
986// filterFileParts removes fantasy.FilePart entries from a slice of message
987// parts. Used to strip image attachments from historical user messages when
988// the current model does not support them.
989func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
990 filtered := make([]fantasy.MessagePart, 0, len(parts))
991 for _, part := range parts {
992 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
993 continue
994 }
995 filtered = append(filtered, part)
996 }
997 return filtered
998}
999
1000// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1001// dropping any tool result parts whose tool_call_id has no matching tool call
1002// in the known set. An orphaned result causes API validation to fail on every
1003// subsequent turn, permanently locking the session. Returns the filtered
1004// message and true if at least one valid part remains.
1005func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1006 aiMsgs := m.ToAIMessage()
1007 if len(aiMsgs) == 0 {
1008 return fantasy.Message{}, false
1009 }
1010 var validParts []fantasy.MessagePart
1011 for _, part := range aiMsgs[0].Content {
1012 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1013 if !ok {
1014 validParts = append(validParts, part)
1015 continue
1016 }
1017 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1018 validParts = append(validParts, part)
1019 } else {
1020 slog.Warn(
1021 "Dropping orphaned tool result with no matching tool call",
1022 "tool_call_id", tr.ToolCallID,
1023 )
1024 }
1025 }
1026 if len(validParts) == 0 {
1027 return fantasy.Message{}, false
1028 }
1029 msg := aiMsgs[0]
1030 msg.Content = validParts
1031 return msg, true
1032}
1033
1034// syntheticToolResultsForOrphanedCalls returns a tool message containing
1035// synthetic tool results for any tool calls in the assistant message that
1036// have no matching result in knownToolResultIDs. LLM APIs require every
1037// tool_use to be immediately followed by a tool_result; an interrupted
1038// session can leave orphaned tool_use blocks that permanently lock the
1039// conversation. Returns the message and true if any synthetic results were
1040// produced.
1041func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1042 var syntheticParts []fantasy.MessagePart
1043 for _, tc := range m.ToolCalls() {
1044 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1045 continue
1046 }
1047 slog.Warn(
1048 "Injecting synthetic tool result for orphaned tool call",
1049 "tool_call_id", tc.ID,
1050 "tool_name", tc.Name,
1051 )
1052 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1053 ToolCallID: tc.ID,
1054 Output: fantasy.ToolResultOutputContentError{
1055 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1056 },
1057 })
1058 }
1059 if len(syntheticParts) == 0 {
1060 return fantasy.Message{}, false
1061 }
1062 return fantasy.Message{
1063 Role: fantasy.MessageRoleTool,
1064 Content: syntheticParts,
1065 }, true
1066}
1067
1068func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1069 msgs, err := a.messages.List(ctx, session.ID)
1070 if err != nil {
1071 return nil, fmt.Errorf("failed to list messages: %w", err)
1072 }
1073
1074 if session.SummaryMessageID != "" {
1075 summaryMsgIndex := -1
1076 for i, msg := range msgs {
1077 if msg.ID == session.SummaryMessageID {
1078 summaryMsgIndex = i
1079 break
1080 }
1081 }
1082 if summaryMsgIndex != -1 {
1083 msgs = msgs[summaryMsgIndex:]
1084 msgs[0].Role = message.User
1085 }
1086 }
1087 return msgs, nil
1088}
1089
1090// generateTitle generates a session titled based on the initial prompt.
1091func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1092 if userPrompt == "" {
1093 return
1094 }
1095
1096 smallModel := a.smallModel.Get()
1097 largeModel := a.largeModel.Get()
1098 systemPromptPrefix := a.systemPromptPrefix.Get()
1099
1100 var maxOutputTokens int64 = 40
1101 if smallModel.CatwalkCfg.CanReason {
1102 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1103 }
1104
1105 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1106 return fantasy.NewAgent(
1107 m,
1108 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1109 fantasy.WithMaxOutputTokens(tok),
1110 fantasy.WithUserAgent(userAgent),
1111 )
1112 }
1113
1114 streamCall := fantasy.AgentStreamCall{
1115 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1116 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1117 prepared.Messages = opts.Messages
1118 if systemPromptPrefix != "" {
1119 prepared.Messages = append([]fantasy.Message{
1120 fantasy.NewSystemMessage(systemPromptPrefix),
1121 }, prepared.Messages...)
1122 }
1123 return callCtx, prepared, nil
1124 },
1125 }
1126
1127 // Use the small model to generate the title.
1128 model := smallModel
1129 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1130 resp, err := agent.Stream(ctx, streamCall)
1131 if err == nil {
1132 // We successfully generated a title with the small model.
1133 slog.Debug("Generated title with small model")
1134 } else {
1135 // It didn't work. Let's try with the big model.
1136 slog.Error("Error generating title with small model; trying big model", "err", err)
1137 model = largeModel
1138 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1139 resp, err = agent.Stream(ctx, streamCall)
1140 if err == nil {
1141 slog.Debug("Generated title with large model")
1142 } else {
1143 // Welp, the large model didn't work either. Use the default
1144 // session name and return.
1145 slog.Error("Error generating title with large model", "err", err)
1146 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1147 if saveErr != nil {
1148 slog.Error("Failed to save session title", "error", saveErr)
1149 }
1150 return
1151 }
1152 }
1153
1154 if resp == nil {
1155 // Actually, we didn't get a response so we can't. Use the default
1156 // session name and return.
1157 slog.Error("Response is nil; can't generate title")
1158 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1159 if saveErr != nil {
1160 slog.Error("Failed to save session title", "error", saveErr)
1161 }
1162 return
1163 }
1164
1165 // Clean up title.
1166 var title string
1167 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1168
1169 // Remove thinking tags if present.
1170 title = thinkTagRegex.ReplaceAllString(title, "")
1171 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1172
1173 title = strings.TrimSpace(title)
1174 title = cmp.Or(title, DefaultSessionName)
1175
1176 // Calculate usage and cost.
1177 var openrouterCost *float64
1178 for _, step := range resp.Steps {
1179 stepCost := a.openrouterCost(step.ProviderMetadata)
1180 if stepCost != nil {
1181 newCost := *stepCost
1182 if openrouterCost != nil {
1183 newCost += *openrouterCost
1184 }
1185 openrouterCost = &newCost
1186 }
1187 }
1188
1189 modelConfig := model.CatwalkCfg
1190 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1191 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1192 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1193 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1194
1195 // Use override cost if available (e.g., from OpenRouter).
1196 if openrouterCost != nil {
1197 cost = *openrouterCost
1198 }
1199
1200 // Skip cost accumulation
1201 if model.FlatRate {
1202 cost = 0
1203 }
1204
1205 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1206 completionTokens := resp.TotalUsage.OutputTokens
1207
1208 // Atomically update only title and usage fields to avoid overriding other
1209 // concurrent session updates.
1210 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1211 if saveErr != nil {
1212 slog.Error("Failed to save session title and usage", "error", saveErr)
1213 return
1214 }
1215}
1216
1217func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1218 openrouterMetadata, ok := metadata[openrouter.Name]
1219 if !ok {
1220 return nil
1221 }
1222
1223 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1224 if !ok {
1225 return nil
1226 }
1227 return &opts.Usage.Cost
1228}
1229
1230func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1231 if !usageIsZero(usage) {
1232 session.EstimatedUsage = estimated
1233 }
1234
1235 modelConfig := model.CatwalkCfg
1236 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1237 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1238 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1239 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1240
1241 if !estimated {
1242 a.eventTokensUsed(session.ID, model, usage, cost)
1243 }
1244
1245 if estimated {
1246 cost = 0
1247 } else {
1248 // Use override cost if available (e.g., from OpenRouter).
1249 if overrideCost != nil {
1250 cost = *overrideCost
1251 }
1252
1253 // Skip cost accumulation
1254 if model.FlatRate {
1255 cost = 0
1256 }
1257 }
1258
1259 session.Cost += cost
1260 updateSessionTokenCounters(session, usage)
1261}
1262
1263func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1264 if usage.OutputTokens != 0 {
1265 session.CompletionTokens = usage.OutputTokens
1266 }
1267 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1268 session.PromptTokens = promptTokens
1269 }
1270}
1271
1272func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1273 if usage.OutputTokens != 0 {
1274 return usage.OutputTokens
1275 }
1276 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1277}
1278
1279func (a *sessionAgent) Cancel(sessionID string) {
1280 // Cancel regular requests. Don't use Take() here - we need the entry to
1281 // remain in activeRequests so IsBusy() returns true until the goroutine
1282 // fully completes (including error handling that may access the DB).
1283 // The defer in processRequest will clean up the entry.
1284 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1285 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1286 cancel()
1287 }
1288
1289 // Also check for summarize requests.
1290 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1291 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1292 cancel()
1293 }
1294
1295 if a.QueuedPrompts(sessionID) > 0 {
1296 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1297 a.messageQueue.Del(sessionID)
1298 }
1299}
1300
1301func (a *sessionAgent) ClearQueue(sessionID string) {
1302 if a.QueuedPrompts(sessionID) > 0 {
1303 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1304 a.messageQueue.Del(sessionID)
1305 }
1306}
1307
1308func (a *sessionAgent) CancelAll() {
1309 if !a.IsBusy() {
1310 return
1311 }
1312 for key := range a.activeRequests.Seq2() {
1313 a.Cancel(key) // key is sessionID
1314 }
1315
1316 timeout := time.After(5 * time.Second)
1317 for a.IsBusy() {
1318 select {
1319 case <-timeout:
1320 return
1321 default:
1322 time.Sleep(200 * time.Millisecond)
1323 }
1324 }
1325}
1326
1327func (a *sessionAgent) IsBusy() bool {
1328 var busy bool
1329 for cancelFunc := range a.activeRequests.Seq() {
1330 if cancelFunc != nil {
1331 busy = true
1332 break
1333 }
1334 }
1335 return busy
1336}
1337
1338func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1339 _, busy := a.activeRequests.Get(sessionID)
1340 return busy
1341}
1342
1343func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1344 l, ok := a.messageQueue.Get(sessionID)
1345 if !ok {
1346 return 0
1347 }
1348 return len(l)
1349}
1350
1351func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1352 l, ok := a.messageQueue.Get(sessionID)
1353 if !ok {
1354 return nil
1355 }
1356 prompts := make([]string, len(l))
1357 for i, call := range l {
1358 prompts[i] = call.Prompt
1359 }
1360 return prompts
1361}
1362
1363func (a *sessionAgent) SetModels(large Model, small Model) {
1364 a.largeModel.Set(large)
1365 a.smallModel.Set(small)
1366}
1367
1368func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1369 a.tools.SetSlice(tools)
1370}
1371
1372func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1373 a.systemPrompt.Set(systemPrompt)
1374}
1375
1376func (a *sessionAgent) Model() Model {
1377 return a.largeModel.Get()
1378}
1379
1380// convertToToolResult converts a fantasy tool result to a message tool result.
1381func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1382 baseResult := message.ToolResult{
1383 ToolCallID: result.ToolCallID,
1384 Name: result.ToolName,
1385 Metadata: result.ClientMetadata,
1386 }
1387
1388 switch result.Result.GetType() {
1389 case fantasy.ToolResultContentTypeText:
1390 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1391 baseResult.Content = r.Text
1392 }
1393 case fantasy.ToolResultContentTypeError:
1394 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1395 baseResult.Content = r.Error.Error()
1396 baseResult.IsError = true
1397 }
1398 case fantasy.ToolResultContentTypeMedia:
1399 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1400 if !stringext.IsValidBase64(r.Data) {
1401 slog.Warn(
1402 "Tool returned media with invalid base64 data, discarding image",
1403 "tool", result.ToolName,
1404 "tool_call_id", result.ToolCallID,
1405 )
1406 baseResult.Content = "Tool returned image data with invalid encoding"
1407 baseResult.IsError = true
1408 } else {
1409 content := r.Text
1410 if content == "" {
1411 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1412 }
1413 baseResult.Content = content
1414 baseResult.Data = r.Data
1415 baseResult.MIMEType = r.MediaType
1416 }
1417 }
1418 }
1419
1420 return baseResult
1421}
1422
1423// workaroundProviderMediaLimitations converts media content in tool results to
1424// user messages for providers that don't natively support images in tool results.
1425//
1426// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1427// don't support sending images/media in tool result messages - they only accept
1428// text in tool results. However, they DO support images in user messages.
1429//
1430// If we send media in tool results to these providers, the API returns an error.
1431//
1432// Solution: For these providers, we:
1433// 1. Replace the media in the tool result with a text placeholder
1434// 2. Inject a user message immediately after with the image as a file attachment
1435// 3. This maintains the tool execution flow while working around API limitations
1436//
1437// Anthropic and Bedrock support images natively in tool results, so we skip
1438// this workaround for them.
1439//
1440// Example transformation:
1441//
1442// BEFORE: [tool result: image data]
1443// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1444func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1445 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1446 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1447 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1448
1449 if providerSupportsMedia {
1450 return messages
1451 }
1452
1453 convertedMessages := make([]fantasy.Message, 0, len(messages))
1454
1455 for _, msg := range messages {
1456 if msg.Role != fantasy.MessageRoleTool {
1457 convertedMessages = append(convertedMessages, msg)
1458 continue
1459 }
1460
1461 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1462 var mediaFiles []fantasy.FilePart
1463
1464 for _, part := range msg.Content {
1465 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1466 if !ok {
1467 textParts = append(textParts, part)
1468 continue
1469 }
1470
1471 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1472 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1473 if err != nil {
1474 slog.Warn("Failed to decode media data", "error", err)
1475 textParts = append(textParts, part)
1476 continue
1477 }
1478
1479 mediaFiles = append(mediaFiles, fantasy.FilePart{
1480 Data: decoded,
1481 MediaType: media.MediaType,
1482 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1483 })
1484
1485 textParts = append(textParts, fantasy.ToolResultPart{
1486 ToolCallID: toolResult.ToolCallID,
1487 Output: fantasy.ToolResultOutputContentText{
1488 Text: "[Image/media content loaded - see attached file]",
1489 },
1490 ProviderOptions: toolResult.ProviderOptions,
1491 })
1492 } else {
1493 textParts = append(textParts, part)
1494 }
1495 }
1496
1497 convertedMessages = append(convertedMessages, fantasy.Message{
1498 Role: fantasy.MessageRoleTool,
1499 Content: textParts,
1500 })
1501
1502 if len(mediaFiles) > 0 {
1503 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1504 "Here is the media content from the tool result:",
1505 mediaFiles...,
1506 ))
1507 }
1508 }
1509
1510 return convertedMessages
1511}
1512
1513// buildSummaryPrompt constructs the prompt text for session summarization.
1514func buildSummaryPrompt(todos []session.Todo) string {
1515 var sb strings.Builder
1516 sb.WriteString("Provide a detailed summary of our conversation above.")
1517 if len(todos) > 0 {
1518 sb.WriteString("\n\n## Current Todo List\n\n")
1519 for _, t := range todos {
1520 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1521 }
1522 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1523 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1524 }
1525 return sb.String()
1526}
1527
1528func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1529 fields := []any{
1530 "retry_delay", delay.String(),
1531 }
1532 if err == nil {
1533 return fields
1534 }
1535 fields = append(fields, "status_code", err.StatusCode)
1536 if err.Title != "" {
1537 fields = append(fields, "title", err.Title)
1538 }
1539 if err.Message != "" {
1540 fields = append(fields, "message", err.Message)
1541 }
1542 return fields
1543}