agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"sync/atomic"
  25	"time"
  26
  27	"charm.land/catwalk/pkg/catwalk"
  28	"charm.land/fantasy"
  29	"charm.land/fantasy/providers/anthropic"
  30	"charm.land/fantasy/providers/bedrock"
  31	"charm.land/fantasy/providers/google"
  32	"charm.land/fantasy/providers/openai"
  33	"charm.land/fantasy/providers/openrouter"
  34	"charm.land/fantasy/providers/vercel"
  35	"charm.land/lipgloss/v2"
  36	"github.com/charmbracelet/crush/internal/agent/hyper"
  37	"github.com/charmbracelet/crush/internal/agent/notify"
  38	"github.com/charmbracelet/crush/internal/agent/tools"
  39	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  40	"github.com/charmbracelet/crush/internal/config"
  41	"github.com/charmbracelet/crush/internal/csync"
  42	"github.com/charmbracelet/crush/internal/message"
  43	"github.com/charmbracelet/crush/internal/pubsub"
  44	"github.com/charmbracelet/crush/internal/session"
  45	"github.com/charmbracelet/crush/internal/stringext"
  46	"github.com/charmbracelet/crush/internal/version"
  47	"github.com/charmbracelet/x/exp/charmtone"
  48)
  49
const (
	// DefaultSessionName is the literal "Untitled Session".
	// NOTE(review): presumably the name a session carries until a title
	// is generated — the use site is not in this part of the file.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	// NOTE(review): the code that applies them is not visible here. The
	// names suggest that context windows above
	// largeContextWindowThreshold reserve the fixed
	// largeContextWindowBuffer, while smaller windows reserve
	// smallContextWindowRatio of the window — confirm at the use site.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  58
// userAgent is the User-Agent string for this build: the product name,
// the Crush version, and the project URL. Run hands it to the fantasy
// agent through fantasy.WithUserAgent.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds the contents of templates/title.md, embedded at
// build time.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at
// build time.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles.
//
// thinkTagRegex matches a complete <think>...</think> pair, non-greedily
// and across newlines (the (?s) flag); orphanThinkTagRegex matches a lone
// opening or closing tag.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  72
// SessionAgentCall describes one prompt submitted to SessionAgent.Run.
//
// ValidateCall requires a non-empty SessionID and either a non-empty
// Prompt or a text attachment in Attachments. ProviderOptions,
// Temperature, TopP, TopK, FrequencyPenalty and PresencePenalty are
// forwarded unchanged to the underlying stream call; MaxOutputTokens is
// forwarded only when it is greater than zero.
//
// When a call is queued behind a busy session, enqueueCall stores a copy
// with OnComplete and Accepted cleared and, if Accepted was set, with
// acceptSeq set to the handle's sequence.
type SessionAgentCall struct {
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID            string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
	// acceptSeq carries the accept sequence of the handle that produced
	// this call after it has been enqueued and its Accepted handle
	// stripped. The queue-drain paths compare it against a session's
	// cancel mark so a follow-up queued before a cancel is dropped while
	// one queued after the cancel survives. 0 means untracked (an
	// in-process enqueue with no accept reservation), which the drain
	// paths treat as covered by any present mark, preserving the
	// pre-sequence behavior.
	acceptSeq uint64
}
 125
// SessionAgent is the session-scoped agent API. NewSessionAgent returns
// the implementation defined in this package.
//
// NOTE(review): only Run and BeginAccepted have their implementations in
// the visible part of the file; consult the remaining method
// implementations for their exact semantics.
type SessionAgent interface {
	// Run validates the call (see ValidateCall) and executes it as a
	// turn for its session. When the session is already busy the call
	// is queued instead and Run returns (nil, nil) without running it.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// BeginAccepted takes an accept reservation for sessionID and
	// returns the handle whose Close releases it.
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
 142
// Model bundles a language model with the configuration that describes
// it. Run passes Model to fantasy.NewAgent, reads SupportsImages and Name
// from CatwalkCfg, and stamps ModelCfg.Model and ModelCfg.Provider onto
// the assistant messages it creates.
// NOTE(review): FlatRate is not read in the visible part of the file —
// presumably it marks flat-rate (non-metered) billing; confirm at the use
// site.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 149
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Settings held in csync wrappers. Run takes a snapshot of them at
	// the start of a turn (tools.Copy, largeModel.Get, ...) to avoid
	// races with SetTools/SetModels.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Dependencies and flags populated from SessionAgentOptions by
	// NewSessionAgent. runComplete may be nil, in which case
	// publishRunComplete publishes nothing on the broker.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	// messageQueue holds, per session ID, the calls queued by
	// enqueueCall while the session was busy. activeRequests maps a
	// session ID to the cancel func of the run Run registered as active
	// for it; Run removes the entry when it returns.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// cancelMark records, per session, a high-water accept sequence: an
	// accepted handle is canceled by it iff the handle's sequence is at
	// or below the mark. Cancel raises the mark to the latest sequence
	// assigned at cancel time, so a single Cancel covers every prompt
	// accepted-but-not-yet-active then, while a prompt accepted later
	// (higher sequence) is never poisoned. Absent or 0 means no pending
	// cancel. It is only raised by Cancel when acceptedRuns > 0, so an
	// idle Escape never records a mark.
	cancelMark *csync.Map[string, uint64]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns and
	// the assignment of accept sequence numbers from acceptSeqGen. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
	// acceptSeqGen is the monotonic source of accept sequence numbers.
	// Each BeginAccepted increments it under acceptedMu and stamps the
	// returned handle, so sequences strictly increase in accept order
	// across the agent. Cancel uses its current value as the per-session
	// high-water mark.
	acceptSeqGen uint64
}
 205
// SessionAgentOptions carries the initial settings and dependencies that
// NewSessionAgent copies into a new agent. RunComplete may be left nil;
// the agent then skips the broker publish of terminal events.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 220
 221func NewSessionAgent(
 222	opts SessionAgentOptions,
 223) SessionAgent {
 224	return &sessionAgent{
 225		largeModel:           csync.NewValue(opts.LargeModel),
 226		smallModel:           csync.NewValue(opts.SmallModel),
 227		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 228		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 229		isSubAgent:           opts.IsSubAgent,
 230		sessions:             opts.Sessions,
 231		messages:             opts.Messages,
 232		disableAutoSummarize: opts.DisableAutoSummarize,
 233		tools:                csync.NewSliceFrom(opts.Tools),
 234		isYolo:               opts.IsYolo,
 235		notify:               opts.Notify,
 236		runComplete:          opts.RunComplete,
 237		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 238		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 239		dispatchMu:           csync.NewMap[string, *sync.Mutex](),
 240		acceptedRuns:         csync.NewMap[string, int](),
 241		cancelMark:           csync.NewMap[string, uint64](),
 242	}
 243}
 244
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// per-session accept counter (sessionAgent.acceptedRuns) > 0 means a
// dispatched prompt is in flight and has not yet completed the dispatch
// handoff in Run. Close is the only way to release the reservation and is
// idempotent. Close and SessionID tolerate a nil receiver.
type AcceptedRun struct {
	// agent is the sessionAgent whose accept counter this handle holds a
	// slot in; sessionID names the session the slot belongs to.
	agent     *sessionAgent
	sessionID string
	// seq is the monotonic accept sequence stamped by BeginAccepted. A
	// cancel covers this handle iff seq is at or below the session's
	// cancel mark, so a handle accepted after a cancel (higher seq) is
	// never poisoned by it.
	//
	// done records whether Close has already released the reservation;
	// Close flips it with a compare-and-swap so only the first call has
	// effect.
	seq  uint64
	done atomic.Bool
}
 261
 262// Close decrements the accept counter for this reservation. It is safe
 263// to call multiple times; only the first call has effect.
 264func (r *AcceptedRun) Close() {
 265	if r == nil {
 266		return
 267	}
 268	if !r.done.CompareAndSwap(false, true) {
 269		return
 270	}
 271	r.agent.endAccepted(r.sessionID)
 272}
 273
 274// SessionID exposes the session this reservation is for so the run path
 275// can use it without an extra parameter.
 276func (r *AcceptedRun) SessionID() string {
 277	if r == nil {
 278		return ""
 279	}
 280	return r.sessionID
 281}
 282
 283// BeginAccepted increments the accept counter for sessionID and returns
 284// a handle whose Close is the only way to decrement it. It is the only
 285// entry point that mutates acceptedRuns.
 286func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
 287	a.acceptedMu.Lock()
 288	defer a.acceptedMu.Unlock()
 289	count, _ := a.acceptedRuns.Get(sessionID)
 290	a.acceptedRuns.Set(sessionID, count+1)
 291	a.acceptSeqGen++
 292	return &AcceptedRun{agent: a, sessionID: sessionID, seq: a.acceptSeqGen}
 293}
 294
 295// endAccepted decrements the accept counter for sessionID. It is only
 296// called via AcceptedRun.Close. It uses a dedicated lock (not the
 297// per-session dispatch mutex) so it can run while Run holds dispatchMu
 298// for the same session without deadlocking.
 299//
 300// When the count reaches zero the session's cancel mark is dropped: no
 301// accepted handle remains for it to cover, and any handle accepted later
 302// gets a strictly higher sequence that the mark would not match anyway.
 303// Handles canceled on entry never reach RunComplete, so this is the only
 304// place that clears the mark for an all-canceled batch. Sibling handles
 305// covered by the same mark are serialized on the per-session dispatch
 306// mutex and read the mark before they Close, so this never clears it out
 307// from under a covered handle still waiting to enter Run.
 308func (a *sessionAgent) endAccepted(sessionID string) {
 309	a.acceptedMu.Lock()
 310	defer a.acceptedMu.Unlock()
 311	count, ok := a.acceptedRuns.Get(sessionID)
 312	if !ok || count <= 1 {
 313		a.acceptedRuns.Del(sessionID)
 314		a.cancelMark.Del(sessionID)
 315		return
 316	}
 317	a.acceptedRuns.Set(sessionID, count-1)
 318}
 319
 320// sessionMu returns the per-session dispatch mutex, creating it on first
 321// use. Creation is guarded so concurrent callers always observe the same
 322// mutex instance for a given session.
 323func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
 324	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 325		return mu
 326	}
 327	a.dispatchMuCreate.Lock()
 328	defer a.dispatchMuCreate.Unlock()
 329	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 330		return mu
 331	}
 332	mu := &sync.Mutex{}
 333	a.dispatchMu.Set(sessionID, mu)
 334	return mu
 335}
 336
 337// enqueueCall appends call to the session's message queue. The
 338// OnComplete hook is stripped: the caller that supplied it (typically
 339// coordinator.Run) has its own retry/coalesce scope that ends when it
 340// returns, so by the time the queue drains nobody is left to consume the
 341// buffered terminal event. The recursive Run falls back to the default
 342// broker publish, which is what existing subscribers expect for queued
 343// turns.
 344func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
 345	existing, ok := a.messageQueue.Get(call.SessionID)
 346	if !ok {
 347		existing = []SessionAgentCall{}
 348	}
 349	queued := call
 350	if call.Accepted != nil {
 351		// Preserve the accept sequence after the handle is stripped so
 352		// the queue-drain paths can tell a follow-up queued before a
 353		// cancel (covered by the mark) from one queued after it.
 354		queued.acceptSeq = call.Accepted.seq
 355	}
 356	queued.OnComplete = nil
 357	queued.Accepted = nil
 358	existing = append(existing, queued)
 359	a.messageQueue.Set(call.SessionID, existing)
 360}
 361
 362// clearPendingCancel removes any pending-cancel mark for sessionID. It
 363// takes the per-session dispatch lock so it is ordered against Cancel
 364// and the dispatch handoff.
 365func (a *sessionAgent) clearPendingCancel(sessionID string) {
 366	mu := a.sessionMu(sessionID)
 367	mu.Lock()
 368	defer mu.Unlock()
 369	a.cancelMark.Del(sessionID)
 370}
 371
 372// canceledBySeq reports whether an accepted handle or queued call with
 373// the given accept sequence is covered by a pending cancel for the
 374// session. Callers must hold the session's dispatch mutex. A tracked
 375// sequence (seq > 0) is covered only when it is at or below the cancel
 376// high-water mark, so a prompt accepted after the cancel (higher seq) is
 377// never poisoned. An untracked sequence (seq == 0, an in-process enqueue
 378// with no accept reservation) is covered whenever any mark is present,
 379// preserving the pre-sequence behavior. The mark is not consumed: it
 380// stays so every sibling handle it covers observes the same cancel, and
 381// a later handle (higher seq) ignores it regardless.
 382func (a *sessionAgent) canceledBySeq(sessionID string, seq uint64) bool {
 383	mark, ok := a.cancelMark.Get(sessionID)
 384	if !ok || mark == 0 {
 385		return false
 386	}
 387	return seq == 0 || seq <= mark
 388}
 389
 390// persistCanceledTurn writes the user/assistant records for a turn that
 391// was canceled before (or just as) streaming would have produced them.
 392// It creates the user message only when it was not already created by an
 393// earlier createUserMessage call (userMsgCreated), then writes an
 394// assistant message with FinishReasonCanceled. Both writes use
 395// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
 396// context) can't drop them.
 397func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
 398	writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 399	defer cancel()
 400	if !userMsgCreated {
 401		if _, err := a.createUserMessage(writeCtx, call); err != nil {
 402			return err
 403		}
 404	}
 405	largeModel := a.largeModel.Get()
 406	assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
 407		Role:     message.Assistant,
 408		Parts:    []message.ContentPart{},
 409		Model:    largeModel.ModelCfg.Model,
 410		Provider: largeModel.ModelCfg.Provider,
 411	})
 412	if err != nil {
 413		return err
 414	}
 415	assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 416	return a.messages.Update(writeCtx, assistant)
 417}
 418
 419// publishRunComplete emits the authoritative terminal event for a turn.
 420// It honors the per-call OnComplete hook when set (so the coordinator can
 421// coalesce retries) and otherwise falls back to the RunComplete broker.
 422// ctx is used only for the bounded-blocking must-deliver publish; the
 423// terminal payload is supplied by the caller. This is the single emit path
 424// shared by the streaming defer and the cancel-on-entry early return so a
 425// caller waiting on RunComplete (e.g. `crush run` with a RunID) always
 426// observes exactly one terminal event regardless of which Run branch ends
 427// the turn.
 428func (a *sessionAgent) publishRunComplete(ctx context.Context, call SessionAgentCall, complete notify.RunComplete) {
 429	if call.OnComplete != nil {
 430		call.OnComplete(complete)
 431		return
 432	}
 433	if a.runComplete == nil {
 434		return
 435	}
 436	a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 437}
 438
 439// ValidateCall performs the cheap structural validation that
 440// sessionAgent.Run requires before a call can be dispatched: a call must
 441// carry either a non-empty prompt or a text attachment, and it must name a
 442// session. It is exported so callers that accept a run before dispatching it
 443// (e.g. backend.SendMessage) can apply the same checks and keep the error
 444// contract consistent.
 445func ValidateCall(call SessionAgentCall) error {
 446	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 447		return ErrEmptyPrompt
 448	}
 449	if call.SessionID == "" {
 450		return ErrSessionMissing
 451	}
 452	return nil
 453}
 454
 455func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 456	if err := ValidateCall(call); err != nil {
 457		return nil, err
 458	}
 459
 460	// genCtx/cancel are the run context and its cancel func. For the
 461	// accepted (fire-and-forget) dispatch path they are created under
 462	// dispatchMu below so a concurrent Cancel can observe the
 463	// activeRequests entry before the assistant message exists. For
 464	// the in-process path they stay nil here and are created later,
 465	// preserving the original ordering.
 466	var (
 467		genCtx           context.Context
 468		cancel           context.CancelFunc
 469		activeRegistered bool
 470		userMsgCreated   bool
 471	)
 472
 473	if call.Accepted != nil {
 474		// Serialize the accepted -> (cancel-on-entry | queued |
 475		// active) transition against a concurrent Cancel. Cancel takes
 476		// the same per-session lock, so every cancel observes at least
 477		// one of: a cancel mark, an activeRequests entry, or a
 478		// messageQueue entry it then clears.
 479		mu := a.sessionMu(call.SessionID)
 480		mu.Lock()
 481
 482		if a.canceledBySeq(call.SessionID, call.Accepted.seq) {
 483			// Cancel-on-entry: a cancel arrived while this run was
 484			// dispatched but not yet active, and this handle's accept
 485			// sequence is at or below the session's cancel mark. The
 486			// mark is left in place so sibling handles it also covers
 487			// observe the same cancel; release the accept reservation,
 488			// drop the lock, and persist a canceled turn without
 489			// entering Stream.
 490			//
 491			// This path returns before the streaming defer that
 492			// publishes RunComplete is installed, so emit the terminal
 493			// event explicitly. Without it, a caller waiting on
 494			// RunComplete for this RunID (e.g. `crush run`, which
 495			// ignores message events and blocks on RunComplete) would
 496			// hang on an immediately-canceled accepted run.
 497			call.Accepted.Close()
 498			mu.Unlock()
 499			complete := notify.RunComplete{
 500				SessionID: call.SessionID,
 501				RunID:     call.RunID,
 502				Cancelled: true,
 503			}
 504			if err := a.persistCanceledTurn(ctx, call, false); err != nil {
 505				complete.Error = err.Error()
 506				a.publishRunComplete(ctx, call, complete)
 507				return nil, err
 508			}
 509			a.publishRunComplete(ctx, call, complete)
 510			return nil, nil
 511		}
 512
 513		if a.IsSessionBusy(call.SessionID) {
 514			// Busy: an earlier prompt is active. Queue this call and
 515			// release the accept reservation. A Cancel arriving after
 516			// this point sees the active entry and clears the queue.
 517			a.enqueueCall(call)
 518			call.Accepted.Close()
 519			mu.Unlock()
 520			return nil, nil
 521		}
 522
 523		// Idle: become the active run. Register the cancel func before
 524		// dropping the lock so a Cancel that arrives between here and
 525		// assistant creation is not lost.
 526		runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 527		genCtx, cancel = context.WithCancel(runCtx)
 528		a.activeRequests.Set(call.SessionID, cancel)
 529		activeRegistered = true
 530		call.Accepted.Close()
 531		mu.Unlock()
 532
 533		defer cancel()
 534		defer a.activeRequests.Del(call.SessionID)
 535	} else if a.IsSessionBusy(call.SessionID) {
 536		// Queue the message if busy. Strip OnComplete: the caller that
 537		// supplied the hook (typically coordinator.Run) has its own
 538		// retry/coalesce scope that ends when it returns, so by the time
 539		// the queue drains nobody is left to consume the buffered
 540		// terminal event. The recursive Run will fall back to the
 541		// default broker publish, which is what existing subscribers
 542		// expect for queued turns.
 543		a.enqueueCall(call)
 544		return nil, nil
 545	}
 546
 547	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 548	agentTools := a.tools.Copy()
 549	largeModel := a.largeModel.Get()
 550	systemPrompt := a.systemPrompt.Get()
 551	promptPrefix := a.systemPromptPrefix.Get()
 552	var instructions strings.Builder
 553
 554	for _, server := range mcp.GetStates() {
 555		if server.State != mcp.StateConnected {
 556			continue
 557		}
 558		if s := server.Client.InitializeResult().Instructions; s != "" {
 559			instructions.WriteString(s)
 560			instructions.WriteString("\n\n")
 561		}
 562	}
 563
 564	if s := instructions.String(); s != "" {
 565		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 566	}
 567
 568	if len(agentTools) > 0 {
 569		// Add Anthropic caching to the last tool.
 570		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 571	}
 572
 573	agent := fantasy.NewAgent(
 574		largeModel.Model,
 575		fantasy.WithSystemPrompt(systemPrompt),
 576		fantasy.WithTools(agentTools...),
 577		fantasy.WithUserAgent(userAgent),
 578	)
 579
 580	sessionLock := sync.Mutex{}
 581	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 582	if err != nil {
 583		return nil, fmt.Errorf("failed to get session: %w", err)
 584	}
 585
 586	msgs, err := a.getSessionMessages(ctx, currentSession)
 587	if err != nil {
 588		return nil, fmt.Errorf("failed to get session messages: %w", err)
 589	}
 590
 591	var wg sync.WaitGroup
 592	// Generate title if first message.
 593	if len(msgs) == 0 {
 594		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 595		wg.Go(func() {
 596			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 597		})
 598	}
 599	defer wg.Wait()
 600
 601	// Add the user message to the session.
 602	_, err = a.createUserMessage(ctx, call)
 603	if err != nil {
 604		return nil, err
 605	}
 606	userMsgCreated = true
 607
 608	// Add the session to the context.
 609	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 610
 611	// For the accepted dispatch path the run context and cancel func
 612	// were already created and registered under dispatchMu above; reuse
 613	// them. For the in-process path create them here, preserving the
 614	// original ordering.
 615	if !activeRegistered {
 616		genCtx, cancel = context.WithCancel(ctx)
 617		a.activeRequests.Set(call.SessionID, cancel)
 618
 619		defer cancel()
 620		defer a.activeRequests.Del(call.SessionID)
 621	}
 622	// skipRunComplete is set just before the queued-recursion path so
 623	// the outer Run doesn't publish a RunComplete that would race
 624	// with — and be superseded by — the recursive call's own
 625	// RunComplete (each queued user prompt is its own turn and
 626	// publishes exactly one terminal event).
 627	var skipRunComplete bool
 628	// currentAssistant is declared here so the deferred RunComplete
 629	// publish below can capture the pointer that PrepareStep will
 630	// later (re)assign for each streaming step. The final assistant
 631	// message of the turn is the value reachable through this
 632	// pointer when the defer runs.
 633	var currentAssistant *message.Message
 634	// Drain any debounced message updates before returning. message.Service
 635	// already flushes synchronously on terminal updates, but a defer here
 636	// guarantees the contract at every Run exit (success, error, panic
 637	// recovery upstream) without callers needing to know.
 638	//
 639	// After the flush completes — meaning all per-message
 640	// Publish(UpdatedEvent) calls have fired and been buffered into
 641	// every subscriber's channel — publish the authoritative
 642	// RunComplete event for this turn. The flush-then-publish order
 643	// gives well-behaved clients the best chance of seeing the final
 644	// message event before RunComplete; the embedded Text field
 645	// reconciles for clients that observe the events out of order
 646	// (the pubsub broker fan-in does not serialize publishes from
 647	// different upstream brokers).
 648	defer func() {
 649		// Use a context detached from the run context: workspace
 650		// shutdown cancels ctx before this goroutine returns, but the
 651		// buffered streaming deltas must still land before the DB is
 652		// closed. A short timeout bounds the flush.
 653		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 654		defer flushCancel()
 655		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
 656			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 657		}
 658		if skipRunComplete {
 659			return
 660		}
 661		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 662		if currentAssistant != nil {
 663			complete.MessageID = currentAssistant.ID
 664			complete.Text = currentAssistant.Content().String()
 665		}
 666		if retErr != nil {
 667			complete.Error = retErr.Error()
 668			complete.Cancelled = errors.Is(retErr, context.Canceled)
 669		} else if ctx.Err() != nil {
 670			complete.Cancelled = true
 671		}
 672		// Prefer the per-call hook when supplied so the coordinator
 673		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 674		// into a single user-visible terminal event. The fallback
 675		// must-deliver publish applies bounded-blocking semantics to
 676		// the authoritative terminal event so a momentarily-full
 677		// subscriber channel can't silently drop it and hang
 678		// non-interactive clients waiting on RunComplete.
 679		a.publishRunComplete(ctx, call, complete)
 680	}()
 681
 682	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 683
 684	startTime := time.Now()
 685	a.eventPromptSent(call.SessionID)
 686
 687	var stepMessages []fantasy.Message
 688	var shouldSummarize bool
 689	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 690	var maxOutputTokens *int64
 691	if call.MaxOutputTokens > 0 {
 692		maxOutputTokens = &call.MaxOutputTokens
 693	}
 694	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 695		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 696		Files:            files,
 697		Messages:         history,
 698		ProviderOptions:  call.ProviderOptions,
 699		MaxOutputTokens:  maxOutputTokens,
 700		TopP:             call.TopP,
 701		Temperature:      call.Temperature,
 702		PresencePenalty:  call.PresencePenalty,
 703		TopK:             call.TopK,
 704		FrequencyPenalty: call.FrequencyPenalty,
 705		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 706			prepared.Messages = options.Messages
 707			for i := range prepared.Messages {
 708				prepared.Messages[i].ProviderOptions = nil
 709			}
 710
 711			// Use latest tools (updated by SetTools when MCP tools change).
 712			prepared.Tools = a.tools.Copy()
 713
 714			// Drain queued follow-up prompts, but skip any covered by a
 715			// cancel recorded while they sat in the queue: a cancel that
 716			// arrived after a prompt was queued must not let it run as
 717			// part of this step. Coverage is per-call by accept sequence
 718			// so a follow-up queued after the cancel (higher seq) is
 719			// still folded in.
 720			dispatchLock := a.sessionMu(call.SessionID)
 721			dispatchLock.Lock()
 722			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
 723			a.messageQueue.Del(call.SessionID)
 724			dispatchLock.Unlock()
 725			for _, queued := range queuedCalls {
 726				if a.canceledBySeq(call.SessionID, queued.acceptSeq) {
 727					continue
 728				}
 729				userMessage, createErr := a.createUserMessage(callContext, queued)
 730				if createErr != nil {
 731					return callContext, prepared, createErr
 732				}
 733				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 734			}
 735
 736			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 737
 738			lastSystemRoleInx := 0
 739			systemMessageUpdated := false
 740			for i, msg := range prepared.Messages {
 741				// Only add cache control to the last message.
 742				if msg.Role == fantasy.MessageRoleSystem {
 743					lastSystemRoleInx = i
 744				} else if !systemMessageUpdated {
 745					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 746					systemMessageUpdated = true
 747				}
 748				// Than add cache control to the last 2 messages.
 749				if i > len(prepared.Messages)-3 {
 750					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 751				}
 752			}
 753
 754			if promptPrefix != "" {
 755				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 756			}
 757
 758			sessionLock.Lock()
 759			stepMessages = cloneFantasyMessages(prepared.Messages)
 760			sessionLock.Unlock()
 761
 762			var assistantMsg message.Message
 763			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 764				Role:     message.Assistant,
 765				Parts:    []message.ContentPart{},
 766				Model:    largeModel.ModelCfg.Model,
 767				Provider: largeModel.ModelCfg.Provider,
 768			})
 769			if err != nil {
 770				return callContext, prepared, err
 771			}
 772			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 773			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 774			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 775			currentAssistant = &assistantMsg
 776			return callContext, prepared, err
 777		},
 778		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 779			currentAssistant.AppendReasoningContent(reasoning.Text)
 780			return a.messages.Update(genCtx, *currentAssistant)
 781		},
 782		OnReasoningDelta: func(id string, text string) error {
 783			currentAssistant.AppendReasoningContent(text)
 784			return a.messages.Update(genCtx, *currentAssistant)
 785		},
 786		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 787			// handle anthropic signature
 788			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 789				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 790					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 791				}
 792			}
 793			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 794				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 795					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 796				}
 797			}
 798			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 799				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 800					currentAssistant.SetReasoningResponsesData(reasoning)
 801				}
 802			}
 803			currentAssistant.FinishThinking()
 804			return a.messages.Update(genCtx, *currentAssistant)
 805		},
 806		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
 808			// particularly important in non-interactive mode where leading
 809			// newlines are very visible.
 810			if len(currentAssistant.Parts) == 0 {
 811				text = strings.TrimPrefix(text, "\n")
 812			}
 813
 814			currentAssistant.AppendContent(text)
 815			return a.messages.Update(genCtx, *currentAssistant)
 816		},
 817		OnToolInputStart: func(id string, toolName string) error {
 818			toolCall := message.ToolCall{
 819				ID:               id,
 820				Name:             toolName,
 821				ProviderExecuted: false,
 822				Finished:         false,
 823			}
 824			currentAssistant.AddToolCall(toolCall)
 825			// Use parent ctx instead of genCtx to ensure the update succeeds
 826			// even if the request is canceled mid-stream
 827			return a.messages.Update(ctx, *currentAssistant)
 828		},
 829		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 830			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 831		},
 832		OnToolCall: func(tc fantasy.ToolCallContent) error {
 833			toolCall := message.ToolCall{
 834				ID:               tc.ToolCallID,
 835				Name:             tc.ToolName,
 836				Input:            tc.Input,
 837				ProviderExecuted: false,
 838				Finished:         true,
 839			}
 840			currentAssistant.AddToolCall(toolCall)
 841			// Use parent ctx instead of genCtx to ensure the update succeeds
 842			// even if the request is canceled mid-stream
 843			return a.messages.Update(ctx, *currentAssistant)
 844		},
 845		OnToolResult: func(result fantasy.ToolResultContent) error {
 846			toolResult := a.convertToToolResult(result)
 847			// Use parent ctx instead of genCtx to ensure the message is created
 848			// even if the request is canceled mid-stream
 849			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 850				Role: message.Tool,
 851				Parts: []message.ContentPart{
 852					toolResult,
 853				},
 854			})
 855			return createMsgErr
 856		},
 857		OnStepFinish: func(stepResult fantasy.StepResult) error {
 858			finishReason := message.FinishReasonUnknown
 859			switch stepResult.FinishReason {
 860			case fantasy.FinishReasonLength:
 861				finishReason = message.FinishReasonMaxTokens
 862			case fantasy.FinishReasonStop:
 863				finishReason = message.FinishReasonEndTurn
 864			case fantasy.FinishReasonToolCalls:
 865				finishReason = message.FinishReasonToolUse
 866			}
 867			// If a tool result halted the turn (e.g. a hook halt or a
 868			// permission denial), the step ends on FinishReasonToolCalls but
 869			// the model will not be called again. Treat it as the end of the
 870			// turn so the UI can render the assistant footer.
 871			if finishReason == message.FinishReasonToolUse {
 872				for _, tr := range stepResult.Content.ToolResults() {
 873					if tr.StopTurn {
 874						finishReason = message.FinishReasonEndTurn
 875						break
 876					}
 877				}
 878			}
 879			currentAssistant.AddFinish(finishReason, "", "")
 880			sessionLock.Lock()
 881			defer sessionLock.Unlock()
 882
 883			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 884			if getSessionErr != nil {
 885				return getSessionErr
 886			}
 887			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 888			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 889			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 890			if sessionErr != nil {
 891				return sessionErr
 892			}
 893			currentSession = updatedSession
 894			return a.messages.Update(genCtx, *currentAssistant)
 895		},
 896		StopWhen: []fantasy.StopCondition{
 897			func(_ []fantasy.StepResult) bool {
 898				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 899				// If context window is unknown (0), skip auto-summarize
 900				// to avoid immediately truncating custom/local models.
 901				if cw == 0 {
 902					return false
 903				}
 904				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 905				remaining := cw - tokens
 906				var threshold int64
 907				if cw > largeContextWindowThreshold {
 908					threshold = largeContextWindowBuffer
 909				} else {
 910					threshold = int64(float64(cw) * smallContextWindowRatio)
 911				}
 912				if (remaining <= threshold) && !a.disableAutoSummarize {
 913					shouldSummarize = true
 914					return true
 915				}
 916				return false
 917			},
 918			func(steps []fantasy.StepResult) bool {
 919				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
 920			},
 921		},
 922	})
 923
 924	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
 925
 926	if err != nil {
 927		isHyper := largeModel.ModelCfg.Provider == hyper.Name
 928		isCancelErr := errors.Is(err, context.Canceled)
 929		if currentAssistant == nil {
 930			// Cancel-before-assistant-creation window: the run was
 931			// canceled after activeRequests.Set but before PrepareStep
 932			// created the assistant message. Without this, the turn
 933			// would return with no FinishReasonCanceled marker and no
 934			// user-visible record. The user message was already created
 935			// above, so persistCanceledTurn only writes the assistant
 936			// record.
 937			if isCancelErr {
 938				if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
 939					return nil, persistErr
 940				}
 941			}
 942			return result, err
 943		}
 944		// Persist final state with a context detached from the run
 945		// context. The run context (ctx) is derived from the
 946		// workspace context, which workspace shutdown cancels before
 947		// agent goroutines finish; using ctx here would drop the
 948		// final assistant state. WithoutCancel keeps the values
 949		// (e.g. session ID) while ignoring cancellation, and a short
 950		// timeout bounds the cleanup writes.
 951		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 952		defer cleanupCancel()
 953		// Ensure we finish thinking on error to close the reasoning state.
 954		currentAssistant.FinishThinking()
 955		toolCalls := currentAssistant.ToolCalls()
 956		// INFO: we use the cleanup context here because the genCtx has been cancelled.
 957		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
 958		if createErr != nil {
 959			return nil, createErr
 960		}
 961		for _, tc := range toolCalls {
 962			if !tc.Finished {
 963				tc.Finished = true
 964				tc.Input = "{}"
 965				currentAssistant.AddToolCall(tc)
 966				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
 967				if updateErr != nil {
 968					return nil, updateErr
 969				}
 970			}
 971
 972			found := false
 973			for _, msg := range msgs {
 974				if msg.Role == message.Tool {
 975					for _, tr := range msg.ToolResults() {
 976						if tr.ToolCallID == tc.ID {
 977							found = true
 978							break
 979						}
 980					}
 981				}
 982				if found {
 983					break
 984				}
 985			}
 986			if found {
 987				continue
 988			}
 989			content := "There was an error while executing the tool"
 990			if isCancelErr {
 991				content = "Error: user cancelled assistant tool calling"
 992			}
 993			toolResult := message.ToolResult{
 994				ToolCallID: tc.ID,
 995				Name:       tc.Name,
 996				Content:    content,
 997				IsError:    true,
 998			}
 999			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
1000				Role: message.Tool,
1001				Parts: []message.ContentPart{
1002					toolResult,
1003				},
1004			})
1005			if createErr != nil {
1006				return nil, createErr
1007			}
1008		}
1009		var fantasyErr *fantasy.Error
1010		var providerErr *fantasy.ProviderError
1011		const defaultTitle = "Provider Error"
1012		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
1013		if isCancelErr {
1014			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
1015		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
1016			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
1017			if a.notify != nil {
1018				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1019					SessionID:    call.SessionID,
1020					SessionTitle: currentSession.Title,
1021					Type:         notify.TypeReAuthenticate,
1022					ProviderID:   largeModel.ModelCfg.Provider,
1023				})
1024			}
1025		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
1026			url := hyper.BaseURL()
1027			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
1028			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
1029		} else if errors.As(err, &providerErr) {
1030			if providerErr.Message == "The requested model is not supported." {
1031				url := "https://github.com/settings/copilot/features"
1032				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
1033				currentAssistant.AddFinish(
1034					message.FinishReasonError,
1035					"Copilot model not enabled",
1036					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
1037				)
1038			} else {
1039				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
1040			}
1041		} else if errors.As(err, &fantasyErr) {
1042			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
1043		} else {
1044			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
1045		}
1046		// Note: we use the cleanup context here because the genCtx has been
1047		// cancelled.
1048		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
1049		if updateErr != nil {
1050			return nil, updateErr
1051		}
1052		return nil, err
1053	}
1054
1055	if shouldSummarize {
1056		a.activeRequests.Del(call.SessionID)
1057		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
1058			return nil, summarizeErr
1059		}
1060		// If the agent wasn't done...
1061		if len(currentAssistant.ToolCalls()) > 0 {
1062			existing, ok := a.messageQueue.Get(call.SessionID)
1063			if !ok {
1064				existing = []SessionAgentCall{}
1065			}
1066			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
1067			existing = append(existing, call)
1068			a.messageQueue.Set(call.SessionID, existing)
1069		}
1070	}
1071
1072	// Release active request before publishing the notification.
1073	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
1074	// tea.Msg arrives, so the cleanup must precede the notify or
1075	// subscribers see stale busy state at the moment of receipt.
1076	a.activeRequests.Del(call.SessionID)
1077	cancel()
1078
1079	// Send notification that agent has finished its turn (skip for
1080	// nested/non-interactive sessions).
1081	if !call.NonInteractive && a.notify != nil {
1082		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1083			SessionID:    call.SessionID,
1084			SessionTitle: currentSession.Title,
1085			Type:         notify.TypeAgentFinished,
1086		})
1087	}
1088
1089	// Hand off to the next queued prompt (if any) under dispatchMu so
1090	// the transition from this finished run to the queued run is atomic
1091	// against a concurrent Cancel. activeRequests for this session was
1092	// just deleted above, so without the lock there is a window in
1093	// which the session looks idle and a cancel becomes a no-op that
1094	// fails to stop the queued prompt. Holding the lock lets us observe
1095	// a pending cancel recorded against the session and drop the queue
1096	// instead of running it, and (for the recursion) hand a fresh
1097	// accept reservation to the dequeued call so acceptedRuns stays > 0
1098	// across the recursive Run's own dispatch handoff — keeping the
1099	// session observable to Cancel for the entire transition and
1100	// closing the dequeue -> re-register window.
1101	mu := a.sessionMu(call.SessionID)
1102	mu.Lock()
1103	queuedMessages, _ := a.messageQueue.Get(call.SessionID)
1104	if mark, ok := a.cancelMark.Get(call.SessionID); ok && mark > 0 && len(queuedMessages) > 0 {
1105		// A cancel was recorded for this session (e.g. it arrived while
1106		// this run was active and follow-ups had been queued). Drop the
1107		// queued prompts it covers (accept sequence at or below the
1108		// mark, or untracked); keep any queued after the cancel (higher
1109		// sequence) so they still run.
1110		var kept []SessionAgentCall
1111		for _, q := range queuedMessages {
1112			if q.acceptSeq == 0 || q.acceptSeq <= mark {
1113				continue
1114			}
1115			kept = append(kept, q)
1116		}
1117		queuedMessages = kept
1118		a.messageQueue.Set(call.SessionID, kept)
1119	}
1120	if len(queuedMessages) == 0 {
1121		// No queued work. Clear the cancel mark only when no accepted
1122		// run remains in flight that it might still cover; otherwise a
1123		// sibling prompt (sequence at or below the mark) waiting to
1124		// enter Run would lose its cancellation. When accepted runs are
1125		// gone, this also clears a stale mark so it can't catch a
1126		// future run.
1127		a.messageQueue.Del(call.SessionID)
1128		a.acceptedMu.Lock()
1129		inFlight, _ := a.acceptedRuns.Get(call.SessionID)
1130		a.acceptedMu.Unlock()
1131		if inFlight == 0 {
1132			a.cancelMark.Del(call.SessionID)
1133		}
1134		mu.Unlock()
1135		return result, err
1136	}
	// There are queued messages; restart the loop. The recursive Run
1138	// publishes its own RunComplete for the queued prompt, so suppress
1139	// the outer defer's emit to avoid a duplicate event whose Error
1140	// field would belong to the recursive turn but whose MessageID/Text
1141	// would belong to the outer turn.
1142	skipRunComplete = true
1143	firstQueuedMessage := queuedMessages[0]
1144	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1145	// Reserve a fresh accept for the dequeued prompt before dropping the
1146	// lock so acceptedRuns > 0 across the handoff into the recursive
1147	// Run. This closes the window between this dequeue and the recursive
1148	// Run registering its activeRequests entry: a cancel arriving in
1149	// that window now records a pending cancel (acceptedRuns > 0) that
1150	// the recursive Run's accepted path observes as cancel-on-entry.
1151	firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1152	mu.Unlock()
1153	return a.Run(ctx, firstQueuedMessage)
1154}
1155
// Summarize streams a summary of the session's current messages into a new
// assistant message flagged as the summary message, then records that
// message's ID on the session and resets the session's token counters.
//
// It returns ErrSessionBusy when the session already reports as busy, and nil
// without doing any work when the session has no messages. If the stream
// fails with context.Canceled the partially written summary message is
// deleted and the result of that delete is returned; any other stream error
// is recorded on the summary message as an error finish and then returned.
//
// After a successful summary, the first prompt waiting in the message queue
// for this session (if any) is dequeued and executed via Run, and its error
// is returned.
func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
	if a.IsSessionBusy(sessionID) {
		return ErrSessionBusy
	}

	// Copy mutable fields under lock to avoid races with SetModels.
	largeModel := a.largeModel.Get()
	systemPromptPrefix := a.systemPromptPrefix.Get()

	currentSession, err := a.sessions.Get(ctx, sessionID)
	if err != nil {
		return fmt.Errorf("failed to get session: %w", err)
	}
	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return err
	}
	if len(msgs) == 0 {
		// Nothing to summarize.
		return nil
	}

	// No attachments are passed, so the returned file parts are discarded.
	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)

	// Register the cancel func so the session is tracked as having an active
	// request while the summary streams. Deferred calls run in reverse order:
	// flush pending message updates, cancel genCtx, then drop the active
	// request entry.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(sessionID, cancel)
	defer a.activeRequests.Del(sessionID)
	defer cancel()
	defer func() {
		// The flush uses the parent ctx rather than genCtx, which has already
		// been canceled by the time this runs on the success path.
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
		}
	}()

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(string(summaryPrompt)),
		fantasy.WithUserAgent(userAgent),
	)
	// Create the (initially empty) summary message up front; the stream
	// callbacks below append to it and persist each change.
	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
		Role:             message.Assistant,
		Model:            largeModel.ModelCfg.Model,
		Provider:         largeModel.ModelCfg.Provider,
		IsSummaryMessage: true,
	})
	if err != nil {
		return err
	}

	summaryPromptText := buildSummaryPrompt(currentSession.Todos)

	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:          summaryPromptText,
		Messages:        aiMsgs,
		ProviderOptions: opts,
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			// Pass the step's messages through unchanged, prepending the
			// configured system prompt prefix when one is set.
			prepared.Messages = options.Messages
			if systemPromptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
			}
			return callContext, prepared, nil
		},
		OnReasoningDelta: func(id string, text string) error {
			summaryMessage.AppendReasoningContent(text)
			return a.messages.Update(genCtx, summaryMessage)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// Handle anthropic signature.
			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
					summaryMessage.AppendReasoningSignature(signature.Signature)
				}
			}
			summaryMessage.FinishThinking()
			return a.messages.Update(genCtx, summaryMessage)
		},
		OnTextDelta: func(id, text string) error {
			summaryMessage.AppendContent(text)
			return a.messages.Update(genCtx, summaryMessage)
		},
	})
	if err != nil {
		isCancelErr := errors.Is(err, context.Canceled)
		if isCancelErr {
			// The user canceled the summarize, so remove the partial summary
			// message. The cancellation itself is not reported as an error;
			// only a failed delete is.
			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
			return deleteErr
		}
		// Mark the summary message as finished with an error so the UI
		// stops spinning.
		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
			return updateErr
		}
		return err
	}

	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
	err = a.messages.Update(genCtx, summaryMessage)
	if err != nil {
		return err
	}

	// Sum the per-step OpenRouter cost; it stays nil when no step reported
	// one.
	var openrouterCost *float64
	for _, step := range resp.Steps {
		stepCost := a.openrouterCost(step.ProviderMetadata)
		if stepCost != nil {
			newCost := *stepCost
			if openrouterCost != nil {
				newCost += *openrouterCost
			}
			openrouterCost = &newCost
		}
	}

	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)

	// Just in case, get just the last usage info.
	usage := resp.Response.Usage
	// Point the session at the new summary and overwrite the token counters
	// set by updateSessionUsage above: prompt tokens are zeroed and completion
	// tokens come from summaryCompletionTokens.
	// NOTE(review): presumably because the summary now stands in for the
	// earlier history — confirm.
	currentSession.SummaryMessageID = summaryMessage.ID
	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
	currentSession.PromptTokens = 0
	currentSession.EstimatedUsage = usageIsZero(usage)
	_, err = a.sessions.Save(genCtx, currentSession)
	if err != nil {
		return err
	}

	// Release the active request before processing queued messages so that
	// Run() does not see the session as busy.
	a.activeRequests.Del(sessionID)
	cancel()

	// Process any messages that were queued while summarizing. Only the first
	// one is dequeued and run here; the rest stay in the queue.
	queuedMessages, ok := a.messageQueue.Get(sessionID)
	if !ok || len(queuedMessages) == 0 {
		return nil
	}
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(sessionID, queuedMessages[1:])
	_, qErr := a.Run(ctx, firstQueuedMessage)
	return qErr
}
1299
1300func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1301	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1302		return fantasy.ProviderOptions{}
1303	}
1304	return fantasy.ProviderOptions{
1305		anthropic.Name: &anthropic.ProviderCacheControlOptions{
1306			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1307		},
1308		bedrock.Name: &anthropic.ProviderCacheControlOptions{
1309			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1310		},
1311		vercel.Name: &anthropic.ProviderCacheControlOptions{
1312			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1313		},
1314	}
1315}
1316
1317func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1318	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1319	var attachmentParts []message.ContentPart
1320	for _, attachment := range call.Attachments {
1321		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1322	}
1323	parts = append(parts, attachmentParts...)
1324	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1325		Role:  message.User,
1326		Parts: parts,
1327	})
1328	if err != nil {
1329		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1330	}
1331	return msg, nil
1332}
1333
1334func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1335	var history []fantasy.Message
1336	if !a.isSubAgent {
1337		history = append(history, fantasy.NewUserMessage(
1338			fmt.Sprintf(
1339				"<system_reminder>%s</system_reminder>",
1340				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1341If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1342If not, please feel free to ignore. Again do not mention this message to the user.`,
1343			),
1344		))
1345	}
1346	// Collect all tool call IDs present in assistant messages and all tool
1347	// result IDs present in tool messages. This lets us detect both orphaned
1348	// tool results (result without a call) and orphaned tool calls (call
1349	// without a result).
1350	knownToolCallIDs := make(map[string]struct{})
1351	knownToolResultIDs := make(map[string]struct{})
1352	for _, m := range msgs {
1353		switch m.Role {
1354		case message.Assistant:
1355			for _, tc := range m.ToolCalls() {
1356				knownToolCallIDs[tc.ID] = struct{}{}
1357			}
1358		case message.Tool:
1359			for _, tr := range m.ToolResults() {
1360				knownToolResultIDs[tr.ToolCallID] = struct{}{}
1361			}
1362		}
1363	}
1364
1365	for _, m := range msgs {
1366		if len(m.Parts) == 0 {
1367			continue
1368		}
1369		// Assistant message without content or tool calls (cancelled before it returned anything).
1370		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1371			continue
1372		}
1373		if m.Role == message.Tool {
1374			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1375				history = append(history, msg)
1376			}
1377			continue
1378		}
1379		aiMsgs := m.ToAIMessage()
1380		if !supportsImages {
1381			for i := range aiMsgs {
1382				if aiMsgs[i].Role == fantasy.MessageRoleUser {
1383					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1384				}
1385			}
1386		}
1387		history = append(history, aiMsgs...)
1388
1389		if m.Role == message.Assistant {
1390			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1391				history = append(history, msg)
1392			}
1393		}
1394	}
1395
1396	var files []fantasy.FilePart
1397	for _, attachment := range attachments {
1398		if attachment.IsText() {
1399			continue
1400		}
1401		files = append(files, fantasy.FilePart{
1402			Filename:  attachment.FileName,
1403			Data:      attachment.Content,
1404			MediaType: attachment.MimeType,
1405		})
1406	}
1407
1408	return history, files
1409}
1410
1411// filterFileParts removes fantasy.FilePart entries from a slice of message
1412// parts. Used to strip image attachments from historical user messages when
1413// the current model does not support them.
1414func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1415	filtered := make([]fantasy.MessagePart, 0, len(parts))
1416	for _, part := range parts {
1417		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1418			continue
1419		}
1420		filtered = append(filtered, part)
1421	}
1422	return filtered
1423}
1424
1425// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1426// dropping any tool result parts whose tool_call_id has no matching tool call
1427// in the known set. An orphaned result causes API validation to fail on every
1428// subsequent turn, permanently locking the session. Returns the filtered
1429// message and true if at least one valid part remains.
1430func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1431	aiMsgs := m.ToAIMessage()
1432	if len(aiMsgs) == 0 {
1433		return fantasy.Message{}, false
1434	}
1435	var validParts []fantasy.MessagePart
1436	for _, part := range aiMsgs[0].Content {
1437		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1438		if !ok {
1439			validParts = append(validParts, part)
1440			continue
1441		}
1442		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1443			validParts = append(validParts, part)
1444		} else {
1445			slog.Warn(
1446				"Dropping orphaned tool result with no matching tool call",
1447				"tool_call_id", tr.ToolCallID,
1448			)
1449		}
1450	}
1451	if len(validParts) == 0 {
1452		return fantasy.Message{}, false
1453	}
1454	msg := aiMsgs[0]
1455	msg.Content = validParts
1456	return msg, true
1457}
1458
1459// syntheticToolResultsForOrphanedCalls returns a tool message containing
1460// synthetic tool results for any tool calls in the assistant message that
1461// have no matching result in knownToolResultIDs. LLM APIs require every
1462// tool_use to be immediately followed by a tool_result; an interrupted
1463// session can leave orphaned tool_use blocks that permanently lock the
1464// conversation. Returns the message and true if any synthetic results were
1465// produced.
1466func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1467	var syntheticParts []fantasy.MessagePart
1468	for _, tc := range m.ToolCalls() {
1469		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1470			continue
1471		}
1472		slog.Warn(
1473			"Injecting synthetic tool result for orphaned tool call",
1474			"tool_call_id", tc.ID,
1475			"tool_name", tc.Name,
1476		)
1477		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1478			ToolCallID: tc.ID,
1479			Output: fantasy.ToolResultOutputContentError{
1480				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1481			},
1482		})
1483	}
1484	if len(syntheticParts) == 0 {
1485		return fantasy.Message{}, false
1486	}
1487	return fantasy.Message{
1488		Role:    fantasy.MessageRoleTool,
1489		Content: syntheticParts,
1490	}, true
1491}
1492
1493func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1494	msgs, err := a.messages.List(ctx, session.ID)
1495	if err != nil {
1496		return nil, fmt.Errorf("failed to list messages: %w", err)
1497	}
1498
1499	if session.SummaryMessageID != "" {
1500		summaryMsgIndex := -1
1501		for i, msg := range msgs {
1502			if msg.ID == session.SummaryMessageID {
1503				summaryMsgIndex = i
1504				break
1505			}
1506		}
1507		if summaryMsgIndex != -1 {
1508			msgs = msgs[summaryMsgIndex:]
1509			msgs[0].Role = message.User
1510		}
1511	}
1512	return msgs, nil
1513}
1514
1515// generateTitle generates a session titled based on the initial prompt.
1516func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1517	if userPrompt == "" {
1518		return
1519	}
1520
1521	smallModel := a.smallModel.Get()
1522	largeModel := a.largeModel.Get()
1523	systemPromptPrefix := a.systemPromptPrefix.Get()
1524
1525	var maxOutputTokens int64 = 40
1526	if smallModel.CatwalkCfg.CanReason {
1527		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1528	}
1529
1530	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1531		return fantasy.NewAgent(
1532			m,
1533			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1534			fantasy.WithMaxOutputTokens(tok),
1535			fantasy.WithUserAgent(userAgent),
1536		)
1537	}
1538
1539	streamCall := fantasy.AgentStreamCall{
1540		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1541		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1542			prepared.Messages = opts.Messages
1543			if systemPromptPrefix != "" {
1544				prepared.Messages = append([]fantasy.Message{
1545					fantasy.NewSystemMessage(systemPromptPrefix),
1546				}, prepared.Messages...)
1547			}
1548			return callCtx, prepared, nil
1549		},
1550	}
1551
1552	// Use the small model to generate the title.
1553	model := smallModel
1554	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1555	resp, err := agent.Stream(ctx, streamCall)
1556	if err == nil {
1557		// We successfully generated a title with the small model.
1558		slog.Debug("Generated title with small model")
1559	} else {
1560		// It didn't work. Let's try with the big model.
1561		slog.Error("Error generating title with small model; trying big model", "err", err)
1562		model = largeModel
1563		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1564		resp, err = agent.Stream(ctx, streamCall)
1565		if err == nil {
1566			slog.Debug("Generated title with large model")
1567		} else {
1568			// Welp, the large model didn't work either. Use the default
1569			// session name and return.
1570			slog.Error("Error generating title with large model", "err", err)
1571			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1572			if saveErr != nil {
1573				slog.Error("Failed to save session title", "error", saveErr)
1574			}
1575			return
1576		}
1577	}
1578
1579	if resp == nil {
1580		// Actually, we didn't get a response so we can't. Use the default
1581		// session name and return.
1582		slog.Error("Response is nil; can't generate title")
1583		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1584		if saveErr != nil {
1585			slog.Error("Failed to save session title", "error", saveErr)
1586		}
1587		return
1588	}
1589
1590	// Clean up title.
1591	var title string
1592	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1593
1594	// Remove thinking tags if present.
1595	title = thinkTagRegex.ReplaceAllString(title, "")
1596	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1597
1598	title = strings.TrimSpace(title)
1599	title = cmp.Or(title, DefaultSessionName)
1600
1601	// Calculate usage and cost.
1602	var openrouterCost *float64
1603	for _, step := range resp.Steps {
1604		stepCost := a.openrouterCost(step.ProviderMetadata)
1605		if stepCost != nil {
1606			newCost := *stepCost
1607			if openrouterCost != nil {
1608				newCost += *openrouterCost
1609			}
1610			openrouterCost = &newCost
1611		}
1612	}
1613
1614	modelConfig := model.CatwalkCfg
1615	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1616		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1617		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1618		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1619
1620	// Use override cost if available (e.g., from OpenRouter).
1621	if openrouterCost != nil {
1622		cost = *openrouterCost
1623	}
1624
1625	// Skip cost accumulation
1626	if model.FlatRate {
1627		cost = 0
1628	}
1629
1630	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1631	completionTokens := resp.TotalUsage.OutputTokens
1632
1633	// Atomically update only title and usage fields to avoid overriding other
1634	// concurrent session updates.
1635	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1636	if saveErr != nil {
1637		slog.Error("Failed to save session title and usage", "error", saveErr)
1638		return
1639	}
1640}
1641
1642func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1643	openrouterMetadata, ok := metadata[openrouter.Name]
1644	if !ok {
1645		return nil
1646	}
1647
1648	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1649	if !ok {
1650		return nil
1651	}
1652	return &opts.Usage.Cost
1653}
1654
1655func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1656	if !usageIsZero(usage) {
1657		session.EstimatedUsage = estimated
1658	}
1659
1660	modelConfig := model.CatwalkCfg
1661	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1662		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1663		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1664		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1665
1666	if !estimated {
1667		a.eventTokensUsed(session.ID, model, usage, cost)
1668	}
1669
1670	if estimated {
1671		cost = 0
1672	} else {
1673		// Use override cost if available (e.g., from OpenRouter).
1674		if overrideCost != nil {
1675			cost = *overrideCost
1676		}
1677
1678		// Skip cost accumulation
1679		if model.FlatRate {
1680			cost = 0
1681		}
1682	}
1683
1684	session.Cost += cost
1685	updateSessionTokenCounters(session, usage)
1686}
1687
1688func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1689	if usage.OutputTokens != 0 {
1690		session.CompletionTokens = usage.OutputTokens
1691	}
1692	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1693		session.PromptTokens = promptTokens
1694	}
1695}
1696
1697func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1698	if usage.OutputTokens != 0 {
1699		return usage.OutputTokens
1700	}
1701	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1702}
1703
1704func (a *sessionAgent) Cancel(sessionID string) {
1705	// Serialize against the dispatch handoff in Run so the accepted ->
1706	// (cancel-on-entry | queued | active) transition is atomic against
1707	// this cancel. Every cancel observes at least one of: an active
1708	// request, an accepted run (recorded as a pending cancel), or a
1709	// queue entry it then clears. If none of those hold, an idle Escape
1710	// is a true no-op and must not poison the next prompt.
1711	mu := a.sessionMu(sessionID)
1712	mu.Lock()
1713	defer mu.Unlock()
1714
1715	// Cancel regular requests. Don't use Take() here - we need the entry to
1716	// remain in activeRequests so IsBusy() returns true until the goroutine
1717	// fully completes (including error handling that may access the DB).
1718	// The defer in processRequest will clean up the entry.
1719	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1720		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1721		cancel()
1722	}
1723
1724	// Also check for summarize requests.
1725	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1726		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1727		cancel()
1728	}
1729
1730	// Record a pending cancel only when a dispatched-but-not-yet-active
1731	// run exists. This catches runs still in the goroutine scheduler or
1732	// about to enter Run's busy-queue branch, while leaving an idle
1733	// session untouched. Active and accepted are not mutually exclusive:
1734	// when a run is active and a follow-up has been accepted, both the
1735	// cancel above and this pending record fire.
1736	//
1737	// Raise the session's cancel mark to the latest accept sequence
1738	// assigned so far. Every prompt currently accepted-but-not-yet-
1739	// active has a sequence at or below that value, so one cancel covers
1740	// all of them; a prompt accepted after this cancel gets a strictly
1741	// higher sequence and is never poisoned. Using max keeps repeated
1742	// cancels idempotent while the same prompts are in flight and lets a
1743	// later cancel extend coverage to prompts accepted since.
1744	a.acceptedMu.Lock()
1745	count, ok := a.acceptedRuns.Get(sessionID)
1746	mark := a.acceptSeqGen
1747	a.acceptedMu.Unlock()
1748	if ok && count > 0 {
1749		slog.Debug("Recording cancel mark for accepted runs", "session_id", sessionID, "count", count, "mark", mark)
1750		existing, _ := a.cancelMark.Get(sessionID)
1751		a.cancelMark.Set(sessionID, max(existing, mark))
1752	}
1753
1754	if a.QueuedPrompts(sessionID) > 0 {
1755		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1756		a.messageQueue.Del(sessionID)
1757	}
1758}
1759
1760func (a *sessionAgent) ClearQueue(sessionID string) {
1761	if a.QueuedPrompts(sessionID) > 0 {
1762		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1763		a.messageQueue.Del(sessionID)
1764	}
1765}
1766
1767func (a *sessionAgent) CancelAll() {
1768	if !a.IsBusy() {
1769		return
1770	}
1771	for key := range a.activeRequests.Seq2() {
1772		a.Cancel(key) // key is sessionID
1773	}
1774
1775	timeout := time.After(5 * time.Second)
1776	for a.IsBusy() {
1777		select {
1778		case <-timeout:
1779			return
1780		default:
1781			time.Sleep(200 * time.Millisecond)
1782		}
1783	}
1784}
1785
1786func (a *sessionAgent) IsBusy() bool {
1787	var busy bool
1788	for cancelFunc := range a.activeRequests.Seq() {
1789		if cancelFunc != nil {
1790			busy = true
1791			break
1792		}
1793	}
1794	return busy
1795}
1796
1797func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1798	_, busy := a.activeRequests.Get(sessionID)
1799	return busy
1800}
1801
1802func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1803	l, ok := a.messageQueue.Get(sessionID)
1804	if !ok {
1805		return 0
1806	}
1807	return len(l)
1808}
1809
1810func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1811	l, ok := a.messageQueue.Get(sessionID)
1812	if !ok {
1813		return nil
1814	}
1815	prompts := make([]string, len(l))
1816	for i, call := range l {
1817		prompts[i] = call.Prompt
1818	}
1819	return prompts
1820}
1821
1822func (a *sessionAgent) SetModels(large Model, small Model) {
1823	a.largeModel.Set(large)
1824	a.smallModel.Set(small)
1825}
1826
1827func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1828	a.tools.SetSlice(tools)
1829}
1830
1831func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1832	a.systemPrompt.Set(systemPrompt)
1833}
1834
1835func (a *sessionAgent) Model() Model {
1836	return a.largeModel.Get()
1837}
1838
1839// convertToToolResult converts a fantasy tool result to a message tool result.
1840func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1841	baseResult := message.ToolResult{
1842		ToolCallID: result.ToolCallID,
1843		Name:       result.ToolName,
1844		Metadata:   result.ClientMetadata,
1845	}
1846
1847	switch result.Result.GetType() {
1848	case fantasy.ToolResultContentTypeText:
1849		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1850			baseResult.Content = r.Text
1851		}
1852	case fantasy.ToolResultContentTypeError:
1853		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1854			baseResult.Content = r.Error.Error()
1855			baseResult.IsError = true
1856		}
1857	case fantasy.ToolResultContentTypeMedia:
1858		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1859			if !stringext.IsValidBase64(r.Data) {
1860				slog.Warn(
1861					"Tool returned media with invalid base64 data, discarding image",
1862					"tool", result.ToolName,
1863					"tool_call_id", result.ToolCallID,
1864				)
1865				baseResult.Content = "Tool returned image data with invalid encoding"
1866				baseResult.IsError = true
1867			} else {
1868				content := r.Text
1869				if content == "" {
1870					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1871				}
1872				baseResult.Content = content
1873				baseResult.Data = r.Data
1874				baseResult.MIMEType = r.MediaType
1875			}
1876		}
1877	}
1878
1879	return baseResult
1880}
1881
1882// workaroundProviderMediaLimitations converts media content in tool results to
1883// user messages for providers that don't natively support images in tool results.
1884//
1885// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1886// don't support sending images/media in tool result messages - they only accept
1887// text in tool results. However, they DO support images in user messages.
1888//
1889// If we send media in tool results to these providers, the API returns an error.
1890//
1891// Solution: For these providers, we:
1892//  1. Replace the media in the tool result with a text placeholder
1893//  2. Inject a user message immediately after with the image as a file attachment
1894//  3. This maintains the tool execution flow while working around API limitations
1895//
1896// Anthropic and Bedrock support images natively in tool results, so we skip
1897// this workaround for them.
1898//
1899// Example transformation:
1900//
1901//	BEFORE: [tool result: image data]
1902//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1903func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1904	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1905		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1906		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1907
1908	if providerSupportsMedia {
1909		return messages
1910	}
1911
1912	convertedMessages := make([]fantasy.Message, 0, len(messages))
1913
1914	for _, msg := range messages {
1915		if msg.Role != fantasy.MessageRoleTool {
1916			convertedMessages = append(convertedMessages, msg)
1917			continue
1918		}
1919
1920		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1921		var mediaFiles []fantasy.FilePart
1922
1923		for _, part := range msg.Content {
1924			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1925			if !ok {
1926				textParts = append(textParts, part)
1927				continue
1928			}
1929
1930			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1931				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1932				if err != nil {
1933					slog.Warn("Failed to decode media data", "error", err)
1934					textParts = append(textParts, part)
1935					continue
1936				}
1937
1938				mediaFiles = append(mediaFiles, fantasy.FilePart{
1939					Data:      decoded,
1940					MediaType: media.MediaType,
1941					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1942				})
1943
1944				textParts = append(textParts, fantasy.ToolResultPart{
1945					ToolCallID: toolResult.ToolCallID,
1946					Output: fantasy.ToolResultOutputContentText{
1947						Text: "[Image/media content loaded - see attached file]",
1948					},
1949					ProviderOptions: toolResult.ProviderOptions,
1950				})
1951			} else {
1952				textParts = append(textParts, part)
1953			}
1954		}
1955
1956		convertedMessages = append(convertedMessages, fantasy.Message{
1957			Role:    fantasy.MessageRoleTool,
1958			Content: textParts,
1959		})
1960
1961		if len(mediaFiles) > 0 {
1962			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1963				"Here is the media content from the tool result:",
1964				mediaFiles...,
1965			))
1966		}
1967	}
1968
1969	return convertedMessages
1970}
1971
1972// buildSummaryPrompt constructs the prompt text for session summarization.
1973func buildSummaryPrompt(todos []session.Todo) string {
1974	var sb strings.Builder
1975	sb.WriteString("Provide a detailed summary of our conversation above.")
1976	if len(todos) > 0 {
1977		sb.WriteString("\n\n## Current Todo List\n\n")
1978		for _, t := range todos {
1979			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1980		}
1981		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1982		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1983	}
1984	return sb.String()
1985}
1986
1987func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1988	fields := []any{
1989		"retry_delay", delay.String(),
1990	}
1991	if err == nil {
1992		return fields
1993	}
1994	fields = append(fields, "status_code", err.StatusCode)
1995	if err.Title != "" {
1996		fields = append(fields, "title", err.Title)
1997	}
1998	if err.Message != "" {
1999		fields = append(fields, "message", err.Message)
2000	}
2001	return fields
2002}