1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "net/http"
19 "os"
20 "regexp"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "charm.land/catwalk/pkg/catwalk"
28 "charm.land/fantasy"
29 "charm.land/fantasy/providers/anthropic"
30 "charm.land/fantasy/providers/bedrock"
31 "charm.land/fantasy/providers/google"
32 "charm.land/fantasy/providers/openai"
33 "charm.land/fantasy/providers/openrouter"
34 "charm.land/fantasy/providers/vercel"
35 "charm.land/lipgloss/v2"
36 "github.com/charmbracelet/crush/internal/agent/hyper"
37 "github.com/charmbracelet/crush/internal/agent/notify"
38 "github.com/charmbracelet/crush/internal/agent/tools"
39 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
40 "github.com/charmbracelet/crush/internal/config"
41 "github.com/charmbracelet/crush/internal/csync"
42 "github.com/charmbracelet/crush/internal/message"
43 "github.com/charmbracelet/crush/internal/pubsub"
44 "github.com/charmbracelet/crush/internal/session"
45 "github.com/charmbracelet/crush/internal/stringext"
46 "github.com/charmbracelet/crush/internal/version"
47 "github.com/charmbracelet/x/exp/charmtone"
48)
49
const (
	// DefaultSessionName is the literal "Untitled Session".
	// NOTE(review): presumably the placeholder name for a session whose
	// title has not been generated yet — confirm against callers.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	// NOTE(review): the code that consumes these is outside the reviewed
	// section; the first two look like token counts and the ratio looks
	// like a fraction of the context window — confirm at the use site.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
58
// userAgent is the User-Agent string handed to fantasy.WithUserAgent when
// Run builds its agent. It embeds the build's version.Version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
60
// titlePrompt holds the contents of templates/title.md, embedded at build
// time.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at
// build time.
//
//go:embed templates/summary.md
var summaryPrompt []byte
66
// Used to remove <think> tags from generated titles.
var (
	// thinkTagRegex matches a complete <think>...</think> pair. The (?s)
	// flag lets "." span newlines, and the lazy ".*?" stops at the first
	// closing tag.
	thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)
	// orphanThinkTagRegex matches a single opening or closing think tag on
	// its own.
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
72
// SessionAgentCall describes one prompt submitted to SessionAgent.Run,
// together with its per-call sampling options and the completion/accept
// bookkeeping that travels with it.
type SessionAgentCall struct {
	// SessionID names the session the prompt belongs to. ValidateCall
	// rejects a call whose SessionID is empty.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text. ValidateCall requires either a non-empty
	// Prompt or a text attachment.
	Prompt string
	// ProviderOptions is forwarded unchanged to the streaming call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments accompany the prompt; Run passes them to preparePrompt
	// and to message.PromptWithTextAttachments.
	Attachments []message.Attachment
	// MaxOutputTokens is only sent to the provider when it is greater
	// than zero.
	MaxOutputTokens int64
	// Temperature, TopP, TopK, FrequencyPenalty and PresencePenalty are
	// optional sampling parameters that Run forwards as-is to the
	// streaming call.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive is not read in the reviewed section of the file.
	// NOTE(review): presumably marks calls made without an interactive
	// UI — confirm against its readers.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
125
// SessionAgent is the interface implemented by the value that
// NewSessionAgent returns.
type SessionAgent interface {
	// Run validates the call and then either queues it behind a busy
	// session or executes it as the session's active turn.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID; the
	// returned handle's Close releases it.
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
142
// Model pairs a language model with the metadata Run needs alongside it.
type Model struct {
	// Model is the language model handed to fantasy.NewAgent.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catwalk description of the model; Run reads its
	// SupportsImages and Name fields.
	CatwalkCfg catwalk.Model
	// ModelCfg is the selected-model configuration; its Model and Provider
	// values are recorded on the assistant messages Run creates.
	ModelCfg config.SelectedModel
	// FlatRate is not read in the reviewed section of the file.
	// NOTE(review): presumably marks flat-rate billing — confirm.
	FlatRate bool
}
149
// sessionAgent is the SessionAgent implementation constructed by
// NewSessionAgent. It holds the current models, prompts and tools plus the
// per-session queue, active-run and accept/cancel bookkeeping.
type sessionAgent struct {
	// largeModel is the model Run streams with.
	largeModel *csync.Value[Model]
	// smallModel is set from SessionAgentOptions.SmallModel; it is not
	// read in the reviewed section of the file.
	smallModel *csync.Value[Model]
	// systemPromptPrefix, when non-empty, is prepended as an extra system
	// message on every streaming step.
	systemPromptPrefix *csync.Value[string]
	// systemPrompt is the base system prompt given to the agent.
	systemPrompt *csync.Value[string]
	// tools is the current tool set; Run copies it at start and again on
	// each step.
	tools *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	// runComplete is the broker publishRunComplete falls back to when a
	// call has no OnComplete hook; it may be nil.
	runComplete pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls queued behind a busy
	// session.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests holds, per session, the cancel func of the run that
	// is currently active.
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
205
// SessionAgentOptions carries the values NewSessionAgent copies into a new
// session agent. Each field maps one-to-one onto the agent field of the
// same (lower-cased) name.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	// Notify and RunComplete are the publishers the agent emits on.
	// publishRunComplete tolerates a nil RunComplete.
	Notify      pubsub.Publisher[notify.Notification]
	RunComplete pubsub.Publisher[notify.RunComplete]
}
220
221func NewSessionAgent(
222 opts SessionAgentOptions,
223) SessionAgent {
224 return &sessionAgent{
225 largeModel: csync.NewValue(opts.LargeModel),
226 smallModel: csync.NewValue(opts.SmallModel),
227 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
228 systemPrompt: csync.NewValue(opts.SystemPrompt),
229 isSubAgent: opts.IsSubAgent,
230 sessions: opts.Sessions,
231 messages: opts.Messages,
232 disableAutoSummarize: opts.DisableAutoSummarize,
233 tools: csync.NewSliceFrom(opts.Tools),
234 isYolo: opts.IsYolo,
235 notify: opts.Notify,
236 runComplete: opts.RunComplete,
237 messageQueue: csync.NewMap[string, []SessionAgentCall](),
238 activeRequests: csync.NewMap[string, context.CancelFunc](),
239 dispatchMu: csync.NewMap[string, *sync.Mutex](),
240 acceptedRuns: csync.NewMap[string, int](),
241 cancelMark: csync.NewMap[string, uint64](),
242 }
243}
244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: while the
// reservation is open the session's accept counter stays above zero,
// meaning a dispatched prompt is in flight and has not yet completed the
// dispatch handoff in Run. Close is the only way to release the
// reservation and is idempotent.
type AcceptedRun struct {
	// agent is the agent whose accept counter this handle holds a
	// reservation on; Close calls back into it.
	agent *sessionAgent
	// sessionID is the session the reservation was taken for.
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	seq uint64
	// done flips to true on the first Close so later calls are no-ops.
	done atomic.Bool
}
261
262// Close decrements the accept counter for this reservation. It is safe
263// to call multiple times; only the first call has effect.
264func (r *AcceptedRun) Close() {
265 if r == nil {
266 return
267 }
268 if !r.done.CompareAndSwap(false, true) {
269 return
270 }
271 r.agent.endAccepted(r.sessionID)
272}
273
274// SessionID exposes the session this reservation is for so the run path
275// can use it without an extra parameter.
276func (r *AcceptedRun) SessionID() string {
277 if r == nil {
278 return ""
279 }
280 return r.sessionID
281}
282
283// BeginAccepted increments the accept counter for sessionID and returns
284// a handle whose Close is the only way to decrement it. It is the only
285// entry point that mutates acceptedRuns.
286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
287 a.acceptedMu.Lock()
288 defer a.acceptedMu.Unlock()
289 count, _ := a.acceptedRuns.Get(sessionID)
290 a.acceptedRuns.Set(sessionID, count+1)
291 a.acceptSeqGen++
292 return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
293}
294
295// endAccepted decrements the accept counter for sessionID. It is only
296// called via AcceptedRun.Close. It uses a dedicated lock (not the
297// per-session dispatch mutex) so it can run while Run holds dispatchMu
298// for the same session without deadlocking.
299//
300// When the count reaches zero the session's cancel mark is dropped: no
301// accepted handle remains for it to cover, and any handle accepted later
302// gets a strictly higher sequence that the mark would not match anyway.
303// Handles canceled on entry never reach RunComplete, so this is the only
304// place that clears the mark for an all-canceled batch. Sibling handles
305// covered by the same mark are serialized on the per-session dispatch
306// mutex and read the mark before they Close, so this never clears it out
307// from under a covered handle still waiting to enter Run.
308func (a *sessionAgent) endAccepted(sessionID string) {
309 a.acceptedMu.Lock()
310 defer a.acceptedMu.Unlock()
311 count, ok := a.acceptedRuns.Get(sessionID)
312 if !ok || count <= 1 {
313 a.acceptedRuns.Del(sessionID)
314 a.cancelMark.Del(sessionID)
315 return
316 }
317 a.acceptedRuns.Set(sessionID, count-1)
318}
319
320// sessionMu returns the per-session dispatch mutex, creating it on first
321// use. Creation is guarded so concurrent callers always observe the same
322// mutex instance for a given session.
323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
324 if mu, ok := a.dispatchMu.Get(sessionID); ok {
325 return mu
326 }
327 a.dispatchMuCreate.Lock()
328 defer a.dispatchMuCreate.Unlock()
329 if mu, ok := a.dispatchMu.Get(sessionID); ok {
330 return mu
331 }
332 mu := &sync.Mutex{}
333 a.dispatchMu.Set(sessionID, mu)
334 return mu
335}
336
337// enqueueCall appends call to the session's message queue. The
338// OnComplete hook is stripped: the caller that supplied it (typically
339// coordinator.Run) has its own retry/coalesce scope that ends when it
340// returns, so by the time the queue drains nobody is left to consume the
341// buffered terminal event. The recursive Run falls back to the default
342// broker publish, which is what existing subscribers expect for queued
343// turns.
344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
345 existing, ok := a.messageQueue.Get(call.SessionID)
346 if !ok {
347 existing = []SessionAgentCall{}
348 }
349 queued := call
350 if call.Accepted != nil {
351 // Preserve the accept sequence after the handle is stripped so
352 // the queue-drain paths can tell a follow-up queued before a
353 // cancel (covered by the mark) from one queued after it.
354 queued.acceptSeq = call.Accepted.seq
355 }
356 queued.OnComplete = nil
357 queued.Accepted = nil
358 existing = append(existing, queued)
359 a.messageQueue.Set(call.SessionID, existing)
360}
361
362// drainUncanceledQueue copies and clears the session's message queue and
363// returns the queued calls not covered by a pending cancel. The queue
364// copy/delete and the cancel-mark check happen under the per-session
365// dispatch mutex so the filtering is atomic against a concurrent Cancel:
366// canceledBySeq requires the caller to hold that mutex, and evaluating it
367// here (rather than after unlocking) prevents a cancel recorded between
368// the drain and the check from being observed inconsistently. The
369// returned survivors are processed by the caller without the lock held.
370func (a *sessionAgent) drainUncanceledQueue(sessionID string) []SessionAgentCall {
371 dispatchLock := a.sessionMu(sessionID)
372 dispatchLock.Lock()
373 defer dispatchLock.Unlock()
374 queuedCalls, _ := a.messageQueue.Get(sessionID)
375 a.messageQueue.Del(sessionID)
376 survivors := queuedCalls[:0]
377 for _, queued := range queuedCalls {
378 if a.canceledBySeq(sessionID, queued.acceptSeq) {
379 continue
380 }
381 survivors = append(survivors, queued)
382 }
383 return survivors
384}
385
386// clearPendingCancel removes any pending-cancel mark for sessionID. It
387// takes the per-session dispatch lock so it is ordered against Cancel
388// and the dispatch handoff.
389func (a *sessionAgent) clearPendingCancel(sessionID string) {
390 mu := a.sessionMu(sessionID)
391 mu.Lock()
392 defer mu.Unlock()
393 a.cancelMark.Del(sessionID)
394}
395
396// canceledBySeq reports whether an accepted handle or queued call with
397// the given accept sequence is covered by a pending cancel for the
398// session. Callers must hold the session's dispatch mutex. A tracked
399// sequence (seq > 0) is covered only when it is at or below the cancel
400// high-water mark, so a prompt accepted after the cancel (higher seq) is
401// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
402// with no accept reservation) is covered whenever any mark is present,
403// preserving the pre-sequence behavior. The mark is not consumed: it
404// stays so every sibling handle it covers observes the same cancel, and
405// a later handle (higher seq) ignores it regardless.
406func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
407 mark, ok := a.cancelMark.Get(sessionID)
408 if !ok || mark == 0 {
409 return false
410 }
411 return seq == 0 || seq <= mark
412}
413
414// persistCanceledTurn writes the user/assistant records for a turn that
415// was canceled before (or just as) streaming would have produced them.
416// It creates the user message only when it was not already created by an
417// earlier createUserMessage call (userMsgCreated), then writes an
418// assistant message with FinishReasonCanceled. Both writes use
419// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
420// context) can't drop them.
421func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
422 writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
423 defer cancel()
424 if !userMsgCreated {
425 if _, err := a.createUserMessage(writeCtx, call); err != nil {
426 return err
427 }
428 }
429 largeModel := a.largeModel.Get()
430 assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
431 Role: message.Assistant,
432 Parts: []message.ContentPart{},
433 Model: largeModel.ModelCfg.Model,
434 Provider: largeModel.ModelCfg.Provider,
435 })
436 if err != nil {
437 return err
438 }
439 assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
440 return a.messages.Update(writeCtx, assistant)
441}
442
443// publishRunComplete emits the authoritative terminal event for a turn.
444// It honors the per-call OnComplete hook when set (so the coordinator can
445// coalesce retries) and otherwise falls back to the RunComplete broker.
446// ctx is used only for the bounded-blocking must-deliver publish; the
447// terminal payload is supplied by the caller. This is the single emit path
448// shared by the streaming defer and the cancel-on-entry early return so a
449// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
450// observes exactly one terminal event regardless of which Run branch ends
451// the turn.
452func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
453 if call.OnComplete != nil {
454 call.OnComplete(complete)
455 return
456 }
457 if a.runComplete == nil {
458 return
459 }
460 a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
461}
462
463// ValidateCall performs the cheap structural validation that
464// sessionAgent.Run requires before a call can be dispatched: a call must
465// carry either a non-empty prompt or a text attachment, and it must name a
466// session. It is exported so callers that accept a run before dispatching it
467// (e.g. backend.SendMessage) can apply the same checks and keep the error
468// contract consistent.
469func ValidateCall(call SessionAgentCall) error {
470 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
471 return ErrEmptyPrompt
472 }
473 if call.SessionID == "" {
474 return ErrSessionMissing
475 }
476 return nil
477}
478
479func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
480 if err := ValidateCall(call); err != nil {
481 return nil, err
482 }
483
484 // genCtx/cancel are the run context and its cancel func. For the
485 // accepted (fire-and-forget) dispatch path they are created under
486 // dispatchMu below so a concurrent Cancel can observe the
487 // activeRequests entry before the assistant message exists. For
488 // the in-process path they stay nil here and are created later,
489 // preserving the original ordering.
490 var (
491 genCtx context.Context
492 cancel context.CancelFunc
493 activeRegistered bool
494 userMsgCreated bool
495 )
496
497 if call.Accepted != nil {
498 // Serialize the accepted -> (cancel-on-entry | queued |
499 // active) transition against a concurrent Cancel. Cancel takes
500 // the same per-session lock, so every cancel observes at least
501 // one of: a cancel mark, an activeRequests entry, or a
502 // messageQueue entry it then clears.
503 mu := a.sessionMu(call.SessionID)
504 mu.Lock()
505
506 if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
507 // Cancel-on-entry: a cancel arrived while this run was
508 // dispatched but not yet active, and this handle's accept
509 // sequence is at or below the session's cancel mark. The
510 // mark is left in place so sibling handles it also covers
511 // observe the same cancel; release the accept reservation,
512 // drop the lock, and persist a canceled turn without
513 // entering Stream.
514 //
515 // This path returns before the streaming defer that
516 // publishes RunComplete is installed, so emit the terminal
517 // event explicitly. Without it, a caller waiting on
518 // RunComplete for this RunID (e.g. `crush run`, which
519 // ignores message events and blocks on RunComplete) would
520 // hang on an immediately-canceled accepted run.
521 call.Accepted.Close()
522 mu.Unlock()
523 complete := notify.RunComplete{
524 SessionID: call.SessionID,
525 RunID: call.RunID,
526 Cancelled: true,
527 }
528 if err := a.persistCanceledTurn(ctx, call, false); err != nil {
529 complete.Error = err.Error()
530 a.publishRunComplete(ctx, call, complete)
531 return nil, err
532 }
533 a.publishRunComplete(ctx, call, complete)
534 return nil, nil
535 }
536
537 if a.IsSessionBusy(call.SessionID) {
538 // Busy: an earlier prompt is active. Queue this call and
539 // release the accept reservation. A Cancel arriving after
540 // this point sees the active entry and clears the queue.
541 a.enqueueCall(call)
542 call.Accepted.Close()
543 mu.Unlock()
544 return nil, nil
545 }
546
547 // Idle: become the active run. Register the cancel func before
548 // dropping the lock so a Cancel that arrives between here and
549 // assistant creation is not lost.
550 runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
551 genCtx, cancel = context.WithCancel(runCtx)
552 a.activeRequests.Set(call.SessionID, cancel)
553 activeRegistered = true
554 call.Accepted.Close()
555 mu.Unlock()
556
557 defer cancel()
558 defer a.activeRequests.Del(call.SessionID)
559 } else if a.IsSessionBusy(call.SessionID) {
560 // Queue the message if busy. Strip OnComplete: the caller that
561 // supplied the hook (typically coordinator.Run) has its own
562 // retry/coalesce scope that ends when it returns, so by the time
563 // the queue drains nobody is left to consume the buffered
564 // terminal event. The recursive Run will fall back to the
565 // default broker publish, which is what existing subscribers
566 // expect for queued turns.
567 a.enqueueCall(call)
568 return nil, nil
569 }
570
571 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
572 agentTools := a.tools.Copy()
573 largeModel := a.largeModel.Get()
574 systemPrompt := a.systemPrompt.Get()
575 promptPrefix := a.systemPromptPrefix.Get()
576 var instructions strings.Builder
577
578 for _, server := range mcp.GetStates() {
579 if server.State != mcp.StateConnected {
580 continue
581 }
582 if s := server.Client.InitializeResult().Instructions; s != "" {
583 instructions.WriteString(s)
584 instructions.WriteString("\n\n")
585 }
586 }
587
588 if s := instructions.String(); s != "" {
589 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
590 }
591
592 if len(agentTools) > 0 {
593 // Add Anthropic caching to the last tool.
594 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
595 }
596
597 agent := fantasy.NewAgent(
598 largeModel.Model,
599 fantasy.WithSystemPrompt(systemPrompt),
600 fantasy.WithTools(agentTools...),
601 fantasy.WithUserAgent(userAgent),
602 )
603
604 sessionLock := sync.Mutex{}
605 currentSession, err := a.sessions.Get(ctx, call.SessionID)
606 if err != nil {
607 return nil, fmt.Errorf("failed to get session: %w", err)
608 }
609
610 msgs, err := a.getSessionMessages(ctx, currentSession)
611 if err != nil {
612 return nil, fmt.Errorf("failed to get session messages: %w", err)
613 }
614
615 var wg sync.WaitGroup
616 // Generate title if first message.
617 if len(msgs) == 0 {
618 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
619 wg.Go(func() {
620 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
621 })
622 }
623 defer wg.Wait()
624
625 // Add the user message to the session.
626 _, err = a.createUserMessage(ctx, call)
627 if err != nil {
628 return nil, err
629 }
630 userMsgCreated = true
631
632 // Add the session to the context.
633 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
634
635 // For the accepted dispatch path the run context and cancel func
636 // were already created and registered under dispatchMu above; reuse
637 // them. For the in-process path create them here, preserving the
638 // original ordering.
639 if !activeRegistered {
640 genCtx, cancel = context.WithCancel(ctx)
641 a.activeRequests.Set(call.SessionID, cancel)
642
643 defer cancel()
644 defer a.activeRequests.Del(call.SessionID)
645 }
646 // skipRunComplete is set just before the queued-recursion path so
647 // the outer Run doesn't publish a RunComplete that would race
648 // with — and be superseded by — the recursive call's own
649 // RunComplete (each queued user prompt is its own turn and
650 // publishes exactly one terminal event).
651 var skipRunComplete bool
652 // currentAssistant is declared here so the deferred RunComplete
653 // publish below can capture the pointer that PrepareStep will
654 // later (re)assign for each streaming step. The final assistant
655 // message of the turn is the value reachable through this
656 // pointer when the defer runs.
657 var currentAssistant *message.Message
658 // Drain any debounced message updates before returning. message.Service
659 // already flushes synchronously on terminal updates, but a defer here
660 // guarantees the contract at every Run exit (success, error, panic
661 // recovery upstream) without callers needing to know.
662 //
663 // After the flush completes — meaning all per-message
664 // Publish(UpdatedEvent) calls have fired and been buffered into
665 // every subscriber's channel — publish the authoritative
666 // RunComplete event for this turn. The flush-then-publish order
667 // gives well-behaved clients the best chance of seeing the final
668 // message event before RunComplete; the embedded Text field
669 // reconciles for clients that observe the events out of order
670 // (the pubsub broker fan-in does not serialize publishes from
671 // different upstream brokers).
672 defer func() {
673 // Use a context detached from the run context: workspace
674 // shutdown cancels ctx before this goroutine returns, but the
675 // buffered streaming deltas must still land before the DB is
676 // closed. A short timeout bounds the flush.
677 flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
678 defer flushCancel()
679 if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
680 slog.Error("Failed to flush pending message updates after run", "error", flushErr)
681 }
682 if skipRunComplete {
683 return
684 }
685 complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
686 if currentAssistant != nil {
687 complete.MessageID = currentAssistant.ID
688 complete.Text = currentAssistant.Content().String()
689 }
690 if retErr != nil {
691 complete.Error = retErr.Error()
692 complete.Cancelled = errors.Is(retErr, context.Canceled)
693 } else if ctx.Err() != nil {
694 complete.Cancelled = true
695 }
696 // Prefer the per-call hook when supplied so the coordinator
697 // can coalesce retries (e.g. unauthorized → re-auth → retry)
698 // into a single user-visible terminal event. The fallback
699 // must-deliver publish applies bounded-blocking semantics to
700 // the authoritative terminal event so a momentarily-full
701 // subscriber channel can't silently drop it and hang
702 // non-interactive clients waiting on RunComplete.
703 a.publishRunComplete(ctx, call, complete)
704 }()
705
706 history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
707
708 startTime := time.Now()
709 a.eventPromptSent(call.SessionID)
710
711 var stepMessages []fantasy.Message
712 var shouldSummarize bool
713 // Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
714 var maxOutputTokens *int64
715 if call.MaxOutputTokens > 0 {
716 maxOutputTokens = &call.MaxOutputTokens
717 }
718 result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
719 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
720 Files: files,
721 Messages: history,
722 ProviderOptions: call.ProviderOptions,
723 MaxOutputTokens: maxOutputTokens,
724 TopP: call.TopP,
725 Temperature: call.Temperature,
726 PresencePenalty: call.PresencePenalty,
727 TopK: call.TopK,
728 FrequencyPenalty: call.FrequencyPenalty,
729 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
730 prepared.Messages = options.Messages
731 for i := range prepared.Messages {
732 prepared.Messages[i].ProviderOptions = nil
733 }
734
735 // Use latest tools (updated by SetTools when MCP tools change).
736 prepared.Tools = a.tools.Copy()
737
738 // Drain queued follow-up prompts, but skip any covered by a
739 // cancel recorded while they sat in the queue: a cancel that
740 // arrived after a prompt was queued must not let it run as
741 // part of this step. Coverage is per-call by accept sequence
742 // so a follow-up queued after the cancel (higher seq) is
743 // still folded in.
744 survivors := a.drainUncanceledQueue(call.SessionID)
745 for _, queued := range survivors {
746 userMessage, createErr := a.createUserMessage(callContext, queued)
747 if createErr != nil {
748 return callContext, prepared, createErr
749 }
750 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
751 }
752
753 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
754
755 lastSystemRoleInx := 0
756 systemMessageUpdated := false
757 for i, msg := range prepared.Messages {
758 // Only add cache control to the last message.
759 if msg.Role == fantasy.MessageRoleSystem {
760 lastSystemRoleInx = i
761 } else if !systemMessageUpdated {
762 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
763 systemMessageUpdated = true
764 }
				// Then add cache control to the last 2 messages.
766 if i > len(prepared.Messages)-3 {
767 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
768 }
769 }
770
771 if promptPrefix != "" {
772 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
773 }
774
775 sessionLock.Lock()
776 stepMessages = cloneFantasyMessages(prepared.Messages)
777 sessionLock.Unlock()
778
779 var assistantMsg message.Message
780 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
781 Role: message.Assistant,
782 Parts: []message.ContentPart{},
783 Model: largeModel.ModelCfg.Model,
784 Provider: largeModel.ModelCfg.Provider,
785 })
786 if err != nil {
787 return callContext, prepared, err
788 }
789 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
790 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
791 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
792 currentAssistant = &assistantMsg
793 return callContext, prepared, err
794 },
795 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
796 currentAssistant.AppendReasoningContent(reasoning.Text)
797 return a.messages.Update(genCtx, *currentAssistant)
798 },
799 OnReasoningDelta: func(id string, text string) error {
800 currentAssistant.AppendReasoningContent(text)
801 return a.messages.Update(genCtx, *currentAssistant)
802 },
803 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
804 // handle anthropic signature
805 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
806 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
807 currentAssistant.AppendReasoningSignature(reasoning.Signature)
808 }
809 }
810 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
811 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
812 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
813 }
814 }
815 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
816 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
817 currentAssistant.SetReasoningResponsesData(reasoning)
818 }
819 }
820 currentAssistant.FinishThinking()
821 return a.messages.Update(genCtx, *currentAssistant)
822 },
823 OnTextDelta: func(id string, text string) error {
824 // Strip leading newline from initial text content. This is is
825 // particularly important in non-interactive mode where leading
826 // newlines are very visible.
827 if len(currentAssistant.Parts) == 0 {
828 text = strings.TrimPrefix(text, "\n")
829 }
830
831 currentAssistant.AppendContent(text)
832 return a.messages.Update(genCtx, *currentAssistant)
833 },
834 OnToolInputStart: func(id string, toolName string) error {
835 toolCall := message.ToolCall{
836 ID: id,
837 Name: toolName,
838 ProviderExecuted: false,
839 Finished: false,
840 }
841 currentAssistant.AddToolCall(toolCall)
842 // Use parent ctx instead of genCtx to ensure the update succeeds
843 // even if the request is canceled mid-stream
844 return a.messages.Update(ctx, *currentAssistant)
845 },
846 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
847 slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
848 },
849 OnToolCall: func(tc fantasy.ToolCallContent) error {
850 toolCall := message.ToolCall{
851 ID: tc.ToolCallID,
852 Name: tc.ToolName,
853 Input: tc.Input,
854 ProviderExecuted: false,
855 Finished: true,
856 }
857 currentAssistant.AddToolCall(toolCall)
858 // Use parent ctx instead of genCtx to ensure the update succeeds
859 // even if the request is canceled mid-stream
860 return a.messages.Update(ctx, *currentAssistant)
861 },
862 OnToolResult: func(result fantasy.ToolResultContent) error {
863 toolResult := a.convertToToolResult(result)
864 // Use parent ctx instead of genCtx to ensure the message is created
865 // even if the request is canceled mid-stream
866 _, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
867 Role: message.Tool,
868 Parts: []message.ContentPart{
869 toolResult,
870 },
871 })
872 return createMsgErr
873 },
874 OnStepFinish: func(stepResult fantasy.StepResult) error {
875 finishReason := message.FinishReasonUnknown
876 switch stepResult.FinishReason {
877 case fantasy.FinishReasonLength:
878 finishReason = message.FinishReasonMaxTokens
879 case fantasy.FinishReasonStop:
880 finishReason = message.FinishReasonEndTurn
881 case fantasy.FinishReasonToolCalls:
882 finishReason = message.FinishReasonToolUse
883 }
884 // If a tool result halted the turn (e.g. a hook halt or a
885 // permission denial), the step ends on FinishReasonToolCalls but
886 // the model will not be called again. Treat it as the end of the
887 // turn so the UI can render the assistant footer.
888 if finishReason == message.FinishReasonToolUse {
889 for _, tr := range stepResult.Content.ToolResults() {
890 if tr.StopTurn {
891 finishReason = message.FinishReasonEndTurn
892 break
893 }
894 }
895 }
896 currentAssistant.AddFinish(finishReason, "", "")
897 sessionLock.Lock()
898 defer sessionLock.Unlock()
899
900 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
901 if getSessionErr != nil {
902 return getSessionErr
903 }
904 usage, estimated := fallbackStepUsage(stepMessages, stepResult)
905 a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
906 _, sessionErr := a.sessions.Save(ctx, updatedSession)
907 if sessionErr != nil {
908 return sessionErr
909 }
910 currentSession = updatedSession
911 return a.messages.Update(genCtx, *currentAssistant)
912 },
913 StopWhen: []fantasy.StopCondition{
914 func(_ []fantasy.StepResult) bool {
915 cw := int64(largeModel.CatwalkCfg.ContextWindow)
916 // If context window is unknown (0), skip auto-summarize
917 // to avoid immediately truncating custom/local models.
918 if cw == 0 {
919 return false
920 }
921 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
922 remaining := cw - tokens
923 var threshold int64
924 if cw > largeContextWindowThreshold {
925 threshold = largeContextWindowBuffer
926 } else {
927 threshold = int64(float64(cw) * smallContextWindowRatio)
928 }
929 if (remaining <= threshold) && !a.disableAutoSummarize {
930 shouldSummarize = true
931 return true
932 }
933 return false
934 },
935 func(steps []fantasy.StepResult) bool {
936 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
937 },
938 },
939 })
940
941 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
942
943 if err != nil {
944 isHyper := largeModel.ModelCfg.Provider == hyper.Name
945 isCancelErr := errors.Is(err, context.Canceled)
946 if currentAssistant == nil {
947 // Cancel-before-assistant-creation window: the run was
948 // canceled after activeRequests.Set but before PrepareStep
949 // created the assistant message. Without this, the turn
950 // would return with no FinishReasonCanceled marker and no
951 // user-visible record. The user message was already created
952 // above, so persistCanceledTurn only writes the assistant
953 // record.
954 if isCancelErr {
955 if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
956 return nil, persistErr
957 }
958 }
959 return result, err
960 }
961 // Persist final state with a context detached from the run
962 // context. The run context (ctx) is derived from the
963 // workspace context, which workspace shutdown cancels before
964 // agent goroutines finish; using ctx here would drop the
965 // final assistant state. WithoutCancel keeps the values
966 // (e.g. session ID) while ignoring cancellation, and a short
967 // timeout bounds the cleanup writes.
968 cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
969 defer cleanupCancel()
970 // Ensure we finish thinking on error to close the reasoning state.
971 currentAssistant.FinishThinking()
972 toolCalls := currentAssistant.ToolCalls()
973 // INFO: we use the cleanup context here because the genCtx has been cancelled.
974 msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
975 if createErr != nil {
976 return nil, createErr
977 }
978 for _, tc := range toolCalls {
979 if !tc.Finished {
980 tc.Finished = true
981 tc.Input = "{}"
982 currentAssistant.AddToolCall(tc)
983 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
984 if updateErr != nil {
985 return nil, updateErr
986 }
987 }
988
989 found := false
990 for _, msg := range msgs {
991 if msg.Role == message.Tool {
992 for _, tr := range msg.ToolResults() {
993 if tr.ToolCallID == tc.ID {
994 found = true
995 break
996 }
997 }
998 }
999 if found {
1000 break
1001 }
1002 }
1003 if found {
1004 continue
1005 }
1006 content := "There was an error while executing the tool"
1007 if isCancelErr {
1008 content = "Error: user cancelled assistant tool calling"
1009 }
1010 toolResult := message.ToolResult{
1011 ToolCallID: tc.ID,
1012 Name: tc.Name,
1013 Content: content,
1014 IsError: true,
1015 }
1016 _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1017 Role: message.Tool,
1018 Parts: []message.ContentPart{
1019 toolResult,
1020 },
1021 })
1022 if createErr != nil {
1023 return nil, createErr
1024 }
1025 }
1026 var fantasyErr *fantasy.Error
1027 var providerErr *fantasy.ProviderError
1028 const defaultTitle = "Provider Error"
1029 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1030 if isCancelErr {
1031 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1032 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1033 currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1034 if a.notify != nil {
1035 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1036 SessionID: call.SessionID,
1037 SessionTitle: currentSession.Title,
1038 Type: notify.TypeReAuthenticate,
1039 ProviderID: largeModel.ModelCfg.Provider,
1040 })
1041 }
1042 } else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1043 url := hyper.BaseURL()
1044 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1045 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1046 } else if errors.As(err, &providerErr) {
1047 if providerErr.Message == "The requested model is not supported." {
1048 url := "https://github.com/settings/copilot/features"
1049 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1050 currentAssistant.AddFinish(
1051 message.FinishReasonError,
1052 "Copilot model not enabled",
1053 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1054 )
1055 } else {
1056 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1057 }
1058 } else if errors.As(err, &fantasyErr) {
1059 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1060 } else {
1061 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1062 }
1063 // Note: we use the cleanup context here because the genCtx has been
1064 // cancelled.
1065 updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1066 if updateErr != nil {
1067 return nil, updateErr
1068 }
1069 return nil, err
1070 }
1071
1072 if shouldSummarize {
1073 a.activeRequests.Del(call.SessionID)
1074 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1075 return nil, summarizeErr
1076 }
1077 // If the agent wasn't done...
1078 if len(currentAssistant.ToolCalls()) > 0 {
1079 existing, ok := a.messageQueue.Get(call.SessionID)
1080 if !ok {
1081 existing = []SessionAgentCall{}
1082 }
1083 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1084 existing = append(existing, call)
1085 a.messageQueue.Set(call.SessionID, existing)
1086 }
1087 }
1088
1089 // Release active request before publishing the notification.
1090 // TUI handlers poll IsSessionBusy() and only re-evaluate when a
1091 // tea.Msg arrives, so the cleanup must precede the notify or
1092 // subscribers see stale busy state at the moment of receipt.
1093 a.activeRequests.Del(call.SessionID)
1094 cancel()
1095
1096 // Send notification that agent has finished its turn (skip for
1097 // nested/non-interactive sessions).
1098 if !call.NonInteractive && a.notify != nil {
1099 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1100 SessionID: call.SessionID,
1101 SessionTitle: currentSession.Title,
1102 Type: notify.TypeAgentFinished,
1103 })
1104 }
1105
1106 // Hand off to the next queued prompt (if any) under dispatchMu so
1107 // the transition from this finished run to the queued run is atomic
1108 // against a concurrent Cancel. activeRequests for this session was
1109 // just deleted above, so without the lock there is a window in
1110 // which the session looks idle and a cancel becomes a no-op that
1111 // fails to stop the queued prompt. Holding the lock lets us observe
1112 // a pending cancel recorded against the session and drop the queue
1113 // instead of running it, and (for the recursion) hand a fresh
1114 // accept reservation to the dequeued call so acceptedRuns stays > 0
1115 // across the recursive Run's own dispatch handoff — keeping the
1116 // session observable to Cancel for the entire transition and
1117 // closing the dequeue -> re-register window.
1118 mu := a.sessionMu(call.SessionID)
1119 mu.Lock()
1120 queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1121 if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1122 // A cancel was recorded for this session (e.g. it arrived while
1123 // this run was active and follow-ups had been queued). Drop the
1124 // queued prompts it covers (accept sequence at or below the
1125 // mark, or untracked); keep any queued after the cancel (higher
1126 // sequence) so they still run.
1127 var kept []SessionAgentCall
1128 for _, q := range queuedMessages {
1129 if q.acceptSeq == 0 || q.acceptSeq <= mark {
1130 continue
1131 }
1132 kept = append(kept, q)
1133 }
1134 queuedMessages = kept
1135 a.messageQueue.Set(call.SessionID, kept)
1136 }
1137 if len(queuedMessages) == 0 {
1138 // No queued work. Clear the cancel mark only when no accepted
1139 // run remains in flight that it might still cover; otherwise a
1140 // sibling prompt (sequence at or below the mark) waiting to
1141 // enter Run would lose its cancellation. When accepted runs are
1142 // gone, this also clears a stale mark so it can't catch a
1143 // future run.
1144 a.messageQueue.Del(call.SessionID)
1145 a.acceptedMu.Lock()
1146 inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1147 a.acceptedMu.Unlock()
1148 if inFlight == 0 {
1149 a.cancelMark.Del(call.SessionID)
1150 }
1151 mu.Unlock()
1152 return result, err
1153 }
1154 // There are queued messages restart the loop. The recursive Run
1155 // publishes its own RunComplete for the queued prompt, so suppress
1156 // the outer defer's emit to avoid a duplicate event whose Error
1157 // field would belong to the recursive turn but whose MessageID/Text
1158 // would belong to the outer turn.
1159 skipRunComplete = true
1160 firstQueuedMessage := queuedMessages[0]
1161 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1162 // Reserve a fresh accept for the dequeued prompt before dropping the
1163 // lock so acceptedRuns > 0 across the handoff into the recursive
1164 // Run. This closes the window between this dequeue and the recursive
1165 // Run registering its activeRequests entry: a cancel arriving in
1166 // that window now records a pending cancel (acceptedRuns > 0) that
1167 // the recursive Run's accepted path observes as cancel-on-entry.
1168 firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1169 mu.Unlock()
1170 return a.Run(ctx, firstQueuedMessage)
1171}
1172
1173func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1174 if a.IsSessionBusy(sessionID) {
1175 return ErrSessionBusy
1176 }
1177
1178 // Copy mutable fields under lock to avoid races with SetModels.
1179 largeModel := a.largeModel.Get()
1180 systemPromptPrefix := a.systemPromptPrefix.Get()
1181
1182 currentSession, err := a.sessions.Get(ctx, sessionID)
1183 if err != nil {
1184 return fmt.Errorf("failed to get session: %w", err)
1185 }
1186 msgs, err := a.getSessionMessages(ctx, currentSession)
1187 if err != nil {
1188 return err
1189 }
1190 if len(msgs) == 0 {
1191 // Nothing to summarize.
1192 return nil
1193 }
1194
1195 aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1196
1197 genCtx, cancel := context.WithCancel(ctx)
1198 a.activeRequests.Set(sessionID, cancel)
1199 defer a.activeRequests.Del(sessionID)
1200 defer cancel()
1201 defer func() {
1202 if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1203 slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1204 }
1205 }()
1206
1207 agent := fantasy.NewAgent(
1208 largeModel.Model,
1209 fantasy.WithSystemPrompt(string(summaryPrompt)),
1210 fantasy.WithUserAgent(userAgent),
1211 )
1212 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1213 Role: message.Assistant,
1214 Model: largeModel.ModelCfg.Model,
1215 Provider: largeModel.ModelCfg.Provider,
1216 IsSummaryMessage: true,
1217 })
1218 if err != nil {
1219 return err
1220 }
1221
1222 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1223
1224 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1225 Prompt: summaryPromptText,
1226 Messages: aiMsgs,
1227 ProviderOptions: opts,
1228 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1229 prepared.Messages = options.Messages
1230 if systemPromptPrefix != "" {
1231 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1232 }
1233 return callContext, prepared, nil
1234 },
1235 OnReasoningDelta: func(id string, text string) error {
1236 summaryMessage.AppendReasoningContent(text)
1237 return a.messages.Update(genCtx, summaryMessage)
1238 },
1239 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1240 // Handle anthropic signature.
1241 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1242 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1243 summaryMessage.AppendReasoningSignature(signature.Signature)
1244 }
1245 }
1246 summaryMessage.FinishThinking()
1247 return a.messages.Update(genCtx, summaryMessage)
1248 },
1249 OnTextDelta: func(id, text string) error {
1250 summaryMessage.AppendContent(text)
1251 return a.messages.Update(genCtx, summaryMessage)
1252 },
1253 })
1254 if err != nil {
1255 isCancelErr := errors.Is(err, context.Canceled)
1256 if isCancelErr {
1257 // User cancelled summarize we need to remove the summary message.
1258 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1259 return deleteErr
1260 }
1261 // Mark the summary message as finished with an error so the UI
1262 // stops spinning.
1263 summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1264 if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1265 return updateErr
1266 }
1267 return err
1268 }
1269
1270 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1271 err = a.messages.Update(genCtx, summaryMessage)
1272 if err != nil {
1273 return err
1274 }
1275
1276 var openrouterCost *float64
1277 for _, step := range resp.Steps {
1278 stepCost := a.openrouterCost(step.ProviderMetadata)
1279 if stepCost != nil {
1280 newCost := *stepCost
1281 if openrouterCost != nil {
1282 newCost += *openrouterCost
1283 }
1284 openrouterCost = &newCost
1285 }
1286 }
1287
1288 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost, false)
1289
1290 // Just in case, get just the last usage info.
1291 usage := resp.Response.Usage
1292 currentSession.SummaryMessageID = summaryMessage.ID
1293 currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1294 currentSession.PromptTokens = 0
1295 currentSession.EstimatedUsage = usageIsZero(usage)
1296 _, err = a.sessions.Save(genCtx, currentSession)
1297 if err != nil {
1298 return err
1299 }
1300
1301 // Release the active request before processing queued messages so that
1302 // Run() does not see the session as busy.
1303 a.activeRequests.Del(sessionID)
1304 cancel()
1305
1306 // Process any messages that were queued while summarizing.
1307 queuedMessages, ok := a.messageQueue.Get(sessionID)
1308 if !ok || len(queuedMessages) == 0 {
1309 return nil
1310 }
1311 firstQueuedMessage := queuedMessages[0]
1312 a.messageQueue.Set(sessionID, queuedMessages[1:])
1313 _, qErr := a.Run(ctx, firstQueuedMessage)
1314 return qErr
1315}
1316
1317func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1318 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1319 return fantasy.ProviderOptions{}
1320 }
1321 return fantasy.ProviderOptions{
1322 anthropic.Name: &anthropic.ProviderCacheControlOptions{
1323 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1324 },
1325 bedrock.Name: &anthropic.ProviderCacheControlOptions{
1326 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1327 },
1328 vercel.Name: &anthropic.ProviderCacheControlOptions{
1329 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1330 },
1331 }
1332}
1333
1334func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1335 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1336 var attachmentParts []message.ContentPart
1337 for _, attachment := range call.Attachments {
1338 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1339 }
1340 parts = append(parts, attachmentParts...)
1341 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1342 Role: message.User,
1343 Parts: parts,
1344 })
1345 if err != nil {
1346 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1347 }
1348 return msg, nil
1349}
1350
1351func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1352 var history []fantasy.Message
1353 if !a.isSubAgent {
1354 history = append(history, fantasy.NewUserMessage(
1355 fmt.Sprintf(
1356 "<system_reminder>%s</system_reminder>",
1357 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1358If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1359If not, please feel free to ignore. Again do not mention this message to the user.`,
1360 ),
1361 ))
1362 }
1363 // Collect all tool call IDs present in assistant messages and all tool
1364 // result IDs present in tool messages. This lets us detect both orphaned
1365 // tool results (result without a call) and orphaned tool calls (call
1366 // without a result).
1367 knownToolCallIDs := make(map[string]struct{})
1368 knownToolResultIDs := make(map[string]struct{})
1369 for _, m := range msgs {
1370 switch m.Role {
1371 case message.Assistant:
1372 for _, tc := range m.ToolCalls() {
1373 knownToolCallIDs[tc.ID] = struct{}{}
1374 }
1375 case message.Tool:
1376 for _, tr := range m.ToolResults() {
1377 knownToolResultIDs[tr.ToolCallID] = struct{}{}
1378 }
1379 }
1380 }
1381
1382 for _, m := range msgs {
1383 if len(m.Parts) == 0 {
1384 continue
1385 }
1386 // Assistant message without content or tool calls (cancelled before it returned anything).
1387 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1388 continue
1389 }
1390 if m.Role == message.Tool {
1391 if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1392 history = append(history, msg)
1393 }
1394 continue
1395 }
1396 aiMsgs := m.ToAIMessage()
1397 if !supportsImages {
1398 for i := range aiMsgs {
1399 if aiMsgs[i].Role == fantasy.MessageRoleUser {
1400 aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1401 }
1402 }
1403 }
1404 history = append(history, aiMsgs...)
1405
1406 if m.Role == message.Assistant {
1407 if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1408 history = append(history, msg)
1409 }
1410 }
1411 }
1412
1413 var files []fantasy.FilePart
1414 for _, attachment := range attachments {
1415 if attachment.IsText() {
1416 continue
1417 }
1418 files = append(files, fantasy.FilePart{
1419 Filename: attachment.FileName,
1420 Data: attachment.Content,
1421 MediaType: attachment.MimeType,
1422 })
1423 }
1424
1425 return history, files
1426}
1427
1428// filterFileParts removes fantasy.FilePart entries from a slice of message
1429// parts. Used to strip image attachments from historical user messages when
1430// the current model does not support them.
1431func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1432 filtered := make([]fantasy.MessagePart, 0, len(parts))
1433 for _, part := range parts {
1434 if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1435 continue
1436 }
1437 filtered = append(filtered, part)
1438 }
1439 return filtered
1440}
1441
1442// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1443// dropping any tool result parts whose tool_call_id has no matching tool call
1444// in the known set. An orphaned result causes API validation to fail on every
1445// subsequent turn, permanently locking the session. Returns the filtered
1446// message and true if at least one valid part remains.
1447func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1448 aiMsgs := m.ToAIMessage()
1449 if len(aiMsgs) == 0 {
1450 return fantasy.Message{}, false
1451 }
1452 var validParts []fantasy.MessagePart
1453 for _, part := range aiMsgs[0].Content {
1454 tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1455 if !ok {
1456 validParts = append(validParts, part)
1457 continue
1458 }
1459 if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1460 validParts = append(validParts, part)
1461 } else {
1462 slog.Warn(
1463 "Dropping orphaned tool result with no matching tool call",
1464 "tool_call_id", tr.ToolCallID,
1465 )
1466 }
1467 }
1468 if len(validParts) == 0 {
1469 return fantasy.Message{}, false
1470 }
1471 msg := aiMsgs[0]
1472 msg.Content = validParts
1473 return msg, true
1474}
1475
1476// syntheticToolResultsForOrphanedCalls returns a tool message containing
1477// synthetic tool results for any tool calls in the assistant message that
1478// have no matching result in knownToolResultIDs. LLM APIs require every
1479// tool_use to be immediately followed by a tool_result; an interrupted
1480// session can leave orphaned tool_use blocks that permanently lock the
1481// conversation. Returns the message and true if any synthetic results were
1482// produced.
1483func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1484 var syntheticParts []fantasy.MessagePart
1485 for _, tc := range m.ToolCalls() {
1486 if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1487 continue
1488 }
1489 slog.Warn(
1490 "Injecting synthetic tool result for orphaned tool call",
1491 "tool_call_id", tc.ID,
1492 "tool_name", tc.Name,
1493 )
1494 syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1495 ToolCallID: tc.ID,
1496 Output: fantasy.ToolResultOutputContentError{
1497 Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1498 },
1499 })
1500 }
1501 if len(syntheticParts) == 0 {
1502 return fantasy.Message{}, false
1503 }
1504 return fantasy.Message{
1505 Role: fantasy.MessageRoleTool,
1506 Content: syntheticParts,
1507 }, true
1508}
1509
1510func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1511 msgs, err := a.messages.List(ctx, session.ID)
1512 if err != nil {
1513 return nil, fmt.Errorf("failed to list messages: %w", err)
1514 }
1515
1516 if session.SummaryMessageID != "" {
1517 summaryMsgIndex := -1
1518 for i, msg := range msgs {
1519 if msg.ID == session.SummaryMessageID {
1520 summaryMsgIndex = i
1521 break
1522 }
1523 }
1524 if summaryMsgIndex != -1 {
1525 msgs = msgs[summaryMsgIndex:]
1526 msgs[0].Role = message.User
1527 }
1528 }
1529 return msgs, nil
1530}
1531
1532// generateTitle generates a session titled based on the initial prompt.
1533func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1534 if userPrompt == "" {
1535 return
1536 }
1537
1538 smallModel := a.smallModel.Get()
1539 largeModel := a.largeModel.Get()
1540 systemPromptPrefix := a.systemPromptPrefix.Get()
1541
1542 var maxOutputTokens int64 = 40
1543 if smallModel.CatwalkCfg.CanReason {
1544 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1545 }
1546
1547 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1548 return fantasy.NewAgent(
1549 m,
1550 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1551 fantasy.WithMaxOutputTokens(tok),
1552 fantasy.WithUserAgent(userAgent),
1553 )
1554 }
1555
1556 streamCall := fantasy.AgentStreamCall{
1557 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1558 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1559 prepared.Messages = opts.Messages
1560 if systemPromptPrefix != "" {
1561 prepared.Messages = append([]fantasy.Message{
1562 fantasy.NewSystemMessage(systemPromptPrefix),
1563 }, prepared.Messages...)
1564 }
1565 return callCtx, prepared, nil
1566 },
1567 }
1568
1569 // Use the small model to generate the title.
1570 model := smallModel
1571 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1572 resp, err := agent.Stream(ctx, streamCall)
1573 if err == nil {
1574 // We successfully generated a title with the small model.
1575 slog.Debug("Generated title with small model")
1576 } else {
1577 // It didn't work. Let's try with the big model.
1578 slog.Error("Error generating title with small model; trying big model", "err", err)
1579 model = largeModel
1580 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1581 resp, err = agent.Stream(ctx, streamCall)
1582 if err == nil {
1583 slog.Debug("Generated title with large model")
1584 } else {
1585 // Welp, the large model didn't work either. Use the default
1586 // session name and return.
1587 slog.Error("Error generating title with large model", "err", err)
1588 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1589 if saveErr != nil {
1590 slog.Error("Failed to save session title", "error", saveErr)
1591 }
1592 return
1593 }
1594 }
1595
1596 if resp == nil {
1597 // Actually, we didn't get a response so we can't. Use the default
1598 // session name and return.
1599 slog.Error("Response is nil; can't generate title")
1600 saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1601 if saveErr != nil {
1602 slog.Error("Failed to save session title", "error", saveErr)
1603 }
1604 return
1605 }
1606
1607 // Clean up title.
1608 var title string
1609 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1610
1611 // Remove thinking tags if present.
1612 title = thinkTagRegex.ReplaceAllString(title, "")
1613 title = orphanThinkTagRegex.ReplaceAllString(title, "")
1614
1615 title = strings.TrimSpace(title)
1616 title = cmp.Or(title, DefaultSessionName)
1617
1618 // Calculate usage and cost.
1619 var openrouterCost *float64
1620 for _, step := range resp.Steps {
1621 stepCost := a.openrouterCost(step.ProviderMetadata)
1622 if stepCost != nil {
1623 newCost := *stepCost
1624 if openrouterCost != nil {
1625 newCost += *openrouterCost
1626 }
1627 openrouterCost = &newCost
1628 }
1629 }
1630
1631 modelConfig := model.CatwalkCfg
1632 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1633 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1634 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1635 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1636
1637 // Use override cost if available (e.g., from OpenRouter).
1638 if openrouterCost != nil {
1639 cost = *openrouterCost
1640 }
1641
1642 // Skip cost accumulation
1643 if model.FlatRate {
1644 cost = 0
1645 }
1646
1647 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1648 completionTokens := resp.TotalUsage.OutputTokens
1649
1650 // Atomically update only title and usage fields to avoid overriding other
1651 // concurrent session updates.
1652 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1653 if saveErr != nil {
1654 slog.Error("Failed to save session title and usage", "error", saveErr)
1655 return
1656 }
1657}
1658
1659func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1660 openrouterMetadata, ok := metadata[openrouter.Name]
1661 if !ok {
1662 return nil
1663 }
1664
1665 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1666 if !ok {
1667 return nil
1668 }
1669 return &opts.Usage.Cost
1670}
1671
1672func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1673 if !usageIsZero(usage) {
1674 session.EstimatedUsage = estimated
1675 }
1676
1677 modelConfig := model.CatwalkCfg
1678 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1679 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1680 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1681 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1682
1683 if !estimated {
1684 a.eventTokensUsed(session.ID, model, usage, cost)
1685 }
1686
1687 if estimated {
1688 cost = 0
1689 } else {
1690 // Use override cost if available (e.g., from OpenRouter).
1691 if overrideCost != nil {
1692 cost = *overrideCost
1693 }
1694
1695 // Skip cost accumulation
1696 if model.FlatRate {
1697 cost = 0
1698 }
1699 }
1700
1701 session.Cost += cost
1702 updateSessionTokenCounters(session, usage)
1703}
1704
1705func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1706 if usage.OutputTokens != 0 {
1707 session.CompletionTokens = usage.OutputTokens
1708 }
1709 if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1710 session.PromptTokens = promptTokens
1711 }
1712}
1713
1714func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1715 if usage.OutputTokens != 0 {
1716 return usage.OutputTokens
1717 }
1718 return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1719}
1720
// Cancel stops in-flight and pending work for sessionID.
//
// While holding the lock returned by a.sessionMu(sessionID) it:
//  1. invokes the cancel function registered in activeRequests for the
//     session, if any, without removing the entry;
//  2. does the same for the "<sessionID>-summarize" entry;
//  3. raises the session's cancel mark to the current accept sequence when
//     acceptedRuns reports a positive count for the session;
//  4. deletes the session's queued prompts, if there are any.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		// A missing cancel mark is treated as its zero value, so the
		// "found" result is intentionally ignored.
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.messageQueue.Del(sessionID)
	}
}
1776
1777func (a *sessionAgent) ClearQueue(sessionID string) {
1778 if a.QueuedPrompts(sessionID) > 0 {
1779 slog.Debug("Clearing queued prompts", "session_id", sessionID)
1780 a.messageQueue.Del(sessionID)
1781 }
1782}
1783
1784func (a *sessionAgent) CancelAll() {
1785 if !a.IsBusy() {
1786 return
1787 }
1788 for key := range a.activeRequests.Seq2() {
1789 a.Cancel(key) // key is sessionID
1790 }
1791
1792 timeout := time.After(5 * time.Second)
1793 for a.IsBusy() {
1794 select {
1795 case <-timeout:
1796 return
1797 default:
1798 time.Sleep(200 * time.Millisecond)
1799 }
1800 }
1801}
1802
1803func (a *sessionAgent) IsBusy() bool {
1804 var busy bool
1805 for cancelFunc := range a.activeRequests.Seq() {
1806 if cancelFunc != nil {
1807 busy = true
1808 break
1809 }
1810 }
1811 return busy
1812}
1813
1814func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1815 _, busy := a.activeRequests.Get(sessionID)
1816 return busy
1817}
1818
1819func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1820 l, ok := a.messageQueue.Get(sessionID)
1821 if !ok {
1822 return 0
1823 }
1824 return len(l)
1825}
1826
1827func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1828 l, ok := a.messageQueue.Get(sessionID)
1829 if !ok {
1830 return nil
1831 }
1832 prompts := make([]string, len(l))
1833 for i, call := range l {
1834 prompts[i] = call.Prompt
1835 }
1836 return prompts
1837}
1838
// SetModels stores large as the agent's large model and small as its small
// model.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1843
// SetTools hands tools to the agent's tool collection via SetSlice.
// NOTE(review): presumably this replaces the previous tool list rather than
// appending to it; confirm against the collection's SetSlice implementation.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1847
// SetSystemPrompt stores systemPrompt as the agent's system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1851
// Model returns the agent's currently configured large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1855
1856// convertToToolResult converts a fantasy tool result to a message tool result.
1857func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1858 baseResult := message.ToolResult{
1859 ToolCallID: result.ToolCallID,
1860 Name: result.ToolName,
1861 Metadata: result.ClientMetadata,
1862 }
1863
1864 switch result.Result.GetType() {
1865 case fantasy.ToolResultContentTypeText:
1866 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1867 baseResult.Content = r.Text
1868 }
1869 case fantasy.ToolResultContentTypeError:
1870 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1871 baseResult.Content = r.Error.Error()
1872 baseResult.IsError = true
1873 }
1874 case fantasy.ToolResultContentTypeMedia:
1875 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1876 if !stringext.IsValidBase64(r.Data) {
1877 slog.Warn(
1878 "Tool returned media with invalid base64 data, discarding image",
1879 "tool", result.ToolName,
1880 "tool_call_id", result.ToolCallID,
1881 )
1882 baseResult.Content = "Tool returned image data with invalid encoding"
1883 baseResult.IsError = true
1884 } else {
1885 content := r.Text
1886 if content == "" {
1887 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1888 }
1889 baseResult.Content = content
1890 baseResult.Data = r.Data
1891 baseResult.MIMEType = r.MediaType
1892 }
1893 }
1894 }
1895
1896 return baseResult
1897}
1898
1899// workaroundProviderMediaLimitations converts media content in tool results to
1900// user messages for providers that don't natively support images in tool results.
1901//
1902// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1903// don't support sending images/media in tool result messages - they only accept
1904// text in tool results. However, they DO support images in user messages.
1905//
1906// If we send media in tool results to these providers, the API returns an error.
1907//
1908// Solution: For these providers, we:
1909// 1. Replace the media in the tool result with a text placeholder
1910// 2. Inject a user message immediately after with the image as a file attachment
1911// 3. This maintains the tool execution flow while working around API limitations
1912//
1913// Anthropic and Bedrock support images natively in tool results, so we skip
1914// this workaround for them.
1915//
1916// Example transformation:
1917//
1918// BEFORE: [tool result: image data]
1919// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1920func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1921 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1922 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1923 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1924
1925 if providerSupportsMedia {
1926 return messages
1927 }
1928
1929 convertedMessages := make([]fantasy.Message, 0, len(messages))
1930
1931 for _, msg := range messages {
1932 if msg.Role != fantasy.MessageRoleTool {
1933 convertedMessages = append(convertedMessages, msg)
1934 continue
1935 }
1936
1937 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1938 var mediaFiles []fantasy.FilePart
1939
1940 for _, part := range msg.Content {
1941 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1942 if !ok {
1943 textParts = append(textParts, part)
1944 continue
1945 }
1946
1947 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1948 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1949 if err != nil {
1950 slog.Warn("Failed to decode media data", "error", err)
1951 textParts = append(textParts, part)
1952 continue
1953 }
1954
1955 mediaFiles = append(mediaFiles, fantasy.FilePart{
1956 Data: decoded,
1957 MediaType: media.MediaType,
1958 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1959 })
1960
1961 textParts = append(textParts, fantasy.ToolResultPart{
1962 ToolCallID: toolResult.ToolCallID,
1963 Output: fantasy.ToolResultOutputContentText{
1964 Text: "[Image/media content loaded - see attached file]",
1965 },
1966 ProviderOptions: toolResult.ProviderOptions,
1967 })
1968 } else {
1969 textParts = append(textParts, part)
1970 }
1971 }
1972
1973 convertedMessages = append(convertedMessages, fantasy.Message{
1974 Role: fantasy.MessageRoleTool,
1975 Content: textParts,
1976 })
1977
1978 if len(mediaFiles) > 0 {
1979 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1980 "Here is the media content from the tool result:",
1981 mediaFiles...,
1982 ))
1983 }
1984 }
1985
1986 return convertedMessages
1987}
1988
1989// buildSummaryPrompt constructs the prompt text for session summarization.
1990func buildSummaryPrompt(todos []session.Todo) string {
1991 var sb strings.Builder
1992 sb.WriteString("Provide a detailed summary of our conversation above.")
1993 if len(todos) > 0 {
1994 sb.WriteString("\n\n## Current Todo List\n\n")
1995 for _, t := range todos {
1996 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1997 }
1998 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1999 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2000 }
2001 return sb.String()
2002}
2003
2004func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2005 fields := []any{
2006 "retry_delay", delay.String(),
2007 }
2008 if err == nil {
2009 return fields
2010 }
2011 fields = append(fields, "status_code", err.StatusCode)
2012 if err.Title != "" {
2013 fields = append(fields, "title", err.Title)
2014 }
2015 if err.Message != "" {
2016 fields = append(fields, "message", err.Message)
2017 }
2018 return fields
2019}