agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"sync/atomic"
  25	"time"
  26
  27	"charm.land/catwalk/pkg/catwalk"
  28	"charm.land/fantasy"
  29	"charm.land/fantasy/providers/anthropic"
  30	"charm.land/fantasy/providers/bedrock"
  31	"charm.land/fantasy/providers/google"
  32	"charm.land/fantasy/providers/openai"
  33	"charm.land/fantasy/providers/openrouter"
  34	"charm.land/fantasy/providers/vercel"
  35	"charm.land/lipgloss/v2"
  36	"github.com/charmbracelet/crush/internal/agent/hyper"
  37	"github.com/charmbracelet/crush/internal/agent/notify"
  38	"github.com/charmbracelet/crush/internal/agent/tools"
  39	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  40	"github.com/charmbracelet/crush/internal/config"
  41	"github.com/charmbracelet/crush/internal/csync"
  42	"github.com/charmbracelet/crush/internal/message"
  43	"github.com/charmbracelet/crush/internal/pubsub"
  44	"github.com/charmbracelet/crush/internal/session"
  45	"github.com/charmbracelet/crush/internal/stringext"
  46	"github.com/charmbracelet/crush/internal/version"
  47	"github.com/charmbracelet/x/exp/charmtone"
  48)
  49
const (
	// DefaultSessionName is the placeholder session title "Untitled
	// Session". It is not referenced in this part of the file; presumably a
	// session carries it until a real title is generated — TODO confirm.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds. They are not read in this
	// part of the file; presumably context windows of at least
	// largeContextWindowThreshold tokens keep a fixed
	// largeContextWindowBuffer of headroom, while smaller windows reserve
	// smallContextWindowRatio of the window instead — TODO confirm against
	// the summarization check.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)

// userAgent embeds the Crush build version. Run passes it to
// fantasy.WithUserAgent for every agent it builds, presumably so provider
// requests identify the client — confirm in fantasy.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt is the prompt template compiled in from templates/title.md.
// Its consumer is not visible here; presumably the session-title
// generator (TODO confirm).
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the prompt template compiled in from
// templates/summary.md. Its consumer is not visible here; presumably
// Summarize (TODO confirm).
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles. thinkTagRegex matches
// a complete <think>…</think> block, non-greedily and across newlines
// (the (?s) flag). orphanThinkTagRegex matches a lone opening or closing
// tag, such as one left without a partner.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  72
// SessionAgentCall is one user turn submitted to SessionAgent.Run.
//
// SessionID names the session the turn belongs to. SessionID is required,
// and so is either a non-empty Prompt or a text attachment (see
// ValidateCall). Prompt and Attachments are combined into the prompt sent
// to the model.
//
// ProviderOptions and the sampling fields (Temperature, TopP, TopK,
// FrequencyPenalty, PresencePenalty) are forwarded unchanged to the model
// stream. A nil pointer is passed through as-is, which presumably leaves
// the provider default in place (TODO confirm in fantasy).
// MaxOutputTokens is only sent when it is > 0, because some providers
// reject an explicit zero. NonInteractive is not read in this part of the
// file.
type SessionAgentCall struct {
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID            string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
 125
// SessionAgent drives conversational turns for chat sessions.
//
// Run executes a turn, or queues it when the session already has an active
// turn. BeginAccepted takes the accept reservation that the asynchronous
// (fire-and-forget) dispatch path hands to Run via
// SessionAgentCall.Accepted. The remaining methods swap the agent's models,
// tools and system prompt, cancel work, report busy state, and manage the
// per-session prompt queue.
type SessionAgent interface {
	// Run validates the call (see ValidateCall) and either runs it as one
	// turn or queues it behind the session's active turn.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted reserves an accept slot for sessionID. The returned
	// handle must eventually be Closed; Run does this itself once the
	// handle is attached to the call as SessionAgentCall.Accepted.
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
 142
// Model bundles a language model with the metadata the agent needs about
// it:
//   - Model is the client that Run hands to fantasy.NewAgent.
//   - CatwalkCfg is the model's catalog entry. Run passes its
//     SupportsImages flag to preparePrompt.
//   - ModelCfg is the selected model configuration. Its Model and
//     Provider identifiers are recorded on assistant messages.
//   - FlatRate is not read in this part of the file; presumably it marks
//     models billed at a flat rate (TODO confirm).
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 149
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Swappable configuration. Each value is wrapped in a csync container
	// so Run can take a snapshot without racing SetModels/SetTools/
	// SetSystemPrompt.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Fixed settings and collaborators, copied from SessionAgentOptions.
	// A nil runComplete disables the broker fallback in publishRunComplete.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, the calls that arrived while the
	// session had an active turn (see enqueueCall). activeRequests holds
	// the cancel func of each session's in-flight turn.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	// NOTE(review): entries are created lazily by sessionMu and no
	// removal is visible in this part of the file, so the map presumably
	// grows with every session seen — confirm.
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
 205
// SessionAgentOptions is the configuration accepted by NewSessionAgent.
// Every field is copied onto the new agent. LargeModel, SmallModel,
// SystemPromptPrefix, SystemPrompt and Tools seed the agent's swappable
// configuration. RunComplete may be nil; in that case a turn without an
// OnComplete hook publishes no RunComplete event.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 220
 221func NewSessionAgent(
 222	opts SessionAgentOptions,
 223) SessionAgent {
 224	return &sessionAgent{
 225		largeModel:           csync.NewValue(opts.LargeModel),
 226		smallModel:           csync.NewValue(opts.SmallModel),
 227		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 228		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 229		isSubAgent:           opts.IsSubAgent,
 230		sessions:             opts.Sessions,
 231		messages:             opts.Messages,
 232		disableAutoSummarize: opts.DisableAutoSummarize,
 233		tools:                csync.NewSliceFrom(opts.Tools),
 234		isYolo:               opts.IsYolo,
 235		notify:               opts.Notify,
 236		runComplete:          opts.RunComplete,
 237		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 238		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 239		dispatchMu:           csync.NewMap[string, *sync.Mutex](),
 240		acceptedRuns:         csync.NewMap[string, int](),
 241		cancelMark:           csync.NewMap[string, uint64](),
 242	}
 243}
 244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
//
// Handles are always used through the pointer BeginAccepted returns. The
// struct holds an atomic.Bool, so it must not be copied.
type AcceptedRun struct {
	// agent and sessionID identify the counter this reservation belongs
	// to. Close decrements it via agent.endAccepted(sessionID).
	agent     *sessionAgent
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it. done latches the first Close so every later
	// Close is a no-op.
	seq  uint64
	done atomic.Bool
}
 261
 262// Close decrements the accept counter for this reservation. It is safe
 263// to call multiple times; only the first call has effect.
 264func (r *AcceptedRun) Close() {
 265	if r == nil {
 266		return
 267	}
 268	if !r.done.CompareAndSwap(false, true) {
 269		return
 270	}
 271	r.agent.endAccepted(r.sessionID)
 272}
 273
 274// SessionID exposes the session this reservation is for so the run path
 275// can use it without an extra parameter.
 276func (r *AcceptedRun) SessionID() string {
 277	if r == nil {
 278		return ""
 279	}
 280	return r.sessionID
 281}
 282
 283// BeginAccepted increments the accept counter for sessionID and returns
 284// a handle whose Close is the only way to decrement it. It is the only
 285// entry point that mutates acceptedRuns.
 286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
 287	a.acceptedMu.Lock()
 288	defer a.acceptedMu.Unlock()
 289	count, _ := a.acceptedRuns.Get(sessionID)
 290	a.acceptedRuns.Set(sessionID, count+1)
 291	a.acceptSeqGen++
 292	return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
 293}
 294
 295// endAccepted decrements the accept counter for sessionID. It is only
 296// called via AcceptedRun.Close. It uses a dedicated lock (not the
 297// per-session dispatch mutex) so it can run while Run holds dispatchMu
 298// for the same session without deadlocking.
 299//
 300// When the count reaches zero the session's cancel mark is dropped: no
 301// accepted handle remains for it to cover, and any handle accepted later
 302// gets a strictly higher sequence that the mark would not match anyway.
 303// Handles canceled on entry never reach RunComplete, so this is the only
 304// place that clears the mark for an all-canceled batch. Sibling handles
 305// covered by the same mark are serialized on the per-session dispatch
 306// mutex and read the mark before they Close, so this never clears it out
 307// from under a covered handle still waiting to enter Run.
 308func (a *sessionAgent) endAccepted(sessionID string) {
 309	a.acceptedMu.Lock()
 310	defer a.acceptedMu.Unlock()
 311	count, ok := a.acceptedRuns.Get(sessionID)
 312	if !ok || count <= 1 {
 313		a.acceptedRuns.Del(sessionID)
 314		a.cancelMark.Del(sessionID)
 315		return
 316	}
 317	a.acceptedRuns.Set(sessionID, count-1)
 318}
 319
 320// sessionMu returns the per-session dispatch mutex, creating it on first
 321// use. Creation is guarded so concurrent callers always observe the same
 322// mutex instance for a given session.
 323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
 324	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 325		return mu
 326	}
 327	a.dispatchMuCreate.Lock()
 328	defer a.dispatchMuCreate.Unlock()
 329	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 330		return mu
 331	}
 332	mu := &sync.Mutex{}
 333	a.dispatchMu.Set(sessionID, mu)
 334	return mu
 335}
 336
 337// enqueueCall appends call to the session's message queue. The
 338// OnComplete hook is stripped: the caller that supplied it (typically
 339// coordinator.Run) has its own retry/coalesce scope that ends when it
 340// returns, so by the time the queue drains nobody is left to consume the
 341// buffered terminal event. The recursive Run falls back to the default
 342// broker publish, which is what existing subscribers expect for queued
 343// turns.
 344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
 345	existing, ok := a.messageQueue.Get(call.SessionID)
 346	if !ok {
 347		existing = []SessionAgentCall{}
 348	}
 349	queued := call
 350	if call.Accepted != nil {
 351		// Preserve the accept sequence after the handle is stripped so
 352		// the queue-drain paths can tell a follow-up queued before a
 353		// cancel (covered by the mark) from one queued after it.
 354		queued.acceptSeq = call.Accepted.seq
 355	}
 356	queued.OnComplete = nil
 357	queued.Accepted = nil
 358	existing = append(existing, queued)
 359	a.messageQueue.Set(call.SessionID, existing)
 360}
 361
 362// drainQueueForStep partitions the session's queued calls for the current
 363// streaming step under the per-session dispatch mutex so the filtering is
 364// atomic against a concurrent Cancel: canceledBySeq requires the caller to
 365// hold that mutex, and evaluating it here (rather than after unlocking)
 366// prevents a cancel recorded between the drain and the check from being
 367// observed inconsistently.
 368//
 369// Calls covered by a pending cancel are dropped; the dropped ones that
 370// carry a RunID are returned in canceledWithRunID so the caller can
 371// publish their terminal cancelled RunComplete (a caller waiting on that
 372// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without
 373// a RunID are returned in fold to be folded into the active turn,
 374// preserving the existing follow-up behavior. Uncanceled calls that carry
 375// a RunID are left in the queue so each runs as its own turn via the
 376// recursive run path and publishes its own RunComplete, giving every
 377// RunID-bearing prompt an explicit lifecycle instead of being silently
 378// absorbed into another turn. fold is processed by the caller without the
 379// lock held.
 380func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) {
 381	dispatchLock := a.sessionMu(sessionID)
 382	dispatchLock.Lock()
 383	defer dispatchLock.Unlock()
 384	queuedCalls, _ := a.messageQueue.Get(sessionID)
 385	var keep []SessionAgentCall
 386	for _, queued := range queuedCalls {
 387		if a.canceledBySeq(sessionID, queued.acceptSeq) {
 388			if queued.RunID != "" {
 389				canceledWithRunID = append(canceledWithRunID, queued)
 390			}
 391			continue
 392		}
 393		if queued.RunID != "" {
 394			keep = append(keep, queued)
 395			continue
 396		}
 397		fold = append(fold, queued)
 398	}
 399	if len(keep) == 0 {
 400		a.messageQueue.Del(sessionID)
 401	} else {
 402		a.messageQueue.Set(sessionID, keep)
 403	}
 404	return fold, canceledWithRunID
 405}
 406
 407// publishCanceledQueueDrops emits a terminal cancelled RunComplete for
 408// every dropped queued call that carries a RunID. A queued prompt removed
 409// from the queue without ever running — covered by a pending cancel, or
 410// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on
 411// that RunID: `crush run` ignores live message events and exits only on a
 412// RunComplete whose RunID matches. Calls without a RunID had no such waiter
 413// and are dropped silently as before. A detached, bounded context keeps the
 414// must-deliver publish alive even when the run context that triggered the
 415// drop is already canceled.
 416func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) {
 417	var hasRunID bool
 418	for _, d := range drops {
 419		if d.RunID != "" {
 420			hasRunID = true
 421			break
 422		}
 423	}
 424	if !hasRunID {
 425		return
 426	}
 427	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 428	defer cancel()
 429	for _, d := range drops {
 430		if d.RunID == "" {
 431			continue
 432		}
 433		a.publishRunComplete(ctx, d, notify.RunComplete{
 434			SessionID: d.SessionID,
 435			RunID:     d.RunID,
 436			Cancelled: true,
 437		})
 438	}
 439}
 440
 441// clearQueueAndNotify removes all queued prompts for the session and
 442// publishes a terminal cancelled RunComplete for any that carried a RunID,
 443// so callers waiting on those RunIDs (e.g. `crush run`) are not left
 444// hanging when their queued prompt is discarded without running.
 445func (a *sessionAgent) clearQueueAndNotify(sessionID string) {
 446	queued, ok := a.messageQueue.Get(sessionID)
 447	a.messageQueue.Del(sessionID)
 448	if !ok {
 449		return
 450	}
 451	a.publishCanceledQueueDrops(queued)
 452}
 453
 454// clearPendingCancel removes any pending-cancel mark for sessionID. It
 455// takes the per-session dispatch lock so it is ordered against Cancel
 456// and the dispatch handoff.
 457func (a *sessionAgent) clearPendingCancel(sessionID string) {
 458	mu := a.sessionMu(sessionID)
 459	mu.Lock()
 460	defer mu.Unlock()
 461	a.cancelMark.Del(sessionID)
 462}
 463
 464// canceledBySeq reports whether an accepted handle or queued call with
 465// the given accept sequence is covered by a pending cancel for the
 466// session. Callers must hold the session's dispatch mutex. A tracked
 467// sequence (seq > 0) is covered only when it is at or below the cancel
 468// high-water mark, so a prompt accepted after the cancel (higher seq) is
 469// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
 470// with no accept reservation) is covered whenever any mark is present,
 471// preserving the pre-sequence behavior. The mark is not consumed: it
 472// stays so every sibling handle it covers observes the same cancel, and
 473// a later handle (higher seq) ignores it regardless.
 474func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
 475	mark, ok := a.cancelMark.Get(sessionID)
 476	if !ok || mark == 0 {
 477		return false
 478	}
 479	return seq == 0 || seq <= mark
 480}
 481
 482// persistCanceledTurn writes the user/assistant records for a turn that
 483// was canceled before (or just as) streaming would have produced them.
 484// It creates the user message only when it was not already created by an
 485// earlier createUserMessage call (userMsgCreated), then writes an
 486// assistant message with FinishReasonCanceled. Both writes use
 487// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
 488// context) can't drop them.
 489func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
 490	writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 491	defer cancel()
 492	if !userMsgCreated {
 493		if _, err := a.createUserMessage(writeCtx, call); err != nil {
 494			return err
 495		}
 496	}
 497	largeModel := a.largeModel.Get()
 498	assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
 499		Role:     message.Assistant,
 500		Parts:    []message.ContentPart{},
 501		Model:    largeModel.ModelCfg.Model,
 502		Provider: largeModel.ModelCfg.Provider,
 503	})
 504	if err != nil {
 505		return err
 506	}
 507	assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 508	return a.messages.Update(writeCtx, assistant)
 509}
 510
 511// publishRunComplete emits the authoritative terminal event for a turn.
 512// It honors the per-call OnComplete hook when set (so the coordinator can
 513// coalesce retries) and otherwise falls back to the RunComplete broker.
 514// ctx is used only for the bounded-blocking must-deliver publish; the
 515// terminal payload is supplied by the caller. This is the single emit path
 516// shared by the streaming defer and the cancel-on-entry early return so a
 517// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
 518// observes exactly one terminal event regardless of which Run branch ends
 519// the turn.
 520func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
 521	if call.OnComplete != nil {
 522		call.OnComplete(complete)
 523		return
 524	}
 525	if a.runComplete == nil {
 526		return
 527	}
 528	a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 529}
 530
 531// ValidateCall performs the cheap structural validation that
 532// sessionAgent.Run requires before a call can be dispatched: a call must
 533// carry either a non-empty prompt or a text attachment, and it must name a
 534// session. It is exported so callers that accept a run before dispatching it
 535// (e.g. backend.SendMessage) can apply the same checks and keep the error
 536// contract consistent.
 537func ValidateCall(call SessionAgentCall) error {
 538	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 539		return ErrEmptyPrompt
 540	}
 541	if call.SessionID == "" {
 542		return ErrSessionMissing
 543	}
 544	return nil
 545}
 546
 547func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 548	if err := ValidateCall(call); err != nil {
 549		return nil, err
 550	}
 551
 552	// genCtx/cancel are the run context and its cancel func. For the
 553	// accepted (fire-and-forget) dispatch path they are created under
 554	// dispatchMu below so a concurrent Cancel can observe the
 555	// activeRequests entry before the assistant message exists. For
 556	// the in-process path they stay nil here and are created later,
 557	// preserving the original ordering.
 558	var (
 559		genCtx           context.Context
 560		cancel           context.CancelFunc
 561		activeRegistered bool
 562		userMsgCreated   bool
 563	)
 564
 565	if call.Accepted != nil {
 566		// Serialize the accepted -> (cancel-on-entry | queued |
 567		// active) transition against a concurrent Cancel. Cancel takes
 568		// the same per-session lock, so every cancel observes at least
 569		// one of: a cancel mark, an activeRequests entry, or a
 570		// messageQueue entry it then clears.
 571		mu := a.sessionMu(call.SessionID)
 572		mu.Lock()
 573
 574		if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
 575			// Cancel-on-entry: a cancel arrived while this run was
 576			// dispatched but not yet active, and this handle's accept
 577			// sequence is at or below the session's cancel mark. The
 578			// mark is left in place so sibling handles it also covers
 579			// observe the same cancel; release the accept reservation,
 580			// drop the lock, and persist a canceled turn without
 581			// entering Stream.
 582			//
 583			// This path returns before the streaming defer that
 584			// publishes RunComplete is installed, so emit the terminal
 585			// event explicitly. Without it, a caller waiting on
 586			// RunComplete for this RunID (e.g. `crush run`, which
 587			// ignores message events and blocks on RunComplete) would
 588			// hang on an immediately-canceled accepted run.
 589			call.Accepted.Close()
 590			mu.Unlock()
 591			complete := notify.RunComplete{
 592				SessionID: call.SessionID,
 593				RunID:     call.RunID,
 594				Cancelled: true,
 595			}
 596			if err := a.persistCanceledTurn(ctx, call, false); err != nil {
 597				complete.Error = err.Error()
 598				a.publishRunComplete(ctx, call, complete)
 599				return nil, err
 600			}
 601			a.publishRunComplete(ctx, call, complete)
 602			return nil, nil
 603		}
 604
 605		if a.IsSessionBusy(call.SessionID) {
 606			// Busy: an earlier prompt is active. Queue this call and
 607			// release the accept reservation. A Cancel arriving after
 608			// this point sees the active entry and clears the queue.
 609			a.enqueueCall(call)
 610			call.Accepted.Close()
 611			mu.Unlock()
 612			return nil, nil
 613		}
 614
 615		// Idle: become the active run. Register the cancel func before
 616		// dropping the lock so a Cancel that arrives between here and
 617		// assistant creation is not lost.
 618		runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 619		genCtx, cancel = context.WithCancel(runCtx)
 620		a.activeRequests.Set(call.SessionID, cancel)
 621		activeRegistered = true
 622		call.Accepted.Close()
 623		mu.Unlock()
 624
 625		defer cancel()
 626		defer a.activeRequests.Del(call.SessionID)
 627	} else if a.IsSessionBusy(call.SessionID) {
 628		// Queue the message if busy. Strip OnComplete: the caller that
 629		// supplied the hook (typically coordinator.Run) has its own
 630		// retry/coalesce scope that ends when it returns, so by the time
 631		// the queue drains nobody is left to consume the buffered
 632		// terminal event. The recursive Run will fall back to the
 633		// default broker publish, which is what existing subscribers
 634		// expect for queued turns.
 635		a.enqueueCall(call)
 636		return nil, nil
 637	}
 638
 639	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 640	agentTools := a.tools.Copy()
 641	largeModel := a.largeModel.Get()
 642	systemPrompt := a.systemPrompt.Get()
 643	promptPrefix := a.systemPromptPrefix.Get()
 644	var instructions strings.Builder
 645
 646	for _, server := range mcp.GetStates() {
 647		if server.State != mcp.StateConnected {
 648			continue
 649		}
 650		if s := server.Client.InitializeResult().Instructions; s != "" {
 651			instructions.WriteString(s)
 652			instructions.WriteString("\n\n")
 653		}
 654	}
 655
 656	if s := instructions.String(); s != "" {
 657		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 658	}
 659
 660	if len(agentTools) > 0 {
 661		// Add Anthropic caching to the last tool.
 662		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 663	}
 664
 665	agent := fantasy.NewAgent(
 666		largeModel.Model,
 667		fantasy.WithSystemPrompt(systemPrompt),
 668		fantasy.WithTools(agentTools...),
 669		fantasy.WithUserAgent(userAgent),
 670	)
 671
 672	sessionLock := sync.Mutex{}
 673	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 674	if err != nil {
 675		return nil, fmt.Errorf("failed to get session: %w", err)
 676	}
 677
 678	msgs, err := a.getSessionMessages(ctx, currentSession)
 679	if err != nil {
 680		return nil, fmt.Errorf("failed to get session messages: %w", err)
 681	}
 682
 683	var wg sync.WaitGroup
 684	// Generate title if first message.
 685	if len(msgs) == 0 {
 686		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 687		wg.Go(func() {
 688			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 689		})
 690	}
 691	defer wg.Wait()
 692
 693	// Add the user message to the session.
 694	_, err = a.createUserMessage(ctx, call)
 695	if err != nil {
 696		return nil, err
 697	}
 698	userMsgCreated = true
 699
 700	// Add the session to the context.
 701	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 702
 703	// For the accepted dispatch path the run context and cancel func
 704	// were already created and registered under dispatchMu above; reuse
 705	// them. For the in-process path create them here, preserving the
 706	// original ordering.
 707	if !activeRegistered {
 708		genCtx, cancel = context.WithCancel(ctx)
 709		a.activeRequests.Set(call.SessionID, cancel)
 710
 711		defer cancel()
 712		defer a.activeRequests.Del(call.SessionID)
 713	}
 714	// skipRunComplete is set just before the queued-recursion path so
 715	// the outer Run doesn't publish a RunComplete that would race
 716	// with — and be superseded by — the recursive call's own
 717	// RunComplete (each queued user prompt is its own turn and
 718	// publishes exactly one terminal event).
 719	var skipRunComplete bool
 720	// currentAssistant is declared here so the deferred RunComplete
 721	// publish below can capture the pointer that PrepareStep will
 722	// later (re)assign for each streaming step. The final assistant
 723	// message of the turn is the value reachable through this
 724	// pointer when the defer runs.
 725	var currentAssistant *message.Message
 726	// Drain any debounced message updates before returning. message.Service
 727	// already flushes synchronously on terminal updates, but a defer here
 728	// guarantees the contract at every Run exit (success, error, panic
 729	// recovery upstream) without callers needing to know.
 730	//
 731	// After the flush completes — meaning all per-message
 732	// Publish(UpdatedEvent) calls have fired and been buffered into
 733	// every subscriber's channel — publish the authoritative
 734	// RunComplete event for this turn. The flush-then-publish order
 735	// gives well-behaved clients the best chance of seeing the final
 736	// message event before RunComplete; the embedded Text field
 737	// reconciles for clients that observe the events out of order
 738	// (the pubsub broker fan-in does not serialize publishes from
 739	// different upstream brokers).
 740	defer func() {
 741		// Use a context detached from the run context: workspace
 742		// shutdown cancels ctx before this goroutine returns, but the
 743		// buffered streaming deltas must still land before the DB is
 744		// closed. A short timeout bounds the flush.
 745		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 746		defer flushCancel()
 747		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
 748			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 749		}
 750		if skipRunComplete {
 751			return
 752		}
 753		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 754		if currentAssistant != nil {
 755			complete.MessageID = currentAssistant.ID
 756			complete.Text = currentAssistant.Content().String()
 757		}
 758		if retErr != nil {
 759			complete.Error = retErr.Error()
 760			complete.Cancelled = errors.Is(retErr, context.Canceled)
 761		} else if ctx.Err() != nil {
 762			complete.Cancelled = true
 763		}
 764		// Prefer the per-call hook when supplied so the coordinator
 765		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 766		// into a single user-visible terminal event. The fallback
 767		// must-deliver publish applies bounded-blocking semantics to
 768		// the authoritative terminal event so a momentarily-full
 769		// subscriber channel can't silently drop it and hang
 770		// non-interactive clients waiting on RunComplete.
 771		a.publishRunComplete(ctx, call, complete)
 772	}()
 773
 774	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 775
 776	startTime := time.Now()
 777	a.eventPromptSent(call.SessionID)
 778
 779	var stepMessages []fantasy.Message
 780	var shouldSummarize bool
 781	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 782	var maxOutputTokens *int64
 783	if call.MaxOutputTokens > 0 {
 784		maxOutputTokens = &call.MaxOutputTokens
 785	}
 786	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 787		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 788		Files:            files,
 789		Messages:         history,
 790		ProviderOptions:  call.ProviderOptions,
 791		MaxOutputTokens:  maxOutputTokens,
 792		TopP:             call.TopP,
 793		Temperature:      call.Temperature,
 794		PresencePenalty:  call.PresencePenalty,
 795		TopK:             call.TopK,
 796		FrequencyPenalty: call.FrequencyPenalty,
 797		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 798			prepared.Messages = options.Messages
 799			for i := range prepared.Messages {
 800				prepared.Messages[i].ProviderOptions = nil
 801			}
 802
 803			// Use latest tools (updated by SetTools when MCP tools change).
 804			prepared.Tools = a.tools.Copy()
 805
 806			// Drain queued follow-up prompts for this step. Calls covered
 807			// by a cancel recorded while they sat in the queue are dropped:
 808			// a cancel that arrived after a prompt was queued must not let
 809			// it run as part of this step. Coverage is per-call by accept
 810			// sequence so a follow-up queued after the cancel (higher seq)
 811			// is not dropped. A dropped prompt carrying a RunID still gets
 812			// its terminal cancelled RunComplete so a caller waiting on it
 813			// does not hang. Uncanceled prompts without a RunID are folded
 814			// into this turn; uncanceled prompts with a RunID are left
 815			// queued so each runs as its own turn (with its own
 816			// RunComplete) via the recursive run path below.
 817			fold, canceledRunIDs := a.drainQueueForStep(call.SessionID)
 818			a.publishCanceledQueueDrops(canceledRunIDs)
 819			for _, queued := range fold {
 820				userMessage, createErr := a.createUserMessage(callContext, queued)
 821				if createErr != nil {
 822					return callContext, prepared, createErr
 823				}
 824				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 825			}
 826
 827			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 828
 829			lastSystemRoleInx := 0
 830			systemMessageUpdated := false
 831			for i, msg := range prepared.Messages {
 832				// Only add cache control to the last message.
 833				if msg.Role == fantasy.MessageRoleSystem {
 834					lastSystemRoleInx = i
 835				} else if !systemMessageUpdated {
 836					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 837					systemMessageUpdated = true
 838				}
 839				// Than add cache control to the last 2 messages.
 840				if i > len(prepared.Messages)-3 {
 841					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 842				}
 843			}
 844
 845			if promptPrefix != "" {
 846				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 847			}
 848
 849			sessionLock.Lock()
 850			stepMessages = cloneFantasyMessages(prepared.Messages)
 851			sessionLock.Unlock()
 852
 853			var assistantMsg message.Message
 854			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 855				Role:     message.Assistant,
 856				Parts:    []message.ContentPart{},
 857				Model:    largeModel.ModelCfg.Model,
 858				Provider: largeModel.ModelCfg.Provider,
 859			})
 860			if err != nil {
 861				return callContext, prepared, err
 862			}
 863			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 864			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 865			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 866			currentAssistant = &assistantMsg
 867			return callContext, prepared, err
 868		},
 869		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 870			currentAssistant.AppendReasoningContent(reasoning.Text)
 871			return a.messages.Update(genCtx, *currentAssistant)
 872		},
 873		OnReasoningDelta: func(id string, text string) error {
 874			currentAssistant.AppendReasoningContent(text)
 875			return a.messages.Update(genCtx, *currentAssistant)
 876		},
 877		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 878			// handle anthropic signature
 879			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 880				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 881					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 882				}
 883			}
 884			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 885				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 886					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 887				}
 888			}
 889			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 890				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 891					currentAssistant.SetReasoningResponsesData(reasoning)
 892				}
 893			}
 894			currentAssistant.FinishThinking()
 895			return a.messages.Update(genCtx, *currentAssistant)
 896		},
 897		OnTextDelta: func(id string, text string) error {
 898			// Strip leading newline from initial text content. This is is
 899			// particularly important in non-interactive mode where leading
 900			// newlines are very visible.
 901			if len(currentAssistant.Parts) == 0 {
 902				text = strings.TrimPrefix(text, "\n")
 903			}
 904
 905			currentAssistant.AppendContent(text)
 906			return a.messages.Update(genCtx, *currentAssistant)
 907		},
 908		OnToolInputStart: func(id string, toolName string) error {
 909			toolCall := message.ToolCall{
 910				ID:               id,
 911				Name:             toolName,
 912				ProviderExecuted: false,
 913				Finished:         false,
 914			}
 915			currentAssistant.AddToolCall(toolCall)
 916			// Use parent ctx instead of genCtx to ensure the update succeeds
 917			// even if the request is canceled mid-stream
 918			return a.messages.Update(ctx, *currentAssistant)
 919		},
 920		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 921			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 922		},
 923		OnToolCall: func(tc fantasy.ToolCallContent) error {
 924			toolCall := message.ToolCall{
 925				ID:               tc.ToolCallID,
 926				Name:             tc.ToolName,
 927				Input:            tc.Input,
 928				ProviderExecuted: false,
 929				Finished:         true,
 930			}
 931			currentAssistant.AddToolCall(toolCall)
 932			// Use parent ctx instead of genCtx to ensure the update succeeds
 933			// even if the request is canceled mid-stream
 934			return a.messages.Update(ctx, *currentAssistant)
 935		},
 936		OnToolResult: func(result fantasy.ToolResultContent) error {
 937			toolResult := a.convertToToolResult(result)
 938			// Use parent ctx instead of genCtx to ensure the message is created
 939			// even if the request is canceled mid-stream
 940			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 941				Role: message.Tool,
 942				Parts: []message.ContentPart{
 943					toolResult,
 944				},
 945			})
 946			return createMsgErr
 947		},
 948		OnStepFinish: func(stepResult fantasy.StepResult) error {
 949			finishReason := message.FinishReasonUnknown
 950			switch stepResult.FinishReason {
 951			case fantasy.FinishReasonLength:
 952				finishReason = message.FinishReasonMaxTokens
 953			case fantasy.FinishReasonStop:
 954				finishReason = message.FinishReasonEndTurn
 955			case fantasy.FinishReasonToolCalls:
 956				finishReason = message.FinishReasonToolUse
 957			}
 958			// If a tool result halted the turn (e.g. a hook halt or a
 959			// permission denial), the step ends on FinishReasonToolCalls but
 960			// the model will not be called again. Treat it as the end of the
 961			// turn so the UI can render the assistant footer.
 962			if finishReason == message.FinishReasonToolUse {
 963				for _, tr := range stepResult.Content.ToolResults() {
 964					if tr.StopTurn {
 965						finishReason = message.FinishReasonEndTurn
 966						break
 967					}
 968				}
 969			}
 970			currentAssistant.AddFinish(finishReason, "", "")
 971			sessionLock.Lock()
 972			defer sessionLock.Unlock()
 973
 974			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 975			if getSessionErr != nil {
 976				return getSessionErr
 977			}
 978			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 979			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 980			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 981			if sessionErr != nil {
 982				return sessionErr
 983			}
 984			currentSession = updatedSession
 985			return a.messages.Update(genCtx, *currentAssistant)
 986		},
 987		StopWhen: []fantasy.StopCondition{
 988			func(_ []fantasy.StepResult) bool {
 989				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 990				// If context window is unknown (0), skip auto-summarize
 991				// to avoid immediately truncating custom/local models.
 992				if cw == 0 {
 993					return false
 994				}
 995				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 996				remaining := cw - tokens
 997				var threshold int64
 998				if cw > largeContextWindowThreshold {
 999					threshold = largeContextWindowBuffer
1000				} else {
1001					threshold = int64(float64(cw) * smallContextWindowRatio)
1002				}
1003				if (remaining <= threshold) && !a.disableAutoSummarize {
1004					shouldSummarize = true
1005					return true
1006				}
1007				return false
1008			},
1009			func(steps []fantasy.StepResult) bool {
1010				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
1011			},
1012		},
1013	})
1014
1015	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
1016
1017	if err != nil {
1018		isHyper := largeModel.ModelCfg.Provider == hyper.Name
1019		isCancelErr := errors.Is(err, context.Canceled)
1020		if currentAssistant == nil {
1021			// Cancel-before-assistant-creation window: the run was
1022			// canceled after activeRequests.Set but before PrepareStep
1023			// created the assistant message. Without this, the turn
1024			// would return with no FinishReasonCanceled marker and no
1025			// user-visible record. The user message was already created
1026			// above, so persistCanceledTurn only writes the assistant
1027			// record.
1028			if isCancelErr {
1029				if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
1030					return nil, persistErr
1031				}
1032			}
1033			return result, err
1034		}
1035		// Persist final state with a context detached from the run
1036		// context. The run context (ctx) is derived from the
1037		// workspace context, which workspace shutdown cancels before
1038		// agent goroutines finish; using ctx here would drop the
1039		// final assistant state. WithoutCancel keeps the values
1040		// (e.g. session ID) while ignoring cancellation, and a short
1041		// timeout bounds the cleanup writes.
1042		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
1043		defer cleanupCancel()
1044		// Ensure we finish thinking on error to close the reasoning state.
1045		currentAssistant.FinishThinking()
1046		toolCalls := currentAssistant.ToolCalls()
1047		// INFO: we use the cleanup context here because the genCtx has been cancelled.
1048		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
1049		if createErr != nil {
1050			return nil, createErr
1051		}
1052		for _, tc := range toolCalls {
1053			if !tc.Finished {
1054				tc.Finished = true
1055				tc.Input = "{}"
1056				currentAssistant.AddToolCall(tc)
1057				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1058				if updateErr != nil {
1059					return nil, updateErr
1060				}
1061			}
1062
1063			found := false
1064			for _, msg := range msgs {
1065				if msg.Role == message.Tool {
1066					for _, tr := range msg.ToolResults() {
1067						if tr.ToolCallID == tc.ID {
1068							found = true
1069							break
1070						}
1071					}
1072				}
1073				if found {
1074					break
1075				}
1076			}
1077			if found {
1078				continue
1079			}
1080			content := "There was an error while executing the tool"
1081			if isCancelErr {
1082				content = "Error: user cancelled assistant tool calling"
1083			}
1084			toolResult := message.ToolResult{
1085				ToolCallID: tc.ID,
1086				Name:       tc.Name,
1087				Content:    content,
1088				IsError:    true,
1089			}
1090			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1091				Role: message.Tool,
1092				Parts: []message.ContentPart{
1093					toolResult,
1094				},
1095			})
1096			if createErr != nil {
1097				return nil, createErr
1098			}
1099		}
1100		var fantasyErr *fantasy.Error
1101		var providerErr *fantasy.ProviderError
1102		const defaultTitle = "Provider Error"
1103		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1104		if isCancelErr {
1105			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1106		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1107			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1108		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1109			url := hyper.BaseURL()
1110			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1111			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1112		} else if errors.As(err, &providerErr) {
1113			if providerErr.Message == "The requested model is not supported." {
1114				url := "https://github.com/settings/copilot/features"
1115				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1116				currentAssistant.AddFinish(
1117					message.FinishReasonError,
1118					"Copilot model not enabled",
1119					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1120				)
1121			} else {
1122				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1123			}
1124		} else if errors.As(err, &fantasyErr) {
1125			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1126		} else {
1127			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1128		}
1129		// Note: we use the cleanup context here because the genCtx has been
1130		// cancelled.
1131		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1132		if updateErr != nil {
1133			return nil, updateErr
1134		}
1135		return nil, err
1136	}
1137
1138	if shouldSummarize {
1139		a.activeRequests.Del(call.SessionID)
1140		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1141			return nil, summarizeErr
1142		}
1143		// If the agent wasn't done...
1144		if len(currentAssistant.ToolCalls()) > 0 {
1145			existing, ok := a.messageQueue.Get(call.SessionID)
1146			if !ok {
1147				existing = []SessionAgentCall{}
1148			}
1149			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1150			existing = append(existing, call)
1151			a.messageQueue.Set(call.SessionID, existing)
1152		}
1153	}
1154
1155	// Release active request before publishing the notification.
1156	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
1157	// tea.Msg arrives, so the cleanup must precede the notify or
1158	// subscribers see stale busy state at the moment of receipt.
1159	a.activeRequests.Del(call.SessionID)
1160	cancel()
1161
1162	// Send notification that agent has finished its turn (skip for
1163	// nested/non-interactive sessions).
1164	if !call.NonInteractive && a.notify != nil {
1165		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1166			SessionID:    call.SessionID,
1167			SessionTitle: currentSession.Title,
1168			Type:         notify.TypeAgentFinished,
1169		})
1170	}
1171
1172	// Hand off to the next queued prompt (if any) under dispatchMu so
1173	// the transition from this finished run to the queued run is atomic
1174	// against a concurrent Cancel. activeRequests for this session was
1175	// just deleted above, so without the lock there is a window in
1176	// which the session looks idle and a cancel becomes a no-op that
1177	// fails to stop the queued prompt. Holding the lock lets us observe
1178	// a pending cancel recorded against the session and drop the queue
1179	// instead of running it, and (for the recursion) hand a fresh
1180	// accept reservation to the dequeued call so acceptedRuns stays > 0
1181	// across the recursive Run's own dispatch handoff — keeping the
1182	// session observable to Cancel for the entire transition and
1183	// closing the dequeue -> re-register window.
1184	mu := a.sessionMu(call.SessionID)
1185	mu.Lock()
1186	queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1187	if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1188		// A cancel was recorded for this session (e.g. it arrived while
1189		// this run was active and follow-ups had been queued). Drop the
1190		// queued prompts it covers (accept sequence at or below the
1191		// mark, or untracked); keep any queued after the cancel (higher
1192		// sequence) so they still run.
1193		var kept []SessionAgentCall
1194		var canceledRunIDDrops []SessionAgentCall
1195		for _, q := range queuedMessages {
1196			if q.acceptSeq == 0 || q.acceptSeq <= mark {
1197				if q.RunID != "" {
1198					canceledRunIDDrops = append(canceledRunIDDrops, q)
1199				}
1200				continue
1201			}
1202			kept = append(kept, q)
1203		}
1204		queuedMessages = kept
1205		a.messageQueue.Set(call.SessionID, kept)
1206		// A dropped prompt carrying a RunID must still publish its
1207		// terminal cancelled RunComplete so a caller waiting on that
1208		// RunID does not hang.
1209		a.publishCanceledQueueDrops(canceledRunIDDrops)
1210	}
1211	if len(queuedMessages) == 0 {
1212		// No queued work. Clear the cancel mark only when no accepted
1213		// run remains in flight that it might still cover; otherwise a
1214		// sibling prompt (sequence at or below the mark) waiting to
1215		// enter Run would lose its cancellation. When accepted runs are
1216		// gone, this also clears a stale mark so it can't catch a
1217		// future run.
1218		a.messageQueue.Del(call.SessionID)
1219		a.acceptedMu.Lock()
1220		inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1221		a.acceptedMu.Unlock()
1222		if inFlight == 0 {
1223			a.cancelMark.Del(call.SessionID)
1224		}
1225		mu.Unlock()
1226		return result, err
1227	}
1228	// There are queued messages, restart the loop. Suppress the outer
1229	// defer's emit: it would otherwise observe the recursive Run's retErr
1230	// (named-return clobbering through the return below) against this
1231	// turn's MessageID/Text and publish a mixed, racing event.
1232	skipRunComplete = true
1233	// Decide whether this turn still owes its own terminal RunComplete.
1234	// Each submitted prompt with a RunID has its own lifecycle, so a turn
1235	// that is finished and handing off to a *different* queued prompt must
1236	// publish its own RunComplete here — leaving it to the recursive turn
1237	// (which carries a different RunID) would hang a caller waiting on
1238	// this turn's RunID. The exception is the summarize-continuation path,
1239	// which re-queues this same call (same RunID) to resume after a
1240	// summary; in that case the eventual terminal turn for this RunID
1241	// publishes, so publishing now would double-emit.
1242	outerOwesRunComplete := call.RunID != ""
1243	if outerOwesRunComplete {
1244		for _, q := range queuedMessages {
1245			if q.RunID == call.RunID {
1246				outerOwesRunComplete = false
1247				break
1248			}
1249		}
1250	}
1251	firstQueuedMessage := queuedMessages[0]
1252	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1253	// Reserve a fresh accept for the dequeued prompt before dropping the
1254	// lock so acceptedRuns > 0 across the handoff into the recursive
1255	// Run. This closes the window between this dequeue and the recursive
1256	// Run registering its activeRequests entry: a cancel arriving in
1257	// that window now records a pending cancel (acceptedRuns > 0) that
1258	// the recursive Run's accepted path observes as cancel-on-entry.
1259	firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1260	mu.Unlock()
1261	if outerOwesRunComplete {
1262		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
1263		if currentAssistant != nil {
1264			complete.MessageID = currentAssistant.ID
1265			complete.Text = currentAssistant.Content().String()
1266		}
1267		if ctx.Err() != nil {
1268			complete.Cancelled = true
1269		}
1270		a.publishRunComplete(ctx, call, complete)
1271	}
1272	return a.Run(ctx, firstQueuedMessage)
1273}
1274
1275func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1276	if a.IsSessionBusy(sessionID) {
1277		return ErrSessionBusy
1278	}
1279
1280	// Copy mutable fields under lock to avoid races with SetModels.
1281	largeModel := a.largeModel.Get()
1282	systemPromptPrefix := a.systemPromptPrefix.Get()
1283
1284	currentSession, err := a.sessions.Get(ctx, sessionID)
1285	if err != nil {
1286		return fmt.Errorf("failed to get session: %w", err)
1287	}
1288	msgs, err := a.getSessionMessages(ctx, currentSession)
1289	if err != nil {
1290		return err
1291	}
1292	if len(msgs) == 0 {
1293		// Nothing to summarize.
1294		return nil
1295	}
1296
1297	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1298
1299	genCtx, cancel := context.WithCancel(ctx)
1300	a.activeRequests.Set(sessionID, cancel)
1301	defer a.activeRequests.Del(sessionID)
1302	defer cancel()
1303	defer func() {
1304		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1305			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1306		}
1307	}()
1308
1309	agent := fantasy.NewAgent(
1310		largeModel.Model,
1311		fantasy.WithSystemPrompt(string(summaryPrompt)),
1312		fantasy.WithUserAgent(userAgent),
1313	)
1314	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1315		Role:             message.Assistant,
1316		Model:            largeModel.ModelCfg.Model,
1317		Provider:         largeModel.ModelCfg.Provider,
1318		IsSummaryMessage: true,
1319	})
1320	if err != nil {
1321		return err
1322	}
1323
1324	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1325
1326	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1327		Prompt:          summaryPromptText,
1328		Messages:        aiMsgs,
1329		ProviderOptions: opts,
1330		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1331			prepared.Messages = options.Messages
1332			if systemPromptPrefix != "" {
1333				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1334			}
1335			return callContext, prepared, nil
1336		},
1337		OnReasoningDelta: func(id string, text string) error {
1338			summaryMessage.AppendReasoningContent(text)
1339			return a.messages.Update(genCtx, summaryMessage)
1340		},
1341		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1342			// Handle anthropic signature.
1343			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1344				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1345					summaryMessage.AppendReasoningSignature(signature.Signature)
1346				}
1347			}
1348			summaryMessage.FinishThinking()
1349			return a.messages.Update(genCtx, summaryMessage)
1350		},
1351		OnTextDelta: func(id, text string) error {
1352			summaryMessage.AppendContent(text)
1353			return a.messages.Update(genCtx, summaryMessage)
1354		},
1355	})
1356	if err != nil {
1357		isCancelErr := errors.Is(err, context.Canceled)
1358		if isCancelErr {
1359			// User cancelled summarize we need to remove the summary message.
1360			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1361			return deleteErr
1362		}
1363		// Mark the summary message as finished with an error so the UI
1364		// stops spinning.
1365		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1366		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1367			return updateErr
1368		}
1369		return err
1370	}
1371
1372	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1373	err = a.messages.Update(genCtx, summaryMessage)
1374	if err != nil {
1375		return err
1376	}
1377
1378	var openrouterCost *float64
1379	for _, step := range resp.Steps {
1380		stepCost := a.openrouterCost(step.ProviderMetadata)
1381		if stepCost != nil {
1382			newCost := *stepCost
1383			if openrouterCost != nil {
1384				newCost += *openrouterCost
1385			}
1386			openrouterCost = &newCost
1387		}
1388	}
1389
1390	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
1391
1392	// Just in case, get just the last usage info.
1393	usage := resp.Response.Usage
1394	currentSession.SummaryMessageID = summaryMessage.ID
1395	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1396	currentSession.PromptTokens = 0
1397	currentSession.EstimatedUsage = usageIsZero(usage)
1398	_, err = a.sessions.Save(genCtx, currentSession)
1399	if err != nil {
1400		return err
1401	}
1402
1403	// Release the active request before processing queued messages so that
1404	// Run() does not see the session as busy.
1405	a.activeRequests.Del(sessionID)
1406	cancel()
1407
1408	// Process any messages that were queued while summarizing.
1409	queuedMessages, ok := a.messageQueue.Get(sessionID)
1410	if !ok || len(queuedMessages) == 0 {
1411		return nil
1412	}
1413	firstQueuedMessage := queuedMessages[0]
1414	a.messageQueue.Set(sessionID, queuedMessages[1:])
1415	_, qErr := a.Run(ctx, firstQueuedMessage)
1416	return qErr
1417}
1418
1419func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1420	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1421		return fantasy.ProviderOptions{}
1422	}
1423	return fantasy.ProviderOptions{
1424		anthropic.Name: &anthropic.ProviderCacheControlOptions{
1425			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1426		},
1427		bedrock.Name: &anthropic.ProviderCacheControlOptions{
1428			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1429		},
1430		vercel.Name: &anthropic.ProviderCacheControlOptions{
1431			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1432		},
1433	}
1434}
1435
1436func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1437	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1438	var attachmentParts []message.ContentPart
1439	for _, attachment := range call.Attachments {
1440		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1441	}
1442	parts = append(parts, attachmentParts...)
1443	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1444		Role:  message.User,
1445		Parts: parts,
1446	})
1447	if err != nil {
1448		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1449	}
1450	return msg, nil
1451}
1452
1453func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1454	var history []fantasy.Message
1455	if !a.isSubAgent {
1456		history = append(history, fantasy.NewUserMessage(
1457			fmt.Sprintf(
1458				"<system_reminder>%s</system_reminder>",
1459				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1460If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1461If not, please feel free to ignore. Again do not mention this message to the user.`,
1462			),
1463		))
1464	}
1465	// Collect all tool call IDs present in assistant messages and all tool
1466	// result IDs present in tool messages. This lets us detect both orphaned
1467	// tool results (result without a call) and orphaned tool calls (call
1468	// without a result).
1469	knownToolCallIDs := make(map[string]struct{})
1470	knownToolResultIDs := make(map[string]struct{})
1471	for _, m := range msgs {
1472		switch m.Role {
1473		case message.Assistant:
1474			for _, tc := range m.ToolCalls() {
1475				knownToolCallIDs[tc.ID] = struct{}{}
1476			}
1477		case message.Tool:
1478			for _, tr := range m.ToolResults() {
1479				knownToolResultIDs[tr.ToolCallID] = struct{}{}
1480			}
1481		}
1482	}
1483
1484	for _, m := range msgs {
1485		if len(m.Parts) == 0 {
1486			continue
1487		}
1488		// Assistant message without content or tool calls (cancelled before it returned anything).
1489		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1490			continue
1491		}
1492		if m.Role == message.Tool {
1493			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1494				history = append(history, msg)
1495			}
1496			continue
1497		}
1498		aiMsgs := m.ToAIMessage()
1499		if !supportsImages {
1500			for i := range aiMsgs {
1501				if aiMsgs[i].Role == fantasy.MessageRoleUser {
1502					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1503				}
1504			}
1505		}
1506		history = append(history, aiMsgs...)
1507
1508		if m.Role == message.Assistant {
1509			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1510				history = append(history, msg)
1511			}
1512		}
1513	}
1514
1515	var files []fantasy.FilePart
1516	for _, attachment := range attachments {
1517		if attachment.IsText() {
1518			continue
1519		}
1520		files = append(files, fantasy.FilePart{
1521			Filename:  attachment.FileName,
1522			Data:      attachment.Content,
1523			MediaType: attachment.MimeType,
1524		})
1525	}
1526
1527	return history, files
1528}
1529
1530// filterFileParts removes fantasy.FilePart entries from a slice of message
1531// parts. Used to strip image attachments from historical user messages when
1532// the current model does not support them.
1533func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1534	filtered := make([]fantasy.MessagePart, 0, len(parts))
1535	for _, part := range parts {
1536		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1537			continue
1538		}
1539		filtered = append(filtered, part)
1540	}
1541	return filtered
1542}
1543
1544// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1545// dropping any tool result parts whose tool_call_id has no matching tool call
1546// in the known set. An orphaned result causes API validation to fail on every
1547// subsequent turn, permanently locking the session. Returns the filtered
1548// message and true if at least one valid part remains.
1549func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1550	aiMsgs := m.ToAIMessage()
1551	if len(aiMsgs) == 0 {
1552		return fantasy.Message{}, false
1553	}
1554	var validParts []fantasy.MessagePart
1555	for _, part := range aiMsgs[0].Content {
1556		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1557		if !ok {
1558			validParts = append(validParts, part)
1559			continue
1560		}
1561		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1562			validParts = append(validParts, part)
1563		} else {
1564			slog.Warn(
1565				"Dropping orphaned tool result with no matching tool call",
1566				"tool_call_id", tr.ToolCallID,
1567			)
1568		}
1569	}
1570	if len(validParts) == 0 {
1571		return fantasy.Message{}, false
1572	}
1573	msg := aiMsgs[0]
1574	msg.Content = validParts
1575	return msg, true
1576}
1577
1578// syntheticToolResultsForOrphanedCalls returns a tool message containing
1579// synthetic tool results for any tool calls in the assistant message that
1580// have no matching result in knownToolResultIDs. LLM APIs require every
1581// tool_use to be immediately followed by a tool_result; an interrupted
1582// session can leave orphaned tool_use blocks that permanently lock the
1583// conversation. Returns the message and true if any synthetic results were
1584// produced.
1585func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1586	var syntheticParts []fantasy.MessagePart
1587	for _, tc := range m.ToolCalls() {
1588		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1589			continue
1590		}
1591		slog.Warn(
1592			"Injecting synthetic tool result for orphaned tool call",
1593			"tool_call_id", tc.ID,
1594			"tool_name", tc.Name,
1595		)
1596		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1597			ToolCallID: tc.ID,
1598			Output: fantasy.ToolResultOutputContentError{
1599				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1600			},
1601		})
1602	}
1603	if len(syntheticParts) == 0 {
1604		return fantasy.Message{}, false
1605	}
1606	return fantasy.Message{
1607		Role:    fantasy.MessageRoleTool,
1608		Content: syntheticParts,
1609	}, true
1610}
1611
1612func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1613	msgs, err := a.messages.List(ctx, session.ID)
1614	if err != nil {
1615		return nil, fmt.Errorf("failed to list messages: %w", err)
1616	}
1617
1618	if session.SummaryMessageID != "" {
1619		summaryMsgIndex := -1
1620		for i, msg := range msgs {
1621			if msg.ID == session.SummaryMessageID {
1622				summaryMsgIndex = i
1623				break
1624			}
1625		}
1626		if summaryMsgIndex != -1 {
1627			msgs = msgs[summaryMsgIndex:]
1628			msgs[0].Role = message.User
1629		}
1630	}
1631	return msgs, nil
1632}
1633
1634// generateTitle generates a session titled based on the initial prompt.
1635func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1636	if userPrompt == "" {
1637		return
1638	}
1639
1640	smallModel := a.smallModel.Get()
1641	largeModel := a.largeModel.Get()
1642	systemPromptPrefix := a.systemPromptPrefix.Get()
1643
1644	var maxOutputTokens int64 = 40
1645	if smallModel.CatwalkCfg.CanReason {
1646		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1647	}
1648
1649	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1650		return fantasy.NewAgent(
1651			m,
1652			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1653			fantasy.WithMaxOutputTokens(tok),
1654			fantasy.WithUserAgent(userAgent),
1655		)
1656	}
1657
1658	streamCall := fantasy.AgentStreamCall{
1659		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1660		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1661			prepared.Messages = opts.Messages
1662			if systemPromptPrefix != "" {
1663				prepared.Messages = append([]fantasy.Message{
1664					fantasy.NewSystemMessage(systemPromptPrefix),
1665				}, prepared.Messages...)
1666			}
1667			return callCtx, prepared, nil
1668		},
1669	}
1670
1671	// Use the small model to generate the title.
1672	model := smallModel
1673	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1674	resp, err := agent.Stream(ctx, streamCall)
1675	if err == nil {
1676		// We successfully generated a title with the small model.
1677		slog.Debug("Generated title with small model")
1678	} else {
1679		// It didn't work. Let's try with the big model.
1680		slog.Error("Error generating title with small model; trying big model", "err", err)
1681		model = largeModel
1682		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1683		resp, err = agent.Stream(ctx, streamCall)
1684		if err == nil {
1685			slog.Debug("Generated title with large model")
1686		} else {
1687			// Welp, the large model didn't work either. Use the default
1688			// session name and return.
1689			slog.Error("Error generating title with large model", "err", err)
1690			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1691			if saveErr != nil {
1692				slog.Error("Failed to save session title", "error", saveErr)
1693			}
1694			return
1695		}
1696	}
1697
1698	if resp == nil {
1699		// Actually, we didn't get a response so we can't. Use the default
1700		// session name and return.
1701		slog.Error("Response is nil; can't generate title")
1702		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1703		if saveErr != nil {
1704			slog.Error("Failed to save session title", "error", saveErr)
1705		}
1706		return
1707	}
1708
1709	// Clean up title.
1710	var title string
1711	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1712
1713	// Remove thinking tags if present.
1714	title = thinkTagRegex.ReplaceAllString(title, "")
1715	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1716
1717	title = strings.TrimSpace(title)
1718	title = cmp.Or(title, DefaultSessionName)
1719
1720	// Calculate usage and cost.
1721	var openrouterCost *float64
1722	for _, step := range resp.Steps {
1723		stepCost := a.openrouterCost(step.ProviderMetadata)
1724		if stepCost != nil {
1725			newCost := *stepCost
1726			if openrouterCost != nil {
1727				newCost += *openrouterCost
1728			}
1729			openrouterCost = &newCost
1730		}
1731	}
1732
1733	modelConfig := model.CatwalkCfg
1734	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1735		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1736		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1737		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1738
1739	// Use override cost if available (e.g., from OpenRouter).
1740	if openrouterCost != nil {
1741		cost = *openrouterCost
1742	}
1743
1744	// Skip cost accumulation
1745	if model.FlatRate {
1746		cost = 0
1747	}
1748
1749	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1750	completionTokens := resp.TotalUsage.OutputTokens
1751
1752	// Atomically update only title and usage fields to avoid overriding other
1753	// concurrent session updates.
1754	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1755	if saveErr != nil {
1756		slog.Error("Failed to save session title and usage", "error", saveErr)
1757		return
1758	}
1759}
1760
1761func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1762	openrouterMetadata, ok := metadata[openrouter.Name]
1763	if !ok {
1764		return nil
1765	}
1766
1767	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1768	if !ok {
1769		return nil
1770	}
1771	return &opts.Usage.Cost
1772}
1773
1774func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1775	if !usageIsZero(usage) {
1776		session.EstimatedUsage = estimated
1777	}
1778
1779	modelConfig := model.CatwalkCfg
1780	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1781		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1782		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1783		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1784
1785	if !estimated {
1786		a.eventTokensUsed(session.ID, model, usage, cost)
1787	}
1788
1789	if estimated {
1790		cost = 0
1791	} else {
1792		// Use override cost if available (e.g., from OpenRouter).
1793		if overrideCost != nil {
1794			cost = *overrideCost
1795		}
1796
1797		// Skip cost accumulation
1798		if model.FlatRate {
1799			cost = 0
1800		}
1801	}
1802
1803	session.Cost += cost
1804	updateSessionTokenCounters(session, usage)
1805}
1806
1807func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1808	if usage.OutputTokens != 0 {
1809		session.CompletionTokens = usage.OutputTokens
1810	}
1811	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1812		session.PromptTokens = promptTokens
1813	}
1814}
1815
1816func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1817	if usage.OutputTokens != 0 {
1818		return usage.OutputTokens
1819	}
1820	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1821}
1822
// Cancel aborts in-flight and pending work for sessionID, while holding the
// session's mutex. It:
//   - calls the cancel func of the active regular request, if any;
//   - calls the cancel func of the active "<sessionID>-summarize" request,
//     if any;
//   - when accepted-but-not-yet-active runs exist, raises the session's
//     cancel mark to the latest accept sequence so those runs are cancelled
//     when they start;
//   - clears the session's queued prompts, notifying via clearQueueAndNotify.
//
// With none of these present, Cancel changes nothing.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests, which are tracked under a
	// separate "<sessionID>-summarize" key.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	// Drop anything still waiting in the session's prompt queue.
	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.clearQueueAndNotify(sessionID)
	}
}
1878
1879func (a *sessionAgent) ClearQueue(sessionID string) {
1880	if a.QueuedPrompts(sessionID) > 0 {
1881		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1882		a.clearQueueAndNotify(sessionID)
1883	}
1884}
1885
1886func (a *sessionAgent) CancelAll() {
1887	if !a.IsBusy() {
1888		return
1889	}
1890	for key := range a.activeRequests.Seq2() {
1891		a.Cancel(key) // key is sessionID
1892	}
1893
1894	timeout := time.After(5 * time.Second)
1895	for a.IsBusy() {
1896		select {
1897		case <-timeout:
1898			return
1899		default:
1900			time.Sleep(200 * time.Millisecond)
1901		}
1902	}
1903}
1904
1905func (a *sessionAgent) IsBusy() bool {
1906	var busy bool
1907	for cancelFunc := range a.activeRequests.Seq() {
1908		if cancelFunc != nil {
1909			busy = true
1910			break
1911		}
1912	}
1913	return busy
1914}
1915
1916func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1917	_, busy := a.activeRequests.Get(sessionID)
1918	return busy
1919}
1920
1921func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1922	l, ok := a.messageQueue.Get(sessionID)
1923	if !ok {
1924		return 0
1925	}
1926	return len(l)
1927}
1928
1929func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1930	l, ok := a.messageQueue.Get(sessionID)
1931	if !ok {
1932		return nil
1933	}
1934	prompts := make([]string, len(l))
1935	for i, call := range l {
1936		prompts[i] = call.Prompt
1937	}
1938	return prompts
1939}
1940
1941func (a *sessionAgent) SetModels(large Model, small Model) {
1942	a.largeModel.Set(large)
1943	a.smallModel.Set(small)
1944}
1945
1946func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1947	a.tools.SetSlice(tools)
1948}
1949
1950func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1951	a.systemPrompt.Set(systemPrompt)
1952}
1953
1954func (a *sessionAgent) Model() Model {
1955	return a.largeModel.Get()
1956}
1957
1958// convertToToolResult converts a fantasy tool result to a message tool result.
1959func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1960	baseResult := message.ToolResult{
1961		ToolCallID: result.ToolCallID,
1962		Name:       result.ToolName,
1963		Metadata:   result.ClientMetadata,
1964	}
1965
1966	switch result.Result.GetType() {
1967	case fantasy.ToolResultContentTypeText:
1968		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1969			baseResult.Content = r.Text
1970		}
1971	case fantasy.ToolResultContentTypeError:
1972		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1973			baseResult.Content = r.Error.Error()
1974			baseResult.IsError = true
1975		}
1976	case fantasy.ToolResultContentTypeMedia:
1977		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1978			if !stringext.IsValidBase64(r.Data) {
1979				slog.Warn(
1980					"Tool returned media with invalid base64 data, discarding image",
1981					"tool", result.ToolName,
1982					"tool_call_id", result.ToolCallID,
1983				)
1984				baseResult.Content = "Tool returned image data with invalid encoding"
1985				baseResult.IsError = true
1986			} else {
1987				content := r.Text
1988				if content == "" {
1989					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1990				}
1991				baseResult.Content = content
1992				baseResult.Data = r.Data
1993				baseResult.MIMEType = r.MediaType
1994			}
1995		}
1996	}
1997
1998	return baseResult
1999}
2000
2001// workaroundProviderMediaLimitations converts media content in tool results to
2002// user messages for providers that don't natively support images in tool results.
2003//
2004// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
2005// don't support sending images/media in tool result messages - they only accept
2006// text in tool results. However, they DO support images in user messages.
2007//
2008// If we send media in tool results to these providers, the API returns an error.
2009//
2010// Solution: For these providers, we:
2011//  1. Replace the media in the tool result with a text placeholder
2012//  2. Inject a user message immediately after with the image as a file attachment
2013//  3. This maintains the tool execution flow while working around API limitations
2014//
2015// Anthropic and Bedrock support images natively in tool results, so we skip
2016// this workaround for them.
2017//
2018// Example transformation:
2019//
2020//	BEFORE: [tool result: image data]
2021//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
2022func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
2023	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
2024		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
2025		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
2026
2027	if providerSupportsMedia {
2028		return messages
2029	}
2030
2031	convertedMessages := make([]fantasy.Message, 0, len(messages))
2032
2033	for _, msg := range messages {
2034		if msg.Role != fantasy.MessageRoleTool {
2035			convertedMessages = append(convertedMessages, msg)
2036			continue
2037		}
2038
2039		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
2040		var mediaFiles []fantasy.FilePart
2041
2042		for _, part := range msg.Content {
2043			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
2044			if !ok {
2045				textParts = append(textParts, part)
2046				continue
2047			}
2048
2049			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
2050				decoded, err := base64.StdEncoding.DecodeString(media.Data)
2051				if err != nil {
2052					slog.Warn("Failed to decode media data", "error", err)
2053					textParts = append(textParts, part)
2054					continue
2055				}
2056
2057				mediaFiles = append(mediaFiles, fantasy.FilePart{
2058					Data:      decoded,
2059					MediaType: media.MediaType,
2060					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
2061				})
2062
2063				textParts = append(textParts, fantasy.ToolResultPart{
2064					ToolCallID: toolResult.ToolCallID,
2065					Output: fantasy.ToolResultOutputContentText{
2066						Text: "[Image/media content loaded - see attached file]",
2067					},
2068					ProviderOptions: toolResult.ProviderOptions,
2069				})
2070			} else {
2071				textParts = append(textParts, part)
2072			}
2073		}
2074
2075		convertedMessages = append(convertedMessages, fantasy.Message{
2076			Role:    fantasy.MessageRoleTool,
2077			Content: textParts,
2078		})
2079
2080		if len(mediaFiles) > 0 {
2081			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
2082				"Here is the media content from the tool result:",
2083				mediaFiles...,
2084			))
2085		}
2086	}
2087
2088	return convertedMessages
2089}
2090
2091// buildSummaryPrompt constructs the prompt text for session summarization.
2092func buildSummaryPrompt(todos []session.Todo) string {
2093	var sb strings.Builder
2094	sb.WriteString("Provide a detailed summary of our conversation above.")
2095	if len(todos) > 0 {
2096		sb.WriteString("\n\n## Current Todo List\n\n")
2097		for _, t := range todos {
2098			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
2099		}
2100		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
2101		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2102	}
2103	return sb.String()
2104}
2105
2106func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2107	fields := []any{
2108		"retry_delay", delay.String(),
2109	}
2110	if err == nil {
2111		return fields
2112	}
2113	fields = append(fields, "status_code", err.StatusCode)
2114	if err.Title != "" {
2115		fields = append(fields, "title", err.Title)
2116	}
2117	if err.Message != "" {
2118		fields = append(fields, "message", err.Message)
2119	}
2120	return fields
2121}