1// Package agent is the core orchestration layer for Crush AI agents.
2//
3// It provides session-based AI agent functionality for managing
4// conversations, tool execution, and message handling. It coordinates
5// interactions between language models, messages, sessions, and tools while
6// handling features like automatic summarization, queuing, and token
7// management.
8package agent
9
10import (
11 "cmp"
12 "context"
13 _ "embed"
14 "encoding/base64"
15 "errors"
16 "fmt"
17 "log/slog"
18 "os"
19 "regexp"
20 "strconv"
21 "strings"
22 "sync"
23 "time"
24
25 "charm.land/catwalk/pkg/catwalk"
26 "charm.land/fantasy"
27 "charm.land/fantasy/providers/anthropic"
28 "charm.land/fantasy/providers/bedrock"
29 "charm.land/fantasy/providers/google"
30 "charm.land/fantasy/providers/openai"
31 "charm.land/fantasy/providers/openrouter"
32 "charm.land/fantasy/providers/vercel"
33 "charm.land/lipgloss/v2"
34 "github.com/charmbracelet/crush/internal/agent/hyper"
35 "github.com/charmbracelet/crush/internal/agent/notify"
36 "github.com/charmbracelet/crush/internal/agent/tools"
37 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
38 "github.com/charmbracelet/crush/internal/config"
39 "github.com/charmbracelet/crush/internal/csync"
40 "github.com/charmbracelet/crush/internal/message"
41 "github.com/charmbracelet/crush/internal/permission"
42 "github.com/charmbracelet/crush/internal/pubsub"
43 "github.com/charmbracelet/crush/internal/session"
44 "github.com/charmbracelet/crush/internal/stringext"
45 "github.com/charmbracelet/crush/internal/version"
46 "github.com/charmbracelet/x/exp/charmtone"
47)
48
const (
	// DefaultSessionName is the title generateTitle stores when no title
	// could be produced (both models failed, no response was returned, or the
	// cleaned-up title came out empty).
	DefaultSessionName = "Untitled Session"

	// Constants for auto-summarization thresholds.
	//
	// Run's stop condition summarizes a session once the tokens remaining in
	// the large model's context window drop to a threshold: a fixed
	// largeContextWindowBuffer for windows larger than
	// largeContextWindowThreshold, otherwise smallContextWindowRatio of the
	// window.
	largeContextWindowThreshold = 200_000
	largeContextWindowBuffer    = 20_000
	smallContextWindowRatio     = 0.2
)

// titlePrompt is the system prompt generateTitle uses for title requests.
//
//go:embed templates/title.md
var titlePrompt []byte

// summaryPrompt is the system prompt Summarize uses for summary requests.
//
//go:embed templates/summary.md
var summaryPrompt []byte

// Used to remove <think> tags from generated titles.
// Go's `.` does not match newlines without the (?s) flag, so generateTitle
// replaces newlines with spaces before applying this pattern.
var thinkTagRegex = regexp.MustCompile(`<think>.*?</think>`)
66
// SessionAgentCall is one prompt submitted to a SessionAgent, together with
// the per-call generation settings that Run forwards to the model.
type SessionAgentCall struct {
	// SessionID identifies the target session; Run rejects an empty value
	// with ErrSessionMissing.
	SessionID string
	// Prompt is the user's text. Run accepts an empty Prompt only when
	// Attachments contains a text attachment.
	Prompt string
	// ProviderOptions are passed to the provider for this call (and to
	// Summarize if auto-summarization is triggered).
	ProviderOptions fantasy.ProviderOptions
	// Attachments are stored on the user message. Text attachments are
	// merged into the prompt; the others are sent as file parts.
	Attachments []message.Attachment
	// Generation settings forwarded to the model stream call.
	MaxOutputTokens  int64
	Temperature      *float64
	TopP             *float64
	TopK             *int64
	FrequencyPenalty *float64
	PresencePenalty  *float64
	// NonInteractive suppresses the "agent finished" notification Run
	// publishes when a turn completes.
	NonInteractive bool
}
80
// SessionAgent runs prompts for chat sessions and manages per-session
// queueing, cancellation and summarization.
type SessionAgent interface {
	// Run executes a prompt for a session. If the session already has an
	// active request, the call is queued and (nil, nil) is returned.
	Run(context.Context, SessionAgentCall) (*fantasy.AgentResult, error)
	// SetModels, SetTools and SetSystemPrompt change the configuration that
	// later runs copy at start-up. NOTE(review): implementations are outside
	// this view.
	SetModels(large Model, small Model)
	SetTools(tools []fantasy.AgentTool)
	SetSystemPrompt(systemPrompt string)
	// Cancel cancels a session's active request and drops its queued
	// prompts; CancelAll does this for every active request and then waits,
	// with a timeout, for the agent to become idle.
	Cancel(sessionID string)
	CancelAll()
	// Busy and queue inspection.
	IsSessionBusy(sessionID string) bool
	IsBusy() bool
	QueuedPrompts(sessionID string) int
	QueuedPromptsList(sessionID string) []string
	// ClearQueue drops any prompts queued for the session.
	ClearQueue(sessionID string)
	// Summarize replaces the session's history with a model-written summary.
	Summarize(context.Context, string, fantasy.ProviderOptions) error
	// Model returns a model of the agent. NOTE(review): presumably the large
	// model — implementation not in view.
	Model() Model
}
96
// Model bundles a language-model client with the metadata the agent needs
// about it.
type Model struct {
	// Model is the client used to build agents and stream requests.
	Model fantasy.LanguageModel
	// CatwalkCfg is the catalog entry for the model; the agent reads its
	// context window, per-token prices, reasoning support, default max
	// tokens and display name.
	CatwalkCfg catwalk.Model
	// ModelCfg is the user's model selection; its Model and Provider are
	// recorded on assistant messages.
	ModelCfg config.SelectedModel
}
102
// sessionAgent is the SessionAgent implementation returned by
// NewSessionAgent.
type sessionAgent struct {
	// Configuration that can change at runtime; Run and Summarize take
	// copies via Get/Copy at start-up.
	largeModel         *csync.Value[Model]
	smallModel         *csync.Value[Model]
	systemPromptPrefix *csync.Value[string]
	systemPrompt       *csync.Value[string]
	tools              *csync.Slice[fantasy.AgentTool]

	// isSubAgent omits the todo-list system reminder from preparePrompt.
	isSubAgent bool
	sessions   session.Service
	messages   message.Service
	// disableAutoSummarize turns off the context-window stop condition in
	// Run.
	disableAutoSummarize bool
	isYolo               bool
	// notify receives "agent finished" notifications; may be nil.
	notify pubsub.Publisher[notify.Notification]

	// messageQueue holds, per session ID, calls submitted while the session
	// was busy.
	messageQueue *csync.Map[string, []SessionAgentCall]
	// activeRequests maps a session ID to the cancel func of its in-flight
	// Run or Summarize request.
	activeRequests *csync.Map[string, context.CancelFunc]
}
120
// SessionAgentOptions configures NewSessionAgent. Each field initializes the
// sessionAgent field of the same role.
type SessionAgentOptions struct {
	// LargeModel runs prompts and summaries; SmallModel is tried first for
	// title generation.
	LargeModel Model
	SmallModel Model
	// SystemPromptPrefix, when non-empty, is sent as a leading system
	// message; SystemPrompt is the agent's main system prompt.
	SystemPromptPrefix string
	SystemPrompt       string
	// IsSubAgent omits the todo-list reminder from the prompt history.
	IsSubAgent bool
	// DisableAutoSummarize turns off context-window based summarization.
	DisableAutoSummarize bool
	IsYolo               bool
	Sessions             session.Service
	Messages             message.Service
	Tools                []fantasy.AgentTool
	// Notify may be nil, in which case no notifications are published.
	Notify pubsub.Publisher[notify.Notification]
}
134
135func NewSessionAgent(
136 opts SessionAgentOptions,
137) SessionAgent {
138 return &sessionAgent{
139 largeModel: csync.NewValue(opts.LargeModel),
140 smallModel: csync.NewValue(opts.SmallModel),
141 systemPromptPrefix: csync.NewValue(opts.SystemPromptPrefix),
142 systemPrompt: csync.NewValue(opts.SystemPrompt),
143 isSubAgent: opts.IsSubAgent,
144 sessions: opts.Sessions,
145 messages: opts.Messages,
146 disableAutoSummarize: opts.DisableAutoSummarize,
147 tools: csync.NewSliceFrom(opts.Tools),
148 isYolo: opts.IsYolo,
149 notify: opts.Notify,
150 messageQueue: csync.NewMap[string, []SessionAgentCall](),
151 activeRequests: csync.NewMap[string, context.CancelFunc](),
152 }
153}
154
155func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (*fantasy.AgentResult, error) {
156 if call.Prompt == "" && !message.ContainsTextAttachment(call.Attachments) {
157 return nil, ErrEmptyPrompt
158 }
159 if call.SessionID == "" {
160 return nil, ErrSessionMissing
161 }
162
163 // Queue the message if busy
164 if a.IsSessionBusy(call.SessionID) {
165 existing, ok := a.messageQueue.Get(call.SessionID)
166 if !ok {
167 existing = []SessionAgentCall{}
168 }
169 existing = append(existing, call)
170 a.messageQueue.Set(call.SessionID, existing)
171 return nil, nil
172 }
173
174 // Copy mutable fields under lock to avoid races with SetTools/SetModels.
175 agentTools := a.tools.Copy()
176 largeModel := a.largeModel.Get()
177 systemPrompt := a.systemPrompt.Get()
178 promptPrefix := a.systemPromptPrefix.Get()
179 var instructions strings.Builder
180
181 for _, server := range mcp.GetStates() {
182 if server.State != mcp.StateConnected {
183 continue
184 }
185 if s := server.Client.InitializeResult().Instructions; s != "" {
186 instructions.WriteString(s)
187 instructions.WriteString("\n\n")
188 }
189 }
190
191 if s := instructions.String(); s != "" {
192 systemPrompt += "\n\n<mcp-instructions>\n" + s + "\n</mcp-instructions>"
193 }
194
195 if len(agentTools) > 0 {
196 // Add Anthropic caching to the last tool.
197 agentTools[len(agentTools)-1].SetProviderOptions(a.getCacheControlOptions())
198 }
199
200 agent := fantasy.NewAgent(
201 largeModel.Model,
202 fantasy.WithSystemPrompt(systemPrompt),
203 fantasy.WithTools(agentTools...),
204 fantasy.WithUserAgent("Charm Crush/"+version.Version),
205 )
206
207 sessionLock := sync.Mutex{}
208 currentSession, err := a.sessions.Get(ctx, call.SessionID)
209 if err != nil {
210 return nil, fmt.Errorf("failed to get session: %w", err)
211 }
212
213 msgs, err := a.getSessionMessages(ctx, currentSession)
214 if err != nil {
215 return nil, fmt.Errorf("failed to get session messages: %w", err)
216 }
217
218 var wg sync.WaitGroup
219 // Generate title if first message.
220 if len(msgs) == 0 {
221 titleCtx := ctx // Copy to avoid race with ctx reassignment below.
222 wg.Go(func() {
223 a.generateTitle(titleCtx, call.SessionID, call.Prompt)
224 })
225 }
226 defer wg.Wait()
227
228 // Add the user message to the session.
229 _, err = a.createUserMessage(ctx, call)
230 if err != nil {
231 return nil, err
232 }
233
234 // Add the session to the context.
235 ctx = context.WithValue(ctx, tools.SessionIDContextKey, call.SessionID)
236
237 genCtx, cancel := context.WithCancel(ctx)
238 a.activeRequests.Set(call.SessionID, cancel)
239
240 defer cancel()
241 defer a.activeRequests.Del(call.SessionID)
242
243 history, files := a.preparePrompt(msgs, call.Attachments...)
244
245 startTime := time.Now()
246 a.eventPromptSent(call.SessionID)
247
248 var currentAssistant *message.Message
249 var shouldSummarize bool
250 result, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
251 Prompt: message.PromptWithTextAttachments(call.Prompt, call.Attachments),
252 Files: files,
253 Messages: history,
254 ProviderOptions: call.ProviderOptions,
255 MaxOutputTokens: &call.MaxOutputTokens,
256 TopP: call.TopP,
257 Temperature: call.Temperature,
258 PresencePenalty: call.PresencePenalty,
259 TopK: call.TopK,
260 FrequencyPenalty: call.FrequencyPenalty,
261 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
262 prepared.Messages = options.Messages
263 for i := range prepared.Messages {
264 prepared.Messages[i].ProviderOptions = nil
265 }
266
267 queuedCalls, _ := a.messageQueue.Get(call.SessionID)
268 a.messageQueue.Del(call.SessionID)
269 for _, queued := range queuedCalls {
270 userMessage, createErr := a.createUserMessage(callContext, queued)
271 if createErr != nil {
272 return callContext, prepared, createErr
273 }
274 prepared.Messages = append(prepared.Messages, userMessage.ToAIMessage()...)
275 }
276
277 prepared.Messages = a.workaroundProviderMediaLimitations(prepared.Messages, largeModel)
278
279 lastSystemRoleInx := 0
280 systemMessageUpdated := false
281 for i, msg := range prepared.Messages {
282 // Only add cache control to the last message.
283 if msg.Role == fantasy.MessageRoleSystem {
284 lastSystemRoleInx = i
285 } else if !systemMessageUpdated {
286 prepared.Messages[lastSystemRoleInx].ProviderOptions = a.getCacheControlOptions()
287 systemMessageUpdated = true
288 }
289 // Than add cache control to the last 2 messages.
290 if i > len(prepared.Messages)-3 {
291 prepared.Messages[i].ProviderOptions = a.getCacheControlOptions()
292 }
293 }
294
295 if promptPrefix != "" {
296 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(promptPrefix)}, prepared.Messages...)
297 }
298
299 var assistantMsg message.Message
300 assistantMsg, err = a.messages.Create(callContext, call.SessionID, message.CreateMessageParams{
301 Role: message.Assistant,
302 Parts: []message.ContentPart{},
303 Model: largeModel.ModelCfg.Model,
304 Provider: largeModel.ModelCfg.Provider,
305 })
306 if err != nil {
307 return callContext, prepared, err
308 }
309 callContext = context.WithValue(callContext, tools.MessageIDContextKey, assistantMsg.ID)
310 callContext = context.WithValue(callContext, tools.SupportsImagesContextKey, largeModel.CatwalkCfg.SupportsImages)
311 callContext = context.WithValue(callContext, tools.ModelNameContextKey, largeModel.CatwalkCfg.Name)
312 currentAssistant = &assistantMsg
313 return callContext, prepared, err
314 },
315 OnReasoningStart: func(id string, reasoning fantasy.ReasoningContent) error {
316 currentAssistant.AppendReasoningContent(reasoning.Text)
317 return a.messages.Update(genCtx, *currentAssistant)
318 },
319 OnReasoningDelta: func(id string, text string) error {
320 currentAssistant.AppendReasoningContent(text)
321 return a.messages.Update(genCtx, *currentAssistant)
322 },
323 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
324 // handle anthropic signature
325 if anthropicData, ok := reasoning.ProviderMetadata[anthropic.Name]; ok {
326 if reasoning, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok {
327 currentAssistant.AppendReasoningSignature(reasoning.Signature)
328 }
329 }
330 if googleData, ok := reasoning.ProviderMetadata[google.Name]; ok {
331 if reasoning, ok := googleData.(*google.ReasoningMetadata); ok {
332 currentAssistant.AppendThoughtSignature(reasoning.Signature, reasoning.ToolID)
333 }
334 }
335 if openaiData, ok := reasoning.ProviderMetadata[openai.Name]; ok {
336 if reasoning, ok := openaiData.(*openai.ResponsesReasoningMetadata); ok {
337 currentAssistant.SetReasoningResponsesData(reasoning)
338 }
339 }
340 currentAssistant.FinishThinking()
341 return a.messages.Update(genCtx, *currentAssistant)
342 },
343 OnTextDelta: func(id string, text string) error {
344 // Strip leading newline from initial text content. This is is
345 // particularly important in non-interactive mode where leading
346 // newlines are very visible.
347 if len(currentAssistant.Parts) == 0 {
348 text = strings.TrimPrefix(text, "\n")
349 }
350
351 currentAssistant.AppendContent(text)
352 return a.messages.Update(genCtx, *currentAssistant)
353 },
354 OnToolInputStart: func(id string, toolName string) error {
355 toolCall := message.ToolCall{
356 ID: id,
357 Name: toolName,
358 ProviderExecuted: false,
359 Finished: false,
360 }
361 currentAssistant.AddToolCall(toolCall)
362 return a.messages.Update(genCtx, *currentAssistant)
363 },
364 OnRetry: func(err *fantasy.ProviderError, delay time.Duration) {
365 // TODO: implement
366 },
367 OnToolCall: func(tc fantasy.ToolCallContent) error {
368 toolCall := message.ToolCall{
369 ID: tc.ToolCallID,
370 Name: tc.ToolName,
371 Input: tc.Input,
372 ProviderExecuted: false,
373 Finished: true,
374 }
375 currentAssistant.AddToolCall(toolCall)
376 return a.messages.Update(genCtx, *currentAssistant)
377 },
378 OnToolResult: func(result fantasy.ToolResultContent) error {
379 toolResult := a.convertToToolResult(result)
380 _, createMsgErr := a.messages.Create(genCtx, currentAssistant.SessionID, message.CreateMessageParams{
381 Role: message.Tool,
382 Parts: []message.ContentPart{
383 toolResult,
384 },
385 })
386 return createMsgErr
387 },
388 OnStepFinish: func(stepResult fantasy.StepResult) error {
389 finishReason := message.FinishReasonUnknown
390 switch stepResult.FinishReason {
391 case fantasy.FinishReasonLength:
392 finishReason = message.FinishReasonMaxTokens
393 case fantasy.FinishReasonStop:
394 finishReason = message.FinishReasonEndTurn
395 case fantasy.FinishReasonToolCalls:
396 finishReason = message.FinishReasonToolUse
397 }
398 currentAssistant.AddFinish(finishReason, "", "")
399 sessionLock.Lock()
400 defer sessionLock.Unlock()
401
402 updatedSession, getSessionErr := a.sessions.Get(ctx, call.SessionID)
403 if getSessionErr != nil {
404 return getSessionErr
405 }
406 a.updateSessionUsage(largeModel, &updatedSession, stepResult.Usage, a.openrouterCost(stepResult.ProviderMetadata))
407 _, sessionErr := a.sessions.Save(ctx, updatedSession)
408 if sessionErr != nil {
409 return sessionErr
410 }
411 currentSession = updatedSession
412 return a.messages.Update(genCtx, *currentAssistant)
413 },
414 StopWhen: []fantasy.StopCondition{
415 func(_ []fantasy.StepResult) bool {
416 cw := int64(largeModel.CatwalkCfg.ContextWindow)
417 tokens := currentSession.CompletionTokens + currentSession.PromptTokens
418 remaining := cw - tokens
419 var threshold int64
420 if cw > largeContextWindowThreshold {
421 threshold = largeContextWindowBuffer
422 } else {
423 threshold = int64(float64(cw) * smallContextWindowRatio)
424 }
425 if (remaining <= threshold) && !a.disableAutoSummarize {
426 shouldSummarize = true
427 return true
428 }
429 return false
430 },
431 func(steps []fantasy.StepResult) bool {
432 return hasRepeatedToolCalls(steps, loopDetectionWindowSize, loopDetectionMaxRepeats)
433 },
434 },
435 })
436
437 a.eventPromptResponded(call.SessionID, time.Since(startTime).Truncate(time.Second))
438
439 if err != nil {
440 isCancelErr := errors.Is(err, context.Canceled)
441 isPermissionErr := errors.Is(err, permission.ErrorPermissionDenied)
442 if currentAssistant == nil {
443 return result, err
444 }
445 // Ensure we finish thinking on error to close the reasoning state.
446 currentAssistant.FinishThinking()
447 toolCalls := currentAssistant.ToolCalls()
448 // INFO: we use the parent context here because the genCtx has been cancelled.
449 msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
450 if createErr != nil {
451 return nil, createErr
452 }
453 for _, tc := range toolCalls {
454 if !tc.Finished {
455 tc.Finished = true
456 tc.Input = "{}"
457 currentAssistant.AddToolCall(tc)
458 updateErr := a.messages.Update(ctx, *currentAssistant)
459 if updateErr != nil {
460 return nil, updateErr
461 }
462 }
463
464 found := false
465 for _, msg := range msgs {
466 if msg.Role == message.Tool {
467 for _, tr := range msg.ToolResults() {
468 if tr.ToolCallID == tc.ID {
469 found = true
470 break
471 }
472 }
473 }
474 if found {
475 break
476 }
477 }
478 if found {
479 continue
480 }
481 content := "There was an error while executing the tool"
482 if isCancelErr {
483 content = "Tool execution canceled by user"
484 } else if isPermissionErr {
485 content = "User denied permission"
486 }
487 toolResult := message.ToolResult{
488 ToolCallID: tc.ID,
489 Name: tc.Name,
490 Content: content,
491 IsError: true,
492 }
493 _, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
494 Role: message.Tool,
495 Parts: []message.ContentPart{
496 toolResult,
497 },
498 })
499 if createErr != nil {
500 return nil, createErr
501 }
502 }
503 var fantasyErr *fantasy.Error
504 var providerErr *fantasy.ProviderError
505 const defaultTitle = "Provider Error"
506 linkStyle := lipgloss.NewStyle().Foreground(charmtone.Guac).Underline(true)
507 if isCancelErr {
508 currentAssistant.AddFinish(message.FinishReasonCanceled, "User canceled request", "")
509 } else if isPermissionErr {
510 currentAssistant.AddFinish(message.FinishReasonPermissionDenied, "User denied permission", "")
511 } else if errors.Is(err, hyper.ErrNoCredits) {
512 url := hyper.BaseURL()
513 link := linkStyle.Hyperlink(url, "id=hyper").Render(url)
514 currentAssistant.AddFinish(message.FinishReasonError, "No credits", "You're out of credits. Add more at "+link)
515 } else if errors.As(err, &providerErr) {
516 if providerErr.Message == "The requested model is not supported." {
517 url := "https://github.com/settings/copilot/features"
518 link := linkStyle.Hyperlink(url, "id=copilot").Render(url)
519 currentAssistant.AddFinish(
520 message.FinishReasonError,
521 "Copilot model not enabled",
522 fmt.Sprintf("%q is not enabled in Copilot. Go to the following page to enable it. Then, wait 5 minutes before trying again. %s", largeModel.CatwalkCfg.Name, link),
523 )
524 } else {
525 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(providerErr.Title), defaultTitle), providerErr.Message)
526 }
527 } else if errors.As(err, &fantasyErr) {
528 currentAssistant.AddFinish(message.FinishReasonError, cmp.Or(stringext.Capitalize(fantasyErr.Title), defaultTitle), fantasyErr.Message)
529 } else {
530 currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
531 }
532 // Note: we use the parent context here because the genCtx has been
533 // cancelled.
534 updateErr := a.messages.Update(ctx, *currentAssistant)
535 if updateErr != nil {
536 return nil, updateErr
537 }
538 return nil, err
539 }
540
541 // Send notification that agent has finished its turn (skip for
542 // nested/non-interactive sessions).
543 if !call.NonInteractive && a.notify != nil {
544 a.notify.Publish(pubsub.CreatedEvent, notify.Notification{
545 SessionID: call.SessionID,
546 SessionTitle: currentSession.Title,
547 Type: notify.TypeAgentFinished,
548 })
549 }
550
551 if shouldSummarize {
552 a.activeRequests.Del(call.SessionID)
553 if summarizeErr := a.Summarize(genCtx, call.SessionID, call.ProviderOptions); summarizeErr != nil {
554 return nil, summarizeErr
555 }
556 // If the agent wasn't done...
557 if len(currentAssistant.ToolCalls()) > 0 {
558 existing, ok := a.messageQueue.Get(call.SessionID)
559 if !ok {
560 existing = []SessionAgentCall{}
561 }
562 call.Prompt = fmt.Sprintf("The previous session was interrupted because it got too long, the initial user request was: `%s`", call.Prompt)
563 existing = append(existing, call)
564 a.messageQueue.Set(call.SessionID, existing)
565 }
566 }
567
568 // Release active request before processing queued messages.
569 a.activeRequests.Del(call.SessionID)
570 cancel()
571
572 queuedMessages, ok := a.messageQueue.Get(call.SessionID)
573 if !ok || len(queuedMessages) == 0 {
574 return result, err
575 }
576 // There are queued messages restart the loop.
577 firstQueuedMessage := queuedMessages[0]
578 a.messageQueue.Set(call.SessionID, queuedMessages[1:])
579 return a.Run(ctx, firstQueuedMessage)
580}
581
582func (a *sessionAgent) Summarize(ctx context.Context, sessionID string, opts fantasy.ProviderOptions) error {
583 if a.IsSessionBusy(sessionID) {
584 return ErrSessionBusy
585 }
586
587 // Copy mutable fields under lock to avoid races with SetModels.
588 largeModel := a.largeModel.Get()
589 systemPromptPrefix := a.systemPromptPrefix.Get()
590
591 currentSession, err := a.sessions.Get(ctx, sessionID)
592 if err != nil {
593 return fmt.Errorf("failed to get session: %w", err)
594 }
595 msgs, err := a.getSessionMessages(ctx, currentSession)
596 if err != nil {
597 return err
598 }
599 if len(msgs) == 0 {
600 // Nothing to summarize.
601 return nil
602 }
603
604 aiMsgs, _ := a.preparePrompt(msgs)
605
606 genCtx, cancel := context.WithCancel(ctx)
607 a.activeRequests.Set(sessionID, cancel)
608 defer a.activeRequests.Del(sessionID)
609 defer cancel()
610
611 agent := fantasy.NewAgent(largeModel.Model,
612 fantasy.WithSystemPrompt(string(summaryPrompt)),
613 fantasy.WithUserAgent("Charm Crush/"+version.Version),
614 )
615 summaryMessage, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
616 Role: message.Assistant,
617 Model: largeModel.Model.Model(),
618 Provider: largeModel.Model.Provider(),
619 IsSummaryMessage: true,
620 })
621 if err != nil {
622 return err
623 }
624
625 summaryPromptText := buildSummaryPrompt(currentSession.Todos)
626
627 resp, err := agent.Stream(genCtx, fantasy.AgentStreamCall{
628 Prompt: summaryPromptText,
629 Messages: aiMsgs,
630 ProviderOptions: opts,
631 PrepareStep: func(callContext context.Context, options fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
632 prepared.Messages = options.Messages
633 if systemPromptPrefix != "" {
634 prepared.Messages = append([]fantasy.Message{fantasy.NewSystemMessage(systemPromptPrefix)}, prepared.Messages...)
635 }
636 return callContext, prepared, nil
637 },
638 OnReasoningDelta: func(id string, text string) error {
639 summaryMessage.AppendReasoningContent(text)
640 return a.messages.Update(genCtx, summaryMessage)
641 },
642 OnReasoningEnd: func(id string, reasoning fantasy.ReasoningContent) error {
643 // Handle anthropic signature.
644 if anthropicData, ok := reasoning.ProviderMetadata["anthropic"]; ok {
645 if signature, ok := anthropicData.(*anthropic.ReasoningOptionMetadata); ok && signature.Signature != "" {
646 summaryMessage.AppendReasoningSignature(signature.Signature)
647 }
648 }
649 summaryMessage.FinishThinking()
650 return a.messages.Update(genCtx, summaryMessage)
651 },
652 OnTextDelta: func(id, text string) error {
653 summaryMessage.AppendContent(text)
654 return a.messages.Update(genCtx, summaryMessage)
655 },
656 })
657 if err != nil {
658 isCancelErr := errors.Is(err, context.Canceled)
659 if isCancelErr {
660 // User cancelled summarize we need to remove the summary message.
661 deleteErr := a.messages.Delete(ctx, summaryMessage.ID)
662 return deleteErr
663 }
664 return err
665 }
666
667 summaryMessage.AddFinish(message.FinishReasonEndTurn, "", "")
668 err = a.messages.Update(genCtx, summaryMessage)
669 if err != nil {
670 return err
671 }
672
673 var openrouterCost *float64
674 for _, step := range resp.Steps {
675 stepCost := a.openrouterCost(step.ProviderMetadata)
676 if stepCost != nil {
677 newCost := *stepCost
678 if openrouterCost != nil {
679 newCost += *openrouterCost
680 }
681 openrouterCost = &newCost
682 }
683 }
684
685 a.updateSessionUsage(largeModel, ¤tSession, resp.TotalUsage, openrouterCost)
686
687 // Just in case, get just the last usage info.
688 usage := resp.Response.Usage
689 currentSession.SummaryMessageID = summaryMessage.ID
690 currentSession.CompletionTokens = usage.OutputTokens
691 currentSession.PromptTokens = 0
692 _, err = a.sessions.Save(genCtx, currentSession)
693 return err
694}
695
696func (a *sessionAgent) getCacheControlOptions() fantasy.ProviderOptions {
697 if t, _ := strconv.ParseBool(os.Getenv("CRUSH_DISABLE_ANTHROPIC_CACHE")); t {
698 return fantasy.ProviderOptions{}
699 }
700 return fantasy.ProviderOptions{
701 anthropic.Name: &anthropic.ProviderCacheControlOptions{
702 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
703 },
704 bedrock.Name: &anthropic.ProviderCacheControlOptions{
705 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
706 },
707 vercel.Name: &anthropic.ProviderCacheControlOptions{
708 CacheControl: anthropic.CacheControl{Type: "ephemeral"},
709 },
710 }
711}
712
713func (a *sessionAgent) createUserMessage(ctx context.Context, call SessionAgentCall) (message.Message, error) {
714 parts := []message.ContentPart{message.TextContent{Text: call.Prompt}}
715 var attachmentParts []message.ContentPart
716 for _, attachment := range call.Attachments {
717 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
718 }
719 parts = append(parts, attachmentParts...)
720 msg, err := a.messages.Create(ctx, call.SessionID, message.CreateMessageParams{
721 Role: message.User,
722 Parts: parts,
723 })
724 if err != nil {
725 return message.Message{}, fmt.Errorf("failed to create user message: %w", err)
726 }
727 return msg, nil
728}
729
730func (a *sessionAgent) preparePrompt(msgs []message.Message, attachments ...message.Attachment) ([]fantasy.Message, []fantasy.FilePart) {
731 var history []fantasy.Message
732 if !a.isSubAgent {
733 history = append(history, fantasy.NewUserMessage(
734 fmt.Sprintf("<system_reminder>%s</system_reminder>",
735 `This is a reminder that your todo list is currently empty. DO NOT mention this to the user explicitly because they are already aware.
736If you are working on tasks that would benefit from a todo list please use the "todos" tool to create one.
737If not, please feel free to ignore. Again do not mention this message to the user.`,
738 ),
739 ))
740 }
741 for _, m := range msgs {
742 if len(m.Parts) == 0 {
743 continue
744 }
745 // Assistant message without content or tool calls (cancelled before it
746 // returned anything).
747 if m.Role == message.Assistant && len(m.ToolCalls()) == 0 && m.Content().Text == "" && m.ReasoningContent().String() == "" {
748 continue
749 }
750 history = append(history, m.ToAIMessage()...)
751 }
752
753 var files []fantasy.FilePart
754 for _, attachment := range attachments {
755 if attachment.IsText() {
756 continue
757 }
758 files = append(files, fantasy.FilePart{
759 Filename: attachment.FileName,
760 Data: attachment.Content,
761 MediaType: attachment.MimeType,
762 })
763 }
764
765 return history, files
766}
767
768func (a *sessionAgent) getSessionMessages(ctx context.Context, session session.Session) ([]message.Message, error) {
769 msgs, err := a.messages.List(ctx, session.ID)
770 if err != nil {
771 return nil, fmt.Errorf("failed to list messages: %w", err)
772 }
773
774 if session.SummaryMessageID != "" {
775 summaryMsgIndex := -1
776 for i, msg := range msgs {
777 if msg.ID == session.SummaryMessageID {
778 summaryMsgIndex = i
779 break
780 }
781 }
782 if summaryMsgIndex != -1 {
783 msgs = msgs[summaryMsgIndex:]
784 msgs[0].Role = message.User
785 }
786 }
787 return msgs, nil
788}
789
790// generateTitle generates a session titled based on the initial prompt.
791func (a *sessionAgent) generateTitle(ctx context.Context, sessionID string, userPrompt string) {
792 if userPrompt == "" {
793 return
794 }
795
796 smallModel := a.smallModel.Get()
797 largeModel := a.largeModel.Get()
798 systemPromptPrefix := a.systemPromptPrefix.Get()
799
800 var maxOutputTokens int64 = 40
801 if smallModel.CatwalkCfg.CanReason {
802 maxOutputTokens = smallModel.CatwalkCfg.DefaultMaxTokens
803 }
804
805 newAgent := func(m fantasy.LanguageModel, p []byte, tok int64) fantasy.Agent {
806 return fantasy.NewAgent(m,
807 fantasy.WithSystemPrompt(string(p)+"\n /no_think"),
808 fantasy.WithMaxOutputTokens(tok),
809 fantasy.WithUserAgent("Charm Crush/"+version.Version),
810 )
811 }
812
813 streamCall := fantasy.AgentStreamCall{
814 Prompt: fmt.Sprintf("Generate a concise title for the following content:\n\n%s\n <think>\n\n</think>", userPrompt),
815 PrepareStep: func(callCtx context.Context, opts fantasy.PrepareStepFunctionOptions) (_ context.Context, prepared fantasy.PrepareStepResult, err error) {
816 prepared.Messages = opts.Messages
817 if systemPromptPrefix != "" {
818 prepared.Messages = append([]fantasy.Message{
819 fantasy.NewSystemMessage(systemPromptPrefix),
820 }, prepared.Messages...)
821 }
822 return callCtx, prepared, nil
823 },
824 }
825
826 // Use the small model to generate the title.
827 model := smallModel
828 agent := newAgent(model.Model, titlePrompt, maxOutputTokens)
829 resp, err := agent.Stream(ctx, streamCall)
830 if err == nil {
831 // We successfully generated a title with the small model.
832 slog.Debug("Generated title with small model")
833 } else {
834 // It didn't work. Let's try with the big model.
835 slog.Error("Error generating title with small model; trying big model", "err", err)
836 model = largeModel
837 agent = newAgent(model.Model, titlePrompt, maxOutputTokens)
838 resp, err = agent.Stream(ctx, streamCall)
839 if err == nil {
840 slog.Debug("Generated title with large model")
841 } else {
842 // Welp, the large model didn't work either. Use the default
843 // session name and return.
844 slog.Error("Error generating title with large model", "err", err)
845 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, DefaultSessionName, 0, 0, 0)
846 if saveErr != nil {
847 slog.Error("Failed to save session title and usage", "error", saveErr)
848 }
849 return
850 }
851 }
852
853 if resp == nil {
854 // Actually, we didn't get a response so we can't. Use the default
855 // session name and return.
856 slog.Error("Response is nil; can't generate title")
857 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, DefaultSessionName, 0, 0, 0)
858 if saveErr != nil {
859 slog.Error("Failed to save session title and usage", "error", saveErr)
860 }
861 return
862 }
863
864 // Clean up title.
865 var title string
866 title = strings.ReplaceAll(resp.Response.Content.Text(), "\n", " ")
867
868 // Remove thinking tags if present.
869 title = thinkTagRegex.ReplaceAllString(title, "")
870
871 title = strings.TrimSpace(title)
872 title = cmp.Or(title, DefaultSessionName)
873
874 // Calculate usage and cost.
875 var openrouterCost *float64
876 for _, step := range resp.Steps {
877 stepCost := a.openrouterCost(step.ProviderMetadata)
878 if stepCost != nil {
879 newCost := *stepCost
880 if openrouterCost != nil {
881 newCost += *openrouterCost
882 }
883 openrouterCost = &newCost
884 }
885 }
886
887 modelConfig := model.CatwalkCfg
888 cost := modelConfig.CostPer1MInCached/1e6*float64(resp.TotalUsage.CacheCreationTokens) +
889 modelConfig.CostPer1MOutCached/1e6*float64(resp.TotalUsage.CacheReadTokens) +
890 modelConfig.CostPer1MIn/1e6*float64(resp.TotalUsage.InputTokens) +
891 modelConfig.CostPer1MOut/1e6*float64(resp.TotalUsage.OutputTokens)
892
893 // Use override cost if available (e.g., from OpenRouter).
894 if openrouterCost != nil {
895 cost = *openrouterCost
896 }
897
898 promptTokens := resp.TotalUsage.InputTokens + resp.TotalUsage.CacheCreationTokens
899 completionTokens := resp.TotalUsage.OutputTokens
900
901 // Atomically update only title and usage fields to avoid overriding other
902 // concurrent session updates.
903 saveErr := a.sessions.UpdateTitleAndUsage(ctx, sessionID, title, promptTokens, completionTokens, cost)
904 if saveErr != nil {
905 slog.Error("Failed to save session title and usage", "error", saveErr)
906 return
907 }
908}
909
910func (a *sessionAgent) openrouterCost(metadata fantasy.ProviderMetadata) *float64 {
911 openrouterMetadata, ok := metadata[openrouter.Name]
912 if !ok {
913 return nil
914 }
915
916 opts, ok := openrouterMetadata.(*openrouter.ProviderMetadata)
917 if !ok {
918 return nil
919 }
920 return &opts.Usage.Cost
921}
922
923func (a *sessionAgent) updateSessionUsage(model Model, session *session.Session, usage fantasy.Usage, overrideCost *float64) {
924 modelConfig := model.CatwalkCfg
925 cost := modelConfig.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
926 modelConfig.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
927 modelConfig.CostPer1MIn/1e6*float64(usage.InputTokens) +
928 modelConfig.CostPer1MOut/1e6*float64(usage.OutputTokens)
929
930 a.eventTokensUsed(session.ID, model, usage, cost)
931
932 if overrideCost != nil {
933 session.Cost += *overrideCost
934 } else {
935 session.Cost += cost
936 }
937
938 session.CompletionTokens = usage.OutputTokens
939 session.PromptTokens = usage.InputTokens + usage.CacheReadTokens
940}
941
942func (a *sessionAgent) Cancel(sessionID string) {
943 // Cancel regular requests. Don't use Take() here - we need the entry to
944 // remain in activeRequests so IsBusy() returns true until the goroutine
945 // fully completes (including error handling that may access the DB).
946 // The defer in processRequest will clean up the entry.
947 if cancel, ok := a.activeRequests.Get(sessionID); ok && cancel != nil {
948 slog.Debug("Request cancellation initiated", "session_id", sessionID)
949 cancel()
950 }
951
952 // Also check for summarize requests.
953 if cancel, ok := a.activeRequests.Get(sessionID + "-summarize"); ok && cancel != nil {
954 slog.Debug("Summarize cancellation initiated", "session_id", sessionID)
955 cancel()
956 }
957
958 if a.QueuedPrompts(sessionID) > 0 {
959 slog.Debug("Clearing queued prompts", "session_id", sessionID)
960 a.messageQueue.Del(sessionID)
961 }
962}
963
964func (a *sessionAgent) ClearQueue(sessionID string) {
965 if a.QueuedPrompts(sessionID) > 0 {
966 slog.Debug("Clearing queued prompts", "session_id", sessionID)
967 a.messageQueue.Del(sessionID)
968 }
969}
970
971func (a *sessionAgent) CancelAll() {
972 if !a.IsBusy() {
973 return
974 }
975 for key := range a.activeRequests.Seq2() {
976 a.Cancel(key) // key is sessionID
977 }
978
979 timeout := time.After(5 * time.Second)
980 for a.IsBusy() {
981 select {
982 case <-timeout:
983 return
984 default:
985 time.Sleep(200 * time.Millisecond)
986 }
987 }
988}
989
990func (a *sessionAgent) IsBusy() bool {
991 var busy bool
992 for cancelFunc := range a.activeRequests.Seq() {
993 if cancelFunc != nil {
994 busy = true
995 break
996 }
997 }
998 return busy
999}
1000
1001func (a *sessionAgent) IsSessionBusy(sessionID string) bool {
1002 _, busy := a.activeRequests.Get(sessionID)
1003 return busy
1004}
1005
1006func (a *sessionAgent) QueuedPrompts(sessionID string) int {
1007 l, ok := a.messageQueue.Get(sessionID)
1008 if !ok {
1009 return 0
1010 }
1011 return len(l)
1012}
1013
1014func (a *sessionAgent) QueuedPromptsList(sessionID string) []string {
1015 l, ok := a.messageQueue.Get(sessionID)
1016 if !ok {
1017 return nil
1018 }
1019 prompts := make([]string, len(l))
1020 for i, call := range l {
1021 prompts[i] = call.Prompt
1022 }
1023 return prompts
1024}
1025
1026func (a *sessionAgent) SetModels(large Model, small Model) {
1027 a.largeModel.Set(large)
1028 a.smallModel.Set(small)
1029}
1030
1031func (a *sessionAgent) SetTools(tools []fantasy.AgentTool) {
1032 a.tools.SetSlice(tools)
1033}
1034
1035func (a *sessionAgent) SetSystemPrompt(systemPrompt string) {
1036 a.systemPrompt.Set(systemPrompt)
1037}
1038
1039func (a *sessionAgent) Model() Model {
1040 return a.largeModel.Get()
1041}
1042
1043// convertToToolResult converts a fantasy tool result to a message tool result.
1044func (a *sessionAgent) convertToToolResult(result fantasy.ToolResultContent) message.ToolResult {
1045 baseResult := message.ToolResult{
1046 ToolCallID: result.ToolCallID,
1047 Name: result.ToolName,
1048 Metadata: result.ClientMetadata,
1049 }
1050
1051 switch result.Result.GetType() {
1052 case fantasy.ToolResultContentTypeText:
1053 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentText](result.Result); ok {
1054 baseResult.Content = r.Text
1055 }
1056 case fantasy.ToolResultContentTypeError:
1057 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentError](result.Result); ok {
1058 baseResult.Content = r.Error.Error()
1059 baseResult.IsError = true
1060 }
1061 case fantasy.ToolResultContentTypeMedia:
1062 if r, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](result.Result); ok {
1063 content := r.Text
1064 if content == "" {
1065 content = fmt.Sprintf("Loaded %s content", r.MediaType)
1066 }
1067 baseResult.Content = content
1068 baseResult.Data = r.Data
1069 baseResult.MIMEType = r.MediaType
1070 }
1071 }
1072
1073 return baseResult
1074}
1075
1076// workaroundProviderMediaLimitations converts media content in tool results to
1077// user messages for providers that don't natively support images in tool results.
1078//
1079// Problem: OpenAI, Google, OpenRouter, and other OpenAI-compatible providers
1080// don't support sending images/media in tool result messages - they only accept
1081// text in tool results. However, they DO support images in user messages.
1082//
1083// If we send media in tool results to these providers, the API returns an error.
1084//
1085// Solution: For these providers, we:
1086// 1. Replace the media in the tool result with a text placeholder
1087// 2. Inject a user message immediately after with the image as a file attachment
1088// 3. This maintains the tool execution flow while working around API limitations
1089//
1090// Anthropic and Bedrock support images natively in tool results, so we skip
1091// this workaround for them.
1092//
1093// Example transformation:
1094//
1095// BEFORE: [tool result: image data]
1096// AFTER: [tool result: "Image loaded - see attached"], [user: image attachment]
1097func (a *sessionAgent) workaroundProviderMediaLimitations(messages []fantasy.Message, largeModel Model) []fantasy.Message {
1098 providerSupportsMedia := largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderAnthropic) ||
1099 largeModel.ModelCfg.Provider == string(catwalk.InferenceProviderBedrock)
1100
1101 if providerSupportsMedia {
1102 return messages
1103 }
1104
1105 convertedMessages := make([]fantasy.Message, 0, len(messages))
1106
1107 for _, msg := range messages {
1108 if msg.Role != fantasy.MessageRoleTool {
1109 convertedMessages = append(convertedMessages, msg)
1110 continue
1111 }
1112
1113 textParts := make([]fantasy.MessagePart, 0, len(msg.Content))
1114 var mediaFiles []fantasy.FilePart
1115
1116 for _, part := range msg.Content {
1117 toolResult, ok := fantasy.AsMessagePart[fantasy.ToolResultPart](part)
1118 if !ok {
1119 textParts = append(textParts, part)
1120 continue
1121 }
1122
1123 if media, ok := fantasy.AsToolResultOutputType[fantasy.ToolResultOutputContentMedia](toolResult.Output); ok {
1124 decoded, err := base64.StdEncoding.DecodeString(media.Data)
1125 if err != nil {
1126 slog.Warn("Failed to decode media data", "error", err)
1127 textParts = append(textParts, part)
1128 continue
1129 }
1130
1131 mediaFiles = append(mediaFiles, fantasy.FilePart{
1132 Data: decoded,
1133 MediaType: media.MediaType,
1134 Filename: fmt.Sprintf("tool-result-%s", toolResult.ToolCallID),
1135 })
1136
1137 textParts = append(textParts, fantasy.ToolResultPart{
1138 ToolCallID: toolResult.ToolCallID,
1139 Output: fantasy.ToolResultOutputContentText{
1140 Text: "[Image/media content loaded - see attached file]",
1141 },
1142 ProviderOptions: toolResult.ProviderOptions,
1143 })
1144 } else {
1145 textParts = append(textParts, part)
1146 }
1147 }
1148
1149 convertedMessages = append(convertedMessages, fantasy.Message{
1150 Role: fantasy.MessageRoleTool,
1151 Content: textParts,
1152 })
1153
1154 if len(mediaFiles) > 0 {
1155 convertedMessages = append(convertedMessages, fantasy.NewUserMessage(
1156 "Here is the media content from the tool result:",
1157 mediaFiles...,
1158 ))
1159 }
1160 }
1161
1162 return convertedMessages
1163}
1164
1165// buildSummaryPrompt constructs the prompt text for session summarization.
1166func buildSummaryPrompt(todos []session.Todo) string {
1167 var sb strings.Builder
1168 sb.WriteString("Provide a detailed summary of our conversation above.")
1169 if len(todos) > 0 {
1170 sb.WriteString("\n\n## Current Todo List\n\n")
1171 for _, t := range todos {
1172 fmt.Fprintf(&sb, "- [%s] %s\n", t.Status, t.Content)
1173 }
1174 sb.WriteString("\nInclude these tasks and their statuses in your summary. ")
1175 sb.WriteString("Instruct the resuming assistant to use the `todos` tool to continue tracking progress on these tasks.")
1176 }
1177 return sb.String()
1178}