agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
const (
	// DefaultSessionName is the placeholder session title.
	// NOTE(review): it is not referenced in this part of the file; presumably
	// it is shown until a title has been generated — confirm against callers.
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds, consumed by the stop
	// condition in Run:
	//   - A model whose context window is larger than
	//     largeContextWindowThreshold tokens triggers summarization once
	//     largeContextWindowBuffer or fewer tokens remain.
	//   - Any other model triggers summarization once the remaining tokens
	//     drop to smallContextWindowRatio of the context window or below.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)
  57
// userAgent is the User-Agent string handed to every fantasy agent created
// in this file (via fantasy.WithUserAgent). It embeds the build version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)
  59
// titlePrompt holds the contents of templates/title.md, embedded at build
// time.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is the system prompt for title generation — confirm in generateTitle.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt holds the contents of templates/summary.md, embedded at
// build time. Summarize uses it as the system prompt of the summarizing
// agent.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
// Used to remove <think> tags from generated titles.
//
// thinkTagRegex matches a complete <think>...</think> pair together with its
// content; the (?s) flag lets the content span newlines and the match is
// non-greedy. orphanThinkTagRegex matches a lone opening or closing tag.
var (
	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
)
  71
// SessionAgentCall describes a single prompt submitted to SessionAgent.Run.
//
// SessionID selects the session the prompt belongs to and must be non-empty.
// Prompt is the user's text; it may only be empty when Attachments contains
// a text attachment. Attachments are stored on the user message, and the
// non-text ones are forwarded to the model as files.
//
// ProviderOptions, Temperature, TopP, TopK, FrequencyPenalty and
// PresencePenalty are passed through to the model stream call unchanged.
// MaxOutputTokens is only forwarded when it is greater than zero, because
// some providers reject an explicit zero.
//
// NonInteractive suppresses the "agent finished" notification that Run
// otherwise publishes at the end of a turn.
type SessionAgentCall struct {
	SessionID        string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
}
  85
// SessionAgent is the session-scoped agent API: it runs prompts against a
// model, queues prompts for busy sessions, and summarizes conversations.
//
// Only Run and Summarize are implemented in the visible part of this file;
// the remaining method descriptions are derived from their signatures and
// from how Run and Summarize use them, and should be confirmed against the
// implementation.
type SessionAgent interface {
	// Run executes the call for its session. If the session is already
	// busy the call is queued and Run returns (nil, nil).
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels replaces the large and small models used by the agent.
	SetModels(large Model, small Model)
	// SetTools replaces the tool set; Run re-reads the tools before every
	// step, so changes take effect mid-run.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt used for subsequent runs.
	SetSystemPrompt(systemPrompt string)
	// Cancel targets the given session (presumably cancelling its in-flight
	// request — confirm).
	Cancel(sessionID string)
	// CancelAll is the all-sessions counterpart of Cancel.
	CancelAll()
	// IsSessionBusy reports whether the given session is busy; Run queues
	// and Summarize refuses while it returns true.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports whether the agent is busy (presumably for any session
	// — confirm).
	IsBusy() bool
	// QueuedPrompts returns a count of queued prompts for the session.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList returns the queued prompts for the session as
	// strings.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue targets the session's prompt queue (presumably dropping
	// all queued prompts — confirm).
	ClearQueue(sessionID string)
	// Summarize replaces the session's history with a model-generated
	// summary. It returns ErrSessionBusy when the session is busy.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns a Model (presumably the current large model — confirm).
	Model() Model
}
 101
// Model bundles a language model with the metadata the agent needs about it.
//
// Model is the fantasy language model that requests are sent to. CatwalkCfg
// supplies the capabilities read in this file (SupportsImages, ContextWindow,
// Name). ModelCfg is the selected-model configuration; its Model and Provider
// values are recorded on assistant messages.
// NOTE(review): FlatRate is not read in this part of the file; presumably it
// marks flat-rate billed models for cost accounting — confirm.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
	FlatRate   bool
}
 108
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
//
// The first group of fields (models, prompts, tools) is held in csync
// wrappers and snapshotted at the start of Run/Summarize, because the
// setters may replace them while a run is in flight.
//
// The second group is fixed at construction: isSubAgent suppresses the todo
// system reminder in preparePrompt, disableAutoSummarize turns off the
// context-window stop condition in Run, sessions/messages are the
// persistence services, and notify (may be nil) receives agent
// notifications.
// NOTE(review): isYolo is not read in this part of the file.
//
// messageQueue holds, per session ID, calls submitted while that session
// was busy. activeRequests holds, per session ID, the cancel function of
// the request currently in flight.
type sessionAgent struct {
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]

	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]
}
 126
// SessionAgentOptions carries the construction parameters for
// NewSessionAgent. Each field seeds the like-named field of the agent:
// models, prompts and tools become the initial values of the agent's
// replaceable state, while the flags, services and Notify publisher are
// stored as given. Notify may be nil, in which case no notifications are
// published.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
}
 140
 141func NewSessionAgent(
 142	opts SessionAgentOptions,
 143) SessionAgent {
 144	return &sessionAgent{
 145		largeModel:           csync.NewValue(opts.LargeModel),
 146		smallModel:           csync.NewValue(opts.SmallModel),
 147		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 148		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 149		isSubAgent:           opts.IsSubAgent,
 150		sessions:             opts.Sessions,
 151		messages:             opts.Messages,
 152		disableAutoSummarize: opts.DisableAutoSummarize,
 153		tools:                csync.NewSliceFrom(opts.Tools),
 154		isYolo:               opts.IsYolo,
 155		notify:               opts.Notify,
 156		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 157		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 158	}
 159}
 160
// Run submits call to the agent for call.SessionID, streams the model's
// response and persists the resulting messages as they arrive.
//
// It returns ErrEmptyPrompt when the prompt is empty and no text attachment
// is present, and ErrSessionMissing when call.SessionID is empty. If the
// session is already busy, the call is appended to the session's queue and
// Run returns (nil, nil) immediately.
//
// Otherwise Run snapshots the current tools, large model and prompts, adds
// the instructions of all connected MCP servers to the system prompt, stores
// the user message and starts streaming. Calls queued while the stream is
// running are folded into the conversation before the next step.
//
// When the stream fails, the in-progress assistant message is closed with a
// finish reason describing the failure, every tool call without a stored
// result gets a synthetic error result, and the stream error is returned.
//
// When the context-window stop condition fired, the session is summarized
// and, if the assistant still had tool calls, the original request is
// re-queued. Finally, if queued calls remain, Run invokes itself with the
// first one and returns that result.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect the instructions advertised by every connected MCP server.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock serializes the read-modify-write of the session record in
	// OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Registered first, so it runs last: Run does not return before title
	// generation has finished.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx is the cancellable context of this request; registering its
	// cancel function is what marks the session as having an active request.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)
	// Drain any debounced message updates before returning. message.Service
	// already flushes synchronously on terminal updates, but a defer here
	// guarantees the contract at every Run exit (success, error, panic
	// recovery upstream) without callers needing to know.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after run", "error", flushErr)
		}
	}()

	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the step in progress. It
	// is assigned by PrepareStep and mutated by the streaming callbacks.
	var currentAssistant *message.Message
	// shouldSummarize is set by the context-window stop condition below.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep refreshes the tool set, folds in queued user messages,
		// applies cache-control markers and creates the assistant message
		// that the streaming callbacks write into.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Reset per-message provider options; the cache-control markers
			// are re-applied below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Take over everything queued for this session and persist each
			// queued call as a user message of the current conversation.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Add cache control to the last system message seen before
				// the first non-system message (index 0 if there is none).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			// The prefix is prepended after the cache-control pass, so it
			// carries no cache-control options itself.
			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the assistant message ID and model details to tools
			// through the step context.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		// OnReasoningEnd stores provider-specific reasoning metadata on the
		// assistant message and closes the thinking state.
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		// OnToolInputStart records an unfinished tool call as soon as the
		// model starts emitting its input.
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		// OnToolCall records the tool call with its complete input and marks
		// it finished.
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		// OnToolResult stores each tool result as its own tool-role message.
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		// OnStepFinish closes the assistant message of the step and adds the
		// step's usage to the stored session.
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Re-read the session so the usage update is applied to its
			// latest stored state rather than to the copy loaded at the
			// start of Run.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop (and request summarization) when the remaining context
			// window falls to the threshold or below.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the recent steps keep repeating the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// The stream failed before PrepareStep created an assistant message,
		// so there is nothing to close out.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Make sure every tool call is finished and has a stored result.
		for _, tc := range toolCalls {
			if !tc.Finished {
				// The input may have been cut off mid-stream; replace it
				// with an empty JSON object.
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			// Look for an already stored result of this tool call.
			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			// No result was stored: add a synthetic error result.
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Translate the error into a finish reason, title and details on
		// the assistant message.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize refuses to run on a busy session, so release the active
		// request first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages restart the loop.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 636
// Summarize asks the large model for a summary of the session's conversation
// and records it as the session's summary message.
//
// It returns ErrSessionBusy when the session is busy, and does nothing when
// the session has no messages. The summary is streamed into a new assistant
// message flagged as a summary message. When the stream is cancelled, that
// message is deleted and the result of the deletion is returned (nil when
// the deletion succeeds). On any other stream error the message is marked
// as finished with an error and the stream error is returned.
//
// On success the session's usage is updated, SummaryMessageID points at the
// new message, and the token counters are reset to the output tokens of the
// final response. Calls queued while summarizing are then started through
// Run, whose error (if any) is returned.
func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
	if a.IsSessionBusy(sessionID) {
		return ErrSessionBusy
	}

	// Copy mutable fields under lock to avoid races with SetModels.
	largeModel := a.largeModel.Get()
	systemPromptPrefix := a.systemPromptPrefix.Get()

	currentSession, err := a.sessions.Get(ctx, sessionID)
	if err != nil {
		return fmt.Errorf("failed to get session: %w", err)
	}
	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return err
	}
	if len(msgs) == 0 {
		// Nothing to summarize.
		return nil
	}

	// No attachments are passed, so the returned file list is always empty.
	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)

	// Register the request so the session counts as having an active
	// request while summarizing.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(sessionID, cancel)
	defer a.activeRequests.Del(sessionID)
	defer cancel()
	// Drain pending message updates on every exit path.
	defer func() {
		if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
			slog.Error("Failed to flush pending message updates after summarize", "error", flushErr)
		}
	}()

	// The summarizing agent has no tools and uses the embedded summary
	// prompt as its system prompt.
	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(string(summaryPrompt)),
		fantasy.WithUserAgent(userAgent),
	)
	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
		Role:             message.Assistant,
		Model:            largeModel.Model.Model(),
		Provider:         largeModel.Model.Provider(),
		IsSummaryMessage: true,
	})
	if err != nil {
		return err
	}

	summaryPromptText := buildSummaryPrompt(currentSession.Todos)

	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:          summaryPromptText,
		Messages:        aiMsgs,
		ProviderOptions: opts,
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			if systemPromptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
			}
			return callContext, prepared, nil
		},
		OnReasoningDelta: func(id string, text string) error {
			summaryMessage.AppendReasoningContent(text)
			return a.messages.Update(genCtx, summaryMessage)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// Handle anthropic signature.
			// NOTE(review): Run looks this metadata up with anthropic.Name
			// while the literal "anthropic" is used here — confirm the two
			// are the same key and consider using the constant.
			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
					summaryMessage.AppendReasoningSignature(signature.Signature)
				}
			}
			summaryMessage.FinishThinking()
			return a.messages.Update(genCtx, summaryMessage)
		},
		OnTextDelta: func(id, text string) error {
			summaryMessage.AppendContent(text)
			return a.messages.Update(genCtx, summaryMessage)
		},
	})
	if err != nil {
		isCancelErr := errors.Is(err, context.Canceled)
		if isCancelErr {
			// User cancelled summarize we need to remove the summary message.
			// The cancellation itself is not reported as an error.
			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
			return deleteErr
		}
		// Mark the summary message as finished with an error so the UI
		// stops spinning.
		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
			return updateErr
		}
		return err
	}

	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
	err = a.messages.Update(genCtx, summaryMessage)
	if err != nil {
		return err
	}

	// Sum the OpenRouter cost over all steps; the total stays nil when no
	// step reported a cost.
	var openrouterCost *float64
	for _, step := range resp.Steps {
		stepCost := a.openrouterCost(step.ProviderMetadata)
		if stepCost != nil {
			newCost := *stepCost
			if openrouterCost != nil {
				newCost += *openrouterCost
			}
			openrouterCost = &newCost
		}
	}

	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost)

	// Just in case, get just the last usage info.
	// The token counters are overwritten after updateSessionUsage: prompt
	// tokens restart at zero and completion tokens at the size of the final
	// response.
	usage := resp.Response.Usage
	currentSession.SummaryMessageID = summaryMessage.ID
	currentSession.CompletionTokens = usage.OutputTokens
	currentSession.PromptTokens = 0
	_, err = a.sessions.Save(genCtx, currentSession)
	if err != nil {
		return err
	}

	// Release the active request before processing queued messages so that
	// Run() does not see the session as busy.
	a.activeRequests.Del(sessionID)
	cancel()

	// Process any messages that were queued while summarizing.
	queuedMessages, ok := a.messageQueue.Get(sessionID)
	if !ok || len(queuedMessages) == 0 {
		return nil
	}
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(sessionID, queuedMessages[1:])
	_, qErr := a.Run(ctx, firstQueuedMessage)
	return qErr
}
 779
 780func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 781	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 782		return fantasy.ProviderOptions{}
 783	}
 784	return fantasy.ProviderOptions{
 785		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 786			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 787		},
 788		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 789			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 790		},
 791		vercel.Name: &anthropic.ProviderCacheControlOptions{
 792			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 793		},
 794	}
 795}
 796
 797func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 798	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 799	var attachmentParts []message.ContentPart
 800	for _, attachment := range call.Attachments {
 801		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 802	}
 803	parts = append(parts, attachmentParts...)
 804	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 805		Role:  message.User,
 806		Parts: parts,
 807	})
 808	if err != nil {
 809		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 810	}
 811	return msg, nil
 812}
 813
 814func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 815	var history []fantasy.Message
 816	if !a.isSubAgent {
 817		history = append(history, fantasy.NewUserMessage(
 818			fmt.Sprintf(
 819				"<system_reminder>%s</system_reminder>",
 820				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 821If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 822If not, please feel free to ignore. Again do not mention this message to the user.`,
 823			),
 824		))
 825	}
 826	// Collect all tool call IDs present in assistant messages and all tool
 827	// result IDs present in tool messages. This lets us detect both orphaned
 828	// tool results (result without a call) and orphaned tool calls (call
 829	// without a result).
 830	knownToolCallIDs := make(map[string]struct{})
 831	knownToolResultIDs := make(map[string]struct{})
 832	for _, m := range msgs {
 833		switch m.Role {
 834		case message.Assistant:
 835			for _, tc := range m.ToolCalls() {
 836				knownToolCallIDs[tc.ID] = struct{}{}
 837			}
 838		case message.Tool:
 839			for _, tr := range m.ToolResults() {
 840				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 841			}
 842		}
 843	}
 844
 845	for _, m := range msgs {
 846		if len(m.Parts) == 0 {
 847			continue
 848		}
 849		// Assistant message without content or tool calls (cancelled before it returned anything).
 850		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 851			continue
 852		}
 853		if m.Role == message.Tool {
 854			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 855				history = append(history, msg)
 856			}
 857			continue
 858		}
 859		aiMsgs := m.ToAIMessage()
 860		if !supportsImages {
 861			for i := range aiMsgs {
 862				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 863					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 864				}
 865			}
 866		}
 867		history = append(history, aiMsgs...)
 868
 869		if m.Role == message.Assistant {
 870			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 871				history = append(history, msg)
 872			}
 873		}
 874	}
 875
 876	var files []fantasy.FilePart
 877	for _, attachment := range attachments {
 878		if attachment.IsText() {
 879			continue
 880		}
 881		files = append(files, fantasy.FilePart{
 882			Filename:  attachment.FileName,
 883			Data:      attachment.Content,
 884			MediaType: attachment.MimeType,
 885		})
 886	}
 887
 888	return history, files
 889}
 890
 891// filterFileParts removes fantasy.FilePart entries from a slice of message
 892// parts. Used to strip image attachments from historical user messages when
 893// the current model does not support them.
 894func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
 895	filtered := make([]fantasy.MessagePart, 0, len(parts))
 896	for _, part := range parts {
 897		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
 898			continue
 899		}
 900		filtered = append(filtered, part)
 901	}
 902	return filtered
 903}
 904
 905// filterOrphanedToolResults converts a tool message to a fantasy.Message,
 906// dropping any tool result parts whose tool_call_id has no matching tool call
 907// in the known set. An orphaned result causes API validation to fail on every
 908// subsequent turn, permanently locking the session. Returns the filtered
 909// message and true if at least one valid part remains.
 910func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
 911	aiMsgs := m.ToAIMessage()
 912	if len(aiMsgs) == 0 {
 913		return fantasy.Message{}, false
 914	}
 915	var validParts []fantasy.MessagePart
 916	for _, part := range aiMsgs[0].Content {
 917		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
 918		if !ok {
 919			validParts = append(validParts, part)
 920			continue
 921		}
 922		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
 923			validParts = append(validParts, part)
 924		} else {
 925			slog.Warn(
 926				"Dropping orphaned tool result with no matching tool call",
 927				"tool_call_id", tr.ToolCallID,
 928			)
 929		}
 930	}
 931	if len(validParts) == 0 {
 932		return fantasy.Message{}, false
 933	}
 934	msg := aiMsgs[0]
 935	msg.Content = validParts
 936	return msg, true
 937}
 938
 939// syntheticToolResultsForOrphanedCalls returns a tool message containing
 940// synthetic tool results for any tool calls in the assistant message that
 941// have no matching result in knownToolResultIDs. LLM APIs require every
 942// tool_use to be immediately followed by a tool_result; an interrupted
 943// session can leave orphaned tool_use blocks that permanently lock the
 944// conversation. Returns the message and true if any synthetic results were
 945// produced.
 946func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
 947	var syntheticParts []fantasy.MessagePart
 948	for _, tc := range m.ToolCalls() {
 949		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
 950			continue
 951		}
 952		slog.Warn(
 953			"Injecting synthetic tool result for orphaned tool call",
 954			"tool_call_id", tc.ID,
 955			"tool_name", tc.Name,
 956		)
 957		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
 958			ToolCallID: tc.ID,
 959			Output: fantasy.ToolResultOutputContentError{
 960				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
 961			},
 962		})
 963	}
 964	if len(syntheticParts) == 0 {
 965		return fantasy.Message{}, false
 966	}
 967	return fantasy.Message{
 968		Role:    fantasy.MessageRoleTool,
 969		Content: syntheticParts,
 970	}, true
 971}
 972
 973func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
 974	msgs, err := a.messages.List(ctx, session.ID)
 975	if err != nil {
 976		return nil, fmt.Errorf("failed to list messages: %w", err)
 977	}
 978
 979	if session.SummaryMessageID != "" {
 980		summaryMsgIndex := -1
 981		for i, msg := range msgs {
 982			if msg.ID == session.SummaryMessageID {
 983				summaryMsgIndex = i
 984				break
 985			}
 986		}
 987		if summaryMsgIndex != -1 {
 988			msgs = msgs[summaryMsgIndex:]
 989			msgs[0].Role = message.User
 990		}
 991	}
 992	return msgs, nil
 993}
 994
// generateTitle generates a session title based on the initial prompt and
// stores it, together with the token usage and cost of the generation call,
// on the session identified by sessionID.
//
// The small model is tried first; if streaming with it returns an error, the
// large model is tried with the same stream call. When both fail, or the
// response is nil, the session is renamed to DefaultSessionName instead. An
// empty userPrompt is a no-op. Failures are logged and never returned.
func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
	if userPrompt == "" {
		return
	}

	// Snapshot the configuration once so the whole call works on consistent
	// values.
	smallModel := a.smallModel.Get()
	largeModel := a.largeModel.Get()
	systemPromptPrefix := a.systemPromptPrefix.Get()

	// Titles are short, so a small output budget is used unless the small
	// model can reason, in which case its default max tokens are allowed.
	// NOTE(review): this budget is derived from the small model and is reused
	// unchanged when falling back to the large model — confirm intended.
	var maxOutputTokens int64 = 40
	if smallModel.CatwalkCfg.CanReason {
		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
	}

	// newAgent builds a title agent for model m with system prompt p and an
	// output budget of tok tokens.
	// NOTE(review): the "/no_think" suffix presumably asks models that honor
	// it to skip reasoning — confirm.
	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
		return fantasy.NewAgent(
			m,
			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
			fantasy.WithMaxOutputTokens(tok),
			fantasy.WithUserAgent(userAgent),
		)
	}

	streamCall := fantasy.AgentStreamCall{
		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
		// Prepend the configured system prompt prefix, if any, as an extra
		// system message in front of the step's messages.
		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = opts.Messages
			if systemPromptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{
					fantasy.NewSystemMessage(systemPromptPrefix),
				}, prepared.Messages...)
			}
			return callCtx, prepared, nil
		},
	}

	// Use the small model to generate the title.
	// model tracks whichever model ends up producing the response; it is
	// used below for pricing and flat-rate handling.
	model := smallModel
	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
	resp, err := agent.Stream(ctx, streamCall)
	if err == nil {
		// We successfully generated a title with the small model.
		slog.Debug("Generated title with small model")
	} else {
		// It didn't work. Let's try with the big model.
		slog.Error("Error generating title with small model; trying big model", "err", err)
		model = largeModel
		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
		resp, err = agent.Stream(ctx, streamCall)
		if err == nil {
			slog.Debug("Generated title with large model")
		} else {
			// Welp, the large model didn't work either. Use the default
			// session name and return.
			slog.Error("Error generating title with large model", "err", err)
			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
			if saveErr != nil {
				slog.Error("Failed to save session title", "error", saveErr)
			}
			return
		}
	}

	if resp == nil {
		// Actually, we didn't get a response so we can't. Use the default
		// session name and return.
		slog.Error("Response is nil; can't generate title")
		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
		if saveErr != nil {
			slog.Error("Failed to save session title", "error", saveErr)
		}
		return
	}

	// Clean up title: flatten newlines into spaces so it fits on one line.
	var title string
	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")

	// Remove thinking tags if present.
	title = thinkTagRegex.ReplaceAllString(title, "")
	title = orphanThinkTagRegex.ReplaceAllString(title, "")

	// Fall back to the default name when nothing is left after cleanup.
	title = strings.TrimSpace(title)
	title = cmp.Or(title, DefaultSessionName)

	// Calculate usage and cost.
	// Sum the OpenRouter-reported cost over all steps; the pointer stays nil
	// when no step carries OpenRouter metadata.
	var openrouterCost *float64
	for _, step := range resp.Steps {
		stepCost := a.openrouterCost(step.ProviderMetadata)
		if stepCost != nil {
			newCost := *stepCost
			if openrouterCost != nil {
				newCost += *openrouterCost
			}
			openrouterCost = &newCost
		}
	}

	// Price the call from the per-million-token rates of the model that
	// produced the response.
	modelConfig := model.CatwalkCfg
	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)

	// Use override cost if available (e.g., from OpenRouter).
	if openrouterCost != nil {
		cost = *openrouterCost
	}

	// Skip cost accumulation for flat-rate models.
	if model.FlatRate {
		cost = 0
	}

	// NOTE(review): prompt tokens here add CacheCreationTokens, whereas
	// updateSessionUsage adds CacheReadTokens — confirm which is intended.
	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
	completionTokens := resp.TotalUsage.OutputTokens

	// Atomically update only title and usage fields to avoid overriding other
	// concurrent session updates.
	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
	if saveErr != nil {
		slog.Error("Failed to save session title and usage", "error", saveErr)
		return
	}
}
1121
1122func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1123	openrouterMetadata, ok := metadata[openrouter.Name]
1124	if !ok {
1125		return nil
1126	}
1127
1128	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1129	if !ok {
1130		return nil
1131	}
1132	return &opts.Usage.Cost
1133}
1134
1135func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
1136	modelConfig := model.CatwalkCfg
1137	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1138		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1139		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1140		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1141
1142	a.eventTokensUsed(session.ID, model, usage, cost)
1143
1144	// Use override cost if available (e.g., from OpenRouter).
1145	if overrideCost != nil {
1146		cost = *overrideCost
1147	}
1148
1149	// Skip cost accumulation
1150	if model.FlatRate {
1151		cost = 0
1152	}
1153
1154	session.Cost += cost
1155	session.CompletionTokens = usage.OutputTokens
1156	session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
1157}
1158
1159func (a *sessionAgent) Cancel(sessionID string) {
1160	// Cancel regular requests. Don't use Take() here - we need the entry to
1161	// remain in activeRequests so IsBusy() returns true until the goroutine
1162	// fully completes (including error handling that may access the DB).
1163	// The defer in processRequest will clean up the entry.
1164	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1165		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1166		cancel()
1167	}
1168
1169	// Also check for summarize requests.
1170	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1171		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1172		cancel()
1173	}
1174
1175	if a.QueuedPrompts(sessionID) > 0 {
1176		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1177		a.messageQueue.Del(sessionID)
1178	}
1179}
1180
1181func (a *sessionAgent) ClearQueue(sessionID string) {
1182	if a.QueuedPrompts(sessionID) > 0 {
1183		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1184		a.messageQueue.Del(sessionID)
1185	}
1186}
1187
1188func (a *sessionAgent) CancelAll() {
1189	if !a.IsBusy() {
1190		return
1191	}
1192	for key := range a.activeRequests.Seq2() {
1193		a.Cancel(key) // key is sessionID
1194	}
1195
1196	timeout := time.After(5 * time.Second)
1197	for a.IsBusy() {
1198		select {
1199		case <-timeout:
1200			return
1201		default:
1202			time.Sleep(200 * time.Millisecond)
1203		}
1204	}
1205}
1206
1207func (a *sessionAgent) IsBusy() bool {
1208	var busy bool
1209	for cancelFunc := range a.activeRequests.Seq() {
1210		if cancelFunc != nil {
1211			busy = true
1212			break
1213		}
1214	}
1215	return busy
1216}
1217
1218func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1219	_, busy := a.activeRequests.Get(sessionID)
1220	return busy
1221}
1222
1223func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1224	l, ok := a.messageQueue.Get(sessionID)
1225	if !ok {
1226		return 0
1227	}
1228	return len(l)
1229}
1230
1231func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1232	l, ok := a.messageQueue.Get(sessionID)
1233	if !ok {
1234		return nil
1235	}
1236	prompts := make([]string, len(l))
1237	for i, call := range l {
1238		prompts[i] = call.Prompt
1239	}
1240	return prompts
1241}
1242
// SetModels replaces the agent's large and small models with the given ones.
func (a *sessionAgent) SetModels(large Model, small Model) {
	a.largeModel.Set(large)
	a.smallModel.Set(small)
}
1247
// SetTools replaces the agent's tools with the given slice.
func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
	a.tools.SetSlice(tools)
}
1251
// SetSystemPrompt replaces the agent's system prompt.
func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
	a.systemPrompt.Set(systemPrompt)
}
1255
// Model returns the agent's current large model.
func (a *sessionAgent) Model() Model {
	return a.largeModel.Get()
}
1259
1260// convertToToolResult converts a fantasy tool result to a message tool result.
1261func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1262	baseResult := message.ToolResult{
1263		ToolCallID: result.ToolCallID,
1264		Name:       result.ToolName,
1265		Metadata:   result.ClientMetadata,
1266	}
1267
1268	switch result.Result.GetType() {
1269	case fantasy.ToolResultContentTypeText:
1270		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1271			baseResult.Content = r.Text
1272		}
1273	case fantasy.ToolResultContentTypeError:
1274		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1275			baseResult.Content = r.Error.Error()
1276			baseResult.IsError = true
1277		}
1278	case fantasy.ToolResultContentTypeMedia:
1279		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1280			if !stringext.IsValidBase64(r.Data) {
1281				slog.Warn(
1282					"Tool returned media with invalid base64 data, discarding image",
1283					"tool", result.ToolName,
1284					"tool_call_id", result.ToolCallID,
1285				)
1286				baseResult.Content = "Tool returned image data with invalid encoding"
1287				baseResult.IsError = true
1288			} else {
1289				content := r.Text
1290				if content == "" {
1291					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1292				}
1293				baseResult.Content = content
1294				baseResult.Data = r.Data
1295				baseResult.MIMEType = r.MediaType
1296			}
1297		}
1298	}
1299
1300	return baseResult
1301}
1302
1303// workaroundProviderMediaLimitations converts media content in tool results to
1304// user messages for providers that don't natively support images in tool results.
1305//
1306// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1307// don't support sending images/media in tool result messages - they only accept
1308// text in tool results. However, they DO support images in user messages.
1309//
1310// If we send media in tool results to these providers, the API returns an error.
1311//
1312// Solution: For these providers, we:
1313//  1. Replace the media in the tool result with a text placeholder
1314//  2. Inject a user message immediately after with the image as a file attachment
1315//  3. This maintains the tool execution flow while working around API limitations
1316//
1317// Anthropic and Bedrock support images natively in tool results, so we skip
1318// this workaround for them.
1319//
1320// Example transformation:
1321//
1322//	BEFORE: [tool result: image data]
1323//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1324func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1325	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1326		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1327
1328	if providerSupportsMedia {
1329		return messages
1330	}
1331
1332	convertedMessages := make([]fantasy.Message, 0, len(messages))
1333
1334	for _, msg := range messages {
1335		if msg.Role != fantasy.MessageRoleTool {
1336			convertedMessages = append(convertedMessages, msg)
1337			continue
1338		}
1339
1340		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1341		var mediaFiles []fantasy.FilePart
1342
1343		for _, part := range msg.Content {
1344			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1345			if !ok {
1346				textParts = append(textParts, part)
1347				continue
1348			}
1349
1350			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1351				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1352				if err != nil {
1353					slog.Warn("Failed to decode media data", "error", err)
1354					textParts = append(textParts, part)
1355					continue
1356				}
1357
1358				mediaFiles = append(mediaFiles, fantasy.FilePart{
1359					Data:      decoded,
1360					MediaType: media.MediaType,
1361					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1362				})
1363
1364				textParts = append(textParts, fantasy.ToolResultPart{
1365					ToolCallID: toolResult.ToolCallID,
1366					Output: fantasy.ToolResultOutputContentText{
1367						Text: "[Image/media content loaded - see attached file]",
1368					},
1369					ProviderOptions: toolResult.ProviderOptions,
1370				})
1371			} else {
1372				textParts = append(textParts, part)
1373			}
1374		}
1375
1376		convertedMessages = append(convertedMessages, fantasy.Message{
1377			Role:    fantasy.MessageRoleTool,
1378			Content: textParts,
1379		})
1380
1381		if len(mediaFiles) > 0 {
1382			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1383				"Here is the media content from the tool result:",
1384				mediaFiles...,
1385			))
1386		}
1387	}
1388
1389	return convertedMessages
1390}
1391
1392// buildSummaryPrompt constructs the prompt text for session summarization.
1393func buildSummaryPrompt(todos []session.Todo) string {
1394	var sb strings.Builder
1395	sb.WriteString("Provide a detailed summary of our conversation above.")
1396	if len(todos) > 0 {
1397		sb.WriteString("\n\n## Current Todo List\n\n")
1398		for _, t := range todos {
1399			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1400		}
1401		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1402		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1403	}
1404	return sb.String()
1405}
1406
1407func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1408	fields := []any{
1409		"retry_delay", delay.String(),
1410	}
1411	if err == nil {
1412		return fields
1413	}
1414	fields = append(fields, "status_code", err.StatusCode)
1415	if err.Title != "" {
1416		fields = append(fields, "title", err.Title)
1417	}
1418	if err.Message != "" {
1419		fields = append(fields, "message", err.Message)
1420	}
1421	return fields
1422}