1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "charm.land/catwalk/pkg/catwalk"
28 "charm.land/fantasy"
29 "charm.land/fantasy/providers/anthropic"
30 "charm.land/fantasy/providers/bedrock"
31 "charm.land/fantasy/providers/google"
32 "charm.land/fantasy/providers/openai"
33 "charm.land/fantasy/providers/openrouter"
34 "charm.land/fantasy/providers/vercel"
35 "charm.land/lipgloss/v2"
36 "github.com/charmbracelet/crush/internal/agent/hyper"
37 "github.com/charmbracelet/crush/internal/agent/notify"
38 "github.com/charmbracelet/crush/internal/agent/tools"
39 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
40 "github.com/charmbracelet/crush/internal/config"
41 "github.com/charmbracelet/crush/internal/csync"
42 "github.com/charmbracelet/crush/internal/message"
43 "github.com/charmbracelet/crush/internal/pubsub"
44 "github.com/charmbracelet/crush/internal/session"
45 "github.com/charmbracelet/crush/internal/stringext"
46 "github.com/charmbracelet/crush/internal/version"
47 "github.com/charmbracelet/x/exp/charmtone"
48)
49
// DefaultSessionName is the literal name "Untitled Session".
// NOTE(review): presumably the placeholder given to a session before a
// title has been generated — confirm against callers.
const DefaultSessionName = "Untitled Session"

// Auto-summarization thresholds. A context window larger than
// largeContextWindowThreshold tokens triggers summarization once
// largeContextWindowBuffer or fewer tokens remain; any other (known)
// context window triggers it once the remaining share drops to
// smallContextWindowRatio of the window.
const (
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
58
// userAgent is the User-Agent string for this build of Crush. It embeds
// version.Version and is handed to fantasy.WithUserAgent when an agent is
// constructed.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
60
// titlePrompt holds the embedded contents of templates/title.md.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the embedded contents of templates/summary.md.
//
//go:embed templates/summary.md
var summaryPrompt []byte
66
// The two patterns below are used to remove <think> tags from generated
// titles.

// thinkTagRegex matches a complete <think>…</think> pair together with
// its content. The (?s) flag lets the content span newlines and the lazy
// quantifier stops at the first closing tag.
var thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)

// orphanThinkTagRegex matches a single opening or closing think tag on
// its own.
var orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
72
// SessionAgentCall carries the inputs for one sessionAgent.Run
// invocation: the target session, the user prompt and its attachments,
// and the generation parameters forwarded to the model stream.
type SessionAgentCall struct {
	// SessionID names the session the turn belongs to. ValidateCall
	// rejects an empty value.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text. ValidateCall accepts an empty prompt
	// only when Attachments contains a text attachment.
	Prompt string
	// ProviderOptions is forwarded unchanged to the model stream call.
	ProviderOptions fantasy.ProviderOptions
	Attachments     []message.Attachment
	// MaxOutputTokens is sent to the provider only when greater than
	// zero.
	MaxOutputTokens int64
	// The sampling parameters below are forwarded unchanged to the model
	// stream call.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
}
116
// SessionAgent is the session-scoped agent API. It runs prompts against a
// session, reports and cancels in-flight work, exposes the per-session
// prompt queue, and lets the models, tools and system prompt be replaced
// after construction.
type SessionAgent interface {
	// Run executes one turn for call.SessionID. When the session is
	// already busy the call is queued instead and Run returns (nil, nil).
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID. The
	// returned handle's Close is the only way to release it.
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
133
// Model bundles a language model with the catalog entry and the selected
// configuration that describe it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalog entry for the model. Run reads its
	// ContextWindow, SupportsImages and Name.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration. Its Model and
	// Provider values are recorded on each assistant message.
	ModelCfg config.SelectedModel
	// NOTE(review): presumably marks a flat-rate (not per-token) billed
	// model — confirm against the usage accounting code.
	FlatRate bool
}
140
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent. It keeps the replaceable settings (models, prompts,
// tools) in csync containers and tracks per-session run state in the maps
// below.
type sessionAgent struct {
	// Settings that Run copies at the start of a turn so they can be
	// replaced while a turn is in progress.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls that arrived while the
	// session was busy. The running turn drains it when preparing its
	// next step.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session ID to the cancel func of its running
	// turn. The entry is set when a run becomes active and deleted when
	// that Run returns.
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// pendingCancels records sessions whose dispatched-but-not-yet-
	// running call should observe a cancellation request. It is only
	// set by Cancel when acceptedRuns > 0, so an idle Escape never
	// poisons the next prompt.
	pendingCancels *csync.Map[string, struct{}]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
}
185
// SessionAgentOptions carries the dependencies and initial settings that
// NewSessionAgent copies into a new session agent.
type SessionAgentOptions struct {
	// Initial models and prompts. NewSessionAgent wraps each one in a
	// csync.Value.
	LargeModel         Model
	SmallModel         Model
	SystemPromptPrefix string
	SystemPrompt       string
	IsSubAgent         bool
	// DisableAutoSummarize turns off the context-window stop condition
	// that would otherwise end a turn in order to summarize.
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	// RunComplete receives the terminal event of each turn that has no
	// per-call OnComplete hook. When nil, no such event is published.
	RunComplete pubsub.Publisher[notify.RunComplete]
}
200
201func NewSessionAgent(
202 opts SessionAgentOptions,
203) SessionAgent {
204 return &sessionAgent{
205 largeModel: csync.NewValue(opts.LargeModel),
206 smallModel: csync.NewValue(opts.SmallModel),
207 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
208 systemPrompt: csync.NewValue(opts.SystemPrompt),
209 isSubAgent: opts.IsSubAgent,
210 sessions: opts.Sessions,
211 messages: opts.Messages,
212 disableAutoSummarize: opts.DisableAutoSummarize,
213 tools: csync.NewSliceFrom(opts.Tools),
214 isYolo: opts.IsYolo,
215 notify: opts.Notify,
216 runComplete: opts.RunComplete,
217 messageQueue: csync.NewMap[string, []SessionAgentCall](),
218 activeRequests: csync.NewMap[string, context.CancelFunc](),
219 dispatchMu: csync.NewMap[string, *sync.Mutex](),
220 acceptedRuns: csync.NewMap[string, int](),
221 pendingCancels: csync.NewMap[string, struct{}](),
222 }
223}
224
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
type AcceptedRun struct {
	// agent is the session agent whose accept counter Close decrements.
	agent *sessionAgent
	// sessionID is the session the reservation was taken for.
	sessionID string
	// done flips to true on the first Close so later calls are no-ops.
	done atomic.Bool
}
236
237// Close decrements the accept counter for this reservation. It is safe
238// to call multiple times; only the first call has effect.
239func (r *AcceptedRun) Close() {
240 if r == nil {
241 return
242 }
243 if !r.done.CompareAndSwap(false, true) {
244 return
245 }
246 r.agent.endAccepted(r.sessionID)
247}
248
249// SessionID exposes the session this reservation is for so the run path
250// can use it without an extra parameter.
251func (r *AcceptedRun) SessionID() string {
252 if r == nil {
253 return ""
254 }
255 return r.sessionID
256}
257
258// BeginAccepted increments the accept counter for sessionID and returns
259// a handle whose Close is the only way to decrement it. It is the only
260// entry point that mutates acceptedRuns.
261func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
262 a.acceptedMu.Lock()
263 defer a.acceptedMu.Unlock()
264 count, _ := a.acceptedRuns.Get(sessionID)
265 a.acceptedRuns.Set(sessionID, count+1)
266 return &AcceptedRun{agent: a, sessionID: sessionID}
267}
268
269// endAccepted decrements the accept counter for sessionID. It is only
270// called via AcceptedRun.Close. It uses a dedicated lock (not the
271// per-session dispatch mutex) so it can run while Run holds dispatchMu
272// for the same session without deadlocking.
273func (a *sessionAgent) endAccepted(sessionID string) {
274 a.acceptedMu.Lock()
275 defer a.acceptedMu.Unlock()
276 count, ok := a.acceptedRuns.Get(sessionID)
277 if !ok || count <= 1 {
278 a.acceptedRuns.Del(sessionID)
279 return
280 }
281 a.acceptedRuns.Set(sessionID, count-1)
282}
283
284// sessionMu returns the per-session dispatch mutex, creating it on first
285// use. Creation is guarded so concurrent callers always observe the same
286// mutex instance for a given session.
287func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
288 if mu, ok := a.dispatchMu.Get(sessionID); ok {
289 return mu
290 }
291 a.dispatchMuCreate.Lock()
292 defer a.dispatchMuCreate.Unlock()
293 if mu, ok := a.dispatchMu.Get(sessionID); ok {
294 return mu
295 }
296 mu := &sync.Mutex{}
297 a.dispatchMu.Set(sessionID, mu)
298 return mu
299}
300
301// enqueueCall appends call to the session's message queue. The
302// OnComplete hook is stripped: the caller that supplied it (typically
303// coordinator.Run) has its own retry/coalesce scope that ends when it
304// returns, so by the time the queue drains nobody is left to consume the
305// buffered terminal event. The recursive Run falls back to the default
306// broker publish, which is what existing subscribers expect for queued
307// turns.
308func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
309 existing, ok := a.messageQueue.Get(call.SessionID)
310 if !ok {
311 existing = []SessionAgentCall{}
312 }
313 queued := call
314 queued.OnComplete = nil
315 queued.Accepted = nil
316 existing = append(existing, queued)
317 a.messageQueue.Set(call.SessionID, existing)
318}
319
320// clearPendingCancel removes any pending-cancel record for sessionID. It
321// takes the per-session dispatch lock so it is ordered against Cancel and
322// the dispatch handoff.
323func (a *sessionAgent) clearPendingCancel(sessionID string) {
324 mu := a.sessionMu(sessionID)
325 mu.Lock()
326 defer mu.Unlock()
327 a.pendingCancels.Del(sessionID)
328}
329
330// persistCanceledTurn writes the user/assistant records for a turn that
331// was canceled before (or just as) streaming would have produced them.
332// It creates the user message only when it was not already created by an
333// earlier createUserMessage call (userMsgCreated), then writes an
334// assistant message with FinishReasonCanceled. Both writes use
335// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
336// context) can't drop them.
337func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
338 writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
339 defer cancel()
340 if !userMsgCreated {
341 if _, err := a.createUserMessage(writeCtx, call); err != nil {
342 return err
343 }
344 }
345 largeModel := a.largeModel.Get()
346 assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
347 Role: message.Assistant,
348 Parts: []message.ContentPart{},
349 Model: largeModel.ModelCfg.Model,
350 Provider: largeModel.ModelCfg.Provider,
351 })
352 if err != nil {
353 return err
354 }
355 assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
356 return a.messages.Update(writeCtx, assistant)
357}
358
359// ValidateCall performs the cheap structural validation that
360// sessionAgent.Run requires before a call can be dispatched: a call must
361// carry either a non-empty prompt or a text attachment, and it must name a
362// session. It is exported so callers that accept a run before dispatching it
363// (e.g. backend.SendMessage) can apply the same checks and keep the error
364// contract consistent.
365func ValidateCall(call SessionAgentCall) error {
366 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
367 return ErrEmptyPrompt
368 }
369 if call.SessionID == "" {
370 return ErrSessionMissing
371 }
372 return nil
373}
374
375func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
376 if err := ValidateCall(call); err != nil {
377 return nil, err
378 }
379
380 // genCtx/cancel are the run context and its cancel func. For the
381 // accepted (fire-and-forget) dispatch path they are created under
382 // dispatchMu below so a concurrent Cancel can observe the
383 // activeRequests entry before the assistant message exists. For
384 // the in-process path they stay nil here and are created later,
385 // preserving the original ordering.
386 var (
387 genCtx context.Context
388 cancel context.CancelFunc
389 activeRegistered bool
390 userMsgCreated bool
391 )
392
393 if call.Accepted != nil {
394 // Serialize the accepted -> (cancel-on-entry | queued |
395 // active) transition against a concurrent Cancel. Cancel takes
396 // the same per-session lock, so every cancel observes at least
397 // one of: pendingCancels, an activeRequests entry, or a
398 // messageQueue entry it then clears.
399 mu := a.sessionMu(call.SessionID)
400 mu.Lock()
401
402 if _, pending := a.pendingCancels.Get(call.SessionID); pending {
403 // Cancel-on-entry: a cancel arrived while this run was
404 // dispatched but not yet active. Consume the pending
405 // cancel, release the accept reservation, drop the lock,
406 // and persist a canceled turn without entering Stream.
407 a.pendingCancels.Del(call.SessionID)
408 call.Accepted.Close()
409 mu.Unlock()
410 if err := a.persistCanceledTurn(ctx, call, false); err != nil {
411 return nil, err
412 }
413 return nil, nil
414 }
415
416 if a.IsSessionBusy(call.SessionID) {
417 // Busy: an earlier prompt is active. Queue this call and
418 // release the accept reservation. A Cancel arriving after
419 // this point sees the active entry and clears the queue.
420 a.enqueueCall(call)
421 call.Accepted.Close()
422 mu.Unlock()
423 return nil, nil
424 }
425
426 // Idle: become the active run. Register the cancel func before
427 // dropping the lock so a Cancel that arrives between here and
428 // assistant creation is not lost.
429 runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
430 genCtx, cancel = context.WithCancel(runCtx)
431 a.activeRequests.Set(call.SessionID, cancel)
432 activeRegistered = true
433 call.Accepted.Close()
434 mu.Unlock()
435
436 defer cancel()
437 defer a.activeRequests.Del(call.SessionID)
438 } else if a.IsSessionBusy(call.SessionID) {
439 // Queue the message if busy. Strip OnComplete: the caller that
440 // supplied the hook (typically coordinator.Run) has its own
441 // retry/coalesce scope that ends when it returns, so by the time
442 // the queue drains nobody is left to consume the buffered
443 // terminal event. The recursive Run will fall back to the
444 // default broker publish, which is what existing subscribers
445 // expect for queued turns.
446 a.enqueueCall(call)
447 return nil, nil
448 }
449
450 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
451 agentTools := a.tools.Copy()
452 largeModel := a.largeModel.Get()
453 systemPrompt := a.systemPrompt.Get()
454 promptPrefix := a.systemPromptPrefix.Get()
455 var instructions strings.Builder
456
457 for _, server := range mcp.GetStates() {
458 if server.State != mcp.StateConnected {
459 continue
460 }
461 if s := server.Client.InitializeResult().Instructions; s != "" {
462 instructions.WriteString(s)
463 instructions.WriteString("\n\n")
464 }
465 }
466
467 if s := instructions.String(); s != "" {
468 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
469 }
470
471 if len(agentTools) > 0 {
472 // Add Anthropic caching to the last tool.
473 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
474 }
475
476 agent := fantasy.NewAgent(
477 largeModel.Model,
478 fantasy.WithSystemPrompt(systemPrompt),
479 fantasy.WithTools(agentTools...),
480 fantasy.WithUserAgent(userAgent),
481 )
482
483 sessionLock := sync.Mutex{}
484 currentSession, err := a.sessions.Get(ctx, call.SessionID)
485 if err != nil {
486 return nil, fmt.Errorf("failed to get session: %w", err)
487 }
488
489 msgs, err := a.getSessionMessages(ctx, currentSession)
490 if err != nil {
491 return nil, fmt.Errorf("failed to get session messages: %w", err)
492 }
493
494 var wg sync.WaitGroup
495 // Generate title if first message.
496 if len(msgs) == 0 {
497 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
498 wg.Go(func() {
499 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
500 })
501 }
502 defer wg.Wait()
503
504 // Add the user message to the session.
505 _, err = a.createUserMessage(ctx, call)
506 if err != nil {
507 return nil, err
508 }
509 userMsgCreated = true
510
511 // Add the session to the context.
512 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
513
514 // For the accepted dispatch path the run context and cancel func
515 // were already created and registered under dispatchMu above; reuse
516 // them. For the in-process path create them here, preserving the
517 // original ordering.
518 if !activeRegistered {
519 genCtx, cancel = context.WithCancel(ctx)
520 a.activeRequests.Set(call.SessionID, cancel)
521
522 defer cancel()
523 defer a.activeRequests.Del(call.SessionID)
524 }
525 // skipRunComplete is set just before the queued-recursion path so
526 // the outer Run doesn't publish a RunComplete that would race
527 // with — and be superseded by — the recursive call's own
528 // RunComplete (each queued user prompt is its own turn and
529 // publishes exactly one terminal event).
530 var skipRunComplete bool
531 // currentAssistant is declared here so the deferred RunComplete
532 // publish below can capture the pointer that PrepareStep will
533 // later (re)assign for each streaming step. The final assistant
534 // message of the turn is the value reachable through this
535 // pointer when the defer runs.
536 var currentAssistant *message.Message
537 // Drain any debounced message updates before returning. message.Service
538 // already flushes synchronously on terminal updates, but a defer here
539 // guarantees the contract at every Run exit (success, error, panic
540 // recovery upstream) without callers needing to know.
541 //
542 // After the flush completes — meaning all per-message
543 // Publish(UpdatedEvent) calls have fired and been buffered into
544 // every subscriber's channel — publish the authoritative
545 // RunComplete event for this turn. The flush-then-publish order
546 // gives well-behaved clients the best chance of seeing the final
547 // message event before RunComplete; the embedded Text field
548 // reconciles for clients that observe the events out of order
549 // (the pubsub broker fan-in does not serialize publishes from
550 // different upstream brokers).
551 defer func() {
552 // Use a context detached from the run context: workspace
553 // shutdown cancels ctx before this goroutine returns, but the
554 // buffered streaming deltas must still land before the DB is
555 // closed. A short timeout bounds the flush.
556 flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
557 defer flushCancel()
558 if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
559 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
560 }
561 if skipRunComplete {
562 return
563 }
564 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
565 if currentAssistant != nil {
566 complete.MessageID = currentAssistant.ID
567 complete.Text = currentAssistant.Content().String()
568 }
569 if retErr != nil {
570 complete.Error = retErr.Error()
571 complete.Cancelled = errors.Is(retErr, context.Canceled)
572 } else if ctx.Err() != nil {
573 complete.Cancelled = true
574 }
575 // Prefer the per-call hook when supplied so the coordinator
576 // can coalesce retries (e.g. unauthorized → re-auth → retry)
577 // into a single user-visible terminal event. The fallback
578 // must-deliver publish applies bounded-blocking semantics to
579 // the authoritative terminal event so a momentarily-full
580 // subscriber channel can't silently drop it and hang
581 // non-interactive clients waiting on RunComplete.
582 if call.OnComplete != nil {
583 call.OnComplete(complete)
584 return
585 }
586 if a.runComplete == nil {
587 return
588 }
589 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
590 }()
591
592 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
593
594 startTime := time.Now()
595 a.eventPromptSent(call.SessionID)
596
597 var stepMessages []fantasy.Message
598 var shouldSummarize bool
599 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
600 var maxOutputTokens *int64
601 if call.MaxOutputTokens > 0 {
602 maxOutputTokens = &call.MaxOutputTokens
603 }
604 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
605 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
606 Files: files,
607 Messages: history,
608 ProviderOptions: call.ProviderOptions,
609 MaxOutputTokens: maxOutputTokens,
610 TopP: call.TopP,
611 Temperature: call.Temperature,
612 PresencePenalty: call.PresencePenalty,
613 TopK: call.TopK,
614 FrequencyPenalty: call.FrequencyPenalty,
615 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
616 prepared.Messages = options.Messages
617 for i := range prepared.Messages {
618 prepared.Messages[i].ProviderOptions = nil
619 }
620
621 // Use latest tools (updated by SetTools when MCP tools change).
622 prepared.Tools = a.tools.Copy()
623
624 // Drain queued follow-up prompts, but skip them if a cancel
625 // was recorded for the session while they sat in the queue:
626 // a cancel that arrived after the queue insertion must not
627 // let the queued prompt run as part of this step.
628 dispatchLock := a.sessionMu(call.SessionID)
629 dispatchLock.Lock()
630 _, canceled := a.pendingCancels.Get(call.SessionID)
631 queuedCalls, _ := a.messageQueue.Get(call.SessionID)
632 a.messageQueue.Del(call.SessionID)
633 dispatchLock.Unlock()
634 if !canceled {
635 for _, queued := range queuedCalls {
636 userMessage, createErr := a.createUserMessage(callContext, queued)
637 if createErr != nil {
638 return callContext, prepared, createErr
639 }
640 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
641 }
642 }
643
644 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
645
646 lastSystemRoleInx := 0
647 systemMessageUpdated := false
648 for i, msg := range prepared.Messages {
649 // Only add cache control to the last message.
650 if msg.Role == fantasy.MessageRoleSystem {
651 lastSystemRoleInx = i
652 } else if !systemMessageUpdated {
653 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
654 systemMessageUpdated = true
655 }
				// Then add cache control to the last 2 messages.
657 if i > len(prepared.Messages)-3 {
658 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
659 }
660 }
661
662 if promptPrefix != "" {
663 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
664 }
665
666 sessionLock.Lock()
667 stepMessages = cloneFantasyMessages(prepared.Messages)
668 sessionLock.Unlock()
669
670 var assistantMsg message.Message
671 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
672 Role: message.Assistant,
673 Parts: []message.ContentPart{},
674 Model: largeModel.ModelCfg.Model,
675 Provider: largeModel.ModelCfg.Provider,
676 })
677 if err != nil {
678 return callContext, prepared, err
679 }
680 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
681 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
682 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
683 currentAssistant = &assistantMsg
684 return callContext, prepared, err
685 },
686 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
687 currentAssistant.AppendReasoningContent(reasoning.Text)
688 return a.messages.Update(genCtx, *currentAssistant)
689 },
690 OnReasoningDelta: func(id string, text string) error {
691 currentAssistant.AppendReasoningContent(text)
692 return a.messages.Update(genCtx, *currentAssistant)
693 },
694 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
695 // handle anthropic signature
696 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
697 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
698 currentAssistant.AppendReasoningSignature(reasoning.Signature)
699 }
700 }
701 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
702 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
703 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
704 }
705 }
706 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
707 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
708 currentAssistant.SetReasoningResponsesData(reasoning)
709 }
710 }
711 currentAssistant.FinishThinking()
712 return a.messages.Update(genCtx, *currentAssistant)
713 },
714 OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
716 // particularly important in non-interactive mode where leading
717 // newlines are very visible.
718 if len(currentAssistant.Parts) == 0 {
719 text = strings.TrimPrefix(text, "\n")
720 }
721
722 currentAssistant.AppendContent(text)
723 return a.messages.Update(genCtx, *currentAssistant)
724 },
725 OnToolInputStart: func(id string, toolName string) error {
726 toolCall := message.ToolCall{
727 ID: id,
728 Name: toolName,
729 ProviderExecuted: false,
730 Finished: false,
731 }
732 currentAssistant.AddToolCall(toolCall)
733 // Use parent ctx instead of genCtx to ensure the update succeeds
734 // even if the request is canceled mid-stream
735 return a.messages.Update(ctx, *currentAssistant)
736 },
737 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
738 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
739 },
740 OnToolCall: func(tc fantasy.ToolCallContent) error {
741 toolCall := message.ToolCall{
742 ID: tc.ToolCallID,
743 Name: tc.ToolName,
744 Input: tc.Input,
745 ProviderExecuted: false,
746 Finished: true,
747 }
748 currentAssistant.AddToolCall(toolCall)
749 // Use parent ctx instead of genCtx to ensure the update succeeds
750 // even if the request is canceled mid-stream
751 return a.messages.Update(ctx, *currentAssistant)
752 },
753 OnToolResult: func(result fantasy.ToolResultContent) error {
754 toolResult := a.convertToToolResult(result)
755 // Use parent ctx instead of genCtx to ensure the message is created
756 // even if the request is canceled mid-stream
757 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
758 Role: message.Tool,
759 Parts: []message.ContentPart{
760 toolResult,
761 },
762 })
763 return createMsgErr
764 },
765 OnStepFinish: func(stepResult fantasy.StepResult) error {
766 finishReason := message.FinishReasonUnknown
767 switch stepResult.FinishReason {
768 case fantasy.FinishReasonLength:
769 finishReason = message.FinishReasonMaxTokens
770 case fantasy.FinishReasonStop:
771 finishReason = message.FinishReasonEndTurn
772 case fantasy.FinishReasonToolCalls:
773 finishReason = message.FinishReasonToolUse
774 }
775 // If a tool result halted the turn (e.g. a hook halt or a
776 // permission denial), the step ends on FinishReasonToolCalls but
777 // the model will not be called again. Treat it as the end of the
778 // turn so the UI can render the assistant footer.
779 if finishReason == message.FinishReasonToolUse {
780 for _, tr := range stepResult.Content.ToolResults() {
781 if tr.StopTurn {
782 finishReason = message.FinishReasonEndTurn
783 break
784 }
785 }
786 }
787 currentAssistant.AddFinish(finishReason, "", "")
788 sessionLock.Lock()
789 defer sessionLock.Unlock()
790
791 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
792 if getSessionErr != nil {
793 return getSessionErr
794 }
795 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
796 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
797 _, sessionErr := a.sessions.Save(ctx, updatedSession)
798 if sessionErr != nil {
799 return sessionErr
800 }
801 currentSession = updatedSession
802 return a.messages.Update(genCtx, *currentAssistant)
803 },
804 StopWhen: []fantasy.StopCondition{
805 func(_ []fantasy.StepResult) bool {
806 cw := int64(largeModel.CatwalkCfg.ContextWindow)
807 // If context window is unknown (0), skip auto-summarize
808 // to avoid immediately truncating custom/local models.
809 if cw == 0 {
810 return false
811 }
812 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
813 remaining := cw - tokens
814 var threshold int64
815 if cw > largeContextWindowThreshold {
816 threshold = largeContextWindowBuffer
817 } else {
818 threshold = int64(float64(cw) * smallContextWindowRatio)
819 }
820 if (remaining <= threshold) && !a.disableAutoSummarize {
821 shouldSummarize = true
822 return true
823 }
824 return false
825 },
826 func(steps []fantasy.StepResult) bool {
827 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
828 },
829 },
830 })
831
832 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
833
834 if err != nil {
835 isHyper := largeModel.ModelCfg.Provider == hyper.Name
836 isCancelErr := errors.Is(err, context.Canceled)
837 if currentAssistant == nil {
838 // Cancel-before-assistant-creation window: the run was
839 // canceled after activeRequests.Set but before PrepareStep
840 // created the assistant message. Without this, the turn
841 // would return with no FinishReasonCanceled marker and no
842 // user-visible record. The user message was already created
843 // above, so persistCanceledTurn only writes the assistant
844 // record.
845 if isCancelErr {
846 if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
847 return nil, persistErr
848 }
849 }
850 return result, err
851 }
852 // Persist final state with a context detached from the run
853 // context. The run context (ctx) is derived from the
854 // workspace context, which workspace shutdown cancels before
855 // agent goroutines finish; using ctx here would drop the
856 // final assistant state. WithoutCancel keeps the values
857 // (e.g. session ID) while ignoring cancellation, and a short
858 // timeout bounds the cleanup writes.
859 cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
860 defer cleanupCancel()
861 // Ensure we finish thinking on error to close the reasoning state.
862 currentAssistant.FinishThinking()
863 toolCalls := currentAssistant.ToolCalls()
864 // INFO: we use the cleanup context here because the genCtx has been cancelled.
865 msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
866 if createErr != nil {
867 return nil, createErr
868 }
869 for _, tc := range toolCalls {
870 if !tc.Finished {
871 tc.Finished = true
872 tc.Input = "{}"
873 currentAssistant.AddToolCall(tc)
874 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
875 if updateErr != nil {
876 return nil, updateErr
877 }
878 }
879
880 found := false
881 for _, msg := range msgs {
882 if msg.Role == message.Tool {
883 for _, tr := range msg.ToolResults() {
884 if tr.ToolCallID == tc.ID {
885 found = true
886 break
887 }
888 }
889 }
890 if found {
891 break
892 }
893 }
894 if found {
895 continue
896 }
897 content := "There was an error while executing the tool"
898 if isCancelErr {
899 content = "Error: user cancelled assistant tool calling"
900 }
901 toolResult := message.ToolResult{
902 ToolCallID: tc.ID,
903 Name: tc.Name,
904 Content: content,
905 IsError: true,
906 }
907 _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
908 Role: message.Tool,
909 Parts: []message.ContentPart{
910 toolResult,
911 },
912 })
913 if createErr != nil {
914 return nil, createErr
915 }
916 }
917 var fantasyErr *fantasy.Error
918 var providerErr *fantasy.ProviderError
919 const defaultTitle = "Provider Error"
920 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
921 if isCancelErr {
922 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
923 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
924 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
925 if a.notify != nil {
926 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
927 SessionID: call.SessionID,
928 SessionTitle: currentSession.Title,
929 Type: notify.TypeReAuthenticate,
930 ProviderID: largeModel.ModelCfg.Provider,
931 })
932 }
933 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
934 url := hyper.BaseURL()
935 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
936 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
937 } else if errors.As(err, &providerErr) {
938 if providerErr.Message == "The requested model is not supported." {
939 url := "https://github.com/settings/copilot/features"
940 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
941 currentAssistant.AddFinish(
942 message.FinishReasonError,
943 "Copilot model not enabled",
944 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
945 )
946 } else {
947 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
948 }
949 } else if errors.As(err, &fantasyErr) {
950 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
951 } else {
952 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
953 }
954 // Note: we use the cleanup context here because the genCtx has been
955 // cancelled.
956 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
957 if updateErr != nil {
958 return nil, updateErr
959 }
960 return nil, err
961 }
962
963 if shouldSummarize {
964 a.activeRequests.Del(call.SessionID)
965 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
966 return nil, summarizeErr
967 }
968 // If the agent wasn't done...
969 if len(currentAssistant.ToolCalls()) > 0 {
970 existing, ok := a.messageQueue.Get(call.SessionID)
971 if !ok {
972 existing = []SessionAgentCall{}
973 }
974 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
975 existing = append(existing, call)
976 a.messageQueue.Set(call.SessionID, existing)
977 }
978 }
979
980 // Release active request before publishing the notification.
981 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
982 // tea.Msg arrives, so the cleanup must precede the notify or
983 // subscribers see stale busy state at the moment of receipt.
984 a.activeRequests.Del(call.SessionID)
985 cancel()
986
987 // Send notification that agent has finished its turn (skip for
988 // nested/non-interactive sessions).
989 if !call.NonInteractive && a.notify != nil {
990 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
991 SessionID: call.SessionID,
992 SessionTitle: currentSession.Title,
993 Type: notify.TypeAgentFinished,
994 })
995 }
996
997 // Hand off to the next queued prompt (if any) under dispatchMu so
998 // the transition from this finished run to the queued run is atomic
999 // against a concurrent Cancel. activeRequests for this session was
1000 // just deleted above, so without the lock there is a window in
1001 // which the session looks idle and a cancel becomes a no-op that
1002 // fails to stop the queued prompt. Holding the lock lets us observe
1003 // a pending cancel recorded against the session and drop the queue
1004 // instead of running it, and (for the recursion) hand a fresh
1005 // accept reservation to the dequeued call so acceptedRuns stays > 0
1006 // across the recursive Run's own dispatch handoff — keeping the
1007 // session observable to Cancel for the entire transition and
1008 // closing the dequeue -> re-register window.
1009 mu := a.sessionMu(call.SessionID)
1010 mu.Lock()
1011 if _, pending := a.pendingCancels.Get(call.SessionID); pending {
1012 // A cancel was recorded for this session (e.g. it arrived while
1013 // this run was active and a follow-up had been accepted). Drop
1014 // the queue instead of running it and consume the marker.
1015 a.pendingCancels.Del(call.SessionID)
1016 a.messageQueue.Del(call.SessionID)
1017 mu.Unlock()
1018 return result, err
1019 }
1020 queuedMessages, ok := a.messageQueue.Get(call.SessionID)
1021 if !ok || len(queuedMessages) == 0 {
1022 // No queued work. Clear any stale pending-cancel entry as a
1023 // safety net so it can't catch a future run (no-op when unset).
1024 a.pendingCancels.Del(call.SessionID)
1025 mu.Unlock()
1026 return result, err
1027 }
1028 // There are queued messages restart the loop. The recursive Run
1029 // publishes its own RunComplete for the queued prompt, so suppress
1030 // the outer defer's emit to avoid a duplicate event whose Error
1031 // field would belong to the recursive turn but whose MessageID/Text
1032 // would belong to the outer turn.
1033 skipRunComplete = true
1034 firstQueuedMessage := queuedMessages[0]
1035 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1036 // Reserve a fresh accept for the dequeued prompt before dropping the
1037 // lock so acceptedRuns > 0 across the handoff into the recursive
1038 // Run. This closes the window between this dequeue and the recursive
1039 // Run registering its activeRequests entry: a cancel arriving in
1040 // that window now records a pending cancel (acceptedRuns > 0) that
1041 // the recursive Run's accepted path observes as cancel-on-entry.
1042 firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1043 mu.Unlock()
1044 return a.Run(ctx, firstQueuedMessage)
1045}
1046
1047func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1048 if a.IsSessionBusy(sessionID) {
1049 return ErrSessionBusy
1050 }
1051
1052 // Copy mutable fields under lock to avoid races with SetModels.
1053 largeModel := a.largeModel.Get()
1054 systemPromptPrefix := a.systemPromptPrefix.Get()
1055
1056 currentSession, err := a.sessions.Get(ctx, sessionID)
1057 if err != nil {
1058 return fmt.Errorf("failed to get session: %w", err)
1059 }
1060 msgs, err := a.getSessionMessages(ctx, currentSession)
1061 if err != nil {
1062 return err
1063 }
1064 if len(msgs) == 0 {
1065 // Nothing to summarize.
1066 return nil
1067 }
1068
1069 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1070
1071 genCtx, cancel := context.WithCancel(ctx)
1072 a.activeRequests.Set(sessionID, cancel)
1073 defer a.activeRequests.Del(sessionID)
1074 defer cancel()
1075 defer func() {
1076 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1077 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1078 }
1079 }()
1080
1081 agent := fantasy.NewAgent(
1082 largeModel.Model,
1083 fantasy.WithSystemPrompt(string(summaryPrompt)),
1084 fantasy.WithUserAgent(userAgent),
1085 )
1086 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1087 Role: message.Assistant,
1088 Model: largeModel.ModelCfg.Model,
1089 Provider: largeModel.ModelCfg.Provider,
1090 IsSummaryMessage: true,
1091 })
1092 if err != nil {
1093 return err
1094 }
1095
1096 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1097
1098 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1099 Prompt: summaryPromptText,
1100 Messages: aiMsgs,
1101 ProviderOptions: opts,
1102 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1103 prepared.Messages = options.Messages
1104 if systemPromptPrefix != "" {
1105 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1106 }
1107 return callContext, prepared, nil
1108 },
1109 OnReasoningDelta: func(id string, text string) error {
1110 summaryMessage.AppendReasoningContent(text)
1111 return a.messages.Update(genCtx, summaryMessage)
1112 },
1113 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1114 // Handle anthropic signature.
1115 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1116 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1117 summaryMessage.AppendReasoningSignature(signature.Signature)
1118 }
1119 }
1120 summaryMessage.FinishThinking()
1121 return a.messages.Update(genCtx, summaryMessage)
1122 },
1123 OnTextDelta: func(id, text string) error {
1124 summaryMessage.AppendContent(text)
1125 return a.messages.Update(genCtx, summaryMessage)
1126 },
1127 })
1128 if err != nil {
1129 isCancelErr := errors.Is(err, context.Canceled)
1130 if isCancelErr {
1131 // User cancelled summarize we need to remove the summary message.
1132 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1133 return deleteErr
1134 }
1135 // Mark the summary message as finished with an error so the UI
1136 // stops spinning.
1137 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1138 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1139 return updateErr
1140 }
1141 return err
1142 }
1143
1144 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1145 err = a.messages.Update(genCtx, summaryMessage)
1146 if err != nil {
1147 return err
1148 }
1149
1150 var openrouterCost *float64
1151 for _, step := range resp.Steps {
1152 stepCost := a.openrouterCost(step.ProviderMetadata)
1153 if stepCost != nil {
1154 newCost := *stepCost
1155 if openrouterCost != nil {
1156 newCost += *openrouterCost
1157 }
1158 openrouterCost = &newCost
1159 }
1160 }
1161
1162 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
1163
1164 // Just in case, get just the last usage info.
1165 usage := resp.Response.Usage
1166 currentSession.SummaryMessageID = summaryMessage.ID
1167 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1168 currentSession.PromptTokens = 0
1169 currentSession.EstimatedUsage = usageIsZero(usage)
1170 _, err = a.sessions.Save(genCtx, currentSession)
1171 if err != nil {
1172 return err
1173 }
1174
1175 // Release the active request before processing queued messages so that
1176 // Run() does not see the session as busy.
1177 a.activeRequests.Del(sessionID)
1178 cancel()
1179
1180 // Process any messages that were queued while summarizing.
1181 queuedMessages, ok := a.messageQueue.Get(sessionID)
1182 if !ok || len(queuedMessages) == 0 {
1183 return nil
1184 }
1185 firstQueuedMessage := queuedMessages[0]
1186 a.messageQueue.Set(sessionID, queuedMessages[1:])
1187 _, qErr := a.Run(ctx, firstQueuedMessage)
1188 return qErr
1189}
1190
1191func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1192 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1193 return fantasy.ProviderOptions{}
1194 }
1195 return fantasy.ProviderOptions{
1196 anthropic.Name: &anthropic.ProviderCacheControlOptions{
1197 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1198 },
1199 bedrock.Name: &anthropic.ProviderCacheControlOptions{
1200 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1201 },
1202 vercel.Name: &anthropic.ProviderCacheControlOptions{
1203 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1204 },
1205 }
1206}
1207
1208func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1209 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1210 var attachmentParts []message.ContentPart
1211 for _, attachment := range call.Attachments {
1212 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1213 }
1214 parts = append(parts, attachmentParts...)
1215 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1216 Role: message.User,
1217 Parts: parts,
1218 })
1219 if err != nil {
1220 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1221 }
1222 return msg, nil
1223}
1224
1225func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1226 var history []fantasy.Message
1227 if !a.isSubAgent {
1228 history = append(history, fantasy.NewUserMessage(
1229 fmt.Sprintf(
1230 "<system_reminder>%s</system_reminder>",
1231 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1232If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1233If not, please feel free to ignore. Again do not mention this message to the user.`,
1234 ),
1235 ))
1236 }
1237 // Collect all tool call IDs present in assistant messages and all tool
1238 // result IDs present in tool messages. This lets us detect both orphaned
1239 // tool results (result without a call) and orphaned tool calls (call
1240 // without a result).
1241 knownToolCallIDs := make(map[string]struct{})
1242 knownToolResultIDs := make(map[string]struct{})
1243 for _, m := range msgs {
1244 switch m.Role {
1245 case message.Assistant:
1246 for _, tc := range m.ToolCalls() {
1247 knownToolCallIDs[tc.ID] = struct{}{}
1248 }
1249 case message.Tool:
1250 for _, tr := range m.ToolResults() {
1251 knownToolResultIDs[tr.ToolCallID] = struct{}{}
1252 }
1253 }
1254 }
1255
1256 for _, m := range msgs {
1257 if len(m.Parts) == 0 {
1258 continue
1259 }
1260 // Assistant message without content or tool calls (cancelled before it returned anything).
1261 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1262 continue
1263 }
1264 if m.Role == message.Tool {
1265 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1266 history = append(history, msg)
1267 }
1268 continue
1269 }
1270 aiMsgs := m.ToAIMessage()
1271 if !supportsImages {
1272 for i := range aiMsgs {
1273 if aiMsgs[i].Role == fantasy.MessageRoleUser {
1274 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1275 }
1276 }
1277 }
1278 history = append(history, aiMsgs...)
1279
1280 if m.Role == message.Assistant {
1281 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1282 history = append(history, msg)
1283 }
1284 }
1285 }
1286
1287 var files []fantasy.FilePart
1288 for _, attachment := range attachments {
1289 if attachment.IsText() {
1290 continue
1291 }
1292 files = append(files, fantasy.FilePart{
1293 Filename: attachment.FileName,
1294 Data: attachment.Content,
1295 MediaType: attachment.MimeType,
1296 })
1297 }
1298
1299 return history, files
1300}
1301
1302// filterFileParts removes fantasy.FilePart entries from a slice of message
1303// parts. Used to strip image attachments from historical user messages when
1304// the current model does not support them.
1305func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1306 filtered := make([]fantasy.MessagePart, 0, len(parts))
1307 for _, part := range parts {
1308 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1309 continue
1310 }
1311 filtered = append(filtered, part)
1312 }
1313 return filtered
1314}
1315
1316// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1317// dropping any tool result parts whose tool_call_id has no matching tool call
1318// in the known set. An orphaned result causes API validation to fail on every
1319// subsequent turn, permanently locking the session. Returns the filtered
1320// message and true if at least one valid part remains.
1321func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1322 aiMsgs := m.ToAIMessage()
1323 if len(aiMsgs) == 0 {
1324 return fantasy.Message{}, false
1325 }
1326 var validParts []fantasy.MessagePart
1327 for _, part := range aiMsgs[0].Content {
1328 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1329 if !ok {
1330 validParts = append(validParts, part)
1331 continue
1332 }
1333 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1334 validParts = append(validParts, part)
1335 } else {
1336 slog.Warn(
1337 "Dropping orphaned tool result with no matching tool call",
1338 "tool_call_id", tr.ToolCallID,
1339 )
1340 }
1341 }
1342 if len(validParts) == 0 {
1343 return fantasy.Message{}, false
1344 }
1345 msg := aiMsgs[0]
1346 msg.Content = validParts
1347 return msg, true
1348}
1349
1350// syntheticToolResultsForOrphanedCalls returns a tool message containing
1351// synthetic tool results for any tool calls in the assistant message that
1352// have no matching result in knownToolResultIDs. LLM APIs require every
1353// tool_use to be immediately followed by a tool_result; an interrupted
1354// session can leave orphaned tool_use blocks that permanently lock the
1355// conversation. Returns the message and true if any synthetic results were
1356// produced.
1357func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1358 var syntheticParts []fantasy.MessagePart
1359 for _, tc := range m.ToolCalls() {
1360 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1361 continue
1362 }
1363 slog.Warn(
1364 "Injecting synthetic tool result for orphaned tool call",
1365 "tool_call_id", tc.ID,
1366 "tool_name", tc.Name,
1367 )
1368 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1369 ToolCallID: tc.ID,
1370 Output: fantasy.ToolResultOutputContentError{
1371 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1372 },
1373 })
1374 }
1375 if len(syntheticParts) == 0 {
1376 return fantasy.Message{}, false
1377 }
1378 return fantasy.Message{
1379 Role: fantasy.MessageRoleTool,
1380 Content: syntheticParts,
1381 }, true
1382}
1383
1384func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1385 msgs, err := a.messages.List(ctx, session.ID)
1386 if err != nil {
1387 return nil, fmt.Errorf("failed to list messages: %w", err)
1388 }
1389
1390 if session.SummaryMessageID != "" {
1391 summaryMsgIndex := -1
1392 for i, msg := range msgs {
1393 if msg.ID == session.SummaryMessageID {
1394 summaryMsgIndex = i
1395 break
1396 }
1397 }
1398 if summaryMsgIndex != -1 {
1399 msgs = msgs[summaryMsgIndex:]
1400 msgs[0].Role = message.User
1401 }
1402 }
1403 return msgs, nil
1404}
1405
1406// generateTitle generates a session titled based on the initial prompt.
1407func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1408 if userPrompt == "" {
1409 return
1410 }
1411
1412 smallModel := a.smallModel.Get()
1413 largeModel := a.largeModel.Get()
1414 systemPromptPrefix := a.systemPromptPrefix.Get()
1415
1416 var maxOutputTokens int64 = 40
1417 if smallModel.CatwalkCfg.CanReason {
1418 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1419 }
1420
1421 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1422 return fantasy.NewAgent(
1423 m,
1424 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1425 fantasy.WithMaxOutputTokens(tok),
1426 fantasy.WithUserAgent(userAgent),
1427 )
1428 }
1429
1430 streamCall := fantasy.AgentStreamCall{
1431 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1432 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1433 prepared.Messages = opts.Messages
1434 if systemPromptPrefix != "" {
1435 prepared.Messages = append([]fantasy.Message{
1436 fantasy.NewSystemMessage(systemPromptPrefix),
1437 }, prepared.Messages...)
1438 }
1439 return callCtx, prepared, nil
1440 },
1441 }
1442
1443 // Use the small model to generate the title.
1444 model := smallModel
1445 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1446 resp, err := agent.Stream(ctx, streamCall)
1447 if err == nil {
1448 // We successfully generated a title with the small model.
1449 slog.Debug("Generated title with small model")
1450 } else {
1451 // It didn't work. Let's try with the big model.
1452 slog.Error("Error generating title with small model; trying big model", "err", err)
1453 model = largeModel
1454 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1455 resp, err = agent.Stream(ctx, streamCall)
1456 if err == nil {
1457 slog.Debug("Generated title with large model")
1458 } else {
1459 // Welp, the large model didn't work either. Use the default
1460 // session name and return.
1461 slog.Error("Error generating title with large model", "err", err)
1462 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1463 if saveErr != nil {
1464 slog.Error("Failed to save session title", "error", saveErr)
1465 }
1466 return
1467 }
1468 }
1469
1470 if resp == nil {
1471 // Actually, we didn't get a response so we can't. Use the default
1472 // session name and return.
1473 slog.Error("Response is nil; can't generate title")
1474 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1475 if saveErr != nil {
1476 slog.Error("Failed to save session title", "error", saveErr)
1477 }
1478 return
1479 }
1480
1481 // Clean up title.
1482 var title string
1483 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1484
1485 // Remove thinking tags if present.
1486 title = thinkTagRegex.ReplaceAllString(title, "")
1487 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1488
1489 title = strings.TrimSpace(title)
1490 title = cmp.Or(title, DefaultSessionName)
1491
1492 // Calculate usage and cost.
1493 var openrouterCost *float64
1494 for _, step := range resp.Steps {
1495 stepCost := a.openrouterCost(step.ProviderMetadata)
1496 if stepCost != nil {
1497 newCost := *stepCost
1498 if openrouterCost != nil {
1499 newCost += *openrouterCost
1500 }
1501 openrouterCost = &newCost
1502 }
1503 }
1504
1505 modelConfig := model.CatwalkCfg
1506 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1507 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1508 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1509 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1510
1511 // Use override cost if available (e.g., from OpenRouter).
1512 if openrouterCost != nil {
1513 cost = *openrouterCost
1514 }
1515
1516 // Skip cost accumulation
1517 if model.FlatRate {
1518 cost = 0
1519 }
1520
1521 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1522 completionTokens := resp.TotalUsage.OutputTokens
1523
1524 // Atomically update only title and usage fields to avoid overriding other
1525 // concurrent session updates.
1526 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1527 if saveErr != nil {
1528 slog.Error("Failed to save session title and usage", "error", saveErr)
1529 return
1530 }
1531}
1532
1533func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1534 openrouterMetadata, ok := metadata[openrouter.Name]
1535 if !ok {
1536 return nil
1537 }
1538
1539 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1540 if !ok {
1541 return nil
1542 }
1543 return &opts.Usage.Cost
1544}
1545
1546func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1547 if !usageIsZero(usage) {
1548 session.EstimatedUsage = estimated
1549 }
1550
1551 modelConfig := model.CatwalkCfg
1552 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1553 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1554 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1555 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1556
1557 if !estimated {
1558 a.eventTokensUsed(session.ID, model, usage, cost)
1559 }
1560
1561 if estimated {
1562 cost = 0
1563 } else {
1564 // Use override cost if available (e.g., from OpenRouter).
1565 if overrideCost != nil {
1566 cost = *overrideCost
1567 }
1568
1569 // Skip cost accumulation
1570 if model.FlatRate {
1571 cost = 0
1572 }
1573 }
1574
1575 session.Cost += cost
1576 updateSessionTokenCounters(session, usage)
1577}
1578
1579func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1580 if usage.OutputTokens != 0 {
1581 session.CompletionTokens = usage.OutputTokens
1582 }
1583 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1584 session.PromptTokens = promptTokens
1585 }
1586}
1587
1588func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1589 if usage.OutputTokens != 0 {
1590 return usage.OutputTokens
1591 }
1592 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1593}
1594
1595func (a *sessionAgent) Cancel(sessionID string) {
1596 // Serialize against the dispatch handoff in Run so the accepted ->
1597 // (cancel-on-entry | queued | active) transition is atomic against
1598 // this cancel. Every cancel observes at least one of: an active
1599 // request, an accepted run (recorded as a pending cancel), or a
1600 // queue entry it then clears. If none of those hold, an idle Escape
1601 // is a true no-op and must not poison the next prompt.
1602 mu := a.sessionMu(sessionID)
1603 mu.Lock()
1604 defer mu.Unlock()
1605
1606 // Cancel regular requests. Don't use Take() here - we need the entry to
1607 // remain in activeRequests so IsBusy() returns true until the goroutine
1608 // fully completes (including error handling that may access the DB).
1609 // The defer in processRequest will clean up the entry.
1610 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1611 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1612 cancel()
1613 }
1614
1615 // Also check for summarize requests.
1616 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1617 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1618 cancel()
1619 }
1620
1621 // Record a pending cancel only when a dispatched-but-not-yet-active
1622 // run exists. This catches a run still in the goroutine scheduler or
1623 // about to enter Run's busy-queue branch, while leaving an idle
1624 // session untouched. Active and accepted are not mutually exclusive:
1625 // when a run is active and a follow-up has been accepted, both the
1626 // cancel above and this pending record fire.
1627 a.acceptedMu.Lock()
1628 count, ok := a.acceptedRuns.Get(sessionID)
1629 a.acceptedMu.Unlock()
1630 if ok && count > 0 {
1631 slog.Debug("Recording pending cancel for accepted run", "session_id", sessionID)
1632 a.pendingCancels.Set(sessionID, struct{}{})
1633 }
1634
1635 if a.QueuedPrompts(sessionID) > 0 {
1636 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1637 a.messageQueue.Del(sessionID)
1638 }
1639}
1640
1641func (a *sessionAgent) ClearQueue(sessionID string) {
1642 if a.QueuedPrompts(sessionID) > 0 {
1643 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1644 a.messageQueue.Del(sessionID)
1645 }
1646}
1647
1648func (a *sessionAgent) CancelAll() {
1649 if !a.IsBusy() {
1650 return
1651 }
1652 for key := range a.activeRequests.Seq2() {
1653 a.Cancel(key) // key is sessionID
1654 }
1655
1656 timeout := time.After(5 * time.Second)
1657 for a.IsBusy() {
1658 select {
1659 case <-timeout:
1660 return
1661 default:
1662 time.Sleep(200 * time.Millisecond)
1663 }
1664 }
1665}
1666
1667func (a *sessionAgent) IsBusy() bool {
1668 var busy bool
1669 for cancelFunc := range a.activeRequests.Seq() {
1670 if cancelFunc != nil {
1671 busy = true
1672 break
1673 }
1674 }
1675 return busy
1676}
1677
1678func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1679 _, busy := a.activeRequests.Get(sessionID)
1680 return busy
1681}
1682
1683func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1684 l, ok := a.messageQueue.Get(sessionID)
1685 if !ok {
1686 return 0
1687 }
1688 return len(l)
1689}
1690
1691func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1692 l, ok := a.messageQueue.Get(sessionID)
1693 if !ok {
1694 return nil
1695 }
1696 prompts := make([]string, len(l))
1697 for i, call := range l {
1698 prompts[i] = call.Prompt
1699 }
1700 return prompts
1701}
1702
// SetModels stores large in the agent's largeModel slot and small in its
// smallModel slot. The two values are written one after the other, large
// first.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1707
// SetTools hands tools to the agent's tool collection via SetSlice.
// NOTE(review): presumably this replaces the current contents rather than
// appending — confirm against the collection type's SetSlice.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1711
// SetSystemPrompt stores systemPrompt as the agent's system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1715
// Model returns the value currently held in the agent's largeModel slot. The
// small model is not exposed by this method.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1719
1720// convertToToolResult converts a fantasy tool result to a message tool result.
1721func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1722 baseResult := message.ToolResult{
1723 ToolCallID: result.ToolCallID,
1724 Name: result.ToolName,
1725 Metadata: result.ClientMetadata,
1726 }
1727
1728 switch result.Result.GetType() {
1729 case fantasy.ToolResultContentTypeText:
1730 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1731 baseResult.Content = r.Text
1732 }
1733 case fantasy.ToolResultContentTypeError:
1734 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1735 baseResult.Content = r.Error.Error()
1736 baseResult.IsError = true
1737 }
1738 case fantasy.ToolResultContentTypeMedia:
1739 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1740 if !stringext.IsValidBase64(r.Data) {
1741 slog.Warn(
1742 "Tool returned media with invalid base64 data, discarding image",
1743 "tool", result.ToolName,
1744 "tool_call_id", result.ToolCallID,
1745 )
1746 baseResult.Content = "Tool returned image data with invalid encoding"
1747 baseResult.IsError = true
1748 } else {
1749 content := r.Text
1750 if content == "" {
1751 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1752 }
1753 baseResult.Content = content
1754 baseResult.Data = r.Data
1755 baseResult.MIMEType = r.MediaType
1756 }
1757 }
1758 }
1759
1760 return baseResult
1761}
1762
1763// workaroundProviderMediaLimitations converts media content in tool results to
1764// user messages for providers that don't natively support images in tool results.
1765//
1766// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1767// don't support sending images/media in tool result messages - they only accept
1768// text in tool results. However, they DO support images in user messages.
1769//
1770// If we send media in tool results to these providers, the API returns an error.
1771//
1772// Solution: For these providers, we:
1773// 1. Replace the media in the tool result with a text placeholder
1774// 2. Inject a user message immediately after with the image as a file attachment
1775// 3. This maintains the tool execution flow while working around API limitations
1776//
1777// Anthropic and Bedrock support images natively in tool results, so we skip
1778// this workaround for them.
1779//
1780// Example transformation:
1781//
1782// BEFORE: [tool result: image data]
1783// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1784func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1785 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1786 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1787 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1788
1789 if providerSupportsMedia {
1790 return messages
1791 }
1792
1793 convertedMessages := make([]fantasy.Message, 0, len(messages))
1794
1795 for _, msg := range messages {
1796 if msg.Role != fantasy.MessageRoleTool {
1797 convertedMessages = append(convertedMessages, msg)
1798 continue
1799 }
1800
1801 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1802 var mediaFiles []fantasy.FilePart
1803
1804 for _, part := range msg.Content {
1805 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1806 if !ok {
1807 textParts = append(textParts, part)
1808 continue
1809 }
1810
1811 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1812 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1813 if err != nil {
1814 slog.Warn("Failed to decode media data", "error", err)
1815 textParts = append(textParts, part)
1816 continue
1817 }
1818
1819 mediaFiles = append(mediaFiles, fantasy.FilePart{
1820 Data: decoded,
1821 MediaType: media.MediaType,
1822 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1823 })
1824
1825 textParts = append(textParts, fantasy.ToolResultPart{
1826 ToolCallID: toolResult.ToolCallID,
1827 Output: fantasy.ToolResultOutputContentText{
1828 Text: "[Image/media content loaded - see attached file]",
1829 },
1830 ProviderOptions: toolResult.ProviderOptions,
1831 })
1832 } else {
1833 textParts = append(textParts, part)
1834 }
1835 }
1836
1837 convertedMessages = append(convertedMessages, fantasy.Message{
1838 Role: fantasy.MessageRoleTool,
1839 Content: textParts,
1840 })
1841
1842 if len(mediaFiles) > 0 {
1843 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1844 "Here is the media content from the tool result:",
1845 mediaFiles...,
1846 ))
1847 }
1848 }
1849
1850 return convertedMessages
1851}
1852
1853// buildSummaryPrompt constructs the prompt text for session summarization.
1854func buildSummaryPrompt(todos []session.Todo) string {
1855 var sb strings.Builder
1856 sb.WriteString("Provide a detailed summary of our conversation above.")
1857 if len(todos) > 0 {
1858 sb.WriteString("\n\n## Current Todo List\n\n")
1859 for _, t := range todos {
1860 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1861 }
1862 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1863 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1864 }
1865 return sb.String()
1866}
1867
1868func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1869 fields := []any{
1870 "retry_delay", delay.String(),
1871 }
1872 if err == nil {
1873 return fields
1874 }
1875 fields = append(fields, "status_code", err.StatusCode)
1876 if err.Title != "" {
1877 fields = append(fields, "title", err.Title)
1878 }
1879 if err.Message != "" {
1880 fields = append(fields, "message", err.Message)
1881 }
1882 return fields
1883}