1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "charm.land/catwalk/pkg/catwalk"
27 "charm.land/fantasy"
28 "charm.land/fantasy/providers/anthropic"
29 "charm.land/fantasy/providers/bedrock"
30 "charm.land/fantasy/providers/google"
31 "charm.land/fantasy/providers/openai"
32 "charm.land/fantasy/providers/openrouter"
33 "charm.land/fantasy/providers/vercel"
34 "charm.land/lipgloss/v2"
35 "github.com/charmbracelet/crush/internal/agent/hyper"
36 "github.com/charmbracelet/crush/internal/agent/notify"
37 "github.com/charmbracelet/crush/internal/agent/tools"
38 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
39 "github.com/charmbracelet/crush/internal/config"
40 "github.com/charmbracelet/crush/internal/csync"
41 "github.com/charmbracelet/crush/internal/message"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
// DefaultSessionName is the placeholder title for a session.
// NOTE(review): presumably shown until a real title is generated — confirm at
// the call sites, which are not visible here.
const DefaultSessionName = "Untitled Session"

// Auto-summarization thresholds, read by the StopWhen condition in Run.
//
// For a model whose context window is larger than largeContextWindowThreshold
// tokens, summarization triggers once the remaining budget is at or below
// largeContextWindowBuffer tokens. For smaller windows it triggers once the
// remaining budget is at or below smallContextWindowRatio of the window.
const (
	largeContextWindowThreshold = 200000
	largeContextWindowBuffer    = 20000
	smallContextWindowRatio     = 0.2
)
57
// userAgent is the User-Agent passed to fantasy.WithUserAgent for every
// agent built in this file (Run and Summarize). It embeds the Crush version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds templates/title.md, embedded at build time.
// NOTE(review): presumably the system prompt for session title generation
// (generateTitle is not visible in this chunk) — confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds templates/summary.md, embedded at build time. Summarize
// uses it as the system prompt of the summarizing agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
65
// thinkTagRegex matches a complete <think>…</think> span. The (?s) flag lets
// "." cross newlines, and the lazy ".*?" stops at the first closing tag, so
// consecutive spans are matched separately. Used to remove <think> tags from
// generated titles.
var thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)

// orphanThinkTagRegex matches a single unpaired <think> or </think> tag.
// Also used when cleaning generated titles.
var orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
71
// SessionAgentCall describes one prompt submitted to SessionAgent.Run.
type SessionAgentCall struct {
	// SessionID identifies the session the turn belongs to. Run rejects an
	// empty value with ErrSessionMissing.
	SessionID string

	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string

	// Prompt is the user's text. Run rejects a call with an empty Prompt
	// (ErrEmptyPrompt) unless Attachments contains a text attachment.
	Prompt string
	// ProviderOptions are forwarded to the model stream for this turn, and
	// to Summarize if auto-summarization triggers.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored on the user message as binary parts and are
	// passed to message.PromptWithTextAttachments when building the prompt.
	Attachments []message.Attachment
	// MaxOutputTokens caps the response length. A value <= 0 leaves the
	// limit unset, because some providers reject an explicit zero.
	MaxOutputTokens int64
	// Sampling parameters, forwarded unchanged to the stream call.
	// nil means "not specified".
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the TypeAgentFinished notification that Run
	// publishes at the end of a successful turn.
	NonInteractive bool

	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
}
107
108type SessionAgent interface {
109 Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
110 SetModels(large Model, small Model)
111 SetTools(tools []fantasy.AgentTool)
112 SetSystemPrompt(systemPrompt string)
113 Cancel(sessionID string)
114 CancelAll()
115 IsSessionBusy(sessionID string) bool
116 IsBusy() bool
117 QueuedPrompts(sessionID string) int
118 QueuedPromptsList(sessionID string) []string
119 ClearQueue(sessionID string)
120 Summarize(context.Context, string, fantasy.ProviderOptions) error
121 Model() Model
122}
123
// Model bundles a language model with the configuration Crush uses to drive
// it.
type Model struct {
	// Model is the fantasy language model that requests are sent to.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catwalk model metadata. This file reads its
	// ContextWindow, SupportsImages and Name.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected model configuration. Its Model and Provider
	// are recorded on assistant messages, and Provider is compared against
	// hyper.Name for Hyper-specific error messages.
	ModelCfg config.SelectedModel
	// FlatRate — NOTE(review): not read in this chunk. Presumably marks models
	// billed at a flat rate rather than per token; confirm.
	FlatRate bool
}
130
// sessionAgent is the SessionAgent implementation. Settings that can be
// replaced while the agent is running live in csync containers (Run copies
// them at the start of a turn). Per-session state lives in messageQueue and
// activeRequests.
type sessionAgent struct {
	// largeModel drives normal turns and summarization.
	largeModel *csync.Value[Model]
	// smallModel — NOTE(review): not read in this chunk. Presumably used for
	// lighter tasks such as title generation; confirm.
	smallModel *csync.Value[Model]
	// systemPromptPrefix, when non-empty, is prepended to every step as its
	// own system message.
	systemPromptPrefix *csync.Value[string]
	// systemPrompt is the agent's system prompt. Run appends connected MCP
	// servers' instructions to it.
	systemPrompt *csync.Value[string]
	// tools are offered to the model. Run re-reads them at every step, so
	// SetTools takes effect mid-turn.
	tools *csync.Slice[fantasy.AgentTool]

	// isSubAgent omits the todo-list reminder from the prompt history.
	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize stops Run from summarizing when the context
	// window is nearly full.
	disableAutoSummarize bool
	// isYolo — NOTE(review): not read in this chunk. Presumably skips
	// permission prompts; confirm.
	isYolo bool
	// notify receives the agent-finished notification after an interactive
	// turn. May be nil.
	notify pubsub.Publisher[notify.Notification]
	// runComplete receives each turn's terminal RunComplete event when the
	// call has no OnComplete hook. May be nil.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session ID, calls submitted while that
	// session was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session ID to the cancel func of its in-flight
	// request. An entry marks the session as busy.
	activeRequests *csync.Map[string, context.CancelFunc]
}
149
// SessionAgentOptions configures NewSessionAgent. Each field initializes the
// sessionAgent field of the same name (first letter lower-cased).
type SessionAgentOptions struct {
	// LargeModel is used for turns and summarization; SmallModel is stored
	// alongside it.
	LargeModel Model
	SmallModel Model
	// SystemPromptPrefix is sent as a separate leading system message when
	// non-empty; SystemPrompt is the main system prompt.
	SystemPromptPrefix string
	SystemPrompt       string
	// IsSubAgent omits the todo-list reminder from the prompt history.
	IsSubAgent bool
	// DisableAutoSummarize turns off context-window-driven summarization.
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	// Notify and RunComplete may be nil, in which case Run skips the
	// corresponding publishes.
	Notify      pubsub.Publisher[notify.Notification]
	RunComplete pubsub.Publisher[notify.RunComplete]
}
164
165func NewSessionAgent(
166 opts SessionAgentOptions,
167) SessionAgent {
168 return &sessionAgent{
169 largeModel: csync.NewValue(opts.LargeModel),
170 smallModel: csync.NewValue(opts.SmallModel),
171 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
172 systemPrompt: csync.NewValue(opts.SystemPrompt),
173 isSubAgent: opts.IsSubAgent,
174 sessions: opts.Sessions,
175 messages: opts.Messages,
176 disableAutoSummarize: opts.DisableAutoSummarize,
177 tools: csync.NewSliceFrom(opts.Tools),
178 isYolo: opts.IsYolo,
179 notify: opts.Notify,
180 runComplete: opts.RunComplete,
181 messageQueue: csync.NewMap[string, []SessionAgentCall](),
182 activeRequests: csync.NewMap[string, context.CancelFunc](),
183 }
184}
185
186func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
187 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
188 return nil, ErrEmptyPrompt
189 }
190 if call.SessionID == "" {
191 return nil, ErrSessionMissing
192 }
193
194 // Queue the message if busy. Strip OnComplete: the caller that
195 // supplied the hook (typically coordinator.Run) has its own
196 // retry/coalesce scope that ends when it returns, so by the time
197 // the queue drains nobody is left to consume the buffered
198 // terminal event. The recursive Run will fall back to the
199 // default broker publish, which is what existing subscribers
200 // expect for queued turns.
201 if a.IsSessionBusy(call.SessionID) {
202 existing, ok := a.messageQueue.Get(call.SessionID)
203 if !ok {
204 existing = []SessionAgentCall{}
205 }
206 queued := call
207 queued.OnComplete = nil
208 existing = append(existing, queued)
209 a.messageQueue.Set(call.SessionID, existing)
210 return nil, nil
211 }
212
213 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
214 agentTools := a.tools.Copy()
215 largeModel := a.largeModel.Get()
216 systemPrompt := a.systemPrompt.Get()
217 promptPrefix := a.systemPromptPrefix.Get()
218 var instructions strings.Builder
219
220 for _, server := range mcp.GetStates() {
221 if server.State != mcp.StateConnected {
222 continue
223 }
224 if s := server.Client.InitializeResult().Instructions; s != "" {
225 instructions.WriteString(s)
226 instructions.WriteString("\n\n")
227 }
228 }
229
230 if s := instructions.String(); s != "" {
231 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
232 }
233
234 if len(agentTools) > 0 {
235 // Add Anthropic caching to the last tool.
236 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
237 }
238
239 agent := fantasy.NewAgent(
240 largeModel.Model,
241 fantasy.WithSystemPrompt(systemPrompt),
242 fantasy.WithTools(agentTools...),
243 fantasy.WithUserAgent(userAgent),
244 )
245
246 sessionLock := sync.Mutex{}
247 currentSession, err := a.sessions.Get(ctx, call.SessionID)
248 if err != nil {
249 return nil, fmt.Errorf("failed to get session: %w", err)
250 }
251
252 msgs, err := a.getSessionMessages(ctx, currentSession)
253 if err != nil {
254 return nil, fmt.Errorf("failed to get session messages: %w", err)
255 }
256
257 var wg sync.WaitGroup
258 // Generate title if first message.
259 if len(msgs) == 0 {
260 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
261 wg.Go(func() {
262 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
263 })
264 }
265 defer wg.Wait()
266
267 // Add the user message to the session.
268 _, err = a.createUserMessage(ctx, call)
269 if err != nil {
270 return nil, err
271 }
272
273 // Add the session to the context.
274 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
275
276 genCtx, cancel := context.WithCancel(ctx)
277 a.activeRequests.Set(call.SessionID, cancel)
278
279 defer cancel()
280 defer a.activeRequests.Del(call.SessionID)
281 // skipRunComplete is set just before the queued-recursion path so
282 // the outer Run doesn't publish a RunComplete that would race
283 // with — and be superseded by — the recursive call's own
284 // RunComplete (each queued user prompt is its own turn and
285 // publishes exactly one terminal event).
286 var skipRunComplete bool
287 // currentAssistant is declared here so the deferred RunComplete
288 // publish below can capture the pointer that PrepareStep will
289 // later (re)assign for each streaming step. The final assistant
290 // message of the turn is the value reachable through this
291 // pointer when the defer runs.
292 var currentAssistant *message.Message
293 // Drain any debounced message updates before returning. message.Service
294 // already flushes synchronously on terminal updates, but a defer here
295 // guarantees the contract at every Run exit (success, error, panic
296 // recovery upstream) without callers needing to know.
297 //
298 // After the flush completes — meaning all per-message
299 // Publish(UpdatedEvent) calls have fired and been buffered into
300 // every subscriber's channel — publish the authoritative
301 // RunComplete event for this turn. The flush-then-publish order
302 // gives well-behaved clients the best chance of seeing the final
303 // message event before RunComplete; the embedded Text field
304 // reconciles for clients that observe the events out of order
305 // (the pubsub broker fan-in does not serialize publishes from
306 // different upstream brokers).
307 defer func() {
308 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
309 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
310 }
311 if skipRunComplete {
312 return
313 }
314 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
315 if currentAssistant != nil {
316 complete.MessageID = currentAssistant.ID
317 complete.Text = currentAssistant.Content().String()
318 }
319 if retErr != nil {
320 complete.Error = retErr.Error()
321 complete.Cancelled = errors.Is(retErr, context.Canceled)
322 } else if ctx.Err() != nil {
323 complete.Cancelled = true
324 }
325 // Prefer the per-call hook when supplied so the coordinator
326 // can coalesce retries (e.g. unauthorized → re-auth → retry)
327 // into a single user-visible terminal event. The fallback
328 // must-deliver publish applies bounded-blocking semantics to
329 // the authoritative terminal event so a momentarily-full
330 // subscriber channel can't silently drop it and hang
331 // non-interactive clients waiting on RunComplete.
332 if call.OnComplete != nil {
333 call.OnComplete(complete)
334 return
335 }
336 if a.runComplete == nil {
337 return
338 }
339 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
340 }()
341
342 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
343
344 startTime := time.Now()
345 a.eventPromptSent(call.SessionID)
346
347 var stepMessages []fantasy.Message
348 var shouldSummarize bool
349 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
350 var maxOutputTokens *int64
351 if call.MaxOutputTokens > 0 {
352 maxOutputTokens = &call.MaxOutputTokens
353 }
354 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
355 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
356 Files: files,
357 Messages: history,
358 ProviderOptions: call.ProviderOptions,
359 MaxOutputTokens: maxOutputTokens,
360 TopP: call.TopP,
361 Temperature: call.Temperature,
362 PresencePenalty: call.PresencePenalty,
363 TopK: call.TopK,
364 FrequencyPenalty: call.FrequencyPenalty,
365 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
366 prepared.Messages = options.Messages
367 for i := range prepared.Messages {
368 prepared.Messages[i].ProviderOptions = nil
369 }
370
371 // Use latest tools (updated by SetTools when MCP tools change).
372 prepared.Tools = a.tools.Copy()
373
374 queuedCalls, _ := a.messageQueue.Get(call.SessionID)
375 a.messageQueue.Del(call.SessionID)
376 for _, queued := range queuedCalls {
377 userMessage, createErr := a.createUserMessage(callContext, queued)
378 if createErr != nil {
379 return callContext, prepared, createErr
380 }
381 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
382 }
383
384 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
385
386 lastSystemRoleInx := 0
387 systemMessageUpdated := false
388 for i, msg := range prepared.Messages {
389 // Only add cache control to the last message.
390 if msg.Role == fantasy.MessageRoleSystem {
391 lastSystemRoleInx = i
392 } else if !systemMessageUpdated {
393 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
394 systemMessageUpdated = true
395 }
396 // Than add cache control to the last 2 messages.
397 if i > len(prepared.Messages)-3 {
398 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
399 }
400 }
401
402 if promptPrefix != "" {
403 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
404 }
405
406 sessionLock.Lock()
407 stepMessages = cloneFantasyMessages(prepared.Messages)
408 sessionLock.Unlock()
409
410 var assistantMsg message.Message
411 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
412 Role: message.Assistant,
413 Parts: []message.ContentPart{},
414 Model: largeModel.ModelCfg.Model,
415 Provider: largeModel.ModelCfg.Provider,
416 })
417 if err != nil {
418 return callContext, prepared, err
419 }
420 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
421 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
422 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
423 currentAssistant = &assistantMsg
424 return callContext, prepared, err
425 },
426 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
427 currentAssistant.AppendReasoningContent(reasoning.Text)
428 return a.messages.Update(genCtx, *currentAssistant)
429 },
430 OnReasoningDelta: func(id string, text string) error {
431 currentAssistant.AppendReasoningContent(text)
432 return a.messages.Update(genCtx, *currentAssistant)
433 },
434 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
435 // handle anthropic signature
436 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
437 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
438 currentAssistant.AppendReasoningSignature(reasoning.Signature)
439 }
440 }
441 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
442 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
443 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
444 }
445 }
446 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
447 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
448 currentAssistant.SetReasoningResponsesData(reasoning)
449 }
450 }
451 currentAssistant.FinishThinking()
452 return a.messages.Update(genCtx, *currentAssistant)
453 },
454 OnTextDelta: func(id string, text string) error {
455 // Strip leading newline from initial text content. This is is
456 // particularly important in non-interactive mode where leading
457 // newlines are very visible.
458 if len(currentAssistant.Parts) == 0 {
459 text = strings.TrimPrefix(text, "\n")
460 }
461
462 currentAssistant.AppendContent(text)
463 return a.messages.Update(genCtx, *currentAssistant)
464 },
465 OnToolInputStart: func(id string, toolName string) error {
466 toolCall := message.ToolCall{
467 ID: id,
468 Name: toolName,
469 ProviderExecuted: false,
470 Finished: false,
471 }
472 currentAssistant.AddToolCall(toolCall)
473 // Use parent ctx instead of genCtx to ensure the update succeeds
474 // even if the request is canceled mid-stream
475 return a.messages.Update(ctx, *currentAssistant)
476 },
477 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
478 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
479 },
480 OnToolCall: func(tc fantasy.ToolCallContent) error {
481 toolCall := message.ToolCall{
482 ID: tc.ToolCallID,
483 Name: tc.ToolName,
484 Input: tc.Input,
485 ProviderExecuted: false,
486 Finished: true,
487 }
488 currentAssistant.AddToolCall(toolCall)
489 // Use parent ctx instead of genCtx to ensure the update succeeds
490 // even if the request is canceled mid-stream
491 return a.messages.Update(ctx, *currentAssistant)
492 },
493 OnToolResult: func(result fantasy.ToolResultContent) error {
494 toolResult := a.convertToToolResult(result)
495 // Use parent ctx instead of genCtx to ensure the message is created
496 // even if the request is canceled mid-stream
497 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
498 Role: message.Tool,
499 Parts: []message.ContentPart{
500 toolResult,
501 },
502 })
503 return createMsgErr
504 },
505 OnStepFinish: func(stepResult fantasy.StepResult) error {
506 finishReason := message.FinishReasonUnknown
507 switch stepResult.FinishReason {
508 case fantasy.FinishReasonLength:
509 finishReason = message.FinishReasonMaxTokens
510 case fantasy.FinishReasonStop:
511 finishReason = message.FinishReasonEndTurn
512 case fantasy.FinishReasonToolCalls:
513 finishReason = message.FinishReasonToolUse
514 }
515 // If a tool result halted the turn (e.g. a hook halt or a
516 // permission denial), the step ends on FinishReasonToolCalls but
517 // the model will not be called again. Treat it as the end of the
518 // turn so the UI can render the assistant footer.
519 if finishReason == message.FinishReasonToolUse {
520 for _, tr := range stepResult.Content.ToolResults() {
521 if tr.StopTurn {
522 finishReason = message.FinishReasonEndTurn
523 break
524 }
525 }
526 }
527 currentAssistant.AddFinish(finishReason, "", "")
528 sessionLock.Lock()
529 defer sessionLock.Unlock()
530
531 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
532 if getSessionErr != nil {
533 return getSessionErr
534 }
535 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
536 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
537 _, sessionErr := a.sessions.Save(ctx, updatedSession)
538 if sessionErr != nil {
539 return sessionErr
540 }
541 currentSession = updatedSession
542 return a.messages.Update(genCtx, *currentAssistant)
543 },
544 StopWhen: []fantasy.StopCondition{
545 func(_ []fantasy.StepResult) bool {
546 cw := int64(largeModel.CatwalkCfg.ContextWindow)
547 // If context window is unknown (0), skip auto-summarize
548 // to avoid immediately truncating custom/local models.
549 if cw == 0 {
550 return false
551 }
552 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
553 remaining := cw - tokens
554 var threshold int64
555 if cw > largeContextWindowThreshold {
556 threshold = largeContextWindowBuffer
557 } else {
558 threshold = int64(float64(cw) * smallContextWindowRatio)
559 }
560 if (remaining <= threshold) && !a.disableAutoSummarize {
561 shouldSummarize = true
562 return true
563 }
564 return false
565 },
566 func(steps []fantasy.StepResult) bool {
567 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
568 },
569 },
570 })
571
572 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
573
574 if err != nil {
575 isHyper := largeModel.ModelCfg.Provider == hyper.Name
576 isCancelErr := errors.Is(err, context.Canceled)
577 if currentAssistant == nil {
578 return result, err
579 }
580 // Ensure we finish thinking on error to close the reasoning state.
581 currentAssistant.FinishThinking()
582 toolCalls := currentAssistant.ToolCalls()
583 // INFO: we use the parent context here because the genCtx has been cancelled.
584 msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
585 if createErr != nil {
586 return nil, createErr
587 }
588 for _, tc := range toolCalls {
589 if !tc.Finished {
590 tc.Finished = true
591 tc.Input = "{}"
592 currentAssistant.AddToolCall(tc)
593 updateErr := a.messages.Update(ctx, *currentAssistant)
594 if updateErr != nil {
595 return nil, updateErr
596 }
597 }
598
599 found := false
600 for _, msg := range msgs {
601 if msg.Role == message.Tool {
602 for _, tr := range msg.ToolResults() {
603 if tr.ToolCallID == tc.ID {
604 found = true
605 break
606 }
607 }
608 }
609 if found {
610 break
611 }
612 }
613 if found {
614 continue
615 }
616 content := "There was an error while executing the tool"
617 if isCancelErr {
618 content = "Error: user cancelled assistant tool calling"
619 }
620 toolResult := message.ToolResult{
621 ToolCallID: tc.ID,
622 Name: tc.Name,
623 Content: content,
624 IsError: true,
625 }
626 _, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
627 Role: message.Tool,
628 Parts: []message.ContentPart{
629 toolResult,
630 },
631 })
632 if createErr != nil {
633 return nil, createErr
634 }
635 }
636 var fantasyErr *fantasy.Error
637 var providerErr *fantasy.ProviderError
638 const defaultTitle = "Provider Error"
639 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
640 if isCancelErr {
641 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
642 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
643 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
644 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
645 url := hyper.BaseURL()
646 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
647 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
648 } else if errors.As(err, &providerErr) {
649 if providerErr.Message == "The requested model is not supported." {
650 url := "https://github.com/settings/copilot/features"
651 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
652 currentAssistant.AddFinish(
653 message.FinishReasonError,
654 "Copilot model not enabled",
655 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
656 )
657 } else {
658 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
659 }
660 } else if errors.As(err, &fantasyErr) {
661 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
662 } else {
663 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
664 }
665 // Note: we use the parent context here because the genCtx has been
666 // cancelled.
667 updateErr := a.messages.Update(ctx, *currentAssistant)
668 if updateErr != nil {
669 return nil, updateErr
670 }
671 return nil, err
672 }
673
674 if shouldSummarize {
675 a.activeRequests.Del(call.SessionID)
676 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
677 return nil, summarizeErr
678 }
679 // If the agent wasn't done...
680 if len(currentAssistant.ToolCalls()) > 0 {
681 existing, ok := a.messageQueue.Get(call.SessionID)
682 if !ok {
683 existing = []SessionAgentCall{}
684 }
685 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
686 existing = append(existing, call)
687 a.messageQueue.Set(call.SessionID, existing)
688 }
689 }
690
691 // Release active request before publishing the notification.
692 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
693 // tea.Msg arrives, so the cleanup must precede the notify or
694 // subscribers see stale busy state at the moment of receipt.
695 a.activeRequests.Del(call.SessionID)
696 cancel()
697
698 // Send notification that agent has finished its turn (skip for
699 // nested/non-interactive sessions).
700 if !call.NonInteractive && a.notify != nil {
701 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
702 SessionID: call.SessionID,
703 SessionTitle: currentSession.Title,
704 Type: notify.TypeAgentFinished,
705 })
706 }
707
708 queuedMessages, ok := a.messageQueue.Get(call.SessionID)
709 if !ok || len(queuedMessages) == 0 {
710 return result, err
711 }
712 // There are queued messages restart the loop. The recursive Run
713 // publishes its own RunComplete for the queued prompt, so suppress
714 // the outer defer's emit to avoid a duplicate event whose Error
715 // field would belong to the recursive turn but whose MessageID/Text
716 // would belong to the outer turn.
717 skipRunComplete = true
718 firstQueuedMessage := queuedMessages[0]
719 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
720 return a.Run(ctx, firstQueuedMessage)
721}
722
723func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
724 if a.IsSessionBusy(sessionID) {
725 return ErrSessionBusy
726 }
727
728 // Copy mutable fields under lock to avoid races with SetModels.
729 largeModel := a.largeModel.Get()
730 systemPromptPrefix := a.systemPromptPrefix.Get()
731
732 currentSession, err := a.sessions.Get(ctx, sessionID)
733 if err != nil {
734 return fmt.Errorf("failed to get session: %w", err)
735 }
736 msgs, err := a.getSessionMessages(ctx, currentSession)
737 if err != nil {
738 return err
739 }
740 if len(msgs) == 0 {
741 // Nothing to summarize.
742 return nil
743 }
744
745 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
746
747 genCtx, cancel := context.WithCancel(ctx)
748 a.activeRequests.Set(sessionID, cancel)
749 defer a.activeRequests.Del(sessionID)
750 defer cancel()
751 defer func() {
752 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
753 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
754 }
755 }()
756
757 agent := fantasy.NewAgent(
758 largeModel.Model,
759 fantasy.WithSystemPrompt(string(summaryPrompt)),
760 fantasy.WithUserAgent(userAgent),
761 )
762 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
763 Role: message.Assistant,
764 Model: largeModel.ModelCfg.Model,
765 Provider: largeModel.ModelCfg.Provider,
766 IsSummaryMessage: true,
767 })
768 if err != nil {
769 return err
770 }
771
772 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
773
774 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
775 Prompt: summaryPromptText,
776 Messages: aiMsgs,
777 ProviderOptions: opts,
778 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
779 prepared.Messages = options.Messages
780 if systemPromptPrefix != "" {
781 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
782 }
783 return callContext, prepared, nil
784 },
785 OnReasoningDelta: func(id string, text string) error {
786 summaryMessage.AppendReasoningContent(text)
787 return a.messages.Update(genCtx, summaryMessage)
788 },
789 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
790 // Handle anthropic signature.
791 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
792 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
793 summaryMessage.AppendReasoningSignature(signature.Signature)
794 }
795 }
796 summaryMessage.FinishThinking()
797 return a.messages.Update(genCtx, summaryMessage)
798 },
799 OnTextDelta: func(id, text string) error {
800 summaryMessage.AppendContent(text)
801 return a.messages.Update(genCtx, summaryMessage)
802 },
803 })
804 if err != nil {
805 isCancelErr := errors.Is(err, context.Canceled)
806 if isCancelErr {
807 // User cancelled summarize we need to remove the summary message.
808 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
809 return deleteErr
810 }
811 // Mark the summary message as finished with an error so the UI
812 // stops spinning.
813 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
814 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
815 return updateErr
816 }
817 return err
818 }
819
820 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
821 err = a.messages.Update(genCtx, summaryMessage)
822 if err != nil {
823 return err
824 }
825
826 var openrouterCost *float64
827 for _, step := range resp.Steps {
828 stepCost := a.openrouterCost(step.ProviderMetadata)
829 if stepCost != nil {
830 newCost := *stepCost
831 if openrouterCost != nil {
832 newCost += *openrouterCost
833 }
834 openrouterCost = &newCost
835 }
836 }
837
838 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
839
840 // Just in case, get just the last usage info.
841 usage := resp.Response.Usage
842 currentSession.SummaryMessageID = summaryMessage.ID
843 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
844 currentSession.PromptTokens = 0
845 currentSession.EstimatedUsage = usageIsZero(usage)
846 _, err = a.sessions.Save(genCtx, currentSession)
847 if err != nil {
848 return err
849 }
850
851 // Release the active request before processing queued messages so that
852 // Run() does not see the session as busy.
853 a.activeRequests.Del(sessionID)
854 cancel()
855
856 // Process any messages that were queued while summarizing.
857 queuedMessages, ok := a.messageQueue.Get(sessionID)
858 if !ok || len(queuedMessages) == 0 {
859 return nil
860 }
861 firstQueuedMessage := queuedMessages[0]
862 a.messageQueue.Set(sessionID, queuedMessages[1:])
863 _, qErr := a.Run(ctx, firstQueuedMessage)
864 return qErr
865}
866
867func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
868 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
869 return fantasy.ProviderOptions{}
870 }
871 return fantasy.ProviderOptions{
872 anthropic.Name: &anthropic.ProviderCacheControlOptions{
873 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
874 },
875 bedrock.Name: &anthropic.ProviderCacheControlOptions{
876 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
877 },
878 vercel.Name: &anthropic.ProviderCacheControlOptions{
879 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
880 },
881 }
882}
883
884func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
885 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
886 var attachmentParts []message.ContentPart
887 for _, attachment := range call.Attachments {
888 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
889 }
890 parts = append(parts, attachmentParts...)
891 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
892 Role: message.User,
893 Parts: parts,
894 })
895 if err != nil {
896 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
897 }
898 return msg, nil
899}
900
901func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
902 var history []fantasy.Message
903 if !a.isSubAgent {
904 history = append(history, fantasy.NewUserMessage(
905 fmt.Sprintf(
906 "<system_reminder>%s</system_reminder>",
907 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
908If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
909If not, please feel free to ignore. Again do not mention this message to the user.`,
910 ),
911 ))
912 }
913 // Collect all tool call IDs present in assistant messages and all tool
914 // result IDs present in tool messages. This lets us detect both orphaned
915 // tool results (result without a call) and orphaned tool calls (call
916 // without a result).
917 knownToolCallIDs := make(map[string]struct{})
918 knownToolResultIDs := make(map[string]struct{})
919 for _, m := range msgs {
920 switch m.Role {
921 case message.Assistant:
922 for _, tc := range m.ToolCalls() {
923 knownToolCallIDs[tc.ID] = struct{}{}
924 }
925 case message.Tool:
926 for _, tr := range m.ToolResults() {
927 knownToolResultIDs[tr.ToolCallID] = struct{}{}
928 }
929 }
930 }
931
932 for _, m := range msgs {
933 if len(m.Parts) == 0 {
934 continue
935 }
936 // Assistant message without content or tool calls (cancelled before it returned anything).
937 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
938 continue
939 }
940 if m.Role == message.Tool {
941 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
942 history = append(history, msg)
943 }
944 continue
945 }
946 aiMsgs := m.ToAIMessage()
947 if !supportsImages {
948 for i := range aiMsgs {
949 if aiMsgs[i].Role == fantasy.MessageRoleUser {
950 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
951 }
952 }
953 }
954 history = append(history, aiMsgs...)
955
956 if m.Role == message.Assistant {
957 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
958 history = append(history, msg)
959 }
960 }
961 }
962
963 var files []fantasy.FilePart
964 for _, attachment := range attachments {
965 if attachment.IsText() {
966 continue
967 }
968 files = append(files, fantasy.FilePart{
969 Filename: attachment.FileName,
970 Data: attachment.Content,
971 MediaType: attachment.MimeType,
972 })
973 }
974
975 return history, files
976}
977
978// filterFileParts removes fantasy.FilePart entries from a slice of message
979// parts. Used to strip image attachments from historical user messages when
980// the current model does not support them.
981func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
982 filtered := make([]fantasy.MessagePart, 0, len(parts))
983 for _, part := range parts {
984 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
985 continue
986 }
987 filtered = append(filtered, part)
988 }
989 return filtered
990}
991
992// filterOrphanedToolResults converts a tool message to a fantasy.Message,
993// dropping any tool result parts whose tool_call_id has no matching tool call
994// in the known set. An orphaned result causes API validation to fail on every
995// subsequent turn, permanently locking the session. Returns the filtered
996// message and true if at least one valid part remains.
997func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
998 aiMsgs := m.ToAIMessage()
999 if len(aiMsgs) == 0 {
1000 return fantasy.Message{}, false
1001 }
1002 var validParts []fantasy.MessagePart
1003 for _, part := range aiMsgs[0].Content {
1004 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1005 if !ok {
1006 validParts = append(validParts, part)
1007 continue
1008 }
1009 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1010 validParts = append(validParts, part)
1011 } else {
1012 slog.Warn(
1013 "Dropping orphaned tool result with no matching tool call",
1014 "tool_call_id", tr.ToolCallID,
1015 )
1016 }
1017 }
1018 if len(validParts) == 0 {
1019 return fantasy.Message{}, false
1020 }
1021 msg := aiMsgs[0]
1022 msg.Content = validParts
1023 return msg, true
1024}
1025
1026// syntheticToolResultsForOrphanedCalls returns a tool message containing
1027// synthetic tool results for any tool calls in the assistant message that
1028// have no matching result in knownToolResultIDs. LLM APIs require every
1029// tool_use to be immediately followed by a tool_result; an interrupted
1030// session can leave orphaned tool_use blocks that permanently lock the
1031// conversation. Returns the message and true if any synthetic results were
1032// produced.
1033func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1034 var syntheticParts []fantasy.MessagePart
1035 for _, tc := range m.ToolCalls() {
1036 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1037 continue
1038 }
1039 slog.Warn(
1040 "Injecting synthetic tool result for orphaned tool call",
1041 "tool_call_id", tc.ID,
1042 "tool_name", tc.Name,
1043 )
1044 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1045 ToolCallID: tc.ID,
1046 Output: fantasy.ToolResultOutputContentError{
1047 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1048 },
1049 })
1050 }
1051 if len(syntheticParts) == 0 {
1052 return fantasy.Message{}, false
1053 }
1054 return fantasy.Message{
1055 Role: fantasy.MessageRoleTool,
1056 Content: syntheticParts,
1057 }, true
1058}
1059
1060func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1061 msgs, err := a.messages.List(ctx, session.ID)
1062 if err != nil {
1063 return nil, fmt.Errorf("failed to list messages: %w", err)
1064 }
1065
1066 if session.SummaryMessageID != "" {
1067 summaryMsgIndex := -1
1068 for i, msg := range msgs {
1069 if msg.ID == session.SummaryMessageID {
1070 summaryMsgIndex = i
1071 break
1072 }
1073 }
1074 if summaryMsgIndex != -1 {
1075 msgs = msgs[summaryMsgIndex:]
1076 msgs[0].Role = message.User
1077 }
1078 }
1079 return msgs, nil
1080}
1081
1082// generateTitle generates a session titled based on the initial prompt.
1083func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1084 if userPrompt == "" {
1085 return
1086 }
1087
1088 smallModel := a.smallModel.Get()
1089 largeModel := a.largeModel.Get()
1090 systemPromptPrefix := a.systemPromptPrefix.Get()
1091
1092 var maxOutputTokens int64 = 40
1093 if smallModel.CatwalkCfg.CanReason {
1094 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1095 }
1096
1097 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1098 return fantasy.NewAgent(
1099 m,
1100 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1101 fantasy.WithMaxOutputTokens(tok),
1102 fantasy.WithUserAgent(userAgent),
1103 )
1104 }
1105
1106 streamCall := fantasy.AgentStreamCall{
1107 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1108 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1109 prepared.Messages = opts.Messages
1110 if systemPromptPrefix != "" {
1111 prepared.Messages = append([]fantasy.Message{
1112 fantasy.NewSystemMessage(systemPromptPrefix),
1113 }, prepared.Messages...)
1114 }
1115 return callCtx, prepared, nil
1116 },
1117 }
1118
1119 // Use the small model to generate the title.
1120 model := smallModel
1121 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1122 resp, err := agent.Stream(ctx, streamCall)
1123 if err == nil {
1124 // We successfully generated a title with the small model.
1125 slog.Debug("Generated title with small model")
1126 } else {
1127 // It didn't work. Let's try with the big model.
1128 slog.Error("Error generating title with small model; trying big model", "err", err)
1129 model = largeModel
1130 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1131 resp, err = agent.Stream(ctx, streamCall)
1132 if err == nil {
1133 slog.Debug("Generated title with large model")
1134 } else {
1135 // Welp, the large model didn't work either. Use the default
1136 // session name and return.
1137 slog.Error("Error generating title with large model", "err", err)
1138 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1139 if saveErr != nil {
1140 slog.Error("Failed to save session title", "error", saveErr)
1141 }
1142 return
1143 }
1144 }
1145
1146 if resp == nil {
1147 // Actually, we didn't get a response so we can't. Use the default
1148 // session name and return.
1149 slog.Error("Response is nil; can't generate title")
1150 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1151 if saveErr != nil {
1152 slog.Error("Failed to save session title", "error", saveErr)
1153 }
1154 return
1155 }
1156
1157 // Clean up title.
1158 var title string
1159 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1160
1161 // Remove thinking tags if present.
1162 title = thinkTagRegex.ReplaceAllString(title, "")
1163 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1164
1165 title = strings.TrimSpace(title)
1166 title = cmp.Or(title, DefaultSessionName)
1167
1168 // Calculate usage and cost.
1169 var openrouterCost *float64
1170 for _, step := range resp.Steps {
1171 stepCost := a.openrouterCost(step.ProviderMetadata)
1172 if stepCost != nil {
1173 newCost := *stepCost
1174 if openrouterCost != nil {
1175 newCost += *openrouterCost
1176 }
1177 openrouterCost = &newCost
1178 }
1179 }
1180
1181 modelConfig := model.CatwalkCfg
1182 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1183 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1184 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1185 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1186
1187 // Use override cost if available (e.g., from OpenRouter).
1188 if openrouterCost != nil {
1189 cost = *openrouterCost
1190 }
1191
1192 // Skip cost accumulation
1193 if model.FlatRate {
1194 cost = 0
1195 }
1196
1197 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1198 completionTokens := resp.TotalUsage.OutputTokens
1199
1200 // Atomically update only title and usage fields to avoid overriding other
1201 // concurrent session updates.
1202 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1203 if saveErr != nil {
1204 slog.Error("Failed to save session title and usage", "error", saveErr)
1205 return
1206 }
1207}
1208
1209func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1210 openrouterMetadata, ok := metadata[openrouter.Name]
1211 if !ok {
1212 return nil
1213 }
1214
1215 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1216 if !ok {
1217 return nil
1218 }
1219 return &opts.Usage.Cost
1220}
1221
1222func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1223 if !usageIsZero(usage) {
1224 session.EstimatedUsage = estimated
1225 }
1226
1227 modelConfig := model.CatwalkCfg
1228 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1229 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1230 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1231 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1232
1233 if !estimated {
1234 a.eventTokensUsed(session.ID, model, usage, cost)
1235 }
1236
1237 if estimated {
1238 cost = 0
1239 } else {
1240 // Use override cost if available (e.g., from OpenRouter).
1241 if overrideCost != nil {
1242 cost = *overrideCost
1243 }
1244
1245 // Skip cost accumulation
1246 if model.FlatRate {
1247 cost = 0
1248 }
1249 }
1250
1251 session.Cost += cost
1252 updateSessionTokenCounters(session, usage)
1253}
1254
1255func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1256 if usage.OutputTokens != 0 {
1257 session.CompletionTokens = usage.OutputTokens
1258 }
1259 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1260 session.PromptTokens = promptTokens
1261 }
1262}
1263
1264func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1265 if usage.OutputTokens != 0 {
1266 return usage.OutputTokens
1267 }
1268 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1269}
1270
1271func (a *sessionAgent) Cancel(sessionID string) {
1272 // Cancel regular requests. Don't use Take() here - we need the entry to
1273 // remain in activeRequests so IsBusy() returns true until the goroutine
1274 // fully completes (including error handling that may access the DB).
1275 // The defer in processRequest will clean up the entry.
1276 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1277 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1278 cancel()
1279 }
1280
1281 // Also check for summarize requests.
1282 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1283 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1284 cancel()
1285 }
1286
1287 if a.QueuedPrompts(sessionID) > 0 {
1288 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1289 a.messageQueue.Del(sessionID)
1290 }
1291}
1292
1293func (a *sessionAgent) ClearQueue(sessionID string) {
1294 if a.QueuedPrompts(sessionID) > 0 {
1295 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1296 a.messageQueue.Del(sessionID)
1297 }
1298}
1299
1300func (a *sessionAgent) CancelAll() {
1301 if !a.IsBusy() {
1302 return
1303 }
1304 for key := range a.activeRequests.Seq2() {
1305 a.Cancel(key) // key is sessionID
1306 }
1307
1308 timeout := time.After(5 * time.Second)
1309 for a.IsBusy() {
1310 select {
1311 case <-timeout:
1312 return
1313 default:
1314 time.Sleep(200 * time.Millisecond)
1315 }
1316 }
1317}
1318
1319func (a *sessionAgent) IsBusy() bool {
1320 var busy bool
1321 for cancelFunc := range a.activeRequests.Seq() {
1322 if cancelFunc != nil {
1323 busy = true
1324 break
1325 }
1326 }
1327 return busy
1328}
1329
1330func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1331 _, busy := a.activeRequests.Get(sessionID)
1332 return busy
1333}
1334
1335func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1336 l, ok := a.messageQueue.Get(sessionID)
1337 if !ok {
1338 return 0
1339 }
1340 return len(l)
1341}
1342
1343func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1344 l, ok := a.messageQueue.Get(sessionID)
1345 if !ok {
1346 return nil
1347 }
1348 prompts := make([]string, len(l))
1349 for i, call := range l {
1350 prompts[i] = call.Prompt
1351 }
1352 return prompts
1353}
1354
1355func (a *sessionAgent) SetModels(large Model, small Model) {
1356 a.largeModel.Set(large)
1357 a.smallModel.Set(small)
1358}
1359
1360func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1361 a.tools.SetSlice(tools)
1362}
1363
1364func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1365 a.systemPrompt.Set(systemPrompt)
1366}
1367
1368func (a *sessionAgent) Model() Model {
1369 return a.largeModel.Get()
1370}
1371
1372// convertToToolResult converts a fantasy tool result to a message tool result.
1373func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1374 baseResult := message.ToolResult{
1375 ToolCallID: result.ToolCallID,
1376 Name: result.ToolName,
1377 Metadata: result.ClientMetadata,
1378 }
1379
1380 switch result.Result.GetType() {
1381 case fantasy.ToolResultContentTypeText:
1382 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1383 baseResult.Content = r.Text
1384 }
1385 case fantasy.ToolResultContentTypeError:
1386 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1387 baseResult.Content = r.Error.Error()
1388 baseResult.IsError = true
1389 }
1390 case fantasy.ToolResultContentTypeMedia:
1391 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1392 if !stringext.IsValidBase64(r.Data) {
1393 slog.Warn(
1394 "Tool returned media with invalid base64 data, discarding image",
1395 "tool", result.ToolName,
1396 "tool_call_id", result.ToolCallID,
1397 )
1398 baseResult.Content = "Tool returned image data with invalid encoding"
1399 baseResult.IsError = true
1400 } else {
1401 content := r.Text
1402 if content == "" {
1403 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1404 }
1405 baseResult.Content = content
1406 baseResult.Data = r.Data
1407 baseResult.MIMEType = r.MediaType
1408 }
1409 }
1410 }
1411
1412 return baseResult
1413}
1414
1415// workaroundProviderMediaLimitations converts media content in tool results to
1416// user messages for providers that don't natively support images in tool results.
1417//
1418// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1419// don't support sending images/media in tool result messages - they only accept
1420// text in tool results. However, they DO support images in user messages.
1421//
1422// If we send media in tool results to these providers, the API returns an error.
1423//
1424// Solution: For these providers, we:
1425// 1. Replace the media in the tool result with a text placeholder
1426// 2. Inject a user message immediately after with the image as a file attachment
1427// 3. This maintains the tool execution flow while working around API limitations
1428//
1429// Anthropic and Bedrock support images natively in tool results, so we skip
1430// this workaround for them.
1431//
1432// Example transformation:
1433//
1434// BEFORE: [tool result: image data]
1435// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1436func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1437 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1438 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1439 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1440
1441 if providerSupportsMedia {
1442 return messages
1443 }
1444
1445 convertedMessages := make([]fantasy.Message, 0, len(messages))
1446
1447 for _, msg := range messages {
1448 if msg.Role != fantasy.MessageRoleTool {
1449 convertedMessages = append(convertedMessages, msg)
1450 continue
1451 }
1452
1453 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1454 var mediaFiles []fantasy.FilePart
1455
1456 for _, part := range msg.Content {
1457 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1458 if !ok {
1459 textParts = append(textParts, part)
1460 continue
1461 }
1462
1463 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1464 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1465 if err != nil {
1466 slog.Warn("Failed to decode media data", "error", err)
1467 textParts = append(textParts, part)
1468 continue
1469 }
1470
1471 mediaFiles = append(mediaFiles, fantasy.FilePart{
1472 Data: decoded,
1473 MediaType: media.MediaType,
1474 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1475 })
1476
1477 textParts = append(textParts, fantasy.ToolResultPart{
1478 ToolCallID: toolResult.ToolCallID,
1479 Output: fantasy.ToolResultOutputContentText{
1480 Text: "[Image/media content loaded - see attached file]",
1481 },
1482 ProviderOptions: toolResult.ProviderOptions,
1483 })
1484 } else {
1485 textParts = append(textParts, part)
1486 }
1487 }
1488
1489 convertedMessages = append(convertedMessages, fantasy.Message{
1490 Role: fantasy.MessageRoleTool,
1491 Content: textParts,
1492 })
1493
1494 if len(mediaFiles) > 0 {
1495 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1496 "Here is the media content from the tool result:",
1497 mediaFiles...,
1498 ))
1499 }
1500 }
1501
1502 return convertedMessages
1503}
1504
1505// buildSummaryPrompt constructs the prompt text for session summarization.
1506func buildSummaryPrompt(todos []session.Todo) string {
1507 var sb strings.Builder
1508 sb.WriteString("Provide a detailed summary of our conversation above.")
1509 if len(todos) > 0 {
1510 sb.WriteString("\n\n## Current Todo List\n\n")
1511 for _, t := range todos {
1512 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1513 }
1514 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1515 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1516 }
1517 return sb.String()
1518}
1519
1520func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1521 fields := []any{
1522 "retry_delay", delay.String(),
1523 }
1524 if err == nil {
1525 return fields
1526 }
1527 fields = append(fields, "status_code", err.StatusCode)
1528 if err.Title != "" {
1529 fields = append(fields, "title", err.Title)
1530 }
1531 if err.Message != "" {
1532 fields = append(fields, "message", err.Message)
1533 }
1534 return fields
1535}