1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "charm.land/catwalk/pkg/catwalk"
28 "charm.land/fantasy"
29 "charm.land/fantasy/providers/anthropic"
30 "charm.land/fantasy/providers/bedrock"
31 "charm.land/fantasy/providers/google"
32 "charm.land/fantasy/providers/openai"
33 "charm.land/fantasy/providers/openrouter"
34 "charm.land/fantasy/providers/vercel"
35 "charm.land/lipgloss/v2"
36 "github.com/charmbracelet/crush/internal/agent/hyper"
37 "github.com/charmbracelet/crush/internal/agent/notify"
38 "github.com/charmbracelet/crush/internal/agent/tools"
39 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
40 "github.com/charmbracelet/crush/internal/config"
41 "github.com/charmbracelet/crush/internal/csync"
42 "github.com/charmbracelet/crush/internal/message"
43 "github.com/charmbracelet/crush/internal/pubsub"
44 "github.com/charmbracelet/crush/internal/session"
45 "github.com/charmbracelet/crush/internal/stringext"
46 "github.com/charmbracelet/crush/internal/version"
47 "github.com/charmbracelet/x/exp/charmtone"
48)
49
const (
	// DefaultSessionName is the default session name exported by this
	// package. NOTE(review): presumably the placeholder title a session
	// carries until one is generated — its uses are outside this chunk.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds. None of them is read in
	// this chunk. NOTE(review): presumably context windows of at least
	// largeContextWindowThreshold tokens keep a fixed
	// largeContextWindowBuffer of headroom, while smaller windows reserve
	// smallContextWindowRatio of the window — confirm against the
	// summarization check.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer = 20_000
	smallContextWindowRatio = 0.2
)
58
// userAgent identifies Crush and its version to model providers; Run hands
// it to fantasy.WithUserAgent when building the agent.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt is the embedded contents of templates/title.md.
// NOTE(review): presumably the prompt used for session title generation —
// its use is outside this chunk.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the embedded contents of templates/summary.md.
// NOTE(review): presumably the prompt used by Summarize — confirm.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles. thinkTagRegex matches a
// complete <think>...</think> block, lazily and across newlines ((?s) lets
// '.' match '\n'); orphanThinkTagRegex matches any unpaired <think> or
// </think> tag left behind after the complete blocks are removed.
var (
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
72
// SessionAgentCall describes one prompt submitted to a SessionAgent. It
// carries the target session, the user input, per-call generation
// parameters, and the hooks that govern how the turn's terminal
// RunComplete event is delivered.
type SessionAgentCall struct {
	// SessionID names the session the prompt belongs to. ValidateCall
	// rejects a call whose SessionID is empty.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text. It may be empty only when Attachments
	// contains a text attachment (see ValidateCall).
	Prompt string
	// ProviderOptions are passed through unchanged to the model stream
	// call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are files attached to the prompt. Run combines text
	// attachments with Prompt via message.PromptWithTextAttachments.
	Attachments []message.Attachment
	// MaxOutputTokens caps the response length. Run sends it to the
	// provider only when it is > 0, because some providers (e.g. LM
	// Studio) reject an explicit 0.
	MaxOutputTokens int64
	// Sampling parameters, passed through unchanged to the stream call.
	// NOTE(review): nil presumably means "use the provider default".
	Temperature *float64
	TopP *float64
	TopK *int64
	FrequencyPenalty *float64
	PresencePenalty *float64
	// NonInteractive is not read in this chunk. NOTE(review): presumably
	// marks calls from non-interactive clients such as `crush run` —
	// confirm.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark, so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation); the drain paths
	// treat such calls as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
125
// SessionAgent runs prompts against sessions and manages the per-session
// lifecycle around them: queueing prompts while a session is busy, accept
// reservations for dispatched runs, cancellation, and summarization.
// NewSessionAgent returns the concrete implementation, *sessionAgent.
//
// NOTE(review): only Run and BeginAccepted are implemented within this
// chunk; the per-method notes marked "presumably" are inferred from names.
type SessionAgent interface {
	// Run executes one turn for the call's session, or queues the call when
	// the session is already busy.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID. The returned
	// handle must be released with Close.
	BeginAccepted(sessionID string) *AcceptedRun
	// SetModels presumably replaces the large and small models.
	SetModels(large Model, small Model)
	// SetTools presumably replaces the tool set offered to the model.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt presumably replaces the base system prompt.
	SetSystemPrompt(systemPrompt string)
	// Cancel cancels pending and active work for sessionID.
	Cancel(sessionID string)
	// CancelAll presumably cancels work across every session.
	CancelAll()
	// IsSessionBusy reports whether sessionID is busy; Run queues calls
	// for busy sessions.
	IsSessionBusy(sessionID string) bool
	// IsBusy presumably reports whether any session is busy.
	IsBusy() bool
	// QueuedPrompts presumably returns how many prompts are queued for
	// sessionID.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList presumably returns the queued prompts for
	// sessionID.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue presumably discards every queued prompt for sessionID.
	ClearQueue(sessionID string)
	// Summarize presumably summarizes a session's history (the string is
	// presumably the session ID — confirm).
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model presumably returns the current large model.
	Model() Model
}
142
// Model bundles a language model with the configuration it was selected
// from.
type Model struct {
	// Model is the fantasy language model that Run streams with.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catwalk model metadata; Run reads SupportsImages
	// from it when preparing the prompt.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected model configuration; its Model and Provider
	// fields are recorded on assistant messages.
	ModelCfg config.SelectedModel
	// FlatRate is not read in this chunk. NOTE(review): presumably marks
	// flat-rate (non-metered) pricing — confirm.
	FlatRate bool
}
149
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent. Configuration that can be swapped at runtime lives in
// csync containers; the remaining fields track per-session run state
// (queued prompts, active runs, accept reservations, pending cancels).
type sessionAgent struct {
	// largeModel is the model Run streams with. smallModel is not read in
	// this chunk (NOTE(review): presumably used for auxiliary calls such
	// as title generation — confirm).
	largeModel *csync.Value[Model]
	smallModel *csync.Value[Model]
	// systemPromptPrefix is read at the start of Run. systemPrompt is the
	// base system prompt, to which Run appends instructions from connected
	// MCP servers.
	systemPromptPrefix *csync.Value[string]
	systemPrompt *csync.Value[string]
	// tools is the tool set offered to the model. Run copies it up front,
	// and PrepareStep re-reads it every step so SetTools updates take
	// effect mid-turn.
	tools *csync.Slice[fantasy.AgentTool]

	// isSubAgent is copied from SessionAgentOptions; it is not read in
	// this chunk.
	isSubAgent bool
	// sessions and messages are the persistence services for sessions and
	// their messages.
	sessions session.Service
	messages message.Service
	// disableAutoSummarize and isYolo are copied from SessionAgentOptions;
	// neither is read in this chunk.
	disableAutoSummarize bool
	isYolo bool
	// notify is the notification publisher (not used in this chunk).
	notify pubsub.Publisher[notify.Notification]
	// runComplete receives each turn's terminal RunComplete event when the
	// call supplies no OnComplete hook. It may be nil, in which case that
	// event is not published.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls that arrived while the
	// session was busy, in arrival order.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session to the cancel func of its active run.
	// NOTE(review): IsSessionBusy presumably consults it — confirm.
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence. An
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// that was accepted but not yet active at that moment, while a prompt
	// accepted later (higher sequence) is never poisoned. Absent or 0
	// means no pending cancel. Cancel raises it only when
	// acceptedRuns > 0, so an idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
205
// SessionAgentOptions configures NewSessionAgent. Each field is copied into
// the sessionAgent field of the same name with a lower-case first letter.
// The models, prompts, and tools are wrapped in csync containers so they
// can be replaced later. RunComplete may be nil; terminal events then
// reach only a call's own OnComplete hook.
type SessionAgentOptions struct {
	LargeModel Model
	SmallModel Model
	SystemPromptPrefix string
	SystemPrompt string
	IsSubAgent bool
	DisableAutoSummarize bool
	IsYolo bool
	Sessions session.Service
	Messages message.Service
	Tools []fantasy.AgentTool
	Notify pubsub.Publisher[notify.Notification]
	RunComplete pubsub.Publisher[notify.RunComplete]
}
220
221func NewSessionAgent(
222 opts SessionAgentOptions,
223) SessionAgent {
224 return &sessionAgent{
225 largeModel: csync.NewValue(opts.LargeModel),
226 smallModel: csync.NewValue(opts.SmallModel),
227 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
228 systemPrompt: csync.NewValue(opts.SystemPrompt),
229 isSubAgent: opts.IsSubAgent,
230 sessions: opts.Sessions,
231 messages: opts.Messages,
232 disableAutoSummarize: opts.DisableAutoSummarize,
233 tools: csync.NewSliceFrom(opts.Tools),
234 isYolo: opts.IsYolo,
235 notify: opts.Notify,
236 runComplete: opts.RunComplete,
237 messageQueue: csync.NewMap[string, []SessionAgentCall](),
238 activeRequests: csync.NewMap[string, context.CancelFunc](),
239 dispatchMu: csync.NewMap[string, *sync.Mutex](),
240 acceptedRuns: csync.NewMap[string, int](),
241 cancelMark: csync.NewMap[string, uint64](),
242 }
243}
244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
type AcceptedRun struct {
	// agent is the sessionAgent whose acceptedRuns counter this
	// reservation was taken from; Close releases it there.
	agent *sessionAgent
	// sessionID is the session the reservation counts against.
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	seq uint64
	// done flips to true on the first Close so later calls are no-ops.
	done atomic.Bool
}
261
262// Close decrements the accept counter for this reservation. It is safe
263// to call multiple times; only the first call has effect.
264func (r *AcceptedRun) Close() {
265 if r == nil {
266 return
267 }
268 if !r.done.CompareAndSwap(false, true) {
269 return
270 }
271 r.agent.endAccepted(r.sessionID)
272}
273
274// SessionID exposes the session this reservation is for so the run path
275// can use it without an extra parameter.
276func (r *AcceptedRun) SessionID() string {
277 if r == nil {
278 return ""
279 }
280 return r.sessionID
281}
282
283// BeginAccepted increments the accept counter for sessionID and returns
284// a handle whose Close is the only way to decrement it. It is the only
285// entry point that mutates acceptedRuns.
286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
287 a.acceptedMu.Lock()
288 defer a.acceptedMu.Unlock()
289 count, _ := a.acceptedRuns.Get(sessionID)
290 a.acceptedRuns.Set(sessionID, count+1)
291 a.acceptSeqGen++
292 return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
293}
294
295// endAccepted decrements the accept counter for sessionID. It is only
296// called via AcceptedRun.Close. It uses a dedicated lock (not the
297// per-session dispatch mutex) so it can run while Run holds dispatchMu
298// for the same session without deadlocking.
299//
300// When the count reaches zero the session's cancel mark is dropped: no
301// accepted handle remains for it to cover, and any handle accepted later
302// gets a strictly higher sequence that the mark would not match anyway.
303// Handles canceled on entry never reach RunComplete, so this is the only
304// place that clears the mark for an all-canceled batch. Sibling handles
305// covered by the same mark are serialized on the per-session dispatch
306// mutex and read the mark before they Close, so this never clears it out
307// from under a covered handle still waiting to enter Run.
308func (a *sessionAgent) endAccepted(sessionID string) {
309 a.acceptedMu.Lock()
310 defer a.acceptedMu.Unlock()
311 count, ok := a.acceptedRuns.Get(sessionID)
312 if !ok || count <= 1 {
313 a.acceptedRuns.Del(sessionID)
314 a.cancelMark.Del(sessionID)
315 return
316 }
317 a.acceptedRuns.Set(sessionID, count-1)
318}
319
320// sessionMu returns the per-session dispatch mutex, creating it on first
321// use. Creation is guarded so concurrent callers always observe the same
322// mutex instance for a given session.
323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
324 if mu, ok := a.dispatchMu.Get(sessionID); ok {
325 return mu
326 }
327 a.dispatchMuCreate.Lock()
328 defer a.dispatchMuCreate.Unlock()
329 if mu, ok := a.dispatchMu.Get(sessionID); ok {
330 return mu
331 }
332 mu := &sync.Mutex{}
333 a.dispatchMu.Set(sessionID, mu)
334 return mu
335}
336
337// enqueueCall appends call to the session's message queue. The
338// OnComplete hook is stripped: the caller that supplied it (typically
339// coordinator.Run) has its own retry/coalesce scope that ends when it
340// returns, so by the time the queue drains nobody is left to consume the
341// buffered terminal event. The recursive Run falls back to the default
342// broker publish, which is what existing subscribers expect for queued
343// turns.
344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
345 existing, ok := a.messageQueue.Get(call.SessionID)
346 if !ok {
347 existing = []SessionAgentCall{}
348 }
349 queued := call
350 if call.Accepted != nil {
351 // Preserve the accept sequence after the handle is stripped so
352 // the queue-drain paths can tell a follow-up queued before a
353 // cancel (covered by the mark) from one queued after it.
354 queued.acceptSeq = call.Accepted.seq
355 }
356 queued.OnComplete = nil
357 queued.Accepted = nil
358 existing = append(existing, queued)
359 a.messageQueue.Set(call.SessionID, existing)
360}
361
362// drainQueueForStep partitions the session's queued calls for the current
363// streaming step under the per-session dispatch mutex so the filtering is
364// atomic against a concurrent Cancel: canceledBySeq requires the caller to
365// hold that mutex, and evaluating it here (rather than after unlocking)
366// prevents a cancel recorded between the drain and the check from being
367// observed inconsistently.
368//
369// Calls covered by a pending cancel are dropped; the dropped ones that
370// carry a RunID are returned in canceledWithRunID so the caller can
371// publish their terminal cancelled RunComplete (a caller waiting on that
372// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without
373// a RunID are returned in fold to be folded into the active turn,
374// preserving the existing follow-up behavior. Uncanceled calls that carry
375// a RunID are left in the queue so each runs as its own turn via the
376// recursive run path and publishes its own RunComplete, giving every
377// RunID-bearing prompt an explicit lifecycle instead of being silently
378// absorbed into another turn. fold is processed by the caller without the
379// lock held.
380func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) {
381 dispatchLock := a.sessionMu(sessionID)
382 dispatchLock.Lock()
383 defer dispatchLock.Unlock()
384 queuedCalls, _ := a.messageQueue.Get(sessionID)
385 var keep []SessionAgentCall
386 for _, queued := range queuedCalls {
387 if a.canceledBySeq(sessionID, queued.acceptSeq) {
388 if queued.RunID != "" {
389 canceledWithRunID = append(canceledWithRunID, queued)
390 }
391 continue
392 }
393 if queued.RunID != "" {
394 keep = append(keep, queued)
395 continue
396 }
397 fold = append(fold, queued)
398 }
399 if len(keep) == 0 {
400 a.messageQueue.Del(sessionID)
401 } else {
402 a.messageQueue.Set(sessionID, keep)
403 }
404 return fold, canceledWithRunID
405}
406
407// publishCanceledQueueDrops emits a terminal cancelled RunComplete for
408// every dropped queued call that carries a RunID. A queued prompt removed
409// from the queue without ever running — covered by a pending cancel, or
410// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on
411// that RunID: `crush run` ignores live message events and exits only on a
412// RunComplete whose RunID matches. Calls without a RunID had no such waiter
413// and are dropped silently as before. A detached, bounded context keeps the
414// must-deliver publish alive even when the run context that triggered the
415// drop is already canceled.
416func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) {
417 var hasRunID bool
418 for _, d := range drops {
419 if d.RunID != "" {
420 hasRunID = true
421 break
422 }
423 }
424 if !hasRunID {
425 return
426 }
427 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
428 defer cancel()
429 for _, d := range drops {
430 if d.RunID == "" {
431 continue
432 }
433 a.publishRunComplete(ctx, d, notify.RunComplete{
434 SessionID: d.SessionID,
435 RunID: d.RunID,
436 Cancelled: true,
437 })
438 }
439}
440
441// clearQueueAndNotify removes all queued prompts for the session and
442// publishes a terminal cancelled RunComplete for any that carried a RunID,
443// so callers waiting on those RunIDs (e.g. `crush run`) are not left
444// hanging when their queued prompt is discarded without running.
445func (a *sessionAgent) clearQueueAndNotify(sessionID string) {
446 queued, ok := a.messageQueue.Get(sessionID)
447 a.messageQueue.Del(sessionID)
448 if !ok {
449 return
450 }
451 a.publishCanceledQueueDrops(queued)
452}
453
454// clearPendingCancel removes any pending-cancel mark for sessionID. It
455// takes the per-session dispatch lock so it is ordered against Cancel
456// and the dispatch handoff.
457func (a *sessionAgent) clearPendingCancel(sessionID string) {
458 mu := a.sessionMu(sessionID)
459 mu.Lock()
460 defer mu.Unlock()
461 a.cancelMark.Del(sessionID)
462}
463
464// canceledBySeq reports whether an accepted handle or queued call with
465// the given accept sequence is covered by a pending cancel for the
466// session. Callers must hold the session's dispatch mutex. A tracked
467// sequence (seq > 0) is covered only when it is at or below the cancel
468// high-water mark, so a prompt accepted after the cancel (higher seq) is
469// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
470// with no accept reservation) is covered whenever any mark is present,
471// preserving the pre-sequence behavior. The mark is not consumed: it
472// stays so every sibling handle it covers observes the same cancel, and
473// a later handle (higher seq) ignores it regardless.
474func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
475 mark, ok := a.cancelMark.Get(sessionID)
476 if !ok || mark == 0 {
477 return false
478 }
479 return seq == 0 || seq <= mark
480}
481
482// persistCanceledTurn writes the user/assistant records for a turn that
483// was canceled before (or just as) streaming would have produced them.
484// It creates the user message only when it was not already created by an
485// earlier createUserMessage call (userMsgCreated), then writes an
486// assistant message with FinishReasonCanceled. Both writes use
487// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
488// context) can't drop them.
489func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
490 writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
491 defer cancel()
492 if !userMsgCreated {
493 if _, err := a.createUserMessage(writeCtx, call); err != nil {
494 return err
495 }
496 }
497 largeModel := a.largeModel.Get()
498 assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
499 Role: message.Assistant,
500 Parts: []message.ContentPart{},
501 Model: largeModel.ModelCfg.Model,
502 Provider: largeModel.ModelCfg.Provider,
503 })
504 if err != nil {
505 return err
506 }
507 assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
508 return a.messages.Update(writeCtx, assistant)
509}
510
511// publishRunComplete emits the authoritative terminal event for a turn.
512// It honors the per-call OnComplete hook when set (so the coordinator can
513// coalesce retries) and otherwise falls back to the RunComplete broker.
514// ctx is used only for the bounded-blocking must-deliver publish; the
515// terminal payload is supplied by the caller. This is the single emit path
516// shared by the streaming defer and the cancel-on-entry early return so a
517// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
518// observes exactly one terminal event regardless of which Run branch ends
519// the turn.
520func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
521 if call.OnComplete != nil {
522 call.OnComplete(complete)
523 return
524 }
525 if a.runComplete == nil {
526 return
527 }
528 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
529}
530
531// ValidateCall performs the cheap structural validation that
532// sessionAgent.Run requires before a call can be dispatched: a call must
533// carry either a non-empty prompt or a text attachment, and it must name a
534// session. It is exported so callers that accept a run before dispatching it
535// (e.g. backend.SendMessage) can apply the same checks and keep the error
536// contract consistent.
537func ValidateCall(call SessionAgentCall) error {
538 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
539 return ErrEmptyPrompt
540 }
541 if call.SessionID == "" {
542 return ErrSessionMissing
543 }
544 return nil
545}
546
547func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
548 if err := ValidateCall(call); err != nil {
549 return nil, err
550 }
551
552 // genCtx/cancel are the run context and its cancel func. For the
553 // accepted (fire-and-forget) dispatch path they are created under
554 // dispatchMu below so a concurrent Cancel can observe the
555 // activeRequests entry before the assistant message exists. For
556 // the in-process path they stay nil here and are created later,
557 // preserving the original ordering.
558 var (
559 genCtx context.Context
560 cancel context.CancelFunc
561 activeRegistered bool
562 userMsgCreated bool
563 )
564
565 if call.Accepted != nil {
566 // Serialize the accepted -> (cancel-on-entry | queued |
567 // active) transition against a concurrent Cancel. Cancel takes
568 // the same per-session lock, so every cancel observes at least
569 // one of: a cancel mark, an activeRequests entry, or a
570 // messageQueue entry it then clears.
571 mu := a.sessionMu(call.SessionID)
572 mu.Lock()
573
574 if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
575 // Cancel-on-entry: a cancel arrived while this run was
576 // dispatched but not yet active, and this handle's accept
577 // sequence is at or below the session's cancel mark. The
578 // mark is left in place so sibling handles it also covers
579 // observe the same cancel; release the accept reservation,
580 // drop the lock, and persist a canceled turn without
581 // entering Stream.
582 //
583 // This path returns before the streaming defer that
584 // publishes RunComplete is installed, so emit the terminal
585 // event explicitly. Without it, a caller waiting on
586 // RunComplete for this RunID (e.g. `crush run`, which
587 // ignores message events and blocks on RunComplete) would
588 // hang on an immediately-canceled accepted run.
589 call.Accepted.Close()
590 mu.Unlock()
591 complete := notify.RunComplete{
592 SessionID: call.SessionID,
593 RunID: call.RunID,
594 Cancelled: true,
595 }
596 if err := a.persistCanceledTurn(ctx, call, false); err != nil {
597 complete.Error = err.Error()
598 a.publishRunComplete(ctx, call, complete)
599 return nil, err
600 }
601 a.publishRunComplete(ctx, call, complete)
602 return nil, nil
603 }
604
605 if a.IsSessionBusy(call.SessionID) {
606 // Busy: an earlier prompt is active. Queue this call and
607 // release the accept reservation. A Cancel arriving after
608 // this point sees the active entry and clears the queue.
609 a.enqueueCall(call)
610 call.Accepted.Close()
611 mu.Unlock()
612 return nil, nil
613 }
614
615 // Idle: become the active run. Register the cancel func before
616 // dropping the lock so a Cancel that arrives between here and
617 // assistant creation is not lost.
618 runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
619 genCtx, cancel = context.WithCancel(runCtx)
620 a.activeRequests.Set(call.SessionID, cancel)
621 activeRegistered = true
622 call.Accepted.Close()
623 mu.Unlock()
624
625 defer cancel()
626 defer a.activeRequests.Del(call.SessionID)
627 } else if a.IsSessionBusy(call.SessionID) {
628 // Queue the message if busy. Strip OnComplete: the caller that
629 // supplied the hook (typically coordinator.Run) has its own
630 // retry/coalesce scope that ends when it returns, so by the time
631 // the queue drains nobody is left to consume the buffered
632 // terminal event. The recursive Run will fall back to the
633 // default broker publish, which is what existing subscribers
634 // expect for queued turns.
635 a.enqueueCall(call)
636 return nil, nil
637 }
638
639 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
640 agentTools := a.tools.Copy()
641 largeModel := a.largeModel.Get()
642 systemPrompt := a.systemPrompt.Get()
643 promptPrefix := a.systemPromptPrefix.Get()
644 var instructions strings.Builder
645
646 for _, server := range mcp.GetStates() {
647 if server.State != mcp.StateConnected {
648 continue
649 }
650 if s := server.Client.InitializeResult().Instructions; s != "" {
651 instructions.WriteString(s)
652 instructions.WriteString("\n\n")
653 }
654 }
655
656 if s := instructions.String(); s != "" {
657 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
658 }
659
660 if len(agentTools) > 0 {
661 // Add Anthropic caching to the last tool.
662 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
663 }
664
665 agent := fantasy.NewAgent(
666 largeModel.Model,
667 fantasy.WithSystemPrompt(systemPrompt),
668 fantasy.WithTools(agentTools...),
669 fantasy.WithUserAgent(userAgent),
670 )
671
672 sessionLock := sync.Mutex{}
673 currentSession, err := a.sessions.Get(ctx, call.SessionID)
674 if err != nil {
675 return nil, fmt.Errorf("failed to get session: %w", err)
676 }
677
678 msgs, err := a.getSessionMessages(ctx, currentSession)
679 if err != nil {
680 return nil, fmt.Errorf("failed to get session messages: %w", err)
681 }
682
683 var wg sync.WaitGroup
684 // Generate title if first message.
685 if len(msgs) == 0 {
686 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
687 wg.Go(func() {
688 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
689 })
690 }
691 defer wg.Wait()
692
693 // Add the user message to the session.
694 _, err = a.createUserMessage(ctx, call)
695 if err != nil {
696 return nil, err
697 }
698 userMsgCreated = true
699
700 // Add the session to the context.
701 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
702
703 // For the accepted dispatch path the run context and cancel func
704 // were already created and registered under dispatchMu above; reuse
705 // them. For the in-process path create them here, preserving the
706 // original ordering.
707 if !activeRegistered {
708 genCtx, cancel = context.WithCancel(ctx)
709 a.activeRequests.Set(call.SessionID, cancel)
710
711 defer cancel()
712 defer a.activeRequests.Del(call.SessionID)
713 }
714 // skipRunComplete is set just before the queued-recursion path so
715 // the outer Run doesn't publish a RunComplete that would race
716 // with — and be superseded by — the recursive call's own
717 // RunComplete (each queued user prompt is its own turn and
718 // publishes exactly one terminal event).
719 var skipRunComplete bool
720 // currentAssistant is declared here so the deferred RunComplete
721 // publish below can capture the pointer that PrepareStep will
722 // later (re)assign for each streaming step. The final assistant
723 // message of the turn is the value reachable through this
724 // pointer when the defer runs.
725 var currentAssistant *message.Message
726 // Drain any debounced message updates before returning. message.Service
727 // already flushes synchronously on terminal updates, but a defer here
728 // guarantees the contract at every Run exit (success, error, panic
729 // recovery upstream) without callers needing to know.
730 //
731 // After the flush completes — meaning all per-message
732 // Publish(UpdatedEvent) calls have fired and been buffered into
733 // every subscriber's channel — publish the authoritative
734 // RunComplete event for this turn. The flush-then-publish order
735 // gives well-behaved clients the best chance of seeing the final
736 // message event before RunComplete; the embedded Text field
737 // reconciles for clients that observe the events out of order
738 // (the pubsub broker fan-in does not serialize publishes from
739 // different upstream brokers).
740 defer func() {
741 // Use a context detached from the run context: workspace
742 // shutdown cancels ctx before this goroutine returns, but the
743 // buffered streaming deltas must still land before the DB is
744 // closed. A short timeout bounds the flush.
745 flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
746 defer flushCancel()
747 if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
748 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
749 }
750 if skipRunComplete {
751 return
752 }
753 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
754 if currentAssistant != nil {
755 complete.MessageID = currentAssistant.ID
756 complete.Text = currentAssistant.Content().String()
757 }
758 if retErr != nil {
759 complete.Error = retErr.Error()
760 complete.Cancelled = errors.Is(retErr, context.Canceled)
761 } else if ctx.Err() != nil {
762 complete.Cancelled = true
763 }
764 // Prefer the per-call hook when supplied so the coordinator
765 // can coalesce retries (e.g. unauthorized → re-auth → retry)
766 // into a single user-visible terminal event. The fallback
767 // must-deliver publish applies bounded-blocking semantics to
768 // the authoritative terminal event so a momentarily-full
769 // subscriber channel can't silently drop it and hang
770 // non-interactive clients waiting on RunComplete.
771 a.publishRunComplete(ctx, call, complete)
772 }()
773
774 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
775
776 startTime := time.Now()
777 a.eventPromptSent(call.SessionID)
778
779 var stepMessages []fantasy.Message
780 var shouldSummarize bool
781 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
782 var maxOutputTokens *int64
783 if call.MaxOutputTokens > 0 {
784 maxOutputTokens = &call.MaxOutputTokens
785 }
786 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
787 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
788 Files: files,
789 Messages: history,
790 ProviderOptions: call.ProviderOptions,
791 MaxOutputTokens: maxOutputTokens,
792 TopP: call.TopP,
793 Temperature: call.Temperature,
794 PresencePenalty: call.PresencePenalty,
795 TopK: call.TopK,
796 FrequencyPenalty: call.FrequencyPenalty,
797 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
798 prepared.Messages = options.Messages
799 for i := range prepared.Messages {
800 prepared.Messages[i].ProviderOptions = nil
801 }
802
803 // Use latest tools (updated by SetTools when MCP tools change).
804 prepared.Tools = a.tools.Copy()
805
806 // Drain queued follow-up prompts for this step. Calls covered
807 // by a cancel recorded while they sat in the queue are dropped:
808 // a cancel that arrived after a prompt was queued must not let
809 // it run as part of this step. Coverage is per-call by accept
810 // sequence so a follow-up queued after the cancel (higher seq)
811 // is not dropped. A dropped prompt carrying a RunID still gets
812 // its terminal cancelled RunComplete so a caller waiting on it
813 // does not hang. Uncanceled prompts without a RunID are folded
814 // into this turn; uncanceled prompts with a RunID are left
815 // queued so each runs as its own turn (with its own
816 // RunComplete) via the recursive run path below.
817 fold, canceledRunIDs := a.drainQueueForStep(call.SessionID)
818 a.publishCanceledQueueDrops(canceledRunIDs)
819 for _, queued := range fold {
820 userMessage, createErr := a.createUserMessage(callContext, queued)
821 if createErr != nil {
822 return callContext, prepared, createErr
823 }
824 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
825 }
826
827 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
828
829 lastSystemRoleInx := 0
830 systemMessageUpdated := false
831 for i, msg := range prepared.Messages {
832 // Only add cache control to the last message.
833 if msg.Role == fantasy.MessageRoleSystem {
834 lastSystemRoleInx = i
835 } else if !systemMessageUpdated {
836 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
837 systemMessageUpdated = true
838 }
839 // Than add cache control to the last 2 messages.
840 if i > len(prepared.Messages)-3 {
841 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
842 }
843 }
844
845 if promptPrefix != "" {
846 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
847 }
848
849 sessionLock.Lock()
850 stepMessages = cloneFantasyMessages(prepared.Messages)
851 sessionLock.Unlock()
852
853 var assistantMsg message.Message
854 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
855 Role: message.Assistant,
856 Parts: []message.ContentPart{},
857 Model: largeModel.ModelCfg.Model,
858 Provider: largeModel.ModelCfg.Provider,
859 })
860 if err != nil {
861 return callContext, prepared, err
862 }
863 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
864 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
865 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
866 currentAssistant = &assistantMsg
867 return callContext, prepared, err
868 },
869 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
870 currentAssistant.AppendReasoningContent(reasoning.Text)
871 return a.messages.Update(genCtx, *currentAssistant)
872 },
873 OnReasoningDelta: func(id string, text string) error {
874 currentAssistant.AppendReasoningContent(text)
875 return a.messages.Update(genCtx, *currentAssistant)
876 },
877 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
878 // handle anthropic signature
879 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
880 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
881 currentAssistant.AppendReasoningSignature(reasoning.Signature)
882 }
883 }
884 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
885 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
886 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
887 }
888 }
889 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
890 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
891 currentAssistant.SetReasoningResponsesData(reasoning)
892 }
893 }
894 currentAssistant.FinishThinking()
895 return a.messages.Update(genCtx, *currentAssistant)
896 },
897 OnTextDelta: func(id string, text string) error {
898 // Strip leading newline from initial text content. This is is
899 // particularly important in non-interactive mode where leading
900 // newlines are very visible.
901 if len(currentAssistant.Parts) == 0 {
902 text = strings.TrimPrefix(text, "\n")
903 }
904
905 currentAssistant.AppendContent(text)
906 return a.messages.Update(genCtx, *currentAssistant)
907 },
908 OnToolInputStart: func(id string, toolName string) error {
909 toolCall := message.ToolCall{
910 ID: id,
911 Name: toolName,
912 ProviderExecuted: false,
913 Finished: false,
914 }
915 currentAssistant.AddToolCall(toolCall)
916 // Use parent ctx instead of genCtx to ensure the update succeeds
917 // even if the request is canceled mid-stream
918 return a.messages.Update(ctx, *currentAssistant)
919 },
920 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
921 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
922 },
923 OnToolCall: func(tc fantasy.ToolCallContent) error {
924 toolCall := message.ToolCall{
925 ID: tc.ToolCallID,
926 Name: tc.ToolName,
927 Input: tc.Input,
928 ProviderExecuted: false,
929 Finished: true,
930 }
931 currentAssistant.AddToolCall(toolCall)
932 // Use parent ctx instead of genCtx to ensure the update succeeds
933 // even if the request is canceled mid-stream
934 return a.messages.Update(ctx, *currentAssistant)
935 },
936 OnToolResult: func(result fantasy.ToolResultContent) error {
937 toolResult := a.convertToToolResult(result)
938 // Use parent ctx instead of genCtx to ensure the message is created
939 // even if the request is canceled mid-stream
940 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
941 Role: message.Tool,
942 Parts: []message.ContentPart{
943 toolResult,
944 },
945 })
946 return createMsgErr
947 },
948 OnStepFinish: func(stepResult fantasy.StepResult) error {
949 finishReason := message.FinishReasonUnknown
950 switch stepResult.FinishReason {
951 case fantasy.FinishReasonLength:
952 finishReason = message.FinishReasonMaxTokens
953 case fantasy.FinishReasonStop:
954 finishReason = message.FinishReasonEndTurn
955 case fantasy.FinishReasonToolCalls:
956 finishReason = message.FinishReasonToolUse
957 }
958 // If a tool result halted the turn (e.g. a hook halt or a
959 // permission denial), the step ends on FinishReasonToolCalls but
960 // the model will not be called again. Treat it as the end of the
961 // turn so the UI can render the assistant footer.
962 if finishReason == message.FinishReasonToolUse {
963 for _, tr := range stepResult.Content.ToolResults() {
964 if tr.StopTurn {
965 finishReason = message.FinishReasonEndTurn
966 break
967 }
968 }
969 }
970 currentAssistant.AddFinish(finishReason, "", "")
971 sessionLock.Lock()
972 defer sessionLock.Unlock()
973
974 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
975 if getSessionErr != nil {
976 return getSessionErr
977 }
978 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
979 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
980 _, sessionErr := a.sessions.Save(ctx, updatedSession)
981 if sessionErr != nil {
982 return sessionErr
983 }
984 currentSession = updatedSession
985 return a.messages.Update(genCtx, *currentAssistant)
986 },
987 StopWhen: []fantasy.StopCondition{
988 func(_ []fantasy.StepResult) bool {
989 cw := int64(largeModel.CatwalkCfg.ContextWindow)
990 // If context window is unknown (0), skip auto-summarize
991 // to avoid immediately truncating custom/local models.
992 if cw == 0 {
993 return false
994 }
995 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
996 remaining := cw - tokens
997 var threshold int64
998 if cw > largeContextWindowThreshold {
999 threshold = largeContextWindowBuffer
1000 } else {
1001 threshold = int64(float64(cw) * smallContextWindowRatio)
1002 }
1003 if (remaining <= threshold) && !a.disableAutoSummarize {
1004 shouldSummarize = true
1005 return true
1006 }
1007 return false
1008 },
1009 func(steps []fantasy.StepResult) bool {
1010 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
1011 },
1012 },
1013 })
1014
1015 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
1016
1017 if err != nil {
1018 isHyper := largeModel.ModelCfg.Provider == hyper.Name
1019 isCancelErr := errors.Is(err, context.Canceled)
1020 if currentAssistant == nil {
1021 // Cancel-before-assistant-creation window: the run was
1022 // canceled after activeRequests.Set but before PrepareStep
1023 // created the assistant message. Without this, the turn
1024 // would return with no FinishReasonCanceled marker and no
1025 // user-visible record. The user message was already created
1026 // above, so persistCanceledTurn only writes the assistant
1027 // record.
1028 if isCancelErr {
1029 if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
1030 return nil, persistErr
1031 }
1032 }
1033 return result, err
1034 }
1035 // Persist final state with a context detached from the run
1036 // context. The run context (ctx) is derived from the
1037 // workspace context, which workspace shutdown cancels before
1038 // agent goroutines finish; using ctx here would drop the
1039 // final assistant state. WithoutCancel keeps the values
1040 // (e.g. session ID) while ignoring cancellation, and a short
1041 // timeout bounds the cleanup writes.
1042 cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
1043 defer cleanupCancel()
1044 // Ensure we finish thinking on error to close the reasoning state.
1045 currentAssistant.FinishThinking()
1046 toolCalls := currentAssistant.ToolCalls()
1047 // INFO: we use the cleanup context here because the genCtx has been cancelled.
1048 msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
1049 if createErr != nil {
1050 return nil, createErr
1051 }
1052 for _, tc := range toolCalls {
1053 if !tc.Finished {
1054 tc.Finished = true
1055 tc.Input = "{}"
1056 currentAssistant.AddToolCall(tc)
1057 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1058 if updateErr != nil {
1059 return nil, updateErr
1060 }
1061 }
1062
1063 found := false
1064 for _, msg := range msgs {
1065 if msg.Role == message.Tool {
1066 for _, tr := range msg.ToolResults() {
1067 if tr.ToolCallID == tc.ID {
1068 found = true
1069 break
1070 }
1071 }
1072 }
1073 if found {
1074 break
1075 }
1076 }
1077 if found {
1078 continue
1079 }
1080 content := "There was an error while executing the tool"
1081 if isCancelErr {
1082 content = "Error: user cancelled assistant tool calling"
1083 }
1084 toolResult := message.ToolResult{
1085 ToolCallID: tc.ID,
1086 Name: tc.Name,
1087 Content: content,
1088 IsError: true,
1089 }
1090 _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1091 Role: message.Tool,
1092 Parts: []message.ContentPart{
1093 toolResult,
1094 },
1095 })
1096 if createErr != nil {
1097 return nil, createErr
1098 }
1099 }
1100 var fantasyErr *fantasy.Error
1101 var providerErr *fantasy.ProviderError
1102 const defaultTitle = "Provider Error"
1103 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1104 if isCancelErr {
1105 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1106 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1107 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1108 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1109 url := hyper.BaseURL()
1110 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1111 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1112 } else if errors.As(err, &providerErr) {
1113 if providerErr.Message == "The requested model is not supported." {
1114 url := "https://github.com/settings/copilot/features"
1115 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1116 currentAssistant.AddFinish(
1117 message.FinishReasonError,
1118 "Copilot model not enabled",
1119 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1120 )
1121 } else {
1122 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1123 }
1124 } else if errors.As(err, &fantasyErr) {
1125 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1126 } else {
1127 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1128 }
1129 // Note: we use the cleanup context here because the genCtx has been
1130 // cancelled.
1131 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1132 if updateErr != nil {
1133 return nil, updateErr
1134 }
1135 return nil, err
1136 }
1137
1138 if shouldSummarize {
1139 a.activeRequests.Del(call.SessionID)
1140 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1141 return nil, summarizeErr
1142 }
1143 // If the agent wasn't done...
1144 if len(currentAssistant.ToolCalls()) > 0 {
1145 existing, ok := a.messageQueue.Get(call.SessionID)
1146 if !ok {
1147 existing = []SessionAgentCall{}
1148 }
1149 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1150 existing = append(existing, call)
1151 a.messageQueue.Set(call.SessionID, existing)
1152 }
1153 }
1154
1155 // Release active request before publishing the notification.
1156 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
1157 // tea.Msg arrives, so the cleanup must precede the notify or
1158 // subscribers see stale busy state at the moment of receipt.
1159 a.activeRequests.Del(call.SessionID)
1160 cancel()
1161
1162 // Send notification that agent has finished its turn (skip for
1163 // nested/non-interactive sessions).
1164 if !call.NonInteractive && a.notify != nil {
1165 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1166 SessionID: call.SessionID,
1167 SessionTitle: currentSession.Title,
1168 Type: notify.TypeAgentFinished,
1169 })
1170 }
1171
1172 // Hand off to the next queued prompt (if any) under dispatchMu so
1173 // the transition from this finished run to the queued run is atomic
1174 // against a concurrent Cancel. activeRequests for this session was
1175 // just deleted above, so without the lock there is a window in
1176 // which the session looks idle and a cancel becomes a no-op that
1177 // fails to stop the queued prompt. Holding the lock lets us observe
1178 // a pending cancel recorded against the session and drop the queue
1179 // instead of running it, and (for the recursion) hand a fresh
1180 // accept reservation to the dequeued call so acceptedRuns stays > 0
1181 // across the recursive Run's own dispatch handoff — keeping the
1182 // session observable to Cancel for the entire transition and
1183 // closing the dequeue -> re-register window.
1184 mu := a.sessionMu(call.SessionID)
1185 mu.Lock()
1186 queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1187 if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1188 // A cancel was recorded for this session (e.g. it arrived while
1189 // this run was active and follow-ups had been queued). Drop the
1190 // queued prompts it covers (accept sequence at or below the
1191 // mark, or untracked); keep any queued after the cancel (higher
1192 // sequence) so they still run.
1193 var kept []SessionAgentCall
1194 var canceledRunIDDrops []SessionAgentCall
1195 for _, q := range queuedMessages {
1196 if q.acceptSeq == 0 || q.acceptSeq <= mark {
1197 if q.RunID != "" {
1198 canceledRunIDDrops = append(canceledRunIDDrops, q)
1199 }
1200 continue
1201 }
1202 kept = append(kept, q)
1203 }
1204 queuedMessages = kept
1205 a.messageQueue.Set(call.SessionID, kept)
1206 // A dropped prompt carrying a RunID must still publish its
1207 // terminal cancelled RunComplete so a caller waiting on that
1208 // RunID does not hang.
1209 a.publishCanceledQueueDrops(canceledRunIDDrops)
1210 }
1211 if len(queuedMessages) == 0 {
1212 // No queued work. Clear the cancel mark only when no accepted
1213 // run remains in flight that it might still cover; otherwise a
1214 // sibling prompt (sequence at or below the mark) waiting to
1215 // enter Run would lose its cancellation. When accepted runs are
1216 // gone, this also clears a stale mark so it can't catch a
1217 // future run.
1218 a.messageQueue.Del(call.SessionID)
1219 a.acceptedMu.Lock()
1220 inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1221 a.acceptedMu.Unlock()
1222 if inFlight == 0 {
1223 a.cancelMark.Del(call.SessionID)
1224 }
1225 mu.Unlock()
1226 return result, err
1227 }
1228 // There are queued messages, restart the loop. Suppress the outer
1229 // defer's emit: it would otherwise observe the recursive Run's retErr
1230 // (named-return clobbering through the return below) against this
1231 // turn's MessageID/Text and publish a mixed, racing event.
1232 skipRunComplete = true
1233 // Decide whether this turn still owes its own terminal RunComplete.
1234 // Each submitted prompt with a RunID has its own lifecycle, so a turn
1235 // that is finished and handing off to a *different* queued prompt must
1236 // publish its own RunComplete here — leaving it to the recursive turn
1237 // (which carries a different RunID) would hang a caller waiting on
1238 // this turn's RunID. The exception is the summarize-continuation path,
1239 // which re-queues this same call (same RunID) to resume after a
1240 // summary; in that case the eventual terminal turn for this RunID
1241 // publishes, so publishing now would double-emit.
1242 outerOwesRunComplete := call.RunID != ""
1243 if outerOwesRunComplete {
1244 for _, q := range queuedMessages {
1245 if q.RunID == call.RunID {
1246 outerOwesRunComplete = false
1247 break
1248 }
1249 }
1250 }
1251 firstQueuedMessage := queuedMessages[0]
1252 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1253 // Reserve a fresh accept for the dequeued prompt before dropping the
1254 // lock so acceptedRuns > 0 across the handoff into the recursive
1255 // Run. This closes the window between this dequeue and the recursive
1256 // Run registering its activeRequests entry: a cancel arriving in
1257 // that window now records a pending cancel (acceptedRuns > 0) that
1258 // the recursive Run's accepted path observes as cancel-on-entry.
1259 firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1260 mu.Unlock()
1261 if outerOwesRunComplete {
1262 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
1263 if currentAssistant != nil {
1264 complete.MessageID = currentAssistant.ID
1265 complete.Text = currentAssistant.Content().String()
1266 }
1267 if ctx.Err() != nil {
1268 complete.Cancelled = true
1269 }
1270 a.publishRunComplete(ctx, call, complete)
1271 }
1272 return a.Run(ctx, firstQueuedMessage)
1273}
1274
1275func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1276 if a.IsSessionBusy(sessionID) {
1277 return ErrSessionBusy
1278 }
1279
1280 // Copy mutable fields under lock to avoid races with SetModels.
1281 largeModel := a.largeModel.Get()
1282 systemPromptPrefix := a.systemPromptPrefix.Get()
1283
1284 currentSession, err := a.sessions.Get(ctx, sessionID)
1285 if err != nil {
1286 return fmt.Errorf("failed to get session: %w", err)
1287 }
1288 msgs, err := a.getSessionMessages(ctx, currentSession)
1289 if err != nil {
1290 return err
1291 }
1292 if len(msgs) == 0 {
1293 // Nothing to summarize.
1294 return nil
1295 }
1296
1297 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1298
1299 genCtx, cancel := context.WithCancel(ctx)
1300 a.activeRequests.Set(sessionID, cancel)
1301 defer a.activeRequests.Del(sessionID)
1302 defer cancel()
1303 defer func() {
1304 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1305 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1306 }
1307 }()
1308
1309 agent := fantasy.NewAgent(
1310 largeModel.Model,
1311 fantasy.WithSystemPrompt(string(summaryPrompt)),
1312 fantasy.WithUserAgent(userAgent),
1313 )
1314 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1315 Role: message.Assistant,
1316 Model: largeModel.ModelCfg.Model,
1317 Provider: largeModel.ModelCfg.Provider,
1318 IsSummaryMessage: true,
1319 })
1320 if err != nil {
1321 return err
1322 }
1323
1324 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1325
1326 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1327 Prompt: summaryPromptText,
1328 Messages: aiMsgs,
1329 ProviderOptions: opts,
1330 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1331 prepared.Messages = options.Messages
1332 if systemPromptPrefix != "" {
1333 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1334 }
1335 return callContext, prepared, nil
1336 },
1337 OnReasoningDelta: func(id string, text string) error {
1338 summaryMessage.AppendReasoningContent(text)
1339 return a.messages.Update(genCtx, summaryMessage)
1340 },
1341 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1342 // Handle anthropic signature.
1343 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1344 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1345 summaryMessage.AppendReasoningSignature(signature.Signature)
1346 }
1347 }
1348 summaryMessage.FinishThinking()
1349 return a.messages.Update(genCtx, summaryMessage)
1350 },
1351 OnTextDelta: func(id, text string) error {
1352 summaryMessage.AppendContent(text)
1353 return a.messages.Update(genCtx, summaryMessage)
1354 },
1355 })
1356 if err != nil {
1357 isCancelErr := errors.Is(err, context.Canceled)
1358 if isCancelErr {
1359 // User cancelled summarize we need to remove the summary message.
1360 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1361 return deleteErr
1362 }
1363 // Mark the summary message as finished with an error so the UI
1364 // stops spinning.
1365 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1366 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1367 return updateErr
1368 }
1369 return err
1370 }
1371
1372 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1373 err = a.messages.Update(genCtx, summaryMessage)
1374 if err != nil {
1375 return err
1376 }
1377
1378 var openrouterCost *float64
1379 for _, step := range resp.Steps {
1380 stepCost := a.openrouterCost(step.ProviderMetadata)
1381 if stepCost != nil {
1382 newCost := *stepCost
1383 if openrouterCost != nil {
1384 newCost += *openrouterCost
1385 }
1386 openrouterCost = &newCost
1387 }
1388 }
1389
1390 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
1391
1392 // Just in case, get just the last usage info.
1393 usage := resp.Response.Usage
1394 currentSession.SummaryMessageID = summaryMessage.ID
1395 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1396 currentSession.PromptTokens = 0
1397 currentSession.EstimatedUsage = usageIsZero(usage)
1398 _, err = a.sessions.Save(genCtx, currentSession)
1399 if err != nil {
1400 return err
1401 }
1402
1403 // Release the active request before processing queued messages so that
1404 // Run() does not see the session as busy.
1405 a.activeRequests.Del(sessionID)
1406 cancel()
1407
1408 // Process any messages that were queued while summarizing.
1409 queuedMessages, ok := a.messageQueue.Get(sessionID)
1410 if !ok || len(queuedMessages) == 0 {
1411 return nil
1412 }
1413 firstQueuedMessage := queuedMessages[0]
1414 a.messageQueue.Set(sessionID, queuedMessages[1:])
1415 _, qErr := a.Run(ctx, firstQueuedMessage)
1416 return qErr
1417}
1418
1419func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1420 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1421 return fantasy.ProviderOptions{}
1422 }
1423 return fantasy.ProviderOptions{
1424 anthropic.Name: &anthropic.ProviderCacheControlOptions{
1425 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1426 },
1427 bedrock.Name: &anthropic.ProviderCacheControlOptions{
1428 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1429 },
1430 vercel.Name: &anthropic.ProviderCacheControlOptions{
1431 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1432 },
1433 }
1434}
1435
1436func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1437 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1438 var attachmentParts []message.ContentPart
1439 for _, attachment := range call.Attachments {
1440 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1441 }
1442 parts = append(parts, attachmentParts...)
1443 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1444 Role: message.User,
1445 Parts: parts,
1446 })
1447 if err != nil {
1448 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1449 }
1450 return msg, nil
1451}
1452
1453func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1454 var history []fantasy.Message
1455 if !a.isSubAgent {
1456 history = append(history, fantasy.NewUserMessage(
1457 fmt.Sprintf(
1458 "<system_reminder>%s</system_reminder>",
1459 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1460If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1461If not, please feel free to ignore. Again do not mention this message to the user.`,
1462 ),
1463 ))
1464 }
1465 // Collect all tool call IDs present in assistant messages and all tool
1466 // result IDs present in tool messages. This lets us detect both orphaned
1467 // tool results (result without a call) and orphaned tool calls (call
1468 // without a result).
1469 knownToolCallIDs := make(map[string]struct{})
1470 knownToolResultIDs := make(map[string]struct{})
1471 for _, m := range msgs {
1472 switch m.Role {
1473 case message.Assistant:
1474 for _, tc := range m.ToolCalls() {
1475 knownToolCallIDs[tc.ID] = struct{}{}
1476 }
1477 case message.Tool:
1478 for _, tr := range m.ToolResults() {
1479 knownToolResultIDs[tr.ToolCallID] = struct{}{}
1480 }
1481 }
1482 }
1483
1484 for _, m := range msgs {
1485 if len(m.Parts) == 0 {
1486 continue
1487 }
1488 // Assistant message without content or tool calls (cancelled before it returned anything).
1489 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1490 continue
1491 }
1492 if m.Role == message.Tool {
1493 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1494 history = append(history, msg)
1495 }
1496 continue
1497 }
1498 aiMsgs := m.ToAIMessage()
1499 if !supportsImages {
1500 for i := range aiMsgs {
1501 if aiMsgs[i].Role == fantasy.MessageRoleUser {
1502 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1503 }
1504 }
1505 }
1506 history = append(history, aiMsgs...)
1507
1508 if m.Role == message.Assistant {
1509 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1510 history = append(history, msg)
1511 }
1512 }
1513 }
1514
1515 var files []fantasy.FilePart
1516 for _, attachment := range attachments {
1517 if attachment.IsText() {
1518 continue
1519 }
1520 files = append(files, fantasy.FilePart{
1521 Filename: attachment.FileName,
1522 Data: attachment.Content,
1523 MediaType: attachment.MimeType,
1524 })
1525 }
1526
1527 return history, files
1528}
1529
1530// filterFileParts removes fantasy.FilePart entries from a slice of message
1531// parts. Used to strip image attachments from historical user messages when
1532// the current model does not support them.
1533func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1534 filtered := make([]fantasy.MessagePart, 0, len(parts))
1535 for _, part := range parts {
1536 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1537 continue
1538 }
1539 filtered = append(filtered, part)
1540 }
1541 return filtered
1542}
1543
1544// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1545// dropping any tool result parts whose tool_call_id has no matching tool call
1546// in the known set. An orphaned result causes API validation to fail on every
1547// subsequent turn, permanently locking the session. Returns the filtered
1548// message and true if at least one valid part remains.
1549func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1550 aiMsgs := m.ToAIMessage()
1551 if len(aiMsgs) == 0 {
1552 return fantasy.Message{}, false
1553 }
1554 var validParts []fantasy.MessagePart
1555 for _, part := range aiMsgs[0].Content {
1556 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1557 if !ok {
1558 validParts = append(validParts, part)
1559 continue
1560 }
1561 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1562 validParts = append(validParts, part)
1563 } else {
1564 slog.Warn(
1565 "Dropping orphaned tool result with no matching tool call",
1566 "tool_call_id", tr.ToolCallID,
1567 )
1568 }
1569 }
1570 if len(validParts) == 0 {
1571 return fantasy.Message{}, false
1572 }
1573 msg := aiMsgs[0]
1574 msg.Content = validParts
1575 return msg, true
1576}
1577
1578// syntheticToolResultsForOrphanedCalls returns a tool message containing
1579// synthetic tool results for any tool calls in the assistant message that
1580// have no matching result in knownToolResultIDs. LLM APIs require every
1581// tool_use to be immediately followed by a tool_result; an interrupted
1582// session can leave orphaned tool_use blocks that permanently lock the
1583// conversation. Returns the message and true if any synthetic results were
1584// produced.
1585func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1586 var syntheticParts []fantasy.MessagePart
1587 for _, tc := range m.ToolCalls() {
1588 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1589 continue
1590 }
1591 slog.Warn(
1592 "Injecting synthetic tool result for orphaned tool call",
1593 "tool_call_id", tc.ID,
1594 "tool_name", tc.Name,
1595 )
1596 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1597 ToolCallID: tc.ID,
1598 Output: fantasy.ToolResultOutputContentError{
1599 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1600 },
1601 })
1602 }
1603 if len(syntheticParts) == 0 {
1604 return fantasy.Message{}, false
1605 }
1606 return fantasy.Message{
1607 Role: fantasy.MessageRoleTool,
1608 Content: syntheticParts,
1609 }, true
1610}
1611
1612func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1613 msgs, err := a.messages.List(ctx, session.ID)
1614 if err != nil {
1615 return nil, fmt.Errorf("failed to list messages: %w", err)
1616 }
1617
1618 if session.SummaryMessageID != "" {
1619 summaryMsgIndex := -1
1620 for i, msg := range msgs {
1621 if msg.ID == session.SummaryMessageID {
1622 summaryMsgIndex = i
1623 break
1624 }
1625 }
1626 if summaryMsgIndex != -1 {
1627 msgs = msgs[summaryMsgIndex:]
1628 msgs[0].Role = message.User
1629 }
1630 }
1631 return msgs, nil
1632}
1633
1634// generateTitle generates a session titled based on the initial prompt.
1635func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1636 if userPrompt == "" {
1637 return
1638 }
1639
1640 smallModel := a.smallModel.Get()
1641 largeModel := a.largeModel.Get()
1642 systemPromptPrefix := a.systemPromptPrefix.Get()
1643
1644 var maxOutputTokens int64 = 40
1645 if smallModel.CatwalkCfg.CanReason {
1646 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1647 }
1648
1649 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1650 return fantasy.NewAgent(
1651 m,
1652 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1653 fantasy.WithMaxOutputTokens(tok),
1654 fantasy.WithUserAgent(userAgent),
1655 )
1656 }
1657
1658 streamCall := fantasy.AgentStreamCall{
1659 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1660 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1661 prepared.Messages = opts.Messages
1662 if systemPromptPrefix != "" {
1663 prepared.Messages = append([]fantasy.Message{
1664 fantasy.NewSystemMessage(systemPromptPrefix),
1665 }, prepared.Messages...)
1666 }
1667 return callCtx, prepared, nil
1668 },
1669 }
1670
1671 // Use the small model to generate the title.
1672 model := smallModel
1673 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1674 resp, err := agent.Stream(ctx, streamCall)
1675 if err == nil {
1676 // We successfully generated a title with the small model.
1677 slog.Debug("Generated title with small model")
1678 } else {
1679 // It didn't work. Let's try with the big model.
1680 slog.Error("Error generating title with small model; trying big model", "err", err)
1681 model = largeModel
1682 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1683 resp, err = agent.Stream(ctx, streamCall)
1684 if err == nil {
1685 slog.Debug("Generated title with large model")
1686 } else {
1687 // Welp, the large model didn't work either. Use the default
1688 // session name and return.
1689 slog.Error("Error generating title with large model", "err", err)
1690 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1691 if saveErr != nil {
1692 slog.Error("Failed to save session title", "error", saveErr)
1693 }
1694 return
1695 }
1696 }
1697
1698 if resp == nil {
1699 // Actually, we didn't get a response so we can't. Use the default
1700 // session name and return.
1701 slog.Error("Response is nil; can't generate title")
1702 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1703 if saveErr != nil {
1704 slog.Error("Failed to save session title", "error", saveErr)
1705 }
1706 return
1707 }
1708
1709 // Clean up title.
1710 var title string
1711 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1712
1713 // Remove thinking tags if present.
1714 title = thinkTagRegex.ReplaceAllString(title, "")
1715 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1716
1717 title = strings.TrimSpace(title)
1718 title = cmp.Or(title, DefaultSessionName)
1719
1720 // Calculate usage and cost.
1721 var openrouterCost *float64
1722 for _, step := range resp.Steps {
1723 stepCost := a.openrouterCost(step.ProviderMetadata)
1724 if stepCost != nil {
1725 newCost := *stepCost
1726 if openrouterCost != nil {
1727 newCost += *openrouterCost
1728 }
1729 openrouterCost = &newCost
1730 }
1731 }
1732
1733 modelConfig := model.CatwalkCfg
1734 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1735 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1736 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1737 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1738
1739 // Use override cost if available (e.g., from OpenRouter).
1740 if openrouterCost != nil {
1741 cost = *openrouterCost
1742 }
1743
1744 // Skip cost accumulation
1745 if model.FlatRate {
1746 cost = 0
1747 }
1748
1749 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1750 completionTokens := resp.TotalUsage.OutputTokens
1751
1752 // Atomically update only title and usage fields to avoid overriding other
1753 // concurrent session updates.
1754 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1755 if saveErr != nil {
1756 slog.Error("Failed to save session title and usage", "error", saveErr)
1757 return
1758 }
1759}
1760
1761func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1762 openrouterMetadata, ok := metadata[openrouter.Name]
1763 if !ok {
1764 return nil
1765 }
1766
1767 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1768 if !ok {
1769 return nil
1770 }
1771 return &opts.Usage.Cost
1772}
1773
1774func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1775 if !usageIsZero(usage) {
1776 session.EstimatedUsage = estimated
1777 }
1778
1779 modelConfig := model.CatwalkCfg
1780 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1781 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1782 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1783 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1784
1785 if !estimated {
1786 a.eventTokensUsed(session.ID, model, usage, cost)
1787 }
1788
1789 if estimated {
1790 cost = 0
1791 } else {
1792 // Use override cost if available (e.g., from OpenRouter).
1793 if overrideCost != nil {
1794 cost = *overrideCost
1795 }
1796
1797 // Skip cost accumulation
1798 if model.FlatRate {
1799 cost = 0
1800 }
1801 }
1802
1803 session.Cost += cost
1804 updateSessionTokenCounters(session, usage)
1805}
1806
1807func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1808 if usage.OutputTokens != 0 {
1809 session.CompletionTokens = usage.OutputTokens
1810 }
1811 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1812 session.PromptTokens = promptTokens
1813 }
1814}
1815
1816func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1817 if usage.OutputTokens != 0 {
1818 return usage.OutputTokens
1819 }
1820 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1821}
1822
// Cancel stops in-flight and pending work for sessionID.
//
// While holding the session's mutex it:
//   - cancels the request stored under sessionID and the summarize request
//     stored under sessionID+"-summarize" (their activeRequests entries are
//     left in place);
//   - records a cancel mark when the session has accepted-but-not-yet-active
//     runs;
//   - clears any queued prompts.
//
// When none of those exist, nothing is cancelled, marked, or cleared.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	//
	// The accepted-run count and the latest sequence are read together
	// under acceptedMu, presumably so the two values are consistent.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		// NOTE(review): this Get-then-Set on cancelMark is not atomic by
		// itself. It presumably relies on every other writer for this session
		// also holding the session mutex; confirm.
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.clearQueueAndNotify(sessionID)
	}
}
1878
1879func (a *sessionAgent) ClearQueue(sessionID string) {
1880 if a.QueuedPrompts(sessionID) > 0 {
1881 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1882 a.clearQueueAndNotify(sessionID)
1883 }
1884}
1885
1886func (a *sessionAgent) CancelAll() {
1887 if !a.IsBusy() {
1888 return
1889 }
1890 for key := range a.activeRequests.Seq2() {
1891 a.Cancel(key) // key is sessionID
1892 }
1893
1894 timeout := time.After(5 * time.Second)
1895 for a.IsBusy() {
1896 select {
1897 case <-timeout:
1898 return
1899 default:
1900 time.Sleep(200 * time.Millisecond)
1901 }
1902 }
1903}
1904
1905func (a *sessionAgent) IsBusy() bool {
1906 var busy bool
1907 for cancelFunc := range a.activeRequests.Seq() {
1908 if cancelFunc != nil {
1909 busy = true
1910 break
1911 }
1912 }
1913 return busy
1914}
1915
1916func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1917 _, busy := a.activeRequests.Get(sessionID)
1918 return busy
1919}
1920
1921func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1922 l, ok := a.messageQueue.Get(sessionID)
1923 if !ok {
1924 return 0
1925 }
1926 return len(l)
1927}
1928
1929func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1930 l, ok := a.messageQueue.Get(sessionID)
1931 if !ok {
1932 return nil
1933 }
1934 prompts := make([]string, len(l))
1935 for i, call := range l {
1936 prompts[i] = call.Prompt
1937 }
1938 return prompts
1939}
1940
1941func (a *sessionAgent) SetModels(large Model, small Model) {
1942 a.largeModel.Set(large)
1943 a.smallModel.Set(small)
1944}
1945
1946func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1947 a.tools.SetSlice(tools)
1948}
1949
1950func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1951 a.systemPrompt.Set(systemPrompt)
1952}
1953
1954func (a *sessionAgent) Model() Model {
1955 return a.largeModel.Get()
1956}
1957
1958// convertToToolResult converts a fantasy tool result to a message tool result.
1959func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1960 baseResult := message.ToolResult{
1961 ToolCallID: result.ToolCallID,
1962 Name: result.ToolName,
1963 Metadata: result.ClientMetadata,
1964 }
1965
1966 switch result.Result.GetType() {
1967 case fantasy.ToolResultContentTypeText:
1968 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1969 baseResult.Content = r.Text
1970 }
1971 case fantasy.ToolResultContentTypeError:
1972 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1973 baseResult.Content = r.Error.Error()
1974 baseResult.IsError = true
1975 }
1976 case fantasy.ToolResultContentTypeMedia:
1977 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1978 if !stringext.IsValidBase64(r.Data) {
1979 slog.Warn(
1980 "Tool returned media with invalid base64 data, discarding image",
1981 "tool", result.ToolName,
1982 "tool_call_id", result.ToolCallID,
1983 )
1984 baseResult.Content = "Tool returned image data with invalid encoding"
1985 baseResult.IsError = true
1986 } else {
1987 content := r.Text
1988 if content == "" {
1989 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1990 }
1991 baseResult.Content = content
1992 baseResult.Data = r.Data
1993 baseResult.MIMEType = r.MediaType
1994 }
1995 }
1996 }
1997
1998 return baseResult
1999}
2000
2001// workaroundProviderMediaLimitations converts media content in tool results to
2002// user messages for providers that don't natively support images in tool results.
2003//
2004// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
2005// don't support sending images/media in tool result messages - they only accept
2006// text in tool results. However, they DO support images in user messages.
2007//
2008// If we send media in tool results to these providers, the API returns an error.
2009//
2010// Solution: For these providers, we:
2011// 1. Replace the media in the tool result with a text placeholder
2012// 2. Inject a user message immediately after with the image as a file attachment
2013// 3. This maintains the tool execution flow while working around API limitations
2014//
2015// Anthropic and Bedrock support images natively in tool results, so we skip
2016// this workaround for them.
2017//
2018// Example transformation:
2019//
2020// BEFORE: [tool result: image data]
2021// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
2022func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
2023 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
2024 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
2025 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
2026
2027 if providerSupportsMedia {
2028 return messages
2029 }
2030
2031 convertedMessages := make([]fantasy.Message, 0, len(messages))
2032
2033 for _, msg := range messages {
2034 if msg.Role != fantasy.MessageRoleTool {
2035 convertedMessages = append(convertedMessages, msg)
2036 continue
2037 }
2038
2039 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
2040 var mediaFiles []fantasy.FilePart
2041
2042 for _, part := range msg.Content {
2043 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
2044 if !ok {
2045 textParts = append(textParts, part)
2046 continue
2047 }
2048
2049 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
2050 decoded, err := base64.StdEncoding.DecodeString(media.Data)
2051 if err != nil {
2052 slog.Warn("Failed to decode media data", "error", err)
2053 textParts = append(textParts, part)
2054 continue
2055 }
2056
2057 mediaFiles = append(mediaFiles, fantasy.FilePart{
2058 Data: decoded,
2059 MediaType: media.MediaType,
2060 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
2061 })
2062
2063 textParts = append(textParts, fantasy.ToolResultPart{
2064 ToolCallID: toolResult.ToolCallID,
2065 Output: fantasy.ToolResultOutputContentText{
2066 Text: "[Image/media content loaded - see attached file]",
2067 },
2068 ProviderOptions: toolResult.ProviderOptions,
2069 })
2070 } else {
2071 textParts = append(textParts, part)
2072 }
2073 }
2074
2075 convertedMessages = append(convertedMessages, fantasy.Message{
2076 Role: fantasy.MessageRoleTool,
2077 Content: textParts,
2078 })
2079
2080 if len(mediaFiles) > 0 {
2081 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
2082 "Here is the media content from the tool result:",
2083 mediaFiles...,
2084 ))
2085 }
2086 }
2087
2088 return convertedMessages
2089}
2090
2091// buildSummaryPrompt constructs the prompt text for session summarization.
2092func buildSummaryPrompt(todos []session.Todo) string {
2093 var sb strings.Builder
2094 sb.WriteString("Provide a detailed summary of our conversation above.")
2095 if len(todos) > 0 {
2096 sb.WriteString("\n\n## Current Todo List\n\n")
2097 for _, t := range todos {
2098 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
2099 }
2100 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
2101 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2102 }
2103 return sb.String()
2104}
2105
2106func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2107 fields := []any{
2108 "retry_delay", delay.String(),
2109 }
2110 if err == nil {
2111 return fields
2112 }
2113 fields = append(fields, "status_code", err.StatusCode)
2114 if err.Title != "" {
2115 fields = append(fields, "title", err.Title)
2116 }
2117 if err.Message != "" {
2118 fields = append(fields, "message", err.Message)
2119 }
2120 return fields
2121}