agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
const (
	// DefaultSessionName is the literal session name "Untitled Session".
	// NOTE(review): presumably the title a session carries until one is
	// generated — confirm against callers.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds. Run stops the agent loop
	// and summarizes the session once the tokens remaining in the model's
	// context window are at or below a threshold: models whose window is
	// larger than largeContextWindowThreshold use the fixed
	// largeContextWindowBuffer, all other models use smallContextWindowRatio
	// of their window size.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  57
// userAgent is the User-Agent string handed to every fantasy agent created in
// this file (via fantasy.WithUserAgent). It embeds the running Crush version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  59
// titlePrompt holds the contents of templates/title.md, embedded at build
// time. NOTE(review): presumably the system prompt used when generating
// session titles — confirm at its use site.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at build
// time. Summarize uses it as the system prompt of the summarization agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
// Used to remove <think> tags from generated titles.
//
// thinkTagRegex matches a complete <think>...</think> span, non-greedily and
// across newlines (the (?s) flag lets "." match "\n"). orphanThinkTagRegex
// matches a single opening or closing think tag on its own.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  71
// SessionAgentCall describes one prompt submitted to a SessionAgent.
//
// SessionID is required by Run (an empty value yields ErrSessionMissing).
// Prompt may only be empty when Attachments contains a text attachment;
// otherwise Run returns ErrEmptyPrompt. All Attachments are stored on the
// user message, and the non-text ones are additionally sent to the model as
// files.
//
// ProviderOptions, Temperature, TopP, TopK, FrequencyPenalty and
// PresencePenalty are forwarded unchanged to the model stream call.
// MaxOutputTokens is only forwarded when it is greater than zero, because
// some providers reject an explicit zero.
//
// NonInteractive suppresses the "agent finished" notification that Run
// otherwise publishes at the end of a turn.
type SessionAgentCall struct {
	SessionID        string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
}
  85
// SessionAgent is the interface of a session-scoped agent. NewSessionAgent
// returns the implementation defined in this package.
type SessionAgent interface {
	// Run executes the call against its session. If the session is already
	// busy the call is queued instead and Run returns (nil, nil).
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	SetModels(large Model, small Model)
	// SetTools replaces the agent's tools. A running turn picks up the
	// latest tools at the start of each step.
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	Cancel(sessionID string)
	CancelAll()
	// IsSessionBusy reports whether the session is busy; Run queues calls
	// and Summarize returns ErrSessionBusy while it reports true.
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	ClearQueue(sessionID string)
	// Summarize generates a summary message for the given session using the
	// supplied provider options.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	Model() Model
}
 101
// Model bundles a language model with the configuration that describes it.
//
// Model is the fantasy.LanguageModel passed to fantasy.NewAgent. CatwalkCfg
// supplies the capability data read by the agent (SupportsImages,
// ContextWindow, Name). ModelCfg supplies the Model and Provider identifiers
// recorded on assistant messages.
//
// NOTE(review): FlatRate is not read in the code reviewed here; presumably it
// marks flat-rate billing for usage accounting — confirm at its use site.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 108
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Values held in csync wrappers; Run and Summarize take a copy of them
	// at the start of each call rather than reading the fields repeatedly.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Plain configuration and services copied from SessionAgentOptions.
	// notify may be nil; it is nil-checked before every Publish.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]

	// messageQueue holds, per session ID, the calls that arrived while the
	// session was busy. activeRequests holds, per session ID, the cancel
	// function of the generation currently in flight.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]
}
 126
// SessionAgentOptions carries the initial configuration for NewSessionAgent.
//
// LargeModel is the model used for agent turns and summarization.
// SystemPromptPrefix, when non-empty, is prepended as an extra system message
// to every step. IsSubAgent disables the todo-list reminder that is otherwise
// placed at the start of the prompt history. DisableAutoSummarize turns off
// the automatic summarization triggered when the context window is nearly
// full. Notify is optional; when nil no notifications are published.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
}
 140
 141func NewSessionAgent(
 142	opts SessionAgentOptions,
 143) SessionAgent {
 144	return &sessionAgent{
 145		largeModel:           csync.NewValue(opts.LargeModel),
 146		smallModel:           csync.NewValue(opts.SmallModel),
 147		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 148		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 149		isSubAgent:           opts.IsSubAgent,
 150		sessions:             opts.Sessions,
 151		messages:             opts.Messages,
 152		disableAutoSummarize: opts.DisableAutoSummarize,
 153		tools:                csync.NewSliceFrom(opts.Tools),
 154		isYolo:               opts.IsYolo,
 155		notify:               opts.Notify,
 156		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 157		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 158	}
 159}
 160
// Run executes one agent turn for call.SessionID and returns the stream
// result.
//
// It returns ErrEmptyPrompt when the call has neither a prompt nor a text
// attachment, and ErrSessionMissing when no session ID is given. If the
// session is already busy, the call is appended to the session's queue and
// Run returns (nil, nil) immediately. Queued calls are either folded into the
// running turn at the start of its next step, or run one after another once
// the turn has ended.
//
// Otherwise Run stores the user message, streams the large model with the
// current tools, and persists assistant messages, tool results and session
// usage as the stream progresses. The loop stops early when the remaining
// context window reaches the auto-summarization threshold (in which case the
// session is summarized) or when repeated tool calls are detected. On a
// stream error Run closes unfinished tool calls, records the failure on the
// assistant message and returns the error.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server so
	// they can be appended to the system prompt.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock serializes the read-modify-write of the session's usage
	// counters in OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Run never returns before the title goroutine has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx governs the model stream; its cancel function is registered
	// under the session ID for the duration of the run.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the step in progress; it
	// is replaced by PrepareStep at the start of every step and mutated by
	// the stream callbacks below.
	var currentAssistant *message.Message
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			// Start from the step's messages with any previous provider
			// options cleared; cache control is re-applied below.
			prepared.Messages = options.Messages
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Fold prompts that were queued while this turn was running into
			// the conversation as additional user messages.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Add cache control to the most recent system message seen
				// before the first non-system message (done only once).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Create the (initially empty) assistant message that the stream
			// callbacks fill in for this step.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the message ID and model capabilities to tools through
			// the step context.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			// handle google thought signature
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			// handle openai responses reasoning data
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Map the provider finish reason onto the message finish reason;
			// anything not listed stays FinishReasonUnknown.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session before adding this step's usage so the
			// saved record is based on the latest stored state.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop when the context window is nearly exhausted so the
			// session can be summarized.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps keep repeating the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// The stream failed before any assistant message was created, so
		// there is nothing to annotate.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Make sure every tool call is marked finished and has a matching
		// tool result message, creating an error result where none exists.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason on the assistant message that matches the
		// kind of failure, from most to least specific.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize refuses to run while the session is busy, so release
		// the active request first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 636
 637func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 638	if a.IsSessionBusy(sessionID) {
 639		return ErrSessionBusy
 640	}
 641
 642	// Copy mutable fields under lock to avoid races with SetModels.
 643	largeModel := a.largeModel.Get()
 644	systemPromptPrefix := a.systemPromptPrefix.Get()
 645
 646	currentSession, err := a.sessions.Get(ctx, sessionID)
 647	if err != nil {
 648		return fmt.Errorf("failed to get session: %w", err)
 649	}
 650	msgs, err := a.getSessionMessages(ctx, currentSession)
 651	if err != nil {
 652		return err
 653	}
 654	if len(msgs) == 0 {
 655		// Nothing to summarize.
 656		return nil
 657	}
 658
 659	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
 660
 661	genCtx, cancel := context.WithCancel(ctx)
 662	a.activeRequests.Set(sessionID, cancel)
 663	defer a.activeRequests.Del(sessionID)
 664	defer cancel()
 665	defer func() {
 666		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
 667			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
 668		}
 669	}()
 670
 671	agent := fantasy.NewAgent(largeModel.Model,
 672		fantasy.WithSystemPrompt(string(summaryPrompt)),
 673		fantasy.WithUserAgent(userAgent),
 674	)
 675	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 676		Role:             message.Assistant,
 677		Model:            largeModel.Model.Model(),
 678		Provider:         largeModel.Model.Provider(),
 679		IsSummaryMessage: true,
 680	})
 681	if err != nil {
 682		return err
 683	}
 684
 685	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 686
 687	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 688		Prompt:          summaryPromptText,
 689		Messages:        aiMsgs,
 690		ProviderOptions: opts,
 691		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 692			prepared.Messages = options.Messages
 693			if systemPromptPrefix != "" {
 694				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 695			}
 696			return callContext, prepared, nil
 697		},
 698		OnReasoningDelta: func(id string, text string) error {
 699			summaryMessage.AppendReasoningContent(text)
 700			return a.messages.Update(genCtx, summaryMessage)
 701		},
 702		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 703			// Handle anthropic signature.
 704			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 705				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 706					summaryMessage.AppendReasoningSignature(signature.Signature)
 707				}
 708			}
 709			summaryMessage.FinishThinking()
 710			return a.messages.Update(genCtx, summaryMessage)
 711		},
 712		OnTextDelta: func(id, text string) error {
 713			summaryMessage.AppendContent(text)
 714			return a.messages.Update(genCtx, summaryMessage)
 715		},
 716	})
 717	if err != nil {
 718		isCancelErr := errors.Is(err, context.Canceled)
 719		if isCancelErr {
 720			// User cancelled summarize we need to remove the summary message.
 721			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 722			return deleteErr
 723		}
 724		// Mark the summary message as finished with an error so the UI
 725		// stops spinning.
 726		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
 727		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 728			return updateErr
 729		}
 730		return err
 731	}
 732
 733	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 734	err = a.messages.Update(genCtx, summaryMessage)
 735	if err != nil {
 736		return err
 737	}
 738
 739	var openrouterCost *float64
 740	for _, step := range resp.Steps {
 741		stepCost := a.openrouterCost(step.ProviderMetadata)
 742		if stepCost != nil {
 743			newCost := *stepCost
 744			if openrouterCost != nil {
 745				newCost += *openrouterCost
 746			}
 747			openrouterCost = &newCost
 748		}
 749	}
 750
 751	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost)
 752
 753	// Just in case, get just the last usage info.
 754	usage := resp.Response.Usage
 755	currentSession.SummaryMessageID = summaryMessage.ID
 756	currentSession.CompletionTokens = usage.OutputTokens
 757	currentSession.PromptTokens = 0
 758	_, err = a.sessions.Save(genCtx, currentSession)
 759	if err != nil {
 760		return err
 761	}
 762
 763	// Release the active request before processing queued messages so that
 764	// Run() does not see the session as busy.
 765	a.activeRequests.Del(sessionID)
 766	cancel()
 767
 768	// Process any messages that were queued while summarizing.
 769	queuedMessages, ok := a.messageQueue.Get(sessionID)
 770	if !ok || len(queuedMessages) == 0 {
 771		return nil
 772	}
 773	firstQueuedMessage := queuedMessages[0]
 774	a.messageQueue.Set(sessionID, queuedMessages[1:])
 775	_, qErr := a.Run(ctx, firstQueuedMessage)
 776	return qErr
 777}
 778
 779func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 780	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 781		return fantasy.ProviderOptions{}
 782	}
 783	return fantasy.ProviderOptions{
 784		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 785			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 786		},
 787		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 788			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 789		},
 790		vercel.Name: &anthropic.ProviderCacheControlOptions{
 791			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 792		},
 793	}
 794}
 795
 796func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 797	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 798	var attachmentParts []message.ContentPart
 799	for _, attachment := range call.Attachments {
 800		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 801	}
 802	parts = append(parts, attachmentParts...)
 803	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 804		Role:  message.User,
 805		Parts: parts,
 806	})
 807	if err != nil {
 808		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 809	}
 810	return msg, nil
 811}
 812
 813func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 814	var history []fantasy.Message
 815	if !a.isSubAgent {
 816		history = append(history, fantasy.NewUserMessage(
 817			fmt.Sprintf("<system_reminder>%s</system_reminder>",
 818				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 819If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 820If not, please feel free to ignore. Again do not mention this message to the user.`,
 821			),
 822		))
 823	}
 824	// Collect all tool call IDs present in assistant messages and all tool
 825	// result IDs present in tool messages. This lets us detect both orphaned
 826	// tool results (result without a call) and orphaned tool calls (call
 827	// without a result).
 828	knownToolCallIDs := make(map[string]struct{})
 829	knownToolResultIDs := make(map[string]struct{})
 830	for _, m := range msgs {
 831		switch m.Role {
 832		case message.Assistant:
 833			for _, tc := range m.ToolCalls() {
 834				knownToolCallIDs[tc.ID] = struct{}{}
 835			}
 836		case message.Tool:
 837			for _, tr := range m.ToolResults() {
 838				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 839			}
 840		}
 841	}
 842
 843	for _, m := range msgs {
 844		if len(m.Parts) == 0 {
 845			continue
 846		}
 847		// Assistant message without content or tool calls (cancelled before it returned anything).
 848		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 849			continue
 850		}
 851		if m.Role == message.Tool {
 852			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 853				history = append(history, msg)
 854			}
 855			continue
 856		}
 857		aiMsgs := m.ToAIMessage()
 858		if !supportsImages {
 859			for i := range aiMsgs {
 860				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 861					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 862				}
 863			}
 864		}
 865		history = append(history, aiMsgs...)
 866
 867		if m.Role == message.Assistant {
 868			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 869				history = append(history, msg)
 870			}
 871		}
 872	}
 873
 874	var files []fantasy.FilePart
 875	for _, attachment := range attachments {
 876		if attachment.IsText() {
 877			continue
 878		}
 879		files = append(files, fantasy.FilePart{
 880			Filename:  attachment.FileName,
 881			Data:      attachment.Content,
 882			MediaType: attachment.MimeType,
 883		})
 884	}
 885
 886	return history, files
 887}
 888
 889// filterFileParts removes fantasy.FilePart entries from a slice of message
 890// parts. Used to strip image attachments from historical user messages when
 891// the current model does not support them.
 892func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
 893	filtered := make([]fantasy.MessagePart, 0, len(parts))
 894	for _, part := range parts {
 895		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
 896			continue
 897		}
 898		filtered = append(filtered, part)
 899	}
 900	return filtered
 901}
 902
 903// filterOrphanedToolResults converts a tool message to a fantasy.Message,
 904// dropping any tool result parts whose tool_call_id has no matching tool call
 905// in the known set. An orphaned result causes API validation to fail on every
 906// subsequent turn, permanently locking the session. Returns the filtered
 907// message and true if at least one valid part remains.
 908func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
 909	aiMsgs := m.ToAIMessage()
 910	if len(aiMsgs) == 0 {
 911		return fantasy.Message{}, false
 912	}
 913	var validParts []fantasy.MessagePart
 914	for _, part := range aiMsgs[0].Content {
 915		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
 916		if !ok {
 917			validParts = append(validParts, part)
 918			continue
 919		}
 920		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
 921			validParts = append(validParts, part)
 922		} else {
 923			slog.Warn("Dropping orphaned tool result with no matching tool call",
 924				"tool_call_id", tr.ToolCallID,
 925			)
 926		}
 927	}
 928	if len(validParts) == 0 {
 929		return fantasy.Message{}, false
 930	}
 931	msg := aiMsgs[0]
 932	msg.Content = validParts
 933	return msg, true
 934}
 935
 936// syntheticToolResultsForOrphanedCalls returns a tool message containing
 937// synthetic tool results for any tool calls in the assistant message that
 938// have no matching result in knownToolResultIDs. LLM APIs require every
 939// tool_use to be immediately followed by a tool_result; an interrupted
 940// session can leave orphaned tool_use blocks that permanently lock the
 941// conversation. Returns the message and true if any synthetic results were
 942// produced.
 943func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
 944	var syntheticParts []fantasy.MessagePart
 945	for _, tc := range m.ToolCalls() {
 946		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
 947			continue
 948		}
 949		slog.Warn("Injecting synthetic tool result for orphaned tool call",
 950			"tool_call_id", tc.ID,
 951			"tool_name", tc.Name,
 952		)
 953		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
 954			ToolCallID: tc.ID,
 955			Output: fantasy.ToolResultOutputContentError{
 956				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
 957			},
 958		})
 959	}
 960	if len(syntheticParts) == 0 {
 961		return fantasy.Message{}, false
 962	}
 963	return fantasy.Message{
 964		Role:    fantasy.MessageRoleTool,
 965		Content: syntheticParts,
 966	}, true
 967}
 968
 969func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
 970	msgs, err := a.messages.List(ctx, session.ID)
 971	if err != nil {
 972		return nil, fmt.Errorf("failed to list messages: %w", err)
 973	}
 974
 975	if session.SummaryMessageID != "" {
 976		summaryMsgIndex := -1
 977		for i, msg := range msgs {
 978			if msg.ID == session.SummaryMessageID {
 979				summaryMsgIndex = i
 980				break
 981			}
 982		}
 983		if summaryMsgIndex != -1 {
 984			msgs = msgs[summaryMsgIndex:]
 985			msgs[0].Role = message.User
 986		}
 987	}
 988	return msgs, nil
 989}
 990
 991// generateTitle generates a session titled based on the initial prompt.
 992func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
 993	if userPrompt == "" {
 994		return
 995	}
 996
 997	smallModel := a.smallModel.Get()
 998	largeModel := a.largeModel.Get()
 999	systemPromptPrefix := a.systemPromptPrefix.Get()
1000
1001	var maxOutputTokens int64 = 40
1002	if smallModel.CatwalkCfg.CanReason {
1003		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
1004	}
1005
1006	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
1007		return fantasy.NewAgent(m,
1008			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
1009			fantasy.WithMaxOutputTokens(tok),
1010			fantasy.WithUserAgent(userAgent),
1011		)
1012	}
1013
1014	streamCall := fantasy.AgentStreamCall{
1015		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1016		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1017			prepared.Messages = opts.Messages
1018			if systemPromptPrefix != "" {
1019				prepared.Messages = append([]fantasy.Message{
1020					fantasy.NewSystemMessage(systemPromptPrefix),
1021				}, prepared.Messages...)
1022			}
1023			return callCtx, prepared, nil
1024		},
1025	}
1026
1027	// Use the small model to generate the title.
1028	model := smallModel
1029	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1030	resp, err := agent.Stream(ctx, streamCall)
1031	if err == nil {
1032		// We successfully generated a title with the small model.
1033		slog.Debug("Generated title with small model")
1034	} else {
1035		// It didn't work. Let's try with the big model.
1036		slog.Error("Error generating title with small model; trying big model", "err", err)
1037		model = largeModel
1038		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1039		resp, err = agent.Stream(ctx, streamCall)
1040		if err == nil {
1041			slog.Debug("Generated title with large model")
1042		} else {
1043			// Welp, the large model didn't work either. Use the default
1044			// session name and return.
1045			slog.Error("Error generating title with large model", "err", err)
1046			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1047			if saveErr != nil {
1048				slog.Error("Failed to save session title", "error", saveErr)
1049			}
1050			return
1051		}
1052	}
1053
1054	if resp == nil {
1055		// Actually, we didn't get a response so we can't. Use the default
1056		// session name and return.
1057		slog.Error("Response is nil; can't generate title")
1058		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1059		if saveErr != nil {
1060			slog.Error("Failed to save session title", "error", saveErr)
1061		}
1062		return
1063	}
1064
1065	// Clean up title.
1066	var title string
1067	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1068
1069	// Remove thinking tags if present.
1070	title = thinkTagRegex.ReplaceAllString(title, "")
1071	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1072
1073	title = strings.TrimSpace(title)
1074	title = cmp.Or(title, DefaultSessionName)
1075
1076	// Calculate usage and cost.
1077	var openrouterCost *float64
1078	for _, step := range resp.Steps {
1079		stepCost := a.openrouterCost(step.ProviderMetadata)
1080		if stepCost != nil {
1081			newCost := *stepCost
1082			if openrouterCost != nil {
1083				newCost += *openrouterCost
1084			}
1085			openrouterCost = &newCost
1086		}
1087	}
1088
1089	modelConfig := model.CatwalkCfg
1090	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1091		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1092		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1093		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1094
1095	// Use override cost if available (e.g., from OpenRouter).
1096	if openrouterCost != nil {
1097		cost = *openrouterCost
1098	}
1099
1100	// Skip cost accumulation
1101	if model.FlatRate {
1102		cost = 0
1103	}
1104
1105	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1106	completionTokens := resp.TotalUsage.OutputTokens
1107
1108	// Atomically update only title and usage fields to avoid overriding other
1109	// concurrent session updates.
1110	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1111	if saveErr != nil {
1112		slog.Error("Failed to save session title and usage", "error", saveErr)
1113		return
1114	}
1115}
1116
1117func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1118	openrouterMetadata, ok := metadata[openrouter.Name]
1119	if !ok {
1120		return nil
1121	}
1122
1123	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1124	if !ok {
1125		return nil
1126	}
1127	return &opts.Usage.Cost
1128}
1129
1130func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
1131	modelConfig := model.CatwalkCfg
1132	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1133		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1134		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1135		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1136
1137	a.eventTokensUsed(session.ID, model, usage, cost)
1138
1139	// Use override cost if available (e.g., from OpenRouter).
1140	if overrideCost != nil {
1141		cost = *overrideCost
1142	}
1143
1144	// Skip cost accumulation
1145	if model.FlatRate {
1146		cost = 0
1147	}
1148
1149	session.Cost += cost
1150	session.CompletionTokens = usage.OutputTokens
1151	session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
1152}
1153
1154func (a *sessionAgent) Cancel(sessionID string) {
1155	// Cancel regular requests. Don't use Take() here - we need the entry to
1156	// remain in activeRequests so IsBusy() returns true until the goroutine
1157	// fully completes (including error handling that may access the DB).
1158	// The defer in processRequest will clean up the entry.
1159	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1160		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1161		cancel()
1162	}
1163
1164	// Also check for summarize requests.
1165	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1166		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1167		cancel()
1168	}
1169
1170	if a.QueuedPrompts(sessionID) > 0 {
1171		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1172		a.messageQueue.Del(sessionID)
1173	}
1174}
1175
1176func (a *sessionAgent) ClearQueue(sessionID string) {
1177	if a.QueuedPrompts(sessionID) > 0 {
1178		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1179		a.messageQueue.Del(sessionID)
1180	}
1181}
1182
1183func (a *sessionAgent) CancelAll() {
1184	if !a.IsBusy() {
1185		return
1186	}
1187	for key := range a.activeRequests.Seq2() {
1188		a.Cancel(key) // key is sessionID
1189	}
1190
1191	timeout := time.After(5 * time.Second)
1192	for a.IsBusy() {
1193		select {
1194		case <-timeout:
1195			return
1196		default:
1197			time.Sleep(200 * time.Millisecond)
1198		}
1199	}
1200}
1201
1202func (a *sessionAgent) IsBusy() bool {
1203	var busy bool
1204	for cancelFunc := range a.activeRequests.Seq() {
1205		if cancelFunc != nil {
1206			busy = true
1207			break
1208		}
1209	}
1210	return busy
1211}
1212
1213func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1214	_, busy := a.activeRequests.Get(sessionID)
1215	return busy
1216}
1217
1218func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1219	l, ok := a.messageQueue.Get(sessionID)
1220	if !ok {
1221		return 0
1222	}
1223	return len(l)
1224}
1225
1226func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1227	l, ok := a.messageQueue.Get(sessionID)
1228	if !ok {
1229		return nil
1230	}
1231	prompts := make([]string, len(l))
1232	for i, call := range l {
1233		prompts[i] = call.Prompt
1234	}
1235	return prompts
1236}
1237
1238func (a *sessionAgent) SetModels(large Model, small Model) {
1239	a.largeModel.Set(large)
1240	a.smallModel.Set(small)
1241}
1242
1243func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1244	a.tools.SetSlice(tools)
1245}
1246
1247func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1248	a.systemPrompt.Set(systemPrompt)
1249}
1250
1251func (a *sessionAgent) Model() Model {
1252	return a.largeModel.Get()
1253}
1254
1255// convertToToolResult converts a fantasy tool result to a message tool result.
1256func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1257	baseResult := message.ToolResult{
1258		ToolCallID: result.ToolCallID,
1259		Name:       result.ToolName,
1260		Metadata:   result.ClientMetadata,
1261	}
1262
1263	switch result.Result.GetType() {
1264	case fantasy.ToolResultContentTypeText:
1265		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1266			baseResult.Content = r.Text
1267		}
1268	case fantasy.ToolResultContentTypeError:
1269		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1270			baseResult.Content = r.Error.Error()
1271			baseResult.IsError = true
1272		}
1273	case fantasy.ToolResultContentTypeMedia:
1274		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1275			if !stringext.IsValidBase64(r.Data) {
1276				slog.Warn("Tool returned media with invalid base64 data, discarding image",
1277					"tool", result.ToolName,
1278					"tool_call_id", result.ToolCallID,
1279				)
1280				baseResult.Content = "Tool returned image data with invalid encoding"
1281				baseResult.IsError = true
1282			} else {
1283				content := r.Text
1284				if content == "" {
1285					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1286				}
1287				baseResult.Content = content
1288				baseResult.Data = r.Data
1289				baseResult.MIMEType = r.MediaType
1290			}
1291		}
1292	}
1293
1294	return baseResult
1295}
1296
1297// workaroundProviderMediaLimitations converts media content in tool results to
1298// user messages for providers that don't natively support images in tool results.
1299//
1300// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1301// don't support sending images/media in tool result messages - they only accept
1302// text in tool results. However, they DO support images in user messages.
1303//
1304// If we send media in tool results to these providers, the API returns an error.
1305//
1306// Solution: For these providers, we:
1307//  1. Replace the media in the tool result with a text placeholder
1308//  2. Inject a user message immediately after with the image as a file attachment
1309//  3. This maintains the tool execution flow while working around API limitations
1310//
1311// Anthropic and Bedrock support images natively in tool results, so we skip
1312// this workaround for them.
1313//
1314// Example transformation:
1315//
1316//	BEFORE: [tool result: image data]
1317//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1318func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1319	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1320		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1321
1322	if providerSupportsMedia {
1323		return messages
1324	}
1325
1326	convertedMessages := make([]fantasy.Message, 0, len(messages))
1327
1328	for _, msg := range messages {
1329		if msg.Role != fantasy.MessageRoleTool {
1330			convertedMessages = append(convertedMessages, msg)
1331			continue
1332		}
1333
1334		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1335		var mediaFiles []fantasy.FilePart
1336
1337		for _, part := range msg.Content {
1338			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1339			if !ok {
1340				textParts = append(textParts, part)
1341				continue
1342			}
1343
1344			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1345				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1346				if err != nil {
1347					slog.Warn("Failed to decode media data", "error", err)
1348					textParts = append(textParts, part)
1349					continue
1350				}
1351
1352				mediaFiles = append(mediaFiles, fantasy.FilePart{
1353					Data:      decoded,
1354					MediaType: media.MediaType,
1355					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1356				})
1357
1358				textParts = append(textParts, fantasy.ToolResultPart{
1359					ToolCallID: toolResult.ToolCallID,
1360					Output: fantasy.ToolResultOutputContentText{
1361						Text: "[Image/media content loaded - see attached file]",
1362					},
1363					ProviderOptions: toolResult.ProviderOptions,
1364				})
1365			} else {
1366				textParts = append(textParts, part)
1367			}
1368		}
1369
1370		convertedMessages = append(convertedMessages, fantasy.Message{
1371			Role:    fantasy.MessageRoleTool,
1372			Content: textParts,
1373		})
1374
1375		if len(mediaFiles) > 0 {
1376			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1377				"Here is the media content from the tool result:",
1378				mediaFiles...,
1379			))
1380		}
1381	}
1382
1383	return convertedMessages
1384}
1385
1386// buildSummaryPrompt constructs the prompt text for session summarization.
1387func buildSummaryPrompt(todos []session.Todo) string {
1388	var sb strings.Builder
1389	sb.WriteString("Provide a detailed summary of our conversation above.")
1390	if len(todos) > 0 {
1391		sb.WriteString("\n\n## Current Todo List\n\n")
1392		for _, t := range todos {
1393			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1394		}
1395		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1396		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1397	}
1398	return sb.String()
1399}
1400
1401func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1402	fields := []any{
1403		"retry_delay", delay.String(),
1404	}
1405	if err == nil {
1406		return fields
1407	}
1408	fields = append(fields, "status_code", err.StatusCode)
1409	if err.Title != "" {
1410		fields = append(fields, "title", err.Title)
1411	}
1412	if err.Message != "" {
1413		fields = append(fields, "message", err.Message)
1414	}
1415	return fields
1416}