agent.go

   1// Package agent is the core orchestration layer for Crush AI agents.
   2//
   3// It provides session-based AI agent functionality for managing
   4// conversations, tool execution, and message handling. It coordinates
   5// interactions between language models, messages, sessions, and tools while
   6// handling features like automatic summarization, queuing, and token
   7// management.
   8package agent
   9
  10import (
  11	"cmp"
  12	"context"
  13	_ "embed"
  14	"encoding/base64"
  15	"errors"
  16	"fmt"
  17	"log/slog"
  18	"os"
  19	"regexp"
  20	"strconv"
  21	"strings"
  22	"sync"
  23	"time"
  24
  25	"charm.land/catwalk/pkg/catwalk"
  26	"charm.land/fantasy"
  27	"charm.land/fantasy/providers/anthropic"
  28	"charm.land/fantasy/providers/bedrock"
  29	"charm.land/fantasy/providers/google"
  30	"charm.land/fantasy/providers/openai"
  31	"charm.land/fantasy/providers/openrouter"
  32	"charm.land/fantasy/providers/vercel"
  33	"charm.land/lipgloss/v2"
  34	"github.com/charmbracelet/crush/internal/agent/hyper"
  35	"github.com/charmbracelet/crush/internal/agent/notify"
  36	"github.com/charmbracelet/crush/internal/agent/tools"
  37	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
  38	"github.com/charmbracelet/crush/internal/config"
  39	"github.com/charmbracelet/crush/internal/csync"
  40	"github.com/charmbracelet/crush/internal/message"
  41	"github.com/charmbracelet/crush/internal/permission"
  42	"github.com/charmbracelet/crush/internal/pubsub"
  43	"github.com/charmbracelet/crush/internal/session"
  44	"github.com/charmbracelet/crush/internal/stringext"
  45	"github.com/charmbracelet/crush/internal/version"
  46	"github.com/charmbracelet/x/exp/charmtone"
  47)
  48
  49const (
  50	DefaultSessionName = "Untitled Session"
  51
  52	// Constants for auto-summarization thresholds
  53	largeContextWindowThreshold = 200_000
  54	largeContextWindowBuffer    = 20_000
  55	smallContextWindowRatio     = 0.2
  56)
  57
// titlePrompt is the system prompt used by generateTitle.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the system prompt used by Summarize.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// thinkTagRegex matches <think>…</think> spans (non-greedy) so they can be
// removed from generated titles. Without the (?s) flag `.` does not match
// newlines; generateTitle replaces newlines with spaces before applying it.
var thinkTagRegex = regexp.MustCompile(`<think>.*?</think>`)
  66
// SessionAgentCall is a single prompt submitted to SessionAgent.Run.
//
// SessionID selects the session and must be non-empty (Run returns
// ErrSessionMissing otherwise). Prompt is the user's text; it may be empty
// only when Attachments contains a text attachment (otherwise Run returns
// ErrEmptyPrompt). Attachments are stored on the user message. Non-text
// attachments are sent to the model as files. Text attachments are merged
// into the prompt via message.PromptWithTextAttachments.
//
// ProviderOptions, MaxOutputTokens, Temperature, TopP, TopK,
// FrequencyPenalty and PresencePenalty are forwarded to the model stream
// call. MaxOutputTokens is always passed by pointer, so a zero value is
// forwarded as 0 rather than left unset.
// NOTE(review): confirm the model layer treats 0 as "no explicit limit".
//
// NonInteractive suppresses the "agent finished" notification that Run
// publishes after a successful turn.
type SessionAgentCall struct {
	SessionID        string
	Prompt           string
	ProviderOptions  fantasy.ProviderOptions
	Attachments      []message.Attachment
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	NonInteractive   bool
}
  80
// SessionAgent runs prompts against language models on behalf of chat
// sessions, persisting the resulting messages and queuing prompts that
// arrive while a session is busy.
type SessionAgent interface {
	// Run processes a prompt for call.SessionID. If the session is busy the
	// call is queued and (nil, nil) is returned.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels replaces the large and small models used for later calls.
	SetModels(large Model, small Model)
	// SetTools replaces the tools offered to the model on later calls.
	SetTools(tools []fantasy.AgentTool)
	// SetSystemPrompt replaces the system prompt used on later calls.
	SetSystemPrompt(systemPrompt string)
	// Cancel cancels in-flight work for sessionID (presumably the request
	// registered in activeRequests; verify against the implementation).
	Cancel(sessionID string)
	// CancelAll cancels in-flight work for every session.
	CancelAll()
	// IsSessionBusy reports whether sessionID has an in-flight request. Run
	// queues calls for a busy session; Summarize refuses with ErrSessionBusy.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports whether any session is busy.
	IsBusy() bool
	// QueuedPrompts presumably returns the number of calls queued for
	// sessionID.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList presumably returns the prompts queued for sessionID.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue presumably drops all calls queued for sessionID.
	ClearQueue(sessionID string)
	// Summarize replaces the session's working history with a model-written
	// summary (arguments: ctx, session ID, provider options).
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns the model in use (presumably the large model).
	Model() Model
}
  96
// Model bundles a language model client with its catalog metadata and its
// configured selection.
//
// Model is the client used to create agents and stream requests. CatwalkCfg
// supplies metadata such as context window, image support, display name,
// reasoning support, default max tokens and pricing. ModelCfg
// (config.SelectedModel) provides the model and provider identifiers stored
// on assistant messages.
type Model struct {
	Model      fantasy.LanguageModel
	CatwalkCfg catwalk.Model
	ModelCfg   config.SelectedModel
}
 102
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Replaceable configuration, held in csync containers so Run can take a
	// consistent snapshot while setters run concurrently. The large model
	// runs conversations and summaries; the small model is tried first for
	// session titles.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// Fixed settings and services. isSubAgent suppresses the todo reminder in
	// preparePrompt; disableAutoSummarize disables the summarization stop
	// condition in Run; notify (optional) receives "agent finished" events.
	isSubAgent           bool
	sessions             session.Service
	messages             message.Service
	disableAutoSummarize bool
	isYolo               bool
	notify               pubsub.Publisher[notify.Notification]

	// Per-session state keyed by session ID: messageQueue holds calls that
	// arrived while the session was busy; activeRequests holds the cancel
	// func of the session's in-flight generation or summarization.
	messageQueue   *csync.Map[string, []SessionAgentCall]
	activeRequests *csync.Map[string, context.CancelFunc]
}
 120
// SessionAgentOptions configures NewSessionAgent.
//
// LargeModel runs conversations and summaries; SmallModel is tried first for
// session titles. SystemPromptPrefix, when non-empty, is prepended as a
// separate system message; SystemPrompt is the main system prompt (MCP server
// instructions are appended to it per call). IsSubAgent omits the todo-list
// reminder from prompt history. DisableAutoSummarize turns off automatic
// summarization when the context window runs low. Sessions and Messages are
// the persistence services. Tools is the initial tool set. Notify, if
// non-nil, receives a notification when an interactive turn finishes.
// IsYolo is copied onto the agent as-is.
type SessionAgentOptions struct {
	LargeModel           Model
	SmallModel           Model
	SystemPromptPrefix   string
	SystemPrompt         string
	IsSubAgent           bool
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	Notify               pubsub.Publisher[notify.Notification]
}
 134
 135func NewSessionAgent(
 136	opts SessionAgentOptions,
 137) SessionAgent {
 138	return &sessionAgent{
 139		largeModel:           csync.NewValue(opts.LargeModel),
 140		smallModel:           csync.NewValue(opts.SmallModel),
 141		systemPromptPrefix:   csync.NewValue(opts.SystemPromptPrefix),
 142		systemPrompt:         csync.NewValue(opts.SystemPrompt),
 143		isSubAgent:           opts.IsSubAgent,
 144		sessions:             opts.Sessions,
 145		messages:             opts.Messages,
 146		disableAutoSummarize: opts.DisableAutoSummarize,
 147		tools:                csync.NewSliceFrom(opts.Tools),
 148		isYolo:               opts.IsYolo,
 149		notify:               opts.Notify,
 150		messageQueue:         csync.NewMap[string, []SessionAgentCall](),
 151		activeRequests:       csync.NewMap[string, context.CancelFunc](),
 152	}
 153}
 154
// Run sends call to the large model for call.SessionID and streams the
// response into persisted assistant and tool messages.
//
// An empty prompt without a text attachment returns ErrEmptyPrompt; an empty
// SessionID returns ErrSessionMissing.
//
// If the session already has an in-flight request, call is appended to the
// session's queue and Run returns (nil, nil). Queued calls are drained into
// the running conversation by PrepareStep, or run one at a time after this
// call finishes, via the recursive call at the end of this method.
//
// On the first message of a session a title is generated concurrently; Run
// waits for it before returning. When the remaining context window reaches
// the auto-summarization threshold, the stream stops and the session is
// summarized. If the interrupted turn had tool calls, a continuation prompt
// is queued.
//
// On a stream error, Run does the following and then returns the error:
//   - closes unfinished tool calls;
//   - gives tool calls that lack a result a synthetic error result;
//   - gives the assistant message a finish reason describing the failure.
func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
	if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
		return nil, ErrEmptyPrompt
	}
	if call.SessionID == "" {
		return nil, ErrSessionMissing
	}

	// Queue the message if busy.
	// NOTE(review): the busy check and the Get/append/Set below are separate
	// operations, so concurrent Run calls for the same session could race
	// (lost queue entries, or two generations starting). Confirm callers
	// serialize Run per session.
	if a.IsSessionBusy(call.SessionID) {
		existing, ok := a.messageQueue.Get(call.SessionID)
		if !ok {
			existing = []SessionAgentCall{}
		}
		existing = append(existing, call)
		a.messageQueue.Set(call.SessionID, existing)
		return nil, nil
	}

	// Copy mutable fields under lock to avoid races with SetTools/SetModels.
	agentTools := a.tools.Copy()
	largeModel := a.largeModel.Get()
	systemPrompt := a.systemPrompt.Get()
	promptPrefix := a.systemPromptPrefix.Get()
	var instructions strings.Builder

	// Collect instructions advertised by connected MCP servers; they are
	// appended to the system prompt in an <mcp-instructions> section.
	for _, server := range mcp.GetStates() {
		if server.State != mcp.StateConnected {
			continue
		}
		if s := server.Client.InitializeResult().Instructions; s != "" {
			instructions.WriteString(s)
			instructions.WriteString("\n\n")
		}
	}

	if s := instructions.String(); s != "" {
		systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
	}

	if len(agentTools) > 0 {
		// Add Anthropic caching to the last tool.
		// NOTE(review): Copy() copies the slice, but if tool values are
		// references this mutates the shared tool instance; confirm intended.
		agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
	}

	agent := fantasy.NewAgent(
		largeModel.Model,
		fantasy.WithSystemPrompt(systemPrompt),
		fantasy.WithTools(agentTools...),
		fantasy.WithUserAgent("Charm Crush/"+version.Version),
	)

	// sessionLock serializes the session read-modify-write in OnStepFinish.
	sessionLock := sync.Mutex{}
	currentSession, err := a.sessions.Get(ctx, call.SessionID)
	if err != nil {
		return nil, fmt.Errorf("failed to get session: %w", err)
	}

	msgs, err := a.getSessionMessages(ctx, currentSession)
	if err != nil {
		return nil, fmt.Errorf("failed to get session messages: %w", err)
	}

	var wg sync.WaitGroup
	// Generate title if first message.
	if len(msgs) == 0 {
		titleCtx := ctx // Copy to avoid race with ctx reassignment below.
		wg.Go(func() {
			a.generateTitle(titleCtx, call.SessionID, call.Prompt)
		})
	}
	// Wait for title generation (if started) before Run returns.
	defer wg.Wait()

	// Add the user message to the session.
	_, err = a.createUserMessage(ctx, call)
	if err != nil {
		return nil, err
	}

	// Add the session to the context.
	ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)

	// Register this generation's cancel func under the session ID
	// (presumably what IsSessionBusy and Cancel consult).
	genCtx, cancel := context.WithCancel(ctx)
	a.activeRequests.Set(call.SessionID, cancel)

	defer cancel()
	defer a.activeRequests.Del(call.SessionID)

	history, files := a.preparePrompt(msgs, call.Attachments...)

	startTime := time.Now()
	a.eventPromptSent(call.SessionID)

	// currentAssistant is the assistant message of the current step. It is
	// created in PrepareStep and updated by the streaming callbacks below.
	var currentAssistant *message.Message
	// shouldSummarize is set by the context-window stop condition.
	var shouldSummarize bool
	result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
		Prompt:           message.PromptWithTextAttachments(call.Prompt, call.Attachments),
		Files:            files,
		Messages:         history,
		ProviderOptions:  call.ProviderOptions,
		MaxOutputTokens:  &call.MaxOutputTokens,
		TopP:             call.TopP,
		Temperature:      call.Temperature,
		PresencePenalty:  call.PresencePenalty,
		TopK:             call.TopK,
		FrequencyPenalty: call.FrequencyPenalty,
		// PrepareStep builds a step's messages and creates the assistant
		// message the step streams into. It does the following, in order:
		//   - drains queued prompts into the conversation;
		//   - applies cache-control markers;
		//   - prepends the system prompt prefix.
		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
			prepared.Messages = options.Messages
			// Reset per-message provider options; cache markers are
			// re-applied below.
			for i := range prepared.Messages {
				prepared.Messages[i].ProviderOptions = nil
			}

			// Append prompts queued while this run was busy as user messages.
			queuedCalls, _ := a.messageQueue.Get(call.SessionID)
			a.messageQueue.Del(call.SessionID)
			for _, queued := range queuedCalls {
				userMessage, createErr := a.createUserMessage(callContext, queued)
				if createErr != nil {
					return callContext, prepared, createErr
				}
				prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
			}

			prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)

			lastSystemRoleInx := 0
			systemMessageUpdated := false
			for i, msg := range prepared.Messages {
				// Mark for caching the last system message that precedes the
				// first non-system message (index 0 if there is none).
				if msg.Role == fantasy.MessageRoleSystem {
					lastSystemRoleInx = i
				} else if !systemMessageUpdated {
					prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
					systemMessageUpdated = true
				}
				// Then add cache control to the last 2 messages.
				if i > len(prepared.Messages)-3 {
					prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
				}
			}

			// The prefix is prepended after the cache markers are applied, so
			// it is never marked itself.
			if promptPrefix != "" {
				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
			}

			var assistantMsg message.Message
			assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
				Role:     message.Assistant,
				Parts:    []message.ContentPart{},
				Model:    largeModel.ModelCfg.Model,
				Provider: largeModel.ModelCfg.Provider,
			})
			if err != nil {
				return callContext, prepared, err
			}
			// Expose the message ID and model details through the returned
			// context, using keys from the tools package.
			callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
			callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
			callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
			currentAssistant = &assistantMsg
			return callContext, prepared, err
		},
		OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
			currentAssistant.AppendReasoningContent(reasoning.Text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningDelta: func(id string, text string) error {
			currentAssistant.AppendReasoningContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
			// Store provider-specific reasoning metadata (Anthropic/Google
			// signatures, OpenAI Responses data) on the message.
			// handle anthropic signature
			if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
				if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
					currentAssistant.AppendReasoningSignature(reasoning.Signature)
				}
			}
			if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
				if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
					currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
				}
			}
			if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
				if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
					currentAssistant.SetReasoningResponsesData(reasoning)
				}
			}
			currentAssistant.FinishThinking()
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnTextDelta: func(id string, text string) error {
			// Strip leading newline from initial text content. This is
			// particularly important in non-interactive mode where leading
			// newlines are very visible.
			if len(currentAssistant.Parts) == 0 {
				text = strings.TrimPrefix(text, "\n")
			}

			currentAssistant.AppendContent(text)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolInputStart: func(id string, toolName string) error {
			toolCall := message.ToolCall{
				ID:               id,
				Name:             toolName,
				ProviderExecuted: false,
				Finished:         false,
			}
			currentAssistant.AddToolCall(toolCall)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
			// TODO: implement
		},
		OnToolCall: func(tc fantasy.ToolCallContent) error {
			toolCall := message.ToolCall{
				ID:               tc.ToolCallID,
				Name:             tc.ToolName,
				Input:            tc.Input,
				ProviderExecuted: false,
				Finished:         true,
			}
			currentAssistant.AddToolCall(toolCall)
			return a.messages.Update(genCtx, *currentAssistant)
		},
		OnToolResult: func(result fantasy.ToolResultContent) error {
			toolResult := a.convertToToolResult(result)
			_, createMsgErr := a.messages.Create(genCtx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			return createMsgErr
		},
		// OnStepFinish records the step's finish reason and adds its usage to
		// the session. currentSession is refreshed so the stop condition
		// below sees the latest token counts.
		OnStepFinish: func(stepResult fantasy.StepResult) error {
			finishReason := message.FinishReasonUnknown
			switch stepResult.FinishReason {
			case fantasy.FinishReasonLength:
				finishReason = message.FinishReasonMaxTokens
			case fantasy.FinishReasonStop:
				finishReason = message.FinishReasonEndTurn
			case fantasy.FinishReasonToolCalls:
				finishReason = message.FinishReasonToolUse
			}
			currentAssistant.AddFinish(finishReason, "", "")
			sessionLock.Lock()
			defer sessionLock.Unlock()

			updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
			if getSessionErr != nil {
				return getSessionErr
			}
			a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
			_, sessionErr := a.sessions.Save(ctx, updatedSession)
			if sessionErr != nil {
				return sessionErr
			}
			currentSession = updatedSession
			return a.messages.Update(genCtx, *currentAssistant)
		},
		StopWhen: []fantasy.StopCondition{
			// Stop and request summarization once the remaining context
			// window reaches the threshold. For windows larger than
			// largeContextWindowThreshold the threshold is a fixed buffer;
			// otherwise it is a ratio of the window.
			func(_ []fantasy.StepResult) bool {
				cw := int64(largeModel.CatwalkCfg.ContextWindow)
				tokens := currentSession.CompletionTokens + currentSession.PromptTokens
				remaining := cw - tokens
				var threshold int64
				if cw > largeContextWindowThreshold {
					threshold = largeContextWindowBuffer
				} else {
					threshold = int64(float64(cw) * smallContextWindowRatio)
				}
				if (remaining <= threshold) && !a.disableAutoSummarize {
					shouldSummarize = true
					return true
				}
				return false
			},
			// Stop when the model appears to be looping on repeated tool
			// calls (see hasRepeatedToolCalls).
			func(steps []fantasy.StepResult) bool {
				return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
			},
		},
	})

	a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))

	if err != nil {
		isCancelErr := errors.Is(err, context.Canceled)
		isPermissionErr := errors.Is(err, permission.ErrorPermissionDenied)
		// The failure happened before any assistant message was created.
		if currentAssistant == nil {
			return result, err
		}
		// Ensure we finish thinking on error to close the reasoning state.
		currentAssistant.FinishThinking()
		toolCalls := currentAssistant.ToolCalls()
		// INFO: we use the parent context here because the genCtx has been cancelled.
		msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
		if createErr != nil {
			return nil, createErr
		}
		// Close unfinished tool calls, and give every tool call that has no
		// stored result a synthetic error result.
		for _, tc := range toolCalls {
			if !tc.Finished {
				tc.Finished = true
				tc.Input = "{}"
				currentAssistant.AddToolCall(tc)
				updateErr := a.messages.Update(ctx, *currentAssistant)
				if updateErr != nil {
					return nil, updateErr
				}
			}

			found := false
			for _, msg := range msgs {
				if msg.Role == message.Tool {
					for _, tr := range msg.ToolResults() {
						if tr.ToolCallID == tc.ID {
							found = true
							break
						}
					}
				}
				if found {
					break
				}
			}
			if found {
				continue
			}
			content := "There was an error while executing the tool"
			if isCancelErr {
				content = "Tool execution canceled by user"
			} else if isPermissionErr {
				content = "User denied permission"
			}
			toolResult := message.ToolResult{
				ToolCallID: tc.ID,
				Name:       tc.Name,
				Content:    content,
				IsError:    true,
			}
			_, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
				Role: message.Tool,
				Parts: []message.ContentPart{
					toolResult,
				},
			})
			if createErr != nil {
				return nil, createErr
			}
		}
		// Record a finish reason that describes the failure.
		var fantasyErr *fantasy.Error
		var providerErr *fantasy.ProviderError
		const defaultTitle = "Provider Error"
		linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
		if isCancelErr {
			currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
		} else if isPermissionErr {
			currentAssistant.AddFinish(message.FinishReasonPermissionDenied, "User denied permission", "")
		} else if errors.Is(err, hyper.ErrNoCredits) {
			url := hyper.BaseURL()
			link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
			currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
		} else if errors.As(err, &providerErr) {
			if providerErr.Message == "The requested model is not supported." {
				url := "https://github.com/settings/copilot/features"
				link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
				currentAssistant.AddFinish(
					message.FinishReasonError,
					"Copilot model not enabled",
					fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
				)
			} else {
				currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
			}
		} else if errors.As(err, &fantasyErr) {
			currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
		} else {
			currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
		}
		// Note: we use the parent context here because the genCtx has been
		// cancelled.
		updateErr := a.messages.Update(ctx, *currentAssistant)
		if updateErr != nil {
			return nil, updateErr
		}
		return nil, err
	}

	// Send notification that agent has finished its turn (skip for
	// nested/non-interactive sessions).
	if !call.NonInteractive && a.notify != nil {
		a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			SessionID:    call.SessionID,
			SessionTitle: currentSession.Title,
			Type:         notify.TypeAgentFinished,
		})
	}

	if shouldSummarize {
		// Summarize refuses to run on a busy session, so release this run's
		// active request first.
		a.activeRequests.Del(call.SessionID)
		if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
			return nil, summarizeErr
		}
		// If the agent wasn't done...
		if len(currentAssistant.ToolCalls()) > 0 {
			existing, ok := a.messageQueue.Get(call.SessionID)
			if !ok {
				existing = []SessionAgentCall{}
			}
			call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
			existing = append(existing, call)
			a.messageQueue.Set(call.SessionID, existing)
		}
	}

	// Release active request before processing queued messages.
	a.activeRequests.Del(call.SessionID)
	cancel()

	queuedMessages, ok := a.messageQueue.Get(call.SessionID)
	if !ok || len(queuedMessages) == 0 {
		return result, err
	}
	// There are queued messages: pop the first and run it; the rest remain
	// queued.
	firstQueuedMessage := queuedMessages[0]
	a.messageQueue.Set(call.SessionID, queuedMessages[1:])
	return a.Run(ctx, firstQueuedMessage)
}
 581
 582func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
 583	if a.IsSessionBusy(sessionID) {
 584		return ErrSessionBusy
 585	}
 586
 587	// Copy mutable fields under lock to avoid races with SetModels.
 588	largeModel := a.largeModel.Get()
 589	systemPromptPrefix := a.systemPromptPrefix.Get()
 590
 591	currentSession, err := a.sessions.Get(ctx, sessionID)
 592	if err != nil {
 593		return fmt.Errorf("failed to get session: %w", err)
 594	}
 595	msgs, err := a.getSessionMessages(ctx, currentSession)
 596	if err != nil {
 597		return err
 598	}
 599	if len(msgs) == 0 {
 600		// Nothing to summarize.
 601		return nil
 602	}
 603
 604	aiMsgs, _ := a.preparePrompt(msgs)
 605
 606	genCtx, cancel := context.WithCancel(ctx)
 607	a.activeRequests.Set(sessionID, cancel)
 608	defer a.activeRequests.Del(sessionID)
 609	defer cancel()
 610
 611	agent := fantasy.NewAgent(largeModel.Model,
 612		fantasy.WithSystemPrompt(string(summaryPrompt)),
 613		fantasy.WithUserAgent("Charm Crush/"+version.Version),
 614	)
 615	summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
 616		Role:             message.Assistant,
 617		Model:            largeModel.Model.Model(),
 618		Provider:         largeModel.Model.Provider(),
 619		IsSummaryMessage: true,
 620	})
 621	if err != nil {
 622		return err
 623	}
 624
 625	summaryPromptText := buildSummaryPrompt(currentSession.Todos)
 626
 627	resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
 628		Prompt:          summaryPromptText,
 629		Messages:        aiMsgs,
 630		ProviderOptions: opts,
 631		PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 632			prepared.Messages = options.Messages
 633			if systemPromptPrefix != "" {
 634				prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
 635			}
 636			return callContext, prepared, nil
 637		},
 638		OnReasoningDelta: func(id string, text string) error {
 639			summaryMessage.AppendReasoningContent(text)
 640			return a.messages.Update(genCtx, summaryMessage)
 641		},
 642		OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
 643			// Handle anthropic signature.
 644			if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
 645				if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
 646					summaryMessage.AppendReasoningSignature(signature.Signature)
 647				}
 648			}
 649			summaryMessage.FinishThinking()
 650			return a.messages.Update(genCtx, summaryMessage)
 651		},
 652		OnTextDelta: func(id, text string) error {
 653			summaryMessage.AppendContent(text)
 654			return a.messages.Update(genCtx, summaryMessage)
 655		},
 656	})
 657	if err != nil {
 658		isCancelErr := errors.Is(err, context.Canceled)
 659		if isCancelErr {
 660			// User cancelled summarize we need to remove the summary message.
 661			deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
 662			return deleteErr
 663		}
 664		return err
 665	}
 666
 667	summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
 668	err = a.messages.Update(genCtx, summaryMessage)
 669	if err != nil {
 670		return err
 671	}
 672
 673	var openrouterCost *float64
 674	for _, step := range resp.Steps {
 675		stepCost := a.openrouterCost(step.ProviderMetadata)
 676		if stepCost != nil {
 677			newCost := *stepCost
 678			if openrouterCost != nil {
 679				newCost += *openrouterCost
 680			}
 681			openrouterCost = &newCost
 682		}
 683	}
 684
 685	a.updateSessionUsage(largeModel, &currentSession, resp.TotalUsage, openrouterCost)
 686
 687	// Just in case, get just the last usage info.
 688	usage := resp.Response.Usage
 689	currentSession.SummaryMessageID = summaryMessage.ID
 690	currentSession.CompletionTokens = usage.OutputTokens
 691	currentSession.PromptTokens = 0
 692	_, err = a.sessions.Save(genCtx, currentSession)
 693	return err
 694}
 695
 696func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
 697	if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
 698		return fantasy.ProviderOptions{}
 699	}
 700	return fantasy.ProviderOptions{
 701		anthropic.Name: &anthropic.ProviderCacheControlOptions{
 702			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 703		},
 704		bedrock.Name: &anthropic.ProviderCacheControlOptions{
 705			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 706		},
 707		vercel.Name: &anthropic.ProviderCacheControlOptions{
 708			CacheControl: anthropic.CacheControl{Type: "ephemeral"},
 709		},
 710	}
 711}
 712
 713func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
 714	parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
 715	var attachmentParts []message.ContentPart
 716	for _, attachment := range call.Attachments {
 717		attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
 718	}
 719	parts = append(parts, attachmentParts...)
 720	msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
 721		Role:  message.User,
 722		Parts: parts,
 723	})
 724	if err != nil {
 725		return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
 726	}
 727	return msg, nil
 728}
 729
 730func (a *sessionAgent) preparePrompt(msgs []message.Message, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
 731	var history []fantasy.Message
 732	if !a.isSubAgent {
 733		history = append(history, fantasy.NewUserMessage(
 734			fmt.Sprintf("<system_reminder>%s</system_reminder>",
 735				`This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
 736If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
 737If not, please feel free to ignore. Again do not mention this message to the user.`,
 738			),
 739		))
 740	}
 741	for _, m := range msgs {
 742		if len(m.Parts) == 0 {
 743			continue
 744		}
 745		// Assistant message without content or tool calls (cancelled before it
 746		// returned anything).
 747		if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
 748			continue
 749		}
 750		history = append(history, m.ToAIMessage()...)
 751	}
 752
 753	var files []fantasy.FilePart
 754	for _, attachment := range attachments {
 755		if attachment.IsText() {
 756			continue
 757		}
 758		files = append(files, fantasy.FilePart{
 759			Filename:  attachment.FileName,
 760			Data:      attachment.Content,
 761			MediaType: attachment.MimeType,
 762		})
 763	}
 764
 765	return history, files
 766}
 767
 768func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
 769	msgs, err := a.messages.List(ctx, session.ID)
 770	if err != nil {
 771		return nil, fmt.Errorf("failed to list messages: %w", err)
 772	}
 773
 774	if session.SummaryMessageID != "" {
 775		summaryMsgIndex := -1
 776		for i, msg := range msgs {
 777			if msg.ID == session.SummaryMessageID {
 778				summaryMsgIndex = i
 779				break
 780			}
 781		}
 782		if summaryMsgIndex != -1 {
 783			msgs = msgs[summaryMsgIndex:]
 784			msgs[0].Role = message.User
 785		}
 786	}
 787	return msgs, nil
 788}
 789
 790// generateTitle generates a session titled based on the initial prompt.
 791func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
 792	if userPrompt == "" {
 793		return
 794	}
 795
 796	smallModel := a.smallModel.Get()
 797	largeModel := a.largeModel.Get()
 798	systemPromptPrefix := a.systemPromptPrefix.Get()
 799
 800	var maxOutputTokens int64 = 40
 801	if smallModel.CatwalkCfg.CanReason {
 802		maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
 803	}
 804
 805	newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
 806		return fantasy.NewAgent(m,
 807			fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
 808			fantasy.WithMaxOutputTokens(tok),
 809			fantasy.WithUserAgent("Charm Crush/"+version.Version),
 810		)
 811	}
 812
 813	streamCall := fantasy.AgentStreamCall{
 814		Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
 815		PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
 816			prepared.Messages = opts.Messages
 817			if systemPromptPrefix != "" {
 818				prepared.Messages = append([]fantasy.Message{
 819					fantasy.NewSystemMessage(systemPromptPrefix),
 820				}, prepared.Messages...)
 821			}
 822			return callCtx, prepared, nil
 823		},
 824	}
 825
 826	// Use the small model to generate the title.
 827	model := smallModel
 828	agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
 829	resp, err := agent.Stream(ctx, streamCall)
 830	if err == nil {
 831		// We successfully generated a title with the small model.
 832		slog.Debug("Generated title with small model")
 833	} else {
 834		// It didn't work. Let's try with the big model.
 835		slog.Error("Error generating title with small model; trying big model", "err", err)
 836		model = largeModel
 837		agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
 838		resp, err = agent.Stream(ctx, streamCall)
 839		if err == nil {
 840			slog.Debug("Generated title with large model")
 841		} else {
 842			// Welp, the large model didn't work either. Use the default
 843			// session name and return.
 844			slog.Error("Error generating title with large model", "err", err)
 845			saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, DefaultSessionName, 0, 0, 0)
 846			if saveErr != nil {
 847				slog.Error("Failed to save session title and usage", "error", saveErr)
 848			}
 849			return
 850		}
 851	}
 852
 853	if resp == nil {
 854		// Actually, we didn't get a response so we can't. Use the default
 855		// session name and return.
 856		slog.Error("Response is nil; can't generate title")
 857		saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, DefaultSessionName, 0, 0, 0)
 858		if saveErr != nil {
 859			slog.Error("Failed to save session title and usage", "error", saveErr)
 860		}
 861		return
 862	}
 863
 864	// Clean up title.
 865	var title string
 866	title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
 867
 868	// Remove thinking tags if present.
 869	title = thinkTagRegex.ReplaceAllString(title, "")
 870
 871	title = strings.TrimSpace(title)
 872	title = cmp.Or(title, DefaultSessionName)
 873
 874	// Calculate usage and cost.
 875	var openrouterCost *float64
 876	for _, step := range resp.Steps {
 877		stepCost := a.openrouterCost(step.ProviderMetadata)
 878		if stepCost != nil {
 879			newCost := *stepCost
 880			if openrouterCost != nil {
 881				newCost += *openrouterCost
 882			}
 883			openrouterCost = &newCost
 884		}
 885	}
 886
 887	modelConfig := model.CatwalkCfg
 888	cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
 889		modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
 890		modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
 891		modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
 892
 893	// Use override cost if available (e.g., from OpenRouter).
 894	if openrouterCost != nil {
 895		cost = *openrouterCost
 896	}
 897
 898	promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
 899	completionTokens := resp.TotalUsage.OutputTokens
 900
 901	// Atomically update only title and usage fields to avoid overriding other
 902	// concurrent session updates.
 903	saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
 904	if saveErr != nil {
 905		slog.Error("Failed to save session title and usage", "error", saveErr)
 906		return
 907	}
 908}
 909
 910func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
 911	openrouterMetadata, ok := metadata[openrouter.Name]
 912	if !ok {
 913		return nil
 914	}
 915
 916	opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
 917	if !ok {
 918		return nil
 919	}
 920	return &opts.Usage.Cost
 921}
 922
 923func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
 924	modelConfig := model.CatwalkCfg
 925	cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
 926		modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
 927		modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
 928		modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
 929
 930	a.eventTokensUsed(session.ID, model, usage, cost)
 931
 932	if overrideCost != nil {
 933		session.Cost += *overrideCost
 934	} else {
 935		session.Cost += cost
 936	}
 937
 938	session.CompletionTokens = usage.OutputTokens
 939	session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
 940}
 941
 942func (a *sessionAgent) Cancel(sessionID string) {
 943	// Cancel regular requests. Don't use Take() here - we need the entry to
 944	// remain in activeRequests so IsBusy() returns true until the goroutine
 945	// fully completes (including error handling that may access the DB).
 946	// The defer in processRequest will clean up the entry.
 947	if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
 948		slog.Debug("Request cancellation initiated", "session_id", sessionID)
 949		cancel()
 950	}
 951
 952	// Also check for summarize requests.
 953	if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
 954		slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
 955		cancel()
 956	}
 957
 958	if a.QueuedPrompts(sessionID) > 0 {
 959		slog.Debug("Clearing queued prompts", "session_id", sessionID)
 960		a.messageQueue.Del(sessionID)
 961	}
 962}
 963
 964func (a *sessionAgent) ClearQueue(sessionID string) {
 965	if a.QueuedPrompts(sessionID) > 0 {
 966		slog.Debug("Clearing queued prompts", "session_id", sessionID)
 967		a.messageQueue.Del(sessionID)
 968	}
 969}
 970
 971func (a *sessionAgent) CancelAll() {
 972	if !a.IsBusy() {
 973		return
 974	}
 975	for key := range a.activeRequests.Seq2() {
 976		a.Cancel(key) // key is sessionID
 977	}
 978
 979	timeout := time.After(5 * time.Second)
 980	for a.IsBusy() {
 981		select {
 982		case <-timeout:
 983			return
 984		default:
 985			time.Sleep(200 * time.Millisecond)
 986		}
 987	}
 988}
 989
 990func (a *sessionAgent) IsBusy() bool {
 991	var busy bool
 992	for cancelFunc := range a.activeRequests.Seq() {
 993		if cancelFunc != nil {
 994			busy = true
 995			break
 996		}
 997	}
 998	return busy
 999}
1000
1001func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1002	_, busy := a.activeRequests.Get(sessionID)
1003	return busy
1004}
1005
1006func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1007	l, ok := a.messageQueue.Get(sessionID)
1008	if !ok {
1009		return 0
1010	}
1011	return len(l)
1012}
1013
1014func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1015	l, ok := a.messageQueue.Get(sessionID)
1016	if !ok {
1017		return nil
1018	}
1019	prompts := make([]string, len(l))
1020	for i, call := range l {
1021		prompts[i] = call.Prompt
1022	}
1023	return prompts
1024}
1025
1026func (a *sessionAgent) SetModels(large Model, small Model) {
1027	a.largeModel.Set(large)
1028	a.smallModel.Set(small)
1029}
1030
1031func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1032	a.tools.SetSlice(tools)
1033}
1034
1035func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1036	a.systemPrompt.Set(systemPrompt)
1037}
1038
1039func (a *sessionAgent) Model() Model {
1040	return a.largeModel.Get()
1041}
1042
1043// convertToToolResult converts a fantasy tool result to a message tool result.
1044func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1045	baseResult := message.ToolResult{
1046		ToolCallID: result.ToolCallID,
1047		Name:       result.ToolName,
1048		Metadata:   result.ClientMetadata,
1049	}
1050
1051	switch result.Result.GetType() {
1052	case fantasy.ToolResultContentTypeText:
1053		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1054			baseResult.Content = r.Text
1055		}
1056	case fantasy.ToolResultContentTypeError:
1057		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1058			baseResult.Content = r.Error.Error()
1059			baseResult.IsError = true
1060		}
1061	case fantasy.ToolResultContentTypeMedia:
1062		if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1063			content := r.Text
1064			if content == "" {
1065				content = fmt.Sprintf("Loaded %s content", r.MediaType)
1066			}
1067			baseResult.Content = content
1068			baseResult.Data = r.Data
1069			baseResult.MIMEType = r.MediaType
1070		}
1071	}
1072
1073	return baseResult
1074}
1075
1076// workaroundProviderMediaLimitations converts media content in tool results to
1077// user messages for providers that don't natively support images in tool results.
1078//
1079// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1080// don't support sending images/media in tool result messages - they only accept
1081// text in tool results. However, they DO support images in user messages.
1082//
1083// If we send media in tool results to these providers, the API returns an error.
1084//
1085// Solution: For these providers, we:
1086//  1. Replace the media in the tool result with a text placeholder
1087//  2. Inject a user message immediately after with the image as a file attachment
1088//  3. This maintains the tool execution flow while working around API limitations
1089//
1090// Anthropic and Bedrock support images natively in tool results, so we skip
1091// this workaround for them.
1092//
1093// Example transformation:
1094//
1095//	BEFORE: [tool result: image data]
1096//	AFTER:  [tool result: "Image loaded - see attached"], [user: image attachment]
1097func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1098	providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1099		largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1100
1101	if providerSupportsMedia {
1102		return messages
1103	}
1104
1105	convertedMessages := make([]fantasy.Message, 0, len(messages))
1106
1107	for _, msg := range messages {
1108		if msg.Role != fantasy.MessageRoleTool {
1109			convertedMessages = append(convertedMessages, msg)
1110			continue
1111		}
1112
1113		textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1114		var mediaFiles []fantasy.FilePart
1115
1116		for _, part := range msg.Content {
1117			toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1118			if !ok {
1119				textParts = append(textParts, part)
1120				continue
1121			}
1122
1123			if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1124				decoded, err := base64.StdEncoding.DecodeString(media.Data)
1125				if err != nil {
1126					slog.Warn("Failed to decode media data", "error", err)
1127					textParts = append(textParts, part)
1128					continue
1129				}
1130
1131				mediaFiles = append(mediaFiles, fantasy.FilePart{
1132					Data:      decoded,
1133					MediaType: media.MediaType,
1134					Filename:  fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1135				})
1136
1137				textParts = append(textParts, fantasy.ToolResultPart{
1138					ToolCallID: toolResult.ToolCallID,
1139					Output: fantasy.ToolResultOutputContentText{
1140						Text: "[Image/media content loaded - see attached file]",
1141					},
1142					ProviderOptions: toolResult.ProviderOptions,
1143				})
1144			} else {
1145				textParts = append(textParts, part)
1146			}
1147		}
1148
1149		convertedMessages = append(convertedMessages, fantasy.Message{
1150			Role:    fantasy.MessageRoleTool,
1151			Content: textParts,
1152		})
1153
1154		if len(mediaFiles) > 0 {
1155			convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1156				"Here is the media content from the tool result:",
1157				mediaFiles...,
1158			))
1159		}
1160	}
1161
1162	return convertedMessages
1163}
1164
1165// buildSummaryPrompt constructs the prompt text for session summarization.
1166func buildSummaryPrompt(todos []session.Todo) string {
1167	var sb strings.Builder
1168	sb.WriteString("Provide a detailed summary of our conversation above.")
1169	if len(todos) > 0 {
1170		sb.WriteString("\n\n## Current Todo List\n\n")
1171		for _, t := range todos {
1172			fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1173		}
1174		sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1175		sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1176	}
1177	return sb.String()
1178}