agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
  49const (
  50	DefaultSessionName = "Untitled Session"
  51
  52	// Constants for auto-summarization thresholds
  53	largeContextWindowThreshold = 200_000
  54	largeContextWindowBuffer    = 20_000
  55	smallContextWindowRatio     = 0.2
  56)
  57
// userAgent is the User-Agent string handed to fantasy.WithUserAgent whenever
// an agent is constructed in this file; it embeds version.Version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  59
// titlePrompt holds the contents of templates/title.md, embedded at build
// time. NOTE(review): presumably the prompt used for session title
// generation; its use is not visible in this part of the file.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at build
// time. Summarize passes it as the system prompt of the summarizing agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
  66// Used to remove <think> tags from generated titles.
  67var (
  68	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
  69	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
  70)
  71
// SessionAgentCall describes a single turn submitted to SessionAgent.Run.
//
// SessionID must be non-empty, and the call must carry either a non-empty
// Prompt or a text attachment; Run rejects the call otherwise. Prompt,
// Attachments, ProviderOptions and the sampling fields (Temperature, TopP,
// TopK, FrequencyPenalty, PresencePenalty) are forwarded to the model stream
// call. MaxOutputTokens is only forwarded when it is greater than zero.
// NonInteractive suppresses the agent-finished notification that Run
// otherwise publishes at the end of a successful turn.
type SessionAgentCall struct {
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID            string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
}
 107
// SessionAgent is the session-scoped agent API returned by NewSessionAgent.
//
// NOTE(review): only Run and the start of Summarize are visible in this part
// of the file. The notes on the other methods are inferred from their names
// and from call sites in Run; confirm them against the implementations.
type SessionAgent interface {
	// Run executes the turn described by the call. In the sessionAgent
	// implementation a call for a busy session is queued and Run returns
	// (nil, nil) immediately.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels presumably replaces the large and small models.
	SetModels(large Model, small Model)
	// SetTools presumably replaces the tool set; Run re-reads the tools at
	// every step so that changes take effect mid-turn.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt presumably replaces the system prompt.
	SetSystemPrompt(systemPrompt string)
	// Cancel presumably cancels the in-flight request of one session.
	Cancel(sessionID string)
	// CancelAll presumably cancels every in-flight request.
	CancelAll()
	// IsSessionBusy reports whether the session is busy; Run and Summarize
	// consult it before starting work.
	IsSessionBusy(sessionID string) bool
	// IsBusy presumably reports whether any session is busy.
	IsBusy() bool
	// QueuedPrompts presumably returns the number of queued prompts for
	// the session.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList presumably returns the queued prompts for the
	// session.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue presumably drops the session's queued prompts.
	ClearQueue(sessionID string)
	// Summarize condenses the session's messages into a summary message.
	// The sessionAgent implementation returns ErrSessionBusy when the
	// session is busy.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns a Model. NOTE(review): which one (large or small) is
	// not visible here.
	Model() Model
}
 123
// Model bundles a language model with the configuration the agent reads
// alongside it.
//
// Model is the fantasy language model handed to fantasy.NewAgent. CatwalkCfg
// supplies the capabilities Run consults (SupportsImages, ContextWindow,
// Name). ModelCfg supplies the model and provider identifiers recorded on
// assistant messages. NOTE(review): FlatRate is not read in this part of the
// file; presumably it marks flat-rate billing — confirm against its users.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 130
// sessionAgent is the SessionAgent implementation built by NewSessionAgent.
//
// The first group of fields is held in csync containers because it can be
// replaced while a turn is running (Run copies them at the start of a turn
// to avoid races with SetTools/SetModels):
//   - largeModel is the model used for the Run and Summarize streams.
//   - smallModel is the secondary model (its use is not visible in this
//     part of the file).
//   - systemPromptPrefix, when non-empty, is prepended as an extra system
//     message on every step.
//   - systemPrompt is the base system prompt; Run appends MCP server
//     instructions to it.
//   - tools is the agent tool set, re-read at every step.
//
// sessions and messages are the persistence services. disableAutoSummarize
// turns off the auto-summarize stop condition. notify and runComplete are
// optional publishers (Run nil-checks both before publishing). isSubAgent
// and isYolo are copied from the options; their use is not visible here.
//
// messageQueue holds, per session ID, the calls waiting behind a busy
// session. activeRequests holds, per session ID, the cancel function of the
// in-flight request.
type sessionAgent struct {
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]
}
 149
// SessionAgentOptions carries the dependencies and settings consumed by
// NewSessionAgent. Each field is copied into the matching sessionAgent field;
// the models, prompts and tools are wrapped in csync containers there.
// Notify and RunComplete may be left nil: Run checks both for nil before
// publishing.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 164
 165func NewSessionAgent(
 166	opts SessionAgentOptions,
 167) SessionAgent {
 168	return &sessionAgent{
 169		largeModel:           csync.NewValue(opts.LargeModel),
 170		smallModel:           csync.NewValue(opts.SmallModel),
 171		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 172		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 173		isSubAgent:           opts.IsSubAgent,
 174		sessions:             opts.Sessions,
 175		messages:             opts.Messages,
 176		disableAutoSummarize: opts.DisableAutoSummarize,
 177		tools:                csync.NewSliceFrom(opts.Tools),
 178		isYolo:               opts.IsYolo,
 179		notify:               opts.Notify,
 180		runComplete:          opts.RunComplete,
 181		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 182		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 183	}
 184}
 185
// Run executes one agent turn for call.SessionID and returns the stream
// result.
//
// It returns ErrEmptyPrompt when the call has neither a prompt nor a text
// attachment, and ErrSessionMissing when SessionID is empty. If the session
// is already busy, the call is appended to the session's queue (with
// OnComplete cleared) and Run returns (nil, nil) immediately.
//
// Otherwise Run stores the user message, streams the large model's response
// while persisting assistant and tool messages through the step callbacks,
// and saves session usage after every step. On a stream error the assistant
// message is finalised with a finish reason describing the failure, tool
// calls that never finished are closed out, every tool call lacking a result
// gets an error result, and the original error is returned. When the
// remaining context window drops to the auto-summarize threshold the turn
// stops and the session is summarized; if the assistant still had tool
// calls, the original request is re-queued. Calls still queued when the turn
// ends are run through a recursive Run, one at a time.
//
// A deferred handler flushes pending message updates and then reports a
// notify.RunComplete through call.OnComplete or, when that is nil, the
// runComplete publisher — unless a queued call is about to take over, in
// which case the recursive Run reports its own.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. Strip OnComplete: the caller that
	// supplied the hook (typically coordinator.Run) has its own
	// retry/coalesce scope that ends when it returns, so by the time
	// the queue drains nobody is left to consume the buffered
	// terminal event. The recursive Run will fall back to the
	// default broker publish, which is what existing subscribers
	// expect for queued turns.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		queued := call
		queued.OnComplete = nil
		existing = append(existing, queued)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards stepMessages and the session usage update, which
	// are touched from the PrepareStep and OnStepFinish callbacks.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Deferred first, so it runs after every other deferred cleanup: Run
	// does not return until title generation has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// skipRunComplete is set just before the queued-recursion path so
	// the outer Run doesn't publish a RunComplete that would race
	// with — and be superseded by — the recursive call's own
	// RunComplete (each queued user prompt is its own turn and
	// publishes exactly one terminal event).
	var skipRunComplete bool
	// currentAssistant is declared here so the deferred RunComplete
	// publish below can capture the pointer that PrepareStep will
	// later (re)assign for each streaming step. The final assistant
	// message of the turn is the value reachable through this
	// pointer when the defer runs.
	var currentAssistant *message.Message
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	//
	// After the flush completes — meaning all per-message
	// Publish(UpdatedEvent) calls have fired and been buffered into
	// every subscriber's channel — publish the authoritative
	// RunComplete event for this turn. The flush-then-publish order
	// gives well-behaved clients the best chance of seeing the final
	// message event before RunComplete; the embedded Text field
	// reconciles for clients that observe the events out of order
	// (the pubsub broker fan-in does not serialize publishes from
	// different upstream brokers).
	defer func() {
		// Use a context detached from the run context: workspace
		// shutdown cancels ctx before this goroutine returns, but the
		// buffered streaming deltas must still land before the DB is
		// closed. A short timeout bounds the flush.
		flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		defer flushCancel()
		if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
		if skipRunComplete {
			return
		}
		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
		if currentAssistant != nil {
			complete.MessageID = currentAssistant.ID
			complete.Text = currentAssistant.Content().String()
		}
		if retErr != nil {
			complete.Error = retErr.Error()
			complete.Cancelled = errors.Is(retErr, context.Canceled)
		} else if ctx.Err() != nil {
			complete.Cancelled = true
		}
		// Prefer the per-call hook when supplied so the coordinator
		// can coalesce retries (e.g. unauthorized → re-auth → retry)
		// into a single user-visible terminal event. The fallback
		// must-deliver publish applies bounded-blocking semantics to
		// the authoritative terminal event so a momentarily-full
		// subscriber channel can't silently drop it and hang
		// non-interactive clients waiting on RunComplete.
		if call.OnComplete != nil {
			call.OnComplete(complete)
			return
		}
		if a.runComplete == nil {
			return
		}
		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	var stepMessages []fantasy.Message
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Fold prompts that were queued while this turn was running
			// into the conversation as additional user messages.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Track the most recent system message; when the first
				// non-system message is reached, add cache control to that
				// system message (done once).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Snapshot the messages sent for this step; OnStepFinish passes
			// them to fallbackStepUsage.
			sessionLock.Lock()
			stepMessages = cloneFantasyMessages(prepared.Messages)
			sessionLock.Unlock()

			// Every step gets its own (initially empty) assistant message.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session, apply this step's usage and save it.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop for auto-summarization when the remaining context
			// window is at or below the threshold.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps repeat the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created, so there is nothing to finalise.
		if currentAssistant == nil {
			return result, err
		}
		// Persist final state with a context detached from the run
		// context. The run context (ctx) is derived from the
		// workspace context, which workspace shutdown cancels before
		// agent goroutines finish; using ctx here would drop the
		// final assistant state. WithoutCancel keeps the values
		// (e.g. session ID) while ignoring cancellation, and a short
		// timeout bounds the cleanup writes.
		cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
		defer cleanupCancel()
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the cleanup context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		for _, tc := range toolCalls {
			// Close out tool calls whose input never finished streaming.
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			// Look for an existing tool result answering this call.
			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			// No result was recorded: store an error result for the call.
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		// Pick the finish reason and user-facing message for the error.
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the cleanup context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize returns ErrSessionBusy for a busy session, so release
		// this session's active request first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages; restart the loop. The recursive Run
	// publishes its own RunComplete for the queued prompt, so suppress
	// the outer defer's emit to avoid a duplicate event whose Error
	// field would belong to the recursive turn but whose MessageID/Text
	// would belong to the outer turn.
	skipRunComplete = true
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 745
 746func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 747	if a.IsSessionBusy(sessionID) {
 748		return ErrSessionBusy
 749	}
 750
 751	// Copy mutable fields under lock to avoid races with SetModels.
 752	largeModel := a.largeModel.Get()
 753	systemPromptPrefix := a.systemPromptPrefix.Get()
 754
 755	currentSession, err := a.sessions.Get(ctx, sessionID)
 756	if err != nil {
 757		return fmt.Errorf("failed to get session: %w", err)
 758	}
 759	msgs, err := a.getSessionMessages(ctx, currentSession)
 760	if err != nil {
 761		return err
 762	}
 763	if len(msgs) == 0 {
 764		// Nothing to summarize.
 765		return nil
 766	}
 767
 768	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
 769
 770	genCtx, cancel := context.WithCancel(ctx)
 771	a.activeRequests.Set(sessionID, cancel)
 772	defer a.activeRequests.Del(sessionID)
 773	defer cancel()
 774	defer func() {
 775		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
 776			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
 777		}
 778	}()
 779
 780	agent := fantasy.NewAgent(
 781		largeModel.Model,
 782		fantasy.WithSystemPrompt(string(summaryPrompt)),
 783		fantasy.WithUserAgent(userAgent),
 784	)
 785	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 786		Role:             message.Assistant,
 787		Model:            largeModel.ModelCfg.Model,
 788		Provider:         largeModel.ModelCfg.Provider,
 789		IsSummaryMessage: true,
 790	})
 791	if err != nil {
 792		return err
 793	}
 794
 795	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 796
 797	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 798		Prompt:          summaryPromptText,
 799		Messages:        aiMsgs,
 800		ProviderOptions: opts,
 801		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 802			prepared.Messages = options.Messages
 803			if systemPromptPrefix != "" {
 804				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 805			}
 806			return callContext, prepared, nil
 807		},
 808		OnReasoningDelta: func(id string, text string) error {
 809			summaryMessage.AppendReasoningContent(text)
 810			return a.messages.Update(genCtx, summaryMessage)
 811		},
 812		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 813			// Handle anthropic signature.
 814			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 815				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 816					summaryMessage.AppendReasoningSignature(signature.Signature)
 817				}
 818			}
 819			summaryMessage.FinishThinking()
 820			return a.messages.Update(genCtx, summaryMessage)
 821		},
 822		OnTextDelta: func(id, text string) error {
 823			summaryMessage.AppendContent(text)
 824			return a.messages.Update(genCtx, summaryMessage)
 825		},
 826	})
 827	if err != nil {
 828		isCancelErr := errors.Is(err, context.Canceled)
 829		if isCancelErr {
 830			// User cancelled summarize we need to remove the summary message.
 831			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 832			return deleteErr
 833		}
 834		// Mark the summary message as finished with an error so the UI
 835		// stops spinning.
 836		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
 837		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 838			return updateErr
 839		}
 840		return err
 841	}
 842
 843	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 844	err = a.messages.Update(genCtx, summaryMessage)
 845	if err != nil {
 846		return err
 847	}
 848
 849	var openrouterCost *float64
 850	for _, step := range resp.Steps {
 851		stepCost := a.openrouterCost(step.ProviderMetadata)
 852		if stepCost != nil {
 853			newCost := *stepCost
 854			if openrouterCost != nil {
 855				newCost += *openrouterCost
 856			}
 857			openrouterCost = &newCost
 858		}
 859	}
 860
 861	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
 862
 863	// Just in case, get just the last usage info.
 864	usage := resp.Response.Usage
 865	currentSession.SummaryMessageID = summaryMessage.ID
 866	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
 867	currentSession.PromptTokens = 0
 868	currentSession.EstimatedUsage = usageIsZero(usage)
 869	_, err = a.sessions.Save(genCtx, currentSession)
 870	if err != nil {
 871		return err
 872	}
 873
 874	// Release the active request before processing queued messages so that
 875	// Run() does not see the session as busy.
 876	a.activeRequests.Del(sessionID)
 877	cancel()
 878
 879	// Process any messages that were queued while summarizing.
 880	queuedMessages, ok := a.messageQueue.Get(sessionID)
 881	if !ok || len(queuedMessages) == 0 {
 882		return nil
 883	}
 884	firstQueuedMessage := queuedMessages[0]
 885	a.messageQueue.Set(sessionID, queuedMessages[1:])
 886	_, qErr := a.Run(ctx, firstQueuedMessage)
 887	return qErr
 888}
 889
 890func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 891	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 892		return fantasy.ProviderOptions{}
 893	}
 894	return fantasy.ProviderOptions{
 895		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 896			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 897		},
 898		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 899			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 900		},
 901		vercel.Name: &anthropic.ProviderCacheControlOptions{
 902			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 903		},
 904	}
 905}
 906
 907func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 908	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 909	var attachmentParts []message.ContentPart
 910	for _, attachment := range call.Attachments {
 911		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 912	}
 913	parts = append(parts, attachmentParts...)
 914	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 915		Role:  message.User,
 916		Parts: parts,
 917	})
 918	if err != nil {
 919		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 920	}
 921	return msg, nil
 922}
 923
 924func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 925	var history []fantasy.Message
 926	if !a.isSubAgent {
 927		history = append(history, fantasy.NewUserMessage(
 928			fmt.Sprintf(
 929				"<system_reminder>%s</system_reminder>",
 930				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 931If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 932If not, please feel free to ignore. Again do not mention this message to the user.`,
 933			),
 934		))
 935	}
 936	// Collect all tool call IDs present in assistant messages and all tool
 937	// result IDs present in tool messages. This lets us detect both orphaned
 938	// tool results (result without a call) and orphaned tool calls (call
 939	// without a result).
 940	knownToolCallIDs := make(map[string]struct{})
 941	knownToolResultIDs := make(map[string]struct{})
 942	for _, m := range msgs {
 943		switch m.Role {
 944		case message.Assistant:
 945			for _, tc := range m.ToolCalls() {
 946				knownToolCallIDs[tc.ID] = struct{}{}
 947			}
 948		case message.Tool:
 949			for _, tr := range m.ToolResults() {
 950				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 951			}
 952		}
 953	}
 954
 955	for _, m := range msgs {
 956		if len(m.Parts) == 0 {
 957			continue
 958		}
 959		// Assistant message without content or tool calls (cancelled before it returned anything).
 960		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 961			continue
 962		}
 963		if m.Role == message.Tool {
 964			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 965				history = append(history, msg)
 966			}
 967			continue
 968		}
 969		aiMsgs := m.ToAIMessage()
 970		if !supportsImages {
 971			for i := range aiMsgs {
 972				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 973					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 974				}
 975			}
 976		}
 977		history = append(history, aiMsgs...)
 978
 979		if m.Role == message.Assistant {
 980			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 981				history = append(history, msg)
 982			}
 983		}
 984	}
 985
 986	var files []fantasy.FilePart
 987	for _, attachment := range attachments {
 988		if attachment.IsText() {
 989			continue
 990		}
 991		files = append(files, fantasy.FilePart{
 992			Filename:  attachment.FileName,
 993			Data:      attachment.Content,
 994			MediaType: attachment.MimeType,
 995		})
 996	}
 997
 998	return history, files
 999}
1000
1001// filterFileParts removes fantasy.FilePart entries from a slice of message
1002// parts. Used to strip image attachments from historical user messages when
1003// the current model does not support them.
1004func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
1005	filtered := make([]fantasy.MessagePart, 0, len(parts))
1006	for _, part := range parts {
1007		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
1008			continue
1009		}
1010		filtered = append(filtered, part)
1011	}
1012	return filtered
1013}
1014
1015// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1016// dropping any tool result parts whose tool_call_id has no matching tool call
1017// in the known set. An orphaned result causes API validation to fail on every
1018// subsequent turn, permanently locking the session. Returns the filtered
1019// message and true if at least one valid part remains.
1020func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1021	aiMsgs := m.ToAIMessage()
1022	if len(aiMsgs) == 0 {
1023		return fantasy.Message{}, false
1024	}
1025	var validParts []fantasy.MessagePart
1026	for _, part := range aiMsgs[0].Content {
1027		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1028		if !ok {
1029			validParts = append(validParts, part)
1030			continue
1031		}
1032		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1033			validParts = append(validParts, part)
1034		} else {
1035			slog.Warn(
1036				"Dropping orphaned tool result with no matching tool call",
1037				"tool_call_id", tr.ToolCallID,
1038			)
1039		}
1040	}
1041	if len(validParts) == 0 {
1042		return fantasy.Message{}, false
1043	}
1044	msg := aiMsgs[0]
1045	msg.Content = validParts
1046	return msg, true
1047}
1048
1049// syntheticToolResultsForOrphanedCalls returns a tool message containing
1050// synthetic tool results for any tool calls in the assistant message that
1051// have no matching result in knownToolResultIDs. LLM APIs require every
1052// tool_use to be immediately followed by a tool_result; an interrupted
1053// session can leave orphaned tool_use blocks that permanently lock the
1054// conversation. Returns the message and true if any synthetic results were
1055// produced.
1056func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1057	var syntheticParts []fantasy.MessagePart
1058	for _, tc := range m.ToolCalls() {
1059		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1060			continue
1061		}
1062		slog.Warn(
1063			"Injecting synthetic tool result for orphaned tool call",
1064			"tool_call_id", tc.ID,
1065			"tool_name", tc.Name,
1066		)
1067		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1068			ToolCallID: tc.ID,
1069			Output: fantasy.ToolResultOutputContentError{
1070				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1071			},
1072		})
1073	}
1074	if len(syntheticParts) == 0 {
1075		return fantasy.Message{}, false
1076	}
1077	return fantasy.Message{
1078		Role:    fantasy.MessageRoleTool,
1079		Content: syntheticParts,
1080	}, true
1081}
1082
1083func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1084	msgs, err := a.messages.List(ctx, session.ID)
1085	if err != nil {
1086		return nil, fmt.Errorf("failed to list messages: %w", err)
1087	}
1088
1089	if session.SummaryMessageID != "" {
1090		summaryMsgIndex := -1
1091		for i, msg := range msgs {
1092			if msg.ID == session.SummaryMessageID {
1093				summaryMsgIndex = i
1094				break
1095			}
1096		}
1097		if summaryMsgIndex != -1 {
1098			msgs = msgs[summaryMsgIndex:]
1099			msgs[0].Role = message.User
1100		}
1101	}
1102	return msgs, nil
1103}
1104
1105// generateTitle generates a session titled based on the initial prompt.
1106func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1107	if userPrompt == "" {
1108		return
1109	}
1110
1111	smallModel := a.smallModel.Get()
1112	largeModel := a.largeModel.Get()
1113	systemPromptPrefix := a.systemPromptPrefix.Get()
1114
1115	var maxOutputTokens int64 = 40
1116	if smallModel.CatwalkCfg.CanReason {
1117		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1118	}
1119
1120	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1121		return fantasy.NewAgent(
1122			m,
1123			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1124			fantasy.WithMaxOutputTokens(tok),
1125			fantasy.WithUserAgent(userAgent),
1126		)
1127	}
1128
1129	streamCall := fantasy.AgentStreamCall{
1130		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1131		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1132			prepared.Messages = opts.Messages
1133			if systemPromptPrefix != "" {
1134				prepared.Messages = append([]fantasy.Message{
1135					fantasy.NewSystemMessage(systemPromptPrefix),
1136				}, prepared.Messages...)
1137			}
1138			return callCtx, prepared, nil
1139		},
1140	}
1141
1142	// Use the small model to generate the title.
1143	model := smallModel
1144	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1145	resp, err := agent.Stream(ctx, streamCall)
1146	if err == nil {
1147		// We successfully generated a title with the small model.
1148		slog.Debug("Generated title with small model")
1149	} else {
1150		// It didn't work. Let's try with the big model.
1151		slog.Error("Error generating title with small model; trying big model", "err", err)
1152		model = largeModel
1153		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1154		resp, err = agent.Stream(ctx, streamCall)
1155		if err == nil {
1156			slog.Debug("Generated title with large model")
1157		} else {
1158			// Welp, the large model didn't work either. Use the default
1159			// session name and return.
1160			slog.Error("Error generating title with large model", "err", err)
1161			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1162			if saveErr != nil {
1163				slog.Error("Failed to save session title", "error", saveErr)
1164			}
1165			return
1166		}
1167	}
1168
1169	if resp == nil {
1170		// Actually, we didn't get a response so we can't. Use the default
1171		// session name and return.
1172		slog.Error("Response is nil; can't generate title")
1173		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1174		if saveErr != nil {
1175			slog.Error("Failed to save session title", "error", saveErr)
1176		}
1177		return
1178	}
1179
1180	// Clean up title.
1181	var title string
1182	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1183
1184	// Remove thinking tags if present.
1185	title = thinkTagRegex.ReplaceAllString(title, "")
1186	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1187
1188	title = strings.TrimSpace(title)
1189	title = cmp.Or(title, DefaultSessionName)
1190
1191	// Calculate usage and cost.
1192	var openrouterCost *float64
1193	for _, step := range resp.Steps {
1194		stepCost := a.openrouterCost(step.ProviderMetadata)
1195		if stepCost != nil {
1196			newCost := *stepCost
1197			if openrouterCost != nil {
1198				newCost += *openrouterCost
1199			}
1200			openrouterCost = &newCost
1201		}
1202	}
1203
1204	modelConfig := model.CatwalkCfg
1205	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1206		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1207		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1208		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1209
1210	// Use override cost if available (e.g., from OpenRouter).
1211	if openrouterCost != nil {
1212		cost = *openrouterCost
1213	}
1214
1215	// Skip cost accumulation
1216	if model.FlatRate {
1217		cost = 0
1218	}
1219
1220	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1221	completionTokens := resp.TotalUsage.OutputTokens
1222
1223	// Atomically update only title and usage fields to avoid overriding other
1224	// concurrent session updates.
1225	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1226	if saveErr != nil {
1227		slog.Error("Failed to save session title and usage", "error", saveErr)
1228		return
1229	}
1230}
1231
1232func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1233	openrouterMetadata, ok := metadata[openrouter.Name]
1234	if !ok {
1235		return nil
1236	}
1237
1238	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1239	if !ok {
1240		return nil
1241	}
1242	return &opts.Usage.Cost
1243}
1244
1245func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1246	if !usageIsZero(usage) {
1247		session.EstimatedUsage = estimated
1248	}
1249
1250	modelConfig := model.CatwalkCfg
1251	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1252		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1253		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1254		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1255
1256	if !estimated {
1257		a.eventTokensUsed(session.ID, model, usage, cost)
1258	}
1259
1260	if estimated {
1261		cost = 0
1262	} else {
1263		// Use override cost if available (e.g., from OpenRouter).
1264		if overrideCost != nil {
1265			cost = *overrideCost
1266		}
1267
1268		// Skip cost accumulation
1269		if model.FlatRate {
1270			cost = 0
1271		}
1272	}
1273
1274	session.Cost += cost
1275	updateSessionTokenCounters(session, usage)
1276}
1277
1278func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1279	if usage.OutputTokens != 0 {
1280		session.CompletionTokens = usage.OutputTokens
1281	}
1282	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1283		session.PromptTokens = promptTokens
1284	}
1285}
1286
1287func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1288	if usage.OutputTokens != 0 {
1289		return usage.OutputTokens
1290	}
1291	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1292}
1293
1294func (a *sessionAgent) Cancel(sessionID string) {
1295	// Cancel regular requests. Don't use Take() here - we need the entry to
1296	// remain in activeRequests so IsBusy() returns true until the goroutine
1297	// fully completes (including error handling that may access the DB).
1298	// The defer in processRequest will clean up the entry.
1299	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1300		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1301		cancel()
1302	}
1303
1304	// Also check for summarize requests.
1305	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1306		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1307		cancel()
1308	}
1309
1310	if a.QueuedPrompts(sessionID) > 0 {
1311		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1312		a.messageQueue.Del(sessionID)
1313	}
1314}
1315
1316func (a *sessionAgent) ClearQueue(sessionID string) {
1317	if a.QueuedPrompts(sessionID) > 0 {
1318		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1319		a.messageQueue.Del(sessionID)
1320	}
1321}
1322
1323func (a *sessionAgent) CancelAll() {
1324	if !a.IsBusy() {
1325		return
1326	}
1327	for key := range a.activeRequests.Seq2() {
1328		a.Cancel(key) // key is sessionID
1329	}
1330
1331	timeout := time.After(5 * time.Second)
1332	for a.IsBusy() {
1333		select {
1334		case <-timeout:
1335			return
1336		default:
1337			time.Sleep(200 * time.Millisecond)
1338		}
1339	}
1340}
1341
1342func (a *sessionAgent) IsBusy() bool {
1343	var busy bool
1344	for cancelFunc := range a.activeRequests.Seq() {
1345		if cancelFunc != nil {
1346			busy = true
1347			break
1348		}
1349	}
1350	return busy
1351}
1352
1353func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1354	_, busy := a.activeRequests.Get(sessionID)
1355	return busy
1356}
1357
1358func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1359	l, ok := a.messageQueue.Get(sessionID)
1360	if !ok {
1361		return 0
1362	}
1363	return len(l)
1364}
1365
1366func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1367	l, ok := a.messageQueue.Get(sessionID)
1368	if !ok {
1369		return nil
1370	}
1371	prompts := make([]string, len(l))
1372	for i, call := range l {
1373		prompts[i] = call.Prompt
1374	}
1375	return prompts
1376}
1377
// SetModels replaces the agent's large and small models with the given ones.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1382
// SetTools hands the given tools to the agent's tool collection via SetSlice.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1386
// SetSystemPrompt replaces the agent's stored system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1390
// Model returns the agent's current large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1394
1395// convertToToolResult converts a fantasy tool result to a message tool result.
1396func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1397	baseResult := message.ToolResult{
1398		ToolCallID: result.ToolCallID,
1399		Name:       result.ToolName,
1400		Metadata:   result.ClientMetadata,
1401	}
1402
1403	switch result.Result.GetType() {
1404	case fantasy.ToolResultContentTypeText:
1405		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1406			baseResult.Content = r.Text
1407		}
1408	case fantasy.ToolResultContentTypeError:
1409		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1410			baseResult.Content = r.Error.Error()
1411			baseResult.IsError = true
1412		}
1413	case fantasy.ToolResultContentTypeMedia:
1414		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1415			if !stringext.IsValidBase64(r.Data) {
1416				slog.Warn(
1417					"Tool returned media with invalid base64 data, discarding image",
1418					"tool", result.ToolName,
1419					"tool_call_id", result.ToolCallID,
1420				)
1421				baseResult.Content = "Tool returned image data with invalid encoding"
1422				baseResult.IsError = true
1423			} else {
1424				content := r.Text
1425				if content == "" {
1426					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1427				}
1428				baseResult.Content = content
1429				baseResult.Data = r.Data
1430				baseResult.MIMEType = r.MediaType
1431			}
1432		}
1433	}
1434
1435	return baseResult
1436}
1437
1438// workaroundProviderMediaLimitations converts media content in tool results to
1439// user messages for providers that don't natively support images in tool results.
1440//
1441// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1442// don't support sending images/media in tool result messages - they only accept
1443// text in tool results. However, they DO support images in user messages.
1444//
1445// If we send media in tool results to these providers, the API returns an error.
1446//
1447// Solution: For these providers, we:
1448//  1. Replace the media in the tool result with a text placeholder
1449//  2. Inject a user message immediately after with the image as a file attachment
1450//  3. This maintains the tool execution flow while working around API limitations
1451//
1452// Anthropic and Bedrock support images natively in tool results, so we skip
1453// this workaround for them.
1454//
1455// Example transformation:
1456//
1457//	BEFORE: [tool result: image data]
1458//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1459func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1460	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1461		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1462		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1463
1464	if providerSupportsMedia {
1465		return messages
1466	}
1467
1468	convertedMessages := make([]fantasy.Message, 0, len(messages))
1469
1470	for _, msg := range messages {
1471		if msg.Role != fantasy.MessageRoleTool {
1472			convertedMessages = append(convertedMessages, msg)
1473			continue
1474		}
1475
1476		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1477		var mediaFiles []fantasy.FilePart
1478
1479		for _, part := range msg.Content {
1480			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1481			if !ok {
1482				textParts = append(textParts, part)
1483				continue
1484			}
1485
1486			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1487				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1488				if err != nil {
1489					slog.Warn("Failed to decode media data", "error", err)
1490					textParts = append(textParts, part)
1491					continue
1492				}
1493
1494				mediaFiles = append(mediaFiles, fantasy.FilePart{
1495					Data:      decoded,
1496					MediaType: media.MediaType,
1497					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1498				})
1499
1500				textParts = append(textParts, fantasy.ToolResultPart{
1501					ToolCallID: toolResult.ToolCallID,
1502					Output: fantasy.ToolResultOutputContentText{
1503						Text: "[Image/media content loaded - see attached file]",
1504					},
1505					ProviderOptions: toolResult.ProviderOptions,
1506				})
1507			} else {
1508				textParts = append(textParts, part)
1509			}
1510		}
1511
1512		convertedMessages = append(convertedMessages, fantasy.Message{
1513			Role:    fantasy.MessageRoleTool,
1514			Content: textParts,
1515		})
1516
1517		if len(mediaFiles) > 0 {
1518			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1519				"Here is the media content from the tool result:",
1520				mediaFiles...,
1521			))
1522		}
1523	}
1524
1525	return convertedMessages
1526}
1527
1528// buildSummaryPrompt constructs the prompt text for session summarization.
1529func buildSummaryPrompt(todos []session.Todo) string {
1530	var sb strings.Builder
1531	sb.WriteString("Provide a detailed summary of our conversation above.")
1532	if len(todos) > 0 {
1533		sb.WriteString("\n\n## Current Todo List\n\n")
1534		for _, t := range todos {
1535			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1536		}
1537		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1538		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1539	}
1540	return sb.String()
1541}
1542
1543func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1544	fields := []any{
1545		"retry_delay", delay.String(),
1546	}
1547	if err == nil {
1548		return fields
1549	}
1550	fields = append(fields, "status_code", err.StatusCode)
1551	if err.Title != "" {
1552		fields = append(fields, "title", err.Title)
1553	}
1554	if err.Message != "" {
1555		fields = append(fields, "message", err.Message)
1556	}
1557	return fields
1558}