agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"sync/atomic"
  25	"time"
  26
  27	"charm.land/catwalk/pkg/catwalk"
  28	"charm.land/fantasy"
  29	"charm.land/fantasy/providers/anthropic"
  30	"charm.land/fantasy/providers/bedrock"
  31	"charm.land/fantasy/providers/google"
  32	"charm.land/fantasy/providers/openai"
  33	"charm.land/fantasy/providers/openrouter"
  34	"charm.land/fantasy/providers/vercel"
  35	"charm.land/lipgloss/v2"
  36	"github.com/charmbracelet/crush/internal/agent/hyper"
  37	"github.com/charmbracelet/crush/internal/agent/notify"
  38	"github.com/charmbracelet/crush/internal/agent/tools"
  39	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  40	"github.com/charmbracelet/crush/internal/config"
  41	"github.com/charmbracelet/crush/internal/csync"
  42	"github.com/charmbracelet/crush/internal/message"
  43	"github.com/charmbracelet/crush/internal/pubsub"
  44	"github.com/charmbracelet/crush/internal/session"
  45	"github.com/charmbracelet/crush/internal/stringext"
  46	"github.com/charmbracelet/crush/internal/version"
  47	"github.com/charmbracelet/x/exp/charmtone"
  48)
  49
const (
	// DefaultSessionName is the session name "Untitled Session".
	// NOTE(review): presumably the placeholder shown until a title is
	// generated; its use is not visible in this part of the file.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds, all measured in
	// tokens. Run's stop condition compares the remaining context
	// window against a reserve: models whose window is larger than
	// largeContextWindowThreshold use the fixed largeContextWindowBuffer,
	// while smaller windows reserve smallContextWindowRatio of the
	// window.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  58
// userAgent is the User-Agent string handed to fantasy.WithUserAgent when
// Run constructs an agent. It embeds the build's version.Version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt holds the embedded contents of templates/title.md.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the embedded contents of templates/summary.md.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles.
//
// thinkTagRegex matches a complete <think>...</think> span, lazily and
// across newlines. orphanThinkTagRegex matches a lone opening or closing
// think tag.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  72
// SessionAgentCall describes one prompt submitted to SessionAgent.Run: the
// target session, the prompt text and attachments, and the provider and
// sampling options that Run forwards to the model stream.
//
// ValidateCall requires a non-empty SessionID and either a non-empty
// Prompt or a text attachment. A MaxOutputTokens of zero (or less) is not
// forwarded to the provider.
type SessionAgentCall struct {
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID            string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
	// Accepted, when non-nil, is the accept reservation taken by
	// BeginAccepted before the call was dispatched onto a goroutine
	// (the client/server fire-and-forget path). Run consumes it under
	// dispatchMu[SessionID] once the accepted -> (cancel-on-entry |
	// queued | active) transition has been chosen. When nil
	// (in-process / local callers like AppWorkspace), behavior is
	// unchanged and no accept tracking applies.
	Accepted *AcceptedRun
}
 116
// SessionAgent is the session-scoped agent API. NewSessionAgent returns
// the implementation defined in this file (*sessionAgent).
//
// Run returns (nil, nil) without streaming when the call was queued behind
// a busy session or was canceled on entry. BeginAccepted takes an accept
// reservation that is released through AcceptedRun.Close.
type SessionAgent interface {
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	BeginAccepted(sessionID string) *AcceptedRun
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
 133
// Model bundles a fantasy.LanguageModel with the metadata the agent
// consults while running it: the catwalk catalog entry (Run reads its
// SupportsImages, Name and ContextWindow) and the selected-model
// configuration (Run records its Model and Provider on assistant
// messages).
// NOTE(review): FlatRate is not read in the visible part of this file;
// presumably it affects cost accounting — confirm at its use site.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 140
// sessionAgent is the SessionAgent implementation built by
// NewSessionAgent. The csync-wrapped settings (models, prompts, tools) are
// copied at the start of each Run so concurrent SetTools/SetModels calls
// don't race with an in-flight turn. messageQueue holds calls queued
// behind a busy session, and activeRequests holds the cancel func of each
// session's running turn.
type sessionAgent struct {
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]

	// dispatchMu holds a per-session mutex that serializes the
	// accepted -> (cancel-on-entry | queued | active) transition in
	// Run against a concurrent Cancel. The lock is held only during
	// the brief handoff (no DB or LLM I/O under the lock).
	dispatchMu *csync.Map[string, *sync.Mutex]
	// acceptedRuns counts dispatched-but-not-yet-active runs per
	// session. A counter > 0 means a dispatched prompt is in flight
	// and has not yet completed the dispatch handoff in Run. Only
	// BeginAccepted increments it; only AcceptedRun.Close decrements
	// it.
	acceptedRuns *csync.Map[string, int]
	// pendingCancels records sessions whose dispatched-but-not-yet-
	// running call should observe a cancellation request. It is only
	// set by Cancel when acceptedRuns > 0, so an idle Escape never
	// poisons the next prompt.
	pendingCancels *csync.Map[string, struct{}]
	// dispatchMuCreate guards lazy creation of per-session entries in
	// dispatchMu so two goroutines can't race to lock different mutex
	// instances for the same session.
	dispatchMuCreate sync.Mutex
	// acceptedMu serializes increments/decrements of acceptedRuns. It
	// is separate from dispatchMu so AcceptedRun.Close (which may run
	// while Run holds dispatchMu for the same session) does not
	// deadlock by re-entering the dispatch lock.
	acceptedMu sync.Mutex
}
 185
// SessionAgentOptions carries the initial settings and collaborators that
// NewSessionAgent copies into a new agent. The models, prompts and tools
// are wrapped in csync containers by the constructor; the remaining
// fields are stored as given.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 200
 201func NewSessionAgent(
 202	opts SessionAgentOptions,
 203) SessionAgent {
 204	return &sessionAgent{
 205		largeModel:           csync.NewValue(opts.LargeModel),
 206		smallModel:           csync.NewValue(opts.SmallModel),
 207		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 208		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 209		isSubAgent:           opts.IsSubAgent,
 210		sessions:             opts.Sessions,
 211		messages:             opts.Messages,
 212		disableAutoSummarize: opts.DisableAutoSummarize,
 213		tools:                csync.NewSliceFrom(opts.Tools),
 214		isYolo:               opts.IsYolo,
 215		notify:               opts.Notify,
 216		runComplete:          opts.RunComplete,
 217		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 218		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 219		dispatchMu:           csync.NewMap[string, *sync.Mutex](),
 220		acceptedRuns:         csync.NewMap[string, int](),
 221		pendingCancels:       csync.NewMap[string, struct{}](),
 222	}
 223}
 224
// AcceptedRun owns exactly one accept reservation taken by
// BeginAccepted. It is the only carrier of accept-state across the
// backend.runAgent / Coordinator.Run / sessionAgent.Run layers: a
// counter > 0 means a dispatched prompt is in flight and has not yet
// completed the dispatch handoff in Run. Close is the only way to
// release the reservation and is idempotent.
//
// The done flag is what makes Close idempotent: only the call that flips
// it from false to true decrements the owning agent's counter for
// sessionID.
type AcceptedRun struct {
	agent     *sessionAgent
	sessionID string
	done      atomic.Bool
}
 236
 237// Close decrements the accept counter for this reservation. It is safe
 238// to call multiple times; only the first call has effect.
 239func (r *AcceptedRun) Close() {
 240	if r == nil {
 241		return
 242	}
 243	if !r.done.CompareAndSwap(false, true) {
 244		return
 245	}
 246	r.agent.endAccepted(r.sessionID)
 247}
 248
 249// SessionID exposes the session this reservation is for so the run path
 250// can use it without an extra parameter.
 251func (r *AcceptedRun) SessionID() string {
 252	if r == nil {
 253		return ""
 254	}
 255	return r.sessionID
 256}
 257
 258// BeginAccepted increments the accept counter for sessionID and returns
 259// a handle whose Close is the only way to decrement it. It is the only
 260// entry point that mutates acceptedRuns.
 261func (a *sessionAgent) BeginAccepted(sessionID string) *AcceptedRun {
 262	a.acceptedMu.Lock()
 263	defer a.acceptedMu.Unlock()
 264	count, _ := a.acceptedRuns.Get(sessionID)
 265	a.acceptedRuns.Set(sessionID, count+1)
 266	return &AcceptedRun{agent: a, sessionID: sessionID}
 267}
 268
 269// endAccepted decrements the accept counter for sessionID. It is only
 270// called via AcceptedRun.Close. It uses a dedicated lock (not the
 271// per-session dispatch mutex) so it can run while Run holds dispatchMu
 272// for the same session without deadlocking.
 273func (a *sessionAgent) endAccepted(sessionID string) {
 274	a.acceptedMu.Lock()
 275	defer a.acceptedMu.Unlock()
 276	count, ok := a.acceptedRuns.Get(sessionID)
 277	if !ok || count <= 1 {
 278		a.acceptedRuns.Del(sessionID)
 279		return
 280	}
 281	a.acceptedRuns.Set(sessionID, count-1)
 282}
 283
 284// sessionMu returns the per-session dispatch mutex, creating it on first
 285// use. Creation is guarded so concurrent callers always observe the same
 286// mutex instance for a given session.
 287func (a *sessionAgent) sessionMu(sessionID string) *sync.Mutex {
 288	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 289		return mu
 290	}
 291	a.dispatchMuCreate.Lock()
 292	defer a.dispatchMuCreate.Unlock()
 293	if mu, ok := a.dispatchMu.Get(sessionID); ok {
 294		return mu
 295	}
 296	mu := &sync.Mutex{}
 297	a.dispatchMu.Set(sessionID, mu)
 298	return mu
 299}
 300
 301// enqueueCall appends call to the session's message queue. The
 302// OnComplete hook is stripped: the caller that supplied it (typically
 303// coordinator.Run) has its own retry/coalesce scope that ends when it
 304// returns, so by the time the queue drains nobody is left to consume the
 305// buffered terminal event. The recursive Run falls back to the default
 306// broker publish, which is what existing subscribers expect for queued
 307// turns.
 308func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
 309	existing, ok := a.messageQueue.Get(call.SessionID)
 310	if !ok {
 311		existing = []SessionAgentCall{}
 312	}
 313	queued := call
 314	queued.OnComplete = nil
 315	queued.Accepted = nil
 316	existing = append(existing, queued)
 317	a.messageQueue.Set(call.SessionID, existing)
 318}
 319
 320// clearPendingCancel removes any pending-cancel record for sessionID. It
 321// takes the per-session dispatch lock so it is ordered against Cancel and
 322// the dispatch handoff.
 323func (a *sessionAgent) clearPendingCancel(sessionID string) {
 324	mu := a.sessionMu(sessionID)
 325	mu.Lock()
 326	defer mu.Unlock()
 327	a.pendingCancels.Del(sessionID)
 328}
 329
 330// persistCanceledTurn writes the user/assistant records for a turn that
 331// was canceled before (or just as) streaming would have produced them.
 332// It creates the user message only when it was not already created by an
 333// earlier createUserMessage call (userMsgCreated), then writes an
 334// assistant message with FinishReasonCanceled. Both writes use
 335// context.WithoutCancel(ctx) so workspace shutdown (which cancels the run
 336// context) can't drop them.
 337func (a *sessionAgent) persistCanceledTurn(ctx context.Context, call SessionAgentCall, userMsgCreated bool) error {
 338	writeCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 339	defer cancel()
 340	if !userMsgCreated {
 341		if _, err := a.createUserMessage(writeCtx, call); err != nil {
 342			return err
 343		}
 344	}
 345	largeModel := a.largeModel.Get()
 346	assistant, err := a.messages.Create(writeCtx, call.SessionID, message.CreateMessageParams{
 347		Role:     message.Assistant,
 348		Parts:    []message.ContentPart{},
 349		Model:    largeModel.ModelCfg.Model,
 350		Provider: largeModel.ModelCfg.Provider,
 351	})
 352	if err != nil {
 353		return err
 354	}
 355	assistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 356	return a.messages.Update(writeCtx, assistant)
 357}
 358
 359// ValidateCall performs the cheap structural validation that
 360// sessionAgent.Run requires before a call can be dispatched: a call must
 361// carry either a non-empty prompt or a text attachment, and it must name a
 362// session. It is exported so callers that accept a run before dispatching it
 363// (e.g. backend.SendMessage) can apply the same checks and keep the error
 364// contract consistent.
 365func ValidateCall(call SessionAgentCall) error {
 366	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 367		return ErrEmptyPrompt
 368	}
 369	if call.SessionID == "" {
 370		return ErrSessionMissing
 371	}
 372	return nil
 373}
 374
 375func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 376	if err := ValidateCall(call); err != nil {
 377		return nil, err
 378	}
 379
 380	// genCtx/cancel are the run context and its cancel func. For the
 381	// accepted (fire-and-forget) dispatch path they are created under
 382	// dispatchMu below so a concurrent Cancel can observe the
 383	// activeRequests entry before the assistant message exists. For
 384	// the in-process path they stay nil here and are created later,
 385	// preserving the original ordering.
 386	var (
 387		genCtx           context.Context
 388		cancel           context.CancelFunc
 389		activeRegistered bool
 390		userMsgCreated   bool
 391	)
 392
 393	if call.Accepted != nil {
 394		// Serialize the accepted -> (cancel-on-entry | queued |
 395		// active) transition against a concurrent Cancel. Cancel takes
 396		// the same per-session lock, so every cancel observes at least
 397		// one of: pendingCancels, an activeRequests entry, or a
 398		// messageQueue entry it then clears.
 399		mu := a.sessionMu(call.SessionID)
 400		mu.Lock()
 401
 402		if _, pending := a.pendingCancels.Get(call.SessionID); pending {
 403			// Cancel-on-entry: a cancel arrived while this run was
 404			// dispatched but not yet active. Consume the pending
 405			// cancel, release the accept reservation, drop the lock,
 406			// and persist a canceled turn without entering Stream.
 407			a.pendingCancels.Del(call.SessionID)
 408			call.Accepted.Close()
 409			mu.Unlock()
 410			if err := a.persistCanceledTurn(ctx, call, false); err != nil {
 411				return nil, err
 412			}
 413			return nil, nil
 414		}
 415
 416		if a.IsSessionBusy(call.SessionID) {
 417			// Busy: an earlier prompt is active. Queue this call and
 418			// release the accept reservation. A Cancel arriving after
 419			// this point sees the active entry and clears the queue.
 420			a.enqueueCall(call)
 421			call.Accepted.Close()
 422			mu.Unlock()
 423			return nil, nil
 424		}
 425
 426		// Idle: become the active run. Register the cancel func before
 427		// dropping the lock so a Cancel that arrives between here and
 428		// assistant creation is not lost.
 429		runCtx := context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 430		genCtx, cancel = context.WithCancel(runCtx)
 431		a.activeRequests.Set(call.SessionID, cancel)
 432		activeRegistered = true
 433		call.Accepted.Close()
 434		mu.Unlock()
 435
 436		defer cancel()
 437		defer a.activeRequests.Del(call.SessionID)
 438	} else if a.IsSessionBusy(call.SessionID) {
 439		// Queue the message if busy. Strip OnComplete: the caller that
 440		// supplied the hook (typically coordinator.Run) has its own
 441		// retry/coalesce scope that ends when it returns, so by the time
 442		// the queue drains nobody is left to consume the buffered
 443		// terminal event. The recursive Run will fall back to the
 444		// default broker publish, which is what existing subscribers
 445		// expect for queued turns.
 446		a.enqueueCall(call)
 447		return nil, nil
 448	}
 449
 450	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 451	agentTools := a.tools.Copy()
 452	largeModel := a.largeModel.Get()
 453	systemPrompt := a.systemPrompt.Get()
 454	promptPrefix := a.systemPromptPrefix.Get()
 455	var instructions strings.Builder
 456
 457	for _, server := range mcp.GetStates() {
 458		if server.State != mcp.StateConnected {
 459			continue
 460		}
 461		if s := server.Client.InitializeResult().Instructions; s != "" {
 462			instructions.WriteString(s)
 463			instructions.WriteString("\n\n")
 464		}
 465	}
 466
 467	if s := instructions.String(); s != "" {
 468		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 469	}
 470
 471	if len(agentTools) > 0 {
 472		// Add Anthropic caching to the last tool.
 473		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 474	}
 475
 476	agent := fantasy.NewAgent(
 477		largeModel.Model,
 478		fantasy.WithSystemPrompt(systemPrompt),
 479		fantasy.WithTools(agentTools...),
 480		fantasy.WithUserAgent(userAgent),
 481	)
 482
 483	sessionLock := sync.Mutex{}
 484	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 485	if err != nil {
 486		return nil, fmt.Errorf("failed to get session: %w", err)
 487	}
 488
 489	msgs, err := a.getSessionMessages(ctx, currentSession)
 490	if err != nil {
 491		return nil, fmt.Errorf("failed to get session messages: %w", err)
 492	}
 493
 494	var wg sync.WaitGroup
 495	// Generate title if first message.
 496	if len(msgs) == 0 {
 497		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 498		wg.Go(func() {
 499			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 500		})
 501	}
 502	defer wg.Wait()
 503
 504	// Add the user message to the session.
 505	_, err = a.createUserMessage(ctx, call)
 506	if err != nil {
 507		return nil, err
 508	}
 509	userMsgCreated = true
 510
 511	// Add the session to the context.
 512	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 513
 514	// For the accepted dispatch path the run context and cancel func
 515	// were already created and registered under dispatchMu above; reuse
 516	// them. For the in-process path create them here, preserving the
 517	// original ordering.
 518	if !activeRegistered {
 519		genCtx, cancel = context.WithCancel(ctx)
 520		a.activeRequests.Set(call.SessionID, cancel)
 521
 522		defer cancel()
 523		defer a.activeRequests.Del(call.SessionID)
 524	}
 525	// skipRunComplete is set just before the queued-recursion path so
 526	// the outer Run doesn't publish a RunComplete that would race
 527	// with — and be superseded by — the recursive call's own
 528	// RunComplete (each queued user prompt is its own turn and
 529	// publishes exactly one terminal event).
 530	var skipRunComplete bool
 531	// currentAssistant is declared here so the deferred RunComplete
 532	// publish below can capture the pointer that PrepareStep will
 533	// later (re)assign for each streaming step. The final assistant
 534	// message of the turn is the value reachable through this
 535	// pointer when the defer runs.
 536	var currentAssistant *message.Message
 537	// Drain any debounced message updates before returning. message.Service
 538	// already flushes synchronously on terminal updates, but a defer here
 539	// guarantees the contract at every Run exit (success, error, panic
 540	// recovery upstream) without callers needing to know.
 541	//
 542	// After the flush completes — meaning all per-message
 543	// Publish(UpdatedEvent) calls have fired and been buffered into
 544	// every subscriber's channel — publish the authoritative
 545	// RunComplete event for this turn. The flush-then-publish order
 546	// gives well-behaved clients the best chance of seeing the final
 547	// message event before RunComplete; the embedded Text field
 548	// reconciles for clients that observe the events out of order
 549	// (the pubsub broker fan-in does not serialize publishes from
 550	// different upstream brokers).
 551	defer func() {
 552		// Use a context detached from the run context: workspace
 553		// shutdown cancels ctx before this goroutine returns, but the
 554		// buffered streaming deltas must still land before the DB is
 555		// closed. A short timeout bounds the flush.
 556		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 557		defer flushCancel()
 558		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
 559			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 560		}
 561		if skipRunComplete {
 562			return
 563		}
 564		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 565		if currentAssistant != nil {
 566			complete.MessageID = currentAssistant.ID
 567			complete.Text = currentAssistant.Content().String()
 568		}
 569		if retErr != nil {
 570			complete.Error = retErr.Error()
 571			complete.Cancelled = errors.Is(retErr, context.Canceled)
 572		} else if ctx.Err() != nil {
 573			complete.Cancelled = true
 574		}
 575		// Prefer the per-call hook when supplied so the coordinator
 576		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 577		// into a single user-visible terminal event. The fallback
 578		// must-deliver publish applies bounded-blocking semantics to
 579		// the authoritative terminal event so a momentarily-full
 580		// subscriber channel can't silently drop it and hang
 581		// non-interactive clients waiting on RunComplete.
 582		if call.OnComplete != nil {
 583			call.OnComplete(complete)
 584			return
 585		}
 586		if a.runComplete == nil {
 587			return
 588		}
 589		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 590	}()
 591
 592	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 593
 594	startTime := time.Now()
 595	a.eventPromptSent(call.SessionID)
 596
 597	var stepMessages []fantasy.Message
 598	var shouldSummarize bool
 599	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 600	var maxOutputTokens *int64
 601	if call.MaxOutputTokens > 0 {
 602		maxOutputTokens = &call.MaxOutputTokens
 603	}
 604	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 605		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 606		Files:            files,
 607		Messages:         history,
 608		ProviderOptions:  call.ProviderOptions,
 609		MaxOutputTokens:  maxOutputTokens,
 610		TopP:             call.TopP,
 611		Temperature:      call.Temperature,
 612		PresencePenalty:  call.PresencePenalty,
 613		TopK:             call.TopK,
 614		FrequencyPenalty: call.FrequencyPenalty,
 615		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 616			prepared.Messages = options.Messages
 617			for i := range prepared.Messages {
 618				prepared.Messages[i].ProviderOptions = nil
 619			}
 620
 621			// Use latest tools (updated by SetTools when MCP tools change).
 622			prepared.Tools = a.tools.Copy()
 623
 624			// Drain queued follow-up prompts, but skip them if a cancel
 625			// was recorded for the session while they sat in the queue:
 626			// a cancel that arrived after the queue insertion must not
 627			// let the queued prompt run as part of this step.
 628			dispatchLock := a.sessionMu(call.SessionID)
 629			dispatchLock.Lock()
 630			_, canceled := a.pendingCancels.Get(call.SessionID)
 631			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
 632			a.messageQueue.Del(call.SessionID)
 633			dispatchLock.Unlock()
 634			if !canceled {
 635				for _, queued := range queuedCalls {
 636					userMessage, createErr := a.createUserMessage(callContext, queued)
 637					if createErr != nil {
 638						return callContext, prepared, createErr
 639					}
 640					prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 641				}
 642			}
 643
 644			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 645
 646			lastSystemRoleInx := 0
 647			systemMessageUpdated := false
 648			for i, msg := range prepared.Messages {
 649				// Only add cache control to the last message.
 650				if msg.Role == fantasy.MessageRoleSystem {
 651					lastSystemRoleInx = i
 652				} else if !systemMessageUpdated {
 653					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 654					systemMessageUpdated = true
 655				}
				// Then add cache control to the last 2 messages.
 657				if i > len(prepared.Messages)-3 {
 658					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 659				}
 660			}
 661
 662			if promptPrefix != "" {
 663				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 664			}
 665
 666			sessionLock.Lock()
 667			stepMessages = cloneFantasyMessages(prepared.Messages)
 668			sessionLock.Unlock()
 669
 670			var assistantMsg message.Message
 671			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 672				Role:     message.Assistant,
 673				Parts:    []message.ContentPart{},
 674				Model:    largeModel.ModelCfg.Model,
 675				Provider: largeModel.ModelCfg.Provider,
 676			})
 677			if err != nil {
 678				return callContext, prepared, err
 679			}
 680			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 681			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 682			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 683			currentAssistant = &assistantMsg
 684			return callContext, prepared, err
 685		},
 686		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 687			currentAssistant.AppendReasoningContent(reasoning.Text)
 688			return a.messages.Update(genCtx, *currentAssistant)
 689		},
 690		OnReasoningDelta: func(id string, text string) error {
 691			currentAssistant.AppendReasoningContent(text)
 692			return a.messages.Update(genCtx, *currentAssistant)
 693		},
 694		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 695			// handle anthropic signature
 696			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 697				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 698					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 699				}
 700			}
 701			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 702				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 703					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 704				}
 705			}
 706			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 707				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 708					currentAssistant.SetReasoningResponsesData(reasoning)
 709				}
 710			}
 711			currentAssistant.FinishThinking()
 712			return a.messages.Update(genCtx, *currentAssistant)
 713		},
 714		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
 716			// particularly important in non-interactive mode where leading
 717			// newlines are very visible.
 718			if len(currentAssistant.Parts) == 0 {
 719				text = strings.TrimPrefix(text, "\n")
 720			}
 721
 722			currentAssistant.AppendContent(text)
 723			return a.messages.Update(genCtx, *currentAssistant)
 724		},
 725		OnToolInputStart: func(id string, toolName string) error {
 726			toolCall := message.ToolCall{
 727				ID:               id,
 728				Name:             toolName,
 729				ProviderExecuted: false,
 730				Finished:         false,
 731			}
 732			currentAssistant.AddToolCall(toolCall)
 733			// Use parent ctx instead of genCtx to ensure the update succeeds
 734			// even if the request is canceled mid-stream
 735			return a.messages.Update(ctx, *currentAssistant)
 736		},
 737		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 738			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 739		},
 740		OnToolCall: func(tc fantasy.ToolCallContent) error {
 741			toolCall := message.ToolCall{
 742				ID:               tc.ToolCallID,
 743				Name:             tc.ToolName,
 744				Input:            tc.Input,
 745				ProviderExecuted: false,
 746				Finished:         true,
 747			}
 748			currentAssistant.AddToolCall(toolCall)
 749			// Use parent ctx instead of genCtx to ensure the update succeeds
 750			// even if the request is canceled mid-stream
 751			return a.messages.Update(ctx, *currentAssistant)
 752		},
 753		OnToolResult: func(result fantasy.ToolResultContent) error {
 754			toolResult := a.convertToToolResult(result)
 755			// Use parent ctx instead of genCtx to ensure the message is created
 756			// even if the request is canceled mid-stream
 757			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 758				Role: message.Tool,
 759				Parts: []message.ContentPart{
 760					toolResult,
 761				},
 762			})
 763			return createMsgErr
 764		},
 765		OnStepFinish: func(stepResult fantasy.StepResult) error {
 766			finishReason := message.FinishReasonUnknown
 767			switch stepResult.FinishReason {
 768			case fantasy.FinishReasonLength:
 769				finishReason = message.FinishReasonMaxTokens
 770			case fantasy.FinishReasonStop:
 771				finishReason = message.FinishReasonEndTurn
 772			case fantasy.FinishReasonToolCalls:
 773				finishReason = message.FinishReasonToolUse
 774			}
 775			// If a tool result halted the turn (e.g. a hook halt or a
 776			// permission denial), the step ends on FinishReasonToolCalls but
 777			// the model will not be called again. Treat it as the end of the
 778			// turn so the UI can render the assistant footer.
 779			if finishReason == message.FinishReasonToolUse {
 780				for _, tr := range stepResult.Content.ToolResults() {
 781					if tr.StopTurn {
 782						finishReason = message.FinishReasonEndTurn
 783						break
 784					}
 785				}
 786			}
 787			currentAssistant.AddFinish(finishReason, "", "")
 788			sessionLock.Lock()
 789			defer sessionLock.Unlock()
 790
 791			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 792			if getSessionErr != nil {
 793				return getSessionErr
 794			}
 795			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 796			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 797			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 798			if sessionErr != nil {
 799				return sessionErr
 800			}
 801			currentSession = updatedSession
 802			return a.messages.Update(genCtx, *currentAssistant)
 803		},
 804		StopWhen: []fantasy.StopCondition{
 805			func(_ []fantasy.StepResult) bool {
 806				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 807				// If context window is unknown (0), skip auto-summarize
 808				// to avoid immediately truncating custom/local models.
 809				if cw == 0 {
 810					return false
 811				}
 812				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 813				remaining := cw - tokens
 814				var threshold int64
 815				if cw > largeContextWindowThreshold {
 816					threshold = largeContextWindowBuffer
 817				} else {
 818					threshold = int64(float64(cw) * smallContextWindowRatio)
 819				}
 820				if (remaining <= threshold) && !a.disableAutoSummarize {
 821					shouldSummarize = true
 822					return true
 823				}
 824				return false
 825			},
 826			func(steps []fantasy.StepResult) bool {
 827				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
 828			},
 829		},
 830	})
 831
 832	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
 833
 834	if err != nil {
 835		isHyper := largeModel.ModelCfg.Provider == hyper.Name
 836		isCancelErr := errors.Is(err, context.Canceled)
 837		if currentAssistant == nil {
 838			// Cancel-before-assistant-creation window: the run was
 839			// canceled after activeRequests.Set but before PrepareStep
 840			// created the assistant message. Without this, the turn
 841			// would return with no FinishReasonCanceled marker and no
 842			// user-visible record. The user message was already created
 843			// above, so persistCanceledTurn only writes the assistant
 844			// record.
 845			if isCancelErr {
 846				if persistErr := a.persistCanceledTurn(ctx, call, userMsgCreated); persistErr != nil {
 847					return nil, persistErr
 848				}
 849			}
 850			return result, err
 851		}
 852		// Persist final state with a context detached from the run
 853		// context. The run context (ctx) is derived from the
 854		// workspace context, which workspace shutdown cancels before
 855		// agent goroutines finish; using ctx here would drop the
 856		// final assistant state. WithoutCancel keeps the values
 857		// (e.g. session ID) while ignoring cancellation, and a short
 858		// timeout bounds the cleanup writes.
 859		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
 860		defer cleanupCancel()
 861		// Ensure we finish thinking on error to close the reasoning state.
 862		currentAssistant.FinishThinking()
 863		toolCalls := currentAssistant.ToolCalls()
 864		// INFO: we use the cleanup context here because the genCtx has been cancelled.
 865		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
 866		if createErr != nil {
 867			return nil, createErr
 868		}
 869		for _, tc := range toolCalls {
 870			if !tc.Finished {
 871				tc.Finished = true
 872				tc.Input = "{}"
 873				currentAssistant.AddToolCall(tc)
 874				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
 875				if updateErr != nil {
 876					return nil, updateErr
 877				}
 878			}
 879
 880			found := false
 881			for _, msg := range msgs {
 882				if msg.Role == message.Tool {
 883					for _, tr := range msg.ToolResults() {
 884						if tr.ToolCallID == tc.ID {
 885							found = true
 886							break
 887						}
 888					}
 889				}
 890				if found {
 891					break
 892				}
 893			}
 894			if found {
 895				continue
 896			}
 897			content := "There was an error while executing the tool"
 898			if isCancelErr {
 899				content = "Error: user cancelled assistant tool calling"
 900			}
 901			toolResult := message.ToolResult{
 902				ToolCallID: tc.ID,
 903				Name:       tc.Name,
 904				Content:    content,
 905				IsError:    true,
 906			}
 907			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
 908				Role: message.Tool,
 909				Parts: []message.ContentPart{
 910					toolResult,
 911				},
 912			})
 913			if createErr != nil {
 914				return nil, createErr
 915			}
 916		}
 917		var fantasyErr *fantasy.Error
 918		var providerErr *fantasy.ProviderError
 919		const defaultTitle = "Provider Error"
 920		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
 921		if isCancelErr {
 922			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 923		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
 924			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
 925			if a.notify != nil {
 926				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
 927					SessionID:    call.SessionID,
 928					SessionTitle: currentSession.Title,
 929					Type:         notify.TypeReAuthenticate,
 930					ProviderID:   largeModel.ModelCfg.Provider,
 931				})
 932			}
 933		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
 934			url := hyper.BaseURL()
 935			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
 936			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
 937		} else if errors.As(err, &providerErr) {
 938			if providerErr.Message == "The requested model is not supported." {
 939				url := "https://github.com/settings/copilot/features"
 940				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
 941				currentAssistant.AddFinish(
 942					message.FinishReasonError,
 943					"Copilot model not enabled",
 944					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
 945				)
 946			} else {
 947				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
 948			}
 949		} else if errors.As(err, &fantasyErr) {
 950			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
 951		} else {
 952			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
 953		}
 954		// Note: we use the cleanup context here because the genCtx has been
 955		// cancelled.
 956		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
 957		if updateErr != nil {
 958			return nil, updateErr
 959		}
 960		return nil, err
 961	}
 962
 963	if shouldSummarize {
 964		a.activeRequests.Del(call.SessionID)
 965		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
 966			return nil, summarizeErr
 967		}
 968		// If the agent wasn't done...
 969		if len(currentAssistant.ToolCalls()) > 0 {
 970			existing, ok := a.messageQueue.Get(call.SessionID)
 971			if !ok {
 972				existing = []SessionAgentCall{}
 973			}
 974			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
 975			existing = append(existing, call)
 976			a.messageQueue.Set(call.SessionID, existing)
 977		}
 978	}
 979
 980	// Release active request before publishing the notification.
 981	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
 982	// tea.Msg arrives, so the cleanup must precede the notify or
 983	// subscribers see stale busy state at the moment of receipt.
 984	a.activeRequests.Del(call.SessionID)
 985	cancel()
 986
 987	// Send notification that agent has finished its turn (skip for
 988	// nested/non-interactive sessions).
 989	if !call.NonInteractive && a.notify != nil {
 990		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
 991			SessionID:    call.SessionID,
 992			SessionTitle: currentSession.Title,
 993			Type:         notify.TypeAgentFinished,
 994		})
 995	}
 996
 997	// Hand off to the next queued prompt (if any) under dispatchMu so
 998	// the transition from this finished run to the queued run is atomic
 999	// against a concurrent Cancel. activeRequests for this session was
1000	// just deleted above, so without the lock there is a window in
1001	// which the session looks idle and a cancel becomes a no-op that
1002	// fails to stop the queued prompt. Holding the lock lets us observe
1003	// a pending cancel recorded against the session and drop the queue
1004	// instead of running it, and (for the recursion) hand a fresh
1005	// accept reservation to the dequeued call so acceptedRuns stays > 0
1006	// across the recursive Run's own dispatch handoff — keeping the
1007	// session observable to Cancel for the entire transition and
1008	// closing the dequeue -> re-register window.
1009	mu := a.sessionMu(call.SessionID)
1010	mu.Lock()
1011	if _, pending := a.pendingCancels.Get(call.SessionID); pending {
1012		// A cancel was recorded for this session (e.g. it arrived while
1013		// this run was active and a follow-up had been accepted). Drop
1014		// the queue instead of running it and consume the marker.
1015		a.pendingCancels.Del(call.SessionID)
1016		a.messageQueue.Del(call.SessionID)
1017		mu.Unlock()
1018		return result, err
1019	}
1020	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
1021	if !ok || len(queuedMessages) == 0 {
1022		// No queued work. Clear any stale pending-cancel entry as a
1023		// safety net so it can't catch a future run (no-op when unset).
1024		a.pendingCancels.Del(call.SessionID)
1025		mu.Unlock()
1026		return result, err
1027	}
1028	// There are queued messages restart the loop. The recursive Run
1029	// publishes its own RunComplete for the queued prompt, so suppress
1030	// the outer defer's emit to avoid a duplicate event whose Error
1031	// field would belong to the recursive turn but whose MessageID/Text
1032	// would belong to the outer turn.
1033	skipRunComplete = true
1034	firstQueuedMessage := queuedMessages[0]
1035	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
1036	// Reserve a fresh accept for the dequeued prompt before dropping the
1037	// lock so acceptedRuns > 0 across the handoff into the recursive
1038	// Run. This closes the window between this dequeue and the recursive
1039	// Run registering its activeRequests entry: a cancel arriving in
1040	// that window now records a pending cancel (acceptedRuns > 0) that
1041	// the recursive Run's accepted path observes as cancel-on-entry.
1042	firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
1043	mu.Unlock()
1044	return a.Run(ctx, firstQueuedMessage)
1045}
1046
1047func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
1048	if a.IsSessionBusy(sessionID) {
1049		return ErrSessionBusy
1050	}
1051
1052	// Copy mutable fields under lock to avoid races with SetModels.
1053	largeModel := a.largeModel.Get()
1054	systemPromptPrefix := a.systemPromptPrefix.Get()
1055
1056	currentSession, err := a.sessions.Get(ctx, sessionID)
1057	if err != nil {
1058		return fmt.Errorf("failed to get session: %w", err)
1059	}
1060	msgs, err := a.getSessionMessages(ctx, currentSession)
1061	if err != nil {
1062		return err
1063	}
1064	if len(msgs) == 0 {
1065		// Nothing to summarize.
1066		return nil
1067	}
1068
1069	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
1070
1071	genCtx, cancel := context.WithCancel(ctx)
1072	a.activeRequests.Set(sessionID, cancel)
1073	defer a.activeRequests.Del(sessionID)
1074	defer cancel()
1075	defer func() {
1076		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
1077			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
1078		}
1079	}()
1080
1081	agent := fantasy.NewAgent(
1082		largeModel.Model,
1083		fantasy.WithSystemPrompt(string(summaryPrompt)),
1084		fantasy.WithUserAgent(userAgent),
1085	)
1086	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
1087		Role:             message.Assistant,
1088		Model:            largeModel.ModelCfg.Model,
1089		Provider:         largeModel.ModelCfg.Provider,
1090		IsSummaryMessage: true,
1091	})
1092	if err != nil {
1093		return err
1094	}
1095
1096	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
1097
1098	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
1099		Prompt:          summaryPromptText,
1100		Messages:        aiMsgs,
1101		ProviderOptions: opts,
1102		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1103			prepared.Messages = options.Messages
1104			if systemPromptPrefix != "" {
1105				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
1106			}
1107			return callContext, prepared, nil
1108		},
1109		OnReasoningDelta: func(id string, text string) error {
1110			summaryMessage.AppendReasoningContent(text)
1111			return a.messages.Update(genCtx, summaryMessage)
1112		},
1113		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
1114			// Handle anthropic signature.
1115			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
1116				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
1117					summaryMessage.AppendReasoningSignature(signature.Signature)
1118				}
1119			}
1120			summaryMessage.FinishThinking()
1121			return a.messages.Update(genCtx, summaryMessage)
1122		},
1123		OnTextDelta: func(id, text string) error {
1124			summaryMessage.AppendContent(text)
1125			return a.messages.Update(genCtx, summaryMessage)
1126		},
1127	})
1128	if err != nil {
1129		isCancelErr := errors.Is(err, context.Canceled)
1130		if isCancelErr {
1131			// User cancelled summarize we need to remove the summary message.
1132			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
1133			return deleteErr
1134		}
1135		// Mark the summary message as finished with an error so the UI
1136		// stops spinning.
1137		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
1138		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
1139			return updateErr
1140		}
1141		return err
1142	}
1143
1144	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
1145	err = a.messages.Update(genCtx, summaryMessage)
1146	if err != nil {
1147		return err
1148	}
1149
1150	var openrouterCost *float64
1151	for _, step := range resp.Steps {
1152		stepCost := a.openrouterCost(step.ProviderMetadata)
1153		if stepCost != nil {
1154			newCost := *stepCost
1155			if openrouterCost != nil {
1156				newCost += *openrouterCost
1157			}
1158			openrouterCost = &newCost
1159		}
1160	}
1161
1162	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
1163
1164	// Just in case, get just the last usage info.
1165	usage := resp.Response.Usage
1166	currentSession.SummaryMessageID = summaryMessage.ID
1167	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
1168	currentSession.PromptTokens = 0
1169	currentSession.EstimatedUsage = usageIsZero(usage)
1170	_, err = a.sessions.Save(genCtx, currentSession)
1171	if err != nil {
1172		return err
1173	}
1174
1175	// Release the active request before processing queued messages so that
1176	// Run() does not see the session as busy.
1177	a.activeRequests.Del(sessionID)
1178	cancel()
1179
1180	// Process any messages that were queued while summarizing.
1181	queuedMessages, ok := a.messageQueue.Get(sessionID)
1182	if !ok || len(queuedMessages) == 0 {
1183		return nil
1184	}
1185	firstQueuedMessage := queuedMessages[0]
1186	a.messageQueue.Set(sessionID, queuedMessages[1:])
1187	_, qErr := a.Run(ctx, firstQueuedMessage)
1188	return qErr
1189}
1190
1191func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
1192	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
1193		return fantasy.ProviderOptions{}
1194	}
1195	return fantasy.ProviderOptions{
1196		anthropic.Name: &anthropic.ProviderCacheControlOptions{
1197			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1198		},
1199		bedrock.Name: &anthropic.ProviderCacheControlOptions{
1200			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1201		},
1202		vercel.Name: &anthropic.ProviderCacheControlOptions{
1203			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
1204		},
1205	}
1206}
1207
1208func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
1209	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
1210	var attachmentParts []message.ContentPart
1211	for _, attachment := range call.Attachments {
1212		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
1213	}
1214	parts = append(parts, attachmentParts...)
1215	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
1216		Role:  message.User,
1217		Parts: parts,
1218	})
1219	if err != nil {
1220		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
1221	}
1222	return msg, nil
1223}
1224
1225func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
1226	var history []fantasy.Message
1227	if !a.isSubAgent {
1228		history = append(history, fantasy.NewUserMessage(
1229			fmt.Sprintf(
1230				"<system_reminder>%s</system_reminder>",
1231				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
1232If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
1233If not, please feel free to ignore. Again do not mention this message to the user.`,
1234			),
1235		))
1236	}
1237	// Collect all tool call IDs present in assistant messages and all tool
1238	// result IDs present in tool messages. This lets us detect both orphaned
1239	// tool results (result without a call) and orphaned tool calls (call
1240	// without a result).
1241	knownToolCallIDs := make(map[string]struct{})
1242	knownToolResultIDs := make(map[string]struct{})
1243	for _, m := range msgs {
1244		switch m.Role {
1245		case message.Assistant:
1246			for _, tc := range m.ToolCalls() {
1247				knownToolCallIDs[tc.ID] = struct{}{}
1248			}
1249		case message.Tool:
1250			for _, tr := range m.ToolResults() {
1251				knownToolResultIDs[tr.ToolCallID] = struct{}{}
1252			}
1253		}
1254	}
1255
1256	for _, m := range msgs {
1257		if len(m.Parts) == 0 {
1258			continue
1259		}
1260		// Assistant message without content or tool calls (cancelled before it returned anything).
1261		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
1262			continue
1263		}
1264		if m.Role == message.Tool {
1265			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
1266				history = append(history, msg)
1267			}
1268			continue
1269		}
1270		aiMsgs := m.ToAIMessage()
1271		if !supportsImages {
1272			for i := range aiMsgs {
1273				if aiMsgs[i].Role == fantasy.MessageRoleUser {
1274					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
1275				}
1276			}
1277		}
1278		history = append(history, aiMsgs...)
1279
1280		if m.Role == message.Assistant {
1281			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
1282				history = append(history, msg)
1283			}
1284		}
1285	}
1286
1287	var files []fantasy.FilePart
1288	for _, attachment := range attachments {
1289		if attachment.IsText() {
1290			continue
1291		}
1292		files = append(files, fantasy.FilePart{
1293			Filename:  attachment.FileName,
1294			Data:      attachment.Content,
1295			MediaType: attachment.MimeType,
1296		})
1297	}
1298
1299	return history, files
1300}
1301
1302// filterFileParts removes fantasy.FilePart entries from a slice of message
1303// parts. Used to strip image attachments from historical user messages when
1304// the current model does not support them.
1305func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1306	filtered := make([]fantasy.MessagePart, 0, len(parts))
1307	for _, part := range parts {
1308		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1309			continue
1310		}
1311		filtered = append(filtered, part)
1312	}
1313	return filtered
1314}
1315
1316// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1317// dropping any tool result parts whose tool_call_id has no matching tool call
1318// in the known set. An orphaned result causes API validation to fail on every
1319// subsequent turn, permanently locking the session. Returns the filtered
1320// message and true if at least one valid part remains.
1321func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1322	aiMsgs := m.ToAIMessage()
1323	if len(aiMsgs) == 0 {
1324		return fantasy.Message{}, false
1325	}
1326	var validParts []fantasy.MessagePart
1327	for _, part := range aiMsgs[0].Content {
1328		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1329		if !ok {
1330			validParts = append(validParts, part)
1331			continue
1332		}
1333		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1334			validParts = append(validParts, part)
1335		} else {
1336			slog.Warn(
1337				"Dropping orphaned tool result with no matching tool call",
1338				"tool_call_id", tr.ToolCallID,
1339			)
1340		}
1341	}
1342	if len(validParts) == 0 {
1343		return fantasy.Message{}, false
1344	}
1345	msg := aiMsgs[0]
1346	msg.Content = validParts
1347	return msg, true
1348}
1349
1350// syntheticToolResultsForOrphanedCalls returns a tool message containing
1351// synthetic tool results for any tool calls in the assistant message that
1352// have no matching result in knownToolResultIDs. LLM APIs require every
1353// tool_use to be immediately followed by a tool_result; an interrupted
1354// session can leave orphaned tool_use blocks that permanently lock the
1355// conversation. Returns the message and true if any synthetic results were
1356// produced.
1357func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1358	var syntheticParts []fantasy.MessagePart
1359	for _, tc := range m.ToolCalls() {
1360		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1361			continue
1362		}
1363		slog.Warn(
1364			"Injecting synthetic tool result for orphaned tool call",
1365			"tool_call_id", tc.ID,
1366			"tool_name", tc.Name,
1367		)
1368		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1369			ToolCallID: tc.ID,
1370			Output: fantasy.ToolResultOutputContentError{
1371				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1372			},
1373		})
1374	}
1375	if len(syntheticParts) == 0 {
1376		return fantasy.Message{}, false
1377	}
1378	return fantasy.Message{
1379		Role:    fantasy.MessageRoleTool,
1380		Content: syntheticParts,
1381	}, true
1382}
1383
1384func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1385	msgs, err := a.messages.List(ctx, session.ID)
1386	if err != nil {
1387		return nil, fmt.Errorf("failed to list messages: %w", err)
1388	}
1389
1390	if session.SummaryMessageID != "" {
1391		summaryMsgIndex := -1
1392		for i, msg := range msgs {
1393			if msg.ID == session.SummaryMessageID {
1394				summaryMsgIndex = i
1395				break
1396			}
1397		}
1398		if summaryMsgIndex != -1 {
1399			msgs = msgs[summaryMsgIndex:]
1400			msgs[0].Role = message.User
1401		}
1402	}
1403	return msgs, nil
1404}
1405
1406// generateTitle generates a session titled based on the initial prompt.
1407func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1408	if userPrompt == "" {
1409		return
1410	}
1411
1412	smallModel := a.smallModel.Get()
1413	largeModel := a.largeModel.Get()
1414	systemPromptPrefix := a.systemPromptPrefix.Get()
1415
1416	var maxOutputTokens int64 = 40
1417	if smallModel.CatwalkCfg.CanReason {
1418		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1419	}
1420
1421	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1422		return fantasy.NewAgent(
1423			m,
1424			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1425			fantasy.WithMaxOutputTokens(tok),
1426			fantasy.WithUserAgent(userAgent),
1427		)
1428	}
1429
1430	streamCall := fantasy.AgentStreamCall{
1431		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1432		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1433			prepared.Messages = opts.Messages
1434			if systemPromptPrefix != "" {
1435				prepared.Messages = append([]fantasy.Message{
1436					fantasy.NewSystemMessage(systemPromptPrefix),
1437				}, prepared.Messages...)
1438			}
1439			return callCtx, prepared, nil
1440		},
1441	}
1442
1443	// Use the small model to generate the title.
1444	model := smallModel
1445	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1446	resp, err := agent.Stream(ctx, streamCall)
1447	if err == nil {
1448		// We successfully generated a title with the small model.
1449		slog.Debug("Generated title with small model")
1450	} else {
1451		// It didn't work. Let's try with the big model.
1452		slog.Error("Error generating title with small model; trying big model", "err", err)
1453		model = largeModel
1454		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1455		resp, err = agent.Stream(ctx, streamCall)
1456		if err == nil {
1457			slog.Debug("Generated title with large model")
1458		} else {
1459			// Welp, the large model didn't work either. Use the default
1460			// session name and return.
1461			slog.Error("Error generating title with large model", "err", err)
1462			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1463			if saveErr != nil {
1464				slog.Error("Failed to save session title", "error", saveErr)
1465			}
1466			return
1467		}
1468	}
1469
1470	if resp == nil {
1471		// Actually, we didn't get a response so we can't. Use the default
1472		// session name and return.
1473		slog.Error("Response is nil; can't generate title")
1474		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1475		if saveErr != nil {
1476			slog.Error("Failed to save session title", "error", saveErr)
1477		}
1478		return
1479	}
1480
1481	// Clean up title.
1482	var title string
1483	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1484
1485	// Remove thinking tags if present.
1486	title = thinkTagRegex.ReplaceAllString(title, "")
1487	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1488
1489	title = strings.TrimSpace(title)
1490	title = cmp.Or(title, DefaultSessionName)
1491
1492	// Calculate usage and cost.
1493	var openrouterCost *float64
1494	for _, step := range resp.Steps {
1495		stepCost := a.openrouterCost(step.ProviderMetadata)
1496		if stepCost != nil {
1497			newCost := *stepCost
1498			if openrouterCost != nil {
1499				newCost += *openrouterCost
1500			}
1501			openrouterCost = &newCost
1502		}
1503	}
1504
1505	modelConfig := model.CatwalkCfg
1506	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1507		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1508		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1509		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1510
1511	// Use override cost if available (e.g., from OpenRouter).
1512	if openrouterCost != nil {
1513		cost = *openrouterCost
1514	}
1515
1516	// Skip cost accumulation
1517	if model.FlatRate {
1518		cost = 0
1519	}
1520
1521	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1522	completionTokens := resp.TotalUsage.OutputTokens
1523
1524	// Atomically update only title and usage fields to avoid overriding other
1525	// concurrent session updates.
1526	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1527	if saveErr != nil {
1528		slog.Error("Failed to save session title and usage", "error", saveErr)
1529		return
1530	}
1531}
1532
1533func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1534	openrouterMetadata, ok := metadata[openrouter.Name]
1535	if !ok {
1536		return nil
1537	}
1538
1539	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1540	if !ok {
1541		return nil
1542	}
1543	return &opts.Usage.Cost
1544}
1545
1546func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1547	if !usageIsZero(usage) {
1548		session.EstimatedUsage = estimated
1549	}
1550
1551	modelConfig := model.CatwalkCfg
1552	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1553		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1554		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1555		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1556
1557	if !estimated {
1558		a.eventTokensUsed(session.ID, model, usage, cost)
1559	}
1560
1561	if estimated {
1562		cost = 0
1563	} else {
1564		// Use override cost if available (e.g., from OpenRouter).
1565		if overrideCost != nil {
1566			cost = *overrideCost
1567		}
1568
1569		// Skip cost accumulation
1570		if model.FlatRate {
1571			cost = 0
1572		}
1573	}
1574
1575	session.Cost += cost
1576	updateSessionTokenCounters(session, usage)
1577}
1578
1579func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1580	if usage.OutputTokens != 0 {
1581		session.CompletionTokens = usage.OutputTokens
1582	}
1583	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1584		session.PromptTokens = promptTokens
1585	}
1586}
1587
1588func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1589	if usage.OutputTokens != 0 {
1590		return usage.OutputTokens
1591	}
1592	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1593}
1594
// Cancel stops work for the given session. While holding the per-session
// mutex returned by sessionMu, it:
//   - invokes the cancel function registered in activeRequests for the
//     session and for its "-summarize" entry, if present and non-nil;
//   - records a pending cancel when acceptedRuns holds a positive count for
//     the session;
//   - deletes the session's queued prompts, if any.
//
// Entries are not removed from activeRequests here.
func (a *sessionAgent) Cancel(sessionID string) {
	// Serialize against the dispatch handoff in Run so the accepted ->
	// (cancel-on-entry | queued | active) transition is atomic against
	// this cancel. Every cancel observes at least one of: an active
	// request, an accepted run (recorded as a pending cancel), or a
	// queue entry it then clears. If none of those hold, an idle Escape
	// is a true no-op and must not poison the next prompt.
	mu := a.sessionMu(sessionID)
	mu.Lock()
	defer mu.Unlock()

	// Cancel regular requests. Don't use Take() here - we need the entry to
	// remain in activeRequests so IsBusy() returns true until the goroutine
	// fully completes (including error handling that may access the DB).
	// The defer in processRequest will clean up the entry.
	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
		slog.Debug("Request cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Also check for summarize requests.
	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
		cancel()
	}

	// Record a pending cancel only when a dispatched-but-not-yet-active
	// run exists. This catches a run still in the goroutine scheduler or
	// about to enter Run's busy-queue branch, while leaving an idle
	// session untouched. Active and accepted are not mutually exclusive:
	// when a run is active and a follow-up has been accepted, both the
	// cancel above and this pending record fire.
	a.acceptedMu.Lock()
	count, ok := a.acceptedRuns.Get(sessionID)
	a.acceptedMu.Unlock()
	if ok && count > 0 {
		slog.Debug("Recording pending cancel for accepted run", "session_id", sessionID)
		a.pendingCancels.Set(sessionID, struct{}{})
	}

	if a.QueuedPrompts(sessionID) > 0 {
		slog.Debug("Clearing queued prompts", "session_id", sessionID)
		a.messageQueue.Del(sessionID)
	}
}
1640
1641func (a *sessionAgent) ClearQueue(sessionID string) {
1642	if a.QueuedPrompts(sessionID) > 0 {
1643		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1644		a.messageQueue.Del(sessionID)
1645	}
1646}
1647
1648func (a *sessionAgent) CancelAll() {
1649	if !a.IsBusy() {
1650		return
1651	}
1652	for key := range a.activeRequests.Seq2() {
1653		a.Cancel(key) // key is sessionID
1654	}
1655
1656	timeout := time.After(5 * time.Second)
1657	for a.IsBusy() {
1658		select {
1659		case <-timeout:
1660			return
1661		default:
1662			time.Sleep(200 * time.Millisecond)
1663		}
1664	}
1665}
1666
1667func (a *sessionAgent) IsBusy() bool {
1668	var busy bool
1669	for cancelFunc := range a.activeRequests.Seq() {
1670		if cancelFunc != nil {
1671			busy = true
1672			break
1673		}
1674	}
1675	return busy
1676}
1677
1678func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1679	_, busy := a.activeRequests.Get(sessionID)
1680	return busy
1681}
1682
1683func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1684	l, ok := a.messageQueue.Get(sessionID)
1685	if !ok {
1686		return 0
1687	}
1688	return len(l)
1689}
1690
1691func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1692	l, ok := a.messageQueue.Get(sessionID)
1693	if !ok {
1694		return nil
1695	}
1696	prompts := make([]string, len(l))
1697	for i, call := range l {
1698		prompts[i] = call.Prompt
1699	}
1700	return prompts
1701}
1702
// SetModels replaces the agent's large and small models with the given
// values. The large model is stored first, then the small model.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1707
// SetTools replaces the agent's tool list with the given slice.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1711
// SetSystemPrompt replaces the agent's system prompt with the given text.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1715
// Model returns the agent's currently configured large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1719
1720// convertToToolResult converts a fantasy tool result to a message tool result.
1721func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1722	baseResult := message.ToolResult{
1723		ToolCallID: result.ToolCallID,
1724		Name:       result.ToolName,
1725		Metadata:   result.ClientMetadata,
1726	}
1727
1728	switch result.Result.GetType() {
1729	case fantasy.ToolResultContentTypeText:
1730		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1731			baseResult.Content = r.Text
1732		}
1733	case fantasy.ToolResultContentTypeError:
1734		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1735			baseResult.Content = r.Error.Error()
1736			baseResult.IsError = true
1737		}
1738	case fantasy.ToolResultContentTypeMedia:
1739		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1740			if !stringext.IsValidBase64(r.Data) {
1741				slog.Warn(
1742					"Tool returned media with invalid base64 data, discarding image",
1743					"tool", result.ToolName,
1744					"tool_call_id", result.ToolCallID,
1745				)
1746				baseResult.Content = "Tool returned image data with invalid encoding"
1747				baseResult.IsError = true
1748			} else {
1749				content := r.Text
1750				if content == "" {
1751					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1752				}
1753				baseResult.Content = content
1754				baseResult.Data = r.Data
1755				baseResult.MIMEType = r.MediaType
1756			}
1757		}
1758	}
1759
1760	return baseResult
1761}
1762
1763// workaroundProviderMediaLimitations converts media content in tool results to
1764// user messages for providers that don't natively support images in tool results.
1765//
1766// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1767// don't support sending images/media in tool result messages - they only accept
1768// text in tool results. However, they DO support images in user messages.
1769//
1770// If we send media in tool results to these providers, the API returns an error.
1771//
1772// Solution: For these providers, we:
1773//  1. Replace the media in the tool result with a text placeholder
1774//  2. Inject a user message immediately after with the image as a file attachment
1775//  3. This maintains the tool execution flow while working around API limitations
1776//
1777// Anthropic and Bedrock support images natively in tool results, so we skip
1778// this workaround for them.
1779//
1780// Example transformation:
1781//
1782//	BEFORE: [tool result: image data]
1783//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1784func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1785	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1786		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1787		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1788
1789	if providerSupportsMedia {
1790		return messages
1791	}
1792
1793	convertedMessages := make([]fantasy.Message, 0, len(messages))
1794
1795	for _, msg := range messages {
1796		if msg.Role != fantasy.MessageRoleTool {
1797			convertedMessages = append(convertedMessages, msg)
1798			continue
1799		}
1800
1801		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1802		var mediaFiles []fantasy.FilePart
1803
1804		for _, part := range msg.Content {
1805			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1806			if !ok {
1807				textParts = append(textParts, part)
1808				continue
1809			}
1810
1811			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1812				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1813				if err != nil {
1814					slog.Warn("Failed to decode media data", "error", err)
1815					textParts = append(textParts, part)
1816					continue
1817				}
1818
1819				mediaFiles = append(mediaFiles, fantasy.FilePart{
1820					Data:      decoded,
1821					MediaType: media.MediaType,
1822					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1823				})
1824
1825				textParts = append(textParts, fantasy.ToolResultPart{
1826					ToolCallID: toolResult.ToolCallID,
1827					Output: fantasy.ToolResultOutputContentText{
1828						Text: "[Image/media content loaded - see attached file]",
1829					},
1830					ProviderOptions: toolResult.ProviderOptions,
1831				})
1832			} else {
1833				textParts = append(textParts, part)
1834			}
1835		}
1836
1837		convertedMessages = append(convertedMessages, fantasy.Message{
1838			Role:    fantasy.MessageRoleTool,
1839			Content: textParts,
1840		})
1841
1842		if len(mediaFiles) > 0 {
1843			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1844				"Here is the media content from the tool result:",
1845				mediaFiles...,
1846			))
1847		}
1848	}
1849
1850	return convertedMessages
1851}
1852
1853// buildSummaryPrompt constructs the prompt text for session summarization.
1854func buildSummaryPrompt(todos []session.Todo) string {
1855	var sb strings.Builder
1856	sb.WriteString("Provide a detailed summary of our conversation above.")
1857	if len(todos) > 0 {
1858		sb.WriteString("\n\n## Current Todo List\n\n")
1859		for _, t := range todos {
1860			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1861		}
1862		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1863		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1864	}
1865	return sb.String()
1866}
1867
1868func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1869	fields := []any{
1870		"retry_delay", delay.String(),
1871	}
1872	if err == nil {
1873		return fields
1874	}
1875	fields = append(fields, "status_code", err.StatusCode)
1876	if err.Title != "" {
1877		fields = append(fields, "title", err.Title)
1878	}
1879	if err.Message != "" {
1880		fields = append(fields, "message", err.Message)
1881	}
1882	return fields
1883}