agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"sync/atomic"
  25	"time"
  26
  27	"charm.land/catwalk/pkg/catwalk"
  28	"charm.land/fantasy"
  29	"charm.land/fantasy/providers/anthropic"
  30	"charm.land/fantasy/providers/bedrock"
  31	"charm.land/fantasy/providers/google"
  32	"charm.land/fantasy/providers/openai"
  33	"charm.land/fantasy/providers/openrouter"
  34	"charm.land/fantasy/providers/vercel"
  35	"charm.land/lipgloss/v2"
  36	"github.com/charmbracelet/crush/internal/agent/hyper"
  37	"github.com/charmbracelet/crush/internal/agent/notify"
  38	"github.com/charmbracelet/crush/internal/agent/tools"
  39	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  40	"github.com/charmbracelet/crush/internal/config"
  41	"github.com/charmbracelet/crush/internal/csync"
  42	"github.com/charmbracelet/crush/internal/message"
  43	"github.com/charmbracelet/crush/internal/pubsub"
  44	"github.com/charmbracelet/crush/internal/session"
  45	"github.com/charmbracelet/crush/internal/stringext"
  46	"github.com/charmbracelet/crush/internal/version"
  47	"github.com/charmbracelet/x/exp/charmtone"
  48)
  49
  // DefaultSessionName is the exported default name for a session.
const DefaultSessionName = "Untitled Session"

// Thresholds used by auto-summarization.
const (
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  58
  59var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  60
  61//go:embed templates/title.md
  62var titlePrompt []byte
  63
  64//go:embed templates/summary.md
  65var summaryPrompt []byte
  66
  // thinkTagRegex matches a complete <think>...</think> span, newlines
// included (the (?s) flag), non-greedily. It is used to remove <think> tags
// from generated titles.
var thinkTagRegex = regexp.MustCompile(`(?s)<think>.*?</think>`)

// orphanThinkTagRegex matches a lone opening or closing think tag.
var orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
  72
  73type SessionAgentCall struct {
  74	SessionID string
  75	// RunID, when non-empty, is the caller-supplied correlator that
  76	// gets echoed back on the notify.RunComplete event emitted for
  77	// this turn. It is preserved when the call is enqueued behind a
  78	// busy session so the queued turn's terminal event is still
  79	// recognisable to the original caller. Callers that need a
  80	// reliable completion contract (e.g. `crush run` against a
  81	// session that may be busy) MUST set it; SessionID alone is
  82	// ambiguous when concurrent turns share the same session.
  83	RunID            string
  84	Prompt           string
  85	ProviderOptions  fantasy.ProviderOptions
  86	Attachments      []message.Attachment
  87	MaxOutputTokens  int64
  88	Temperature      *float64
  89	TopP             *float64
  90	TopK             *int64
  91	FrequencyPenalty *float64
  92	PresencePenalty  *float64
  93	NonInteractive   bool
  94	// OnComplete, when non-nil, replaces the default RunComplete
  95	// publish path: the inner Run hands the terminal payload to this
  96	// callback instead of emitting it on the RunComplete broker. The
  97	// coordinator uses this hook to coalesce the unauthorized →
  98	// re-auth → retry chain into a single user-visible terminal
  99	// event, so non-interactive clients (e.g. `crush run`) don't
 100	// exit on a stale failed-attempt RunComplete before the
 101	// successful retry. It is intentionally stripped when queueing
 102	// a busy-session call (see Run): the originating
 103	// coordinator.Run has long returned by the time the queued
 104	// recursion drains, so falling back to the default broker
 105	// publish keeps the event visible to subscribers.
 106	OnComplete func(notify.RunComplete)
 107	// Accepted, when non-nil, is the accept reservation taken by
 108	// BeginAccepted before the call was dispatched onto a goroutine
 109	// (the client/server fire-and-forget path). Run consumes it under
 110	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
 111	// queued | active) transition has been chosen. When nil
 112	// (in-process / local callers like AppWorkspace), behavior is
 113	// unchanged and no accept tracking applies.
 114	Accepted *AcceptedRun
 115	// acceptSeq carries the accept sequence of the handle that produced
 116	// this call after it has been enqueued and its Accepted handle
 117	// stripped. The queue-drain paths compare it against a session's
 118	// cancel mark so a follow-up queued before a cancel is dropped while
 119	// one queued after the cancel survives. 0 means untracked (an
 120	// in-process enqueue with no accept reservation), which the drain
 121	// paths treat as covered by any present mark, preserving the
 122	// pre-sequence behavior.
 123	acceptSeq uint64
 124}
 125
 126type SessionAgent interface {
 127	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
 128	BeginAccepted(sessionID string) *AcceptedRun
 129	SetModels(large Model, small Model)
 130	SetTools(tools []fantasy.AgentTool)
 131	SetSystemPrompt(systemPrompt string)
 132	Cancel(sessionID string)
 133	CancelAll()
 134	IsSessionBusy(sessionID string) bool
 135	IsBusy() bool
 136	QueuedPrompts(sessionID string) int
 137	QueuedPromptsList(sessionID string) []string
 138	ClearQueue(sessionID string)
 139	Summarize(context.Context, string, fantasy.ProviderOptions) error
 140	Model() Model
 141}
 142
 143type Model struct {
 144	Model      fantasy.LanguageModel
 145	CatwalkCfg catwalk.Model
 146	ModelCfg   config.SelectedModel
 147	FlatRate   bool
 148}
 149
 150type sessionAgent struct {
 151	largeModel         *csync.Value[Model]
 152	smallModel         *csync.Value[Model]
 153	systemPromptPrefix *csync.Value[string]
 154	systemPrompt       *csync.Value[string]
 155	tools              *csync.Slice[fantasy.AgentTool]
 156
 157	isSubAgent           bool
 158	sessions             session.Service
 159	messages             message.Service
 160	disableAutoSummarize bool
 161	isYolo               bool
 162	notify               pubsub.Publisher[notify.Notification]
 163	runComplete          pubsub.Publisher[notify.RunComplete]
 164
 165	messageQueue   *csync.Map[string, []SessionAgentCall]
 166	activeRequests *csync.Map[string, context.CancelFunc]
 167
 168	// dispatchMu holds a per-session mutex that serializes the
 169	// accepted -> (cancel-on-entry | queued | active) transition in
 170	// Run against a concurrent Cancel. The lock is held only during
 171	// the brief handoff (no DB or LLM I/O under the lock).
 172	dispatchMu *csync.Map[string, *sync.Mutex]
 173	// acceptedRuns counts dispatched-but-not-yet-active runs per
 174	// session. A counter > 0 means a dispatched prompt is in flight
 175	// and has not yet completed the dispatch handoff in Run. Only
 176	// BeginAccepted increments it; only AcceptedRun.Close decrements
 177	// it.
 178	acceptedRuns *csync.Map[string, int]
 179	// cancelMark records, per session, a high-water accept sequence: an
 180	// accepted handle is canceled by it iff the handle's sequence is at
 181	// or below the mark. Cancel raises the mark to the latest sequence
 182	// assigned at cancel time, so a single Cancel covers every prompt
 183	// accepted-but-not-yet-active then, while a prompt accepted later
 184	// (higher sequence) is never poisoned. Absent or 0 means no pending
 185	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
 186	// idle Escape never records a mark.
 187	cancelMark *csync.Map[string, uint64]
 188	// dispatchMuCreate guards lazy creation of per-session entries in
 189	// dispatchMu so two goroutines can't race to lock different mutex
 190	// instances for the same session.
 191	dispatchMuCreate sync.Mutex
 192	// acceptedMu serializes increments/decrements of acceptedRuns and
 193	// the assignment of accept sequence numbers from acceptSeqGen. It
 194	// is separate from dispatchMu so AcceptedRun.Close (which may run
 195	// while Run holds dispatchMu for the same session) does not
 196	// deadlock by re-entering the dispatch lock.
 197	acceptedMu sync.Mutex
 198	// acceptSeqGen is the monotonic source of accept sequence numbers.
 199	// Each BeginAccepted increments it under acceptedMu and stamps the
 200	// returned handle, so sequences strictly increase in accept order
 201	// across the agent. Cancel uses its current value as the per-session
 202	// high-water mark.
 203	acceptSeqGen uint64
 204}
 205
 206type SessionAgentOptions struct {
 207	LargeModel           Model
 208	SmallModel           Model
 209	SystemPromptPrefix   string
 210	SystemPrompt         string
 211	IsSubAgent           bool
 212	DisableAutoSummarize bool
 213	IsYolo               bool
 214	Sessions             session.Service
 215	Messages             message.Service
 216	Tools                []fantasy.AgentTool
 217	Notify               pubsub.Publisher[notify.Notification]
 218	RunComplete          pubsub.Publisher[notify.RunComplete]
 219}
 220
 221func NewSessionAgent(
 222	opts SessionAgentOptions,
 223) SessionAgent {
 224	return &sessionAgent{
 225		largeModel:           csync.NewValue(opts.LargeModel),
 226		smallModel:           csync.NewValue(opts.SmallModel),
 227		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 228		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 229		isSubAgent:           opts.IsSubAgent,
 230		sessions:             opts.Sessions,
 231		messages:             opts.Messages,
 232		disableAutoSummarize: opts.DisableAutoSummarize,
 233		tools:                csync.NewSliceFrom(opts.Tools),
 234		isYolo:               opts.IsYolo,
 235		notify:               opts.Notify,
 236		runComplete:          opts.RunComplete,
 237		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 238		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 239		dispatchMu:           csync.NewMap[string, *sync.Mutex](),
 240		acceptedRuns:         csync.NewMap[string, int](),
 241		cancelMark:           csync.NewMap[string, uint64](),
 242	}
 243}
 244
 245// AcceptedRun owns exactly one accept reservation taken by
 246// BeginAccepted. It is the only carrier of accept-state across the
 247// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
 248// counter > 0 means a dispatched prompt is in flight and has not yet
 249// completed the dispatch handoff in Run. Close is the only way to
 250// release the reservation and is idempotent.
 251type AcceptedRun struct {
 252	agent     *sessionAgent
 253	sessionID string
 254	// seq is the monotonic accept sequence stamped by BeginAccepted. A
 255	// cancel covers this handle iff seq is at or below the session's
 256	// cancel mark, so a handle accepted after a cancel (higher seq) is
 257	// never poisoned by it.
 258	seq  uint64
 259	done atomic.Bool
 260}
 261
 262// Close decrements the accept counter for this reservation. It is safe
 263// to call multiple times; only the first call has effect.
 264func (r *AcceptedRun) Close() {
 265	if r == nil {
 266		return
 267	}
 268	if !r.done.CompareAndSwap(false, true) {
 269		return
 270	}
 271	r.agent.endAccepted(r.sessionID)
 272}
 273
 274// SessionID exposes the session this reservation is for so the run path
 275// can use it without an extra parameter.
 276func (r *AcceptedRun) SessionID() string {
 277	if r == nil {
 278		return ""
 279	}
 280	return r.sessionID
 281}
 282
 283// BeginAccepted increments the accept counter for sessionID and returns
 284// a handle whose Close is the only way to decrement it. It is the only
 285// entry point that mutates acceptedRuns.
 286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
 287	a.acceptedMu.Lock()
 288	defer a.acceptedMu.Unlock()
 289	count, _ := a.acceptedRuns.Get(sessionID)
 290	a.acceptedRuns.Set(sessionID, count+1)
 291	a.acceptSeqGen++
 292	return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
 293}
 294
 295// endAccepted decrements the accept counter for sessionID. It is only
 296// called via AcceptedRun.Close. It uses a dedicated lock (not the
 297// per-session dispatch mutex) so it can run while Run holds dispatchMu
 298// for the same session without deadlocking.
 299//
 300// When the count reaches zero the session's cancel mark is dropped: no
 301// accepted handle remains for it to cover, and any handle accepted later
 302// gets a strictly higher sequence that the mark would not match anyway.
 303// Handles canceled on entry never reach RunComplete, so this is the only
 304// place that clears the mark for an all-canceled batch. Sibling handles
 305// covered by the same mark are serialized on the per-session dispatch
 306// mutex and read the mark before they Close, so this never clears it out
 307// from under a covered handle still waiting to enter Run.
 308func (a *sessionAgent) endAccepted(sessionID string) {
 309	a.acceptedMu.Lock()
 310	defer a.acceptedMu.Unlock()
 311	count, ok := a.acceptedRuns.Get(sessionID)
 312	if !ok || count <= 1 {
 313		a.acceptedRuns.Del(sessionID)
 314		a.cancelMark.Del(sessionID)
 315		return
 316	}
 317	a.acceptedRuns.Set(sessionID, count-1)
 318}
 319
 320// sessionMu returns the per-session dispatch mutex, creating it on first
 321// use. Creation is guarded so concurrent callers always observe the same
 322// mutex instance for a given session.
 323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
 324	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 325		return mu
 326	}
 327	a.dispatchMuCreate.Lock()
 328	defer a.dispatchMuCreate.Unlock()
 329	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 330		return mu
 331	}
 332	mu := &sync.Mutex{}
 333	a.dispatchMu.Set(sessionID, mu)
 334	return mu
 335}
 336
 337// enqueueCall appends call to the session's message queue. The
 338// OnComplete hook is stripped: the caller that supplied it (typically
 339// coordinator.Run) has its own retry/coalesce scope that ends when it
 340// returns, so by the time the queue drains nobody is left to consume the
 341// buffered terminal event. The recursive Run falls back to the default
 342// broker publish, which is what existing subscribers expect for queued
 343// turns.
 344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
 345	existing, ok := a.messageQueue.Get(call.SessionID)
 346	if !ok {
 347		existing = []SessionAgentCall{}
 348	}
 349	queued := call
 350	if call.Accepted != nil {
 351		// Preserve the accept sequence after the handle is stripped so
 352		// the queue-drain paths can tell a follow-up queued before a
 353		// cancel (covered by the mark) from one queued after it.
 354		queued.acceptSeq = call.Accepted.seq
 355	}
 356	queued.OnComplete = nil
 357	queued.Accepted = nil
 358	existing = append(existing, queued)
 359	a.messageQueue.Set(call.SessionID, existing)
 360}
 361
 362// drainQueueForStep partitions the session's queued calls for the current
 363// streaming step under the per-session dispatch mutex so the filtering is
 364// atomic against a concurrent Cancel: canceledBySeq requires the caller to
 365// hold that mutex, and evaluating it here (rather than after unlocking)
 366// prevents a cancel recorded between the drain and the check from being
 367// observed inconsistently.
 368//
 369// Calls covered by a pending cancel are dropped; the dropped ones that
 370// carry a RunID are returned in canceledWithRunID so the caller can
 371// publish their terminal cancelled RunComplete (a caller waiting on that
 372// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without
 373// a RunID are returned in fold to be folded into the active turn,
 374// preserving the existing follow-up behavior. Uncanceled calls that carry
 375// a RunID are left in the queue so each runs as its own turn via the
 376// recursive run path and publishes its own RunComplete, giving every
 377// RunID-bearing prompt an explicit lifecycle instead of being silently
 378// absorbed into another turn. fold is processed by the caller without the
 379// lock held.
 380func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) {
 381	dispatchLock := a.sessionMu(sessionID)
 382	dispatchLock.Lock()
 383	defer dispatchLock.Unlock()
 384	queuedCalls, _ := a.messageQueue.Get(sessionID)
 385	var keep []SessionAgentCall
 386	for _, queued := range queuedCalls {
 387		if a.canceledBySeq(sessionID, queued.acceptSeq) {
 388			if queued.RunID != "" {
 389				canceledWithRunID = append(canceledWithRunID, queued)
 390			}
 391			continue
 392		}
 393		if queued.RunID != "" {
 394			keep = append(keep, queued)
 395			continue
 396		}
 397		fold = append(fold, queued)
 398	}
 399	if len(keep) == 0 {
 400		a.messageQueue.Del(sessionID)
 401	} else {
 402		a.messageQueue.Set(sessionID, keep)
 403	}
 404	return fold, canceledWithRunID
 405}
 406
 407// publishCanceledQueueDrops emits a terminal cancelled RunComplete for
 408// every dropped queued call that carries a RunID. A queued prompt removed
 409// from the queue without ever running — covered by a pending cancel, or
 410// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on
 411// that RunID: `crush run` ignores live message events and exits only on a
 412// RunComplete whose RunID matches. Calls without a RunID had no such waiter
 413// and are dropped silently as before. A detached, bounded context keeps the
 414// must-deliver publish alive even when the run context that triggered the
 415// drop is already canceled.
 416func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) {
 417	var hasRunID bool
 418	for _, d := range drops {
 419		if d.RunID != "" {
 420			hasRunID = true
 421			break
 422		}
 423	}
 424	if !hasRunID {
 425		return
 426	}
 427	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 428	defer cancel()
 429	for _, d := range drops {
 430		if d.RunID == "" {
 431			continue
 432		}
 433		a.publishRunComplete(ctx, d, notify.RunComplete{
 434			SessionID: d.SessionID,
 435			RunID:     d.RunID,
 436			Cancelled: true,
 437		})
 438	}
 439}
 440
 441// clearQueueAndNotify removes all queued prompts for the session and
 442// publishes a terminal cancelled RunComplete for any that carried a RunID,
 443// so callers waiting on those RunIDs (e.g. `crush run`) are not left
 444// hanging when their queued prompt is discarded without running.
 445func (a *sessionAgent) clearQueueAndNotify(sessionID string) {
 446	queued, ok := a.messageQueue.Get(sessionID)
 447	a.messageQueue.Del(sessionID)
 448	if !ok {
 449		return
 450	}
 451	a.publishCanceledQueueDrops(queued)
 452}
 453
 454// clearPendingCancel removes any pending-cancel mark for sessionID. It
 455// takes the per-session dispatch lock so it is ordered against Cancel
 456// and the dispatch handoff.
 457func (a *sessionAgent) clearPendingCancel(sessionID string) {
 458	mu := a.sessionMu(sessionID)
 459	mu.Lock()
 460	defer mu.Unlock()
 461	a.cancelMark.Del(sessionID)
 462}
 463
 464// canceledBySeq reports whether an accepted handle or queued call with
 465// the given accept sequence is covered by a pending cancel for the
 466// session. Callers must hold the session's dispatch mutex. A tracked
 467// sequence (seq > 0) is covered only when it is at or below the cancel
 468// high-water mark, so a prompt accepted after the cancel (higher seq) is
 469// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
 470// with no accept reservation) is covered whenever any mark is present,
 471// preserving the pre-sequence behavior. The mark is not consumed: it
 472// stays so every sibling handle it covers observes the same cancel, and
 473// a later handle (higher seq) ignores it regardless.
 474func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
 475	mark, ok := a.cancelMark.Get(sessionID)
 476	if !ok || mark == 0 {
 477		return false
 478	}
 479	return seq == 0 || seq <= mark
 480}
 481
 482// persistCanceledTurn writes the user/assistant records for a turn that
 483// was canceled before (or just as) streaming would have produced them.
 484// It creates the user message only when it was not already created by an
 485// earlier createUserMessage call (userMsgCreated), then writes an
 486// assistant message with FinishReasonCanceled. Both writes use
 487// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
 488// context) can't drop them.
 489func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
 490	writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 491	defer cancel()
 492	if !userMsgCreated {
 493		if _, err := a.createUserMessage(writeCtx, call); err != nil {
 494			return err
 495		}
 496	}
 497	largeModel := a.largeModel.Get()
 498	assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
 499		Role:     message.Assistant,
 500		Parts:    []message.ContentPart{},
 501		Model:    largeModel.ModelCfg.Model,
 502		Provider: largeModel.ModelCfg.Provider,
 503	})
 504	if err != nil {
 505		return err
 506	}
 507	assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 508	return a.messages.Update(writeCtx, assistant)
 509}
 510
 511// publishRunComplete emits the authoritative terminal event for a turn.
 512// It honors the per-call OnComplete hook when set (so the coordinator can
 513// coalesce retries) and otherwise falls back to the RunComplete broker.
 514// ctx is used only for the bounded-blocking must-deliver publish; the
 515// terminal payload is supplied by the caller. This is the single emit path
 516// shared by the streaming defer and the cancel-on-entry early return so a
 517// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
 518// observes exactly one terminal event regardless of which Run branch ends
 519// the turn.
 520func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
 521	if call.OnComplete != nil {
 522		call.OnComplete(complete)
 523		return
 524	}
 525	if a.runComplete == nil {
 526		return
 527	}
 528	a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 529}
 530
 531// ValidateCall performs the cheap structural validation that
 532// sessionAgent.Run requires before a call can be dispatched: a call must
 533// carry either a non-empty prompt or a text attachment, and it must name a
 534// session. It is exported so callers that accept a run before dispatching it
 535// (e.g. backend.SendMessage) can apply the same checks and keep the error
 536// contract consistent.
 537func ValidateCall(call SessionAgentCall) error {
 538	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 539		return ErrEmptyPrompt
 540	}
 541	if call.SessionID == "" {
 542		return ErrSessionMissing
 543	}
 544	return nil
 545}
 546
 547func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 548	if err := ValidateCall(call); err != nil {
 549		return nil, err
 550	}
 551
 552	// genCtx/cancel are the run context and its cancel func. For the
 553	// accepted (fire-and-forget) dispatch path they are created under
 554	// dispatchMu below so a concurrent Cancel can observe the
 555	// activeRequests entry before the assistant message exists. For
 556	// the in-process path they stay nil here and are created later,
 557	// preserving the original ordering.
 558	var (
 559		genCtx           context.Context
 560		cancel           context.CancelFunc
 561		activeRegistered bool
 562		userMsgCreated   bool
 563	)
 564
 565	if call.Accepted != nil {
 566		// Serialize the accepted -> (cancel-on-entry | queued |
 567		// active) transition against a concurrent Cancel. Cancel takes
 568		// the same per-session lock, so every cancel observes at least
 569		// one of: a cancel mark, an activeRequests entry, or a
 570		// messageQueue entry it then clears.
 571		mu := a.sessionMu(call.SessionID)
 572		mu.Lock()
 573
 574		if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
 575			// Cancel-on-entry: a cancel arrived while this run was
 576			// dispatched but not yet active, and this handle's accept
 577			// sequence is at or below the session's cancel mark. The
 578			// mark is left in place so sibling handles it also covers
 579			// observe the same cancel; release the accept reservation,
 580			// drop the lock, and persist a canceled turn without
 581			// entering Stream.
 582			//
 583			// This path returns before the streaming defer that
 584			// publishes RunComplete is installed, so emit the terminal
 585			// event explicitly. Without it, a caller waiting on
 586			// RunComplete for this RunID (e.g. `crush run`, which
 587			// ignores message events and blocks on RunComplete) would
 588			// hang on an immediately-canceled accepted run.
 589			call.Accepted.Close()
 590			mu.Unlock()
 591			complete := notify.RunComplete{
 592				SessionID: call.SessionID,
 593				RunID:     call.RunID,
 594				Cancelled: true,
 595			}
 596			if err := a.persistCanceledTurn(ctx, call, false); err != nil {
 597				complete.Error = err.Error()
 598				a.publishRunComplete(ctx, call, complete)
 599				return nil, err
 600			}
 601			a.publishRunComplete(ctx, call, complete)
 602			return nil, nil
 603		}
 604
 605		if a.IsSessionBusy(call.SessionID) {
 606			// Busy: an earlier prompt is active. Queue this call and
 607			// release the accept reservation. A Cancel arriving after
 608			// this point sees the active entry and clears the queue.
 609			a.enqueueCall(call)
 610			call.Accepted.Close()
 611			mu.Unlock()
 612			return nil, nil
 613		}
 614
 615		// Idle: become the active run. Register the cancel func before
 616		// dropping the lock so a Cancel that arrives between here and
 617		// assistant creation is not lost.
 618		runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 619		genCtx, cancel = context.WithCancel(runCtx)
 620		a.activeRequests.Set(call.SessionID, cancel)
 621		activeRegistered = true
 622		call.Accepted.Close()
 623		mu.Unlock()
 624
 625		defer cancel()
 626		defer a.activeRequests.Del(call.SessionID)
 627	} else if a.IsSessionBusy(call.SessionID) {
 628		// Queue the message if busy. Strip OnComplete: the caller that
 629		// supplied the hook (typically coordinator.Run) has its own
 630		// retry/coalesce scope that ends when it returns, so by the time
 631		// the queue drains nobody is left to consume the buffered
 632		// terminal event. The recursive Run will fall back to the
 633		// default broker publish, which is what existing subscribers
 634		// expect for queued turns.
 635		a.enqueueCall(call)
 636		return nil, nil
 637	}
 638
 639	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 640	agentTools := a.tools.Copy()
 641	largeModel := a.largeModel.Get()
 642	systemPrompt := a.systemPrompt.Get()
 643	promptPrefix := a.systemPromptPrefix.Get()
 644	var instructions strings.Builder
 645
 646	for _, server := range mcp.GetStates() {
 647		if server.State != mcp.StateConnected {
 648			continue
 649		}
 650		if s := server.Client.InitializeResult().Instructions; s != "" {
 651			instructions.WriteString(s)
 652			instructions.WriteString("\n\n")
 653		}
 654	}
 655
 656	if s := instructions.String(); s != "" {
 657		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 658	}
 659
 660	if len(agentTools) > 0 {
 661		// Add Anthropic caching to the last tool.
 662		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 663	}
 664
 665	agent := fantasy.NewAgent(
 666		largeModel.Model,
 667		fantasy.WithSystemPrompt(systemPrompt),
 668		fantasy.WithTools(agentTools...),
 669		fantasy.WithUserAgent(userAgent),
 670	)
 671
 672	sessionLock := sync.Mutex{}
 673	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 674	if err != nil {
 675		return nil, fmt.Errorf("failed to get session: %w", err)
 676	}
 677
 678	msgs, err := a.getSessionMessages(ctx, currentSession)
 679	if err != nil {
 680		return nil, fmt.Errorf("failed to get session messages: %w", err)
 681	}
 682
 683	var wg sync.WaitGroup
 684	// Generate title if first message.
 685	if len(msgs) == 0 {
 686		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 687		wg.Go(func() {
 688			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 689		})
 690	}
 691	defer wg.Wait()
 692
 693	// Add the user message to the session.
 694	_, err = a.createUserMessage(ctx, call)
 695	if err != nil {
 696		return nil, err
 697	}
 698	userMsgCreated = true
 699
 700	// Add the session to the context.
 701	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 702
 703	// For the accepted dispatch path the run context and cancel func
 704	// were already created and registered under dispatchMu above; reuse
 705	// them. For the in-process path create them here, preserving the
 706	// original ordering.
 707	if !activeRegistered {
 708		genCtx, cancel = context.WithCancel(ctx)
 709		a.activeRequests.Set(call.SessionID, cancel)
 710
 711		defer cancel()
 712		defer a.activeRequests.Del(call.SessionID)
 713	}
 714	// skipRunComplete is set just before the queued-recursion path so
 715	// the outer Run doesn't publish a RunComplete that would race
 716	// with — and be superseded by — the recursive call's own
 717	// RunComplete (each queued user prompt is its own turn and
 718	// publishes exactly one terminal event).
 719	var skipRunComplete bool
 720	// currentAssistant is declared here so the deferred RunComplete
 721	// publish below can capture the pointer that PrepareStep will
 722	// later (re)assign for each streaming step. The final assistant
 723	// message of the turn is the value reachable through this
 724	// pointer when the defer runs.
 725	var currentAssistant *message.Message
 726	// Drain any debounced message updates before returning. message.Service
 727	// already flushes synchronously on terminal updates, but a defer here
 728	// guarantees the contract at every Run exit (success, error, panic
 729	// recovery upstream) without callers needing to know.
 730	//
 731	// After the flush completes — meaning all per-message
 732	// Publish(UpdatedEvent) calls have fired and been buffered into
 733	// every subscriber's channel — publish the authoritative
 734	// RunComplete event for this turn. The flush-then-publish order
 735	// gives well-behaved clients the best chance of seeing the final
 736	// message event before RunComplete; the embedded Text field
 737	// reconciles for clients that observe the events out of order
 738	// (the pubsub broker fan-in does not serialize publishes from
 739	// different upstream brokers).
 740	defer func() {
 741		// Use a context detached from the run context: workspace
 742		// shutdown cancels ctx before this goroutine returns, but the
 743		// buffered streaming deltas must still land before the DB is
 744		// closed. A short timeout bounds the flush.
 745		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 746		defer flushCancel()
 747		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
 748			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 749		}
 750		if skipRunComplete {
 751			return
 752		}
 753		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 754		if currentAssistant != nil {
 755			complete.MessageID = currentAssistant.ID
 756			complete.Text = currentAssistant.Content().String()
 757		}
 758		if retErr != nil {
 759			complete.Error = retErr.Error()
 760			complete.Cancelled = errors.Is(retErr, context.Canceled)
 761		} else if ctx.Err() != nil {
 762			complete.Cancelled = true
 763		}
 764		// Prefer the per-call hook when supplied so the coordinator
 765		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 766		// into a single user-visible terminal event. The fallback
 767		// must-deliver publish applies bounded-blocking semantics to
 768		// the authoritative terminal event so a momentarily-full
 769		// subscriber channel can't silently drop it and hang
 770		// non-interactive clients waiting on RunComplete.
 771		a.publishRunComplete(ctx, call, complete)
 772	}()
 773
 774	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 775
 776	startTime := time.Now()
 777	a.eventPromptSent(call.SessionID)
 778
 779	var stepMessages []fantasy.Message
 780	var shouldSummarize bool
 781	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 782	var maxOutputTokens *int64
 783	if call.MaxOutputTokens > 0 {
 784		maxOutputTokens = &call.MaxOutputTokens
 785	}
 786	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 787		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 788		Files:            files,
 789		Messages:         history,
 790		ProviderOptions:  call.ProviderOptions,
 791		MaxOutputTokens:  maxOutputTokens,
 792		TopP:             call.TopP,
 793		Temperature:      call.Temperature,
 794		PresencePenalty:  call.PresencePenalty,
 795		TopK:             call.TopK,
 796		FrequencyPenalty: call.FrequencyPenalty,
 797		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 798			prepared.Messages = options.Messages
 799			for i := range prepared.Messages {
 800				prepared.Messages[i].ProviderOptions = nil
 801			}
 802
 803			// Use latest tools (updated by SetTools when MCP tools change).
 804			prepared.Tools = a.tools.Copy()
 805
 806			// Drain queued follow-up prompts for this step. Calls covered
 807			// by a cancel recorded while they sat in the queue are dropped:
 808			// a cancel that arrived after a prompt was queued must not let
 809			// it run as part of this step. Coverage is per-call by accept
 810			// sequence so a follow-up queued after the cancel (higher seq)
 811			// is not dropped. A dropped prompt carrying a RunID still gets
 812			// its terminal cancelled RunComplete so a caller waiting on it
 813			// does not hang. Uncanceled prompts without a RunID are folded
 814			// into this turn; uncanceled prompts with a RunID are left
 815			// queued so each runs as its own turn (with its own
 816			// RunComplete) via the recursive run path below.
 817			fold, canceledRunIDs := a.drainQueueForStep(call.SessionID)
 818			a.publishCanceledQueueDrops(canceledRunIDs)
 819			for _, queued := range fold {
 820				userMessage, createErr := a.createUserMessage(callContext, queued)
 821				if createErr != nil {
 822					return callContext, prepared, createErr
 823				}
 824				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 825			}
 826
 827			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 828
 829			lastSystemRoleInx := 0
 830			systemMessageUpdated := false
 831			for i, msg := range prepared.Messages {
 832				// Only add cache control to the last message.
 833				if msg.Role == fantasy.MessageRoleSystem {
 834					lastSystemRoleInx = i
 835				} else if !systemMessageUpdated {
 836					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 837					systemMessageUpdated = true
 838				}
 839				// Than add cache control to the last 2 messages.
 840				if i > len(prepared.Messages)-3 {
 841					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 842				}
 843			}
 844
 845			if promptPrefix != "" {
 846				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 847			}
 848
 849			sessionLock.Lock()
 850			stepMessages = cloneFantasyMessages(prepared.Messages)
 851			sessionLock.Unlock()
 852
 853			var assistantMsg message.Message
 854			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 855				Role:     message.Assistant,
 856				Parts:    []message.ContentPart{},
 857				Model:    largeModel.ModelCfg.Model,
 858				Provider: largeModel.ModelCfg.Provider,
 859			})
 860			if err != nil {
 861				return callContext, prepared, err
 862			}
 863			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 864			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 865			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 866			currentAssistant = &assistantMsg
 867			return callContext, prepared, err
 868		},
 869		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 870			currentAssistant.AppendReasoningContent(reasoning.Text)
 871			return a.messages.Update(genCtx, *currentAssistant)
 872		},
 873		OnReasoningDelta: func(id string, text string) error {
 874			currentAssistant.AppendReasoningContent(text)
 875			return a.messages.Update(genCtx, *currentAssistant)
 876		},
 877		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 878			// handle anthropic signature
 879			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 880				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 881					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 882				}
 883			}
 884			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 885				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 886					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 887				}
 888			}
 889			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 890				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 891					currentAssistant.SetReasoningResponsesData(reasoning)
 892				}
 893			}
 894			currentAssistant.FinishThinking()
 895			return a.messages.Update(genCtx, *currentAssistant)
 896		},
 897		OnTextDelta: func(id string, text string) error {
 898			// Strip leading newline from initial text content. This is is
 899			// particularly important in non-interactive mode where leading
 900			// newlines are very visible.
 901			if len(currentAssistant.Parts) == 0 {
 902				text = strings.TrimPrefix(text, "\n")
 903			}
 904
 905			currentAssistant.AppendContent(text)
 906			return a.messages.Update(genCtx, *currentAssistant)
 907		},
 908		OnToolInputStart: func(id string, toolName string) error {
 909			toolCall := message.ToolCall{
 910				ID:               id,
 911				Name:             toolName,
 912				ProviderExecuted: false,
 913				Finished:         false,
 914			}
 915			currentAssistant.AddToolCall(toolCall)
 916			// Use parent ctx instead of genCtx to ensure the update succeeds
 917			// even if the request is canceled mid-stream
 918			return a.messages.Update(ctx, *currentAssistant)
 919		},
 920		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 921			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 922		},
 923		OnToolCall: func(tc fantasy.ToolCallContent) error {
 924			toolCall := message.ToolCall{
 925				ID:               tc.ToolCallID,
 926				Name:             tc.ToolName,
 927				Input:            tc.Input,
 928				ProviderExecuted: false,
 929				Finished:         true,
 930			}
 931			currentAssistant.AddToolCall(toolCall)
 932			// Use parent ctx instead of genCtx to ensure the update succeeds
 933			// even if the request is canceled mid-stream
 934			return a.messages.Update(ctx, *currentAssistant)
 935		},
 936		OnToolResult: func(result fantasy.ToolResultContent) error {
 937			toolResult := a.convertToToolResult(result)
 938			// Use parent ctx instead of genCtx to ensure the message is created
 939			// even if the request is canceled mid-stream
 940			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 941				Role: message.Tool,
 942				Parts: []message.ContentPart{
 943					toolResult,
 944				},
 945			})
 946			return createMsgErr
 947		},
 948		OnStepFinish: func(stepResult fantasy.StepResult) error {
 949			finishReason := message.FinishReasonUnknown
 950			switch stepResult.FinishReason {
 951			case fantasy.FinishReasonLength:
 952				finishReason = message.FinishReasonMaxTokens
 953			case fantasy.FinishReasonStop:
 954				finishReason = message.FinishReasonEndTurn
 955			case fantasy.FinishReasonToolCalls:
 956				finishReason = message.FinishReasonToolUse
 957			}
 958			// If a tool result halted the turn (e.g. a hook halt or a
 959			// permission denial), the step ends on FinishReasonToolCalls but
 960			// the model will not be called again. Treat it as the end of the
 961			// turn so the UI can render the assistant footer.
 962			if finishReason == message.FinishReasonToolUse {
 963				for _, tr := range stepResult.Content.ToolResults() {
 964					if tr.StopTurn {
 965						finishReason = message.FinishReasonEndTurn
 966						break
 967					}
 968				}
 969			}
 970			currentAssistant.AddFinish(finishReason, "", "")
 971			sessionLock.Lock()
 972			defer sessionLock.Unlock()
 973
 974			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 975			if getSessionErr != nil {
 976				return getSessionErr
 977			}
 978			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 979			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 980			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 981			if sessionErr != nil {
 982				return sessionErr
 983			}
 984			currentSession = updatedSession
 985			return a.messages.Update(genCtx, *currentAssistant)
 986		},
 987		StopWhen: []fantasy.StopCondition{
 988			func(_ []fantasy.StepResult) bool {
 989				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 990				// If context window is unknown (0), skip auto-summarize
 991				// to avoid immediately truncating custom/local models.
 992				if cw == 0 {
 993					return false
 994				}
 995				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 996				remaining := cw - tokens
 997				var threshold int64
 998				if cw > largeContextWindowThreshold {
 999					threshold = largeContextWindowBuffer
1000				} else {
1001					threshold = int64(float64(cw) * smallContextWindowRatio)
1002				}
1003				if (remaining <= threshold) && !a.disableAutoSummarize {
1004					shouldSummarize = true
1005					return true
1006				}
1007				return false
1008			},
1009			func(steps []fantasy.StepResult) bool {
1010				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
1011			},
1012		},
1013	})
1014
1015	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
1016
1017	if err != nil {
1018		isHyper := largeModel.ModelCfg.Provider == hyper.Name
1019		isCancelErr := errors.Is(err, context.Canceled)
1020		if currentAssistant == nil {
1021			// Cancel-before-assistant-creation window: the run was
1022			// canceled after activeRequests.Set but before PrepareStep
1023			// created the assistant message. Without this, the turn
1024			// would return with no FinishReasonCanceled marker and no
1025			// user-visible record. The user message was already created
1026			// above, so persistCanceledTurn only writes the assistant
1027			// record.
1028			if isCancelErr {
1029				if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
1030					return nil, persistErr
1031				}
1032			}
1033			return result, err
1034		}
1035		// Persist final state with a context detached from the run
1036		// context. The run context (ctx) is derived from the
1037		// workspace context, which workspace shutdown cancels before
1038		// agent goroutines finish; using ctx here would drop the
1039		// final assistant state. WithoutCancel keeps the values
1040		// (e.g. session ID) while ignoring cancellation, and a short
1041		// timeout bounds the cleanup writes.
1042		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
1043		defer cleanupCancel()
1044		// Ensure we finish thinking on error to close the reasoning state.
1045		currentAssistant.FinishThinking()
1046		toolCalls := currentAssistant.ToolCalls()
1047		// INFO: we use the cleanup context here because the genCtx has been cancelled.
1048		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
1049		if createErr != nil {
1050			return nil, createErr
1051		}
1052		for _, tc := range toolCalls {
1053			if !tc.Finished {
1054				tc.Finished = true
1055				tc.Input = "{}"
1056				currentAssistant.AddToolCall(tc)
1057				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1058				if updateErr != nil {
1059					return nil, updateErr
1060				}
1061			}
1062
1063			found := false
1064			for _, msg := range msgs {
1065				if msg.Role == message.Tool {
1066					for _, tr := range msg.ToolResults() {
1067						if tr.ToolCallID == tc.ID {
1068							found = true
1069							break
1070						}
1071					}
1072				}
1073				if found {
1074					break
1075				}
1076			}
1077			if found {
1078				continue
1079			}
1080			content := "There was an error while executing the tool"
1081			if isCancelErr {
1082				content = "Error: user cancelled assistant tool calling"
1083			}
1084			toolResult := message.ToolResult{
1085				ToolCallID: tc.ID,
1086				Name:       tc.Name,
1087				Content:    content,
1088				IsError:    true,
1089			}
1090			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1091				Role: message.Tool,
1092				Parts: []message.ContentPart{
1093					toolResult,
1094				},
1095			})
1096			if createErr != nil {
1097				return nil, createErr
1098			}
1099		}
1100		var fantasyErr *fantasy.Error
1101		var providerErr *fantasy.ProviderError
1102		const defaultTitle = "Provider Error"
1103		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1104		if isCancelErr {
1105			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1106		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1107			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1108			if a.notify != nil {
1109				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1110					SessionID:    call.SessionID,
1111					SessionTitle: currentSession.Title,
1112					Type:         notify.TypeReAuthenticate,
1113					ProviderID:   largeModel.ModelCfg.Provider,
1114				})
1115			}
1116		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1117			url := hyper.BaseURL()
1118			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1119			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1120		} else if errors.As(err, &providerErr) {
1121			if providerErr.Message == "The requested model is not supported." {
1122				url := "https://github.com/settings/copilot/features"
1123				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1124				currentAssistant.AddFinish(
1125					message.FinishReasonError,
1126					"Copilot model not enabled",
1127					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1128				)
1129			} else {
1130				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1131			}
1132		} else if errors.As(err, &fantasyErr) {
1133			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1134		} else {
1135			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1136		}
1137		// Note: we use the cleanup context here because the genCtx has been
1138		// cancelled.
1139		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1140		if updateErr != nil {
1141			return nil, updateErr
1142		}
1143		return nil, err
1144	}
1145
1146	if shouldSummarize {
1147		a.activeRequests.Del(call.SessionID)
1148		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1149			return nil, summarizeErr
1150		}
1151		// If the agent wasn't done...
1152		if len(currentAssistant.ToolCalls()) > 0 {
1153			existing, ok := a.messageQueue.Get(call.SessionID)
1154			if !ok {
1155				existing = []SessionAgentCall{}
1156			}
1157			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1158			existing = append(existing, call)
1159			a.messageQueue.Set(call.SessionID, existing)
1160		}
1161	}
1162
1163	// Release active request before publishing the notification.
1164	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
1165	// tea.Msg arrives, so the cleanup must precede the notify or
1166	// subscribers see stale busy state at the moment of receipt.
1167	a.activeRequests.Del(call.SessionID)
1168	cancel()
1169
1170	// Send notification that agent has finished its turn (skip for
1171	// nested/non-interactive sessions).
1172	if !call.NonInteractive && a.notify != nil {
1173		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1174			SessionID:    call.SessionID,
1175			SessionTitle: currentSession.Title,
1176			Type:         notify.TypeAgentFinished,
1177		})
1178	}
1179
1180	// Hand off to the next queued prompt (if any) under dispatchMu so
1181	// the transition from this finished run to the queued run is atomic
1182	// against a concurrent Cancel. activeRequests for this session was
1183	// just deleted above, so without the lock there is a window in
1184	// which the session looks idle and a cancel becomes a no-op that
1185	// fails to stop the queued prompt. Holding the lock lets us observe
1186	// a pending cancel recorded against the session and drop the queue
1187	// instead of running it, and (for the recursion) hand a fresh
1188	// accept reservation to the dequeued call so acceptedRuns stays > 0
1189	// across the recursive Run's own dispatch handoff — keeping the
1190	// session observable to Cancel for the entire transition and
1191	// closing the dequeue -> re-register window.
1192	mu := a.sessionMu(call.SessionID)
1193	mu.Lock()
1194	queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1195	if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1196		// A cancel was recorded for this session (e.g. it arrived while
1197		// this run was active and follow-ups had been queued). Drop the
1198		// queued prompts it covers (accept sequence at or below the
1199		// mark, or untracked); keep any queued after the cancel (higher
1200		// sequence) so they still run.
1201		var kept []SessionAgentCall
1202		var canceledRunIDDrops []SessionAgentCall
1203		for _, q := range queuedMessages {
1204			if q.acceptSeq == 0 || q.acceptSeq <= mark {
1205				if q.RunID != "" {
1206					canceledRunIDDrops = append(canceledRunIDDrops, q)
1207				}
1208				continue
1209			}
1210			kept = append(kept, q)
1211		}
1212		queuedMessages = kept
1213		a.messageQueue.Set(call.SessionID, kept)
1214		// A dropped prompt carrying a RunID must still publish its
1215		// terminal cancelled RunComplete so a caller waiting on that
1216		// RunID does not hang.
1217		a.publishCanceledQueueDrops(canceledRunIDDrops)
1218	}
1219	if len(queuedMessages) == 0 {
1220		// No queued work. Clear the cancel mark only when no accepted
1221		// run remains in flight that it might still cover; otherwise a
1222		// sibling prompt (sequence at or below the mark) waiting to
1223		// enter Run would lose its cancellation. When accepted runs are
1224		// gone, this also clears a stale mark so it can't catch a
1225		// future run.
1226		a.messageQueue.Del(call.SessionID)
1227		a.acceptedMu.Lock()
1228		inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1229		a.acceptedMu.Unlock()
1230		if inFlight == 0 {
1231			a.cancelMark.Del(call.SessionID)
1232		}
1233		mu.Unlock()
1234		return result, err
1235	}
1236	// There are queued messages, restart the loop. Suppress the outer
1237	// defer's emit: it would otherwise observe the recursive Run's retErr
1238	// (named-return clobbering through the return below) against this
1239	// turn's MessageID/Text and publish a mixed, racing event.
1240	skipRunComplete = true
1241	// Decide whether this turn still owes its own terminal RunComplete.
1242	// Each submitted prompt with a RunID has its own lifecycle, so a turn
1243	// that is finished and handing off to a *different* queued prompt must
1244	// publish its own RunComplete here — leaving it to the recursive turn
1245	// (which carries a different RunID) would hang a caller waiting on
1246	// this turn's RunID. The exception is the summarize-continuation path,
1247	// which re-queues this same call (same RunID) to resume after a
1248	// summary; in that case the eventual terminal turn for this RunID
1249	// publishes, so publishing now would double-emit.
1250	outerOwesRunComplete := call.RunID != ""
1251	if outerOwesRunComplete {
1252		for _, q := range queuedMessages {
1253			if q.RunID == call.RunID {
1254				outerOwesRunComplete = false
1255				break
1256			}
1257		}
1258	}
1259	firstQueuedMessage := queuedMessages[0]
1260	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1261	// Reserve a fresh accept for the dequeued prompt before dropping the
1262	// lock so acceptedRuns > 0 across the handoff into the recursive
1263	// Run. This closes the window between this dequeue and the recursive
1264	// Run registering its activeRequests entry: a cancel arriving in
1265	// that window now records a pending cancel (acceptedRuns > 0) that
1266	// the recursive Run's accepted path observes as cancel-on-entry.
1267	firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1268	mu.Unlock()
1269	if outerOwesRunComplete {
1270		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
1271		if currentAssistant != nil {
1272			complete.MessageID = currentAssistant.ID
1273			complete.Text = currentAssistant.Content().String()
1274		}
1275		if ctx.Err() != nil {
1276			complete.Cancelled = true
1277		}
1278		a.publishRunComplete(ctx, call, complete)
1279	}
1280	return a.Run(ctx, firstQueuedMessage)
1281}
1282
1283func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1284	if a.IsSessionBusy(sessionID) {
1285		return ErrSessionBusy
1286	}
1287
1288	// Copy mutable fields under lock to avoid races with SetModels.
1289	largeModel := a.largeModel.Get()
1290	systemPromptPrefix := a.systemPromptPrefix.Get()
1291
1292	currentSession, err := a.sessions.Get(ctx, sessionID)
1293	if err != nil {
1294		return fmt.Errorf("failed to get session: %w", err)
1295	}
1296	msgs, err := a.getSessionMessages(ctx, currentSession)
1297	if err != nil {
1298		return err
1299	}
1300	if len(msgs) == 0 {
1301		// Nothing to summarize.
1302		return nil
1303	}
1304
1305	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1306
1307	genCtx, cancel := context.WithCancel(ctx)
1308	a.activeRequests.Set(sessionID, cancel)
1309	defer a.activeRequests.Del(sessionID)
1310	defer cancel()
1311	defer func() {
1312		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1313			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1314		}
1315	}()
1316
1317	agent := fantasy.NewAgent(
1318		largeModel.Model,
1319		fantasy.WithSystemPrompt(string(summaryPrompt)),
1320		fantasy.WithUserAgent(userAgent),
1321	)
1322	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1323		Role:             message.Assistant,
1324		Model:            largeModel.ModelCfg.Model,
1325		Provider:         largeModel.ModelCfg.Provider,
1326		IsSummaryMessage: true,
1327	})
1328	if err != nil {
1329		return err
1330	}
1331
1332	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1333
1334	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1335		Prompt:          summaryPromptText,
1336		Messages:        aiMsgs,
1337		ProviderOptions: opts,
1338		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1339			prepared.Messages = options.Messages
1340			if systemPromptPrefix != "" {
1341				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1342			}
1343			return callContext, prepared, nil
1344		},
1345		OnReasoningDelta: func(id string, text string) error {
1346			summaryMessage.AppendReasoningContent(text)
1347			return a.messages.Update(genCtx, summaryMessage)
1348		},
1349		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1350			// Handle anthropic signature.
1351			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1352				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1353					summaryMessage.AppendReasoningSignature(signature.Signature)
1354				}
1355			}
1356			summaryMessage.FinishThinking()
1357			return a.messages.Update(genCtx, summaryMessage)
1358		},
1359		OnTextDelta: func(id, text string) error {
1360			summaryMessage.AppendContent(text)
1361			return a.messages.Update(genCtx, summaryMessage)
1362		},
1363	})
1364	if err != nil {
1365		isCancelErr := errors.Is(err, context.Canceled)
1366		if isCancelErr {
1367			// User cancelled summarize we need to remove the summary message.
1368			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1369			return deleteErr
1370		}
1371		// Mark the summary message as finished with an error so the UI
1372		// stops spinning.
1373		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1374		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1375			return updateErr
1376		}
1377		return err
1378	}
1379
1380	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1381	err = a.messages.Update(genCtx, summaryMessage)
1382	if err != nil {
1383		return err
1384	}
1385
1386	var openrouterCost *float64
1387	for _, step := range resp.Steps {
1388		stepCost := a.openrouterCost(step.ProviderMetadata)
1389		if stepCost != nil {
1390			newCost := *stepCost
1391			if openrouterCost != nil {
1392				newCost += *openrouterCost
1393			}
1394			openrouterCost = &newCost
1395		}
1396	}
1397
1398	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
1399
1400	// Just in case, get just the last usage info.
1401	usage := resp.Response.Usage
1402	currentSession.SummaryMessageID = summaryMessage.ID
1403	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1404	currentSession.PromptTokens = 0
1405	currentSession.EstimatedUsage = usageIsZero(usage)
1406	_, err = a.sessions.Save(genCtx, currentSession)
1407	if err != nil {
1408		return err
1409	}
1410
1411	// Release the active request before processing queued messages so that
1412	// Run() does not see the session as busy.
1413	a.activeRequests.Del(sessionID)
1414	cancel()
1415
1416	// Process any messages that were queued while summarizing.
1417	queuedMessages, ok := a.messageQueue.Get(sessionID)
1418	if !ok || len(queuedMessages) == 0 {
1419		return nil
1420	}
1421	firstQueuedMessage := queuedMessages[0]
1422	a.messageQueue.Set(sessionID, queuedMessages[1:])
1423	_, qErr := a.Run(ctx, firstQueuedMessage)
1424	return qErr
1425}
1426
1427func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1428	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1429		return fantasy.ProviderOptions{}
1430	}
1431	return fantasy.ProviderOptions{
1432		anthropic.Name: &anthropic.ProviderCacheControlOptions{
1433			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1434		},
1435		bedrock.Name: &anthropic.ProviderCacheControlOptions{
1436			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1437		},
1438		vercel.Name: &anthropic.ProviderCacheControlOptions{
1439			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1440		},
1441	}
1442}
1443
1444func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1445	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1446	var attachmentParts []message.ContentPart
1447	for _, attachment := range call.Attachments {
1448		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1449	}
1450	parts = append(parts, attachmentParts...)
1451	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1452		Role:  message.User,
1453		Parts: parts,
1454	})
1455	if err != nil {
1456		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1457	}
1458	return msg, nil
1459}
1460
1461func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1462	var history []fantasy.Message
1463	if !a.isSubAgent {
1464		history = append(history, fantasy.NewUserMessage(
1465			fmt.Sprintf(
1466				"<system_reminder>%s</system_reminder>",
1467				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1468If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1469If not, please feel free to ignore. Again do not mention this message to the user.`,
1470			),
1471		))
1472	}
1473	// Collect all tool call IDs present in assistant messages and all tool
1474	// result IDs present in tool messages. This lets us detect both orphaned
1475	// tool results (result without a call) and orphaned tool calls (call
1476	// without a result).
1477	knownToolCallIDs := make(map[string]struct{})
1478	knownToolResultIDs := make(map[string]struct{})
1479	for _, m := range msgs {
1480		switch m.Role {
1481		case message.Assistant:
1482			for _, tc := range m.ToolCalls() {
1483				knownToolCallIDs[tc.ID] = struct{}{}
1484			}
1485		case message.Tool:
1486			for _, tr := range m.ToolResults() {
1487				knownToolResultIDs[tr.ToolCallID] = struct{}{}
1488			}
1489		}
1490	}
1491
1492	for _, m := range msgs {
1493		if len(m.Parts) == 0 {
1494			continue
1495		}
1496		// Assistant message without content or tool calls (cancelled before it returned anything).
1497		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1498			continue
1499		}
1500		if m.Role == message.Tool {
1501			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1502				history = append(history, msg)
1503			}
1504			continue
1505		}
1506		aiMsgs := m.ToAIMessage()
1507		if !supportsImages {
1508			for i := range aiMsgs {
1509				if aiMsgs[i].Role == fantasy.MessageRoleUser {
1510					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1511				}
1512			}
1513		}
1514		history = append(history, aiMsgs...)
1515
1516		if m.Role == message.Assistant {
1517			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1518				history = append(history, msg)
1519			}
1520		}
1521	}
1522
1523	var files []fantasy.FilePart
1524	for _, attachment := range attachments {
1525		if attachment.IsText() {
1526			continue
1527		}
1528		files = append(files, fantasy.FilePart{
1529			Filename:  attachment.FileName,
1530			Data:      attachment.Content,
1531			MediaType: attachment.MimeType,
1532		})
1533	}
1534
1535	return history, files
1536}
1537
1538// filterFileParts removes fantasy.FilePart entries from a slice of message
1539// parts. Used to strip image attachments from historical user messages when
1540// the current model does not support them.
1541func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1542	filtered := make([]fantasy.MessagePart, 0, len(parts))
1543	for _, part := range parts {
1544		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1545			continue
1546		}
1547		filtered = append(filtered, part)
1548	}
1549	return filtered
1550}
1551
1552// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1553// dropping any tool result parts whose tool_call_id has no matching tool call
1554// in the known set. An orphaned result causes API validation to fail on every
1555// subsequent turn, permanently locking the session. Returns the filtered
1556// message and true if at least one valid part remains.
1557func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1558	aiMsgs := m.ToAIMessage()
1559	if len(aiMsgs) == 0 {
1560		return fantasy.Message{}, false
1561	}
1562	var validParts []fantasy.MessagePart
1563	for _, part := range aiMsgs[0].Content {
1564		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1565		if !ok {
1566			validParts = append(validParts, part)
1567			continue
1568		}
1569		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1570			validParts = append(validParts, part)
1571		} else {
1572			slog.Warn(
1573				"Dropping orphaned tool result with no matching tool call",
1574				"tool_call_id", tr.ToolCallID,
1575			)
1576		}
1577	}
1578	if len(validParts) == 0 {
1579		return fantasy.Message{}, false
1580	}
1581	msg := aiMsgs[0]
1582	msg.Content = validParts
1583	return msg, true
1584}
1585
1586// syntheticToolResultsForOrphanedCalls returns a tool message containing
1587// synthetic tool results for any tool calls in the assistant message that
1588// have no matching result in knownToolResultIDs. LLM APIs require every
1589// tool_use to be immediately followed by a tool_result; an interrupted
1590// session can leave orphaned tool_use blocks that permanently lock the
1591// conversation. Returns the message and true if any synthetic results were
1592// produced.
1593func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1594	var syntheticParts []fantasy.MessagePart
1595	for _, tc := range m.ToolCalls() {
1596		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1597			continue
1598		}
1599		slog.Warn(
1600			"Injecting synthetic tool result for orphaned tool call",
1601			"tool_call_id", tc.ID,
1602			"tool_name", tc.Name,
1603		)
1604		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1605			ToolCallID: tc.ID,
1606			Output: fantasy.ToolResultOutputContentError{
1607				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1608			},
1609		})
1610	}
1611	if len(syntheticParts) == 0 {
1612		return fantasy.Message{}, false
1613	}
1614	return fantasy.Message{
1615		Role:    fantasy.MessageRoleTool,
1616		Content: syntheticParts,
1617	}, true
1618}
1619
1620func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1621	msgs, err := a.messages.List(ctx, session.ID)
1622	if err != nil {
1623		return nil, fmt.Errorf("failed to list messages: %w", err)
1624	}
1625
1626	if session.SummaryMessageID != "" {
1627		summaryMsgIndex := -1
1628		for i, msg := range msgs {
1629			if msg.ID == session.SummaryMessageID {
1630				summaryMsgIndex = i
1631				break
1632			}
1633		}
1634		if summaryMsgIndex != -1 {
1635			msgs = msgs[summaryMsgIndex:]
1636			msgs[0].Role = message.User
1637		}
1638	}
1639	return msgs, nil
1640}
1641
1642// generateTitle generates a session titled based on the initial prompt.
1643func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1644	if userPrompt == "" {
1645		return
1646	}
1647
1648	smallModel := a.smallModel.Get()
1649	largeModel := a.largeModel.Get()
1650	systemPromptPrefix := a.systemPromptPrefix.Get()
1651
1652	var maxOutputTokens int64 = 40
1653	if smallModel.CatwalkCfg.CanReason {
1654		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1655	}
1656
1657	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1658		return fantasy.NewAgent(
1659			m,
1660			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1661			fantasy.WithMaxOutputTokens(tok),
1662			fantasy.WithUserAgent(userAgent),
1663		)
1664	}
1665
1666	streamCall := fantasy.AgentStreamCall{
1667		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1668		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1669			prepared.Messages = opts.Messages
1670			if systemPromptPrefix != "" {
1671				prepared.Messages = append([]fantasy.Message{
1672					fantasy.NewSystemMessage(systemPromptPrefix),
1673				}, prepared.Messages...)
1674			}
1675			return callCtx, prepared, nil
1676		},
1677	}
1678
1679	// Use the small model to generate the title.
1680	model := smallModel
1681	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1682	resp, err := agent.Stream(ctx, streamCall)
1683	if err == nil {
1684		// We successfully generated a title with the small model.
1685		slog.Debug("Generated title with small model")
1686	} else {
1687		// It didn't work. Let's try with the big model.
1688		slog.Error("Error generating title with small model; trying big model", "err", err)
1689		model = largeModel
1690		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1691		resp, err = agent.Stream(ctx, streamCall)
1692		if err == nil {
1693			slog.Debug("Generated title with large model")
1694		} else {
1695			// Welp, the large model didn't work either. Use the default
1696			// session name and return.
1697			slog.Error("Error generating title with large model", "err", err)
1698			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1699			if saveErr != nil {
1700				slog.Error("Failed to save session title", "error", saveErr)
1701			}
1702			return
1703		}
1704	}
1705
1706	if resp == nil {
1707		// Actually, we didn't get a response so we can't. Use the default
1708		// session name and return.
1709		slog.Error("Response is nil; can't generate title")
1710		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1711		if saveErr != nil {
1712			slog.Error("Failed to save session title", "error", saveErr)
1713		}
1714		return
1715	}
1716
1717	// Clean up title.
1718	var title string
1719	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1720
1721	// Remove thinking tags if present.
1722	title = thinkTagRegex.ReplaceAllString(title, "")
1723	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1724
1725	title = strings.TrimSpace(title)
1726	title = cmp.Or(title, DefaultSessionName)
1727
1728	// Calculate usage and cost.
1729	var openrouterCost *float64
1730	for _, step := range resp.Steps {
1731		stepCost := a.openrouterCost(step.ProviderMetadata)
1732		if stepCost != nil {
1733			newCost := *stepCost
1734			if openrouterCost != nil {
1735				newCost += *openrouterCost
1736			}
1737			openrouterCost = &newCost
1738		}
1739	}
1740
1741	modelConfig := model.CatwalkCfg
1742	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1743		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1744		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1745		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1746
1747	// Use override cost if available (e.g., from OpenRouter).
1748	if openrouterCost != nil {
1749		cost = *openrouterCost
1750	}
1751
1752	// Skip cost accumulation
1753	if model.FlatRate {
1754		cost = 0
1755	}
1756
1757	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1758	completionTokens := resp.TotalUsage.OutputTokens
1759
1760	// Atomically update only title and usage fields to avoid overriding other
1761	// concurrent session updates.
1762	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1763	if saveErr != nil {
1764		slog.Error("Failed to save session title and usage", "error", saveErr)
1765		return
1766	}
1767}
1768
1769func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1770	openrouterMetadata, ok := metadata[openrouter.Name]
1771	if !ok {
1772		return nil
1773	}
1774
1775	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1776	if !ok {
1777		return nil
1778	}
1779	return &opts.Usage.Cost
1780}
1781
1782func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1783	if !usageIsZero(usage) {
1784		session.EstimatedUsage = estimated
1785	}
1786
1787	modelConfig := model.CatwalkCfg
1788	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1789		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1790		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1791		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1792
1793	if !estimated {
1794		a.eventTokensUsed(session.ID, model, usage, cost)
1795	}
1796
1797	if estimated {
1798		cost = 0
1799	} else {
1800		// Use override cost if available (e.g., from OpenRouter).
1801		if overrideCost != nil {
1802			cost = *overrideCost
1803		}
1804
1805		// Skip cost accumulation
1806		if model.FlatRate {
1807			cost = 0
1808		}
1809	}
1810
1811	session.Cost += cost
1812	updateSessionTokenCounters(session, usage)
1813}
1814
1815func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1816	if usage.OutputTokens != 0 {
1817		session.CompletionTokens = usage.OutputTokens
1818	}
1819	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1820		session.PromptTokens = promptTokens
1821	}
1822}
1823
1824func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1825	if usage.OutputTokens != 0 {
1826		return usage.OutputTokens
1827	}
1828	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1829}
1830
// Cancel requests cancellation of the work associated with sessionID. While
// holding the per-session mutex it invokes the cancel function of the active
// request and of the "<sessionID>-summarize" request when present, raises the
// session's cancel mark if accepted runs are recorded for the session, and
// clears any queued prompts.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	//
	// The accepted-run count and the sequence generator are read together
	// under acceptedMu; the mark itself is written after releasing it.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	// Drop prompts that were queued behind the cancelled work.
	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.clearQueueAndNotify(sessionID)
	}
}
1886
1887func (a *sessionAgent) ClearQueue(sessionID string) {
1888	if a.QueuedPrompts(sessionID) > 0 {
1889		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1890		a.clearQueueAndNotify(sessionID)
1891	}
1892}
1893
1894func (a *sessionAgent) CancelAll() {
1895	if !a.IsBusy() {
1896		return
1897	}
1898	for key := range a.activeRequests.Seq2() {
1899		a.Cancel(key) // key is sessionID
1900	}
1901
1902	timeout := time.After(5 * time.Second)
1903	for a.IsBusy() {
1904		select {
1905		case <-timeout:
1906			return
1907		default:
1908			time.Sleep(200 * time.Millisecond)
1909		}
1910	}
1911}
1912
1913func (a *sessionAgent) IsBusy() bool {
1914	var busy bool
1915	for cancelFunc := range a.activeRequests.Seq() {
1916		if cancelFunc != nil {
1917			busy = true
1918			break
1919		}
1920	}
1921	return busy
1922}
1923
1924func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1925	_, busy := a.activeRequests.Get(sessionID)
1926	return busy
1927}
1928
1929func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1930	l, ok := a.messageQueue.Get(sessionID)
1931	if !ok {
1932		return 0
1933	}
1934	return len(l)
1935}
1936
1937func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1938	l, ok := a.messageQueue.Get(sessionID)
1939	if !ok {
1940		return nil
1941	}
1942	prompts := make([]string, len(l))
1943	for i, call := range l {
1944		prompts[i] = call.Prompt
1945	}
1946	return prompts
1947}
1948
// SetModels replaces the agent's large and small models. The large model is
// stored first, then the small one.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1953
// SetTools replaces the agent's tool set with tools.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1957
// SetSystemPrompt replaces the agent's system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1961
// Model returns the agent's current large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1965
1966// convertToToolResult converts a fantasy tool result to a message tool result.
1967func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1968	baseResult := message.ToolResult{
1969		ToolCallID: result.ToolCallID,
1970		Name:       result.ToolName,
1971		Metadata:   result.ClientMetadata,
1972	}
1973
1974	switch result.Result.GetType() {
1975	case fantasy.ToolResultContentTypeText:
1976		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1977			baseResult.Content = r.Text
1978		}
1979	case fantasy.ToolResultContentTypeError:
1980		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1981			baseResult.Content = r.Error.Error()
1982			baseResult.IsError = true
1983		}
1984	case fantasy.ToolResultContentTypeMedia:
1985		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1986			if !stringext.IsValidBase64(r.Data) {
1987				slog.Warn(
1988					"Tool returned media with invalid base64 data, discarding image",
1989					"tool", result.ToolName,
1990					"tool_call_id", result.ToolCallID,
1991				)
1992				baseResult.Content = "Tool returned image data with invalid encoding"
1993				baseResult.IsError = true
1994			} else {
1995				content := r.Text
1996				if content == "" {
1997					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1998				}
1999				baseResult.Content = content
2000				baseResult.Data = r.Data
2001				baseResult.MIMEType = r.MediaType
2002			}
2003		}
2004	}
2005
2006	return baseResult
2007}
2008
2009// workaroundProviderMediaLimitations converts media content in tool results to
2010// user messages for providers that don't natively support images in tool results.
2011//
2012// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
2013// don't support sending images/media in tool result messages - they only accept
2014// text in tool results. However, they DO support images in user messages.
2015//
2016// If we send media in tool results to these providers, the API returns an error.
2017//
2018// Solution: For these providers, we:
2019//  1. Replace the media in the tool result with a text placeholder
2020//  2. Inject a user message immediately after with the image as a file attachment
2021//  3. This maintains the tool execution flow while working around API limitations
2022//
2023// Anthropic and Bedrock support images natively in tool results, so we skip
2024// this workaround for them.
2025//
2026// Example transformation:
2027//
2028//	BEFORE: [tool result: image data]
2029//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
2030func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
2031	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
2032		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
2033		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
2034
2035	if providerSupportsMedia {
2036		return messages
2037	}
2038
2039	convertedMessages := make([]fantasy.Message, 0, len(messages))
2040
2041	for _, msg := range messages {
2042		if msg.Role != fantasy.MessageRoleTool {
2043			convertedMessages = append(convertedMessages, msg)
2044			continue
2045		}
2046
2047		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
2048		var mediaFiles []fantasy.FilePart
2049
2050		for _, part := range msg.Content {
2051			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
2052			if !ok {
2053				textParts = append(textParts, part)
2054				continue
2055			}
2056
2057			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
2058				decoded, err := base64.StdEncoding.DecodeString(media.Data)
2059				if err != nil {
2060					slog.Warn("Failed to decode media data", "error", err)
2061					textParts = append(textParts, part)
2062					continue
2063				}
2064
2065				mediaFiles = append(mediaFiles, fantasy.FilePart{
2066					Data:      decoded,
2067					MediaType: media.MediaType,
2068					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
2069				})
2070
2071				textParts = append(textParts, fantasy.ToolResultPart{
2072					ToolCallID: toolResult.ToolCallID,
2073					Output: fantasy.ToolResultOutputContentText{
2074						Text: "[Image/media content loaded - see attached file]",
2075					},
2076					ProviderOptions: toolResult.ProviderOptions,
2077				})
2078			} else {
2079				textParts = append(textParts, part)
2080			}
2081		}
2082
2083		convertedMessages = append(convertedMessages, fantasy.Message{
2084			Role:    fantasy.MessageRoleTool,
2085			Content: textParts,
2086		})
2087
2088		if len(mediaFiles) > 0 {
2089			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
2090				"Here is the media content from the tool result:",
2091				mediaFiles...,
2092			))
2093		}
2094	}
2095
2096	return convertedMessages
2097}
2098
2099// buildSummaryPrompt constructs the prompt text for session summarization.
2100func buildSummaryPrompt(todos []session.Todo) string {
2101	var sb strings.Builder
2102	sb.WriteString("Provide a detailed summary of our conversation above.")
2103	if len(todos) > 0 {
2104		sb.WriteString("\n\n## Current Todo List\n\n")
2105		for _, t := range todos {
2106			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
2107		}
2108		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
2109		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2110	}
2111	return sb.String()
2112}
2113
2114func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2115	fields := []any{
2116		"retry_delay", delay.String(),
2117	}
2118	if err == nil {
2119		return fields
2120	}
2121	fields = append(fields, "status_code", err.StatusCode)
2122	if err.Title != "" {
2123		fields = append(fields, "title", err.Title)
2124	}
2125	if err.Message != "" {
2126		fields = append(fields, "message", err.Message)
2127	}
2128	return fields
2129}