1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "charm.land/catwalk/pkg/catwalk"
28 "charm.land/fantasy"
29 "charm.land/fantasy/providers/anthropic"
30 "charm.land/fantasy/providers/bedrock"
31 "charm.land/fantasy/providers/google"
32 "charm.land/fantasy/providers/openai"
33 "charm.land/fantasy/providers/openrouter"
34 "charm.land/fantasy/providers/vercel"
35 "charm.land/lipgloss/v2"
36 "github.com/charmbracelet/crush/internal/agent/hyper"
37 "github.com/charmbracelet/crush/internal/agent/notify"
38 "github.com/charmbracelet/crush/internal/agent/tools"
39 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
40 "github.com/charmbracelet/crush/internal/config"
41 "github.com/charmbracelet/crush/internal/csync"
42 "github.com/charmbracelet/crush/internal/message"
43 "github.com/charmbracelet/crush/internal/pubsub"
44 "github.com/charmbracelet/crush/internal/session"
45 "github.com/charmbracelet/crush/internal/stringext"
46 "github.com/charmbracelet/crush/internal/version"
47 "github.com/charmbracelet/x/exp/charmtone"
48)
49
const (
	// DefaultSessionName is the placeholder name for a session.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	// NOTE(review): judging by the names, the first two are token counts
	// and the ratio is a fraction of the context window; the use sites are
	// not in this part of the file — confirm there.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)

// userAgent is the Crush identification string (including the build
// version) that Run hands to fantasy.WithUserAgent when it builds the agent.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds the contents of templates/title.md, embedded at build
// time. Only comments/blank lines may sit between the directive and the var.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at
// build time.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles.
var (
	// thinkTagRegex matches a complete <think>…</think> span, non-greedily
	// and across newlines ((?s) lets `.` match '\n').
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	// orphanThinkTagRegex matches a single opening or closing think tag,
	// e.g. an unmatched one that thinkTagRegex cannot pair up.
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
72
// SessionAgentCall describes one prompt turn submitted to SessionAgent.Run.
type SessionAgentCall struct {
	// SessionID names the session the turn belongs to. It is required:
	// ValidateCall rejects an empty value with ErrSessionMissing.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user text. It may be empty only when Attachments
	// contains a text attachment (see ValidateCall).
	Prompt string
	// ProviderOptions are forwarded unchanged to the model stream call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments accompany the prompt; Run passes them to preparePrompt
	// and to message.PromptWithTextAttachments.
	Attachments []message.Attachment
	// MaxOutputTokens caps the response length. A value <= 0 means
	// "unset": Run omits it because some providers reject a zero value.
	MaxOutputTokens int64
	// Optional sampling parameters, forwarded unchanged to the stream
	// call. NOTE(review): nil presumably leaves the provider default.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive is not read in this part of the file.
	// NOTE(review): presumably marks headless callers such as
	// `crush run` — confirm at the use sites.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
125
// SessionAgent runs prompt turns against sessions and manages the
// per-session queue, cancellation, and accept bookkeeping around them.
type SessionAgent interface {
	// Run executes one turn for the call's session. When that session
	// already has an active turn, the call is queued instead and Run
	// returns (nil, nil).
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted reserves an accept slot for sessionID before a call is
	// dispatched asynchronously. Attach the returned handle to the call's
	// Accepted field; AcceptedRun.Close releases the reservation.
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	// SetTools replaces the tool set. Each streaming step re-reads the
	// current set, so updates apply to turns already in flight.
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
142
// Model bundles a language model with the metadata Run needs about it.
type Model struct {
	// Model is the language model used to stream responses.
	Model fantasy.LanguageModel
	// CatwalkCfg is the model's catalog entry; Run reads SupportsImages
	// from it when preparing the prompt.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration; its Model and Provider
	// are recorded on assistant messages.
	ModelCfg config.SelectedModel
	// FlatRate is not read in this part of the file.
	// NOTE(review): presumably marks flat-rate (subscription) pricing —
	// confirm at the use sites.
	FlatRate bool
}
149
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Model, prompt, and tool settings. They live in csync containers and
	// Run reads them through Get/Copy, so they can be replaced while turns
	// are in flight.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	// runComplete receives each turn's terminal RunComplete event unless
	// the call supplies its own OnComplete hook. It may be nil, in which
	// case the event is dropped.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls that arrived while the
	// session was busy, in arrival order.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session to the cancel func of its active turn.
	// NOTE(review): presence of an entry is presumably what IsSessionBusy
	// checks — confirm.
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
205
// SessionAgentOptions configures NewSessionAgent. Each field seeds the
// sessionAgent field of the same name. RunComplete may be nil, in which
// case terminal events are published only through per-call OnComplete
// hooks.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
220
221func NewSessionAgent(
222 opts SessionAgentOptions,
223) SessionAgent {
224 return &sessionAgent{
225 largeModel: csync.NewValue(opts.LargeModel),
226 smallModel: csync.NewValue(opts.SmallModel),
227 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
228 systemPrompt: csync.NewValue(opts.SystemPrompt),
229 isSubAgent: opts.IsSubAgent,
230 sessions: opts.Sessions,
231 messages: opts.Messages,
232 disableAutoSummarize: opts.DisableAutoSummarize,
233 tools: csync.NewSliceFrom(opts.Tools),
234 isYolo: opts.IsYolo,
235 notify: opts.Notify,
236 runComplete: opts.RunComplete,
237 messageQueue: csync.NewMap[string, []SessionAgentCall](),
238 activeRequests: csync.NewMap[string, context.CancelFunc](),
239 dispatchMu: csync.NewMap[string, *sync.Mutex](),
240 acceptedRuns: csync.NewMap[string, int](),
241 cancelMark: csync.NewMap[string, uint64](),
242 }
243}
244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
type AcceptedRun struct {
	// agent is the sessionAgent whose acceptedRuns counter this handle
	// incremented; Close decrements it there.
	agent *sessionAgent
	// sessionID is the session the reservation was taken for.
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	seq uint64
	// done flips to true on the first Close so later calls are no-ops.
	done atomic.Bool
}
261
262// Close decrements the accept counter for this reservation. It is safe
263// to call multiple times; only the first call has effect.
264func (r *AcceptedRun) Close() {
265 if r == nil {
266 return
267 }
268 if !r.done.CompareAndSwap(false, true) {
269 return
270 }
271 r.agent.endAccepted(r.sessionID)
272}
273
274// SessionID exposes the session this reservation is for so the run path
275// can use it without an extra parameter.
276func (r *AcceptedRun) SessionID() string {
277 if r == nil {
278 return ""
279 }
280 return r.sessionID
281}
282
283// BeginAccepted increments the accept counter for sessionID and returns
284// a handle whose Close is the only way to decrement it. It is the only
285// entry point that mutates acceptedRuns.
286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
287 a.acceptedMu.Lock()
288 defer a.acceptedMu.Unlock()
289 count, _ := a.acceptedRuns.Get(sessionID)
290 a.acceptedRuns.Set(sessionID, count+1)
291 a.acceptSeqGen++
292 return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
293}
294
295// endAccepted decrements the accept counter for sessionID. It is only
296// called via AcceptedRun.Close. It uses a dedicated lock (not the
297// per-session dispatch mutex) so it can run while Run holds dispatchMu
298// for the same session without deadlocking.
299//
300// When the count reaches zero the session's cancel mark is dropped: no
301// accepted handle remains for it to cover, and any handle accepted later
302// gets a strictly higher sequence that the mark would not match anyway.
303// Handles canceled on entry never reach RunComplete, so this is the only
304// place that clears the mark for an all-canceled batch. Sibling handles
305// covered by the same mark are serialized on the per-session dispatch
306// mutex and read the mark before they Close, so this never clears it out
307// from under a covered handle still waiting to enter Run.
308func (a *sessionAgent) endAccepted(sessionID string) {
309 a.acceptedMu.Lock()
310 defer a.acceptedMu.Unlock()
311 count, ok := a.acceptedRuns.Get(sessionID)
312 if !ok || count <= 1 {
313 a.acceptedRuns.Del(sessionID)
314 a.cancelMark.Del(sessionID)
315 return
316 }
317 a.acceptedRuns.Set(sessionID, count-1)
318}
319
320// sessionMu returns the per-session dispatch mutex, creating it on first
321// use. Creation is guarded so concurrent callers always observe the same
322// mutex instance for a given session.
323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
324 if mu, ok := a.dispatchMu.Get(sessionID); ok {
325 return mu
326 }
327 a.dispatchMuCreate.Lock()
328 defer a.dispatchMuCreate.Unlock()
329 if mu, ok := a.dispatchMu.Get(sessionID); ok {
330 return mu
331 }
332 mu := &sync.Mutex{}
333 a.dispatchMu.Set(sessionID, mu)
334 return mu
335}
336
337// enqueueCall appends call to the session's message queue. The
338// OnComplete hook is stripped: the caller that supplied it (typically
339// coordinator.Run) has its own retry/coalesce scope that ends when it
340// returns, so by the time the queue drains nobody is left to consume the
341// buffered terminal event. The recursive Run falls back to the default
342// broker publish, which is what existing subscribers expect for queued
343// turns.
344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
345 existing, ok := a.messageQueue.Get(call.SessionID)
346 if !ok {
347 existing = []SessionAgentCall{}
348 }
349 queued := call
350 if call.Accepted != nil {
351 // Preserve the accept sequence after the handle is stripped so
352 // the queue-drain paths can tell a follow-up queued before a
353 // cancel (covered by the mark) from one queued after it.
354 queued.acceptSeq = call.Accepted.seq
355 }
356 queued.OnComplete = nil
357 queued.Accepted = nil
358 existing = append(existing, queued)
359 a.messageQueue.Set(call.SessionID, existing)
360}
361
362// drainQueueForStep partitions the session's queued calls for the current
363// streaming step under the per-session dispatch mutex so the filtering is
364// atomic against a concurrent Cancel: canceledBySeq requires the caller to
365// hold that mutex, and evaluating it here (rather than after unlocking)
366// prevents a cancel recorded between the drain and the check from being
367// observed inconsistently.
368//
369// Calls covered by a pending cancel are dropped; the dropped ones that
370// carry a RunID are returned in canceledWithRunID so the caller can
371// publish their terminal cancelled RunComplete (a caller waiting on that
372// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without
373// a RunID are returned in fold to be folded into the active turn,
374// preserving the existing follow-up behavior. Uncanceled calls that carry
375// a RunID are left in the queue so each runs as its own turn via the
376// recursive run path and publishes its own RunComplete, giving every
377// RunID-bearing prompt an explicit lifecycle instead of being silently
378// absorbed into another turn. fold is processed by the caller without the
379// lock held.
380func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) {
381 dispatchLock := a.sessionMu(sessionID)
382 dispatchLock.Lock()
383 defer dispatchLock.Unlock()
384 queuedCalls, _ := a.messageQueue.Get(sessionID)
385 var keep []SessionAgentCall
386 for _, queued := range queuedCalls {
387 if a.canceledBySeq(sessionID, queued.acceptSeq) {
388 if queued.RunID != "" {
389 canceledWithRunID = append(canceledWithRunID, queued)
390 }
391 continue
392 }
393 if queued.RunID != "" {
394 keep = append(keep, queued)
395 continue
396 }
397 fold = append(fold, queued)
398 }
399 if len(keep) == 0 {
400 a.messageQueue.Del(sessionID)
401 } else {
402 a.messageQueue.Set(sessionID, keep)
403 }
404 return fold, canceledWithRunID
405}
406
407// publishCanceledQueueDrops emits a terminal cancelled RunComplete for
408// every dropped queued call that carries a RunID. A queued prompt removed
409// from the queue without ever running — covered by a pending cancel, or
410// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on
411// that RunID: `crush run` ignores live message events and exits only on a
412// RunComplete whose RunID matches. Calls without a RunID had no such waiter
413// and are dropped silently as before. A detached, bounded context keeps the
414// must-deliver publish alive even when the run context that triggered the
415// drop is already canceled.
416func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) {
417 var hasRunID bool
418 for _, d := range drops {
419 if d.RunID != "" {
420 hasRunID = true
421 break
422 }
423 }
424 if !hasRunID {
425 return
426 }
427 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
428 defer cancel()
429 for _, d := range drops {
430 if d.RunID == "" {
431 continue
432 }
433 a.publishRunComplete(ctx, d, notify.RunComplete{
434 SessionID: d.SessionID,
435 RunID: d.RunID,
436 Cancelled: true,
437 })
438 }
439}
440
441// clearQueueAndNotify removes all queued prompts for the session and
442// publishes a terminal cancelled RunComplete for any that carried a RunID,
443// so callers waiting on those RunIDs (e.g. `crush run`) are not left
444// hanging when their queued prompt is discarded without running.
445func (a *sessionAgent) clearQueueAndNotify(sessionID string) {
446 queued, ok := a.messageQueue.Get(sessionID)
447 a.messageQueue.Del(sessionID)
448 if !ok {
449 return
450 }
451 a.publishCanceledQueueDrops(queued)
452}
453
454// clearPendingCancel removes any pending-cancel mark for sessionID. It
455// takes the per-session dispatch lock so it is ordered against Cancel
456// and the dispatch handoff.
457func (a *sessionAgent) clearPendingCancel(sessionID string) {
458 mu := a.sessionMu(sessionID)
459 mu.Lock()
460 defer mu.Unlock()
461 a.cancelMark.Del(sessionID)
462}
463
464// canceledBySeq reports whether an accepted handle or queued call with
465// the given accept sequence is covered by a pending cancel for the
466// session. Callers must hold the session's dispatch mutex. A tracked
467// sequence (seq > 0) is covered only when it is at or below the cancel
468// high-water mark, so a prompt accepted after the cancel (higher seq) is
469// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
470// with no accept reservation) is covered whenever any mark is present,
471// preserving the pre-sequence behavior. The mark is not consumed: it
472// stays so every sibling handle it covers observes the same cancel, and
473// a later handle (higher seq) ignores it regardless.
474func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
475 mark, ok := a.cancelMark.Get(sessionID)
476 if !ok || mark == 0 {
477 return false
478 }
479 return seq == 0 || seq <= mark
480}
481
482// persistCanceledTurn writes the user/assistant records for a turn that
483// was canceled before (or just as) streaming would have produced them.
484// It creates the user message only when it was not already created by an
485// earlier createUserMessage call (userMsgCreated), then writes an
486// assistant message with FinishReasonCanceled. Both writes use
487// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
488// context) can't drop them.
489func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
490 writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
491 defer cancel()
492 if !userMsgCreated {
493 if _, err := a.createUserMessage(writeCtx, call); err != nil {
494 return err
495 }
496 }
497 largeModel := a.largeModel.Get()
498 assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
499 Role: message.Assistant,
500 Parts: []message.ContentPart{},
501 Model: largeModel.ModelCfg.Model,
502 Provider: largeModel.ModelCfg.Provider,
503 })
504 if err != nil {
505 return err
506 }
507 assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
508 return a.messages.Update(writeCtx, assistant)
509}
510
511// publishRunComplete emits the authoritative terminal event for a turn.
512// It honors the per-call OnComplete hook when set (so the coordinator can
513// coalesce retries) and otherwise falls back to the RunComplete broker.
514// ctx is used only for the bounded-blocking must-deliver publish; the
515// terminal payload is supplied by the caller. This is the single emit path
516// shared by the streaming defer and the cancel-on-entry early return so a
517// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
518// observes exactly one terminal event regardless of which Run branch ends
519// the turn.
520func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
521 if call.OnComplete != nil {
522 call.OnComplete(complete)
523 return
524 }
525 if a.runComplete == nil {
526 return
527 }
528 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
529}
530
531// ValidateCall performs the cheap structural validation that
532// sessionAgent.Run requires before a call can be dispatched: a call must
533// carry either a non-empty prompt or a text attachment, and it must name a
534// session. It is exported so callers that accept a run before dispatching it
535// (e.g. backend.SendMessage) can apply the same checks and keep the error
536// contract consistent.
537func ValidateCall(call SessionAgentCall) error {
538 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
539 return ErrEmptyPrompt
540 }
541 if call.SessionID == "" {
542 return ErrSessionMissing
543 }
544 return nil
545}
546
547func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
548 if err := ValidateCall(call); err != nil {
549 return nil, err
550 }
551
552 // genCtx/cancel are the run context and its cancel func. For the
553 // accepted (fire-and-forget) dispatch path they are created under
554 // dispatchMu below so a concurrent Cancel can observe the
555 // activeRequests entry before the assistant message exists. For
556 // the in-process path they stay nil here and are created later,
557 // preserving the original ordering.
558 var (
559 genCtx context.Context
560 cancel context.CancelFunc
561 activeRegistered bool
562 userMsgCreated bool
563 )
564
565 if call.Accepted != nil {
566 // Serialize the accepted -> (cancel-on-entry | queued |
567 // active) transition against a concurrent Cancel. Cancel takes
568 // the same per-session lock, so every cancel observes at least
569 // one of: a cancel mark, an activeRequests entry, or a
570 // messageQueue entry it then clears.
571 mu := a.sessionMu(call.SessionID)
572 mu.Lock()
573
574 if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
575 // Cancel-on-entry: a cancel arrived while this run was
576 // dispatched but not yet active, and this handle's accept
577 // sequence is at or below the session's cancel mark. The
578 // mark is left in place so sibling handles it also covers
579 // observe the same cancel; release the accept reservation,
580 // drop the lock, and persist a canceled turn without
581 // entering Stream.
582 //
583 // This path returns before the streaming defer that
584 // publishes RunComplete is installed, so emit the terminal
585 // event explicitly. Without it, a caller waiting on
586 // RunComplete for this RunID (e.g. `crush run`, which
587 // ignores message events and blocks on RunComplete) would
588 // hang on an immediately-canceled accepted run.
589 call.Accepted.Close()
590 mu.Unlock()
591 complete := notify.RunComplete{
592 SessionID: call.SessionID,
593 RunID: call.RunID,
594 Cancelled: true,
595 }
596 if err := a.persistCanceledTurn(ctx, call, false); err != nil {
597 complete.Error = err.Error()
598 a.publishRunComplete(ctx, call, complete)
599 return nil, err
600 }
601 a.publishRunComplete(ctx, call, complete)
602 return nil, nil
603 }
604
605 if a.IsSessionBusy(call.SessionID) {
606 // Busy: an earlier prompt is active. Queue this call and
607 // release the accept reservation. A Cancel arriving after
608 // this point sees the active entry and clears the queue.
609 a.enqueueCall(call)
610 call.Accepted.Close()
611 mu.Unlock()
612 return nil, nil
613 }
614
615 // Idle: become the active run. Register the cancel func before
616 // dropping the lock so a Cancel that arrives between here and
617 // assistant creation is not lost.
618 runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
619 genCtx, cancel = context.WithCancel(runCtx)
620 a.activeRequests.Set(call.SessionID, cancel)
621 activeRegistered = true
622 call.Accepted.Close()
623 mu.Unlock()
624
625 defer cancel()
626 defer a.activeRequests.Del(call.SessionID)
627 } else if a.IsSessionBusy(call.SessionID) {
628 // Queue the message if busy. Strip OnComplete: the caller that
629 // supplied the hook (typically coordinator.Run) has its own
630 // retry/coalesce scope that ends when it returns, so by the time
631 // the queue drains nobody is left to consume the buffered
632 // terminal event. The recursive Run will fall back to the
633 // default broker publish, which is what existing subscribers
634 // expect for queued turns.
635 a.enqueueCall(call)
636 return nil, nil
637 }
638
639 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
640 agentTools := a.tools.Copy()
641 largeModel := a.largeModel.Get()
642 systemPrompt := a.systemPrompt.Get()
643 promptPrefix := a.systemPromptPrefix.Get()
644 var instructions strings.Builder
645
646 for _, server := range mcp.GetStates() {
647 if server.State != mcp.StateConnected {
648 continue
649 }
650 if s := server.Client.InitializeResult().Instructions; s != "" {
651 instructions.WriteString(s)
652 instructions.WriteString("\n\n")
653 }
654 }
655
656 if s := instructions.String(); s != "" {
657 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
658 }
659
660 if len(agentTools) > 0 {
661 // Add Anthropic caching to the last tool.
662 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
663 }
664
665 agent := fantasy.NewAgent(
666 largeModel.Model,
667 fantasy.WithSystemPrompt(systemPrompt),
668 fantasy.WithTools(agentTools...),
669 fantasy.WithUserAgent(userAgent),
670 )
671
672 sessionLock := sync.Mutex{}
673 currentSession, err := a.sessions.Get(ctx, call.SessionID)
674 if err != nil {
675 return nil, fmt.Errorf("failed to get session: %w", err)
676 }
677
678 msgs, err := a.getSessionMessages(ctx, currentSession)
679 if err != nil {
680 return nil, fmt.Errorf("failed to get session messages: %w", err)
681 }
682
683 var wg sync.WaitGroup
684 // Generate title if first message.
685 if len(msgs) == 0 {
686 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
687 wg.Go(func() {
688 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
689 })
690 }
691 defer wg.Wait()
692
693 // Add the user message to the session.
694 _, err = a.createUserMessage(ctx, call)
695 if err != nil {
696 return nil, err
697 }
698 userMsgCreated = true
699
700 // Add the session to the context.
701 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
702
703 // For the accepted dispatch path the run context and cancel func
704 // were already created and registered under dispatchMu above; reuse
705 // them. For the in-process path create them here, preserving the
706 // original ordering.
707 if !activeRegistered {
708 genCtx, cancel = context.WithCancel(ctx)
709 a.activeRequests.Set(call.SessionID, cancel)
710
711 defer cancel()
712 defer a.activeRequests.Del(call.SessionID)
713 }
714 // skipRunComplete is set just before the queued-recursion path so
715 // the outer Run doesn't publish a RunComplete that would race
716 // with — and be superseded by — the recursive call's own
717 // RunComplete (each queued user prompt is its own turn and
718 // publishes exactly one terminal event).
719 var skipRunComplete bool
720 // currentAssistant is declared here so the deferred RunComplete
721 // publish below can capture the pointer that PrepareStep will
722 // later (re)assign for each streaming step. The final assistant
723 // message of the turn is the value reachable through this
724 // pointer when the defer runs.
725 var currentAssistant *message.Message
726 // Drain any debounced message updates before returning. message.Service
727 // already flushes synchronously on terminal updates, but a defer here
728 // guarantees the contract at every Run exit (success, error, panic
729 // recovery upstream) without callers needing to know.
730 //
731 // After the flush completes — meaning all per-message
732 // Publish(UpdatedEvent) calls have fired and been buffered into
733 // every subscriber's channel — publish the authoritative
734 // RunComplete event for this turn. The flush-then-publish order
735 // gives well-behaved clients the best chance of seeing the final
736 // message event before RunComplete; the embedded Text field
737 // reconciles for clients that observe the events out of order
738 // (the pubsub broker fan-in does not serialize publishes from
739 // different upstream brokers).
740 defer func() {
741 // Use a context detached from the run context: workspace
742 // shutdown cancels ctx before this goroutine returns, but the
743 // buffered streaming deltas must still land before the DB is
744 // closed. A short timeout bounds the flush.
745 flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
746 defer flushCancel()
747 if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
748 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
749 }
750 if skipRunComplete {
751 return
752 }
753 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
754 if currentAssistant != nil {
755 complete.MessageID = currentAssistant.ID
756 complete.Text = currentAssistant.Content().String()
757 }
758 if retErr != nil {
759 complete.Error = retErr.Error()
760 complete.Cancelled = errors.Is(retErr, context.Canceled)
761 } else if ctx.Err() != nil {
762 complete.Cancelled = true
763 }
764 // Prefer the per-call hook when supplied so the coordinator
765 // can coalesce retries (e.g. unauthorized → re-auth → retry)
766 // into a single user-visible terminal event. The fallback
767 // must-deliver publish applies bounded-blocking semantics to
768 // the authoritative terminal event so a momentarily-full
769 // subscriber channel can't silently drop it and hang
770 // non-interactive clients waiting on RunComplete.
771 a.publishRunComplete(ctx, call, complete)
772 }()
773
774 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
775
776 startTime := time.Now()
777 a.eventPromptSent(call.SessionID)
778
779 var stepMessages []fantasy.Message
780 var shouldSummarize bool
781 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
782 var maxOutputTokens *int64
783 if call.MaxOutputTokens > 0 {
784 maxOutputTokens = &call.MaxOutputTokens
785 }
786 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
787 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
788 Files: files,
789 Messages: history,
790 ProviderOptions: call.ProviderOptions,
791 MaxOutputTokens: maxOutputTokens,
792 TopP: call.TopP,
793 Temperature: call.Temperature,
794 PresencePenalty: call.PresencePenalty,
795 TopK: call.TopK,
796 FrequencyPenalty: call.FrequencyPenalty,
797 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
798 prepared.Messages = options.Messages
799 for i := range prepared.Messages {
800 prepared.Messages[i].ProviderOptions = nil
801 }
802
803 // Use latest tools (updated by SetTools when MCP tools change).
804 prepared.Tools = a.tools.Copy()
805
806 // Drain queued follow-up prompts for this step. Calls covered
807 // by a cancel recorded while they sat in the queue are dropped:
808 // a cancel that arrived after a prompt was queued must not let
809 // it run as part of this step. Coverage is per-call by accept
810 // sequence so a follow-up queued after the cancel (higher seq)
811 // is not dropped. A dropped prompt carrying a RunID still gets
812 // its terminal cancelled RunComplete so a caller waiting on it
813 // does not hang. Uncanceled prompts without a RunID are folded
814 // into this turn; uncanceled prompts with a RunID are left
815 // queued so each runs as its own turn (with its own
816 // RunComplete) via the recursive run path below.
817 fold, canceledRunIDs := a.drainQueueForStep(call.SessionID)
818 a.publishCanceledQueueDrops(canceledRunIDs)
819 for _, queued := range fold {
820 userMessage, createErr := a.createUserMessage(callContext, queued)
821 if createErr != nil {
822 return callContext, prepared, createErr
823 }
824 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
825 }
826
827 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
828
829 lastSystemRoleInx := 0
830 systemMessageUpdated := false
831 for i, msg := range prepared.Messages {
832 // Only add cache control to the last message.
833 if msg.Role == fantasy.MessageRoleSystem {
834 lastSystemRoleInx = i
835 } else if !systemMessageUpdated {
836 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
837 systemMessageUpdated = true
838 }
839 // Than add cache control to the last 2 messages.
840 if i > len(prepared.Messages)-3 {
841 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
842 }
843 }
844
845 if promptPrefix != "" {
846 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
847 }
848
849 sessionLock.Lock()
850 stepMessages = cloneFantasyMessages(prepared.Messages)
851 sessionLock.Unlock()
852
853 var assistantMsg message.Message
854 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
855 Role: message.Assistant,
856 Parts: []message.ContentPart{},
857 Model: largeModel.ModelCfg.Model,
858 Provider: largeModel.ModelCfg.Provider,
859 })
860 if err != nil {
861 return callContext, prepared, err
862 }
863 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
864 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
865 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
866 currentAssistant = &assistantMsg
867 return callContext, prepared, err
868 },
869 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
870 currentAssistant.AppendReasoningContent(reasoning.Text)
871 return a.messages.Update(genCtx, *currentAssistant)
872 },
873 OnReasoningDelta: func(id string, text string) error {
874 currentAssistant.AppendReasoningContent(text)
875 return a.messages.Update(genCtx, *currentAssistant)
876 },
877 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
878 // handle anthropic signature
879 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
880 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
881 currentAssistant.AppendReasoningSignature(reasoning.Signature)
882 }
883 }
884 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
885 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
886 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
887 }
888 }
889 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
890 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
891 currentAssistant.SetReasoningResponsesData(reasoning)
892 }
893 }
894 currentAssistant.FinishThinking()
895 return a.messages.Update(genCtx, *currentAssistant)
896 },
897 OnTextDelta: func(id string, text string) error {
898 // Strip leading newline from initial text content. This is is
899 // particularly important in non-interactive mode where leading
900 // newlines are very visible.
901 if len(currentAssistant.Parts) == 0 {
902 text = strings.TrimPrefix(text, "\n")
903 }
904
905 currentAssistant.AppendContent(text)
906 return a.messages.Update(genCtx, *currentAssistant)
907 },
908 OnToolInputStart: func(id string, toolName string) error {
909 toolCall := message.ToolCall{
910 ID: id,
911 Name: toolName,
912 ProviderExecuted: false,
913 Finished: false,
914 }
915 currentAssistant.AddToolCall(toolCall)
916 // Use parent ctx instead of genCtx to ensure the update succeeds
917 // even if the request is canceled mid-stream
918 return a.messages.Update(ctx, *currentAssistant)
919 },
920 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
921 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
922 },
923 OnToolCall: func(tc fantasy.ToolCallContent) error {
924 toolCall := message.ToolCall{
925 ID: tc.ToolCallID,
926 Name: tc.ToolName,
927 Input: tc.Input,
928 ProviderExecuted: false,
929 Finished: true,
930 }
931 currentAssistant.AddToolCall(toolCall)
932 // Use parent ctx instead of genCtx to ensure the update succeeds
933 // even if the request is canceled mid-stream
934 return a.messages.Update(ctx, *currentAssistant)
935 },
936 OnToolResult: func(result fantasy.ToolResultContent) error {
937 toolResult := a.convertToToolResult(result)
938 // Use parent ctx instead of genCtx to ensure the message is created
939 // even if the request is canceled mid-stream
940 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
941 Role: message.Tool,
942 Parts: []message.ContentPart{
943 toolResult,
944 },
945 })
946 return createMsgErr
947 },
948 OnStepFinish: func(stepResult fantasy.StepResult) error {
949 finishReason := message.FinishReasonUnknown
950 switch stepResult.FinishReason {
951 case fantasy.FinishReasonLength:
952 finishReason = message.FinishReasonMaxTokens
953 case fantasy.FinishReasonStop:
954 finishReason = message.FinishReasonEndTurn
955 case fantasy.FinishReasonToolCalls:
956 finishReason = message.FinishReasonToolUse
957 }
958 // If a tool result halted the turn (e.g. a hook halt or a
959 // permission denial), the step ends on FinishReasonToolCalls but
960 // the model will not be called again. Treat it as the end of the
961 // turn so the UI can render the assistant footer.
962 if finishReason == message.FinishReasonToolUse {
963 for _, tr := range stepResult.Content.ToolResults() {
964 if tr.StopTurn {
965 finishReason = message.FinishReasonEndTurn
966 break
967 }
968 }
969 }
970 currentAssistant.AddFinish(finishReason, "", "")
971 sessionLock.Lock()
972 defer sessionLock.Unlock()
973
974 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
975 if getSessionErr != nil {
976 return getSessionErr
977 }
978 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
979 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
980 _, sessionErr := a.sessions.Save(ctx, updatedSession)
981 if sessionErr != nil {
982 return sessionErr
983 }
984 currentSession = updatedSession
985 return a.messages.Update(genCtx, *currentAssistant)
986 },
987 StopWhen: []fantasy.StopCondition{
988 func(_ []fantasy.StepResult) bool {
989 cw := int64(largeModel.CatwalkCfg.ContextWindow)
990 // If context window is unknown (0), skip auto-summarize
991 // to avoid immediately truncating custom/local models.
992 if cw == 0 {
993 return false
994 }
995 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
996 remaining := cw - tokens
997 var threshold int64
998 if cw > largeContextWindowThreshold {
999 threshold = largeContextWindowBuffer
1000 } else {
1001 threshold = int64(float64(cw) * smallContextWindowRatio)
1002 }
1003 if (remaining <= threshold) && !a.disableAutoSummarize {
1004 shouldSummarize = true
1005 return true
1006 }
1007 return false
1008 },
1009 func(steps []fantasy.StepResult) bool {
1010 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
1011 },
1012 },
1013 })
1014
1015 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
1016
1017 if err != nil {
1018 isHyper := largeModel.ModelCfg.Provider == hyper.Name
1019 isCancelErr := errors.Is(err, context.Canceled)
1020 if currentAssistant == nil {
1021 // Cancel-before-assistant-creation window: the run was
1022 // canceled after activeRequests.Set but before PrepareStep
1023 // created the assistant message. Without this, the turn
1024 // would return with no FinishReasonCanceled marker and no
1025 // user-visible record. The user message was already created
1026 // above, so persistCanceledTurn only writes the assistant
1027 // record.
1028 if isCancelErr {
1029 if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
1030 return nil, persistErr
1031 }
1032 }
1033 return result, err
1034 }
1035 // Persist final state with a context detached from the run
1036 // context. The run context (ctx) is derived from the
1037 // workspace context, which workspace shutdown cancels before
1038 // agent goroutines finish; using ctx here would drop the
1039 // final assistant state. WithoutCancel keeps the values
1040 // (e.g. session ID) while ignoring cancellation, and a short
1041 // timeout bounds the cleanup writes.
1042 cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
1043 defer cleanupCancel()
1044 // Ensure we finish thinking on error to close the reasoning state.
1045 currentAssistant.FinishThinking()
1046 toolCalls := currentAssistant.ToolCalls()
1047 // INFO: we use the cleanup context here because the genCtx has been cancelled.
1048 msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
1049 if createErr != nil {
1050 return nil, createErr
1051 }
1052 for _, tc := range toolCalls {
1053 if !tc.Finished {
1054 tc.Finished = true
1055 tc.Input = "{}"
1056 currentAssistant.AddToolCall(tc)
1057 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1058 if updateErr != nil {
1059 return nil, updateErr
1060 }
1061 }
1062
1063 found := false
1064 for _, msg := range msgs {
1065 if msg.Role == message.Tool {
1066 for _, tr := range msg.ToolResults() {
1067 if tr.ToolCallID == tc.ID {
1068 found = true
1069 break
1070 }
1071 }
1072 }
1073 if found {
1074 break
1075 }
1076 }
1077 if found {
1078 continue
1079 }
1080 content := "There was an error while executing the tool"
1081 if isCancelErr {
1082 content = "Error: user cancelled assistant tool calling"
1083 }
1084 toolResult := message.ToolResult{
1085 ToolCallID: tc.ID,
1086 Name: tc.Name,
1087 Content: content,
1088 IsError: true,
1089 }
1090 _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1091 Role: message.Tool,
1092 Parts: []message.ContentPart{
1093 toolResult,
1094 },
1095 })
1096 if createErr != nil {
1097 return nil, createErr
1098 }
1099 }
1100 var fantasyErr *fantasy.Error
1101 var providerErr *fantasy.ProviderError
1102 const defaultTitle = "Provider Error"
1103 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1104 if isCancelErr {
1105 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1106 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1107 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1108 if a.notify != nil {
1109 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1110 SessionID: call.SessionID,
1111 SessionTitle: currentSession.Title,
1112 Type: notify.TypeReAuthenticate,
1113 ProviderID: largeModel.ModelCfg.Provider,
1114 })
1115 }
1116 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1117 url := hyper.BaseURL()
1118 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1119 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1120 } else if errors.As(err, &providerErr) {
1121 if providerErr.Message == "The requested model is not supported." {
1122 url := "https://github.com/settings/copilot/features"
1123 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1124 currentAssistant.AddFinish(
1125 message.FinishReasonError,
1126 "Copilot model not enabled",
1127 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1128 )
1129 } else {
1130 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1131 }
1132 } else if errors.As(err, &fantasyErr) {
1133 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1134 } else {
1135 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1136 }
1137 // Note: we use the cleanup context here because the genCtx has been
1138 // cancelled.
1139 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1140 if updateErr != nil {
1141 return nil, updateErr
1142 }
1143 return nil, err
1144 }
1145
1146 if shouldSummarize {
1147 a.activeRequests.Del(call.SessionID)
1148 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1149 return nil, summarizeErr
1150 }
1151 // If the agent wasn't done...
1152 if len(currentAssistant.ToolCalls()) > 0 {
1153 existing, ok := a.messageQueue.Get(call.SessionID)
1154 if !ok {
1155 existing = []SessionAgentCall{}
1156 }
1157 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1158 existing = append(existing, call)
1159 a.messageQueue.Set(call.SessionID, existing)
1160 }
1161 }
1162
1163 // Release active request before publishing the notification.
1164 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
1165 // tea.Msg arrives, so the cleanup must precede the notify or
1166 // subscribers see stale busy state at the moment of receipt.
1167 a.activeRequests.Del(call.SessionID)
1168 cancel()
1169
1170 // Send notification that agent has finished its turn (skip for
1171 // nested/non-interactive sessions).
1172 if !call.NonInteractive && a.notify != nil {
1173 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1174 SessionID: call.SessionID,
1175 SessionTitle: currentSession.Title,
1176 Type: notify.TypeAgentFinished,
1177 })
1178 }
1179
1180 // Hand off to the next queued prompt (if any) under dispatchMu so
1181 // the transition from this finished run to the queued run is atomic
1182 // against a concurrent Cancel. activeRequests for this session was
1183 // just deleted above, so without the lock there is a window in
1184 // which the session looks idle and a cancel becomes a no-op that
1185 // fails to stop the queued prompt. Holding the lock lets us observe
1186 // a pending cancel recorded against the session and drop the queue
1187 // instead of running it, and (for the recursion) hand a fresh
1188 // accept reservation to the dequeued call so acceptedRuns stays > 0
1189 // across the recursive Run's own dispatch handoff — keeping the
1190 // session observable to Cancel for the entire transition and
1191 // closing the dequeue -> re-register window.
1192 mu := a.sessionMu(call.SessionID)
1193 mu.Lock()
1194 queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1195 if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1196 // A cancel was recorded for this session (e.g. it arrived while
1197 // this run was active and follow-ups had been queued). Drop the
1198 // queued prompts it covers (accept sequence at or below the
1199 // mark, or untracked); keep any queued after the cancel (higher
1200 // sequence) so they still run.
1201 var kept []SessionAgentCall
1202 var canceledRunIDDrops []SessionAgentCall
1203 for _, q := range queuedMessages {
1204 if q.acceptSeq == 0 || q.acceptSeq <= mark {
1205 if q.RunID != "" {
1206 canceledRunIDDrops = append(canceledRunIDDrops, q)
1207 }
1208 continue
1209 }
1210 kept = append(kept, q)
1211 }
1212 queuedMessages = kept
1213 a.messageQueue.Set(call.SessionID, kept)
1214 // A dropped prompt carrying a RunID must still publish its
1215 // terminal cancelled RunComplete so a caller waiting on that
1216 // RunID does not hang.
1217 a.publishCanceledQueueDrops(canceledRunIDDrops)
1218 }
1219 if len(queuedMessages) == 0 {
1220 // No queued work. Clear the cancel mark only when no accepted
1221 // run remains in flight that it might still cover; otherwise a
1222 // sibling prompt (sequence at or below the mark) waiting to
1223 // enter Run would lose its cancellation. When accepted runs are
1224 // gone, this also clears a stale mark so it can't catch a
1225 // future run.
1226 a.messageQueue.Del(call.SessionID)
1227 a.acceptedMu.Lock()
1228 inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1229 a.acceptedMu.Unlock()
1230 if inFlight == 0 {
1231 a.cancelMark.Del(call.SessionID)
1232 }
1233 mu.Unlock()
1234 return result, err
1235 }
1236 // There are queued messages, restart the loop. Suppress the outer
1237 // defer's emit: it would otherwise observe the recursive Run's retErr
1238 // (named-return clobbering through the return below) against this
1239 // turn's MessageID/Text and publish a mixed, racing event.
1240 skipRunComplete = true
1241 // Decide whether this turn still owes its own terminal RunComplete.
1242 // Each submitted prompt with a RunID has its own lifecycle, so a turn
1243 // that is finished and handing off to a *different* queued prompt must
1244 // publish its own RunComplete here — leaving it to the recursive turn
1245 // (which carries a different RunID) would hang a caller waiting on
1246 // this turn's RunID. The exception is the summarize-continuation path,
1247 // which re-queues this same call (same RunID) to resume after a
1248 // summary; in that case the eventual terminal turn for this RunID
1249 // publishes, so publishing now would double-emit.
1250 outerOwesRunComplete := call.RunID != ""
1251 if outerOwesRunComplete {
1252 for _, q := range queuedMessages {
1253 if q.RunID == call.RunID {
1254 outerOwesRunComplete = false
1255 break
1256 }
1257 }
1258 }
1259 firstQueuedMessage := queuedMessages[0]
1260 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1261 // Reserve a fresh accept for the dequeued prompt before dropping the
1262 // lock so acceptedRuns > 0 across the handoff into the recursive
1263 // Run. This closes the window between this dequeue and the recursive
1264 // Run registering its activeRequests entry: a cancel arriving in
1265 // that window now records a pending cancel (acceptedRuns > 0) that
1266 // the recursive Run's accepted path observes as cancel-on-entry.
1267 firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1268 mu.Unlock()
1269 if outerOwesRunComplete {
1270 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
1271 if currentAssistant != nil {
1272 complete.MessageID = currentAssistant.ID
1273 complete.Text = currentAssistant.Content().String()
1274 }
1275 if ctx.Err() != nil {
1276 complete.Cancelled = true
1277 }
1278 a.publishRunComplete(ctx, call, complete)
1279 }
1280 return a.Run(ctx, firstQueuedMessage)
1281}
1282
1283func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1284 if a.IsSessionBusy(sessionID) {
1285 return ErrSessionBusy
1286 }
1287
1288 // Copy mutable fields under lock to avoid races with SetModels.
1289 largeModel := a.largeModel.Get()
1290 systemPromptPrefix := a.systemPromptPrefix.Get()
1291
1292 currentSession, err := a.sessions.Get(ctx, sessionID)
1293 if err != nil {
1294 return fmt.Errorf("failed to get session: %w", err)
1295 }
1296 msgs, err := a.getSessionMessages(ctx, currentSession)
1297 if err != nil {
1298 return err
1299 }
1300 if len(msgs) == 0 {
1301 // Nothing to summarize.
1302 return nil
1303 }
1304
1305 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1306
1307 genCtx, cancel := context.WithCancel(ctx)
1308 a.activeRequests.Set(sessionID, cancel)
1309 defer a.activeRequests.Del(sessionID)
1310 defer cancel()
1311 defer func() {
1312 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1313 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1314 }
1315 }()
1316
1317 agent := fantasy.NewAgent(
1318 largeModel.Model,
1319 fantasy.WithSystemPrompt(string(summaryPrompt)),
1320 fantasy.WithUserAgent(userAgent),
1321 )
1322 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1323 Role: message.Assistant,
1324 Model: largeModel.ModelCfg.Model,
1325 Provider: largeModel.ModelCfg.Provider,
1326 IsSummaryMessage: true,
1327 })
1328 if err != nil {
1329 return err
1330 }
1331
1332 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1333
1334 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1335 Prompt: summaryPromptText,
1336 Messages: aiMsgs,
1337 ProviderOptions: opts,
1338 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1339 prepared.Messages = options.Messages
1340 if systemPromptPrefix != "" {
1341 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1342 }
1343 return callContext, prepared, nil
1344 },
1345 OnReasoningDelta: func(id string, text string) error {
1346 summaryMessage.AppendReasoningContent(text)
1347 return a.messages.Update(genCtx, summaryMessage)
1348 },
1349 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1350 // Handle anthropic signature.
1351 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1352 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1353 summaryMessage.AppendReasoningSignature(signature.Signature)
1354 }
1355 }
1356 summaryMessage.FinishThinking()
1357 return a.messages.Update(genCtx, summaryMessage)
1358 },
1359 OnTextDelta: func(id, text string) error {
1360 summaryMessage.AppendContent(text)
1361 return a.messages.Update(genCtx, summaryMessage)
1362 },
1363 })
1364 if err != nil {
1365 isCancelErr := errors.Is(err, context.Canceled)
1366 if isCancelErr {
1367 // User cancelled summarize we need to remove the summary message.
1368 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1369 return deleteErr
1370 }
1371 // Mark the summary message as finished with an error so the UI
1372 // stops spinning.
1373 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1374 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1375 return updateErr
1376 }
1377 return err
1378 }
1379
1380 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1381 err = a.messages.Update(genCtx, summaryMessage)
1382 if err != nil {
1383 return err
1384 }
1385
1386 var openrouterCost *float64
1387 for _, step := range resp.Steps {
1388 stepCost := a.openrouterCost(step.ProviderMetadata)
1389 if stepCost != nil {
1390 newCost := *stepCost
1391 if openrouterCost != nil {
1392 newCost += *openrouterCost
1393 }
1394 openrouterCost = &newCost
1395 }
1396 }
1397
1398 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
1399
1400 // Just in case, get just the last usage info.
1401 usage := resp.Response.Usage
1402 currentSession.SummaryMessageID = summaryMessage.ID
1403 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1404 currentSession.PromptTokens = 0
1405 currentSession.EstimatedUsage = usageIsZero(usage)
1406 _, err = a.sessions.Save(genCtx, currentSession)
1407 if err != nil {
1408 return err
1409 }
1410
1411 // Release the active request before processing queued messages so that
1412 // Run() does not see the session as busy.
1413 a.activeRequests.Del(sessionID)
1414 cancel()
1415
1416 // Process any messages that were queued while summarizing.
1417 queuedMessages, ok := a.messageQueue.Get(sessionID)
1418 if !ok || len(queuedMessages) == 0 {
1419 return nil
1420 }
1421 firstQueuedMessage := queuedMessages[0]
1422 a.messageQueue.Set(sessionID, queuedMessages[1:])
1423 _, qErr := a.Run(ctx, firstQueuedMessage)
1424 return qErr
1425}
1426
1427func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1428 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1429 return fantasy.ProviderOptions{}
1430 }
1431 return fantasy.ProviderOptions{
1432 anthropic.Name: &anthropic.ProviderCacheControlOptions{
1433 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1434 },
1435 bedrock.Name: &anthropic.ProviderCacheControlOptions{
1436 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1437 },
1438 vercel.Name: &anthropic.ProviderCacheControlOptions{
1439 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1440 },
1441 }
1442}
1443
1444func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1445 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1446 var attachmentParts []message.ContentPart
1447 for _, attachment := range call.Attachments {
1448 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1449 }
1450 parts = append(parts, attachmentParts...)
1451 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1452 Role: message.User,
1453 Parts: parts,
1454 })
1455 if err != nil {
1456 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1457 }
1458 return msg, nil
1459}
1460
1461func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1462 var history []fantasy.Message
1463 if !a.isSubAgent {
1464 history = append(history, fantasy.NewUserMessage(
1465 fmt.Sprintf(
1466 "<system_reminder>%s</system_reminder>",
1467 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1468If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1469If not, please feel free to ignore. Again do not mention this message to the user.`,
1470 ),
1471 ))
1472 }
1473 // Collect all tool call IDs present in assistant messages and all tool
1474 // result IDs present in tool messages. This lets us detect both orphaned
1475 // tool results (result without a call) and orphaned tool calls (call
1476 // without a result).
1477 knownToolCallIDs := make(map[string]struct{})
1478 knownToolResultIDs := make(map[string]struct{})
1479 for _, m := range msgs {
1480 switch m.Role {
1481 case message.Assistant:
1482 for _, tc := range m.ToolCalls() {
1483 knownToolCallIDs[tc.ID] = struct{}{}
1484 }
1485 case message.Tool:
1486 for _, tr := range m.ToolResults() {
1487 knownToolResultIDs[tr.ToolCallID] = struct{}{}
1488 }
1489 }
1490 }
1491
1492 for _, m := range msgs {
1493 if len(m.Parts) == 0 {
1494 continue
1495 }
1496 // Assistant message without content or tool calls (cancelled before it returned anything).
1497 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1498 continue
1499 }
1500 if m.Role == message.Tool {
1501 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1502 history = append(history, msg)
1503 }
1504 continue
1505 }
1506 aiMsgs := m.ToAIMessage()
1507 if !supportsImages {
1508 for i := range aiMsgs {
1509 if aiMsgs[i].Role == fantasy.MessageRoleUser {
1510 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1511 }
1512 }
1513 }
1514 history = append(history, aiMsgs...)
1515
1516 if m.Role == message.Assistant {
1517 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1518 history = append(history, msg)
1519 }
1520 }
1521 }
1522
1523 var files []fantasy.FilePart
1524 for _, attachment := range attachments {
1525 if attachment.IsText() {
1526 continue
1527 }
1528 files = append(files, fantasy.FilePart{
1529 Filename: attachment.FileName,
1530 Data: attachment.Content,
1531 MediaType: attachment.MimeType,
1532 })
1533 }
1534
1535 return history, files
1536}
1537
1538// filterFileParts removes fantasy.FilePart entries from a slice of message
1539// parts. Used to strip image attachments from historical user messages when
1540// the current model does not support them.
1541func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1542 filtered := make([]fantasy.MessagePart, 0, len(parts))
1543 for _, part := range parts {
1544 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1545 continue
1546 }
1547 filtered = append(filtered, part)
1548 }
1549 return filtered
1550}
1551
1552// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1553// dropping any tool result parts whose tool_call_id has no matching tool call
1554// in the known set. An orphaned result causes API validation to fail on every
1555// subsequent turn, permanently locking the session. Returns the filtered
1556// message and true if at least one valid part remains.
1557func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1558 aiMsgs := m.ToAIMessage()
1559 if len(aiMsgs) == 0 {
1560 return fantasy.Message{}, false
1561 }
1562 var validParts []fantasy.MessagePart
1563 for _, part := range aiMsgs[0].Content {
1564 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1565 if !ok {
1566 validParts = append(validParts, part)
1567 continue
1568 }
1569 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1570 validParts = append(validParts, part)
1571 } else {
1572 slog.Warn(
1573 "Dropping orphaned tool result with no matching tool call",
1574 "tool_call_id", tr.ToolCallID,
1575 )
1576 }
1577 }
1578 if len(validParts) == 0 {
1579 return fantasy.Message{}, false
1580 }
1581 msg := aiMsgs[0]
1582 msg.Content = validParts
1583 return msg, true
1584}
1585
1586// syntheticToolResultsForOrphanedCalls returns a tool message containing
1587// synthetic tool results for any tool calls in the assistant message that
1588// have no matching result in knownToolResultIDs. LLM APIs require every
1589// tool_use to be immediately followed by a tool_result; an interrupted
1590// session can leave orphaned tool_use blocks that permanently lock the
1591// conversation. Returns the message and true if any synthetic results were
1592// produced.
1593func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1594 var syntheticParts []fantasy.MessagePart
1595 for _, tc := range m.ToolCalls() {
1596 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1597 continue
1598 }
1599 slog.Warn(
1600 "Injecting synthetic tool result for orphaned tool call",
1601 "tool_call_id", tc.ID,
1602 "tool_name", tc.Name,
1603 )
1604 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1605 ToolCallID: tc.ID,
1606 Output: fantasy.ToolResultOutputContentError{
1607 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1608 },
1609 })
1610 }
1611 if len(syntheticParts) == 0 {
1612 return fantasy.Message{}, false
1613 }
1614 return fantasy.Message{
1615 Role: fantasy.MessageRoleTool,
1616 Content: syntheticParts,
1617 }, true
1618}
1619
1620func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1621 msgs, err := a.messages.List(ctx, session.ID)
1622 if err != nil {
1623 return nil, fmt.Errorf("failed to list messages: %w", err)
1624 }
1625
1626 if session.SummaryMessageID != "" {
1627 summaryMsgIndex := -1
1628 for i, msg := range msgs {
1629 if msg.ID == session.SummaryMessageID {
1630 summaryMsgIndex = i
1631 break
1632 }
1633 }
1634 if summaryMsgIndex != -1 {
1635 msgs = msgs[summaryMsgIndex:]
1636 msgs[0].Role = message.User
1637 }
1638 }
1639 return msgs, nil
1640}
1641
1642// generateTitle generates a session titled based on the initial prompt.
1643func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1644 if userPrompt == "" {
1645 return
1646 }
1647
1648 smallModel := a.smallModel.Get()
1649 largeModel := a.largeModel.Get()
1650 systemPromptPrefix := a.systemPromptPrefix.Get()
1651
1652 var maxOutputTokens int64 = 40
1653 if smallModel.CatwalkCfg.CanReason {
1654 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1655 }
1656
1657 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1658 return fantasy.NewAgent(
1659 m,
1660 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1661 fantasy.WithMaxOutputTokens(tok),
1662 fantasy.WithUserAgent(userAgent),
1663 )
1664 }
1665
1666 streamCall := fantasy.AgentStreamCall{
1667 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1668 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1669 prepared.Messages = opts.Messages
1670 if systemPromptPrefix != "" {
1671 prepared.Messages = append([]fantasy.Message{
1672 fantasy.NewSystemMessage(systemPromptPrefix),
1673 }, prepared.Messages...)
1674 }
1675 return callCtx, prepared, nil
1676 },
1677 }
1678
1679 // Use the small model to generate the title.
1680 model := smallModel
1681 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1682 resp, err := agent.Stream(ctx, streamCall)
1683 if err == nil {
1684 // We successfully generated a title with the small model.
1685 slog.Debug("Generated title with small model")
1686 } else {
1687 // It didn't work. Let's try with the big model.
1688 slog.Error("Error generating title with small model; trying big model", "err", err)
1689 model = largeModel
1690 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1691 resp, err = agent.Stream(ctx, streamCall)
1692 if err == nil {
1693 slog.Debug("Generated title with large model")
1694 } else {
1695 // Welp, the large model didn't work either. Use the default
1696 // session name and return.
1697 slog.Error("Error generating title with large model", "err", err)
1698 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1699 if saveErr != nil {
1700 slog.Error("Failed to save session title", "error", saveErr)
1701 }
1702 return
1703 }
1704 }
1705
1706 if resp == nil {
1707 // Actually, we didn't get a response so we can't. Use the default
1708 // session name and return.
1709 slog.Error("Response is nil; can't generate title")
1710 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1711 if saveErr != nil {
1712 slog.Error("Failed to save session title", "error", saveErr)
1713 }
1714 return
1715 }
1716
1717 // Clean up title.
1718 var title string
1719 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1720
1721 // Remove thinking tags if present.
1722 title = thinkTagRegex.ReplaceAllString(title, "")
1723 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1724
1725 title = strings.TrimSpace(title)
1726 title = cmp.Or(title, DefaultSessionName)
1727
1728 // Calculate usage and cost.
1729 var openrouterCost *float64
1730 for _, step := range resp.Steps {
1731 stepCost := a.openrouterCost(step.ProviderMetadata)
1732 if stepCost != nil {
1733 newCost := *stepCost
1734 if openrouterCost != nil {
1735 newCost += *openrouterCost
1736 }
1737 openrouterCost = &newCost
1738 }
1739 }
1740
1741 modelConfig := model.CatwalkCfg
1742 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1743 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1744 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1745 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1746
1747 // Use override cost if available (e.g., from OpenRouter).
1748 if openrouterCost != nil {
1749 cost = *openrouterCost
1750 }
1751
1752 // Skip cost accumulation
1753 if model.FlatRate {
1754 cost = 0
1755 }
1756
1757 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1758 completionTokens := resp.TotalUsage.OutputTokens
1759
1760 // Atomically update only title and usage fields to avoid overriding other
1761 // concurrent session updates.
1762 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1763 if saveErr != nil {
1764 slog.Error("Failed to save session title and usage", "error", saveErr)
1765 return
1766 }
1767}
1768
1769func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1770 openrouterMetadata, ok := metadata[openrouter.Name]
1771 if !ok {
1772 return nil
1773 }
1774
1775 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1776 if !ok {
1777 return nil
1778 }
1779 return &opts.Usage.Cost
1780}
1781
1782func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1783 if !usageIsZero(usage) {
1784 session.EstimatedUsage = estimated
1785 }
1786
1787 modelConfig := model.CatwalkCfg
1788 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1789 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1790 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1791 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1792
1793 if !estimated {
1794 a.eventTokensUsed(session.ID, model, usage, cost)
1795 }
1796
1797 if estimated {
1798 cost = 0
1799 } else {
1800 // Use override cost if available (e.g., from OpenRouter).
1801 if overrideCost != nil {
1802 cost = *overrideCost
1803 }
1804
1805 // Skip cost accumulation
1806 if model.FlatRate {
1807 cost = 0
1808 }
1809 }
1810
1811 session.Cost += cost
1812 updateSessionTokenCounters(session, usage)
1813}
1814
1815func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1816 if usage.OutputTokens != 0 {
1817 session.CompletionTokens = usage.OutputTokens
1818 }
1819 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1820 session.PromptTokens = promptTokens
1821 }
1822}
1823
1824func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1825 if usage.OutputTokens != 0 {
1826 return usage.OutputTokens
1827 }
1828 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1829}
1830
// Cancel aborts outstanding work for sessionID while holding the session's
// mutex. It performs these steps:
//   - it invokes the stored cancel function of the session's active request
//     and of its "<sessionID>-summarize" request, when present;
//   - it raises the session's cancel mark when runs have been accepted but
//     are not yet active;
//   - it clears any queued prompts.
//
// The active-request entries are deliberately left in place so IsBusy keeps
// reporting true until the running goroutine finishes its own cleanup.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	//
	// The accepted-run count and the sequence generator are read together
	// under acceptedMu so they form a consistent snapshot.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	// Drop prompts waiting behind the cancelled work.
	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.clearQueueAndNotify(sessionID)
	}
}
1886
1887func (a *sessionAgent) ClearQueue(sessionID string) {
1888 if a.QueuedPrompts(sessionID) > 0 {
1889 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1890 a.clearQueueAndNotify(sessionID)
1891 }
1892}
1893
1894func (a *sessionAgent) CancelAll() {
1895 if !a.IsBusy() {
1896 return
1897 }
1898 for key := range a.activeRequests.Seq2() {
1899 a.Cancel(key) // key is sessionID
1900 }
1901
1902 timeout := time.After(5 * time.Second)
1903 for a.IsBusy() {
1904 select {
1905 case <-timeout:
1906 return
1907 default:
1908 time.Sleep(200 * time.Millisecond)
1909 }
1910 }
1911}
1912
1913func (a *sessionAgent) IsBusy() bool {
1914 var busy bool
1915 for cancelFunc := range a.activeRequests.Seq() {
1916 if cancelFunc != nil {
1917 busy = true
1918 break
1919 }
1920 }
1921 return busy
1922}
1923
1924func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1925 _, busy := a.activeRequests.Get(sessionID)
1926 return busy
1927}
1928
1929func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1930 l, ok := a.messageQueue.Get(sessionID)
1931 if !ok {
1932 return 0
1933 }
1934 return len(l)
1935}
1936
1937func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1938 l, ok := a.messageQueue.Get(sessionID)
1939 if !ok {
1940 return nil
1941 }
1942 prompts := make([]string, len(l))
1943 for i, call := range l {
1944 prompts[i] = call.Prompt
1945 }
1946 return prompts
1947}
1948
1949func (a *sessionAgent) SetModels(large Model, small Model) {
1950 a.largeModel.Set(large)
1951 a.smallModel.Set(small)
1952}
1953
1954func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1955 a.tools.SetSlice(tools)
1956}
1957
1958func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1959 a.systemPrompt.Set(systemPrompt)
1960}
1961
1962func (a *sessionAgent) Model() Model {
1963 return a.largeModel.Get()
1964}
1965
1966// convertToToolResult converts a fantasy tool result to a message tool result.
1967func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1968 baseResult := message.ToolResult{
1969 ToolCallID: result.ToolCallID,
1970 Name: result.ToolName,
1971 Metadata: result.ClientMetadata,
1972 }
1973
1974 switch result.Result.GetType() {
1975 case fantasy.ToolResultContentTypeText:
1976 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1977 baseResult.Content = r.Text
1978 }
1979 case fantasy.ToolResultContentTypeError:
1980 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1981 baseResult.Content = r.Error.Error()
1982 baseResult.IsError = true
1983 }
1984 case fantasy.ToolResultContentTypeMedia:
1985 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1986 if !stringext.IsValidBase64(r.Data) {
1987 slog.Warn(
1988 "Tool returned media with invalid base64 data, discarding image",
1989 "tool", result.ToolName,
1990 "tool_call_id", result.ToolCallID,
1991 )
1992 baseResult.Content = "Tool returned image data with invalid encoding"
1993 baseResult.IsError = true
1994 } else {
1995 content := r.Text
1996 if content == "" {
1997 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1998 }
1999 baseResult.Content = content
2000 baseResult.Data = r.Data
2001 baseResult.MIMEType = r.MediaType
2002 }
2003 }
2004 }
2005
2006 return baseResult
2007}
2008
2009// workaroundProviderMediaLimitations converts media content in tool results to
2010// user messages for providers that don't natively support images in tool results.
2011//
2012// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
2013// don't support sending images/media in tool result messages - they only accept
2014// text in tool results. However, they DO support images in user messages.
2015//
2016// If we send media in tool results to these providers, the API returns an error.
2017//
2018// Solution: For these providers, we:
2019// 1. Replace the media in the tool result with a text placeholder
2020// 2. Inject a user message immediately after with the image as a file attachment
2021// 3. This maintains the tool execution flow while working around API limitations
2022//
2023// Anthropic and Bedrock support images natively in tool results, so we skip
2024// this workaround for them.
2025//
2026// Example transformation:
2027//
2028// BEFORE: [tool result: image data]
2029// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
2030func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
2031 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
2032 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
2033 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
2034
2035 if providerSupportsMedia {
2036 return messages
2037 }
2038
2039 convertedMessages := make([]fantasy.Message, 0, len(messages))
2040
2041 for _, msg := range messages {
2042 if msg.Role != fantasy.MessageRoleTool {
2043 convertedMessages = append(convertedMessages, msg)
2044 continue
2045 }
2046
2047 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
2048 var mediaFiles []fantasy.FilePart
2049
2050 for _, part := range msg.Content {
2051 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
2052 if !ok {
2053 textParts = append(textParts, part)
2054 continue
2055 }
2056
2057 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
2058 decoded, err := base64.StdEncoding.DecodeString(media.Data)
2059 if err != nil {
2060 slog.Warn("Failed to decode media data", "error", err)
2061 textParts = append(textParts, part)
2062 continue
2063 }
2064
2065 mediaFiles = append(mediaFiles, fantasy.FilePart{
2066 Data: decoded,
2067 MediaType: media.MediaType,
2068 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
2069 })
2070
2071 textParts = append(textParts, fantasy.ToolResultPart{
2072 ToolCallID: toolResult.ToolCallID,
2073 Output: fantasy.ToolResultOutputContentText{
2074 Text: "[Image/media content loaded - see attached file]",
2075 },
2076 ProviderOptions: toolResult.ProviderOptions,
2077 })
2078 } else {
2079 textParts = append(textParts, part)
2080 }
2081 }
2082
2083 convertedMessages = append(convertedMessages, fantasy.Message{
2084 Role: fantasy.MessageRoleTool,
2085 Content: textParts,
2086 })
2087
2088 if len(mediaFiles) > 0 {
2089 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
2090 "Here is the media content from the tool result:",
2091 mediaFiles...,
2092 ))
2093 }
2094 }
2095
2096 return convertedMessages
2097}
2098
2099// buildSummaryPrompt constructs the prompt text for session summarization.
2100func buildSummaryPrompt(todos []session.Todo) string {
2101 var sb strings.Builder
2102 sb.WriteString("Provide a detailed summary of our conversation above.")
2103 if len(todos) > 0 {
2104 sb.WriteString("\n\n## Current Todo List\n\n")
2105 for _, t := range todos {
2106 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
2107 }
2108 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
2109 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2110 }
2111 return sb.String()
2112}
2113
2114func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2115 fields := []any{
2116 "retry_delay", delay.String(),
2117 }
2118 if err == nil {
2119 return fields
2120 }
2121 fields = append(fields, "status_code", err.StatusCode)
2122 if err.Title != "" {
2123 fields = append(fields, "title", err.Title)
2124 }
2125 if err.Message != "" {
2126 fields = append(fields, "message", err.Message)
2127 }
2128 return fields
2129}