agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"os"
  19	"regexp"
  20	"strconv"
  21	"strings"
  22	"sync"
  23	"time"
  24
  25	"charm.land/catwalk/pkg/catwalk"
  26	"charm.land/fantasy"
  27	"charm.land/fantasy/providers/anthropic"
  28	"charm.land/fantasy/providers/bedrock"
  29	"charm.land/fantasy/providers/google"
  30	"charm.land/fantasy/providers/openai"
  31	"charm.land/fantasy/providers/openrouter"
  32	"charm.land/fantasy/providers/vercel"
  33	"charm.land/lipgloss/v2"
  34	"github.com/charmbracelet/crush/internal/agent/hyper"
  35	"github.com/charmbracelet/crush/internal/agent/tools"
  36	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  37	"github.com/charmbracelet/crush/internal/config"
  38	"github.com/charmbracelet/crush/internal/csync"
  39	"github.com/charmbracelet/crush/internal/message"
  40	"github.com/charmbracelet/crush/internal/permission"
  41	"github.com/charmbracelet/crush/internal/session"
  42	"github.com/charmbracelet/crush/internal/stringext"
  43	"github.com/charmbracelet/x/exp/charmtone"
  44)
  45
  46const (
  47	defaultSessionName = "Untitled Session"
  48
  49	// Constants for auto-summarization thresholds
  50	largeContextWindowThreshold = 200_000
  51	largeContextWindowBuffer    = 20_000
  52	smallContextWindowRatio     = 0.2
  53)
  54
  55//go:embed templates/title.md
  56var titlePrompt []byte
  57
  58//go:embed templates/summary.md
  59var summaryPrompt []byte
  60
  61// Used to remove <think> tags from generated titles.
  62var thinkTagRegex = regexp.MustCompile(`<think>.*?</think>`)
  63
  64type SessionAgentCall struct {
  65	SessionID        string
  66	Prompt           string
  67	ProviderOptions  fantasy.ProviderOptions
  68	Attachments      []message.Attachment
  69	MaxOutputTokens  int64
  70	Temperature      *float64
  71	TopP             *float64
  72	TopK             *int64
  73	FrequencyPenalty *float64
  74	PresencePenalty  *float64
  75}
  76
  77type SessionAgent interface {
  78	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
  79	SetModels(large Model, small Model)
  80	SetTools(tools []fantasy.AgentTool)
  81	SetSystemPrompt(systemPrompt string)
  82	Cancel(sessionID string)
  83	CancelAll()
  84	IsSessionBusy(sessionID string) bool
  85	IsBusy() bool
  86	QueuedPrompts(sessionID string) int
  87	QueuedPromptsList(sessionID string) []string
  88	ClearQueue(sessionID string)
  89	Summarize(context.Context, string, fantasy.ProviderOptions) error
  90	Model() Model
  91}
  92
  93type Model struct {
  94	Model      fantasy.LanguageModel
  95	CatwalkCfg catwalk.Model
  96	ModelCfg   config.SelectedModel
  97}
  98
  99type sessionAgent struct {
 100	largeModel         *csync.Value[Model]
 101	smallModel         *csync.Value[Model]
 102	systemPromptPrefix *csync.Value[string]
 103	systemPrompt       *csync.Value[string]
 104	tools              *csync.Slice[fantasy.AgentTool]
 105
 106	isSubAgent           bool
 107	sessions             session.Service
 108	messages             message.Service
 109	disableAutoSummarize bool
 110	isYolo               bool
 111
 112	messageQueue   *csync.Map[string, []SessionAgentCall]
 113	activeRequests *csync.Map[string, context.CancelFunc]
 114}
 115
 116type SessionAgentOptions struct {
 117	LargeModel           Model
 118	SmallModel           Model
 119	SystemPromptPrefix   string
 120	SystemPrompt         string
 121	IsSubAgent           bool
 122	DisableAutoSummarize bool
 123	IsYolo               bool
 124	Sessions             session.Service
 125	Messages             message.Service
 126	Tools                []fantasy.AgentTool
 127}
 128
 129func NewSessionAgent(
 130	opts SessionAgentOptions,
 131) SessionAgent {
 132	return &sessionAgent{
 133		largeModel:           csync.NewValue(opts.LargeModel),
 134		smallModel:           csync.NewValue(opts.SmallModel),
 135		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 136		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 137		isSubAgent:           opts.IsSubAgent,
 138		sessions:             opts.Sessions,
 139		messages:             opts.Messages,
 140		disableAutoSummarize: opts.DisableAutoSummarize,
 141		tools:                csync.NewSliceFrom(opts.Tools),
 142		isYolo:               opts.IsYolo,
 143		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 144		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 145	}
 146}
 147
 148func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
 149	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
 150		return nil, ErrEmptyPrompt
 151	}
 152	if call.SessionID == "" {
 153		return nil, ErrSessionMissing
 154	}
 155
 156	// Queue the message if busy
 157	if a.IsSessionBusy(call.SessionID) {
 158		existing, ok := a.messageQueue.Get(call.SessionID)
 159		if !ok {
 160			existing = []SessionAgentCall{}
 161		}
 162		existing = append(existing, call)
 163		a.messageQueue.Set(call.SessionID, existing)
 164		return nil, nil
 165	}
 166
 167	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
 168	agentTools := a.tools.Copy()
 169	largeModel := a.largeModel.Get()
 170	systemPrompt := a.systemPrompt.Get()
 171	promptPrefix := a.systemPromptPrefix.Get()
 172	var instructions strings.Builder
 173
 174	for _, server := range mcp.GetStates() {
 175		if server.State != mcp.StateConnected {
 176			continue
 177		}
 178		if s := server.Client.InitializeResult().Instructions; s != "" {
 179			instructions.WriteString(s)
 180			instructions.WriteString("\n\n")
 181		}
 182	}
 183
 184	if s := instructions.String(); s != "" {
 185		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
 186	}
 187
 188	if len(agentTools) > 0 {
 189		// Add Anthropic caching to the last tool.
 190		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
 191	}
 192
 193	agent := fantasy.NewAgent(
 194		largeModel.Model,
 195		fantasy.WithSystemPrompt(systemPrompt),
 196		fantasy.WithTools(agentTools...),
 197	)
 198
 199	sessionLock := sync.Mutex{}
 200	currentSession, err := a.sessions.Get(ctx, call.SessionID)
 201	if err != nil {
 202		return nil, fmt.Errorf("failed to get session: %w", err)
 203	}
 204
 205	msgs, err := a.getSessionMessages(ctx, currentSession)
 206	if err != nil {
 207		return nil, fmt.Errorf("failed to get session messages: %w", err)
 208	}
 209
 210	var wg sync.WaitGroup
 211	// Generate title if first message.
 212	if len(msgs) == 0 {
 213		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
 214		wg.Go(func() {
 215			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
 216		})
 217	}
 218	defer wg.Wait()
 219
 220	// Add the user message to the session.
 221	_, err = a.createUserMessage(ctx, call)
 222	if err != nil {
 223		return nil, err
 224	}
 225
 226	// Add the session to the context.
 227	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
 228
 229	genCtx, cancel := context.WithCancel(ctx)
 230	a.activeRequests.Set(call.SessionID, cancel)
 231
 232	defer cancel()
 233	defer a.activeRequests.Del(call.SessionID)
 234
 235	history, files := a.preparePrompt(msgs, call.Attachments...)
 236
 237	startTime := time.Now()
 238	a.eventPromptSent(call.SessionID)
 239
 240	var currentAssistant *message.Message
 241	var shouldSummarize bool
 242	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 243		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
 244		Files:            files,
 245		Messages:         history,
 246		ProviderOptions:  call.ProviderOptions,
 247		MaxOutputTokens:  &call.MaxOutputTokens,
 248		TopP:             call.TopP,
 249		Temperature:      call.Temperature,
 250		PresencePenalty:  call.PresencePenalty,
 251		TopK:             call.TopK,
 252		FrequencyPenalty: call.FrequencyPenalty,
 253		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 254			prepared.Messages = options.Messages
 255			for i := range prepared.Messages {
 256				prepared.Messages[i].ProviderOptions = nil
 257			}
 258
 259			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
 260			a.messageQueue.Del(call.SessionID)
 261			for _, queued := range queuedCalls {
 262				userMessage, createErr := a.createUserMessage(callContext, queued)
 263				if createErr != nil {
 264					return callContext, prepared, createErr
 265				}
 266				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
 267			}
 268
 269			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
 270
 271			lastSystemRoleInx := 0
 272			systemMessageUpdated := false
 273			for i, msg := range prepared.Messages {
 274				// Only add cache control to the last message.
 275				if msg.Role == fantasy.MessageRoleSystem {
 276					lastSystemRoleInx = i
 277				} else if !systemMessageUpdated {
 278					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
 279					systemMessageUpdated = true
 280				}
 281				// Than add cache control to the last 2 messages.
 282				if i > len(prepared.Messages)-3 {
 283					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
 284				}
 285			}
 286
 287			if promptPrefix != "" {
 288				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
 289			}
 290
 291			var assistantMsg message.Message
 292			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
 293				Role:     message.Assistant,
 294				Parts:    []message.ContentPart{},
 295				Model:    largeModel.ModelCfg.Model,
 296				Provider: largeModel.ModelCfg.Provider,
 297			})
 298			if err != nil {
 299				return callContext, prepared, err
 300			}
 301			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
 302			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
 303			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
 304			currentAssistant = &assistantMsg
 305			return callContext, prepared, err
 306		},
 307		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
 308			currentAssistant.AppendReasoningContent(reasoning.Text)
 309			return a.messages.Update(genCtx, *currentAssistant)
 310		},
 311		OnReasoningDelta: func(id string, text string) error {
 312			currentAssistant.AppendReasoningContent(text)
 313			return a.messages.Update(genCtx, *currentAssistant)
 314		},
 315		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 316			// handle anthropic signature
 317			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
 318				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
 319					currentAssistant.AppendReasoningSignature(reasoning.Signature)
 320				}
 321			}
 322			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
 323				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
 324					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
 325				}
 326			}
 327			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
 328				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
 329					currentAssistant.SetReasoningResponsesData(reasoning)
 330				}
 331			}
 332			currentAssistant.FinishThinking()
 333			return a.messages.Update(genCtx, *currentAssistant)
 334		},
 335		OnTextDelta: func(id string, text string) error {
 336			// Strip leading newline from initial text content. This is is
 337			// particularly important in non-interactive mode where leading
 338			// newlines are very visible.
 339			if len(currentAssistant.Parts) == 0 {
 340				text = strings.TrimPrefix(text, "\n")
 341			}
 342
 343			currentAssistant.AppendContent(text)
 344			return a.messages.Update(genCtx, *currentAssistant)
 345		},
 346		OnToolInputStart: func(id string, toolName string) error {
 347			toolCall := message.ToolCall{
 348				ID:               id,
 349				Name:             toolName,
 350				ProviderExecuted: false,
 351				Finished:         false,
 352			}
 353			currentAssistant.AddToolCall(toolCall)
 354			return a.messages.Update(genCtx, *currentAssistant)
 355		},
 356		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
 357			// TODO: implement
 358		},
 359		OnToolCall: func(tc fantasy.ToolCallContent) error {
 360			toolCall := message.ToolCall{
 361				ID:               tc.ToolCallID,
 362				Name:             tc.ToolName,
 363				Input:            tc.Input,
 364				ProviderExecuted: false,
 365				Finished:         true,
 366			}
 367			currentAssistant.AddToolCall(toolCall)
 368			return a.messages.Update(genCtx, *currentAssistant)
 369		},
 370		OnToolResult: func(result fantasy.ToolResultContent) error {
 371			toolResult := a.convertToToolResult(result)
 372			_, createMsgErr := a.messages.Create(genCtx, currentAssistant.SessionID, message.CreateMessageParams{
 373				Role: message.Tool,
 374				Parts: []message.ContentPart{
 375					toolResult,
 376				},
 377			})
 378			return createMsgErr
 379		},
 380		OnStepFinish: func(stepResult fantasy.StepResult) error {
 381			finishReason := message.FinishReasonUnknown
 382			switch stepResult.FinishReason {
 383			case fantasy.FinishReasonLength:
 384				finishReason = message.FinishReasonMaxTokens
 385			case fantasy.FinishReasonStop:
 386				finishReason = message.FinishReasonEndTurn
 387			case fantasy.FinishReasonToolCalls:
 388				finishReason = message.FinishReasonToolUse
 389			}
 390			currentAssistant.AddFinish(finishReason, "", "")
 391			sessionLock.Lock()
 392			defer sessionLock.Unlock()
 393
 394			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
 395			if getSessionErr != nil {
 396				return getSessionErr
 397			}
 398			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
 399			_, sessionErr := a.sessions.Save(ctx, updatedSession)
 400			if sessionErr != nil {
 401				return sessionErr
 402			}
 403			currentSession = updatedSession
 404			return a.messages.Update(genCtx, *currentAssistant)
 405		},
 406		StopWhen: []fantasy.StopCondition{
 407			func(_ []fantasy.StepResult) bool {
 408				cw := int64(largeModel.CatwalkCfg.ContextWindow)
 409				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
 410				remaining := cw - tokens
 411				var threshold int64
 412				if cw > largeContextWindowThreshold {
 413					threshold = largeContextWindowBuffer
 414				} else {
 415					threshold = int64(float64(cw) * smallContextWindowRatio)
 416				}
 417				if (remaining <= threshold) && !a.disableAutoSummarize {
 418					shouldSummarize = true
 419					return true
 420				}
 421				return false
 422			},
 423		},
 424	})
 425
 426	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
 427
 428	if err != nil {
 429		isCancelErr := errors.Is(err, context.Canceled)
 430		isPermissionErr := errors.Is(err, permission.ErrorPermissionDenied)
 431		if currentAssistant == nil {
 432			return result, err
 433		}
 434		// Ensure we finish thinking on error to close the reasoning state.
 435		currentAssistant.FinishThinking()
 436		toolCalls := currentAssistant.ToolCalls()
 437		// INFO: we use the parent context here because the genCtx has been cancelled.
 438		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
 439		if createErr != nil {
 440			return nil, createErr
 441		}
 442		for _, tc := range toolCalls {
 443			if !tc.Finished {
 444				tc.Finished = true
 445				tc.Input = "{}"
 446				currentAssistant.AddToolCall(tc)
 447				updateErr := a.messages.Update(ctx, *currentAssistant)
 448				if updateErr != nil {
 449					return nil, updateErr
 450				}
 451			}
 452
 453			found := false
 454			for _, msg := range msgs {
 455				if msg.Role == message.Tool {
 456					for _, tr := range msg.ToolResults() {
 457						if tr.ToolCallID == tc.ID {
 458							found = true
 459							break
 460						}
 461					}
 462				}
 463				if found {
 464					break
 465				}
 466			}
 467			if found {
 468				continue
 469			}
 470			content := "There was an error while executing the tool"
 471			if isCancelErr {
 472				content = "Tool execution canceled by user"
 473			} else if isPermissionErr {
 474				content = "User denied permission"
 475			}
 476			toolResult := message.ToolResult{
 477				ToolCallID: tc.ID,
 478				Name:       tc.Name,
 479				Content:    content,
 480				IsError:    true,
 481			}
 482			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
 483				Role: message.Tool,
 484				Parts: []message.ContentPart{
 485					toolResult,
 486				},
 487			})
 488			if createErr != nil {
 489				return nil, createErr
 490			}
 491		}
 492		var fantasyErr *fantasy.Error
 493		var providerErr *fantasy.ProviderError
 494		const defaultTitle = "Provider Error"
 495		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
 496		if isCancelErr {
 497			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
 498		} else if isPermissionErr {
 499			currentAssistant.AddFinish(message.FinishReasonPermissionDenied, "User denied permission", "")
 500		} else if errors.Is(err, hyper.ErrNoCredits) {
 501			url := hyper.BaseURL()
 502			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
 503			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
 504		} else if errors.As(err, &providerErr) {
 505			if providerErr.Message == "The requested model is not supported." {
 506				url := "https://github.com/settings/copilot/features"
 507				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
 508				currentAssistant.AddFinish(
 509					message.FinishReasonError,
 510					"Copilot model not enabled",
 511					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
 512				)
 513			} else {
 514				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
 515			}
 516		} else if errors.As(err, &fantasyErr) {
 517			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
 518		} else {
 519			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
 520		}
 521		// Note: we use the parent context here because the genCtx has been
 522		// cancelled.
 523		updateErr := a.messages.Update(ctx, *currentAssistant)
 524		if updateErr != nil {
 525			return nil, updateErr
 526		}
 527		return nil, err
 528	}
 529
 530	if shouldSummarize {
 531		a.activeRequests.Del(call.SessionID)
 532		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
 533			return nil, summarizeErr
 534		}
 535		// If the agent wasn't done...
 536		if len(currentAssistant.ToolCalls()) > 0 {
 537			existing, ok := a.messageQueue.Get(call.SessionID)
 538			if !ok {
 539				existing = []SessionAgentCall{}
 540			}
 541			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
 542			existing = append(existing, call)
 543			a.messageQueue.Set(call.SessionID, existing)
 544		}
 545	}
 546
 547	// Release active request before processing queued messages.
 548	a.activeRequests.Del(call.SessionID)
 549	cancel()
 550
 551	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
 552	if !ok || len(queuedMessages) == 0 {
 553		return result, err
 554	}
 555	// There are queued messages restart the loop.
 556	firstQueuedMessage := queuedMessages[0]
 557	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
 558	return a.Run(ctx, firstQueuedMessage)
 559}
 560
 561func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 562	if a.IsSessionBusy(sessionID) {
 563		return ErrSessionBusy
 564	}
 565
 566	// Copy mutable fields under lock to avoid races with SetModels.
 567	largeModel := a.largeModel.Get()
 568	systemPromptPrefix := a.systemPromptPrefix.Get()
 569
 570	currentSession, err := a.sessions.Get(ctx, sessionID)
 571	if err != nil {
 572		return fmt.Errorf("failed to get session: %w", err)
 573	}
 574	msgs, err := a.getSessionMessages(ctx, currentSession)
 575	if err != nil {
 576		return err
 577	}
 578	if len(msgs) == 0 {
 579		// Nothing to summarize.
 580		return nil
 581	}
 582
 583	aiMsgs, _ := a.preparePrompt(msgs)
 584
 585	genCtx, cancel := context.WithCancel(ctx)
 586	a.activeRequests.Set(sessionID, cancel)
 587	defer a.activeRequests.Del(sessionID)
 588	defer cancel()
 589
 590	agent := fantasy.NewAgent(largeModel.Model,
 591		fantasy.WithSystemPrompt(string(summaryPrompt)),
 592	)
 593	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 594		Role:             message.Assistant,
 595		Model:            largeModel.Model.Model(),
 596		Provider:         largeModel.Model.Provider(),
 597		IsSummaryMessage: true,
 598	})
 599	if err != nil {
 600		return err
 601	}
 602
 603	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 604
 605	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 606		Prompt:          summaryPromptText,
 607		Messages:        aiMsgs,
 608		ProviderOptions: opts,
 609		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 610			prepared.Messages = options.Messages
 611			if systemPromptPrefix != "" {
 612				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 613			}
 614			return callContext, prepared, nil
 615		},
 616		OnReasoningDelta: func(id string, text string) error {
 617			summaryMessage.AppendReasoningContent(text)
 618			return a.messages.Update(genCtx, summaryMessage)
 619		},
 620		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 621			// Handle anthropic signature.
 622			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 623				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 624					summaryMessage.AppendReasoningSignature(signature.Signature)
 625				}
 626			}
 627			summaryMessage.FinishThinking()
 628			return a.messages.Update(genCtx, summaryMessage)
 629		},
 630		OnTextDelta: func(id, text string) error {
 631			summaryMessage.AppendContent(text)
 632			return a.messages.Update(genCtx, summaryMessage)
 633		},
 634	})
 635	if err != nil {
 636		isCancelErr := errors.Is(err, context.Canceled)
 637		if isCancelErr {
 638			// User cancelled summarize we need to remove the summary message.
 639			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 640			return deleteErr
 641		}
 642		// Summarization failed. Mark the message as finished with an error
 643		// so the UI doesn't get stuck in a "Summarizing" state forever.
 644		summaryMessage.FinishThinking()
 645		summaryMessage.AddFinish(message.FinishReasonError, "Summarization failed", err.Error())
 646		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 647			return updateErr
 648		}
 649		return err
 650	}
 651
 652	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 653	err = a.messages.Update(genCtx, summaryMessage)
 654	if err != nil {
 655		return err
 656	}
 657
 658	var openrouterCost *float64
 659	for _, step := range resp.Steps {
 660		stepCost := a.openrouterCost(step.ProviderMetadata)
 661		if stepCost != nil {
 662			newCost := *stepCost
 663			if openrouterCost != nil {
 664				newCost += *openrouterCost
 665			}
 666			openrouterCost = &newCost
 667		}
 668	}
 669
 670	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost)
 671
 672	// Just in case, get just the last usage info.
 673	usage := resp.Response.Usage
 674	currentSession.SummaryMessageID = summaryMessage.ID
 675	currentSession.CompletionTokens = usage.OutputTokens
 676	currentSession.PromptTokens = 0
 677	_, err = a.sessions.Save(genCtx, currentSession)
 678	return err
 679}
 680
 681func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 682	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 683		return fantasy.ProviderOptions{}
 684	}
 685	return fantasy.ProviderOptions{
 686		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 687			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 688		},
 689		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 690			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 691		},
 692		vercel.Name: &anthropic.ProviderCacheControlOptions{
 693			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 694		},
 695	}
 696}
 697
 698func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 699	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 700	var attachmentParts []message.ContentPart
 701	for _, attachment := range call.Attachments {
 702		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 703	}
 704	parts = append(parts, attachmentParts...)
 705	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 706		Role:  message.User,
 707		Parts: parts,
 708	})
 709	if err != nil {
 710		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 711	}
 712	return msg, nil
 713}
 714
 715func (a *sessionAgent) preparePrompt(msgs []message.Message, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 716	var history []fantasy.Message
 717	if !a.isSubAgent {
 718		history = append(history, fantasy.NewUserMessage(
 719			fmt.Sprintf("<system_reminder>%s</system_reminder>",
 720				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 721If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 722If not, please feel free to ignore. Again do not mention this message to the user.`,
 723			),
 724		))
 725	}
 726	for _, m := range msgs {
 727		if len(m.Parts) == 0 {
 728			continue
 729		}
 730		// Assistant message without content or tool calls (cancelled before it
 731		// returned anything).
 732		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 733			continue
 734		}
 735		history = append(history, m.ToAIMessage()...)
 736	}
 737
 738	var files []fantasy.FilePart
 739	for _, attachment := range attachments {
 740		if attachment.IsText() {
 741			continue
 742		}
 743		files = append(files, fantasy.FilePart{
 744			Filename:  attachment.FileName,
 745			Data:      attachment.Content,
 746			MediaType: attachment.MimeType,
 747		})
 748	}
 749
 750	return history, files
 751}
 752
 753func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
 754	msgs, err := a.messages.List(ctx, session.ID)
 755	if err != nil {
 756		return nil, fmt.Errorf("failed to list messages: %w", err)
 757	}
 758
 759	if session.SummaryMessageID != "" {
 760		summaryMsgIndex := -1
 761		for i, msg := range msgs {
 762			if msg.ID == session.SummaryMessageID {
 763				summaryMsgIndex = i
 764				break
 765			}
 766		}
 767		if summaryMsgIndex != -1 {
 768			msgs = msgs[summaryMsgIndex:]
 769			msgs[0].Role = message.User
 770		}
 771	}
 772	return msgs, nil
 773}
 774
 775// generateTitle generates a session titled based on the initial prompt.
 776func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
 777	if userPrompt == "" {
 778		return
 779	}
 780
 781	smallModel := a.smallModel.Get()
 782	largeModel := a.largeModel.Get()
 783	systemPromptPrefix := a.systemPromptPrefix.Get()
 784
 785	var maxOutputTokens int64 = 40
 786	if smallModel.CatwalkCfg.CanReason {
 787		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
 788	}
 789
 790	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
 791		return fantasy.NewAgent(m,
 792			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
 793			fantasy.WithMaxOutputTokens(tok),
 794		)
 795	}
 796
 797	streamCall := fantasy.AgentStreamCall{
 798		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
 799		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 800			prepared.Messages = opts.Messages
 801			if systemPromptPrefix != "" {
 802				prepared.Messages = append([]fantasy.Message{
 803					fantasy.NewSystemMessage(systemPromptPrefix),
 804				}, prepared.Messages...)
 805			}
 806			return callCtx, prepared, nil
 807		},
 808	}
 809
 810	// Use the small model to generate the title.
 811	model := smallModel
 812	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
 813	resp, err := agent.Stream(ctx, streamCall)
 814	if err == nil {
 815		// We successfully generated a title with the small model.
 816		slog.Debug("Generated title with small model")
 817	} else {
 818		// It didn't work. Let's try with the big model.
 819		slog.Error("Error generating title with small model; trying big model", "err", err)
 820		model = largeModel
 821		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
 822		resp, err = agent.Stream(ctx, streamCall)
 823		if err == nil {
 824			slog.Debug("Generated title with large model")
 825		} else {
 826			// Welp, the large model didn't work either. Use the default
 827			// session name and return.
 828			slog.Error("Error generating title with large model", "err", err)
 829			saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, defaultSessionName, 0, 0, 0)
 830			if saveErr != nil {
 831				slog.Error("Failed to save session title and usage", "error", saveErr)
 832			}
 833			return
 834		}
 835	}
 836
 837	if resp == nil {
 838		// Actually, we didn't get a response so we can't. Use the default
 839		// session name and return.
 840		slog.Error("Response is nil; can't generate title")
 841		saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, defaultSessionName, 0, 0, 0)
 842		if saveErr != nil {
 843			slog.Error("Failed to save session title and usage", "error", saveErr)
 844		}
 845		return
 846	}
 847
 848	// Clean up title.
 849	var title string
 850	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
 851
 852	// Remove thinking tags if present.
 853	title = thinkTagRegex.ReplaceAllString(title, "")
 854
 855	title = strings.TrimSpace(title)
 856	if title == "" {
 857		slog.Debug("Empty title; using fallback")
 858		title = defaultSessionName
 859	}
 860
 861	// Calculate usage and cost.
 862	var openrouterCost *float64
 863	for _, step := range resp.Steps {
 864		stepCost := a.openrouterCost(step.ProviderMetadata)
 865		if stepCost != nil {
 866			newCost := *stepCost
 867			if openrouterCost != nil {
 868				newCost += *openrouterCost
 869			}
 870			openrouterCost = &newCost
 871		}
 872	}
 873
 874	modelConfig := model.CatwalkCfg
 875	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
 876		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
 877		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
 878		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
 879
 880	// Use override cost if available (e.g., from OpenRouter).
 881	if openrouterCost != nil {
 882		cost = *openrouterCost
 883	}
 884
 885	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
 886	completionTokens := resp.TotalUsage.OutputTokens
 887
 888	// Atomically update only title and usage fields to avoid overriding other
 889	// concurrent session updates.
 890	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
 891	if saveErr != nil {
 892		slog.Error("Failed to save session title and usage", "error", saveErr)
 893		return
 894	}
 895}
 896
 897func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
 898	openrouterMetadata, ok := metadata[openrouter.Name]
 899	if !ok {
 900		return nil
 901	}
 902
 903	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
 904	if !ok {
 905		return nil
 906	}
 907	return &opts.Usage.Cost
 908}
 909
 910func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
 911	modelConfig := model.CatwalkCfg
 912	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
 913		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
 914		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
 915		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
 916
 917	a.eventTokensUsed(session.ID, model, usage, cost)
 918
 919	if overrideCost != nil {
 920		session.Cost += *overrideCost
 921	} else {
 922		session.Cost += cost
 923	}
 924
 925	session.CompletionTokens = usage.OutputTokens
 926	session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
 927}
 928
 929func (a *sessionAgent) Cancel(sessionID string) {
 930	// Cancel regular requests. Don't use Take() here - we need the entry to
 931	// remain in activeRequests so IsBusy() returns true until the goroutine
 932	// fully completes (including error handling that may access the DB).
 933	// The defer in processRequest will clean up the entry.
 934	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
 935		slog.Debug("Request cancellation initiated", "session_id", sessionID)
 936		cancel()
 937	}
 938
 939	// Also check for summarize requests.
 940	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
 941		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
 942		cancel()
 943	}
 944
 945	if a.QueuedPrompts(sessionID) > 0 {
 946		slog.Debug("Clearing queued prompts", "session_id", sessionID)
 947		a.messageQueue.Del(sessionID)
 948	}
 949}
 950
 951func (a *sessionAgent) ClearQueue(sessionID string) {
 952	if a.QueuedPrompts(sessionID) > 0 {
 953		slog.Debug("Clearing queued prompts", "session_id", sessionID)
 954		a.messageQueue.Del(sessionID)
 955	}
 956}
 957
 958func (a *sessionAgent) CancelAll() {
 959	if !a.IsBusy() {
 960		return
 961	}
 962	for key := range a.activeRequests.Seq2() {
 963		a.Cancel(key) // key is sessionID
 964	}
 965
 966	timeout := time.After(5 * time.Second)
 967	for a.IsBusy() {
 968		select {
 969		case <-timeout:
 970			return
 971		default:
 972			time.Sleep(200 * time.Millisecond)
 973		}
 974	}
 975}
 976
 977func (a *sessionAgent) IsBusy() bool {
 978	var busy bool
 979	for cancelFunc := range a.activeRequests.Seq() {
 980		if cancelFunc != nil {
 981			busy = true
 982			break
 983		}
 984	}
 985	return busy
 986}
 987
 988func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
 989	_, busy := a.activeRequests.Get(sessionID)
 990	return busy
 991}
 992
 993func (a *sessionAgent) QueuedPrompts(sessionID string) int {
 994	l, ok := a.messageQueue.Get(sessionID)
 995	if !ok {
 996		return 0
 997	}
 998	return len(l)
 999}
1000
1001func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1002	l, ok := a.messageQueue.Get(sessionID)
1003	if !ok {
1004		return nil
1005	}
1006	prompts := make([]string, len(l))
1007	for i, call := range l {
1008		prompts[i] = call.Prompt
1009	}
1010	return prompts
1011}
1012
1013func (a *sessionAgent) SetModels(large Model, small Model) {
1014	a.largeModel.Set(large)
1015	a.smallModel.Set(small)
1016}
1017
1018func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1019	a.tools.SetSlice(tools)
1020}
1021
1022func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1023	a.systemPrompt.Set(systemPrompt)
1024}
1025
1026func (a *sessionAgent) Model() Model {
1027	return a.largeModel.Get()
1028}
1029
1030// convertToToolResult converts a fantasy tool result to a message tool result.
1031func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1032	baseResult := message.ToolResult{
1033		ToolCallID: result.ToolCallID,
1034		Name:       result.ToolName,
1035		Metadata:   result.ClientMetadata,
1036	}
1037
1038	switch result.Result.GetType() {
1039	case fantasy.ToolResultContentTypeText:
1040		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1041			baseResult.Content = r.Text
1042		}
1043	case fantasy.ToolResultContentTypeError:
1044		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1045			baseResult.Content = r.Error.Error()
1046			baseResult.IsError = true
1047		}
1048	case fantasy.ToolResultContentTypeMedia:
1049		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1050			content := r.Text
1051			if content == "" {
1052				content = fmt.Sprintf("Loaded %s content", r.MediaType)
1053			}
1054			baseResult.Content = content
1055			baseResult.Data = r.Data
1056			baseResult.MIMEType = r.MediaType
1057		}
1058	}
1059
1060	return baseResult
1061}
1062
1063// workaroundProviderMediaLimitations converts media content in tool results to
1064// user messages for providers that don't natively support images in tool results.
1065//
1066// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1067// don't support sending images/media in tool result messages - they only accept
1068// text in tool results. However, they DO support images in user messages.
1069//
1070// If we send media in tool results to these providers, the API returns an error.
1071//
1072// Solution: For these providers, we:
1073//  1. Replace the media in the tool result with a text placeholder
1074//  2. Inject a user message immediately after with the image as a file attachment
1075//  3. This maintains the tool execution flow while working around API limitations
1076//
1077// Anthropic and Bedrock support images natively in tool results, so we skip
1078// this workaround for them.
1079//
1080// Example transformation:
1081//
1082//	BEFORE: [tool result: image data]
1083//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1084func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1085	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1086		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1087
1088	if providerSupportsMedia {
1089		return messages
1090	}
1091
1092	convertedMessages := make([]fantasy.Message, 0, len(messages))
1093
1094	for _, msg := range messages {
1095		if msg.Role != fantasy.MessageRoleTool {
1096			convertedMessages = append(convertedMessages, msg)
1097			continue
1098		}
1099
1100		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1101		var mediaFiles []fantasy.FilePart
1102
1103		for _, part := range msg.Content {
1104			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1105			if !ok {
1106				textParts = append(textParts, part)
1107				continue
1108			}
1109
1110			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1111				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1112				if err != nil {
1113					slog.Warn("Failed to decode media data", "error", err)
1114					textParts = append(textParts, part)
1115					continue
1116				}
1117
1118				mediaFiles = append(mediaFiles, fantasy.FilePart{
1119					Data:      decoded,
1120					MediaType: media.MediaType,
1121					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1122				})
1123
1124				textParts = append(textParts, fantasy.ToolResultPart{
1125					ToolCallID: toolResult.ToolCallID,
1126					Output: fantasy.ToolResultOutputContentText{
1127						Text: "[Image/media content loaded - see attached file]",
1128					},
1129					ProviderOptions: toolResult.ProviderOptions,
1130				})
1131			} else {
1132				textParts = append(textParts, part)
1133			}
1134		}
1135
1136		convertedMessages = append(convertedMessages, fantasy.Message{
1137			Role:    fantasy.MessageRoleTool,
1138			Content: textParts,
1139		})
1140
1141		if len(mediaFiles) > 0 {
1142			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1143				"Here is the media content from the tool result:",
1144				mediaFiles...,
1145			))
1146		}
1147	}
1148
1149	return convertedMessages
1150}
1151
1152// buildSummaryPrompt constructs the prompt text for session summarization.
1153func buildSummaryPrompt(todos []session.Todo) string {
1154	var sb strings.Builder
1155	sb.WriteString("Provide a detailed summary of our conversation above.")
1156	if len(todos) > 0 {
1157		sb.WriteString("\n\n## Current Todo List\n\n")
1158		for _, t := range todos {
1159			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1160		}
1161		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1162		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1163	}
1164	return sb.String()
1165}