agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"net/http"
  19	"os"
  20	"regexp"
  21	"strconv"
  22	"strings"
  23	"sync"
  24	"time"
  25
  26	"charm.land/catwalk/pkg/catwalk"
  27	"charm.land/fantasy"
  28	"charm.land/fantasy/providers/anthropic"
  29	"charm.land/fantasy/providers/bedrock"
  30	"charm.land/fantasy/providers/google"
  31	"charm.land/fantasy/providers/openai"
  32	"charm.land/fantasy/providers/openrouter"
  33	"charm.land/fantasy/providers/vercel"
  34	"charm.land/lipgloss/v2"
  35	"github.com/charmbracelet/crush/internal/agent/hyper"
  36	"github.com/charmbracelet/crush/internal/agent/notify"
  37	"github.com/charmbracelet/crush/internal/agent/tools"
  38	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  39	"github.com/charmbracelet/crush/internal/config"
  40	"github.com/charmbracelet/crush/internal/csync"
  41	"github.com/charmbracelet/crush/internal/message"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
  49const (
  50	DefaultSessionName = "Untitled Session"
  51
  52	// Constants for auto-summarization thresholds
  53	largeContextWindowThreshold = 200_000
  54	largeContextWindowBuffer    = 20_000
  55	smallContextWindowRatio     = 0.2
  56)
  57
// userAgent is passed to fantasy.WithUserAgent for every agent this package
// builds (see Run and Summarize) and identifies the running Crush version.
var userAgent = fmt.Sprintf("Charm-Crush/%s (https://charm.land/crush)", version.Version)

// titlePrompt is the embedded prompt template for session title generation.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is consumed by generateTitle — confirm.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the embedded system prompt used by Summarize when asking
// the model to condense a session's history.
//
//go:embed templates/summary.md
var summaryPrompt []byte
  65
  66// Used to remove <think> tags from generated titles.
  67var (
  68	thinkTagRegex       = regexp.MustCompile(`(?s)<think>.*?</think>`)
  69	orphanThinkTagRegex = regexp.MustCompile(`</?think>`)
  70)
  71
// SessionAgentCall describes a single prompt to run against a session via
// SessionAgent.Run.
type SessionAgentCall struct {
	// SessionID identifies the target session; Run rejects an empty value
	// with ErrSessionMissing.
	SessionID string
	// Prompt is the user's text. Run rejects the call with ErrEmptyPrompt
	// when Prompt is empty and no text attachment is present.
	Prompt string
	// ProviderOptions are forwarded to the model stream (and to Summarize
	// when auto-summarization kicks in).
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored on the user message. Non-text attachments are
	// sent to the model as file parts; text attachments are folded into the
	// prompt via message.PromptWithTextAttachments.
	Attachments []message.Attachment
	// MaxOutputTokens limits the response length; 0 means "do not send a
	// limit" because some providers reject an explicit zero.
	MaxOutputTokens int64
	// Sampling parameters, passed through to the model unchanged. nil
	// leaves the parameter unset (presumably provider default — confirm).
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the "agent finished" notification that Run
	// publishes at the end of a turn.
	NonInteractive bool
}
  85
  86type SessionAgent interface {
  87	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
  88	SetModels(large Model, small Model)
  89	SetTools(tools []fantasy.AgentTool)
  90	SetSystemPrompt(systemPrompt string)
  91	Cancel(sessionID string)
  92	CancelAll()
  93	IsSessionBusy(sessionID string) bool
  94	IsBusy() bool
  95	QueuedPrompts(sessionID string) int
  96	QueuedPromptsList(sessionID string) []string
  97	ClearQueue(sessionID string)
  98	Summarize(context.Context, string, fantasy.ProviderOptions) error
  99	Model() Model
 100}
 101
// Model bundles a ready-to-use language model with the metadata Crush needs
// about it.
type Model struct {
	// Model is the fantasy language model that requests are sent to.
	Model fantasy.LanguageModel
	// CatwalkCfg is the model's catalog entry. Run reads ContextWindow (for
	// auto-summarization), SupportsImages, and Name from it.
	CatwalkCfg catwalk.Model
	// ModelCfg is the user's model selection. Its Model and Provider values
	// are recorded on assistant messages, and Provider is compared against
	// hyper.Name for Hyper-specific error handling.
	ModelCfg config.SelectedModel
	// FlatRate is not read in this part of the file.
	// NOTE(review): presumably marks flat-rate pricing for usage/cost
	// accounting — confirm.
	FlatRate bool
}
 108
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Values that may be replaced while requests are running; Run and
	// Summarize take a snapshot of them at the start of each request.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// isSubAgent omits the todo-list system reminder from prompt history.
	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize turns off the context-window stop condition in
	// Run.
	disableAutoSummarize bool
	// isYolo is not read in this part of the file.
	// NOTE(review): presumably relaxes permission prompts — confirm.
	isYolo bool
	// notify receives user-facing notifications; it may be nil.
	notify pubsub.Publisher[notify.Notification]

	// messageQueue holds calls submitted while their session was busy,
	// keyed by session ID.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session ID to the cancel function of its
	// in-flight request; presence of an entry marks the session as busy.
	activeRequests *csync.Map[string, context.CancelFunc]
}
 126
// SessionAgentOptions configures NewSessionAgent.
type SessionAgentOptions struct {
	// LargeModel runs conversation turns and summaries.
	LargeModel Model
	// SmallModel is not used in this part of the file.
	// NOTE(review): presumably used for lightweight tasks such as titles —
	// confirm.
	SmallModel Model
	// SystemPromptPrefix, when non-empty, is prepended as its own system
	// message on every model step.
	SystemPromptPrefix string
	// SystemPrompt is the main system prompt; Run appends instructions from
	// connected MCP servers to it.
	SystemPrompt string
	// IsSubAgent omits the todo-list reminder from prompt history.
	IsSubAgent bool
	// DisableAutoSummarize prevents Run from summarizing when the context
	// window is nearly full.
	DisableAutoSummarize bool
	// IsYolo is stored on the agent but not read in this part of the file.
	IsYolo bool
	// Sessions and Messages persist session state and conversation history.
	Sessions session.Service
	Messages message.Service
	// Tools is the initial tool set offered to the model.
	Tools []fantasy.AgentTool
	// Notify receives user-facing notifications; nil disables them.
	Notify pubsub.Publisher[notify.Notification]
}
 140
 141func NewSessionAgent(
 142	opts SessionAgentOptions,
 143) SessionAgent {
 144	return &sessionAgent{
 145		largeModel:           csync.NewValue(opts.LargeModel),
 146		smallModel:           csync.NewValue(opts.SmallModel),
 147		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 148		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 149		isSubAgent:           opts.IsSubAgent,
 150		sessions:             opts.Sessions,
 151		messages:             opts.Messages,
 152		disableAutoSummarize: opts.DisableAutoSummarize,
 153		tools:                csync.NewSliceFrom(opts.Tools),
 154		isYolo:               opts.IsYolo,
 155		notify:               opts.Notify,
 156		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 157		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 158	}
 159}
 160
// Run executes one user turn for call.SessionID and streams the model's
// response into the session's message history.
//
// It returns ErrEmptyPrompt when the call has neither a prompt nor a text
// attachment, and ErrSessionMissing when no session ID is given. If a
// request is already active for the session, the call is appended to the
// session's queue and Run returns (nil, nil). Queued calls are injected into
// the running conversation at its next step (see PrepareStep), or run after
// the current turn completes.
//
// Otherwise Run:
//   - snapshots tools, models, and system prompts, appending instructions
//     from connected MCP servers to the system prompt;
//   - starts title generation in the background when the session has no
//     messages yet, and waits for it before returning;
//   - persists the user message, streams the model response, and persists
//     assistant text, reasoning, tool calls, tool results, and usage as they
//     arrive;
//   - on error, closes out tool calls that have no result with synthetic
//     error results and records a user-facing finish reason on the assistant
//     message;
//   - runs Summarize when the context window is nearly exhausted, and
//     re-queues the original request if the agent was mid tool use;
//   - publishes an agent-finished notification (unless NonInteractive) and
//     then recursively runs the next queued call, if any.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy. Queued calls are consumed either by
	// PrepareStep of the in-flight request or once that request finishes.
	// NOTE(review): the busy check and the Get/Set below are separate
	// operations, not one atomic step — confirm concurrent callers for the
	// same session cannot lose a queued call.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Gather the instructions advertised by every connected MCP server and
	// append them to the system prompt in an <mcp-instructions> section.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent(userAgent),
	)

	// sessionLock guards the session read-modify-write in OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Wait for title generation (if started) before Run returns.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// genCtx drives the model stream and is registered as the session's
	// active request; while it is registered the session counts as busy.
	// The parent ctx stays usable for persisting state after genCtx is
	// canceled.
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)

	// history is the prior conversation; files are the non-text attachments
	// of this call.
	history, files := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the current step. It is
	// created in PrepareStep and filled in by the streaming callbacks.
	var currentAssistant *message.Message
	// shouldSummarize is set by the context-window stop condition.
	var shouldSummarize bool
	// Don't send MaxOutputTokens if 0 — some providers (e.g. LM Studio) reject it
	var maxOutputTokens *int64
	if call.MaxOutputTokens > 0 {
		maxOutputTokens = &call.MaxOutputTokens
	}
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  maxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep runs before each model step. It resets per-message
		// provider options, refreshes tools, injects queued prompts, assigns
		// cache breakpoints, prepends the prompt prefix, and creates the
		// assistant message that the callbacks below stream into.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			// Clear options from previous steps; cache breakpoints are
			// reassigned below.
			prepared.Messages = options.Messages
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Use latest tools (updated by SetTools when MCP tools change).
			prepared.Tools = a.tools.Copy()

			// Persist and inject prompts queued while this request was
			// running so the model sees them in this step.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// At the first non-system message, mark the most recent
				// system message seen so far (index 0 if there was none) as
				// a cache breakpoint. This happens only once.
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			// The prefix is prepended after breakpoints are assigned, so it
			// carries no cache control itself.
			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			// Create this step's (initially empty) assistant message and
			// expose its ID and model capabilities to tools via the context.
			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// Record provider-specific reasoning metadata so it can be sent
			// back on later turns.
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			slog.Warn("Provider request failed, retrying", providerRetryLogFields(err, delay)...)
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			// Use parent ctx instead of genCtx to ensure the update succeeds
			// even if the request is canceled mid-stream
			return a.messages.Update(ctx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			// Use parent ctx instead of genCtx to ensure the message is created
			// even if the request is canceled mid-stream
			_, createMsgErr := a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			// Translate the provider finish reason into the message model's.
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			// If a tool result halted the turn (e.g. a hook halt or a
			// permission denial), the step ends on FinishReasonToolCalls but
			// the model will not be called again. Treat it as the end of the
			// turn so the UI can render the assistant footer.
			if finishReason == message.FinishReasonToolUse {
				for _, tr := range stepResult.Content.ToolResults() {
					if tr.StopTurn {
						finishReason = message.FinishReasonEndTurn
						break
					}
				}
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			// Reload the session, add this step's usage (and OpenRouter cost
			// when reported), save it, and keep the fresh copy for the stop
			// condition and notifications.
			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop (and summarize afterwards) once the tokens used by the
			// session leave too little of the context window.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				// If context window is unknown (0), skip auto-summarize
				// to avoid immediately truncating custom/local models.
				if cw == 0 {
					return false
				}
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the model keeps repeating the same tool calls.
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isHyper := largeModel.ModelCfg.Provider == hyper.Name
		isCancelErr := errors.Is(err, context.Canceled)
		// Failed before any assistant message existed: nothing to clean up.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Every tool call must end up finished and paired with a result:
		// mark interrupted calls finished (with empty input) and add an
		// error result for any call that has no stored result.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Error: user cancelled assistant tool calling"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a user-facing finish reason that matches the kind of error.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized {
			currentAssistant.AddFinish(message.FinishReasonError, "Unauthorized", `Please re-authenticate with Hyper. You can also run "crush auth" to re-authenticate.`)
			if a.notify != nil {
				a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
					SessionID:    call.SessionID,
					SessionTitle: currentSession.Title,
					Type:         notify.TypeReAuthenticate,
					ProviderID:   largeModel.ModelCfg.Provider,
				})
			}
		} else if isHyper && errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusPaymentRequired {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	if shouldSummarize {
		// Summarize refuses to run while the session is busy, so release
		// this request's active entry first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done (it stopped mid tool use), re-queue the
		// original request with an explanation so work resumes after the
		// summary.
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before publishing the notification.
	// TUI handlers poll IsSessionBusy() and only re-evaluate when a
	// tea.Msg arrives, so the cleanup must precede the notify or
	// subscribers see stale busy state at the moment of receipt.
	a.activeRequests.Del(call.SessionID)
	cancel()

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages: restart the loop with the oldest one.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 627
 628func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 629	if a.IsSessionBusy(sessionID) {
 630		return ErrSessionBusy
 631	}
 632
 633	// Copy mutable fields under lock to avoid races with SetModels.
 634	largeModel := a.largeModel.Get()
 635	systemPromptPrefix := a.systemPromptPrefix.Get()
 636
 637	currentSession, err := a.sessions.Get(ctx, sessionID)
 638	if err != nil {
 639		return fmt.Errorf("failed to get session: %w", err)
 640	}
 641	msgs, err := a.getSessionMessages(ctx, currentSession)
 642	if err != nil {
 643		return err
 644	}
 645	if len(msgs) == 0 {
 646		// Nothing to summarize.
 647		return nil
 648	}
 649
 650	aiMsgs, _ := a.preparePrompt(msgs, largeModel.CatwalkCfg.SupportsImages)
 651
 652	genCtx, cancel := context.WithCancel(ctx)
 653	a.activeRequests.Set(sessionID, cancel)
 654	defer a.activeRequests.Del(sessionID)
 655	defer cancel()
 656
 657	agent := fantasy.NewAgent(largeModel.Model,
 658		fantasy.WithSystemPrompt(string(summaryPrompt)),
 659		fantasy.WithUserAgent(userAgent),
 660	)
 661	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 662		Role:             message.Assistant,
 663		Model:            largeModel.Model.Model(),
 664		Provider:         largeModel.Model.Provider(),
 665		IsSummaryMessage: true,
 666	})
 667	if err != nil {
 668		return err
 669	}
 670
 671	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 672
 673	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 674		Prompt:          summaryPromptText,
 675		Messages:        aiMsgs,
 676		ProviderOptions: opts,
 677		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 678			prepared.Messages = options.Messages
 679			if systemPromptPrefix != "" {
 680				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 681			}
 682			return callContext, prepared, nil
 683		},
 684		OnReasoningDelta: func(id string, text string) error {
 685			summaryMessage.AppendReasoningContent(text)
 686			return a.messages.Update(genCtx, summaryMessage)
 687		},
 688		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 689			// Handle anthropic signature.
 690			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 691				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 692					summaryMessage.AppendReasoningSignature(signature.Signature)
 693				}
 694			}
 695			summaryMessage.FinishThinking()
 696			return a.messages.Update(genCtx, summaryMessage)
 697		},
 698		OnTextDelta: func(id, text string) error {
 699			summaryMessage.AppendContent(text)
 700			return a.messages.Update(genCtx, summaryMessage)
 701		},
 702	})
 703	if err != nil {
 704		isCancelErr := errors.Is(err, context.Canceled)
 705		if isCancelErr {
 706			// User cancelled summarize we need to remove the summary message.
 707			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 708			return deleteErr
 709		}
 710		// Mark the summary message as finished with an error so the UI
 711		// stops spinning.
 712		summaryMessage.AddFinish(message.FinishReasonError, "Summarization Error", err.Error())
 713		if updateErr := a.messages.Update(ctx, summaryMessage); updateErr != nil {
 714			return updateErr
 715		}
 716		return err
 717	}
 718
 719	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 720	err = a.messages.Update(genCtx, summaryMessage)
 721	if err != nil {
 722		return err
 723	}
 724
 725	var openrouterCost *float64
 726	for _, step := range resp.Steps {
 727		stepCost := a.openrouterCost(step.ProviderMetadata)
 728		if stepCost != nil {
 729			newCost := *stepCost
 730			if openrouterCost != nil {
 731				newCost += *openrouterCost
 732			}
 733			openrouterCost = &newCost
 734		}
 735	}
 736
 737	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost)
 738
 739	// Just in case, get just the last usage info.
 740	usage := resp.Response.Usage
 741	currentSession.SummaryMessageID = summaryMessage.ID
 742	currentSession.CompletionTokens = usage.OutputTokens
 743	currentSession.PromptTokens = 0
 744	_, err = a.sessions.Save(genCtx, currentSession)
 745	if err != nil {
 746		return err
 747	}
 748
 749	// Release the active request before processing queued messages so that
 750	// Run() does not see the session as busy.
 751	a.activeRequests.Del(sessionID)
 752	cancel()
 753
 754	// Process any messages that were queued while summarizing.
 755	queuedMessages, ok := a.messageQueue.Get(sessionID)
 756	if !ok || len(queuedMessages) == 0 {
 757		return nil
 758	}
 759	firstQueuedMessage := queuedMessages[0]
 760	a.messageQueue.Set(sessionID, queuedMessages[1:])
 761	_, qErr := a.Run(ctx, firstQueuedMessage)
 762	return qErr
 763}
 764
 765func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 766	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 767		return fantasy.ProviderOptions{}
 768	}
 769	return fantasy.ProviderOptions{
 770		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 771			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 772		},
 773		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 774			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 775		},
 776		vercel.Name: &anthropic.ProviderCacheControlOptions{
 777			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 778		},
 779	}
 780}
 781
 782func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 783	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 784	var attachmentParts []message.ContentPart
 785	for _, attachment := range call.Attachments {
 786		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 787	}
 788	parts = append(parts, attachmentParts...)
 789	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 790		Role:  message.User,
 791		Parts: parts,
 792	})
 793	if err != nil {
 794		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 795	}
 796	return msg, nil
 797}
 798
 799func (a *sessionAgent) preparePrompt(msgs []message.Message, supportsImages bool, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 800	var history []fantasy.Message
 801	if !a.isSubAgent {
 802		history = append(history, fantasy.NewUserMessage(
 803			fmt.Sprintf("<system_reminder>%s</system_reminder>",
 804				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 805If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 806If not, please feel free to ignore. Again do not mention this message to the user.`,
 807			),
 808		))
 809	}
 810	// Collect all tool call IDs present in assistant messages and all tool
 811	// result IDs present in tool messages. This lets us detect both orphaned
 812	// tool results (result without a call) and orphaned tool calls (call
 813	// without a result).
 814	knownToolCallIDs := make(map[string]struct{})
 815	knownToolResultIDs := make(map[string]struct{})
 816	for _, m := range msgs {
 817		switch m.Role {
 818		case message.Assistant:
 819			for _, tc := range m.ToolCalls() {
 820				knownToolCallIDs[tc.ID] = struct{}{}
 821			}
 822		case message.Tool:
 823			for _, tr := range m.ToolResults() {
 824				knownToolResultIDs[tr.ToolCallID] = struct{}{}
 825			}
 826		}
 827	}
 828
 829	for _, m := range msgs {
 830		if len(m.Parts) == 0 {
 831			continue
 832		}
 833		// Assistant message without content or tool calls (cancelled before it returned anything).
 834		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 835			continue
 836		}
 837		if m.Role == message.Tool {
 838			if msg, ok := filterOrphanedToolResults(m, knownToolCallIDs); ok {
 839				history = append(history, msg)
 840			}
 841			continue
 842		}
 843		aiMsgs := m.ToAIMessage()
 844		if !supportsImages {
 845			for i := range aiMsgs {
 846				if aiMsgs[i].Role == fantasy.MessageRoleUser {
 847					aiMsgs[i].Content = filterFileParts(aiMsgs[i].Content)
 848				}
 849			}
 850		}
 851		history = append(history, aiMsgs...)
 852
 853		if m.Role == message.Assistant {
 854			if msg, ok := syntheticToolResultsForOrphanedCalls(m, knownToolResultIDs); ok {
 855				history = append(history, msg)
 856			}
 857		}
 858	}
 859
 860	var files []fantasy.FilePart
 861	for _, attachment := range attachments {
 862		if attachment.IsText() {
 863			continue
 864		}
 865		files = append(files, fantasy.FilePart{
 866			Filename:  attachment.FileName,
 867			Data:      attachment.Content,
 868			MediaType: attachment.MimeType,
 869		})
 870	}
 871
 872	return history, files
 873}
 874
 875// filterFileParts removes fantasy.FilePart entries from a slice of message
 876// parts. Used to strip image attachments from historical user messages when
 877// the current model does not support them.
 878func filterFileParts(parts []fantasy.MessagePart) []fantasy.MessagePart {
 879	filtered := make([]fantasy.MessagePart, 0, len(parts))
 880	for _, part := range parts {
 881		if _, ok := fantasy.AsMessagePart[fantasy.FilePart](part); ok {
 882			continue
 883		}
 884		filtered = append(filtered, part)
 885	}
 886	return filtered
 887}
 888
 889// filterOrphanedToolResults converts a tool message to a fantasy.Message,
 890// dropping any tool result parts whose tool_call_id has no matching tool call
 891// in the known set. An orphaned result causes API validation to fail on every
 892// subsequent turn, permanently locking the session. Returns the filtered
 893// message and true if at least one valid part remains.
 894func filterOrphanedToolResults(m message.Message, knownToolCallIDs map[string]struct{}) (fantasy.Message, bool) {
 895	aiMsgs := m.ToAIMessage()
 896	if len(aiMsgs) == 0 {
 897		return fantasy.Message{}, false
 898	}
 899	var validParts []fantasy.MessagePart
 900	for _, part := range aiMsgs[0].Content {
 901		tr, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
 902		if !ok {
 903			validParts = append(validParts, part)
 904			continue
 905		}
 906		if _, known := knownToolCallIDs[tr.ToolCallID]; known {
 907			validParts = append(validParts, part)
 908		} else {
 909			slog.Warn("Dropping orphaned tool result with no matching tool call",
 910				"tool_call_id", tr.ToolCallID,
 911			)
 912		}
 913	}
 914	if len(validParts) == 0 {
 915		return fantasy.Message{}, false
 916	}
 917	msg := aiMsgs[0]
 918	msg.Content = validParts
 919	return msg, true
 920}
 921
 922// syntheticToolResultsForOrphanedCalls returns a tool message containing
 923// synthetic tool results for any tool calls in the assistant message that
 924// have no matching result in knownToolResultIDs. LLM APIs require every
 925// tool_use to be immediately followed by a tool_result; an interrupted
 926// session can leave orphaned tool_use blocks that permanently lock the
 927// conversation. Returns the message and true if any synthetic results were
 928// produced.
 929func syntheticToolResultsForOrphanedCalls(m message.Message, knownToolResultIDs map[string]struct{}) (fantasy.Message, bool) {
 930	var syntheticParts []fantasy.MessagePart
 931	for _, tc := range m.ToolCalls() {
 932		if _, hasResult := knownToolResultIDs[tc.ID]; hasResult {
 933			continue
 934		}
 935		slog.Warn("Injecting synthetic tool result for orphaned tool call",
 936			"tool_call_id", tc.ID,
 937			"tool_name", tc.Name,
 938		)
 939		syntheticParts = append(syntheticParts, fantasy.ToolResultPart{
 940			ToolCallID: tc.ID,
 941			Output: fantasy.ToolResultOutputContentError{
 942				Error: errors.New("tool call was interrupted and did not produce a result, you may retry this call if the result is still needed"),
 943			},
 944		})
 945	}
 946	if len(syntheticParts) == 0 {
 947		return fantasy.Message{}, false
 948	}
 949	return fantasy.Message{
 950		Role:    fantasy.MessageRoleTool,
 951		Content: syntheticParts,
 952	}, true
 953}
 954
 955func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
 956	msgs, err := a.messages.List(ctx, session.ID)
 957	if err != nil {
 958		return nil, fmt.Errorf("failed to list messages: %w", err)
 959	}
 960
 961	if session.SummaryMessageID != "" {
 962		summaryMsgIndex := -1
 963		for i, msg := range msgs {
 964			if msg.ID == session.SummaryMessageID {
 965				summaryMsgIndex = i
 966				break
 967			}
 968		}
 969		if summaryMsgIndex != -1 {
 970			msgs = msgs[summaryMsgIndex:]
 971			msgs[0].Role = message.User
 972		}
 973	}
 974	return msgs, nil
 975}
 976
 977// generateTitle generates a session titled based on the initial prompt.
 978func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
 979	if userPrompt == "" {
 980		return
 981	}
 982
 983	smallModel := a.smallModel.Get()
 984	largeModel := a.largeModel.Get()
 985	systemPromptPrefix := a.systemPromptPrefix.Get()
 986
 987	var maxOutputTokens int64 = 40
 988	if smallModel.CatwalkCfg.CanReason {
 989		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
 990	}
 991
 992	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
 993		return fantasy.NewAgent(m,
 994			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
 995			fantasy.WithMaxOutputTokens(tok),
 996			fantasy.WithUserAgent(userAgent),
 997		)
 998	}
 999
1000	streamCall := fantasy.AgentStreamCall{
1001		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
1002		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
1003			prepared.Messages = opts.Messages
1004			if systemPromptPrefix != "" {
1005				prepared.Messages = append([]fantasy.Message{
1006					fantasy.NewSystemMessage(systemPromptPrefix),
1007				}, prepared.Messages...)
1008			}
1009			return callCtx, prepared, nil
1010		},
1011	}
1012
1013	// Use the small model to generate the title.
1014	model := smallModel
1015	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
1016	resp, err := agent.Stream(ctx, streamCall)
1017	if err == nil {
1018		// We successfully generated a title with the small model.
1019		slog.Debug("Generated title with small model")
1020	} else {
1021		// It didn't work. Let's try with the big model.
1022		slog.Error("Error generating title with small model; trying big model", "err", err)
1023		model = largeModel
1024		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
1025		resp, err = agent.Stream(ctx, streamCall)
1026		if err == nil {
1027			slog.Debug("Generated title with large model")
1028		} else {
1029			// Welp, the large model didn't work either. Use the default
1030			// session name and return.
1031			slog.Error("Error generating title with large model", "err", err)
1032			saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1033			if saveErr != nil {
1034				slog.Error("Failed to save session title", "error", saveErr)
1035			}
1036			return
1037		}
1038	}
1039
1040	if resp == nil {
1041		// Actually, we didn't get a response so we can't. Use the default
1042		// session name and return.
1043		slog.Error("Response is nil; can't generate title")
1044		saveErr := a.sessions.Rename(ctx, sessionID, DefaultSessionName)
1045		if saveErr != nil {
1046			slog.Error("Failed to save session title", "error", saveErr)
1047		}
1048		return
1049	}
1050
1051	// Clean up title.
1052	var title string
1053	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
1054
1055	// Remove thinking tags if present.
1056	title = thinkTagRegex.ReplaceAllString(title, "")
1057	title = orphanThinkTagRegex.ReplaceAllString(title, "")
1058
1059	title = strings.TrimSpace(title)
1060	title = cmp.Or(title, DefaultSessionName)
1061
1062	// Calculate usage and cost.
1063	var openrouterCost *float64
1064	for _, step := range resp.Steps {
1065		stepCost := a.openrouterCost(step.ProviderMetadata)
1066		if stepCost != nil {
1067			newCost := *stepCost
1068			if openrouterCost != nil {
1069				newCost += *openrouterCost
1070			}
1071			openrouterCost = &newCost
1072		}
1073	}
1074
1075	modelConfig := model.CatwalkCfg
1076	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
1077		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
1078		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
1079		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
1080
1081	// Use override cost if available (e.g., from OpenRouter).
1082	if openrouterCost != nil {
1083		cost = *openrouterCost
1084	}
1085
1086	// Skip cost accumulation
1087	if model.FlatRate {
1088		cost = 0
1089	}
1090
1091	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
1092	completionTokens := resp.TotalUsage.OutputTokens
1093
1094	// Atomically update only title and usage fields to avoid overriding other
1095	// concurrent session updates.
1096	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
1097	if saveErr != nil {
1098		slog.Error("Failed to save session title and usage", "error", saveErr)
1099		return
1100	}
1101}
1102
1103func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
1104	openrouterMetadata, ok := metadata[openrouter.Name]
1105	if !ok {
1106		return nil
1107	}
1108
1109	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
1110	if !ok {
1111		return nil
1112	}
1113	return &opts.Usage.Cost
1114}
1115
1116func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
1117	modelConfig := model.CatwalkCfg
1118	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
1119		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
1120		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
1121		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
1122
1123	a.eventTokensUsed(session.ID, model, usage, cost)
1124
1125	// Use override cost if available (e.g., from OpenRouter).
1126	if overrideCost != nil {
1127		cost = *overrideCost
1128	}
1129
1130	// Skip cost accumulation
1131	if model.FlatRate {
1132		cost = 0
1133	}
1134
1135	session.Cost += cost
1136	session.CompletionTokens = usage.OutputTokens
1137	session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
1138}
1139
1140func (a *sessionAgent) Cancel(sessionID string) {
1141	// Cancel regular requests. Don't use Take() here - we need the entry to
1142	// remain in activeRequests so IsBusy() returns true until the goroutine
1143	// fully completes (including error handling that may access the DB).
1144	// The defer in processRequest will clean up the entry.
1145	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
1146		slog.Debug("Request cancellation initiated", "session_id", sessionID)
1147		cancel()
1148	}
1149
1150	// Also check for summarize requests.
1151	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
1152		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
1153		cancel()
1154	}
1155
1156	if a.QueuedPrompts(sessionID) > 0 {
1157		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1158		a.messageQueue.Del(sessionID)
1159	}
1160}
1161
1162func (a *sessionAgent) ClearQueue(sessionID string) {
1163	if a.QueuedPrompts(sessionID) > 0 {
1164		slog.Debug("Clearing queued prompts", "session_id", sessionID)
1165		a.messageQueue.Del(sessionID)
1166	}
1167}
1168
1169func (a *sessionAgent) CancelAll() {
1170	if !a.IsBusy() {
1171		return
1172	}
1173	for key := range a.activeRequests.Seq2() {
1174		a.Cancel(key) // key is sessionID
1175	}
1176
1177	timeout := time.After(5 * time.Second)
1178	for a.IsBusy() {
1179		select {
1180		case <-timeout:
1181			return
1182		default:
1183			time.Sleep(200 * time.Millisecond)
1184		}
1185	}
1186}
1187
1188func (a *sessionAgent) IsBusy() bool {
1189	var busy bool
1190	for cancelFunc := range a.activeRequests.Seq() {
1191		if cancelFunc != nil {
1192			busy = true
1193			break
1194		}
1195	}
1196	return busy
1197}
1198
1199func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1200	_, busy := a.activeRequests.Get(sessionID)
1201	return busy
1202}
1203
1204func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1205	l, ok := a.messageQueue.Get(sessionID)
1206	if !ok {
1207		return 0
1208	}
1209	return len(l)
1210}
1211
1212func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1213	l, ok := a.messageQueue.Get(sessionID)
1214	if !ok {
1215		return nil
1216	}
1217	prompts := make([]string, len(l))
1218	for i, call := range l {
1219		prompts[i] = call.Prompt
1220	}
1221	return prompts
1222}
1223
1224func (a *sessionAgent) SetModels(large Model, small Model) {
1225	a.largeModel.Set(large)
1226	a.smallModel.Set(small)
1227}
1228
1229func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1230	a.tools.SetSlice(tools)
1231}
1232
1233func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1234	a.systemPrompt.Set(systemPrompt)
1235}
1236
1237func (a *sessionAgent) Model() Model {
1238	return a.largeModel.Get()
1239}
1240
1241// convertToToolResult converts a fantasy tool result to a message tool result.
1242func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1243	baseResult := message.ToolResult{
1244		ToolCallID: result.ToolCallID,
1245		Name:       result.ToolName,
1246		Metadata:   result.ClientMetadata,
1247	}
1248
1249	switch result.Result.GetType() {
1250	case fantasy.ToolResultContentTypeText:
1251		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1252			baseResult.Content = r.Text
1253		}
1254	case fantasy.ToolResultContentTypeError:
1255		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1256			baseResult.Content = r.Error.Error()
1257			baseResult.IsError = true
1258		}
1259	case fantasy.ToolResultContentTypeMedia:
1260		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1261			if !stringext.IsValidBase64(r.Data) {
1262				slog.Warn("Tool returned media with invalid base64 data, discarding image",
1263					"tool", result.ToolName,
1264					"tool_call_id", result.ToolCallID,
1265				)
1266				baseResult.Content = "Tool returned image data with invalid encoding"
1267				baseResult.IsError = true
1268			} else {
1269				content := r.Text
1270				if content == "" {
1271					content = fmt.Sprintf("Loaded %s content", r.MediaType)
1272				}
1273				baseResult.Content = content
1274				baseResult.Data = r.Data
1275				baseResult.MIMEType = r.MediaType
1276			}
1277		}
1278	}
1279
1280	return baseResult
1281}
1282
1283// workaroundProviderMediaLimitations converts media content in tool results to
1284// user messages for providers that don't natively support images in tool results.
1285//
1286// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1287// don't support sending images/media in tool result messages - they only accept
1288// text in tool results. However, they DO support images in user messages.
1289//
1290// If we send media in tool results to these providers, the API returns an error.
1291//
1292// Solution: For these providers, we:
1293//  1. Replace the media in the tool result with a text placeholder
1294//  2. Inject a user message immediately after with the image as a file attachment
1295//  3. This maintains the tool execution flow while working around API limitations
1296//
1297// Anthropic and Bedrock support images natively in tool results, so we skip
1298// this workaround for them.
1299//
1300// Example transformation:
1301//
1302//	BEFORE: [tool result: image data]
1303//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1304func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1305	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1306		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1307
1308	if providerSupportsMedia {
1309		return messages
1310	}
1311
1312	convertedMessages := make([]fantasy.Message, 0, len(messages))
1313
1314	for _, msg := range messages {
1315		if msg.Role != fantasy.MessageRoleTool {
1316			convertedMessages = append(convertedMessages, msg)
1317			continue
1318		}
1319
1320		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1321		var mediaFiles []fantasy.FilePart
1322
1323		for _, part := range msg.Content {
1324			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1325			if !ok {
1326				textParts = append(textParts, part)
1327				continue
1328			}
1329
1330			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1331				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1332				if err != nil {
1333					slog.Warn("Failed to decode media data", "error", err)
1334					textParts = append(textParts, part)
1335					continue
1336				}
1337
1338				mediaFiles = append(mediaFiles, fantasy.FilePart{
1339					Data:      decoded,
1340					MediaType: media.MediaType,
1341					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1342				})
1343
1344				textParts = append(textParts, fantasy.ToolResultPart{
1345					ToolCallID: toolResult.ToolCallID,
1346					Output: fantasy.ToolResultOutputContentText{
1347						Text: "[Image/media content loaded - see attached file]",
1348					},
1349					ProviderOptions: toolResult.ProviderOptions,
1350				})
1351			} else {
1352				textParts = append(textParts, part)
1353			}
1354		}
1355
1356		convertedMessages = append(convertedMessages, fantasy.Message{
1357			Role:    fantasy.MessageRoleTool,
1358			Content: textParts,
1359		})
1360
1361		if len(mediaFiles) > 0 {
1362			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1363				"Here is the media content from the tool result:",
1364				mediaFiles...,
1365			))
1366		}
1367	}
1368
1369	return convertedMessages
1370}
1371
1372// buildSummaryPrompt constructs the prompt text for session summarization.
1373func buildSummaryPrompt(todos []session.Todo) string {
1374	var sb strings.Builder
1375	sb.WriteString("Provide a detailed summary of our conversation above.")
1376	if len(todos) > 0 {
1377		sb.WriteString("\n\n## Current Todo List\n\n")
1378		for _, t := range todos {
1379			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1380		}
1381		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1382		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1383	}
1384	return sb.String()
1385}
1386
1387func providerRetryLogFields(err *fantasy.ProviderError, delay time.Duration) []any {
1388	fields := []any{
1389		"retry_delay", delay.String(),
1390	}
1391	if err == nil {
1392		return fields
1393	}
1394	fields = append(fields, "status_code", err.StatusCode)
1395	if err.Title != "" {
1396		fields = append(fields, "title", err.Title)
1397	}
1398	if err.Message != "" {
1399		fields = append(fields, "message", err.Message)
1400	}
1401	return fields
1402}