agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
  49const (
  50	DefaultSessionName = "Untitled Session"
  51
  52	// Constants for auto-summarization thresholds
  53	largeContextWindowThreshold = 200_000
  54	largeContextWindowBuffer    = 20_000
  55	smallContextWindowRatio     = 0.2
  56)
  57
  58var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  59
// titlePrompt is the prompt template embedded from templates/title.md at
// build time (presumably used by generateTitle, which is not shown here —
// confirm).
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is embedded from templates/summary.md at build time and is
// used as the system prompt of the summarization agent in Summarize.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
  66// Used to remove <think> tags from generated titles.
  67var (
  68	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
  69	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
  70)
  71
  72type SessionAgentCall struct {
  73	SessionID string
  74	// RunID, when non-empty, is the caller-supplied correlator that
  75	// gets echoed back on the notify.RunComplete event emitted for
  76	// this turn. It is preserved when the call is enqueued behind a
  77	// busy session so the queued turn's terminal event is still
  78	// recognisable to the original caller. Callers that need a
  79	// reliable completion contract (e.g. `crush run` against a
  80	// session that may be busy) MUST set it; SessionID alone is
  81	// ambiguous when concurrent turns share the same session.
  82	RunID            string
  83	Prompt           string
  84	ProviderOptions  fantasy.ProviderOptions
  85	Attachments      []message.Attachment
  86	MaxOutputTokens  int64
  87	Temperature      *float64
  88	TopP             *float64
  89	TopK             *int64
  90	FrequencyPenalty *float64
  91	PresencePenalty  *float64
  92	NonInteractive   bool
  93	// OnComplete, when non-nil, replaces the default RunComplete
  94	// publish path: the inner Run hands the terminal payload to this
  95	// callback instead of emitting it on the RunComplete broker. The
  96	// coordinator uses this hook to coalesce the unauthorized →
  97	// re-auth → retry chain into a single user-visible terminal
  98	// event, so non-interactive clients (e.g. `crush run`) don't
  99	// exit on a stale failed-attempt RunComplete before the
 100	// successful retry. It is intentionally stripped when queueing
 101	// a busy-session call (see Run): the originating
 102	// coordinator.Run has long returned by the time the queued
 103	// recursion drains, so falling back to the default broker
 104	// publish keeps the event visible to subscribers.
 105	OnComplete func(notify.RunComplete)
 106}
 107
 108type SessionAgent interface {
 109	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
 110	SetModels(large Model, small Model)
 111	SetTools(tools []fantasy.AgentTool)
 112	SetSystemPrompt(systemPrompt string)
 113	Cancel(sessionID string)
 114	CancelAll()
 115	IsSessionBusy(sessionID string) bool
 116	IsBusy() bool
 117	QueuedPrompts(sessionID string) int
 118	QueuedPromptsList(sessionID string) []string
 119	ClearQueue(sessionID string)
 120	Summarize(context.Context, string, fantasy.ProviderOptions) error
 121	Model() Model
 122}
 123
// Model bundles a language model client with the configuration Crush keeps
// alongside it.
//
//   - Model is the fantasy client the agent streams completions from.
//   - CatwalkCfg is catalog metadata for the model; this file reads its
//     ContextWindow (auto-summarization), SupportsImages and Name.
//   - ModelCfg is the selected model configuration; this file reads its
//     Model and Provider fields when stamping assistant messages.
//   - FlatRate is not read in this part of the file. NOTE(review):
//     presumably it marks models billed at a flat rate rather than per
//     token — confirm against updateSessionUsage.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 130
 131type sessionAgent struct {
 132	largeModel         *csync.Value[Model]
 133	smallModel         *csync.Value[Model]
 134	systemPromptPrefix *csync.Value[string]
 135	systemPrompt       *csync.Value[string]
 136	tools              *csync.Slice[fantasy.AgentTool]
 137
 138	isSubAgent           bool
 139	sessions             session.Service
 140	messages             message.Service
 141	disableAutoSummarize bool
 142	isYolo               bool
 143	notify               pubsub.Publisher[notify.Notification]
 144	runComplete          pubsub.Publisher[notify.RunComplete]
 145
 146	messageQueue   *csync.Map[string, []SessionAgentCall]
 147	activeRequests *csync.Map[string, context.CancelFunc]
 148}
 149
 150type SessionAgentOptions struct {
 151	LargeModel           Model
 152	SmallModel           Model
 153	SystemPromptPrefix   string
 154	SystemPrompt         string
 155	IsSubAgent           bool
 156	DisableAutoSummarize bool
 157	IsYolo               bool
 158	Sessions             session.Service
 159	Messages             message.Service
 160	Tools                []fantasy.AgentTool
 161	Notify               pubsub.Publisher[notify.Notification]
 162	RunComplete          pubsub.Publisher[notify.RunComplete]
 163}
 164
 165func NewSessionAgent(
 166	opts SessionAgentOptions,
 167) SessionAgent {
 168	return &sessionAgent{
 169		largeModel:           csync.NewValue(opts.LargeModel),
 170		smallModel:           csync.NewValue(opts.SmallModel),
 171		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 172		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 173		isSubAgent:           opts.IsSubAgent,
 174		sessions:             opts.Sessions,
 175		messages:             opts.Messages,
 176		disableAutoSummarize: opts.DisableAutoSummarize,
 177		tools:                csync.NewSliceFrom(opts.Tools),
 178		isYolo:               opts.IsYolo,
 179		notify:               opts.Notify,
 180		runComplete:          opts.RunComplete,
 181		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 182		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 183	}
 184}
 185
 186func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
 187	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 188		return nil, ErrEmptyPrompt
 189	}
 190	if call.SessionID == "" {
 191		return nil, ErrSessionMissing
 192	}
 193
 194	// Queue the message if busy. Strip OnComplete: the caller that
 195	// supplied the hook (typically coordinator.Run) has its own
 196	// retry/coalesce scope that ends when it returns, so by the time
 197	// the queue drains nobody is left to consume the buffered
 198	// terminal event. The recursive Run will fall back to the
 199	// default broker publish, which is what existing subscribers
 200	// expect for queued turns.
 201	if a.IsSessionBusy(call.SessionID) {
 202		existing, ok := a.messageQueue.Get(call.SessionID)
 203		if !ok {
 204			existing = []SessionAgentCall{}
 205		}
 206		queued := call
 207		queued.OnComplete = nil
 208		existing = append(existing, queued)
 209		a.messageQueue.Set(call.SessionID, existing)
 210		return nil, nil
 211	}
 212
 213	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 214	agentTools := a.tools.Copy()
 215	largeModel := a.largeModel.Get()
 216	systemPrompt := a.systemPrompt.Get()
 217	promptPrefix := a.systemPromptPrefix.Get()
 218	var instructions strings.Builder
 219
 220	for _, server := range mcp.GetStates() {
 221		if server.State != mcp.StateConnected {
 222			continue
 223		}
 224		if s := server.Client.InitializeResult().Instructions; s != "" {
 225			instructions.WriteString(s)
 226			instructions.WriteString("\n\n")
 227		}
 228	}
 229
 230	if s := instructions.String(); s != "" {
 231		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 232	}
 233
 234	if len(agentTools) > 0 {
 235		// Add Anthropic caching to the last tool.
 236		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 237	}
 238
 239	agent := fantasy.NewAgent(
 240		largeModel.Model,
 241		fantasy.WithSystemPrompt(systemPrompt),
 242		fantasy.WithTools(agentTools...),
 243		fantasy.WithUserAgent(userAgent),
 244	)
 245
 246	sessionLock := sync.Mutex{}
 247	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 248	if err != nil {
 249		return nil, fmt.Errorf("failed to get session: %w", err)
 250	}
 251
 252	msgs, err := a.getSessionMessages(ctx, currentSession)
 253	if err != nil {
 254		return nil, fmt.Errorf("failed to get session messages: %w", err)
 255	}
 256
 257	var wg sync.WaitGroup
 258	// Generate title if first message.
 259	if len(msgs) == 0 {
 260		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 261		wg.Go(func() {
 262			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 263		})
 264	}
 265	defer wg.Wait()
 266
 267	// Add the user message to the session.
 268	_, err = a.createUserMessage(ctx, call)
 269	if err != nil {
 270		return nil, err
 271	}
 272
 273	// Add the session to the context.
 274	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 275
 276	genCtx, cancel := context.WithCancel(ctx)
 277	a.activeRequests.Set(call.SessionID, cancel)
 278
 279	defer cancel()
 280	defer a.activeRequests.Del(call.SessionID)
 281	// skipRunComplete is set just before the queued-recursion path so
 282	// the outer Run doesn't publish a RunComplete that would race
 283	// with — and be superseded by — the recursive call's own
 284	// RunComplete (each queued user prompt is its own turn and
 285	// publishes exactly one terminal event).
 286	var skipRunComplete bool
 287	// currentAssistant is declared here so the deferred RunComplete
 288	// publish below can capture the pointer that PrepareStep will
 289	// later (re)assign for each streaming step. The final assistant
 290	// message of the turn is the value reachable through this
 291	// pointer when the defer runs.
 292	var currentAssistant *message.Message
 293	// Drain any debounced message updates before returning. message.Service
 294	// already flushes synchronously on terminal updates, but a defer here
 295	// guarantees the contract at every Run exit (success, error, panic
 296	// recovery upstream) without callers needing to know.
 297	//
 298	// After the flush completes — meaning all per-message
 299	// Publish(UpdatedEvent) calls have fired and been buffered into
 300	// every subscriber's channel — publish the authoritative
 301	// RunComplete event for this turn. The flush-then-publish order
 302	// gives well-behaved clients the best chance of seeing the final
 303	// message event before RunComplete; the embedded Text field
 304	// reconciles for clients that observe the events out of order
 305	// (the pubsub broker fan-in does not serialize publishes from
 306	// different upstream brokers).
 307	defer func() {
 308		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
 309			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
 310		}
 311		if skipRunComplete {
 312			return
 313		}
 314		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
 315		if currentAssistant != nil {
 316			complete.MessageID = currentAssistant.ID
 317			complete.Text = currentAssistant.Content().String()
 318		}
 319		if retErr != nil {
 320			complete.Error = retErr.Error()
 321			complete.Cancelled = errors.Is(retErr, context.Canceled)
 322		} else if ctx.Err() != nil {
 323			complete.Cancelled = true
 324		}
 325		// Prefer the per-call hook when supplied so the coordinator
 326		// can coalesce retries (e.g. unauthorized → re-auth → retry)
 327		// into a single user-visible terminal event. The fallback
 328		// must-deliver publish applies bounded-blocking semantics to
 329		// the authoritative terminal event so a momentarily-full
 330		// subscriber channel can't silently drop it and hang
 331		// non-interactive clients waiting on RunComplete.
 332		if call.OnComplete != nil {
 333			call.OnComplete(complete)
 334			return
 335		}
 336		if a.runComplete == nil {
 337			return
 338		}
 339		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
 340	}()
 341
 342	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)
 343
 344	startTime := time.Now()
 345	a.eventPromptSent(call.SessionID)
 346
 347	var stepMessages []fantasy.Message
 348	var shouldSummarize bool
 349	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
 350	var maxOutputTokens *int64
 351	if call.MaxOutputTokens > 0 {
 352		maxOutputTokens = &call.MaxOutputTokens
 353	}
 354	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
 355		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 356		Files:            files,
 357		Messages:         history,
 358		ProviderOptions:  call.ProviderOptions,
 359		MaxOutputTokens:  maxOutputTokens,
 360		TopP:             call.TopP,
 361		Temperature:      call.Temperature,
 362		PresencePenalty:  call.PresencePenalty,
 363		TopK:             call.TopK,
 364		FrequencyPenalty: call.FrequencyPenalty,
 365		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 366			prepared.Messages = options.Messages
 367			for i := range prepared.Messages {
 368				prepared.Messages[i].ProviderOptions = nil
 369			}
 370
 371			// Use latest tools (updated by SetTools when MCP tools change).
 372			prepared.Tools = a.tools.Copy()
 373
 374			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
 375			a.messageQueue.Del(call.SessionID)
 376			for _, queued := range queuedCalls {
 377				userMessage, createErr := a.createUserMessage(callContext, queued)
 378				if createErr != nil {
 379					return callContext, prepared, createErr
 380				}
 381				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 382			}
 383
 384			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 385
 386			lastSystemRoleInx := 0
 387			systemMessageUpdated := false
 388			for i, msg := range prepared.Messages {
 389				// Only add cache control to the last message.
 390				if msg.Role == fantasy.MessageRoleSystem {
 391					lastSystemRoleInx = i
 392				} else if !systemMessageUpdated {
 393					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 394					systemMessageUpdated = true
 395				}
 396				// Than add cache control to the last 2 messages.
 397				if i > len(prepared.Messages)-3 {
 398					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 399				}
 400			}
 401
 402			if promptPrefix != "" {
 403				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 404			}
 405
 406			sessionLock.Lock()
 407			stepMessages = cloneFantasyMessages(prepared.Messages)
 408			sessionLock.Unlock()
 409
 410			var assistantMsg message.Message
 411			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 412				Role:     message.Assistant,
 413				Parts:    []message.ContentPart{},
 414				Model:    largeModel.ModelCfg.Model,
 415				Provider: largeModel.ModelCfg.Provider,
 416			})
 417			if err != nil {
 418				return callContext, prepared, err
 419			}
 420			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 421			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 422			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 423			currentAssistant = &assistantMsg
 424			return callContext, prepared, err
 425		},
 426		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 427			currentAssistant.AppendReasoningContent(reasoning.Text)
 428			return a.messages.Update(genCtx, *currentAssistant)
 429		},
 430		OnReasoningDelta: func(id string, text string) error {
 431			currentAssistant.AppendReasoningContent(text)
 432			return a.messages.Update(genCtx, *currentAssistant)
 433		},
 434		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 435			// handle anthropic signature
 436			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 437				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 438					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 439				}
 440			}
 441			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 442				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 443					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 444				}
 445			}
 446			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 447				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 448					currentAssistant.SetReasoningResponsesData(reasoning)
 449				}
 450			}
 451			currentAssistant.FinishThinking()
 452			return a.messages.Update(genCtx, *currentAssistant)
 453		},
 454		OnTextDelta: func(id string, text string) error {
 455			// Strip leading newline from initial text content. This is is
 456			// particularly important in non-interactive mode where leading
 457			// newlines are very visible.
 458			if len(currentAssistant.Parts) == 0 {
 459				text = strings.TrimPrefix(text, "\n")
 460			}
 461
 462			currentAssistant.AppendContent(text)
 463			return a.messages.Update(genCtx, *currentAssistant)
 464		},
 465		OnToolInputStart: func(id string, toolName string) error {
 466			toolCall := message.ToolCall{
 467				ID:               id,
 468				Name:             toolName,
 469				ProviderExecuted: false,
 470				Finished:         false,
 471			}
 472			currentAssistant.AddToolCall(toolCall)
 473			// Use parent ctx instead of genCtx to ensure the update succeeds
 474			// even if the request is canceled mid-stream
 475			return a.messages.Update(ctx, *currentAssistant)
 476		},
 477		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 478			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
 479		},
 480		OnToolCall: func(tc fantasy.ToolCallContent) error {
 481			toolCall := message.ToolCall{
 482				ID:               tc.ToolCallID,
 483				Name:             tc.ToolName,
 484				Input:            tc.Input,
 485				ProviderExecuted: false,
 486				Finished:         true,
 487			}
 488			currentAssistant.AddToolCall(toolCall)
 489			// Use parent ctx instead of genCtx to ensure the update succeeds
 490			// even if the request is canceled mid-stream
 491			return a.messages.Update(ctx, *currentAssistant)
 492		},
 493		OnToolResult: func(result fantasy.ToolResultContent) error {
 494			toolResult := a.convertToToolResult(result)
 495			// Use parent ctx instead of genCtx to ensure the message is created
 496			// even if the request is canceled mid-stream
 497			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 498				Role: message.Tool,
 499				Parts: []message.ContentPart{
 500					toolResult,
 501				},
 502			})
 503			return createMsgErr
 504		},
 505		OnStepFinish: func(stepResult fantasy.StepResult) error {
 506			finishReason := message.FinishReasonUnknown
 507			switch stepResult.FinishReason {
 508			case fantasy.FinishReasonLength:
 509				finishReason = message.FinishReasonMaxTokens
 510			case fantasy.FinishReasonStop:
 511				finishReason = message.FinishReasonEndTurn
 512			case fantasy.FinishReasonToolCalls:
 513				finishReason = message.FinishReasonToolUse
 514			}
 515			// If a tool result halted the turn (e.g. a hook halt or a
 516			// permission denial), the step ends on FinishReasonToolCalls but
 517			// the model will not be called again. Treat it as the end of the
 518			// turn so the UI can render the assistant footer.
 519			if finishReason == message.FinishReasonToolUse {
 520				for _, tr := range stepResult.Content.ToolResults() {
 521					if tr.StopTurn {
 522						finishReason = message.FinishReasonEndTurn
 523						break
 524					}
 525				}
 526			}
 527			currentAssistant.AddFinish(finishReason, "", "")
 528			sessionLock.Lock()
 529			defer sessionLock.Unlock()
 530
 531			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 532			if getSessionErr != nil {
 533				return getSessionErr
 534			}
 535			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
 536			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
 537			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 538			if sessionErr != nil {
 539				return sessionErr
 540			}
 541			currentSession = updatedSession
 542			return a.messages.Update(genCtx, *currentAssistant)
 543		},
 544		StopWhen: []fantasy.StopCondition{
 545			func(_ []fantasy.StepResult) bool {
 546				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 547				// If context window is unknown (0), skip auto-summarize
 548				// to avoid immediately truncating custom/local models.
 549				if cw == 0 {
 550					return false
 551				}
 552				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 553				remaining := cw - tokens
 554				var threshold int64
 555				if cw > largeContextWindowThreshold {
 556					threshold = largeContextWindowBuffer
 557				} else {
 558					threshold = int64(float64(cw) * smallContextWindowRatio)
 559				}
 560				if (remaining <= threshold) && !a.disableAutoSummarize {
 561					shouldSummarize = true
 562					return true
 563				}
 564				return false
 565			},
 566			func(steps []fantasy.StepResult) bool {
 567				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
 568			},
 569		},
 570	})
 571
 572	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
 573
 574	if err != nil {
 575		isHyper := largeModel.ModelCfg.Provider == hyper.Name
 576		isCancelErr := errors.Is(err, context.Canceled)
 577		if currentAssistant == nil {
 578			return result, err
 579		}
 580		// Ensure we finish thinking on error to close the reasoning state.
 581		currentAssistant.FinishThinking()
 582		toolCalls := currentAssistant.ToolCalls()
 583		// INFO: we use the parent context here because the genCtx has been cancelled.
 584		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
 585		if createErr != nil {
 586			return nil, createErr
 587		}
 588		for _, tc := range toolCalls {
 589			if !tc.Finished {
 590				tc.Finished = true
 591				tc.Input = "{}"
 592				currentAssistant.AddToolCall(tc)
 593				updateErr := a.messages.Update(ctx, *currentAssistant)
 594				if updateErr != nil {
 595					return nil, updateErr
 596				}
 597			}
 598
 599			found := false
 600			for _, msg := range msgs {
 601				if msg.Role == message.Tool {
 602					for _, tr := range msg.ToolResults() {
 603						if tr.ToolCallID == tc.ID {
 604							found = true
 605							break
 606						}
 607					}
 608				}
 609				if found {
 610					break
 611				}
 612			}
 613			if found {
 614				continue
 615			}
 616			content := "There was an error while executing the tool"
 617			if isCancelErr {
 618				content = "Error: user cancelled assistant tool calling"
 619			}
 620			toolResult := message.ToolResult{
 621				ToolCallID: tc.ID,
 622				Name:       tc.Name,
 623				Content:    content,
 624				IsError:    true,
 625			}
 626			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 627				Role: message.Tool,
 628				Parts: []message.ContentPart{
 629					toolResult,
 630				},
 631			})
 632			if createErr != nil {
 633				return nil, createErr
 634			}
 635		}
 636		var fantasyErr *fantasy.Error
 637		var providerErr *fantasy.ProviderError
 638		const defaultTitle = "Provider Error"
 639		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
 640		if isCancelErr {
 641			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 642		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
 643			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
 644		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
 645			url := hyper.BaseURL()
 646			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
 647			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
 648		} else if errors.As(err, &providerErr) {
 649			if providerErr.Message == "The requested model is not supported." {
 650				url := "https://github.com/settings/copilot/features"
 651				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
 652				currentAssistant.AddFinish(
 653					message.FinishReasonError,
 654					"Copilot model not enabled",
 655					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
 656				)
 657			} else {
 658				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
 659			}
 660		} else if errors.As(err, &fantasyErr) {
 661			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
 662		} else {
 663			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
 664		}
 665		// Note: we use the parent context here because the genCtx has been
 666		// cancelled.
 667		updateErr := a.messages.Update(ctx, *currentAssistant)
 668		if updateErr != nil {
 669			return nil, updateErr
 670		}
 671		return nil, err
 672	}
 673
 674	if shouldSummarize {
 675		a.activeRequests.Del(call.SessionID)
 676		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
 677			return nil, summarizeErr
 678		}
 679		// If the agent wasn't done...
 680		if len(currentAssistant.ToolCalls()) > 0 {
 681			existing, ok := a.messageQueue.Get(call.SessionID)
 682			if !ok {
 683				existing = []SessionAgentCall{}
 684			}
 685			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
 686			existing = append(existing, call)
 687			a.messageQueue.Set(call.SessionID, existing)
 688		}
 689	}
 690
 691	// Release active request before publishing the notification.
 692	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
 693	// tea.Msg arrives, so the cleanup must precede the notify or
 694	// subscribers see stale busy state at the moment of receipt.
 695	a.activeRequests.Del(call.SessionID)
 696	cancel()
 697
 698	// Send notification that agent has finished its turn (skip for
 699	// nested/non-interactive sessions).
 700	if !call.NonInteractive && a.notify != nil {
 701		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
 702			SessionID:    call.SessionID,
 703			SessionTitle: currentSession.Title,
 704			Type:         notify.TypeAgentFinished,
 705		})
 706	}
 707
 708	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
 709	if !ok || len(queuedMessages) == 0 {
 710		return result, err
 711	}
 712	// There are queued messages restart the loop. The recursive Run
 713	// publishes its own RunComplete for the queued prompt, so suppress
 714	// the outer defer's emit to avoid a duplicate event whose Error
 715	// field would belong to the recursive turn but whose MessageID/Text
 716	// would belong to the outer turn.
 717	skipRunComplete = true
 718	firstQueuedMessage := queuedMessages[0]
 719	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
 720	return a.Run(ctx, firstQueuedMessage)
 721}
 722
 723func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 724	if a.IsSessionBusy(sessionID) {
 725		return ErrSessionBusy
 726	}
 727
 728	// Copy mutable fields under lock to avoid races with SetModels.
 729	largeModel := a.largeModel.Get()
 730	systemPromptPrefix := a.systemPromptPrefix.Get()
 731
 732	currentSession, err := a.sessions.Get(ctx, sessionID)
 733	if err != nil {
 734		return fmt.Errorf("failed to get session: %w", err)
 735	}
 736	msgs, err := a.getSessionMessages(ctx, currentSession)
 737	if err != nil {
 738		return err
 739	}
 740	if len(msgs) == 0 {
 741		// Nothing to summarize.
 742		return nil
 743	}
 744
 745	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
 746
 747	genCtx, cancel := context.WithCancel(ctx)
 748	a.activeRequests.Set(sessionID, cancel)
 749	defer a.activeRequests.Del(sessionID)
 750	defer cancel()
 751	defer func() {
 752		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
 753			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
 754		}
 755	}()
 756
 757	agent := fantasy.NewAgent(
 758		largeModel.Model,
 759		fantasy.WithSystemPrompt(string(summaryPrompt)),
 760		fantasy.WithUserAgent(userAgent),
 761	)
 762	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 763		Role:             message.Assistant,
 764		Model:            largeModel.ModelCfg.Model,
 765		Provider:         largeModel.ModelCfg.Provider,
 766		IsSummaryMessage: true,
 767	})
 768	if err != nil {
 769		return err
 770	}
 771
 772	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 773
 774	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 775		Prompt:          summaryPromptText,
 776		Messages:        aiMsgs,
 777		ProviderOptions: opts,
 778		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 779			prepared.Messages = options.Messages
 780			if systemPromptPrefix != "" {
 781				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 782			}
 783			return callContext, prepared, nil
 784		},
 785		OnReasoningDelta: func(id string, text string) error {
 786			summaryMessage.AppendReasoningContent(text)
 787			return a.messages.Update(genCtx, summaryMessage)
 788		},
 789		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 790			// Handle anthropic signature.
 791			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 792				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 793					summaryMessage.AppendReasoningSignature(signature.Signature)
 794				}
 795			}
 796			summaryMessage.FinishThinking()
 797			return a.messages.Update(genCtx, summaryMessage)
 798		},
 799		OnTextDelta: func(id, text string) error {
 800			summaryMessage.AppendContent(text)
 801			return a.messages.Update(genCtx, summaryMessage)
 802		},
 803	})
 804	if err != nil {
 805		isCancelErr := errors.Is(err, context.Canceled)
 806		if isCancelErr {
 807			// User cancelled summarize we need to remove the summary message.
 808			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 809			return deleteErr
 810		}
 811		// Mark the summary message as finished with an error so the UI
 812		// stops spinning.
 813		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
 814		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 815			return updateErr
 816		}
 817		return err
 818	}
 819
 820	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 821	err = a.messages.Update(genCtx, summaryMessage)
 822	if err != nil {
 823		return err
 824	}
 825
 826	var openrouterCost *float64
 827	for _, step := range resp.Steps {
 828		stepCost := a.openrouterCost(step.ProviderMetadata)
 829		if stepCost != nil {
 830			newCost := *stepCost
 831			if openrouterCost != nil {
 832				newCost += *openrouterCost
 833			}
 834			openrouterCost = &newCost
 835		}
 836	}
 837
 838	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
 839
 840	// Just in case, get just the last usage info.
 841	usage := resp.Response.Usage
 842	currentSession.SummaryMessageID = summaryMessage.ID
 843	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
 844	currentSession.PromptTokens = 0
 845	currentSession.EstimatedUsage = usageIsZero(usage)
 846	_, err = a.sessions.Save(genCtx, currentSession)
 847	if err != nil {
 848		return err
 849	}
 850
 851	// Release the active request before processing queued messages so that
 852	// Run() does not see the session as busy.
 853	a.activeRequests.Del(sessionID)
 854	cancel()
 855
 856	// Process any messages that were queued while summarizing.
 857	queuedMessages, ok := a.messageQueue.Get(sessionID)
 858	if !ok || len(queuedMessages) == 0 {
 859		return nil
 860	}
 861	firstQueuedMessage := queuedMessages[0]
 862	a.messageQueue.Set(sessionID, queuedMessages[1:])
 863	_, qErr := a.Run(ctx, firstQueuedMessage)
 864	return qErr
 865}
 866
 867func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 868	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 869		return fantasy.ProviderOptions{}
 870	}
 871	return fantasy.ProviderOptions{
 872		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 873			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 874		},
 875		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 876			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 877		},
 878		vercel.Name: &anthropic.ProviderCacheControlOptions{
 879			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 880		},
 881	}
 882}
 883
 884func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 885	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 886	var attachmentParts []message.ContentPart
 887	for _, attachment := range call.Attachments {
 888		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 889	}
 890	parts = append(parts, attachmentParts...)
 891	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 892		Role:  message.User,
 893		Parts: parts,
 894	})
 895	if err != nil {
 896		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 897	}
 898	return msg, nil
 899}
 900
 901func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 902	var history []fantasy.Message
 903	if !a.isSubAgent {
 904		history = append(history, fantasy.NewUserMessage(
 905			fmt.Sprintf(
 906				"<system_reminder>%s</system_reminder>",
 907				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 908If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 909If not, please feel free to ignore. Again do not mention this message to the user.`,
 910			),
 911		))
 912	}
 913	// Collect all tool call IDs present in assistant messages and all tool
 914	// result IDs present in tool messages. This lets us detect both orphaned
 915	// tool results (result without a call) and orphaned tool calls (call
 916	// without a result).
 917	knownToolCallIDs := make(map[string]struct{})
 918	knownToolResultIDs := make(map[string]struct{})
 919	for _, m := range msgs {
 920		switch m.Role {
 921		case message.Assistant:
 922			for _, tc := range m.ToolCalls() {
 923				knownToolCallIDs[tc.ID] = struct{}{}
 924			}
 925		case message.Tool:
 926			for _, tr := range m.ToolResults() {
 927				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 928			}
 929		}
 930	}
 931
 932	for _, m := range msgs {
 933		if len(m.Parts) == 0 {
 934			continue
 935		}
 936		// Assistant message without content or tool calls (cancelled before it returned anything).
 937		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 938			continue
 939		}
 940		if m.Role == message.Tool {
 941			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 942				history = append(history, msg)
 943			}
 944			continue
 945		}
 946		aiMsgs := m.ToAIMessage()
 947		if !supportsImages {
 948			for i := range aiMsgs {
 949				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 950					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 951				}
 952			}
 953		}
 954		history = append(history, aiMsgs...)
 955
 956		if m.Role == message.Assistant {
 957			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 958				history = append(history, msg)
 959			}
 960		}
 961	}
 962
 963	var files []fantasy.FilePart
 964	for _, attachment := range attachments {
 965		if attachment.IsText() {
 966			continue
 967		}
 968		files = append(files, fantasy.FilePart{
 969			Filename:  attachment.FileName,
 970			Data:      attachment.Content,
 971			MediaType: attachment.MimeType,
 972		})
 973	}
 974
 975	return history, files
 976}
 977
 978// filterFileParts removes fantasy.FilePart entries from a slice of message
 979// parts. Used to strip image attachments from historical user messages when
 980// the current model does not support them.
 981func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
 982	filtered := make([]fantasy.MessagePart, 0, len(parts))
 983	for _, part := range parts {
 984		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
 985			continue
 986		}
 987		filtered = append(filtered, part)
 988	}
 989	return filtered
 990}
 991
 992// filterOrphanedToolResults converts a tool message to a fantasy.Message,
 993// dropping any tool result parts whose tool_call_id has no matching tool call
 994// in the known set. An orphaned result causes API validation to fail on every
 995// subsequent turn, permanently locking the session. Returns the filtered
 996// message and true if at least one valid part remains.
 997func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
 998	aiMsgs := m.ToAIMessage()
 999	if len(aiMsgs) == 0 {
1000		return fantasy.Message{}, false
1001	}
1002	var validParts []fantasy.MessagePart
1003	for _, part := range aiMsgs[0].Content {
1004		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1005		if !ok {
1006			validParts = append(validParts, part)
1007			continue
1008		}
1009		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1010			validParts = append(validParts, part)
1011		} else {
1012			slog.Warn(
1013				"Dropping orphaned tool result with no matching tool call",
1014				"tool_call_id", tr.ToolCallID,
1015			)
1016		}
1017	}
1018	if len(validParts) == 0 {
1019		return fantasy.Message{}, false
1020	}
1021	msg := aiMsgs[0]
1022	msg.Content = validParts
1023	return msg, true
1024}
1025
1026// syntheticToolResultsForOrphanedCalls returns a tool message containing
1027// synthetic tool results for any tool calls in the assistant message that
1028// have no matching result in knownToolResultIDs. LLM APIs require every
1029// tool_use to be immediately followed by a tool_result; an interrupted
1030// session can leave orphaned tool_use blocks that permanently lock the
1031// conversation. Returns the message and true if any synthetic results were
1032// produced.
1033func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1034	var syntheticParts []fantasy.MessagePart
1035	for _, tc := range m.ToolCalls() {
1036		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1037			continue
1038		}
1039		slog.Warn(
1040			"Injecting synthetic tool result for orphaned tool call",
1041			"tool_call_id", tc.ID,
1042			"tool_name", tc.Name,
1043		)
1044		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1045			ToolCallID: tc.ID,
1046			Output: fantasy.ToolResultOutputContentError{
1047				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1048			},
1049		})
1050	}
1051	if len(syntheticParts) == 0 {
1052		return fantasy.Message{}, false
1053	}
1054	return fantasy.Message{
1055		Role:    fantasy.MessageRoleTool,
1056		Content: syntheticParts,
1057	}, true
1058}
1059
1060func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1061	msgs, err := a.messages.List(ctx, session.ID)
1062	if err != nil {
1063		return nil, fmt.Errorf("failed to list messages: %w", err)
1064	}
1065
1066	if session.SummaryMessageID != "" {
1067		summaryMsgIndex := -1
1068		for i, msg := range msgs {
1069			if msg.ID == session.SummaryMessageID {
1070				summaryMsgIndex = i
1071				break
1072			}
1073		}
1074		if summaryMsgIndex != -1 {
1075			msgs = msgs[summaryMsgIndex:]
1076			msgs[0].Role = message.User
1077		}
1078	}
1079	return msgs, nil
1080}
1081
1082// generateTitle generates a session titled based on the initial prompt.
1083func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1084	if userPrompt == "" {
1085		return
1086	}
1087
1088	smallModel := a.smallModel.Get()
1089	largeModel := a.largeModel.Get()
1090	systemPromptPrefix := a.systemPromptPrefix.Get()
1091
1092	var maxOutputTokens int64 = 40
1093	if smallModel.CatwalkCfg.CanReason {
1094		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1095	}
1096
1097	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1098		return fantasy.NewAgent(
1099			m,
1100			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1101			fantasy.WithMaxOutputTokens(tok),
1102			fantasy.WithUserAgent(userAgent),
1103		)
1104	}
1105
1106	streamCall := fantasy.AgentStreamCall{
1107		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1108		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1109			prepared.Messages = opts.Messages
1110			if systemPromptPrefix != "" {
1111				prepared.Messages = append([]fantasy.Message{
1112					fantasy.NewSystemMessage(systemPromptPrefix),
1113				}, prepared.Messages...)
1114			}
1115			return callCtx, prepared, nil
1116		},
1117	}
1118
1119	// Use the small model to generate the title.
1120	model := smallModel
1121	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1122	resp, err := agent.Stream(ctx, streamCall)
1123	if err == nil {
1124		// We successfully generated a title with the small model.
1125		slog.Debug("Generated title with small model")
1126	} else {
1127		// It didn't work. Let's try with the big model.
1128		slog.Error("Error generating title with small model; trying big model", "err", err)
1129		model = largeModel
1130		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1131		resp, err = agent.Stream(ctx, streamCall)
1132		if err == nil {
1133			slog.Debug("Generated title with large model")
1134		} else {
1135			// Welp, the large model didn't work either. Use the default
1136			// session name and return.
1137			slog.Error("Error generating title with large model", "err", err)
1138			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1139			if saveErr != nil {
1140				slog.Error("Failed to save session title", "error", saveErr)
1141			}
1142			return
1143		}
1144	}
1145
1146	if resp == nil {
1147		// Actually, we didn't get a response so we can't. Use the default
1148		// session name and return.
1149		slog.Error("Response is nil; can't generate title")
1150		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1151		if saveErr != nil {
1152			slog.Error("Failed to save session title", "error", saveErr)
1153		}
1154		return
1155	}
1156
1157	// Clean up title.
1158	var title string
1159	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1160
1161	// Remove thinking tags if present.
1162	title = thinkTagRegex.ReplaceAllString(title, "")
1163	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1164
1165	title = strings.TrimSpace(title)
1166	title = cmp.Or(title, DefaultSessionName)
1167
1168	// Calculate usage and cost.
1169	var openrouterCost *float64
1170	for _, step := range resp.Steps {
1171		stepCost := a.openrouterCost(step.ProviderMetadata)
1172		if stepCost != nil {
1173			newCost := *stepCost
1174			if openrouterCost != nil {
1175				newCost += *openrouterCost
1176			}
1177			openrouterCost = &newCost
1178		}
1179	}
1180
1181	modelConfig := model.CatwalkCfg
1182	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1183		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1184		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1185		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1186
1187	// Use override cost if available (e.g., from OpenRouter).
1188	if openrouterCost != nil {
1189		cost = *openrouterCost
1190	}
1191
1192	// Skip cost accumulation
1193	if model.FlatRate {
1194		cost = 0
1195	}
1196
1197	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1198	completionTokens := resp.TotalUsage.OutputTokens
1199
1200	// Atomically update only title and usage fields to avoid overriding other
1201	// concurrent session updates.
1202	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1203	if saveErr != nil {
1204		slog.Error("Failed to save session title and usage", "error", saveErr)
1205		return
1206	}
1207}
1208
1209func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1210	openrouterMetadata, ok := metadata[openrouter.Name]
1211	if !ok {
1212		return nil
1213	}
1214
1215	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1216	if !ok {
1217		return nil
1218	}
1219	return &opts.Usage.Cost
1220}
1221
1222func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1223	if !usageIsZero(usage) {
1224		session.EstimatedUsage = estimated
1225	}
1226
1227	modelConfig := model.CatwalkCfg
1228	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1229		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1230		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1231		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1232
1233	if !estimated {
1234		a.eventTokensUsed(session.ID, model, usage, cost)
1235	}
1236
1237	if estimated {
1238		cost = 0
1239	} else {
1240		// Use override cost if available (e.g., from OpenRouter).
1241		if overrideCost != nil {
1242			cost = *overrideCost
1243		}
1244
1245		// Skip cost accumulation
1246		if model.FlatRate {
1247			cost = 0
1248		}
1249	}
1250
1251	session.Cost += cost
1252	updateSessionTokenCounters(session, usage)
1253}
1254
1255func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1256	if usage.OutputTokens != 0 {
1257		session.CompletionTokens = usage.OutputTokens
1258	}
1259	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1260		session.PromptTokens = promptTokens
1261	}
1262}
1263
1264func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1265	if usage.OutputTokens != 0 {
1266		return usage.OutputTokens
1267	}
1268	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1269}
1270
1271func (a *sessionAgent) Cancel(sessionID string) {
1272	// Cancel regular requests. Don't use Take() here - we need the entry to
1273	// remain in activeRequests so IsBusy() returns true until the goroutine
1274	// fully completes (including error handling that may access the DB).
1275	// The defer in processRequest will clean up the entry.
1276	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1277		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1278		cancel()
1279	}
1280
1281	// Also check for summarize requests.
1282	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1283		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1284		cancel()
1285	}
1286
1287	if a.QueuedPrompts(sessionID) > 0 {
1288		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1289		a.messageQueue.Del(sessionID)
1290	}
1291}
1292
1293func (a *sessionAgent) ClearQueue(sessionID string) {
1294	if a.QueuedPrompts(sessionID) > 0 {
1295		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1296		a.messageQueue.Del(sessionID)
1297	}
1298}
1299
1300func (a *sessionAgent) CancelAll() {
1301	if !a.IsBusy() {
1302		return
1303	}
1304	for key := range a.activeRequests.Seq2() {
1305		a.Cancel(key) // key is sessionID
1306	}
1307
1308	timeout := time.After(5 * time.Second)
1309	for a.IsBusy() {
1310		select {
1311		case <-timeout:
1312			return
1313		default:
1314			time.Sleep(200 * time.Millisecond)
1315		}
1316	}
1317}
1318
1319func (a *sessionAgent) IsBusy() bool {
1320	var busy bool
1321	for cancelFunc := range a.activeRequests.Seq() {
1322		if cancelFunc != nil {
1323			busy = true
1324			break
1325		}
1326	}
1327	return busy
1328}
1329
1330func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1331	_, busy := a.activeRequests.Get(sessionID)
1332	return busy
1333}
1334
1335func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1336	l, ok := a.messageQueue.Get(sessionID)
1337	if !ok {
1338		return 0
1339	}
1340	return len(l)
1341}
1342
1343func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1344	l, ok := a.messageQueue.Get(sessionID)
1345	if !ok {
1346		return nil
1347	}
1348	prompts := make([]string, len(l))
1349	for i, call := range l {
1350		prompts[i] = call.Prompt
1351	}
1352	return prompts
1353}
1354
1355func (a *sessionAgent) SetModels(large Model, small Model) {
1356	a.largeModel.Set(large)
1357	a.smallModel.Set(small)
1358}
1359
1360func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1361	a.tools.SetSlice(tools)
1362}
1363
1364func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1365	a.systemPrompt.Set(systemPrompt)
1366}
1367
1368func (a *sessionAgent) Model() Model {
1369	return a.largeModel.Get()
1370}
1371
1372// convertToToolResult converts a fantasy tool result to a message tool result.
1373func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1374	baseResult := message.ToolResult{
1375		ToolCallID: result.ToolCallID,
1376		Name:       result.ToolName,
1377		Metadata:   result.ClientMetadata,
1378	}
1379
1380	switch result.Result.GetType() {
1381	case fantasy.ToolResultContentTypeText:
1382		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1383			baseResult.Content = r.Text
1384		}
1385	case fantasy.ToolResultContentTypeError:
1386		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1387			baseResult.Content = r.Error.Error()
1388			baseResult.IsError = true
1389		}
1390	case fantasy.ToolResultContentTypeMedia:
1391		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1392			if !stringext.IsValidBase64(r.Data) {
1393				slog.Warn(
1394					"Tool returned media with invalid base64 data, discarding image",
1395					"tool", result.ToolName,
1396					"tool_call_id", result.ToolCallID,
1397				)
1398				baseResult.Content = "Tool returned image data with invalid encoding"
1399				baseResult.IsError = true
1400			} else {
1401				content := r.Text
1402				if content == "" {
1403					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1404				}
1405				baseResult.Content = content
1406				baseResult.Data = r.Data
1407				baseResult.MIMEType = r.MediaType
1408			}
1409		}
1410	}
1411
1412	return baseResult
1413}
1414
1415// workaroundProviderMediaLimitations converts media content in tool results to
1416// user messages for providers that don't natively support images in tool results.
1417//
1418// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1419// don't support sending images/media in tool result messages - they only accept
1420// text in tool results. However, they DO support images in user messages.
1421//
1422// If we send media in tool results to these providers, the API returns an error.
1423//
1424// Solution: For these providers, we:
1425//  1. Replace the media in the tool result with a text placeholder
1426//  2. Inject a user message immediately after with the image as a file attachment
1427//  3. This maintains the tool execution flow while working around API limitations
1428//
1429// Anthropic and Bedrock support images natively in tool results, so we skip
1430// this workaround for them.
1431//
1432// Example transformation:
1433//
1434//	BEFORE: [tool result: image data]
1435//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1436func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1437	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1438		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1439		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1440
1441	if providerSupportsMedia {
1442		return messages
1443	}
1444
1445	convertedMessages := make([]fantasy.Message, 0, len(messages))
1446
1447	for _, msg := range messages {
1448		if msg.Role != fantasy.MessageRoleTool {
1449			convertedMessages = append(convertedMessages, msg)
1450			continue
1451		}
1452
1453		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1454		var mediaFiles []fantasy.FilePart
1455
1456		for _, part := range msg.Content {
1457			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1458			if !ok {
1459				textParts = append(textParts, part)
1460				continue
1461			}
1462
1463			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1464				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1465				if err != nil {
1466					slog.Warn("Failed to decode media data", "error", err)
1467					textParts = append(textParts, part)
1468					continue
1469				}
1470
1471				mediaFiles = append(mediaFiles, fantasy.FilePart{
1472					Data:      decoded,
1473					MediaType: media.MediaType,
1474					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1475				})
1476
1477				textParts = append(textParts, fantasy.ToolResultPart{
1478					ToolCallID: toolResult.ToolCallID,
1479					Output: fantasy.ToolResultOutputContentText{
1480						Text: "[Image/media content loaded - see attached file]",
1481					},
1482					ProviderOptions: toolResult.ProviderOptions,
1483				})
1484			} else {
1485				textParts = append(textParts, part)
1486			}
1487		}
1488
1489		convertedMessages = append(convertedMessages, fantasy.Message{
1490			Role:    fantasy.MessageRoleTool,
1491			Content: textParts,
1492		})
1493
1494		if len(mediaFiles) > 0 {
1495			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1496				"Here is the media content from the tool result:",
1497				mediaFiles...,
1498			))
1499		}
1500	}
1501
1502	return convertedMessages
1503}
1504
1505// buildSummaryPrompt constructs the prompt text for session summarization.
1506func buildSummaryPrompt(todos []session.Todo) string {
1507	var sb strings.Builder
1508	sb.WriteString("Provide a detailed summary of our conversation above.")
1509	if len(todos) > 0 {
1510		sb.WriteString("\n\n## Current Todo List\n\n")
1511		for _, t := range todos {
1512			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1513		}
1514		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1515		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1516	}
1517	return sb.String()
1518}
1519
1520func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1521	fields := []any{
1522		"retry_delay", delay.String(),
1523	}
1524	if err == nil {
1525		return fields
1526	}
1527	fields = append(fields, "status_code", err.StatusCode)
1528	if err.Title != "" {
1529		fields = append(fields, "title", err.Title)
1530	}
1531	if err.Message != "" {
1532		fields = append(fields, "message", err.Message)
1533	}
1534	return fields
1535}