1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "charm.land/catwalk/pkg/catwalk"
28 "charm.land/fantasy"
29 "charm.land/fantasy/providers/anthropic"
30 "charm.land/fantasy/providers/bedrock"
31 "charm.land/fantasy/providers/google"
32 "charm.land/fantasy/providers/openai"
33 "charm.land/fantasy/providers/openrouter"
34 "charm.land/fantasy/providers/vercel"
35 "charm.land/lipgloss/v2"
36 "github.com/charmbracelet/crush/internal/agent/hyper"
37 "github.com/charmbracelet/crush/internal/agent/notify"
38 "github.com/charmbracelet/crush/internal/agent/tools"
39 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
40 "github.com/charmbracelet/crush/internal/config"
41 "github.com/charmbracelet/crush/internal/csync"
42 "github.com/charmbracelet/crush/internal/message"
43 "github.com/charmbracelet/crush/internal/pubsub"
44 "github.com/charmbracelet/crush/internal/session"
45 "github.com/charmbracelet/crush/internal/stringext"
46 "github.com/charmbracelet/crush/internal/version"
47 "github.com/charmbracelet/x/exp/charmtone"
48)
49
const (
	// DefaultSessionName is the placeholder session name.
	// NOTE(review): presumably used for sessions that have not yet
	// received a generated title; the code that reads it is outside this
	// part of the file — confirm against the callers.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	// NOTE(review): the logic that consumes these values is not visible
	// here. Judging by the names, context windows above
	// largeContextWindowThreshold tokens keep a fixed
	// largeContextWindowBuffer of headroom while smaller windows keep
	// smallContextWindowRatio of their size — verify before relying on it.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
58
// userAgent is the User-Agent string built once at package
// initialization from the Crush build version. Run hands it to the
// fantasy agent through fantasy.WithUserAgent.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
60
// titlePrompt holds the raw bytes of templates/title.md, embedded at
// build time.
// NOTE(review): presumably the prompt used when generating session
// titles; its consumer is outside this part of the file.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the raw bytes of templates/summary.md, embedded at
// build time.
// NOTE(review): presumably the prompt used when summarizing a session;
// its consumer is outside this part of the file.
//
//go:embed templates/summary.md
var summaryPrompt []byte
66
// Used to remove <think> tags from generated titles.
var (
	// thinkTagRegex matches a complete <think>...</think> span. The (?s)
	// flag lets "." cross newlines and ".*?" is non-greedy, so each
	// match stops at the first closing tag.
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	// orphanThinkTagRegex matches a single <think> or </think> tag on
	// its own, i.e. one without a matching partner.
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
72
// SessionAgentCall describes one prompt submitted to SessionAgent.Run
// for a session: the prompt and its attachments, the sampling
// parameters forwarded to the model, and the bookkeeping used to
// correlate, queue, and cancel the turn.
type SessionAgentCall struct {
	// SessionID names the session the prompt belongs to. ValidateCall
	// rejects a call whose SessionID is empty.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user text for the turn. ValidateCall requires it to
	// be non-empty unless Attachments contains a text attachment.
	Prompt string
	// ProviderOptions is forwarded unchanged to the fantasy stream call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are the files sent with the prompt; Run passes them to
	// preparePrompt and message.PromptWithTextAttachments.
	Attachments []message.Attachment
	// MaxOutputTokens is forwarded to the provider only when it is
	// greater than zero; Run omits it otherwise because some providers
	// reject an explicit zero.
	MaxOutputTokens int64
	// Temperature, TopP, TopK, FrequencyPenalty and PresencePenalty are
	// optional sampling parameters forwarded unchanged to the fantasy
	// stream call.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive is not read in the visible part of this file.
	// NOTE(review): presumably marks a call issued without an
	// interactive UI (e.g. `crush run`) — confirm against its readers.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
125
// SessionAgent is the session-scoped agent API returned by
// NewSessionAgent. Only Run and BeginAccepted are implemented in the
// visible part of this file; the notes on the remaining methods are
// derived from their names and from comments elsewhere in the file and
// should be confirmed against the implementations.
type SessionAgent interface {
	// Run validates the call and executes it as a turn of its session.
	// It returns (nil, nil) without streaming when the call was queued
	// behind a busy session or canceled on entry.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID and
	// returns the handle whose Close releases it.
	BeginAccepted(sessionID string) *AcceptedRun
	// SetModels replaces the large and small models.
	// NOTE(review): implementation not visible here.
	SetModels(large Model, small Model)
	// SetTools replaces the tool set; Run re-reads the tools on every
	// step so changes are picked up mid-turn.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt.
	// NOTE(review): implementation not visible here.
	SetSystemPrompt(systemPrompt string)
	// Cancel cancels work for one session; CancelAll presumably does so
	// for every session. NOTE(review): implementations not visible here.
	Cancel(sessionID string)
	CancelAll()
	// IsSessionBusy reports whether the session has an active run; Run
	// uses it to decide whether to queue a call. IsBusy is presumably
	// the agent-wide equivalent — confirm.
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	// QueuedPrompts, QueuedPromptsList and ClearQueue presumably count,
	// list and drop the prompts queued for a session.
	// NOTE(review): implementations not visible here.
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	// Summarize presumably summarizes the named session using the given
	// provider options. NOTE(review): implementation not visible here.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns a Model; which one (large or small) is not visible
	// from here.
	Model() Model
}
142
// Model bundles a language model with the configuration describing it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catwalk description of the model; Run reads its
	// SupportsImages and Name fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration; its Model and
	// Provider values are recorded on assistant messages.
	ModelCfg config.SelectedModel
	// FlatRate is not read in the visible part of this file.
	// NOTE(review): presumably marks a flat-rate billed model — confirm.
	FlatRate bool
}
149
// sessionAgent is the SessionAgent implementation built by
// NewSessionAgent. It keeps the swappable model/prompt/tool state, the
// services it persists through, and the per-session bookkeeping for
// queued, active, and accepted-but-not-yet-active runs.
type sessionAgent struct {
	// Swappable configuration, held in csync wrappers. Run reads
	// largeModel, systemPrompt, systemPromptPrefix and tools at the
	// start of each turn (tools again on every step); smallModel is not
	// read in the visible part of this file.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Settings and collaborators copied from SessionAgentOptions.
	// sessions and messages are the persistence services used by Run;
	// runComplete is the broker publishRunComplete falls back to when a
	// call carries no OnComplete hook. isSubAgent, disableAutoSummarize,
	// isYolo and notify are not read in the visible part of this file.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls queued while that
	// session was busy. activeRequests holds, per session, the cancel
	// func of the run that is currently active.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
205
// SessionAgentOptions carries the initial configuration and
// collaborators for NewSessionAgent. Every field is copied (or wrapped
// in a csync container) onto the agent of the same-named field; the
// agent's queue and dispatch bookkeeping is created by NewSessionAgent
// itself and is not configurable here.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
220
221func NewSessionAgent(
222 opts SessionAgentOptions,
223) SessionAgent {
224 return &sessionAgent{
225 largeModel: csync.NewValue(opts.LargeModel),
226 smallModel: csync.NewValue(opts.SmallModel),
227 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
228 systemPrompt: csync.NewValue(opts.SystemPrompt),
229 isSubAgent: opts.IsSubAgent,
230 sessions: opts.Sessions,
231 messages: opts.Messages,
232 disableAutoSummarize: opts.DisableAutoSummarize,
233 tools: csync.NewSliceFrom(opts.Tools),
234 isYolo: opts.IsYolo,
235 notify: opts.Notify,
236 runComplete: opts.RunComplete,
237 messageQueue: csync.NewMap[string, []SessionAgentCall](),
238 activeRequests: csync.NewMap[string, context.CancelFunc](),
239 dispatchMu: csync.NewMap[string, *sync.Mutex](),
240 acceptedRuns: csync.NewMap[string, int](),
241 cancelMark: csync.NewMap[string, uint64](),
242 }
243}
244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
type AcceptedRun struct {
	// agent is the agent whose accept counter Close decrements.
	agent *sessionAgent
	// sessionID is the session the reservation was taken for.
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	seq uint64
	// done flips to true on the first Close; later calls see it set and
	// return without touching the counter again.
	done atomic.Bool
}
261
262// Close decrements the accept counter for this reservation. It is safe
263// to call multiple times; only the first call has effect.
264func (r *AcceptedRun) Close() {
265 if r == nil {
266 return
267 }
268 if !r.done.CompareAndSwap(false, true) {
269 return
270 }
271 r.agent.endAccepted(r.sessionID)
272}
273
274// SessionID exposes the session this reservation is for so the run path
275// can use it without an extra parameter.
276func (r *AcceptedRun) SessionID() string {
277 if r == nil {
278 return ""
279 }
280 return r.sessionID
281}
282
283// BeginAccepted increments the accept counter for sessionID and returns
284// a handle whose Close is the only way to decrement it. It is the only
285// entry point that mutates acceptedRuns.
286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
287 a.acceptedMu.Lock()
288 defer a.acceptedMu.Unlock()
289 count, _ := a.acceptedRuns.Get(sessionID)
290 a.acceptedRuns.Set(sessionID, count+1)
291 a.acceptSeqGen++
292 return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
293}
294
295// endAccepted decrements the accept counter for sessionID. It is only
296// called via AcceptedRun.Close. It uses a dedicated lock (not the
297// per-session dispatch mutex) so it can run while Run holds dispatchMu
298// for the same session without deadlocking.
299//
300// When the count reaches zero the session's cancel mark is dropped: no
301// accepted handle remains for it to cover, and any handle accepted later
302// gets a strictly higher sequence that the mark would not match anyway.
303// Handles canceled on entry never reach RunComplete, so this is the only
304// place that clears the mark for an all-canceled batch. Sibling handles
305// covered by the same mark are serialized on the per-session dispatch
306// mutex and read the mark before they Close, so this never clears it out
307// from under a covered handle still waiting to enter Run.
308func (a *sessionAgent) endAccepted(sessionID string) {
309 a.acceptedMu.Lock()
310 defer a.acceptedMu.Unlock()
311 count, ok := a.acceptedRuns.Get(sessionID)
312 if !ok || count <= 1 {
313 a.acceptedRuns.Del(sessionID)
314 a.cancelMark.Del(sessionID)
315 return
316 }
317 a.acceptedRuns.Set(sessionID, count-1)
318}
319
320// sessionMu returns the per-session dispatch mutex, creating it on first
321// use. Creation is guarded so concurrent callers always observe the same
322// mutex instance for a given session.
323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
324 if mu, ok := a.dispatchMu.Get(sessionID); ok {
325 return mu
326 }
327 a.dispatchMuCreate.Lock()
328 defer a.dispatchMuCreate.Unlock()
329 if mu, ok := a.dispatchMu.Get(sessionID); ok {
330 return mu
331 }
332 mu := &sync.Mutex{}
333 a.dispatchMu.Set(sessionID, mu)
334 return mu
335}
336
337// enqueueCall appends call to the session's message queue. The
338// OnComplete hook is stripped: the caller that supplied it (typically
339// coordinator.Run) has its own retry/coalesce scope that ends when it
340// returns, so by the time the queue drains nobody is left to consume the
341// buffered terminal event. The recursive Run falls back to the default
342// broker publish, which is what existing subscribers expect for queued
343// turns.
344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
345 existing, ok := a.messageQueue.Get(call.SessionID)
346 if !ok {
347 existing = []SessionAgentCall{}
348 }
349 queued := call
350 if call.Accepted != nil {
351 // Preserve the accept sequence after the handle is stripped so
352 // the queue-drain paths can tell a follow-up queued before a
353 // cancel (covered by the mark) from one queued after it.
354 queued.acceptSeq = call.Accepted.seq
355 }
356 queued.OnComplete = nil
357 queued.Accepted = nil
358 existing = append(existing, queued)
359 a.messageQueue.Set(call.SessionID, existing)
360}
361
362// clearPendingCancel removes any pending-cancel mark for sessionID. It
363// takes the per-session dispatch lock so it is ordered against Cancel
364// and the dispatch handoff.
365func (a *sessionAgent) clearPendingCancel(sessionID string) {
366 mu := a.sessionMu(sessionID)
367 mu.Lock()
368 defer mu.Unlock()
369 a.cancelMark.Del(sessionID)
370}
371
372// canceledBySeq reports whether an accepted handle or queued call with
373// the given accept sequence is covered by a pending cancel for the
374// session. Callers must hold the session's dispatch mutex. A tracked
375// sequence (seq > 0) is covered only when it is at or below the cancel
376// high-water mark, so a prompt accepted after the cancel (higher seq) is
377// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
378// with no accept reservation) is covered whenever any mark is present,
379// preserving the pre-sequence behavior. The mark is not consumed: it
380// stays so every sibling handle it covers observes the same cancel, and
381// a later handle (higher seq) ignores it regardless.
382func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
383 mark, ok := a.cancelMark.Get(sessionID)
384 if !ok || mark == 0 {
385 return false
386 }
387 return seq == 0 || seq <= mark
388}
389
390// persistCanceledTurn writes the user/assistant records for a turn that
391// was canceled before (or just as) streaming would have produced them.
392// It creates the user message only when it was not already created by an
393// earlier createUserMessage call (userMsgCreated), then writes an
394// assistant message with FinishReasonCanceled. Both writes use
395// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
396// context) can't drop them.
397func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
398 writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
399 defer cancel()
400 if !userMsgCreated {
401 if _, err := a.createUserMessage(writeCtx, call); err != nil {
402 return err
403 }
404 }
405 largeModel := a.largeModel.Get()
406 assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
407 Role: message.Assistant,
408 Parts: []message.ContentPart{},
409 Model: largeModel.ModelCfg.Model,
410 Provider: largeModel.ModelCfg.Provider,
411 })
412 if err != nil {
413 return err
414 }
415 assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
416 return a.messages.Update(writeCtx, assistant)
417}
418
419// publishRunComplete emits the authoritative terminal event for a turn.
420// It honors the per-call OnComplete hook when set (so the coordinator can
421// coalesce retries) and otherwise falls back to the RunComplete broker.
422// ctx is used only for the bounded-blocking must-deliver publish; the
423// terminal payload is supplied by the caller. This is the single emit path
424// shared by the streaming defer and the cancel-on-entry early return so a
425// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
426// observes exactly one terminal event regardless of which Run branch ends
427// the turn.
428func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
429 if call.OnComplete != nil {
430 call.OnComplete(complete)
431 return
432 }
433 if a.runComplete == nil {
434 return
435 }
436 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
437}
438
439// ValidateCall performs the cheap structural validation that
440// sessionAgent.Run requires before a call can be dispatched: a call must
441// carry either a non-empty prompt or a text attachment, and it must name a
442// session. It is exported so callers that accept a run before dispatching it
443// (e.g. backend.SendMessage) can apply the same checks and keep the error
444// contract consistent.
445func ValidateCall(call SessionAgentCall) error {
446 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
447 return ErrEmptyPrompt
448 }
449 if call.SessionID == "" {
450 return ErrSessionMissing
451 }
452 return nil
453}
454
455func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
456 if err := ValidateCall(call); err != nil {
457 return nil, err
458 }
459
460 // genCtx/cancel are the run context and its cancel func. For the
461 // accepted (fire-and-forget) dispatch path they are created under
462 // dispatchMu below so a concurrent Cancel can observe the
463 // activeRequests entry before the assistant message exists. For
464 // the in-process path they stay nil here and are created later,
465 // preserving the original ordering.
466 var (
467 genCtx context.Context
468 cancel context.CancelFunc
469 activeRegistered bool
470 userMsgCreated bool
471 )
472
473 if call.Accepted != nil {
474 // Serialize the accepted -> (cancel-on-entry | queued |
475 // active) transition against a concurrent Cancel. Cancel takes
476 // the same per-session lock, so every cancel observes at least
477 // one of: a cancel mark, an activeRequests entry, or a
478 // messageQueue entry it then clears.
479 mu := a.sessionMu(call.SessionID)
480 mu.Lock()
481
482 if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
483 // Cancel-on-entry: a cancel arrived while this run was
484 // dispatched but not yet active, and this handle's accept
485 // sequence is at or below the session's cancel mark. The
486 // mark is left in place so sibling handles it also covers
487 // observe the same cancel; release the accept reservation,
488 // drop the lock, and persist a canceled turn without
489 // entering Stream.
490 //
491 // This path returns before the streaming defer that
492 // publishes RunComplete is installed, so emit the terminal
493 // event explicitly. Without it, a caller waiting on
494 // RunComplete for this RunID (e.g. `crush run`, which
495 // ignores message events and blocks on RunComplete) would
496 // hang on an immediately-canceled accepted run.
497 call.Accepted.Close()
498 mu.Unlock()
499 complete := notify.RunComplete{
500 SessionID: call.SessionID,
501 RunID: call.RunID,
502 Cancelled: true,
503 }
504 if err := a.persistCanceledTurn(ctx, call, false); err != nil {
505 complete.Error = err.Error()
506 a.publishRunComplete(ctx, call, complete)
507 return nil, err
508 }
509 a.publishRunComplete(ctx, call, complete)
510 return nil, nil
511 }
512
513 if a.IsSessionBusy(call.SessionID) {
514 // Busy: an earlier prompt is active. Queue this call and
515 // release the accept reservation. A Cancel arriving after
516 // this point sees the active entry and clears the queue.
517 a.enqueueCall(call)
518 call.Accepted.Close()
519 mu.Unlock()
520 return nil, nil
521 }
522
523 // Idle: become the active run. Register the cancel func before
524 // dropping the lock so a Cancel that arrives between here and
525 // assistant creation is not lost.
526 runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
527 genCtx, cancel = context.WithCancel(runCtx)
528 a.activeRequests.Set(call.SessionID, cancel)
529 activeRegistered = true
530 call.Accepted.Close()
531 mu.Unlock()
532
533 defer cancel()
534 defer a.activeRequests.Del(call.SessionID)
535 } else if a.IsSessionBusy(call.SessionID) {
536 // Queue the message if busy. Strip OnComplete: the caller that
537 // supplied the hook (typically coordinator.Run) has its own
538 // retry/coalesce scope that ends when it returns, so by the time
539 // the queue drains nobody is left to consume the buffered
540 // terminal event. The recursive Run will fall back to the
541 // default broker publish, which is what existing subscribers
542 // expect for queued turns.
543 a.enqueueCall(call)
544 return nil, nil
545 }
546
547 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
548 agentTools := a.tools.Copy()
549 largeModel := a.largeModel.Get()
550 systemPrompt := a.systemPrompt.Get()
551 promptPrefix := a.systemPromptPrefix.Get()
552 var instructions strings.Builder
553
554 for _, server := range mcp.GetStates() {
555 if server.State != mcp.StateConnected {
556 continue
557 }
558 if s := server.Client.InitializeResult().Instructions; s != "" {
559 instructions.WriteString(s)
560 instructions.WriteString("\n\n")
561 }
562 }
563
564 if s := instructions.String(); s != "" {
565 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
566 }
567
568 if len(agentTools) > 0 {
569 // Add Anthropic caching to the last tool.
570 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
571 }
572
573 agent := fantasy.NewAgent(
574 largeModel.Model,
575 fantasy.WithSystemPrompt(systemPrompt),
576 fantasy.WithTools(agentTools...),
577 fantasy.WithUserAgent(userAgent),
578 )
579
580 sessionLock := sync.Mutex{}
581 currentSession, err := a.sessions.Get(ctx, call.SessionID)
582 if err != nil {
583 return nil, fmt.Errorf("failed to get session: %w", err)
584 }
585
586 msgs, err := a.getSessionMessages(ctx, currentSession)
587 if err != nil {
588 return nil, fmt.Errorf("failed to get session messages: %w", err)
589 }
590
591 var wg sync.WaitGroup
592 // Generate title if first message.
593 if len(msgs) == 0 {
594 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
595 wg.Go(func() {
596 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
597 })
598 }
599 defer wg.Wait()
600
601 // Add the user message to the session.
602 _, err = a.createUserMessage(ctx, call)
603 if err != nil {
604 return nil, err
605 }
606 userMsgCreated = true
607
608 // Add the session to the context.
609 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
610
611 // For the accepted dispatch path the run context and cancel func
612 // were already created and registered under dispatchMu above; reuse
613 // them. For the in-process path create them here, preserving the
614 // original ordering.
615 if !activeRegistered {
616 genCtx, cancel = context.WithCancel(ctx)
617 a.activeRequests.Set(call.SessionID, cancel)
618
619 defer cancel()
620 defer a.activeRequests.Del(call.SessionID)
621 }
622 // skipRunComplete is set just before the queued-recursion path so
623 // the outer Run doesn't publish a RunComplete that would race
624 // with — and be superseded by — the recursive call's own
625 // RunComplete (each queued user prompt is its own turn and
626 // publishes exactly one terminal event).
627 var skipRunComplete bool
628 // currentAssistant is declared here so the deferred RunComplete
629 // publish below can capture the pointer that PrepareStep will
630 // later (re)assign for each streaming step. The final assistant
631 // message of the turn is the value reachable through this
632 // pointer when the defer runs.
633 var currentAssistant *message.Message
634 // Drain any debounced message updates before returning. message.Service
635 // already flushes synchronously on terminal updates, but a defer here
636 // guarantees the contract at every Run exit (success, error, panic
637 // recovery upstream) without callers needing to know.
638 //
639 // After the flush completes — meaning all per-message
640 // Publish(UpdatedEvent) calls have fired and been buffered into
641 // every subscriber's channel — publish the authoritative
642 // RunComplete event for this turn. The flush-then-publish order
643 // gives well-behaved clients the best chance of seeing the final
644 // message event before RunComplete; the embedded Text field
645 // reconciles for clients that observe the events out of order
646 // (the pubsub broker fan-in does not serialize publishes from
647 // different upstream brokers).
648 defer func() {
649 // Use a context detached from the run context: workspace
650 // shutdown cancels ctx before this goroutine returns, but the
651 // buffered streaming deltas must still land before the DB is
652 // closed. A short timeout bounds the flush.
653 flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
654 defer flushCancel()
655 if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
656 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
657 }
658 if skipRunComplete {
659 return
660 }
661 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
662 if currentAssistant != nil {
663 complete.MessageID = currentAssistant.ID
664 complete.Text = currentAssistant.Content().String()
665 }
666 if retErr != nil {
667 complete.Error = retErr.Error()
668 complete.Cancelled = errors.Is(retErr, context.Canceled)
669 } else if ctx.Err() != nil {
670 complete.Cancelled = true
671 }
672 // Prefer the per-call hook when supplied so the coordinator
673 // can coalesce retries (e.g. unauthorized → re-auth → retry)
674 // into a single user-visible terminal event. The fallback
675 // must-deliver publish applies bounded-blocking semantics to
676 // the authoritative terminal event so a momentarily-full
677 // subscriber channel can't silently drop it and hang
678 // non-interactive clients waiting on RunComplete.
679 a.publishRunComplete(ctx, call, complete)
680 }()
681
682 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
683
684 startTime := time.Now()
685 a.eventPromptSent(call.SessionID)
686
687 var stepMessages []fantasy.Message
688 var shouldSummarize bool
689 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
690 var maxOutputTokens *int64
691 if call.MaxOutputTokens > 0 {
692 maxOutputTokens = &call.MaxOutputTokens
693 }
694 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
695 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
696 Files: files,
697 Messages: history,
698 ProviderOptions: call.ProviderOptions,
699 MaxOutputTokens: maxOutputTokens,
700 TopP: call.TopP,
701 Temperature: call.Temperature,
702 PresencePenalty: call.PresencePenalty,
703 TopK: call.TopK,
704 FrequencyPenalty: call.FrequencyPenalty,
705 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
706 prepared.Messages = options.Messages
707 for i := range prepared.Messages {
708 prepared.Messages[i].ProviderOptions = nil
709 }
710
711 // Use latest tools (updated by SetTools when MCP tools change).
712 prepared.Tools = a.tools.Copy()
713
714 // Drain queued follow-up prompts, but skip any covered by a
715 // cancel recorded while they sat in the queue: a cancel that
716 // arrived after a prompt was queued must not let it run as
717 // part of this step. Coverage is per-call by accept sequence
718 // so a follow-up queued after the cancel (higher seq) is
719 // still folded in.
720 dispatchLock := a.sessionMu(call.SessionID)
721 dispatchLock.Lock()
722 queuedCalls, _ := a.messageQueue.Get(call.SessionID)
723 a.messageQueue.Del(call.SessionID)
724 dispatchLock.Unlock()
725 for _, queued := range queuedCalls {
726 if a.canceledBySeq(call.SessionID, queued.acceptSeq) {
727 continue
728 }
729 userMessage, createErr := a.createUserMessage(callContext, queued)
730 if createErr != nil {
731 return callContext, prepared, createErr
732 }
733 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
734 }
735
736 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
737
738 lastSystemRoleInx := 0
739 systemMessageUpdated := false
740 for i, msg := range prepared.Messages {
741 // Only add cache control to the last message.
742 if msg.Role == fantasy.MessageRoleSystem {
743 lastSystemRoleInx = i
744 } else if !systemMessageUpdated {
745 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
746 systemMessageUpdated = true
747 }
				// Then add cache control to the last 2 messages.
749 if i > len(prepared.Messages)-3 {
750 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
751 }
752 }
753
754 if promptPrefix != "" {
755 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
756 }
757
758 sessionLock.Lock()
759 stepMessages = cloneFantasyMessages(prepared.Messages)
760 sessionLock.Unlock()
761
762 var assistantMsg message.Message
763 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
764 Role: message.Assistant,
765 Parts: []message.ContentPart{},
766 Model: largeModel.ModelCfg.Model,
767 Provider: largeModel.ModelCfg.Provider,
768 })
769 if err != nil {
770 return callContext, prepared, err
771 }
772 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
773 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
774 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
775 currentAssistant = &assistantMsg
776 return callContext, prepared, err
777 },
778 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
779 currentAssistant.AppendReasoningContent(reasoning.Text)
780 return a.messages.Update(genCtx, *currentAssistant)
781 },
782 OnReasoningDelta: func(id string, text string) error {
783 currentAssistant.AppendReasoningContent(text)
784 return a.messages.Update(genCtx, *currentAssistant)
785 },
786 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
787 // handle anthropic signature
788 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
789 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
790 currentAssistant.AppendReasoningSignature(reasoning.Signature)
791 }
792 }
793 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
794 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
795 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
796 }
797 }
798 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
799 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
800 currentAssistant.SetReasoningResponsesData(reasoning)
801 }
802 }
803 currentAssistant.FinishThinking()
804 return a.messages.Update(genCtx, *currentAssistant)
805 },
806 OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
808 // particularly important in non-interactive mode where leading
809 // newlines are very visible.
810 if len(currentAssistant.Parts) == 0 {
811 text = strings.TrimPrefix(text, "\n")
812 }
813
814 currentAssistant.AppendContent(text)
815 return a.messages.Update(genCtx, *currentAssistant)
816 },
817 OnToolInputStart: func(id string, toolName string) error {
818 toolCall := message.ToolCall{
819 ID: id,
820 Name: toolName,
821 ProviderExecuted: false,
822 Finished: false,
823 }
824 currentAssistant.AddToolCall(toolCall)
825 // Use parent ctx instead of genCtx to ensure the update succeeds
826 // even if the request is canceled mid-stream
827 return a.messages.Update(ctx, *currentAssistant)
828 },
829 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
830 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
831 },
832 OnToolCall: func(tc fantasy.ToolCallContent) error {
833 toolCall := message.ToolCall{
834 ID: tc.ToolCallID,
835 Name: tc.ToolName,
836 Input: tc.Input,
837 ProviderExecuted: false,
838 Finished: true,
839 }
840 currentAssistant.AddToolCall(toolCall)
841 // Use parent ctx instead of genCtx to ensure the update succeeds
842 // even if the request is canceled mid-stream
843 return a.messages.Update(ctx, *currentAssistant)
844 },
845 OnToolResult: func(result fantasy.ToolResultContent) error {
846 toolResult := a.convertToToolResult(result)
847 // Use parent ctx instead of genCtx to ensure the message is created
848 // even if the request is canceled mid-stream
849 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
850 Role: message.Tool,
851 Parts: []message.ContentPart{
852 toolResult,
853 },
854 })
855 return createMsgErr
856 },
857 OnStepFinish: func(stepResult fantasy.StepResult) error {
858 finishReason := message.FinishReasonUnknown
859 switch stepResult.FinishReason {
860 case fantasy.FinishReasonLength:
861 finishReason = message.FinishReasonMaxTokens
862 case fantasy.FinishReasonStop:
863 finishReason = message.FinishReasonEndTurn
864 case fantasy.FinishReasonToolCalls:
865 finishReason = message.FinishReasonToolUse
866 }
867 // If a tool result halted the turn (e.g. a hook halt or a
868 // permission denial), the step ends on FinishReasonToolCalls but
869 // the model will not be called again. Treat it as the end of the
870 // turn so the UI can render the assistant footer.
871 if finishReason == message.FinishReasonToolUse {
872 for _, tr := range stepResult.Content.ToolResults() {
873 if tr.StopTurn {
874 finishReason = message.FinishReasonEndTurn
875 break
876 }
877 }
878 }
879 currentAssistant.AddFinish(finishReason, "", "")
880 sessionLock.Lock()
881 defer sessionLock.Unlock()
882
883 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
884 if getSessionErr != nil {
885 return getSessionErr
886 }
887 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
888 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
889 _, sessionErr := a.sessions.Save(ctx, updatedSession)
890 if sessionErr != nil {
891 return sessionErr
892 }
893 currentSession = updatedSession
894 return a.messages.Update(genCtx, *currentAssistant)
895 },
896 StopWhen: []fantasy.StopCondition{
897 func(_ []fantasy.StepResult) bool {
898 cw := int64(largeModel.CatwalkCfg.ContextWindow)
899 // If context window is unknown (0), skip auto-summarize
900 // to avoid immediately truncating custom/local models.
901 if cw == 0 {
902 return false
903 }
904 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
905 remaining := cw - tokens
906 var threshold int64
907 if cw > largeContextWindowThreshold {
908 threshold = largeContextWindowBuffer
909 } else {
910 threshold = int64(float64(cw) * smallContextWindowRatio)
911 }
912 if (remaining <= threshold) && !a.disableAutoSummarize {
913 shouldSummarize = true
914 return true
915 }
916 return false
917 },
918 func(steps []fantasy.StepResult) bool {
919 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
920 },
921 },
922 })
923
924 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
925
926 if err != nil {
927 isHyper := largeModel.ModelCfg.Provider == hyper.Name
928 isCancelErr := errors.Is(err, context.Canceled)
929 if currentAssistant == nil {
930 // Cancel-before-assistant-creation window: the run was
931 // canceled after activeRequests.Set but before PrepareStep
932 // created the assistant message. Without this, the turn
933 // would return with no FinishReasonCanceled marker and no
934 // user-visible record. The user message was already created
935 // above, so persistCanceledTurn only writes the assistant
936 // record.
937 if isCancelErr {
938 if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
939 return nil, persistErr
940 }
941 }
942 return result, err
943 }
944 // Persist final state with a context detached from the run
945 // context. The run context (ctx) is derived from the
946 // workspace context, which workspace shutdown cancels before
947 // agent goroutines finish; using ctx here would drop the
948 // final assistant state. WithoutCancel keeps the values
949 // (e.g. session ID) while ignoring cancellation, and a short
950 // timeout bounds the cleanup writes.
951 cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
952 defer cleanupCancel()
953 // Ensure we finish thinking on error to close the reasoning state.
954 currentAssistant.FinishThinking()
955 toolCalls := currentAssistant.ToolCalls()
956 // INFO: we use the cleanup context here because the genCtx has been cancelled.
957 msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
958 if createErr != nil {
959 return nil, createErr
960 }
961 for _, tc := range toolCalls {
962 if !tc.Finished {
963 tc.Finished = true
964 tc.Input = "{}"
965 currentAssistant.AddToolCall(tc)
966 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
967 if updateErr != nil {
968 return nil, updateErr
969 }
970 }
971
972 found := false
973 for _, msg := range msgs {
974 if msg.Role == message.Tool {
975 for _, tr := range msg.ToolResults() {
976 if tr.ToolCallID == tc.ID {
977 found = true
978 break
979 }
980 }
981 }
982 if found {
983 break
984 }
985 }
986 if found {
987 continue
988 }
989 content := "There was an error while executing the tool"
990 if isCancelErr {
991 content = "Error: user cancelled assistant tool calling"
992 }
993 toolResult := message.ToolResult{
994 ToolCallID: tc.ID,
995 Name: tc.Name,
996 Content: content,
997 IsError: true,
998 }
999 _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1000 Role: message.Tool,
1001 Parts: []message.ContentPart{
1002 toolResult,
1003 },
1004 })
1005 if createErr != nil {
1006 return nil, createErr
1007 }
1008 }
1009 var fantasyErr *fantasy.Error
1010 var providerErr *fantasy.ProviderError
1011 const defaultTitle = "Provider Error"
1012 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1013 if isCancelErr {
1014 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1015 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1016 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1017 if a.notify != nil {
1018 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1019 SessionID: call.SessionID,
1020 SessionTitle: currentSession.Title,
1021 Type: notify.TypeReAuthenticate,
1022 ProviderID: largeModel.ModelCfg.Provider,
1023 })
1024 }
1025 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1026 url := hyper.BaseURL()
1027 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1028 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1029 } else if errors.As(err, &providerErr) {
1030 if providerErr.Message == "The requested model is not supported." {
1031 url := "https://github.com/settings/copilot/features"
1032 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1033 currentAssistant.AddFinish(
1034 message.FinishReasonError,
1035 "Copilot model not enabled",
1036 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1037 )
1038 } else {
1039 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1040 }
1041 } else if errors.As(err, &fantasyErr) {
1042 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1043 } else {
1044 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1045 }
1046 // Note: we use the cleanup context here because the genCtx has been
1047 // cancelled.
1048 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1049 if updateErr != nil {
1050 return nil, updateErr
1051 }
1052 return nil, err
1053 }
1054
1055 if shouldSummarize {
1056 a.activeRequests.Del(call.SessionID)
1057 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1058 return nil, summarizeErr
1059 }
1060 // If the agent wasn't done...
1061 if len(currentAssistant.ToolCalls()) > 0 {
1062 existing, ok := a.messageQueue.Get(call.SessionID)
1063 if !ok {
1064 existing = []SessionAgentCall{}
1065 }
1066 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1067 existing = append(existing, call)
1068 a.messageQueue.Set(call.SessionID, existing)
1069 }
1070 }
1071
1072 // Release active request before publishing the notification.
1073 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
1074 // tea.Msg arrives, so the cleanup must precede the notify or
1075 // subscribers see stale busy state at the moment of receipt.
1076 a.activeRequests.Del(call.SessionID)
1077 cancel()
1078
1079 // Send notification that agent has finished its turn (skip for
1080 // nested/non-interactive sessions).
1081 if !call.NonInteractive && a.notify != nil {
1082 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1083 SessionID: call.SessionID,
1084 SessionTitle: currentSession.Title,
1085 Type: notify.TypeAgentFinished,
1086 })
1087 }
1088
1089 // Hand off to the next queued prompt (if any) under dispatchMu so
1090 // the transition from this finished run to the queued run is atomic
1091 // against a concurrent Cancel. activeRequests for this session was
1092 // just deleted above, so without the lock there is a window in
1093 // which the session looks idle and a cancel becomes a no-op that
1094 // fails to stop the queued prompt. Holding the lock lets us observe
1095 // a pending cancel recorded against the session and drop the queue
1096 // instead of running it, and (for the recursion) hand a fresh
1097 // accept reservation to the dequeued call so acceptedRuns stays > 0
1098 // across the recursive Run's own dispatch handoff — keeping the
1099 // session observable to Cancel for the entire transition and
1100 // closing the dequeue -> re-register window.
1101 mu := a.sessionMu(call.SessionID)
1102 mu.Lock()
1103 queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1104 if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1105 // A cancel was recorded for this session (e.g. it arrived while
1106 // this run was active and follow-ups had been queued). Drop the
1107 // queued prompts it covers (accept sequence at or below the
1108 // mark, or untracked); keep any queued after the cancel (higher
1109 // sequence) so they still run.
1110 var kept []SessionAgentCall
1111 for _, q := range queuedMessages {
1112 if q.acceptSeq == 0 || q.acceptSeq <= mark {
1113 continue
1114 }
1115 kept = append(kept, q)
1116 }
1117 queuedMessages = kept
1118 a.messageQueue.Set(call.SessionID, kept)
1119 }
1120 if len(queuedMessages) == 0 {
1121 // No queued work. Clear the cancel mark only when no accepted
1122 // run remains in flight that it might still cover; otherwise a
1123 // sibling prompt (sequence at or below the mark) waiting to
1124 // enter Run would lose its cancellation. When accepted runs are
1125 // gone, this also clears a stale mark so it can't catch a
1126 // future run.
1127 a.messageQueue.Del(call.SessionID)
1128 a.acceptedMu.Lock()
1129 inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1130 a.acceptedMu.Unlock()
1131 if inFlight == 0 {
1132 a.cancelMark.Del(call.SessionID)
1133 }
1134 mu.Unlock()
1135 return result, err
1136 }
1137 // There are queued messages restart the loop. The recursive Run
1138 // publishes its own RunComplete for the queued prompt, so suppress
1139 // the outer defer's emit to avoid a duplicate event whose Error
1140 // field would belong to the recursive turn but whose MessageID/Text
1141 // would belong to the outer turn.
1142 skipRunComplete = true
1143 firstQueuedMessage := queuedMessages[0]
1144 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1145 // Reserve a fresh accept for the dequeued prompt before dropping the
1146 // lock so acceptedRuns > 0 across the handoff into the recursive
1147 // Run. This closes the window between this dequeue and the recursive
1148 // Run registering its activeRequests entry: a cancel arriving in
1149 // that window now records a pending cancel (acceptedRuns > 0) that
1150 // the recursive Run's accepted path observes as cancel-on-entry.
1151 firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1152 mu.Unlock()
1153 return a.Run(ctx, firstQueuedMessage)
1154}
1155
1156func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1157 if a.IsSessionBusy(sessionID) {
1158 return ErrSessionBusy
1159 }
1160
1161 // Copy mutable fields under lock to avoid races with SetModels.
1162 largeModel := a.largeModel.Get()
1163 systemPromptPrefix := a.systemPromptPrefix.Get()
1164
1165 currentSession, err := a.sessions.Get(ctx, sessionID)
1166 if err != nil {
1167 return fmt.Errorf("failed to get session: %w", err)
1168 }
1169 msgs, err := a.getSessionMessages(ctx, currentSession)
1170 if err != nil {
1171 return err
1172 }
1173 if len(msgs) == 0 {
1174 // Nothing to summarize.
1175 return nil
1176 }
1177
1178 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1179
1180 genCtx, cancel := context.WithCancel(ctx)
1181 a.activeRequests.Set(sessionID, cancel)
1182 defer a.activeRequests.Del(sessionID)
1183 defer cancel()
1184 defer func() {
1185 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1186 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1187 }
1188 }()
1189
1190 agent := fantasy.NewAgent(
1191 largeModel.Model,
1192 fantasy.WithSystemPrompt(string(summaryPrompt)),
1193 fantasy.WithUserAgent(userAgent),
1194 )
1195 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1196 Role: message.Assistant,
1197 Model: largeModel.ModelCfg.Model,
1198 Provider: largeModel.ModelCfg.Provider,
1199 IsSummaryMessage: true,
1200 })
1201 if err != nil {
1202 return err
1203 }
1204
1205 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1206
1207 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1208 Prompt: summaryPromptText,
1209 Messages: aiMsgs,
1210 ProviderOptions: opts,
1211 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1212 prepared.Messages = options.Messages
1213 if systemPromptPrefix != "" {
1214 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1215 }
1216 return callContext, prepared, nil
1217 },
1218 OnReasoningDelta: func(id string, text string) error {
1219 summaryMessage.AppendReasoningContent(text)
1220 return a.messages.Update(genCtx, summaryMessage)
1221 },
1222 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1223 // Handle anthropic signature.
1224 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1225 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1226 summaryMessage.AppendReasoningSignature(signature.Signature)
1227 }
1228 }
1229 summaryMessage.FinishThinking()
1230 return a.messages.Update(genCtx, summaryMessage)
1231 },
1232 OnTextDelta: func(id, text string) error {
1233 summaryMessage.AppendContent(text)
1234 return a.messages.Update(genCtx, summaryMessage)
1235 },
1236 })
1237 if err != nil {
1238 isCancelErr := errors.Is(err, context.Canceled)
1239 if isCancelErr {
1240 // User cancelled summarize we need to remove the summary message.
1241 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1242 return deleteErr
1243 }
1244 // Mark the summary message as finished with an error so the UI
1245 // stops spinning.
1246 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1247 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1248 return updateErr
1249 }
1250 return err
1251 }
1252
1253 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1254 err = a.messages.Update(genCtx, summaryMessage)
1255 if err != nil {
1256 return err
1257 }
1258
1259 var openrouterCost *float64
1260 for _, step := range resp.Steps {
1261 stepCost := a.openrouterCost(step.ProviderMetadata)
1262 if stepCost != nil {
1263 newCost := *stepCost
1264 if openrouterCost != nil {
1265 newCost += *openrouterCost
1266 }
1267 openrouterCost = &newCost
1268 }
1269 }
1270
1271 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
1272
1273 // Just in case, get just the last usage info.
1274 usage := resp.Response.Usage
1275 currentSession.SummaryMessageID = summaryMessage.ID
1276 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1277 currentSession.PromptTokens = 0
1278 currentSession.EstimatedUsage = usageIsZero(usage)
1279 _, err = a.sessions.Save(genCtx, currentSession)
1280 if err != nil {
1281 return err
1282 }
1283
1284 // Release the active request before processing queued messages so that
1285 // Run() does not see the session as busy.
1286 a.activeRequests.Del(sessionID)
1287 cancel()
1288
1289 // Process any messages that were queued while summarizing.
1290 queuedMessages, ok := a.messageQueue.Get(sessionID)
1291 if !ok || len(queuedMessages) == 0 {
1292 return nil
1293 }
1294 firstQueuedMessage := queuedMessages[0]
1295 a.messageQueue.Set(sessionID, queuedMessages[1:])
1296 _, qErr := a.Run(ctx, firstQueuedMessage)
1297 return qErr
1298}
1299
1300func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1301 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1302 return fantasy.ProviderOptions{}
1303 }
1304 return fantasy.ProviderOptions{
1305 anthropic.Name: &anthropic.ProviderCacheControlOptions{
1306 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1307 },
1308 bedrock.Name: &anthropic.ProviderCacheControlOptions{
1309 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1310 },
1311 vercel.Name: &anthropic.ProviderCacheControlOptions{
1312 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1313 },
1314 }
1315}
1316
1317func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1318 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1319 var attachmentParts []message.ContentPart
1320 for _, attachment := range call.Attachments {
1321 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1322 }
1323 parts = append(parts, attachmentParts...)
1324 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1325 Role: message.User,
1326 Parts: parts,
1327 })
1328 if err != nil {
1329 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1330 }
1331 return msg, nil
1332}
1333
1334func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1335 var history []fantasy.Message
1336 if !a.isSubAgent {
1337 history = append(history, fantasy.NewUserMessage(
1338 fmt.Sprintf(
1339 "<system_reminder>%s</system_reminder>",
1340 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1341If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1342If not, please feel free to ignore. Again do not mention this message to the user.`,
1343 ),
1344 ))
1345 }
1346 // Collect all tool call IDs present in assistant messages and all tool
1347 // result IDs present in tool messages. This lets us detect both orphaned
1348 // tool results (result without a call) and orphaned tool calls (call
1349 // without a result).
1350 knownToolCallIDs := make(map[string]struct{})
1351 knownToolResultIDs := make(map[string]struct{})
1352 for _, m := range msgs {
1353 switch m.Role {
1354 case message.Assistant:
1355 for _, tc := range m.ToolCalls() {
1356 knownToolCallIDs[tc.ID] = struct{}{}
1357 }
1358 case message.Tool:
1359 for _, tr := range m.ToolResults() {
1360 knownToolResultIDs[tr.ToolCallID] = struct{}{}
1361 }
1362 }
1363 }
1364
1365 for _, m := range msgs {
1366 if len(m.Parts) == 0 {
1367 continue
1368 }
1369 // Assistant message without content or tool calls (cancelled before it returned anything).
1370 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1371 continue
1372 }
1373 if m.Role == message.Tool {
1374 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1375 history = append(history, msg)
1376 }
1377 continue
1378 }
1379 aiMsgs := m.ToAIMessage()
1380 if !supportsImages {
1381 for i := range aiMsgs {
1382 if aiMsgs[i].Role == fantasy.MessageRoleUser {
1383 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1384 }
1385 }
1386 }
1387 history = append(history, aiMsgs...)
1388
1389 if m.Role == message.Assistant {
1390 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1391 history = append(history, msg)
1392 }
1393 }
1394 }
1395
1396 var files []fantasy.FilePart
1397 for _, attachment := range attachments {
1398 if attachment.IsText() {
1399 continue
1400 }
1401 files = append(files, fantasy.FilePart{
1402 Filename: attachment.FileName,
1403 Data: attachment.Content,
1404 MediaType: attachment.MimeType,
1405 })
1406 }
1407
1408 return history, files
1409}
1410
1411// filterFileParts removes fantasy.FilePart entries from a slice of message
1412// parts. Used to strip image attachments from historical user messages when
1413// the current model does not support them.
1414func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1415 filtered := make([]fantasy.MessagePart, 0, len(parts))
1416 for _, part := range parts {
1417 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1418 continue
1419 }
1420 filtered = append(filtered, part)
1421 }
1422 return filtered
1423}
1424
1425// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1426// dropping any tool result parts whose tool_call_id has no matching tool call
1427// in the known set. An orphaned result causes API validation to fail on every
1428// subsequent turn, permanently locking the session. Returns the filtered
1429// message and true if at least one valid part remains.
1430func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1431 aiMsgs := m.ToAIMessage()
1432 if len(aiMsgs) == 0 {
1433 return fantasy.Message{}, false
1434 }
1435 var validParts []fantasy.MessagePart
1436 for _, part := range aiMsgs[0].Content {
1437 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1438 if !ok {
1439 validParts = append(validParts, part)
1440 continue
1441 }
1442 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1443 validParts = append(validParts, part)
1444 } else {
1445 slog.Warn(
1446 "Dropping orphaned tool result with no matching tool call",
1447 "tool_call_id", tr.ToolCallID,
1448 )
1449 }
1450 }
1451 if len(validParts) == 0 {
1452 return fantasy.Message{}, false
1453 }
1454 msg := aiMsgs[0]
1455 msg.Content = validParts
1456 return msg, true
1457}
1458
1459// syntheticToolResultsForOrphanedCalls returns a tool message containing
1460// synthetic tool results for any tool calls in the assistant message that
1461// have no matching result in knownToolResultIDs. LLM APIs require every
1462// tool_use to be immediately followed by a tool_result; an interrupted
1463// session can leave orphaned tool_use blocks that permanently lock the
1464// conversation. Returns the message and true if any synthetic results were
1465// produced.
1466func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1467 var syntheticParts []fantasy.MessagePart
1468 for _, tc := range m.ToolCalls() {
1469 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1470 continue
1471 }
1472 slog.Warn(
1473 "Injecting synthetic tool result for orphaned tool call",
1474 "tool_call_id", tc.ID,
1475 "tool_name", tc.Name,
1476 )
1477 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1478 ToolCallID: tc.ID,
1479 Output: fantasy.ToolResultOutputContentError{
1480 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1481 },
1482 })
1483 }
1484 if len(syntheticParts) == 0 {
1485 return fantasy.Message{}, false
1486 }
1487 return fantasy.Message{
1488 Role: fantasy.MessageRoleTool,
1489 Content: syntheticParts,
1490 }, true
1491}
1492
1493func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1494 msgs, err := a.messages.List(ctx, session.ID)
1495 if err != nil {
1496 return nil, fmt.Errorf("failed to list messages: %w", err)
1497 }
1498
1499 if session.SummaryMessageID != "" {
1500 summaryMsgIndex := -1
1501 for i, msg := range msgs {
1502 if msg.ID == session.SummaryMessageID {
1503 summaryMsgIndex = i
1504 break
1505 }
1506 }
1507 if summaryMsgIndex != -1 {
1508 msgs = msgs[summaryMsgIndex:]
1509 msgs[0].Role = message.User
1510 }
1511 }
1512 return msgs, nil
1513}
1514
1515// generateTitle generates a session titled based on the initial prompt.
1516func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1517 if userPrompt == "" {
1518 return
1519 }
1520
1521 smallModel := a.smallModel.Get()
1522 largeModel := a.largeModel.Get()
1523 systemPromptPrefix := a.systemPromptPrefix.Get()
1524
1525 var maxOutputTokens int64 = 40
1526 if smallModel.CatwalkCfg.CanReason {
1527 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1528 }
1529
1530 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1531 return fantasy.NewAgent(
1532 m,
1533 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1534 fantasy.WithMaxOutputTokens(tok),
1535 fantasy.WithUserAgent(userAgent),
1536 )
1537 }
1538
1539 streamCall := fantasy.AgentStreamCall{
1540 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1541 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1542 prepared.Messages = opts.Messages
1543 if systemPromptPrefix != "" {
1544 prepared.Messages = append([]fantasy.Message{
1545 fantasy.NewSystemMessage(systemPromptPrefix),
1546 }, prepared.Messages...)
1547 }
1548 return callCtx, prepared, nil
1549 },
1550 }
1551
1552 // Use the small model to generate the title.
1553 model := smallModel
1554 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1555 resp, err := agent.Stream(ctx, streamCall)
1556 if err == nil {
1557 // We successfully generated a title with the small model.
1558 slog.Debug("Generated title with small model")
1559 } else {
1560 // It didn't work. Let's try with the big model.
1561 slog.Error("Error generating title with small model; trying big model", "err", err)
1562 model = largeModel
1563 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1564 resp, err = agent.Stream(ctx, streamCall)
1565 if err == nil {
1566 slog.Debug("Generated title with large model")
1567 } else {
1568 // Welp, the large model didn't work either. Use the default
1569 // session name and return.
1570 slog.Error("Error generating title with large model", "err", err)
1571 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1572 if saveErr != nil {
1573 slog.Error("Failed to save session title", "error", saveErr)
1574 }
1575 return
1576 }
1577 }
1578
1579 if resp == nil {
1580 // Actually, we didn't get a response so we can't. Use the default
1581 // session name and return.
1582 slog.Error("Response is nil; can't generate title")
1583 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1584 if saveErr != nil {
1585 slog.Error("Failed to save session title", "error", saveErr)
1586 }
1587 return
1588 }
1589
1590 // Clean up title.
1591 var title string
1592 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1593
1594 // Remove thinking tags if present.
1595 title = thinkTagRegex.ReplaceAllString(title, "")
1596 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1597
1598 title = strings.TrimSpace(title)
1599 title = cmp.Or(title, DefaultSessionName)
1600
1601 // Calculate usage and cost.
1602 var openrouterCost *float64
1603 for _, step := range resp.Steps {
1604 stepCost := a.openrouterCost(step.ProviderMetadata)
1605 if stepCost != nil {
1606 newCost := *stepCost
1607 if openrouterCost != nil {
1608 newCost += *openrouterCost
1609 }
1610 openrouterCost = &newCost
1611 }
1612 }
1613
1614 modelConfig := model.CatwalkCfg
1615 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1616 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1617 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1618 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1619
1620 // Use override cost if available (e.g., from OpenRouter).
1621 if openrouterCost != nil {
1622 cost = *openrouterCost
1623 }
1624
1625 // Skip cost accumulation
1626 if model.FlatRate {
1627 cost = 0
1628 }
1629
1630 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1631 completionTokens := resp.TotalUsage.OutputTokens
1632
1633 // Atomically update only title and usage fields to avoid overriding other
1634 // concurrent session updates.
1635 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1636 if saveErr != nil {
1637 slog.Error("Failed to save session title and usage", "error", saveErr)
1638 return
1639 }
1640}
1641
1642func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1643 openrouterMetadata, ok := metadata[openrouter.Name]
1644 if !ok {
1645 return nil
1646 }
1647
1648 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1649 if !ok {
1650 return nil
1651 }
1652 return &opts.Usage.Cost
1653}
1654
1655func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1656 if !usageIsZero(usage) {
1657 session.EstimatedUsage = estimated
1658 }
1659
1660 modelConfig := model.CatwalkCfg
1661 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1662 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1663 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1664 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1665
1666 if !estimated {
1667 a.eventTokensUsed(session.ID, model, usage, cost)
1668 }
1669
1670 if estimated {
1671 cost = 0
1672 } else {
1673 // Use override cost if available (e.g., from OpenRouter).
1674 if overrideCost != nil {
1675 cost = *overrideCost
1676 }
1677
1678 // Skip cost accumulation
1679 if model.FlatRate {
1680 cost = 0
1681 }
1682 }
1683
1684 session.Cost += cost
1685 updateSessionTokenCounters(session, usage)
1686}
1687
1688func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1689 if usage.OutputTokens != 0 {
1690 session.CompletionTokens = usage.OutputTokens
1691 }
1692 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1693 session.PromptTokens = promptTokens
1694 }
1695}
1696
1697func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1698 if usage.OutputTokens != 0 {
1699 return usage.OutputTokens
1700 }
1701 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1702}
1703
1704func (a *sessionAgent) Cancel(sessionID string) {
1705 // Serialize against the dispatch handoff in Run so the accepted ->
1706 // (cancel-on-entry | queued | active) transition is atomic against
1707 // this cancel. Every cancel observes at least one of: an active
1708 // request, an accepted run (recorded as a pending cancel), or a
1709 // queue entry it then clears. If none of those hold, an idle Escape
1710 // is a true no-op and must not poison the next prompt.
1711 mu := a.sessionMu(sessionID)
1712 mu.Lock()
1713 defer mu.Unlock()
1714
1715 // Cancel regular requests. Don't use Take() here - we need the entry to
1716 // remain in activeRequests so IsBusy() returns true until the goroutine
1717 // fully completes (including error handling that may access the DB).
1718 // The defer in processRequest will clean up the entry.
1719 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1720 slog.Debug("Request cancellation initiated", "session_id", sessionID)
1721 cancel()
1722 }
1723
1724 // Also check for summarize requests.
1725 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1726 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1727 cancel()
1728 }
1729
1730 // Record a pending cancel only when a dispatched-but-not-yet-active
1731 // run exists. This catches runs still in the goroutine scheduler or
1732 // about to enter Run's busy-queue branch, while leaving an idle
1733 // session untouched. Active and accepted are not mutually exclusive:
1734 // when a run is active and a follow-up has been accepted, both the
1735 // cancel above and this pending record fire.
1736 //
1737 // Raise the session's cancel mark to the latest accept sequence
1738 // assigned so far. Every prompt currently accepted-but-not-yet-
1739 // active has a sequence at or below that value, so one cancel covers
1740 // all of them; a prompt accepted after this cancel gets a strictly
1741 // higher sequence and is never poisoned. Using max keeps repeated
1742 // cancels idempotent while the same prompts are in flight and lets a
1743 // later cancel extend coverage to prompts accepted since.
1744 a.acceptedMu.Lock()
1745 count, ok := a.acceptedRuns.Get(sessionID)
1746 mark := a.acceptSeqGen
1747 a.acceptedMu.Unlock()
1748 if ok && count > 0 {
1749 slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
1750 existing, _ := a.cancelMark.Get(sessionID)
1751 a.cancelMark.Set(sessionID, max(existing, mark))
1752 }
1753
1754 if a.QueuedPrompts(sessionID) > 0 {
1755 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1756 a.messageQueue.Del(sessionID)
1757 }
1758}
1759
1760func (a *sessionAgent) ClearQueue(sessionID string) {
1761 if a.QueuedPrompts(sessionID) > 0 {
1762 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1763 a.messageQueue.Del(sessionID)
1764 }
1765}
1766
1767func (a *sessionAgent) CancelAll() {
1768 if !a.IsBusy() {
1769 return
1770 }
1771 for key := range a.activeRequests.Seq2() {
1772 a.Cancel(key) // key is sessionID
1773 }
1774
1775 timeout := time.After(5 * time.Second)
1776 for a.IsBusy() {
1777 select {
1778 case <-timeout:
1779 return
1780 default:
1781 time.Sleep(200 * time.Millisecond)
1782 }
1783 }
1784}
1785
1786func (a *sessionAgent) IsBusy() bool {
1787 var busy bool
1788 for cancelFunc := range a.activeRequests.Seq() {
1789 if cancelFunc != nil {
1790 busy = true
1791 break
1792 }
1793 }
1794 return busy
1795}
1796
1797func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1798 _, busy := a.activeRequests.Get(sessionID)
1799 return busy
1800}
1801
1802func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1803 l, ok := a.messageQueue.Get(sessionID)
1804 if !ok {
1805 return 0
1806 }
1807 return len(l)
1808}
1809
1810func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1811 l, ok := a.messageQueue.Get(sessionID)
1812 if !ok {
1813 return nil
1814 }
1815 prompts := make([]string, len(l))
1816 for i, call := range l {
1817 prompts[i] = call.Prompt
1818 }
1819 return prompts
1820}
1821
1822func (a *sessionAgent) SetModels(large Model, small Model) {
1823 a.largeModel.Set(large)
1824 a.smallModel.Set(small)
1825}
1826
1827func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1828 a.tools.SetSlice(tools)
1829}
1830
1831func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1832 a.systemPrompt.Set(systemPrompt)
1833}
1834
1835func (a *sessionAgent) Model() Model {
1836 return a.largeModel.Get()
1837}
1838
1839// convertToToolResult converts a fantasy tool result to a message tool result.
1840func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1841 baseResult := message.ToolResult{
1842 ToolCallID: result.ToolCallID,
1843 Name: result.ToolName,
1844 Metadata: result.ClientMetadata,
1845 }
1846
1847 switch result.Result.GetType() {
1848 case fantasy.ToolResultContentTypeText:
1849 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1850 baseResult.Content = r.Text
1851 }
1852 case fantasy.ToolResultContentTypeError:
1853 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1854 baseResult.Content = r.Error.Error()
1855 baseResult.IsError = true
1856 }
1857 case fantasy.ToolResultContentTypeMedia:
1858 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1859 if !stringext.IsValidBase64(r.Data) {
1860 slog.Warn(
1861 "Tool returned media with invalid base64 data, discarding image",
1862 "tool", result.ToolName,
1863 "tool_call_id", result.ToolCallID,
1864 )
1865 baseResult.Content = "Tool returned image data with invalid encoding"
1866 baseResult.IsError = true
1867 } else {
1868 content := r.Text
1869 if content == "" {
1870 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1871 }
1872 baseResult.Content = content
1873 baseResult.Data = r.Data
1874 baseResult.MIMEType = r.MediaType
1875 }
1876 }
1877 }
1878
1879 return baseResult
1880}
1881
1882// workaroundProviderMediaLimitations converts media content in tool results to
1883// user messages for providers that don't natively support images in tool results.
1884//
1885// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1886// don't support sending images/media in tool result messages - they only accept
1887// text in tool results. However, they DO support images in user messages.
1888//
1889// If we send media in tool results to these providers, the API returns an error.
1890//
1891// Solution: For these providers, we:
1892// 1. Replace the media in the tool result with a text placeholder
1893// 2. Inject a user message immediately after with the image as a file attachment
1894// 3. This maintains the tool execution flow while working around API limitations
1895//
1896// Anthropic and Bedrock support images natively in tool results, so we skip
1897// this workaround for them.
1898//
1899// Example transformation:
1900//
1901// BEFORE: [tool result: image data]
1902// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1903func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1904 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1905 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1906 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1907
1908 if providerSupportsMedia {
1909 return messages
1910 }
1911
1912 convertedMessages := make([]fantasy.Message, 0, len(messages))
1913
1914 for _, msg := range messages {
1915 if msg.Role != fantasy.MessageRoleTool {
1916 convertedMessages = append(convertedMessages, msg)
1917 continue
1918 }
1919
1920 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1921 var mediaFiles []fantasy.FilePart
1922
1923 for _, part := range msg.Content {
1924 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1925 if !ok {
1926 textParts = append(textParts, part)
1927 continue
1928 }
1929
1930 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1931 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1932 if err != nil {
1933 slog.Warn("Failed to decode media data", "error", err)
1934 textParts = append(textParts, part)
1935 continue
1936 }
1937
1938 mediaFiles = append(mediaFiles, fantasy.FilePart{
1939 Data: decoded,
1940 MediaType: media.MediaType,
1941 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1942 })
1943
1944 textParts = append(textParts, fantasy.ToolResultPart{
1945 ToolCallID: toolResult.ToolCallID,
1946 Output: fantasy.ToolResultOutputContentText{
1947 Text: "[Image/media content loaded - see attached file]",
1948 },
1949 ProviderOptions: toolResult.ProviderOptions,
1950 })
1951 } else {
1952 textParts = append(textParts, part)
1953 }
1954 }
1955
1956 convertedMessages = append(convertedMessages, fantasy.Message{
1957 Role: fantasy.MessageRoleTool,
1958 Content: textParts,
1959 })
1960
1961 if len(mediaFiles) > 0 {
1962 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1963 "Here is the media content from the tool result:",
1964 mediaFiles...,
1965 ))
1966 }
1967 }
1968
1969 return convertedMessages
1970}
1971
1972// buildSummaryPrompt constructs the prompt text for session summarization.
1973func buildSummaryPrompt(todos []session.Todo) string {
1974 var sb strings.Builder
1975 sb.WriteString("Provide a detailed summary of our conversation above.")
1976 if len(todos) > 0 {
1977 sb.WriteString("\n\n## Current Todo List\n\n")
1978 for _, t := range todos {
1979 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1980 }
1981 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1982 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1983 }
1984 return sb.String()
1985}
1986
1987func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1988 fields := []any{
1989 "retry_delay", delay.String(),
1990 }
1991 if err == nil {
1992 return fields
1993 }
1994 fields = append(fields, "status_code", err.StatusCode)
1995 if err.Title != "" {
1996 fields = append(fields, "title", err.Title)
1997 }
1998 if err.Message != "" {
1999 fields = append(fields, "message", err.Message)
2000 }
2001 return fields
2002}