agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
const (
	// DefaultSessionName is the placeholder session title.
	// NOTE(review): no use is visible in this part of the file — presumably
	// the title a session carries until one is generated; confirm.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds, read by the stop
	// condition in sessionAgent.Run. The condition fires once the tokens
	// remaining in the model's context window drop to a threshold or below:
	//   - models whose context window is larger than
	//     largeContextWindowThreshold tokens use the fixed
	//     largeContextWindowBuffer as the threshold;
	//   - all other models use smallContextWindowRatio times their context
	//     window.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  57
// userAgent is the User-Agent string passed to fantasy.WithUserAgent when Run
// and Summarize build their agents. It embeds the running Crush version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  59
// titlePrompt holds the contents of templates/title.md, embedded at build
// time.
// NOTE(review): not referenced in the visible part of the file — presumably
// the system prompt used for title generation; confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at build
// time. Summarize passes it (as a string) to fantasy.WithSystemPrompt.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
// Used to remove <think> tags from generated titles.
//
// thinkTagRegex matches a complete <think>...</think> pair together with
// everything between the tags: the (?s) flag lets "." match newlines and the
// non-greedy ".*?" stops at the first closing tag, so separate pairs are
// matched separately. orphanThinkTagRegex matches a single opening or closing
// tag on its own.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  71
// SessionAgentCall describes one prompt submitted to SessionAgent.Run.
//
// How Run treats the fields that carry no comment of their own below:
//   - SessionID is required; Run returns ErrSessionMissing when it is empty.
//   - Prompt may be empty only when Attachments contains a text attachment;
//     otherwise Run returns ErrEmptyPrompt.
//   - ProviderOptions, Temperature, TopP, TopK, FrequencyPenalty and
//     PresencePenalty are forwarded unchanged to the streaming call.
//   - Attachments are handed to prompt preparation and merged into the prompt
//     text via message.PromptWithTextAttachments.
//   - MaxOutputTokens is only sent to the provider when it is greater than
//     zero.
//   - NonInteractive suppresses the "agent finished" notification that Run
//     otherwise publishes at the end of a successful turn.
type SessionAgentCall struct {
	SessionID string
	// RunID, when non-empty, is the caller-supplied correlator that
	// gets echoed back on the notify.RunComplete event emitted for
	// this turn. It is preserved when the call is enqueued behind a
	// busy session so the queued turn's terminal event is still
	// recognisable to the original caller. Callers that need a
	// reliable completion contract (e.g. `crush run` against a
	// session that may be busy) MUST set it; SessionID alone is
	// ambiguous when concurrent turns share the same session.
	RunID            string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
	// OnComplete, when non-nil, replaces the default RunComplete
	// publish path: the inner Run hands the terminal payload to this
	// callback instead of emitting it on the RunComplete broker. The
	// coordinator uses this hook to coalesce the unauthorized →
	// re-auth → retry chain into a single user-visible terminal
	// event, so non-interactive clients (e.g. `crush run`) don't
	// exit on a stale failed-attempt RunComplete before the
	// successful retry. It is intentionally stripped when queueing
	// a busy-session call (see Run): the originating
	// coordinator.Run has long returned by the time the queued
	// recursion drains, so falling back to the default broker
	// publish keeps the event visible to subscribers.
	OnComplete func(notify.RunComplete)
}
 107
// SessionAgent is the session-scoped agent API returned by NewSessionAgent.
//
// Only Run and Summarize are implemented in the part of the file visible
// here; notes on the remaining methods are inferred from their names and from
// how Run uses the underlying state, and are marked as such.
type SessionAgent interface {
	// Run executes one turn for call.SessionID. If that session is already
	// busy the call is queued and Run returns (nil, nil) immediately.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels replaces the large and small models.
	// NOTE(review): implementation not shown; Run reads the large model via
	// a snapshot taken at the start of each turn.
	SetModels(large Model, small Model)
	// SetTools replaces the tool set. Run re-reads the tools before every
	// step, so changes apply to a turn that is already in progress.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt.
	// NOTE(review): implementation not shown; Run snapshots the prompt once
	// per turn.
	SetSystemPrompt(systemPrompt string)
	// Cancel stops work for one session.
	// NOTE(review): implementation not shown — presumably invokes the cancel
	// function Run/Summarize register for the session; confirm.
	Cancel(sessionID string)
	// CancelAll stops work for every session (implementation not shown).
	CancelAll()
	// IsSessionBusy reports whether the given session has work in flight.
	// Run uses it to decide whether to queue a call; Summarize uses it to
	// return ErrSessionBusy.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports whether any session is busy (implementation not shown).
	IsBusy() bool
	// QueuedPrompts returns the number of prompts queued for the session
	// (implementation not shown).
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList returns the queued prompts for the session
	// (implementation not shown).
	QueuedPromptsList(sessionID string) []string
	// ClearQueue discards the prompts queued for the session (implementation
	// not shown).
	ClearQueue(sessionID string)
	// Summarize replaces the session's history with a model-generated
	// summary message. It returns ErrSessionBusy when the session is busy.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns the agent's current model.
	// NOTE(review): implementation not shown — presumably the large model;
	// confirm.
	Model() Model
}
 123
// Model bundles a language model with the metadata the agent needs to use it.
//
//   - Model is the fantasy language model handed to fantasy.NewAgent.
//   - CatwalkCfg is the catalog entry; Run reads its SupportsImages,
//     ContextWindow and Name fields.
//   - ModelCfg is the user's selection; its Model and Provider fields are
//     recorded on every assistant message.
//   - FlatRate is not read in the visible part of the file.
//     NOTE(review): presumably marks models billed at a flat rate so cost
//     tracking can skip them; confirm.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 130
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
//
// The first group of fields (models, prompts, tools) is held in csync
// containers; Run takes a snapshot of them at the start of each turn and
// re-reads the tools before every step. The second group is set once by
// NewSessionAgent and not reassigned in the visible code. notify and
// runComplete may be nil; Run checks before publishing on either.
//
// messageQueue holds, per session ID, the calls that arrived while that
// session was busy. activeRequests maps a session ID to the cancel function
// of the Run or Summarize currently in flight for it.
type sessionAgent struct {
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]
	runComplete          pubsub.Publisher[notify.RunComplete]

	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]
}
 149
// SessionAgentOptions carries the dependencies and initial settings for
// NewSessionAgent, which copies every field into the new agent.
//
// SystemPromptPrefix, when non-empty, is prepended as an extra system message
// on every step. DisableAutoSummarize turns off the context-window stop
// condition in Run. Notify and RunComplete may be left nil, in which case the
// corresponding events are not published.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
	RunComplete          pubsub.Publisher[notify.RunComplete]
}
 164
 165func NewSessionAgent(
 166	opts SessionAgentOptions,
 167) SessionAgent {
 168	return &sessionAgent{
 169		largeModel:           csync.NewValue(opts.LargeModel),
 170		smallModel:           csync.NewValue(opts.SmallModel),
 171		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 172		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 173		isSubAgent:           opts.IsSubAgent,
 174		sessions:             opts.Sessions,
 175		messages:             opts.Messages,
 176		disableAutoSummarize: opts.DisableAutoSummarize,
 177		tools:                csync.NewSliceFrom(opts.Tools),
 178		isYolo:               opts.IsYolo,
 179		notify:               opts.Notify,
 180		runComplete:          opts.RunComplete,
 181		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 182		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 183	}
 184}
 185
// Run executes one agent turn for call.SessionID.
//
// Flow:
//  1. Validate the call: ErrEmptyPrompt when there is neither a prompt nor a
//     text attachment, ErrSessionMissing when SessionID is empty.
//  2. If the session is busy, append the call (with OnComplete cleared) to
//     the session's queue and return (nil, nil).
//  3. Otherwise snapshot tools, model and prompts, append instructions from
//     connected MCP servers to the system prompt, persist the user message
//     and stream the model response. The stream callbacks persist assistant
//     and tool messages as they arrive and update session usage after each
//     step.
//  4. On a stream error, close any unfinished tool calls, add error tool
//     results for tool calls that have none, record a finish reason on the
//     assistant message and return the error.
//  5. If the auto-summarize stop condition fired, summarize the session and,
//     when the last assistant message still had tool calls, re-queue the
//     prompt so the work continues.
//  6. Publish the "agent finished" notification (unless NonInteractive) and,
//     if calls were queued meanwhile, run the first of them recursively.
//
// A deferred function flushes pending message updates and then emits exactly
// one notify.RunComplete for the turn — through call.OnComplete when set,
// otherwise on the runComplete publisher — unless the turn hands over to a
// queued call, which emits its own.
//
// The named result retErr is read by that deferred function to fill in the
// RunComplete error fields.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *fantasy.AgentResult, retErr error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. Strip OnComplete: the caller that
	// supplied the hook (typically coordinator.Run) has its own
	// retry/coalesce scope that ends when it returns, so by the time
	// the queue drains nobody is left to consume the buffered
	// terminal event. The recursive Run will fall back to the
	// default broker publish, which is what existing subscribers
	// expect for queued turns.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		queued := call
		queued.OnComplete = nil
		existing = append(existing, queued)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards stepMessages, which PrepareStep writes and
	// OnStepFinish reads; OnStepFinish also holds it while loading and
	// saving the session.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Registered first, so it runs last: Run does not return until title
	// generation has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context for the model stream. Registering
	// its cancel function under the session ID exposes it to code outside
	// this function.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// skipRunComplete is set just before the queued-recursion path so
	// the outer Run doesn't publish a RunComplete that would race
	// with — and be superseded by — the recursive call's own
	// RunComplete (each queued user prompt is its own turn and
	// publishes exactly one terminal event).
	var skipRunComplete bool
	// currentAssistant is declared here so the deferred RunComplete
	// publish below can capture the pointer that PrepareStep will
	// later (re)assign for each streaming step. The final assistant
	// message of the turn is the value reachable through this
	// pointer when the defer runs.
	var currentAssistant *message.Message
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	//
	// After the flush completes — meaning all per-message
	// Publish(UpdatedEvent) calls have fired and been buffered into
	// every subscriber's channel — publish the authoritative
	// RunComplete event for this turn. The flush-then-publish order
	// gives well-behaved clients the best chance of seeing the final
	// message event before RunComplete; the embedded Text field
	// reconciles for clients that observe the events out of order
	// (the pubsub broker fan-in does not serialize publishes from
	// different upstream brokers).
	//
	// This is the last defer registered, so it runs before the deferred
	// activeRequests.Del, cancel and wg.Wait above.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
		if skipRunComplete {
			return
		}
		complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
		if currentAssistant != nil {
			complete.MessageID = currentAssistant.ID
			complete.Text = currentAssistant.Content().String()
		}
		if retErr != nil {
			complete.Error = retErr.Error()
			complete.Cancelled = errors.Is(retErr, context.Canceled)
		} else if ctx.Err() != nil {
			complete.Cancelled = true
		}
		// Prefer the per-call hook when supplied so the coordinator
		// can coalesce retries (e.g. unauthorized → re-auth → retry)
		// into a single user-visible terminal event. The fallback
		// must-deliver publish applies bounded-blocking semantics to
		// the authoritative terminal event so a momentarily-full
		// subscriber channel can't silently drop it and hang
		// non-interactive clients waiting on RunComplete.
		if call.OnComplete != nil {
			call.OnComplete(complete)
			return
		}
		if a.runComplete == nil {
			return
		}
		a.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, complete)
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// stepMessages holds a copy of the messages sent for the current step;
	// OnStepFinish passes it to fallbackStepUsage.
	var stepMessages []fantasy.Message
	// shouldSummarize is set by the context-window stop condition below.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err = agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep runs before each step: it refreshes the tools, folds
		// queued user prompts into the conversation, applies cache-control
		// markers and creates the assistant message the step will fill.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Clear provider options left on the messages; the
			// cache-control markers are re-applied below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Prompts queued while this turn was running are persisted as
			// user messages and appended to this step's input.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// On reaching the first non-system message, add cache
				// control to the last system message seen so far (index 0
				// if none was seen).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			sessionLock.Lock()
			stepMessages = cloneFantasyMessages(prepared.Messages)
			sessionLock.Unlock()

			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Attach the message ID and model details to the returned
			// context under the tools package's context keys.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			// Same for Google's thought signature.
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			// And for OpenAI Responses reasoning metadata.
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Map the provider finish reason onto the message finish
			// reason; anything unlisted stays FinishReasonUnknown.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Reload the session before applying this step's usage, then
			// save it and keep the saved copy as currentSession.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			usage, estimated := fallbackStepUsage(stepMessages, stepResult)
			a.updateSessionUsage(largeModel, &updatedSession, usage, a.openrouterCost(stepResult.ProviderMetadata), estimated)
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop for auto-summarization when the context window is
			// nearly exhausted.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the model keeps repeating the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// No assistant message was created, so there is nothing to repair
		// or annotate.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Make sure every tool call is marked finished and has a tool
		// result in the session, adding an error result where none exists.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason on the assistant message. Cases are
		// checked from most to least specific: cancellation, Hyper 401,
		// Hyper 402, any provider error, any fantasy error, anything else.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize returns ErrSessionBusy for a busy session, so release
		// this turn's active-request entry first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop. The recursive Run
	// publishes its own RunComplete for the queued prompt, so suppress
	// the outer defer's emit to avoid a duplicate event whose Error
	// field would belong to the recursive turn but whose MessageID/Text
	// would belong to the outer turn.
	skipRunComplete = true
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 730
 731func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 732	if a.IsSessionBusy(sessionID) {
 733		return ErrSessionBusy
 734	}
 735
 736	// Copy mutable fields under lock to avoid races with SetModels.
 737	largeModel := a.largeModel.Get()
 738	systemPromptPrefix := a.systemPromptPrefix.Get()
 739
 740	currentSession, err := a.sessions.Get(ctx, sessionID)
 741	if err != nil {
 742		return fmt.Errorf("failed to get session: %w", err)
 743	}
 744	msgs, err := a.getSessionMessages(ctx, currentSession)
 745	if err != nil {
 746		return err
 747	}
 748	if len(msgs) == 0 {
 749		// Nothing to summarize.
 750		return nil
 751	}
 752
 753	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
 754
 755	genCtx, cancel := context.WithCancel(ctx)
 756	a.activeRequests.Set(sessionID, cancel)
 757	defer a.activeRequests.Del(sessionID)
 758	defer cancel()
 759	defer func() {
 760		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
 761			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
 762		}
 763	}()
 764
 765	agent := fantasy.NewAgent(
 766		largeModel.Model,
 767		fantasy.WithSystemPrompt(string(summaryPrompt)),
 768		fantasy.WithUserAgent(userAgent),
 769	)
 770	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 771		Role:             message.Assistant,
 772		Model:            largeModel.ModelCfg.Model,
 773		Provider:         largeModel.ModelCfg.Provider,
 774		IsSummaryMessage: true,
 775	})
 776	if err != nil {
 777		return err
 778	}
 779
 780	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 781
 782	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 783		Prompt:          summaryPromptText,
 784		Messages:        aiMsgs,
 785		ProviderOptions: opts,
 786		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 787			prepared.Messages = options.Messages
 788			if systemPromptPrefix != "" {
 789				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 790			}
 791			return callContext, prepared, nil
 792		},
 793		OnReasoningDelta: func(id string, text string) error {
 794			summaryMessage.AppendReasoningContent(text)
 795			return a.messages.Update(genCtx, summaryMessage)
 796		},
 797		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 798			// Handle anthropic signature.
 799			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 800				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 801					summaryMessage.AppendReasoningSignature(signature.Signature)
 802				}
 803			}
 804			summaryMessage.FinishThinking()
 805			return a.messages.Update(genCtx, summaryMessage)
 806		},
 807		OnTextDelta: func(id, text string) error {
 808			summaryMessage.AppendContent(text)
 809			return a.messages.Update(genCtx, summaryMessage)
 810		},
 811	})
 812	if err != nil {
 813		isCancelErr := errors.Is(err, context.Canceled)
 814		if isCancelErr {
 815			// User cancelled summarize we need to remove the summary message.
 816			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 817			return deleteErr
 818		}
 819		// Mark the summary message as finished with an error so the UI
 820		// stops spinning.
 821		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
 822		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 823			return updateErr
 824		}
 825		return err
 826	}
 827
 828	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 829	err = a.messages.Update(genCtx, summaryMessage)
 830	if err != nil {
 831		return err
 832	}
 833
 834	var openrouterCost *float64
 835	for _, step := range resp.Steps {
 836		stepCost := a.openrouterCost(step.ProviderMetadata)
 837		if stepCost != nil {
 838			newCost := *stepCost
 839			if openrouterCost != nil {
 840				newCost += *openrouterCost
 841			}
 842			openrouterCost = &newCost
 843		}
 844	}
 845
 846	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost, false)
 847
 848	// Just in case, get just the last usage info.
 849	usage := resp.Response.Usage
 850	currentSession.SummaryMessageID = summaryMessage.ID
 851	currentSession.CompletionTokens = summaryCompletionTokens(usage, summaryMessage)
 852	currentSession.PromptTokens = 0
 853	currentSession.EstimatedUsage = usageIsZero(usage)
 854	_, err = a.sessions.Save(genCtx, currentSession)
 855	if err != nil {
 856		return err
 857	}
 858
 859	// Release the active request before processing queued messages so that
 860	// Run() does not see the session as busy.
 861	a.activeRequests.Del(sessionID)
 862	cancel()
 863
 864	// Process any messages that were queued while summarizing.
 865	queuedMessages, ok := a.messageQueue.Get(sessionID)
 866	if !ok || len(queuedMessages) == 0 {
 867		return nil
 868	}
 869	firstQueuedMessage := queuedMessages[0]
 870	a.messageQueue.Set(sessionID, queuedMessages[1:])
 871	_, qErr := a.Run(ctx, firstQueuedMessage)
 872	return qErr
 873}
 874
 875func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 876	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 877		return fantasy.ProviderOptions{}
 878	}
 879	return fantasy.ProviderOptions{
 880		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 881			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 882		},
 883		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 884			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 885		},
 886		vercel.Name: &anthropic.ProviderCacheControlOptions{
 887			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 888		},
 889	}
 890}
 891
 892func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 893	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 894	var attachmentParts []message.ContentPart
 895	for _, attachment := range call.Attachments {
 896		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 897	}
 898	parts = append(parts, attachmentParts...)
 899	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 900		Role:  message.User,
 901		Parts: parts,
 902	})
 903	if err != nil {
 904		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 905	}
 906	return msg, nil
 907}
 908
 909func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 910	var history []fantasy.Message
 911	if !a.isSubAgent {
 912		history = append(history, fantasy.NewUserMessage(
 913			fmt.Sprintf(
 914				"<system_reminder>%s</system_reminder>",
 915				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 916If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 917If not, please feel free to ignore. Again do not mention this message to the user.`,
 918			),
 919		))
 920	}
 921	// Collect all tool call IDs present in assistant messages and all tool
 922	// result IDs present in tool messages. This lets us detect both orphaned
 923	// tool results (result without a call) and orphaned tool calls (call
 924	// without a result).
 925	knownToolCallIDs := make(map[string]struct{})
 926	knownToolResultIDs := make(map[string]struct{})
 927	for _, m := range msgs {
 928		switch m.Role {
 929		case message.Assistant:
 930			for _, tc := range m.ToolCalls() {
 931				knownToolCallIDs[tc.ID] = struct{}{}
 932			}
 933		case message.Tool:
 934			for _, tr := range m.ToolResults() {
 935				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 936			}
 937		}
 938	}
 939
 940	for _, m := range msgs {
 941		if len(m.Parts) == 0 {
 942			continue
 943		}
 944		// Assistant message without content or tool calls (cancelled before it returned anything).
 945		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 946			continue
 947		}
 948		if m.Role == message.Tool {
 949			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 950				history = append(history, msg)
 951			}
 952			continue
 953		}
 954		aiMsgs := m.ToAIMessage()
 955		if !supportsImages {
 956			for i := range aiMsgs {
 957				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 958					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 959				}
 960			}
 961		}
 962		history = append(history, aiMsgs...)
 963
 964		if m.Role == message.Assistant {
 965			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 966				history = append(history, msg)
 967			}
 968		}
 969	}
 970
 971	var files []fantasy.FilePart
 972	for _, attachment := range attachments {
 973		if attachment.IsText() {
 974			continue
 975		}
 976		files = append(files, fantasy.FilePart{
 977			Filename:  attachment.FileName,
 978			Data:      attachment.Content,
 979			MediaType: attachment.MimeType,
 980		})
 981	}
 982
 983	return history, files
 984}
 985
 986// filterFileParts removes fantasy.FilePart entries from a slice of message
 987// parts. Used to strip image attachments from historical user messages when
 988// the current model does not support them.
 989func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
 990	filtered := make([]fantasy.MessagePart, 0, len(parts))
 991	for _, part := range parts {
 992		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
 993			continue
 994		}
 995		filtered = append(filtered, part)
 996	}
 997	return filtered
 998}
 999
1000// filterOrphanedToolResults converts a tool message to a fantasy.Message,
1001// dropping any tool result parts whose tool_call_id has no matching tool call
1002// in the known set. An orphaned result causes API validation to fail on every
1003// subsequent turn, permanently locking the session. Returns the filtered
1004// message and true if at least one valid part remains.
1005func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
1006	aiMsgs := m.ToAIMessage()
1007	if len(aiMsgs) == 0 {
1008		return fantasy.Message{}, false
1009	}
1010	var validParts []fantasy.MessagePart
1011	for _, part := range aiMsgs[0].Content {
1012		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1013		if !ok {
1014			validParts = append(validParts, part)
1015			continue
1016		}
1017		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
1018			validParts = append(validParts, part)
1019		} else {
1020			slog.Warn(
1021				"Dropping orphaned tool result with no matching tool call",
1022				"tool_call_id", tr.ToolCallID,
1023			)
1024		}
1025	}
1026	if len(validParts) == 0 {
1027		return fantasy.Message{}, false
1028	}
1029	msg := aiMsgs[0]
1030	msg.Content = validParts
1031	return msg, true
1032}
1033
1034// syntheticToolResultsForOrphanedCalls returns a tool message containing
1035// synthetic tool results for any tool calls in the assistant message that
1036// have no matching result in knownToolResultIDs. LLM APIs require every
1037// tool_use to be immediately followed by a tool_result; an interrupted
1038// session can leave orphaned tool_use blocks that permanently lock the
1039// conversation. Returns the message and true if any synthetic results were
1040// produced.
1041func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
1042	var syntheticParts []fantasy.MessagePart
1043	for _, tc := range m.ToolCalls() {
1044		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
1045			continue
1046		}
1047		slog.Warn(
1048			"Injecting synthetic tool result for orphaned tool call",
1049			"tool_call_id", tc.ID,
1050			"tool_name", tc.Name,
1051		)
1052		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
1053			ToolCallID: tc.ID,
1054			Output: fantasy.ToolResultOutputContentError{
1055				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
1056			},
1057		})
1058	}
1059	if len(syntheticParts) == 0 {
1060		return fantasy.Message{}, false
1061	}
1062	return fantasy.Message{
1063		Role:    fantasy.MessageRoleTool,
1064		Content: syntheticParts,
1065	}, true
1066}
1067
1068func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
1069	msgs, err := a.messages.List(ctx, session.ID)
1070	if err != nil {
1071		return nil, fmt.Errorf("failed to list messages: %w", err)
1072	}
1073
1074	if session.SummaryMessageID != "" {
1075		summaryMsgIndex := -1
1076		for i, msg := range msgs {
1077			if msg.ID == session.SummaryMessageID {
1078				summaryMsgIndex = i
1079				break
1080			}
1081		}
1082		if summaryMsgIndex != -1 {
1083			msgs = msgs[summaryMsgIndex:]
1084			msgs[0].Role = message.User
1085		}
1086	}
1087	return msgs, nil
1088}
1089
1090// generateTitle generates a session titled based on the initial prompt.
1091func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
1092	if userPrompt == "" {
1093		return
1094	}
1095
1096	smallModel := a.smallModel.Get()
1097	largeModel := a.largeModel.Get()
1098	systemPromptPrefix := a.systemPromptPrefix.Get()
1099
1100	var maxOutputTokens int64 = 40
1101	if smallModel.CatwalkCfg.CanReason {
1102		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1103	}
1104
1105	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1106		return fantasy.NewAgent(
1107			m,
1108			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1109			fantasy.WithMaxOutputTokens(tok),
1110			fantasy.WithUserAgent(userAgent),
1111		)
1112	}
1113
1114	streamCall := fantasy.AgentStreamCall{
1115		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1116		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1117			prepared.Messages = opts.Messages
1118			if systemPromptPrefix != "" {
1119				prepared.Messages = append([]fantasy.Message{
1120					fantasy.NewSystemMessage(systemPromptPrefix),
1121				}, prepared.Messages...)
1122			}
1123			return callCtx, prepared, nil
1124		},
1125	}
1126
1127	// Use the small model to generate the title.
1128	model := smallModel
1129	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1130	resp, err := agent.Stream(ctx, streamCall)
1131	if err == nil {
1132		// We successfully generated a title with the small model.
1133		slog.Debug("Generated title with small model")
1134	} else {
1135		// It didn't work. Let's try with the big model.
1136		slog.Error("Error generating title with small model; trying big model", "err", err)
1137		model = largeModel
1138		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1139		resp, err = agent.Stream(ctx, streamCall)
1140		if err == nil {
1141			slog.Debug("Generated title with large model")
1142		} else {
1143			// Welp, the large model didn't work either. Use the default
1144			// session name and return.
1145			slog.Error("Error generating title with large model", "err", err)
1146			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1147			if saveErr != nil {
1148				slog.Error("Failed to save session title", "error", saveErr)
1149			}
1150			return
1151		}
1152	}
1153
1154	if resp == nil {
1155		// Actually, we didn't get a response so we can't. Use the default
1156		// session name and return.
1157		slog.Error("Response is nil; can't generate title")
1158		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1159		if saveErr != nil {
1160			slog.Error("Failed to save session title", "error", saveErr)
1161		}
1162		return
1163	}
1164
1165	// Clean up title.
1166	var title string
1167	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1168
1169	// Remove thinking tags if present.
1170	title = thinkTagRegex.ReplaceAllString(title, "")
1171	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1172
1173	title = strings.TrimSpace(title)
1174	title = cmp.Or(title, DefaultSessionName)
1175
1176	// Calculate usage and cost.
1177	var openrouterCost *float64
1178	for _, step := range resp.Steps {
1179		stepCost := a.openrouterCost(step.ProviderMetadata)
1180		if stepCost != nil {
1181			newCost := *stepCost
1182			if openrouterCost != nil {
1183				newCost += *openrouterCost
1184			}
1185			openrouterCost = &newCost
1186		}
1187	}
1188
1189	modelConfig := model.CatwalkCfg
1190	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1191		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1192		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1193		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1194
1195	// Use override cost if available (e.g., from OpenRouter).
1196	if openrouterCost != nil {
1197		cost = *openrouterCost
1198	}
1199
1200	// Skip cost accumulation
1201	if model.FlatRate {
1202		cost = 0
1203	}
1204
1205	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1206	completionTokens := resp.TotalUsage.OutputTokens
1207
1208	// Atomically update only title and usage fields to avoid overriding other
1209	// concurrent session updates.
1210	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1211	if saveErr != nil {
1212		slog.Error("Failed to save session title and usage", "error", saveErr)
1213		return
1214	}
1215}
1216
1217func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1218	openrouterMetadata, ok := metadata[openrouter.Name]
1219	if !ok {
1220		return nil
1221	}
1222
1223	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1224	if !ok {
1225		return nil
1226	}
1227	return &opts.Usage.Cost
1228}
1229
1230func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64, estimated bool) {
1231	if !usageIsZero(usage) {
1232		session.EstimatedUsage = estimated
1233	}
1234
1235	modelConfig := model.CatwalkCfg
1236	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1237		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1238		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1239		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1240
1241	if !estimated {
1242		a.eventTokensUsed(session.ID, model, usage, cost)
1243	}
1244
1245	if estimated {
1246		cost = 0
1247	} else {
1248		// Use override cost if available (e.g., from OpenRouter).
1249		if overrideCost != nil {
1250			cost = *overrideCost
1251		}
1252
1253		// Skip cost accumulation
1254		if model.FlatRate {
1255			cost = 0
1256		}
1257	}
1258
1259	session.Cost += cost
1260	updateSessionTokenCounters(session, usage)
1261}
1262
1263func updateSessionTokenCounters(session *session.Session, usage fantasy.Usage) {
1264	if usage.OutputTokens != 0 {
1265		session.CompletionTokens = usage.OutputTokens
1266	}
1267	if promptTokens := usage.InputTokens + usage.CacheReadTokens; promptTokens != 0 {
1268		session.PromptTokens = promptTokens
1269	}
1270}
1271
1272func summaryCompletionTokens(usage fantasy.Usage, summaryMessage message.Message) int64 {
1273	if usage.OutputTokens != 0 {
1274		return usage.OutputTokens
1275	}
1276	return approxTokenCount(summaryMessage.Content().Text) + approxTokenCount(summaryMessage.ReasoningContent().String())
1277}
1278
1279func (a *sessionAgent) Cancel(sessionID string) {
1280	// Cancel regular requests. Don't use Take() here - we need the entry to
1281	// remain in activeRequests so IsBusy() returns true until the goroutine
1282	// fully completes (including error handling that may access the DB).
1283	// The defer in processRequest will clean up the entry.
1284	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1285		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1286		cancel()
1287	}
1288
1289	// Also check for summarize requests.
1290	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1291		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1292		cancel()
1293	}
1294
1295	if a.QueuedPrompts(sessionID) > 0 {
1296		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1297		a.messageQueue.Del(sessionID)
1298	}
1299}
1300
1301func (a *sessionAgent) ClearQueue(sessionID string) {
1302	if a.QueuedPrompts(sessionID) > 0 {
1303		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1304		a.messageQueue.Del(sessionID)
1305	}
1306}
1307
1308func (a *sessionAgent) CancelAll() {
1309	if !a.IsBusy() {
1310		return
1311	}
1312	for key := range a.activeRequests.Seq2() {
1313		a.Cancel(key) // key is sessionID
1314	}
1315
1316	timeout := time.After(5 * time.Second)
1317	for a.IsBusy() {
1318		select {
1319		case <-timeout:
1320			return
1321		default:
1322			time.Sleep(200 * time.Millisecond)
1323		}
1324	}
1325}
1326
1327func (a *sessionAgent) IsBusy() bool {
1328	var busy bool
1329	for cancelFunc := range a.activeRequests.Seq() {
1330		if cancelFunc != nil {
1331			busy = true
1332			break
1333		}
1334	}
1335	return busy
1336}
1337
1338func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1339	_, busy := a.activeRequests.Get(sessionID)
1340	return busy
1341}
1342
1343func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1344	l, ok := a.messageQueue.Get(sessionID)
1345	if !ok {
1346		return 0
1347	}
1348	return len(l)
1349}
1350
1351func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1352	l, ok := a.messageQueue.Get(sessionID)
1353	if !ok {
1354		return nil
1355	}
1356	prompts := make([]string, len(l))
1357	for i, call := range l {
1358		prompts[i] = call.Prompt
1359	}
1360	return prompts
1361}
1362
1363func (a *sessionAgent) SetModels(large Model, small Model) {
1364	a.largeModel.Set(large)
1365	a.smallModel.Set(small)
1366}
1367
1368func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1369	a.tools.SetSlice(tools)
1370}
1371
1372func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1373	a.systemPrompt.Set(systemPrompt)
1374}
1375
1376func (a *sessionAgent) Model() Model {
1377	return a.largeModel.Get()
1378}
1379
1380// convertToToolResult converts a fantasy tool result to a message tool result.
1381func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1382	baseResult := message.ToolResult{
1383		ToolCallID: result.ToolCallID,
1384		Name:       result.ToolName,
1385		Metadata:   result.ClientMetadata,
1386	}
1387
1388	switch result.Result.GetType() {
1389	case fantasy.ToolResultContentTypeText:
1390		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1391			baseResult.Content = r.Text
1392		}
1393	case fantasy.ToolResultContentTypeError:
1394		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1395			baseResult.Content = r.Error.Error()
1396			baseResult.IsError = true
1397		}
1398	case fantasy.ToolResultContentTypeMedia:
1399		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1400			if !stringext.IsValidBase64(r.Data) {
1401				slog.Warn(
1402					"Tool returned media with invalid base64 data, discarding image",
1403					"tool", result.ToolName,
1404					"tool_call_id", result.ToolCallID,
1405				)
1406				baseResult.Content = "Tool returned image data with invalid encoding"
1407				baseResult.IsError = true
1408			} else {
1409				content := r.Text
1410				if content == "" {
1411					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1412				}
1413				baseResult.Content = content
1414				baseResult.Data = r.Data
1415				baseResult.MIMEType = r.MediaType
1416			}
1417		}
1418	}
1419
1420	return baseResult
1421}
1422
1423// workaroundProviderMediaLimitations converts media content in tool results to
1424// user messages for providers that don't natively support images in tool results.
1425//
1426// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1427// don't support sending images/media in tool result messages - they only accept
1428// text in tool results. However, they DO support images in user messages.
1429//
1430// If we send media in tool results to these providers, the API returns an error.
1431//
1432// Solution: For these providers, we:
1433//  1. Replace the media in the tool result with a text placeholder
1434//  2. Inject a user message immediately after with the image as a file attachment
1435//  3. This maintains the tool execution flow while working around API limitations
1436//
1437// Anthropic and Bedrock support images natively in tool results, so we skip
1438// this workaround for them.
1439//
1440// Example transformation:
1441//
1442//	BEFORE: [tool result: image data]
1443//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1444func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1445	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1446		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock) ||
1447		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrockEurope)
1448
1449	if providerSupportsMedia {
1450		return messages
1451	}
1452
1453	convertedMessages := make([]fantasy.Message, 0, len(messages))
1454
1455	for _, msg := range messages {
1456		if msg.Role != fantasy.MessageRoleTool {
1457			convertedMessages = append(convertedMessages, msg)
1458			continue
1459		}
1460
1461		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1462		var mediaFiles []fantasy.FilePart
1463
1464		for _, part := range msg.Content {
1465			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1466			if !ok {
1467				textParts = append(textParts, part)
1468				continue
1469			}
1470
1471			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1472				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1473				if err != nil {
1474					slog.Warn("Failed to decode media data", "error", err)
1475					textParts = append(textParts, part)
1476					continue
1477				}
1478
1479				mediaFiles = append(mediaFiles, fantasy.FilePart{
1480					Data:      decoded,
1481					MediaType: media.MediaType,
1482					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1483				})
1484
1485				textParts = append(textParts, fantasy.ToolResultPart{
1486					ToolCallID: toolResult.ToolCallID,
1487					Output: fantasy.ToolResultOutputContentText{
1488						Text: "[Image/media content loaded - see attached file]",
1489					},
1490					ProviderOptions: toolResult.ProviderOptions,
1491				})
1492			} else {
1493				textParts = append(textParts, part)
1494			}
1495		}
1496
1497		convertedMessages = append(convertedMessages, fantasy.Message{
1498			Role:    fantasy.MessageRoleTool,
1499			Content: textParts,
1500		})
1501
1502		if len(mediaFiles) > 0 {
1503			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1504				"Here is the media content from the tool result:",
1505				mediaFiles...,
1506			))
1507		}
1508	}
1509
1510	return convertedMessages
1511}
1512
1513// buildSummaryPrompt constructs the prompt text for session summarization.
1514func buildSummaryPrompt(todos []session.Todo) string {
1515	var sb strings.Builder
1516	sb.WriteString("Provide a detailed summary of our conversation above.")
1517	if len(todos) > 0 {
1518		sb.WriteString("\n\n## Current Todo List\n\n")
1519		for _, t := range todos {
1520			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1521		}
1522		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1523		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1524	}
1525	return sb.String()
1526}
1527
1528func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1529	fields := []any{
1530		"retry_delay", delay.String(),
1531	}
1532	if err == nil {
1533		return fields
1534	}
1535	fields = append(fields, "status_code", err.StatusCode)
1536	if err.Title != "" {
1537		fields = append(fields, "title", err.Title)
1538	}
1539	if err.Message != "" {
1540		fields = append(fields, "message", err.Message)
1541	}
1542	return fields
1543}