agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"sync/atomic"
  25	"time"
  26
  27	"charm.land/catwalk/pkg/catwalk"
  28	"charm.land/fantasy"
  29	"charm.land/fantasy/providers/anthropic"
  30	"charm.land/fantasy/providers/bedrock"
  31	"charm.land/fantasy/providers/google"
  32	"charm.land/fantasy/providers/openai"
  33	"charm.land/fantasy/providers/openrouter"
  34	"charm.land/fantasy/providers/vercel"
  35	"charm.land/lipgloss/v2"
  36	"github.com/charmbracelet/crush/internal/agent/hyper"
  37	"github.com/charmbracelet/crush/internal/agent/notify"
  38	"github.com/charmbracelet/crush/internal/agent/tools"
  39	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  40	"github.com/charmbracelet/crush/internal/config"
  41	"github.com/charmbracelet/crush/internal/csync"
  42	"github.com/charmbracelet/crush/internal/message"
  43	"github.com/charmbracelet/crush/internal/pubsub"
  44	"github.com/charmbracelet/crush/internal/session"
  45	"github.com/charmbracelet/crush/internal/stringext"
  46	"github.com/charmbracelet/crush/internal/version"
  47	"github.com/charmbracelet/x/exp/charmtone"
  48)
  49
  50const (
  51	DefaultSessionName = "Untitled Session"
  52
  53	// Constants for auto-summarization thresholds
  54	largeContextWindowThreshold = 200_000
  55	largeContextWindowBuffer    = 20_000
  56	smallContextWindowRatio     = 0.2
  57)
  58
  59var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  60
// titlePrompt is the contents of templates/title.md, embedded at build
// time.
// NOTE(review): presumably the prompt used by generateTitle — confirm at
// its call site.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the contents of templates/summary.md, embedded at build
// time.
// NOTE(review): presumably the prompt used by Summarize — confirm.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  66
  67// Used to remove <think> tags from generated titles.
  68var (
  69	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
  70	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
  71)
  72
// SessionAgentCall is one user turn for SessionAgent.Run. ValidateCall
// checks its minimum shape: SessionID must be set, and Prompt must be
// non-empty unless Attachments carries a text attachment.
type SessionAgentCall struct {
	// SessionID is the session the turn runs in. It is required;
	// ValidateCall returns ErrSessionMissing without it.
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID string
	// Prompt is the user's text for the turn. It may be empty only when
	// Attachments includes a text attachment; otherwise ValidateCall
	// returns ErrEmptyPrompt.
	Prompt string
	// ProviderOptions are forwarded unchanged to the model stream call.
	ProviderOptions fantasy.ProviderOptions
	// Attachments are handed to preparePrompt, which produces the files
	// sent with the prompt, and to message.PromptWithTextAttachments,
	// which builds the prompt text.
	Attachments []message.Attachment
	// MaxOutputTokens limits the response length. Values <= 0 mean no
	// limit is sent, because some providers reject an explicit 0.
	MaxOutputTokens int64
	// Optional sampling parameters, forwarded unchanged to the model
	// stream call. NOTE(review): nil presumably leaves the provider's
	// default in effect — confirm in fantasy.
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NOTE(review): NonInteractive is presumably set by callers with no
	// interactive user (such as `crush run`) — confirm where it is read.
	NonInteractive bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
 125
// SessionAgent runs conversation turns against sessions and manages the
// per-session state around them: queued follow-up prompts, cancellation,
// and the active model, tool, and system-prompt configuration.
type SessionAgent interface {
	// Run executes one turn for call.SessionID. In sessionAgent, a call
	// that arrives while its session is busy is queued instead, and Run
	// returns (nil, nil) immediately.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID before a
	// run is dispatched onto a goroutine. Pass the handle as
	// SessionAgentCall.Accepted and release it with AcceptedRun.Close.
	BeginAccepted(sessionID string) *AcceptedRun
	// SetModels replaces the large and small models used by later runs.
	SetModels(large Model, small Model)
	// SetTools replaces the tool set; a running turn picks up the change
	// at its next step.
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	// Cancel cancels work for sessionID. In sessionAgent it serializes
	// with Run's dispatch handoff on the per-session dispatch mutex,
	// clears the session's queue when a run is active, and raises the
	// session's cancel mark when runs are accepted but not yet active,
	// so those runs are canceled on entry to Run.
	Cancel(sessionID string)
	CancelAll()
	// IsSessionBusy reports whether sessionID has an active run; Run
	// queues new calls while it does.
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
 142
// Model pairs a language model with the catwalk metadata and the
// selected-model configuration it was built from.
type Model struct {
	Model      fantasy.LanguageModel // model that Run builds its fantasy agent on
	CatwalkCfg catwalk.Model         // catwalk metadata; Run reads SupportsImages and Name
	ModelCfg   config.SelectedModel  // selection; its Model and Provider are recorded on assistant messages
	FlatRate   bool                  // NOTE(review): presumably marks flat-rate (unmetered) pricing — confirm
}
 149
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Configuration that can change while the agent is live (see
	// SetModels/SetTools). Run snapshots it at the start of a turn and
	// re-reads tools on every step.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Dependencies and flags copied from SessionAgentOptions.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session, calls that arrived while the
	// session was busy (see enqueueCall). activeRequests holds the
	// cancel func of each session's active run.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
 205
// SessionAgentOptions holds the construction parameters for
// NewSessionAgent. Each field seeds the matching sessionAgent field.
// The models, SystemPrompt, and Tools can later be replaced through
// SetModels, SetSystemPrompt, and SetTools.
//
// A nil RunComplete publisher disables the broker publish of terminal
// events. A per-call OnComplete hook is still invoked.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 220
 221func NewSessionAgent(
 222	opts SessionAgentOptions,
 223) SessionAgent {
 224	return &sessionAgent{
 225		largeModel:           csync.NewValue(opts.LargeModel),
 226		smallModel:           csync.NewValue(opts.SmallModel),
 227		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 228		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 229		isSubAgent:           opts.IsSubAgent,
 230		sessions:             opts.Sessions,
 231		messages:             opts.Messages,
 232		disableAutoSummarize: opts.DisableAutoSummarize,
 233		tools:                csync.NewSliceFrom(opts.Tools),
 234		isYolo:               opts.IsYolo,
 235		notify:               opts.Notify,
 236		runComplete:          opts.RunComplete,
 237		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 238		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 239		dispatchMu:           csync.NewMap[string, *sync.Mutex](),
 240		acceptedRuns:         csync.NewMap[string, int](),
 241		cancelMark:           csync.NewMap[string, uint64](),
 242	}
 243}
 244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
type AcceptedRun struct {
	agent     *sessionAgent // agent whose acceptedRuns count this handle holds
	sessionID string        // session the reservation was taken for
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	seq  uint64
	done atomic.Bool // set by the first Close so later Close calls are no-ops
}
 261
 262// Close decrements the accept counter for this reservation. It is safe
 263// to call multiple times; only the first call has effect.
 264func (r *AcceptedRun) Close() {
 265	if r == nil {
 266		return
 267	}
 268	if !r.done.CompareAndSwap(false, true) {
 269		return
 270	}
 271	r.agent.endAccepted(r.sessionID)
 272}
 273
 274// SessionID exposes the session this reservation is for so the run path
 275// can use it without an extra parameter.
 276func (r *AcceptedRun) SessionID() string {
 277	if r == nil {
 278		return ""
 279	}
 280	return r.sessionID
 281}
 282
 283// BeginAccepted increments the accept counter for sessionID and returns
 284// a handle whose Close is the only way to decrement it. It is the only
 285// entry point that mutates acceptedRuns.
 286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
 287	a.acceptedMu.Lock()
 288	defer a.acceptedMu.Unlock()
 289	count, _ := a.acceptedRuns.Get(sessionID)
 290	a.acceptedRuns.Set(sessionID, count+1)
 291	a.acceptSeqGen++
 292	return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
 293}
 294
 295// endAccepted decrements the accept counter for sessionID. It is only
 296// called via AcceptedRun.Close. It uses a dedicated lock (not the
 297// per-session dispatch mutex) so it can run while Run holds dispatchMu
 298// for the same session without deadlocking.
 299//
 300// When the count reaches zero the session's cancel mark is dropped: no
 301// accepted handle remains for it to cover, and any handle accepted later
 302// gets a strictly higher sequence that the mark would not match anyway.
 303// Handles canceled on entry never reach RunComplete, so this is the only
 304// place that clears the mark for an all-canceled batch. Sibling handles
 305// covered by the same mark are serialized on the per-session dispatch
 306// mutex and read the mark before they Close, so this never clears it out
 307// from under a covered handle still waiting to enter Run.
 308func (a *sessionAgent) endAccepted(sessionID string) {
 309	a.acceptedMu.Lock()
 310	defer a.acceptedMu.Unlock()
 311	count, ok := a.acceptedRuns.Get(sessionID)
 312	if !ok || count <= 1 {
 313		a.acceptedRuns.Del(sessionID)
 314		a.cancelMark.Del(sessionID)
 315		return
 316	}
 317	a.acceptedRuns.Set(sessionID, count-1)
 318}
 319
 320// sessionMu returns the per-session dispatch mutex, creating it on first
 321// use. Creation is guarded so concurrent callers always observe the same
 322// mutex instance for a given session.
 323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
 324	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 325		return mu
 326	}
 327	a.dispatchMuCreate.Lock()
 328	defer a.dispatchMuCreate.Unlock()
 329	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 330		return mu
 331	}
 332	mu := &sync.Mutex{}
 333	a.dispatchMu.Set(sessionID, mu)
 334	return mu
 335}
 336
 337// enqueueCall appends call to the session's message queue. The
 338// OnComplete hook is stripped: the caller that supplied it (typically
 339// coordinator.Run) has its own retry/coalesce scope that ends when it
 340// returns, so by the time the queue drains nobody is left to consume the
 341// buffered terminal event. The recursive Run falls back to the default
 342// broker publish, which is what existing subscribers expect for queued
 343// turns.
 344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
 345	existing, ok := a.messageQueue.Get(call.SessionID)
 346	if !ok {
 347		existing = []SessionAgentCall{}
 348	}
 349	queued := call
 350	if call.Accepted != nil {
 351		// Preserve the accept sequence after the handle is stripped so
 352		// the queue-drain paths can tell a follow-up queued before a
 353		// cancel (covered by the mark) from one queued after it.
 354		queued.acceptSeq = call.Accepted.seq
 355	}
 356	queued.OnComplete = nil
 357	queued.Accepted = nil
 358	existing = append(existing, queued)
 359	a.messageQueue.Set(call.SessionID, existing)
 360}
 361
 362// drainUncanceledQueue copies and clears the session's message queue and
 363// returns the queued calls not covered by a pending cancel. The queue
 364// copy/delete and the cancel-mark check happen under the per-session
 365// dispatch mutex so the filtering is atomic against a concurrent Cancel:
 366// canceledBySeq requires the caller to hold that mutex, and evaluating it
 367// here (rather than after unlocking) prevents a cancel recorded between
 368// the drain and the check from being observed inconsistently. The
 369// returned survivors are processed by the caller without the lock held.
 370func (a *sessionAgent) drainUncanceledQueue(sessionID string) []SessionAgentCall {
 371	dispatchLock := a.sessionMu(sessionID)
 372	dispatchLock.Lock()
 373	defer dispatchLock.Unlock()
 374	queuedCalls, _ := a.messageQueue.Get(sessionID)
 375	a.messageQueue.Del(sessionID)
 376	survivors := queuedCalls[:0]
 377	for _, queued := range queuedCalls {
 378		if a.canceledBySeq(sessionID, queued.acceptSeq) {
 379			continue
 380		}
 381		survivors = append(survivors, queued)
 382	}
 383	return survivors
 384}
 385
 386// clearPendingCancel removes any pending-cancel mark for sessionID. It
 387// takes the per-session dispatch lock so it is ordered against Cancel
 388// and the dispatch handoff.
 389func (a *sessionAgent) clearPendingCancel(sessionID string) {
 390	mu := a.sessionMu(sessionID)
 391	mu.Lock()
 392	defer mu.Unlock()
 393	a.cancelMark.Del(sessionID)
 394}
 395
 396// canceledBySeq reports whether an accepted handle or queued call with
 397// the given accept sequence is covered by a pending cancel for the
 398// session. Callers must hold the session's dispatch mutex. A tracked
 399// sequence (seq > 0) is covered only when it is at or below the cancel
 400// high-water mark, so a prompt accepted after the cancel (higher seq) is
 401// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
 402// with no accept reservation) is covered whenever any mark is present,
 403// preserving the pre-sequence behavior. The mark is not consumed: it
 404// stays so every sibling handle it covers observes the same cancel, and
 405// a later handle (higher seq) ignores it regardless.
 406func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
 407	mark, ok := a.cancelMark.Get(sessionID)
 408	if !ok || mark == 0 {
 409		return false
 410	}
 411	return seq == 0 || seq <= mark
 412}
 413
 414// persistCanceledTurn writes the user/assistant records for a turn that
 415// was canceled before (or just as) streaming would have produced them.
 416// It creates the user message only when it was not already created by an
 417// earlier createUserMessage call (userMsgCreated), then writes an
 418// assistant message with FinishReasonCanceled. Both writes use
 419// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
 420// context) can't drop them.
 421func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
 422	writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 423	defer cancel()
 424	if !userMsgCreated {
 425		if _, err := a.createUserMessage(writeCtx, call); err != nil {
 426			return err
 427		}
 428	}
 429	largeModel := a.largeModel.Get()
 430	assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
 431		Role:     message.Assistant,
 432		Parts:    []message.ContentPart{},
 433		Model:    largeModel.ModelCfg.Model,
 434		Provider: largeModel.ModelCfg.Provider,
 435	})
 436	if err != nil {
 437		return err
 438	}
 439	assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 440	return a.messages.Update(writeCtx, assistant)
 441}
 442
 443// publishRunComplete emits the authoritative terminal event for a turn.
 444// It honors the per-call OnComplete hook when set (so the coordinator can
 445// coalesce retries) and otherwise falls back to the RunComplete broker.
 446// ctx is used only for the bounded-blocking must-deliver publish; the
 447// terminal payload is supplied by the caller. This is the single emit path
 448// shared by the streaming defer and the cancel-on-entry early return so a
 449// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
 450// observes exactly one terminal event regardless of which Run branch ends
 451// the turn.
 452func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
 453	if call.OnComplete != nil {
 454		call.OnComplete(complete)
 455		return
 456	}
 457	if a.runComplete == nil {
 458		return
 459	}
 460	a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 461}
 462
 463// ValidateCall performs the cheap structural validation that
 464// sessionAgent.Run requires before a call can be dispatched: a call must
 465// carry either a non-empty prompt or a text attachment, and it must name a
 466// session. It is exported so callers that accept a run before dispatching it
 467// (e.g. backend.SendMessage) can apply the same checks and keep the error
 468// contract consistent.
 469func ValidateCall(call SessionAgentCall) error {
 470	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 471		return ErrEmptyPrompt
 472	}
 473	if call.SessionID == "" {
 474		return ErrSessionMissing
 475	}
 476	return nil
 477}
 478
 479func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 480	if err := ValidateCall(call); err != nil {
 481		return nil, err
 482	}
 483
 484	// genCtx/cancel are the run context and its cancel func. For the
 485	// accepted (fire-and-forget) dispatch path they are created under
 486	// dispatchMu below so a concurrent Cancel can observe the
 487	// activeRequests entry before the assistant message exists. For
 488	// the in-process path they stay nil here and are created later,
 489	// preserving the original ordering.
 490	var (
 491		genCtx           context.Context
 492		cancel           context.CancelFunc
 493		activeRegistered bool
 494		userMsgCreated   bool
 495	)
 496
 497	if call.Accepted != nil {
 498		// Serialize the accepted -> (cancel-on-entry | queued |
 499		// active) transition against a concurrent Cancel. Cancel takes
 500		// the same per-session lock, so every cancel observes at least
 501		// one of: a cancel mark, an activeRequests entry, or a
 502		// messageQueue entry it then clears.
 503		mu := a.sessionMu(call.SessionID)
 504		mu.Lock()
 505
 506		if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
 507			// Cancel-on-entry: a cancel arrived while this run was
 508			// dispatched but not yet active, and this handle's accept
 509			// sequence is at or below the session's cancel mark. The
 510			// mark is left in place so sibling handles it also covers
 511			// observe the same cancel; release the accept reservation,
 512			// drop the lock, and persist a canceled turn without
 513			// entering Stream.
 514			//
 515			// This path returns before the streaming defer that
 516			// publishes RunComplete is installed, so emit the terminal
 517			// event explicitly. Without it, a caller waiting on
 518			// RunComplete for this RunID (e.g. `crush run`, which
 519			// ignores message events and blocks on RunComplete) would
 520			// hang on an immediately-canceled accepted run.
 521			call.Accepted.Close()
 522			mu.Unlock()
 523			complete := notify.RunComplete{
 524				SessionID: call.SessionID,
 525				RunID:     call.RunID,
 526				Cancelled: true,
 527			}
 528			if err := a.persistCanceledTurn(ctx, call, false); err != nil {
 529				complete.Error = err.Error()
 530				a.publishRunComplete(ctx, call, complete)
 531				return nil, err
 532			}
 533			a.publishRunComplete(ctx, call, complete)
 534			return nil, nil
 535		}
 536
 537		if a.IsSessionBusy(call.SessionID) {
 538			// Busy: an earlier prompt is active. Queue this call and
 539			// release the accept reservation. A Cancel arriving after
 540			// this point sees the active entry and clears the queue.
 541			a.enqueueCall(call)
 542			call.Accepted.Close()
 543			mu.Unlock()
 544			return nil, nil
 545		}
 546
 547		// Idle: become the active run. Register the cancel func before
 548		// dropping the lock so a Cancel that arrives between here and
 549		// assistant creation is not lost.
 550		runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 551		genCtx, cancel = context.WithCancel(runCtx)
 552		a.activeRequests.Set(call.SessionID, cancel)
 553		activeRegistered = true
 554		call.Accepted.Close()
 555		mu.Unlock()
 556
 557		defer cancel()
 558		defer a.activeRequests.Del(call.SessionID)
 559	} else if a.IsSessionBusy(call.SessionID) {
 560		// Queue the message if busy. Strip OnComplete: the caller that
 561		// supplied the hook (typically coordinator.Run) has its own
 562		// retry/coalesce scope that ends when it returns, so by the time
 563		// the queue drains nobody is left to consume the buffered
 564		// terminal event. The recursive Run will fall back to the
 565		// default broker publish, which is what existing subscribers
 566		// expect for queued turns.
 567		a.enqueueCall(call)
 568		return nil, nil
 569	}
 570
 571	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 572	agentTools := a.tools.Copy()
 573	largeModel := a.largeModel.Get()
 574	systemPrompt := a.systemPrompt.Get()
 575	promptPrefix := a.systemPromptPrefix.Get()
 576	var instructions strings.Builder
 577
 578	for _, server := range mcp.GetStates() {
 579		if server.State != mcp.StateConnected {
 580			continue
 581		}
 582		if s := server.Client.InitializeResult().Instructions; s != "" {
 583			instructions.WriteString(s)
 584			instructions.WriteString("\n\n")
 585		}
 586	}
 587
 588	if s := instructions.String(); s != "" {
 589		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 590	}
 591
 592	if len(agentTools) > 0 {
 593		// Add Anthropic caching to the last tool.
 594		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 595	}
 596
 597	agent := fantasy.NewAgent(
 598		largeModel.Model,
 599		fantasy.WithSystemPrompt(systemPrompt),
 600		fantasy.WithTools(agentTools...),
 601		fantasy.WithUserAgent(userAgent),
 602	)
 603
 604	sessionLock := sync.Mutex{}
 605	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 606	if err != nil {
 607		return nil, fmt.Errorf("failed to get session: %w", err)
 608	}
 609
 610	msgs, err := a.getSessionMessages(ctx, currentSession)
 611	if err != nil {
 612		return nil, fmt.Errorf("failed to get session messages: %w", err)
 613	}
 614
 615	var wg sync.WaitGroup
 616	// Generate title if first message.
 617	if len(msgs) == 0 {
 618		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 619		wg.Go(func() {
 620			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 621		})
 622	}
 623	defer wg.Wait()
 624
 625	// Add the user message to the session.
 626	_, err = a.createUserMessage(ctx, call)
 627	if err != nil {
 628		return nil, err
 629	}
 630	userMsgCreated = true
 631
 632	// Add the session to the context.
 633	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 634
 635	// For the accepted dispatch path the run context and cancel func
 636	// were already created and registered under dispatchMu above; reuse
 637	// them. For the in-process path create them here, preserving the
 638	// original ordering.
 639	if !activeRegistered {
 640		genCtx, cancel = context.WithCancel(ctx)
 641		a.activeRequests.Set(call.SessionID, cancel)
 642
 643		defer cancel()
 644		defer a.activeRequests.Del(call.SessionID)
 645	}
 646	// skipRunComplete is set just before the queued-recursion path so
 647	// the outer Run doesn't publish a RunComplete that would race
 648	// with — and be superseded by — the recursive call's own
 649	// RunComplete (each queued user prompt is its own turn and
 650	// publishes exactly one terminal event).
 651	var skipRunComplete bool
 652	// currentAssistant is declared here so the deferred RunComplete
 653	// publish below can capture the pointer that PrepareStep will
 654	// later (re)assign for each streaming step. The final assistant
 655	// message of the turn is the value reachable through this
 656	// pointer when the defer runs.
 657	var currentAssistant *message.Message
 658	// Drain any debounced message updates before returning. message.Service
 659	// already flushes synchronously on terminal updates, but a defer here
 660	// guarantees the contract at every Run exit (success, error, panic
 661	// recovery upstream) without callers needing to know.
 662	//
 663	// After the flush completes — meaning all per-message
 664	// Publish(UpdatedEvent) calls have fired and been buffered into
 665	// every subscriber's channel — publish the authoritative
 666	// RunComplete event for this turn. The flush-then-publish order
 667	// gives well-behaved clients the best chance of seeing the final
 668	// message event before RunComplete; the embedded Text field
 669	// reconciles for clients that observe the events out of order
 670	// (the pubsub broker fan-in does not serialize publishes from
 671	// different upstream brokers).
 672	defer func() {
 673		// Use a context detached from the run context: workspace
 674		// shutdown cancels ctx before this goroutine returns, but the
 675		// buffered streaming deltas must still land before the DB is
 676		// closed. A short timeout bounds the flush.
 677		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 678		defer flushCancel()
 679		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
 680			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 681		}
 682		if skipRunComplete {
 683			return
 684		}
 685		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 686		if currentAssistant != nil {
 687			complete.MessageID = currentAssistant.ID
 688			complete.Text = currentAssistant.Content().String()
 689		}
 690		if retErr != nil {
 691			complete.Error = retErr.Error()
 692			complete.Cancelled = errors.Is(retErr, context.Canceled)
 693		} else if ctx.Err() != nil {
 694			complete.Cancelled = true
 695		}
 696		// Prefer the per-call hook when supplied so the coordinator
 697		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 698		// into a single user-visible terminal event. The fallback
 699		// must-deliver publish applies bounded-blocking semantics to
 700		// the authoritative terminal event so a momentarily-full
 701		// subscriber channel can't silently drop it and hang
 702		// non-interactive clients waiting on RunComplete.
 703		a.publishRunComplete(ctx, call, complete)
 704	}()
 705
 706	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 707
 708	startTime := time.Now()
 709	a.eventPromptSent(call.SessionID)
 710
 711	var stepMessages []fantasy.Message
 712	var shouldSummarize bool
 713	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 714	var maxOutputTokens *int64
 715	if call.MaxOutputTokens > 0 {
 716		maxOutputTokens = &call.MaxOutputTokens
 717	}
 718	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 719		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 720		Files:            files,
 721		Messages:         history,
 722		ProviderOptions:  call.ProviderOptions,
 723		MaxOutputTokens:  maxOutputTokens,
 724		TopP:             call.TopP,
 725		Temperature:      call.Temperature,
 726		PresencePenalty:  call.PresencePenalty,
 727		TopK:             call.TopK,
 728		FrequencyPenalty: call.FrequencyPenalty,
 729		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 730			prepared.Messages = options.Messages
 731			for i := range prepared.Messages {
 732				prepared.Messages[i].ProviderOptions = nil
 733			}
 734
 735			// Use latest tools (updated by SetTools when MCP tools change).
 736			prepared.Tools = a.tools.Copy()
 737
 738			// Drain queued follow-up prompts, but skip any covered by a
 739			// cancel recorded while they sat in the queue: a cancel that
 740			// arrived after a prompt was queued must not let it run as
 741			// part of this step. Coverage is per-call by accept sequence
 742			// so a follow-up queued after the cancel (higher seq) is
 743			// still folded in.
 744			survivors := a.drainUncanceledQueue(call.SessionID)
 745			for _, queued := range survivors {
 746				userMessage, createErr := a.createUserMessage(callContext, queued)
 747				if createErr != nil {
 748					return callContext, prepared, createErr
 749				}
 750				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 751			}
 752
 753			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 754
 755			lastSystemRoleInx := 0
 756			systemMessageUpdated := false
 757			for i, msg := range prepared.Messages {
 758				// Only add cache control to the last message.
 759				if msg.Role == fantasy.MessageRoleSystem {
 760					lastSystemRoleInx = i
 761				} else if !systemMessageUpdated {
 762					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 763					systemMessageUpdated = true
 764				}
 765				// Than add cache control to the last 2 messages.
 766				if i > len(prepared.Messages)-3 {
 767					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 768				}
 769			}
 770
 771			if promptPrefix != "" {
 772				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 773			}
 774
 775			sessionLock.Lock()
 776			stepMessages = cloneFantasyMessages(prepared.Messages)
 777			sessionLock.Unlock()
 778
 779			var assistantMsg message.Message
 780			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 781				Role:     message.Assistant,
 782				Parts:    []message.ContentPart{},
 783				Model:    largeModel.ModelCfg.Model,
 784				Provider: largeModel.ModelCfg.Provider,
 785			})
 786			if err != nil {
 787				return callContext, prepared, err
 788			}
 789			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 790			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 791			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 792			currentAssistant = &assistantMsg
 793			return callContext, prepared, err
 794		},
 795		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 796			currentAssistant.AppendReasoningContent(reasoning.Text)
 797			return a.messages.Update(genCtx, *currentAssistant)
 798		},
 799		OnReasoningDelta: func(id string, text string) error {
 800			currentAssistant.AppendReasoningContent(text)
 801			return a.messages.Update(genCtx, *currentAssistant)
 802		},
 803		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 804			// handle anthropic signature
 805			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 806				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 807					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 808				}
 809			}
 810			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 811				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 812					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 813				}
 814			}
 815			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 816				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 817					currentAssistant.SetReasoningResponsesData(reasoning)
 818				}
 819			}
 820			currentAssistant.FinishThinking()
 821			return a.messages.Update(genCtx, *currentAssistant)
 822		},
 823		OnTextDelta: func(id string, text string) error {
 824			// Strip leading newline from initial text content. This is is
 825			// particularly important in non-interactive mode where leading
 826			// newlines are very visible.
 827			if len(currentAssistant.Parts) == 0 {
 828				text = strings.TrimPrefix(text, "\n")
 829			}
 830
 831			currentAssistant.AppendContent(text)
 832			return a.messages.Update(genCtx, *currentAssistant)
 833		},
 834		OnToolInputStart: func(id string, toolName string) error {
 835			toolCall := message.ToolCall{
 836				ID:               id,
 837				Name:             toolName,
 838				ProviderExecuted: false,
 839				Finished:         false,
 840			}
 841			currentAssistant.AddToolCall(toolCall)
 842			// Use parent ctx instead of genCtx to ensure the update succeeds
 843			// even if the request is canceled mid-stream
 844			return a.messages.Update(ctx, *currentAssistant)
 845		},
 846		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 847			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 848		},
 849		OnToolCall: func(tc fantasy.ToolCallContent) error {
 850			toolCall := message.ToolCall{
 851				ID:               tc.ToolCallID,
 852				Name:             tc.ToolName,
 853				Input:            tc.Input,
 854				ProviderExecuted: false,
 855				Finished:         true,
 856			}
 857			currentAssistant.AddToolCall(toolCall)
 858			// Use parent ctx instead of genCtx to ensure the update succeeds
 859			// even if the request is canceled mid-stream
 860			return a.messages.Update(ctx, *currentAssistant)
 861		},
 862		OnToolResult: func(result fantasy.ToolResultContent) error {
 863			toolResult := a.convertToToolResult(result)
 864			// Use parent ctx instead of genCtx to ensure the message is created
 865			// even if the request is canceled mid-stream
 866			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 867				Role: message.Tool,
 868				Parts: []message.ContentPart{
 869					toolResult,
 870				},
 871			})
 872			return createMsgErr
 873		},
 874		OnStepFinish: func(stepResult fantasy.StepResult) error {
 875			finishReason := message.FinishReasonUnknown
 876			switch stepResult.FinishReason {
 877			case fantasy.FinishReasonLength:
 878				finishReason = message.FinishReasonMaxTokens
 879			case fantasy.FinishReasonStop:
 880				finishReason = message.FinishReasonEndTurn
 881			case fantasy.FinishReasonToolCalls:
 882				finishReason = message.FinishReasonToolUse
 883			}
 884			// If a tool result halted the turn (e.g. a hook halt or a
 885			// permission denial), the step ends on FinishReasonToolCalls but
 886			// the model will not be called again. Treat it as the end of the
 887			// turn so the UI can render the assistant footer.
 888			if finishReason == message.FinishReasonToolUse {
 889				for _, tr := range stepResult.Content.ToolResults() {
 890					if tr.StopTurn {
 891						finishReason = message.FinishReasonEndTurn
 892						break
 893					}
 894				}
 895			}
 896			currentAssistant.AddFinish(finishReason, "", "")
 897			sessionLock.Lock()
 898			defer sessionLock.Unlock()
 899
 900			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 901			if getSessionErr != nil {
 902				return getSessionErr
 903			}
 904			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 905			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 906			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 907			if sessionErr != nil {
 908				return sessionErr
 909			}
 910			currentSession = updatedSession
 911			return a.messages.Update(genCtx, *currentAssistant)
 912		},
 913		StopWhen: []fantasy.StopCondition{
 914			func(_ []fantasy.StepResult) bool {
 915				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 916				// If context window is unknown (0), skip auto-summarize
 917				// to avoid immediately truncating custom/local models.
 918				if cw == 0 {
 919					return false
 920				}
 921				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 922				remaining := cw - tokens
 923				var threshold int64
 924				if cw > largeContextWindowThreshold {
 925					threshold = largeContextWindowBuffer
 926				} else {
 927					threshold = int64(float64(cw) * smallContextWindowRatio)
 928				}
 929				if (remaining <= threshold) && !a.disableAutoSummarize {
 930					shouldSummarize = true
 931					return true
 932				}
 933				return false
 934			},
 935			func(steps []fantasy.StepResult) bool {
 936				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
 937			},
 938		},
 939	})
 940
 941	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
 942
 943	if err != nil {
 944		isHyper := largeModel.ModelCfg.Provider == hyper.Name
 945		isCancelErr := errors.Is(err, context.Canceled)
 946		if currentAssistant == nil {
 947			// Cancel-before-assistant-creation window: the run was
 948			// canceled after activeRequests.Set but before PrepareStep
 949			// created the assistant message. Without this, the turn
 950			// would return with no FinishReasonCanceled marker and no
 951			// user-visible record. The user message was already created
 952			// above, so persistCanceledTurn only writes the assistant
 953			// record.
 954			if isCancelErr {
 955				if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
 956					return nil, persistErr
 957				}
 958			}
 959			return result, err
 960		}
 961		// Persist final state with a context detached from the run
 962		// context. The run context (ctx) is derived from the
 963		// workspace context, which workspace shutdown cancels before
 964		// agent goroutines finish; using ctx here would drop the
 965		// final assistant state. WithoutCancel keeps the values
 966		// (e.g. session ID) while ignoring cancellation, and a short
 967		// timeout bounds the cleanup writes.
 968		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 969		defer cleanupCancel()
 970		// Ensure we finish thinking on error to close the reasoning state.
 971		currentAssistant.FinishThinking()
 972		toolCalls := currentAssistant.ToolCalls()
 973		// INFO: we use the cleanup context here because the genCtx has been cancelled.
 974		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
 975		if createErr != nil {
 976			return nil, createErr
 977		}
 978		for _, tc := range toolCalls {
 979			if !tc.Finished {
 980				tc.Finished = true
 981				tc.Input = "{}"
 982				currentAssistant.AddToolCall(tc)
 983				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
 984				if updateErr != nil {
 985					return nil, updateErr
 986				}
 987			}
 988
 989			found := false
 990			for _, msg := range msgs {
 991				if msg.Role == message.Tool {
 992					for _, tr := range msg.ToolResults() {
 993						if tr.ToolCallID == tc.ID {
 994							found = true
 995							break
 996						}
 997					}
 998				}
 999				if found {
1000					break
1001				}
1002			}
1003			if found {
1004				continue
1005			}
1006			content := "There was an error while executing the tool"
1007			if isCancelErr {
1008				content = "Error: user cancelled assistant tool calling"
1009			}
1010			toolResult := message.ToolResult{
1011				ToolCallID: tc.ID,
1012				Name:       tc.Name,
1013				Content:    content,
1014				IsError:    true,
1015			}
1016			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1017				Role: message.Tool,
1018				Parts: []message.ContentPart{
1019					toolResult,
1020				},
1021			})
1022			if createErr != nil {
1023				return nil, createErr
1024			}
1025		}
1026		var fantasyErr *fantasy.Error
1027		var providerErr *fantasy.ProviderError
1028		const defaultTitle = "Provider Error"
1029		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1030		if isCancelErr {
1031			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1032		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1033			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1034			if a.notify != nil {
1035				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1036					SessionID:    call.SessionID,
1037					SessionTitle: currentSession.Title,
1038					Type:         notify.TypeReAuthenticate,
1039					ProviderID:   largeModel.ModelCfg.Provider,
1040				})
1041			}
1042		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1043			url := hyper.BaseURL()
1044			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1045			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1046		} else if errors.As(err, &providerErr) {
1047			if providerErr.Message == "The requested model is not supported." {
1048				url := "https://github.com/settings/copilot/features"
1049				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1050				currentAssistant.AddFinish(
1051					message.FinishReasonError,
1052					"Copilot model not enabled",
1053					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1054				)
1055			} else {
1056				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1057			}
1058		} else if errors.As(err, &fantasyErr) {
1059			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1060		} else {
1061			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1062		}
1063		// Note: we use the cleanup context here because the genCtx has been
1064		// cancelled.
1065		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1066		if updateErr != nil {
1067			return nil, updateErr
1068		}
1069		return nil, err
1070	}
1071
1072	if shouldSummarize {
1073		a.activeRequests.Del(call.SessionID)
1074		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1075			return nil, summarizeErr
1076		}
1077		// If the agent wasn't done...
1078		if len(currentAssistant.ToolCalls()) > 0 {
1079			existing, ok := a.messageQueue.Get(call.SessionID)
1080			if !ok {
1081				existing = []SessionAgentCall{}
1082			}
1083			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1084			existing = append(existing, call)
1085			a.messageQueue.Set(call.SessionID, existing)
1086		}
1087	}
1088
1089	// Release active request before publishing the notification.
1090	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
1091	// tea.Msg arrives, so the cleanup must precede the notify or
1092	// subscribers see stale busy state at the moment of receipt.
1093	a.activeRequests.Del(call.SessionID)
1094	cancel()
1095
1096	// Send notification that agent has finished its turn (skip for
1097	// nested/non-interactive sessions).
1098	if !call.NonInteractive && a.notify != nil {
1099		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1100			SessionID:    call.SessionID,
1101			SessionTitle: currentSession.Title,
1102			Type:         notify.TypeAgentFinished,
1103		})
1104	}
1105
1106	// Hand off to the next queued prompt (if any) under dispatchMu so
1107	// the transition from this finished run to the queued run is atomic
1108	// against a concurrent Cancel. activeRequests for this session was
1109	// just deleted above, so without the lock there is a window in
1110	// which the session looks idle and a cancel becomes a no-op that
1111	// fails to stop the queued prompt. Holding the lock lets us observe
1112	// a pending cancel recorded against the session and drop the queue
1113	// instead of running it, and (for the recursion) hand a fresh
1114	// accept reservation to the dequeued call so acceptedRuns stays > 0
1115	// across the recursive Run's own dispatch handoff — keeping the
1116	// session observable to Cancel for the entire transition and
1117	// closing the dequeue -> re-register window.
1118	mu := a.sessionMu(call.SessionID)
1119	mu.Lock()
1120	queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1121	if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1122		// A cancel was recorded for this session (e.g. it arrived while
1123		// this run was active and follow-ups had been queued). Drop the
1124		// queued prompts it covers (accept sequence at or below the
1125		// mark, or untracked); keep any queued after the cancel (higher
1126		// sequence) so they still run.
1127		var kept []SessionAgentCall
1128		for _, q := range queuedMessages {
1129			if q.acceptSeq == 0 || q.acceptSeq <= mark {
1130				continue
1131			}
1132			kept = append(kept, q)
1133		}
1134		queuedMessages = kept
1135		a.messageQueue.Set(call.SessionID, kept)
1136	}
1137	if len(queuedMessages) == 0 {
1138		// No queued work. Clear the cancel mark only when no accepted
1139		// run remains in flight that it might still cover; otherwise a
1140		// sibling prompt (sequence at or below the mark) waiting to
1141		// enter Run would lose its cancellation. When accepted runs are
1142		// gone, this also clears a stale mark so it can't catch a
1143		// future run.
1144		a.messageQueue.Del(call.SessionID)
1145		a.acceptedMu.Lock()
1146		inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1147		a.acceptedMu.Unlock()
1148		if inFlight == 0 {
1149			a.cancelMark.Del(call.SessionID)
1150		}
1151		mu.Unlock()
1152		return result, err
1153	}
1154	// There are queued messages restart the loop. The recursive Run
1155	// publishes its own RunComplete for the queued prompt, so suppress
1156	// the outer defer's emit to avoid a duplicate event whose Error
1157	// field would belong to the recursive turn but whose MessageID/Text
1158	// would belong to the outer turn.
1159	skipRunComplete = true
1160	firstQueuedMessage := queuedMessages[0]
1161	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1162	// Reserve a fresh accept for the dequeued prompt before dropping the
1163	// lock so acceptedRuns > 0 across the handoff into the recursive
1164	// Run. This closes the window between this dequeue and the recursive
1165	// Run registering its activeRequests entry: a cancel arriving in
1166	// that window now records a pending cancel (acceptedRuns > 0) that
1167	// the recursive Run's accepted path observes as cancel-on-entry.
1168	firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1169	mu.Unlock()
1170	return a.Run(ctx, firstQueuedMessage)
1171}
1172
1173func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1174	if a.IsSessionBusy(sessionID) {
1175		return ErrSessionBusy
1176	}
1177
1178	// Copy mutable fields under lock to avoid races with SetModels.
1179	largeModel := a.largeModel.Get()
1180	systemPromptPrefix := a.systemPromptPrefix.Get()
1181
1182	currentSession, err := a.sessions.Get(ctx, sessionID)
1183	if err != nil {
1184		return fmt.Errorf("failed to get session: %w", err)
1185	}
1186	msgs, err := a.getSessionMessages(ctx, currentSession)
1187	if err != nil {
1188		return err
1189	}
1190	if len(msgs) == 0 {
1191		// Nothing to summarize.
1192		return nil
1193	}
1194
1195	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1196
1197	genCtx, cancel := context.WithCancel(ctx)
1198	a.activeRequests.Set(sessionID, cancel)
1199	defer a.activeRequests.Del(sessionID)
1200	defer cancel()
1201	defer func() {
1202		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1203			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1204		}
1205	}()
1206
1207	agent := fantasy.NewAgent(
1208		largeModel.Model,
1209		fantasy.WithSystemPrompt(string(summaryPrompt)),
1210		fantasy.WithUserAgent(userAgent),
1211	)
1212	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1213		Role:             message.Assistant,
1214		Model:            largeModel.ModelCfg.Model,
1215		Provider:         largeModel.ModelCfg.Provider,
1216		IsSummaryMessage: true,
1217	})
1218	if err != nil {
1219		return err
1220	}
1221
1222	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1223
1224	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1225		Prompt:          summaryPromptText,
1226		Messages:        aiMsgs,
1227		ProviderOptions: opts,
1228		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1229			prepared.Messages = options.Messages
1230			if systemPromptPrefix != "" {
1231				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1232			}
1233			return callContext, prepared, nil
1234		},
1235		OnReasoningDelta: func(id string, text string) error {
1236			summaryMessage.AppendReasoningContent(text)
1237			return a.messages.Update(genCtx, summaryMessage)
1238		},
1239		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1240			// Handle anthropic signature.
1241			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1242				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1243					summaryMessage.AppendReasoningSignature(signature.Signature)
1244				}
1245			}
1246			summaryMessage.FinishThinking()
1247			return a.messages.Update(genCtx, summaryMessage)
1248		},
1249		OnTextDelta: func(id, text string) error {
1250			summaryMessage.AppendContent(text)
1251			return a.messages.Update(genCtx, summaryMessage)
1252		},
1253	})
1254	if err != nil {
1255		isCancelErr := errors.Is(err, context.Canceled)
1256		if isCancelErr {
1257			// User cancelled summarize we need to remove the summary message.
1258			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1259			return deleteErr
1260		}
1261		// Mark the summary message as finished with an error so the UI
1262		// stops spinning.
1263		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1264		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1265			return updateErr
1266		}
1267		return err
1268	}
1269
1270	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1271	err = a.messages.Update(genCtx, summaryMessage)
1272	if err != nil {
1273		return err
1274	}
1275
1276	var openrouterCost *float64
1277	for _, step := range resp.Steps {
1278		stepCost := a.openrouterCost(step.ProviderMetadata)
1279		if stepCost != nil {
1280			newCost := *stepCost
1281			if openrouterCost != nil {
1282				newCost += *openrouterCost
1283			}
1284			openrouterCost = &newCost
1285		}
1286	}
1287
1288	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
1289
1290	// Just in case, get just the last usage info.
1291	usage := resp.Response.Usage
1292	currentSession.SummaryMessageID = summaryMessage.ID
1293	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1294	currentSession.PromptTokens = 0
1295	currentSession.EstimatedUsage = usageIsZero(usage)
1296	_, err = a.sessions.Save(genCtx, currentSession)
1297	if err != nil {
1298		return err
1299	}
1300
1301	// Release the active request before processing queued messages so that
1302	// Run() does not see the session as busy.
1303	a.activeRequests.Del(sessionID)
1304	cancel()
1305
1306	// Process any messages that were queued while summarizing.
1307	queuedMessages, ok := a.messageQueue.Get(sessionID)
1308	if !ok || len(queuedMessages) == 0 {
1309		return nil
1310	}
1311	firstQueuedMessage := queuedMessages[0]
1312	a.messageQueue.Set(sessionID, queuedMessages[1:])
1313	_, qErr := a.Run(ctx, firstQueuedMessage)
1314	return qErr
1315}
1316
1317func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1318	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1319		return fantasy.ProviderOptions{}
1320	}
1321	return fantasy.ProviderOptions{
1322		anthropic.Name: &anthropic.ProviderCacheControlOptions{
1323			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1324		},
1325		bedrock.Name: &anthropic.ProviderCacheControlOptions{
1326			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1327		},
1328		vercel.Name: &anthropic.ProviderCacheControlOptions{
1329			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1330		},
1331	}
1332}
1333
1334func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1335	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1336	var attachmentParts []message.ContentPart
1337	for _, attachment := range call.Attachments {
1338		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1339	}
1340	parts = append(parts, attachmentParts...)
1341	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1342		Role:  message.User,
1343		Parts: parts,
1344	})
1345	if err != nil {
1346		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1347	}
1348	return msg, nil
1349}
1350
1351func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1352	var history []fantasy.Message
1353	if !a.isSubAgent {
1354		history = append(history, fantasy.NewUserMessage(
1355			fmt.Sprintf(
1356				"<system_reminder>%s</system_reminder>",
1357				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1358If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1359If not, please feel free to ignore. Again do not mention this message to the user.`,
1360			),
1361		))
1362	}
1363	// Collect all tool call IDs present in assistant messages and all tool
1364	// result IDs present in tool messages. This lets us detect both orphaned
1365	// tool results (result without a call) and orphaned tool calls (call
1366	// without a result).
1367	knownToolCallIDs := make(map[string]struct{})
1368	knownToolResultIDs := make(map[string]struct{})
1369	for _, m := range msgs {
1370		switch m.Role {
1371		case message.Assistant:
1372			for _, tc := range m.ToolCalls() {
1373				knownToolCallIDs[tc.ID] = struct{}{}
1374			}
1375		case message.Tool:
1376			for _, tr := range m.ToolResults() {
1377				knownToolResultIDs[tr.ToolCallID] = struct{}{}
1378			}
1379		}
1380	}
1381
1382	for _, m := range msgs {
1383		if len(m.Parts) == 0 {
1384			continue
1385		}
1386		// Assistant message without content or tool calls (cancelled before it returned anything).
1387		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1388			continue
1389		}
1390		if m.Role == message.Tool {
1391			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1392				history = append(history, msg)
1393			}
1394			continue
1395		}
1396		aiMsgs := m.ToAIMessage()
1397		if !supportsImages {
1398			for i := range aiMsgs {
1399				if aiMsgs[i].Role == fantasy.MessageRoleUser {
1400					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1401				}
1402			}
1403		}
1404		history = append(history, aiMsgs...)
1405
1406		if m.Role == message.Assistant {
1407			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1408				history = append(history, msg)
1409			}
1410		}
1411	}
1412
1413	var files []fantasy.FilePart
1414	for _, attachment := range attachments {
1415		if attachment.IsText() {
1416			continue
1417		}
1418		files = append(files, fantasy.FilePart{
1419			Filename:  attachment.FileName,
1420			Data:      attachment.Content,
1421			MediaType: attachment.MimeType,
1422		})
1423	}
1424
1425	return history, files
1426}
1427
1428// filterFileParts removes fantasy.FilePart entries from a slice of message
1429// parts. Used to strip image attachments from historical user messages when
1430// the current model does not support them.
1431func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1432	filtered := make([]fantasy.MessagePart, 0, len(parts))
1433	for _, part := range parts {
1434		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1435			continue
1436		}
1437		filtered = append(filtered, part)
1438	}
1439	return filtered
1440}
1441
1442// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1443// dropping any tool result parts whose tool_call_id has no matching tool call
1444// in the known set. An orphaned result causes API validation to fail on every
1445// subsequent turn, permanently locking the session. Returns the filtered
1446// message and true if at least one valid part remains.
1447func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1448	aiMsgs := m.ToAIMessage()
1449	if len(aiMsgs) == 0 {
1450		return fantasy.Message{}, false
1451	}
1452	var validParts []fantasy.MessagePart
1453	for _, part := range aiMsgs[0].Content {
1454		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1455		if !ok {
1456			validParts = append(validParts, part)
1457			continue
1458		}
1459		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1460			validParts = append(validParts, part)
1461		} else {
1462			slog.Warn(
1463				"Dropping orphaned tool result with no matching tool call",
1464				"tool_call_id", tr.ToolCallID,
1465			)
1466		}
1467	}
1468	if len(validParts) == 0 {
1469		return fantasy.Message{}, false
1470	}
1471	msg := aiMsgs[0]
1472	msg.Content = validParts
1473	return msg, true
1474}
1475
1476// syntheticToolResultsForOrphanedCalls returns a tool message containing
1477// synthetic tool results for any tool calls in the assistant message that
1478// have no matching result in knownToolResultIDs. LLM APIs require every
1479// tool_use to be immediately followed by a tool_result; an interrupted
1480// session can leave orphaned tool_use blocks that permanently lock the
1481// conversation. Returns the message and true if any synthetic results were
1482// produced.
1483func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1484	var syntheticParts []fantasy.MessagePart
1485	for _, tc := range m.ToolCalls() {
1486		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1487			continue
1488		}
1489		slog.Warn(
1490			"Injecting synthetic tool result for orphaned tool call",
1491			"tool_call_id", tc.ID,
1492			"tool_name", tc.Name,
1493		)
1494		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1495			ToolCallID: tc.ID,
1496			Output: fantasy.ToolResultOutputContentError{
1497				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1498			},
1499		})
1500	}
1501	if len(syntheticParts) == 0 {
1502		return fantasy.Message{}, false
1503	}
1504	return fantasy.Message{
1505		Role:    fantasy.MessageRoleTool,
1506		Content: syntheticParts,
1507	}, true
1508}
1509
1510func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1511	msgs, err := a.messages.List(ctx, session.ID)
1512	if err != nil {
1513		return nil, fmt.Errorf("failed to list messages: %w", err)
1514	}
1515
1516	if session.SummaryMessageID != "" {
1517		summaryMsgIndex := -1
1518		for i, msg := range msgs {
1519			if msg.ID == session.SummaryMessageID {
1520				summaryMsgIndex = i
1521				break
1522			}
1523		}
1524		if summaryMsgIndex != -1 {
1525			msgs = msgs[summaryMsgIndex:]
1526			msgs[0].Role = message.User
1527		}
1528	}
1529	return msgs, nil
1530}
1531
1532// generateTitle generates a session titled based on the initial prompt.
1533func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1534	if userPrompt == "" {
1535		return
1536	}
1537
1538	smallModel := a.smallModel.Get()
1539	largeModel := a.largeModel.Get()
1540	systemPromptPrefix := a.systemPromptPrefix.Get()
1541
1542	var maxOutputTokens int64 = 40
1543	if smallModel.CatwalkCfg.CanReason {
1544		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1545	}
1546
1547	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1548		return fantasy.NewAgent(
1549			m,
1550			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1551			fantasy.WithMaxOutputTokens(tok),
1552			fantasy.WithUserAgent(userAgent),
1553		)
1554	}
1555
1556	streamCall := fantasy.AgentStreamCall{
1557		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1558		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1559			prepared.Messages = opts.Messages
1560			if systemPromptPrefix != "" {
1561				prepared.Messages = append([]fantasy.Message{
1562					fantasy.NewSystemMessage(systemPromptPrefix),
1563				}, prepared.Messages...)
1564			}
1565			return callCtx, prepared, nil
1566		},
1567	}
1568
1569	// Use the small model to generate the title.
1570	model := smallModel
1571	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1572	resp, err := agent.Stream(ctx, streamCall)
1573	if err == nil {
1574		// We successfully generated a title with the small model.
1575		slog.Debug("Generated title with small model")
1576	} else {
1577		// It didn't work. Let's try with the big model.
1578		slog.Error("Error generating title with small model; trying big model", "err", err)
1579		model = largeModel
1580		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1581		resp, err = agent.Stream(ctx, streamCall)
1582		if err == nil {
1583			slog.Debug("Generated title with large model")
1584		} else {
1585			// Welp, the large model didn't work either. Use the default
1586			// session name and return.
1587			slog.Error("Error generating title with large model", "err", err)
1588			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1589			if saveErr != nil {
1590				slog.Error("Failed to save session title", "error", saveErr)
1591			}
1592			return
1593		}
1594	}
1595
1596	if resp == nil {
1597		// Actually, we didn't get a response so we can't. Use the default
1598		// session name and return.
1599		slog.Error("Response is nil; can't generate title")
1600		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1601		if saveErr != nil {
1602			slog.Error("Failed to save session title", "error", saveErr)
1603		}
1604		return
1605	}
1606
1607	// Clean up title.
1608	var title string
1609	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1610
1611	// Remove thinking tags if present.
1612	title = thinkTagRegex.ReplaceAllString(title, "")
1613	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1614
1615	title = strings.TrimSpace(title)
1616	title = cmp.Or(title, DefaultSessionName)
1617
1618	// Calculate usage and cost.
1619	var openrouterCost *float64
1620	for _, step := range resp.Steps {
1621		stepCost := a.openrouterCost(step.ProviderMetadata)
1622		if stepCost != nil {
1623			newCost := *stepCost
1624			if openrouterCost != nil {
1625				newCost += *openrouterCost
1626			}
1627			openrouterCost = &newCost
1628		}
1629	}
1630
1631	modelConfig := model.CatwalkCfg
1632	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1633		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1634		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1635		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1636
1637	// Use override cost if available (e.g., from OpenRouter).
1638	if openrouterCost != nil {
1639		cost = *openrouterCost
1640	}
1641
1642	// Skip cost accumulation
1643	if model.FlatRate {
1644		cost = 0
1645	}
1646
1647	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1648	completionTokens := resp.TotalUsage.OutputTokens
1649
1650	// Atomically update only title and usage fields to avoid overriding other
1651	// concurrent session updates.
1652	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1653	if saveErr != nil {
1654		slog.Error("Failed to save session title and usage", "error", saveErr)
1655		return
1656	}
1657}
1658
1659func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1660	openrouterMetadata, ok := metadata[openrouter.Name]
1661	if !ok {
1662		return nil
1663	}
1664
1665	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1666	if !ok {
1667		return nil
1668	}
1669	return &opts.Usage.Cost
1670}
1671
1672func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1673	if !usageIsZero(usage) {
1674		session.EstimatedUsage = estimated
1675	}
1676
1677	modelConfig := model.CatwalkCfg
1678	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1679		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1680		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1681		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1682
1683	if !estimated {
1684		a.eventTokensUsed(session.ID, model, usage, cost)
1685	}
1686
1687	if estimated {
1688		cost = 0
1689	} else {
1690		// Use override cost if available (e.g., from OpenRouter).
1691		if overrideCost != nil {
1692			cost = *overrideCost
1693		}
1694
1695		// Skip cost accumulation
1696		if model.FlatRate {
1697			cost = 0
1698		}
1699	}
1700
1701	session.Cost += cost
1702	updateSessionTokenCounters(session, usage)
1703}
1704
1705func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1706	if usage.OutputTokens != 0 {
1707		session.CompletionTokens = usage.OutputTokens
1708	}
1709	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1710		session.PromptTokens = promptTokens
1711	}
1712}
1713
1714func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1715	if usage.OutputTokens != 0 {
1716		return usage.OutputTokens
1717	}
1718	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1719}
1720
// Cancel aborts outstanding work for sessionID while holding that session's
// mutex. It does the following:
//   - invokes the cancel functions registered in activeRequests under
//     sessionID and sessionID+"-summarize" (entries are left in place);
//   - raises the session's cancel mark to the current accept sequence when
//     the session has accepted-but-not-yet-active runs;
//   - drops any queued prompts for the session.
//
// With no active request, no accepted run and no queued prompt, none of the
// session's request state is modified.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests, which are registered under a
	// "-summarize"-suffixed key.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches runs still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	//
	// Raise the session's cancel mark to the latest accept sequence
	// assigned so far. Every prompt currently accepted-but-not-yet-
	// active has a sequence at or below that value, so one cancel covers
	// all of them; a prompt accepted after this cancel gets a strictly
	// higher sequence and is never poisoned. Using max keeps repeated
	// cancels idempotent while the same prompts are in flight and lets a
	// later cancel extend coverage to prompts accepted since.
	//
	// The accepted-run count and the sequence generator are read together
	// under acceptedMu so they form one consistent snapshot.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	mark := a.acceptSeqGen
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
		// NOTE(review): this read-modify-write of cancelMark is guarded by the
		// per-session mu held above; assumes every other writer of this
		// session's cancelMark also holds mu — verify.
		existing, _ := a.cancelMark.Get(sessionID)
		a.cancelMark.Set(sessionID, max(existing, mark))
	}

	// Drop prompts queued behind the cancelled run.
	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.messageQueue.Del(sessionID)
	}
}
1776
1777func (a *sessionAgent) ClearQueue(sessionID string) {
1778	if a.QueuedPrompts(sessionID) > 0 {
1779		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1780		a.messageQueue.Del(sessionID)
1781	}
1782}
1783
1784func (a *sessionAgent) CancelAll() {
1785	if !a.IsBusy() {
1786		return
1787	}
1788	for key := range a.activeRequests.Seq2() {
1789		a.Cancel(key) // key is sessionID
1790	}
1791
1792	timeout := time.After(5 * time.Second)
1793	for a.IsBusy() {
1794		select {
1795		case <-timeout:
1796			return
1797		default:
1798			time.Sleep(200 * time.Millisecond)
1799		}
1800	}
1801}
1802
1803func (a *sessionAgent) IsBusy() bool {
1804	var busy bool
1805	for cancelFunc := range a.activeRequests.Seq() {
1806		if cancelFunc != nil {
1807			busy = true
1808			break
1809		}
1810	}
1811	return busy
1812}
1813
1814func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1815	_, busy := a.activeRequests.Get(sessionID)
1816	return busy
1817}
1818
1819func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1820	l, ok := a.messageQueue.Get(sessionID)
1821	if !ok {
1822		return 0
1823	}
1824	return len(l)
1825}
1826
1827func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1828	l, ok := a.messageQueue.Get(sessionID)
1829	if !ok {
1830		return nil
1831	}
1832	prompts := make([]string, len(l))
1833	for i, call := range l {
1834		prompts[i] = call.Prompt
1835	}
1836	return prompts
1837}
1838
1839func (a *sessionAgent) SetModels(large Model, small Model) {
1840	a.largeModel.Set(large)
1841	a.smallModel.Set(small)
1842}
1843
1844func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1845	a.tools.SetSlice(tools)
1846}
1847
1848func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1849	a.systemPrompt.Set(systemPrompt)
1850}
1851
1852func (a *sessionAgent) Model() Model {
1853	return a.largeModel.Get()
1854}
1855
1856// convertToToolResult converts a fantasy tool result to a message tool result.
1857func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1858	baseResult := message.ToolResult{
1859		ToolCallID: result.ToolCallID,
1860		Name:       result.ToolName,
1861		Metadata:   result.ClientMetadata,
1862	}
1863
1864	switch result.Result.GetType() {
1865	case fantasy.ToolResultContentTypeText:
1866		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1867			baseResult.Content = r.Text
1868		}
1869	case fantasy.ToolResultContentTypeError:
1870		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1871			baseResult.Content = r.Error.Error()
1872			baseResult.IsError = true
1873		}
1874	case fantasy.ToolResultContentTypeMedia:
1875		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1876			if !stringext.IsValidBase64(r.Data) {
1877				slog.Warn(
1878					"Tool returned media with invalid base64 data, discarding image",
1879					"tool", result.ToolName,
1880					"tool_call_id", result.ToolCallID,
1881				)
1882				baseResult.Content = "Tool returned image data with invalid encoding"
1883				baseResult.IsError = true
1884			} else {
1885				content := r.Text
1886				if content == "" {
1887					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1888				}
1889				baseResult.Content = content
1890				baseResult.Data = r.Data
1891				baseResult.MIMEType = r.MediaType
1892			}
1893		}
1894	}
1895
1896	return baseResult
1897}
1898
1899// workaroundProviderMediaLimitations converts media content in tool results to
1900// user messages for providers that don't natively support images in tool results.
1901//
1902// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1903// don't support sending images/media in tool result messages - they only accept
1904// text in tool results. However, they DO support images in user messages.
1905//
1906// If we send media in tool results to these providers, the API returns an error.
1907//
1908// Solution: For these providers, we:
1909//  1. Replace the media in the tool result with a text placeholder
1910//  2. Inject a user message immediately after with the image as a file attachment
1911//  3. This maintains the tool execution flow while working around API limitations
1912//
1913// Anthropic and Bedrock support images natively in tool results, so we skip
1914// this workaround for them.
1915//
1916// Example transformation:
1917//
1918//	BEFORE: [tool result: image data]
1919//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1920func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1921	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1922		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1923		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1924
1925	if providerSupportsMedia {
1926		return messages
1927	}
1928
1929	convertedMessages := make([]fantasy.Message, 0, len(messages))
1930
1931	for _, msg := range messages {
1932		if msg.Role != fantasy.MessageRoleTool {
1933			convertedMessages = append(convertedMessages, msg)
1934			continue
1935		}
1936
1937		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1938		var mediaFiles []fantasy.FilePart
1939
1940		for _, part := range msg.Content {
1941			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1942			if !ok {
1943				textParts = append(textParts, part)
1944				continue
1945			}
1946
1947			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1948				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1949				if err != nil {
1950					slog.Warn("Failed to decode media data", "error", err)
1951					textParts = append(textParts, part)
1952					continue
1953				}
1954
1955				mediaFiles = append(mediaFiles, fantasy.FilePart{
1956					Data:      decoded,
1957					MediaType: media.MediaType,
1958					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1959				})
1960
1961				textParts = append(textParts, fantasy.ToolResultPart{
1962					ToolCallID: toolResult.ToolCallID,
1963					Output: fantasy.ToolResultOutputContentText{
1964						Text: "[Image/media content loaded - see attached file]",
1965					},
1966					ProviderOptions: toolResult.ProviderOptions,
1967				})
1968			} else {
1969				textParts = append(textParts, part)
1970			}
1971		}
1972
1973		convertedMessages = append(convertedMessages, fantasy.Message{
1974			Role:    fantasy.MessageRoleTool,
1975			Content: textParts,
1976		})
1977
1978		if len(mediaFiles) > 0 {
1979			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1980				"Here is the media content from the tool result:",
1981				mediaFiles...,
1982			))
1983		}
1984	}
1985
1986	return convertedMessages
1987}
1988
1989// buildSummaryPrompt constructs the prompt text for session summarization.
1990func buildSummaryPrompt(todos []session.Todo) string {
1991	var sb strings.Builder
1992	sb.WriteString("Provide a detailed summary of our conversation above.")
1993	if len(todos) > 0 {
1994		sb.WriteString("\n\n## Current Todo List\n\n")
1995		for _, t := range todos {
1996			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1997		}
1998		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1999		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
2000	}
2001	return sb.String()
2002}
2003
2004func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
2005	fields := []any{
2006		"retry_delay", delay.String(),
2007	}
2008	if err == nil {
2009		return fields
2010	}
2011	fields = append(fields, "status_code", err.StatusCode)
2012	if err.Title != "" {
2013		fields = append(fields, "title", err.Title)
2014	}
2015	if err.Message != "" {
2016		fields = append(fields, "message", err.Message)
2017	}
2018	return fields
2019}