1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/log"
31 "github.com/charmbracelet/crush/internal/lsp"
32 "github.com/charmbracelet/crush/internal/message"
33 "github.com/charmbracelet/crush/internal/oauth/copilot"
34 "github.com/charmbracelet/crush/internal/permission"
35 "github.com/charmbracelet/crush/internal/pubsub"
36 "github.com/charmbracelet/crush/internal/session"
37 "github.com/charmbracelet/crush/internal/skills"
38 "golang.org/x/sync/errgroup"
39
40 "charm.land/fantasy/providers/anthropic"
41 "charm.land/fantasy/providers/azure"
42 "charm.land/fantasy/providers/bedrock"
43 "charm.land/fantasy/providers/google"
44 "charm.land/fantasy/providers/openai"
45 "charm.land/fantasy/providers/openaicompat"
46 "charm.land/fantasy/providers/openrouter"
47 "charm.land/fantasy/providers/vercel"
48 openaisdk "github.com/charmbracelet/openai-go/option"
49 "github.com/qjebbs/go-jsons"
50)
51
// Coordinator errors.
//
// Sentinel errors returned by the coordinator when a required piece of agent,
// model, or provider configuration cannot be found.
var (
	// No entry for the coder agent in the configured agents.
	errCoderAgentNotConfigured = errors.New("coder agent not configured")
	// The provider named by the current model is not in the provider config.
	errModelProviderNotConfigured = errors.New("model provider not configured")
	// No large/small model selection is present in the config.
	errLargeModelNotSelected = errors.New("large model not selected")
	errSmallModelNotSelected = errors.New("small model not selected")
	// The provider referenced by the large/small model selection is missing.
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
	// The selected model ID is not listed among its provider's models.
	errLargeModelNotFound = errors.New("large model not found in provider config")
	errSmallModelNotFound = errors.New("small model not found in provider config")
)
63
// Coordinator is the interface through which callers run prompts against the
// agent and inspect or control its sessions. The implementation in this file
// forwards most calls to a single "current" agent.
type Coordinator interface {
	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
	// SetMainAgent(string)

	// Run sends prompt (plus optional attachments) to the agent for the given
	// session and returns the agent's result.
	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
	// Cancel forwards a cancel request for sessionID to the agent.
	Cancel(sessionID string)
	// CancelAll forwards a cancel-all request to the agent.
	CancelAll()
	// IsSessionBusy reports the agent's busy state for sessionID.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports the agent's overall busy state.
	IsBusy() bool
	// QueuedPrompts returns the agent's queued-prompt count for sessionID.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList returns the agent's queued prompts for sessionID.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue asks the agent to clear the queue for sessionID.
	ClearQueue(sessionID string)
	// Summarize asks the agent to summarize the session with the given ID.
	Summarize(context.Context, string) error
	// Model returns the agent's current model.
	Model() Model
	// UpdateModels rebuilds the models and tools from the current config.
	UpdateModels(ctx context.Context) error
}
79
// coordinator is the Coordinator implementation in this file. It keeps the
// services that agents and their tools are built from, together with the
// agent that calls are delegated to.
type coordinator struct {
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent is the agent the Coordinator methods delegate to; agents
	// holds the built agents keyed by agent name.
	currentAgent SessionAgent
	agents       map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg collects the background setup work started by buildAgent
	// (system prompt and tools); Run waits on it before doing anything else.
	readyWg errgroup.Group
}
100
101func NewCoordinator(
102 ctx context.Context,
103 cfg *config.ConfigStore,
104 sessions session.Service,
105 messages message.Service,
106 permissions permission.Service,
107 history history.Service,
108 filetracker filetracker.Service,
109 lspManager *lsp.Manager,
110 notify pubsub.Publisher[notify.Notification],
111) (Coordinator, error) {
112 // Discover skills once at session start.
113 allSkills, activeSkills := discoverSkills(cfg)
114 skillTracker := skills.NewTracker(activeSkills)
115
116 c := &coordinator{
117 cfg: cfg,
118 sessions: sessions,
119 messages: messages,
120 permissions: permissions,
121 history: history,
122 filetracker: filetracker,
123 lspManager: lspManager,
124 notify: notify,
125 agents: make(map[string]SessionAgent),
126 allSkills: allSkills,
127 activeSkills: activeSkills,
128 skillTracker: skillTracker,
129 }
130
131 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
132 if !ok {
133 return nil, errCoderAgentNotConfigured
134 }
135
136 // TODO: make this dynamic when we support multiple agents
137 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
138 if err != nil {
139 return nil, err
140 }
141
142 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
143 if err != nil {
144 return nil, err
145 }
146 c.currentAgent = agent
147 c.agents[config.AgentCoder] = agent
148 return c, nil
149}
150
// Run implements Coordinator.
//
// It waits for the background setup tracked by readyWg, rebuilds the models
// from the current configuration, and runs prompt on the current agent with
// the call options derived from the model and provider config. If the run
// fails with an HTTP 401 from the provider, credentials are refreshed (OAuth
// token or templated API key) and the run is retried once.
//
// Errors from the setup wait, the model refresh, a missing provider config,
// or a proactive token refresh are returned before the agent is invoked. When
// a post-401 credential refresh itself fails, the original 401 error is
// returned.
func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
	// Block until the work queued on readyWg has completed.
	if err := c.readyWg.Wait(); err != nil {
		return nil, err
	}

	// refresh models before each run
	if err := c.UpdateModels(ctx); err != nil {
		return nil, fmt.Errorf("failed to update models: %w", err)
	}

	// A non-zero MaxTokens in the selected-model config overrides the
	// catalog default.
	model := c.currentAgent.Model()
	maxTokens := model.CatwalkCfg.DefaultMaxTokens
	if model.ModelCfg.MaxTokens != 0 {
		maxTokens = model.ModelCfg.MaxTokens
	}

	if !model.CatwalkCfg.SupportsImages && attachments != nil {
		// The model does not support images: keep only the attachments that
		// report themselves as text.
		filteredAttachments := make([]message.Attachment, 0, len(attachments))
		for _, att := range attachments {
			if att.IsText() {
				filteredAttachments = append(filteredAttachments, att)
			}
		}
		attachments = filteredAttachments
	}

	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
	if !ok {
		return nil, errModelProviderNotConfigured
	}

	mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)

	// Refresh an already-expired OAuth token up front instead of waiting for
	// the provider to reject the request.
	if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
		slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
		if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
			return nil, err
		}
	}

	// run is a closure so the identical call can be repeated after a
	// credential refresh; it reads c.currentAgent at call time.
	run := func() (*fantasy.AgentResult, error) {
		return c.currentAgent.Run(ctx, SessionAgentCall{
			SessionID:        sessionID,
			Prompt:           prompt,
			Attachments:      attachments,
			MaxOutputTokens:  maxTokens,
			ProviderOptions:  mergedOptions,
			Temperature:      temp,
			TopP:             topP,
			TopK:             topK,
			FrequencyPenalty: freqPenalty,
			PresencePenalty:  presPenalty,
		})
	}
	// Snapshot the loaded skill names before the turn so the usage log can
	// compare against the tracker's state afterwards. Note that this logging
	// covers only the first attempt, not a retry.
	beforeLoaded := c.skillTracker.LoadedNames()
	result, originalErr := run()
	logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)

	if c.isUnauthorized(originalErr) {
		switch {
		case providerCfg.OAuthToken != nil:
			slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
			if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
				// Surface the 401 rather than the refresh failure.
				return nil, originalErr
			}
			slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
			return run()
		// NOTE(review): a "$" in the template presumably marks a value that
		// can be re-resolved (env var or command) — confirm against Resolve.
		case strings.Contains(providerCfg.APIKeyTemplate, "$"):
			slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
			if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
				// Surface the 401 rather than the refresh failure.
				return nil, originalErr
			}
			slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
			return run()
		}
	}

	return result, originalErr
}
232
// getProviderOptions builds the provider-specific call options for model.
//
// Three option sources are JSON-encoded and merged with jsons.Merge, in this
// order: the catalog model's provider options, the provider config's options,
// and the selected-model config's options (NOTE(review): later sources
// presumably take precedence — confirm against go-jsons). The merged map is
// then completed with reasoning/thinking defaults for the provider type and
// parsed by that provider's option parser.
//
// The function is best-effort: marshal failures fall back to "{}", a merge or
// unmarshal failure is logged and yields empty options, and a parse failure
// simply leaves the provider's entry out of the result.
func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
	options := fantasy.ProviderOptions{}

	// Each source defaults to an empty JSON object so the merge always has
	// three well-formed inputs.
	cfgOpts := []byte("{}")
	providerCfgOpts := []byte("{}")
	catwalkOpts := []byte("{}")

	if model.ModelCfg.ProviderOptions != nil {
		data, err := json.Marshal(model.ModelCfg.ProviderOptions)
		if err == nil {
			cfgOpts = data
		}
	}

	if providerCfg.ProviderOptions != nil {
		data, err := json.Marshal(providerCfg.ProviderOptions)
		if err == nil {
			providerCfgOpts = data
		}
	}

	if model.CatwalkCfg.Options.ProviderOptions != nil {
		data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
		if err == nil {
			catwalkOpts = data
		}
	}

	readers := []io.Reader{
		bytes.NewReader(catwalkOpts),
		bytes.NewReader(providerCfgOpts),
		bytes.NewReader(cfgOpts),
	}

	got, err := jsons.Merge(readers)
	if err != nil {
		slog.Error("Could not merge call config", "err", err)
		return options
	}

	mergedOptions := make(map[string]any)

	err = json.Unmarshal([]byte(got), &mergedOptions)
	if err != nil {
		slog.Error("Could not create config for call", "err", err)
		return options
	}

	// The "hyper" provider type is mapped onto a concrete option format based
	// on a substring of the model ID, falling back to the OpenAI-compatible
	// format.
	providerType := providerCfg.Type
	if providerType == "hyper" {
		if strings.Contains(model.CatwalkCfg.ID, "claude") {
			providerType = anthropic.Name
		} else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
			providerType = openai.Name
		} else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
			providerType = google.Name
		} else {
			providerType = openaicompat.Name
		}
	}

	// In every case below, a value already present in the merged options is
	// kept; the selected-model settings only fill in what is missing.
	switch providerType {
	case openai.Name, azure.Name:
		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
		}
		if openai.IsResponsesModel(model.CatwalkCfg.ID) {
			// These two keys are set unconditionally for Responses reasoning
			// models, overwriting any merged value.
			if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
				mergedOptions["reasoning_summary"] = "auto"
				mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
			}
			parsed, err := openai.ParseResponsesOptions(mergedOptions)
			if err == nil {
				options[openai.Name] = parsed
			}
		} else {
			parsed, err := openai.ParseOptions(mergedOptions)
			if err == nil {
				options[openai.Name] = parsed
			}
		}
	case anthropic.Name:
		var (
			_, hasEffort = mergedOptions["effort"]
			_, hasThink  = mergedOptions["thinking"]
		)
		// Only one default is applied: an effort level wins over the
		// thinking budget when both could be set.
		switch {
		case !hasEffort && model.ModelCfg.ReasoningEffort != "":
			mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
		case !hasThink && model.ModelCfg.Think:
			mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
		}
		parsed, err := anthropic.ParseOptions(mergedOptions)
		if err == nil {
			options[anthropic.Name] = parsed
		}

	case openrouter.Name:
		_, hasReasoning := mergedOptions["reasoning"]
		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
			mergedOptions["reasoning"] = map[string]any{
				"enabled": true,
				"effort":  model.ModelCfg.ReasoningEffort,
			}
		}
		parsed, err := openrouter.ParseOptions(mergedOptions)
		if err == nil {
			options[openrouter.Name] = parsed
		}
	case vercel.Name:
		_, hasReasoning := mergedOptions["reasoning"]
		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
			mergedOptions["reasoning"] = map[string]any{
				"enabled": true,
				"effort":  model.ModelCfg.ReasoningEffort,
			}
		}
		parsed, err := vercel.ParseOptions(mergedOptions)
		if err == nil {
			options[vercel.Name] = parsed
		}
	case google.Name:
		_, hasReasoning := mergedOptions["thinking_config"]
		if !hasReasoning {
			// Model IDs starting with "gemini-2" get a fixed thinking budget;
			// all others get a thinking level taken from the configured
			// reasoning effort (which may be the empty string here).
			if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
				mergedOptions["thinking_config"] = map[string]any{
					"thinking_budget":  2000,
					"include_thoughts": true,
				}
			} else {
				mergedOptions["thinking_config"] = map[string]any{
					"thinking_level":   model.ModelCfg.ReasoningEffort,
					"include_thoughts": true,
				}
			}
		}
		parsed, err := google.ParseOptions(mergedOptions)
		if err == nil {
			options[google.Name] = parsed
		}
	case openaicompat.Name:
		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
		}
		parsed, err := openaicompat.ParseOptions(mergedOptions)
		if err == nil {
			options[openaicompat.Name] = parsed
		}
	}

	return options
}
387
388func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
389 modelOptions := getProviderOptions(model, cfg)
390 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
391 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
392 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
393 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
394 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
395 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
396}
397
398func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
399 large, small, err := c.buildAgentModels(ctx, isSubAgent)
400 if err != nil {
401 return nil, err
402 }
403
404 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
405 result := NewSessionAgent(SessionAgentOptions{
406 LargeModel: large,
407 SmallModel: small,
408 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
409 SystemPrompt: "",
410 IsSubAgent: isSubAgent,
411 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
412 IsYolo: c.permissions.SkipRequests(),
413 Sessions: c.sessions,
414 Messages: c.messages,
415 Tools: nil,
416 Notify: c.notify,
417 })
418
419 c.readyWg.Go(func() error {
420 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
421 if err != nil {
422 return err
423 }
424 result.SetSystemPrompt(systemPrompt)
425 return nil
426 })
427
428 c.readyWg.Go(func() error {
429 tools, err := c.buildTools(ctx, agent)
430 if err != nil {
431 return err
432 }
433 result.SetTools(tools)
434 return nil
435 })
436
437 return result, nil
438}
439
440func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
441 var allTools []fantasy.AgentTool
442 if slices.Contains(agent.AllowedTools, AgentToolName) {
443 agentTool, err := c.agentTool(ctx)
444 if err != nil {
445 return nil, err
446 }
447 allTools = append(allTools, agentTool)
448 }
449
450 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
451 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
452 if err != nil {
453 return nil, err
454 }
455 allTools = append(allTools, agenticFetchTool)
456 }
457
458 // Get the model name for the agent
459 modelName := ""
460 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
461 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
462 modelName = model.Name
463 }
464 }
465
466 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
467
468 allTools = append(allTools,
469 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
470 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
471 tools.NewCrushLogsTool(logFile),
472 tools.NewJobOutputTool(),
473 tools.NewJobKillTool(),
474 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
475 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
476 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
477 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
478 tools.NewGlobTool(c.cfg.WorkingDir()),
479 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
480 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
481 tools.NewSourcegraphTool(nil),
482 tools.NewTodosTool(c.sessions),
483 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
484 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
485 )
486
487 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
488 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
489 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
490 }
491
492 if len(c.cfg.Config().MCP) > 0 {
493 allTools = append(
494 allTools,
495 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
496 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
497 )
498 }
499
500 var filteredTools []fantasy.AgentTool
501 for _, tool := range allTools {
502 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
503 filteredTools = append(filteredTools, tool)
504 }
505 }
506
507 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
508 if agent.AllowedMCP == nil {
509 // No MCP restrictions
510 filteredTools = append(filteredTools, tool)
511 continue
512 }
513 if len(agent.AllowedMCP) == 0 {
514 // No MCPs allowed
515 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
516 break
517 }
518
519 for mcp, tools := range agent.AllowedMCP {
520 if mcp != tool.MCP() {
521 continue
522 }
523 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
524 filteredTools = append(filteredTools, tool)
525 break
526 }
527 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
528 }
529 }
530 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
531 return strings.Compare(a.Info().Name, b.Info().Name)
532 })
533 return filteredTools, nil
534}
535
536// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
537func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
538 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
539 if !ok {
540 return Model{}, Model{}, errLargeModelNotSelected
541 }
542 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
543 if !ok {
544 return Model{}, Model{}, errSmallModelNotSelected
545 }
546
547 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
548 if !ok {
549 return Model{}, Model{}, errLargeModelProviderNotConfigured
550 }
551
552 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
553 if err != nil {
554 return Model{}, Model{}, err
555 }
556
557 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
558 if !ok {
559 return Model{}, Model{}, errSmallModelProviderNotConfigured
560 }
561
562 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
563 if err != nil {
564 return Model{}, Model{}, err
565 }
566
567 var largeCatwalkModel *catwalk.Model
568 var smallCatwalkModel *catwalk.Model
569
570 for _, m := range largeProviderCfg.Models {
571 if m.ID == largeModelCfg.Model {
572 largeCatwalkModel = &m
573 }
574 }
575 for _, m := range smallProviderCfg.Models {
576 if m.ID == smallModelCfg.Model {
577 smallCatwalkModel = &m
578 }
579 }
580
581 if largeCatwalkModel == nil {
582 return Model{}, Model{}, errLargeModelNotFound
583 }
584
585 if smallCatwalkModel == nil {
586 return Model{}, Model{}, errSmallModelNotFound
587 }
588
589 largeModelID := largeModelCfg.Model
590 smallModelID := smallModelCfg.Model
591
592 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
593 largeModelID += ":exacto"
594 }
595
596 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
597 smallModelID += ":exacto"
598 }
599
600 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
601 if err != nil {
602 return Model{}, Model{}, err
603 }
604 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
605 if err != nil {
606 return Model{}, Model{}, err
607 }
608
609 return Model{
610 Model: largeModel,
611 CatwalkCfg: *largeCatwalkModel,
612 ModelCfg: largeModelCfg,
613 }, Model{
614 Model: smallModel,
615 CatwalkCfg: *smallCatwalkModel,
616 ModelCfg: smallModelCfg,
617 }, nil
618}
619
620func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
621 var opts []anthropic.Option
622
623 switch {
624 case strings.HasPrefix(apiKey, "Bearer "):
625 // NOTE: Prevent the SDK from picking up the API key from env.
626 os.Setenv("ANTHROPIC_API_KEY", "")
627 headers["Authorization"] = apiKey
628 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
629 // NOTE: Prevent the SDK from picking up the API key from env.
630 os.Setenv("ANTHROPIC_API_KEY", "")
631 headers["Authorization"] = "Bearer " + apiKey
632 case apiKey != "":
633 // X-Api-Key header
634 opts = append(opts, anthropic.WithAPIKey(apiKey))
635 }
636
637 if len(headers) > 0 {
638 opts = append(opts, anthropic.WithHeaders(headers))
639 }
640
641 if baseURL != "" {
642 opts = append(opts, anthropic.WithBaseURL(baseURL))
643 }
644
645 if c.cfg.Config().Options.Debug {
646 httpClient := log.NewHTTPClient()
647 opts = append(opts, anthropic.WithHTTPClient(httpClient))
648 }
649 return anthropic.New(opts...)
650}
651
652func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
653 opts := []openai.Option{
654 openai.WithAPIKey(apiKey),
655 openai.WithUseResponsesAPI(),
656 }
657 if c.cfg.Config().Options.Debug {
658 httpClient := log.NewHTTPClient()
659 opts = append(opts, openai.WithHTTPClient(httpClient))
660 }
661 if len(headers) > 0 {
662 opts = append(opts, openai.WithHeaders(headers))
663 }
664 if baseURL != "" {
665 opts = append(opts, openai.WithBaseURL(baseURL))
666 }
667 return openai.New(opts...)
668}
669
670func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
671 opts := []openrouter.Option{
672 openrouter.WithAPIKey(apiKey),
673 }
674 if c.cfg.Config().Options.Debug {
675 httpClient := log.NewHTTPClient()
676 opts = append(opts, openrouter.WithHTTPClient(httpClient))
677 }
678 if len(headers) > 0 {
679 opts = append(opts, openrouter.WithHeaders(headers))
680 }
681 return openrouter.New(opts...)
682}
683
684func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
685 opts := []vercel.Option{
686 vercel.WithAPIKey(apiKey),
687 }
688 if c.cfg.Config().Options.Debug {
689 httpClient := log.NewHTTPClient()
690 opts = append(opts, vercel.WithHTTPClient(httpClient))
691 }
692 if len(headers) > 0 {
693 opts = append(opts, vercel.WithHeaders(headers))
694 }
695 return vercel.New(opts...)
696}
697
698func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
699 opts := []openaicompat.Option{
700 openaicompat.WithBaseURL(baseURL),
701 openaicompat.WithAPIKey(apiKey),
702 }
703
704 // Set HTTP client based on provider and debug mode.
705 var httpClient *http.Client
706 if providerID == string(catwalk.InferenceProviderCopilot) {
707 opts = append(opts, openaicompat.WithUseResponsesAPI())
708 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
709 } else if c.cfg.Config().Options.Debug {
710 httpClient = log.NewHTTPClient()
711 }
712 if httpClient != nil {
713 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
714 }
715
716 if len(headers) > 0 {
717 opts = append(opts, openaicompat.WithHeaders(headers))
718 }
719
720 for extraKey, extraValue := range extraBody {
721 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
722 }
723
724 return openaicompat.New(opts...)
725}
726
727func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
728 opts := []azure.Option{
729 azure.WithBaseURL(baseURL),
730 azure.WithAPIKey(apiKey),
731 azure.WithUseResponsesAPI(),
732 }
733 if c.cfg.Config().Options.Debug {
734 httpClient := log.NewHTTPClient()
735 opts = append(opts, azure.WithHTTPClient(httpClient))
736 }
737 if options == nil {
738 options = make(map[string]string)
739 }
740 if apiVersion, ok := options["apiVersion"]; ok {
741 opts = append(opts, azure.WithAPIVersion(apiVersion))
742 }
743 if len(headers) > 0 {
744 opts = append(opts, azure.WithHeaders(headers))
745 }
746
747 return azure.New(opts...)
748}
749
750func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
751 var opts []bedrock.Option
752 if c.cfg.Config().Options.Debug {
753 httpClient := log.NewHTTPClient()
754 opts = append(opts, bedrock.WithHTTPClient(httpClient))
755 }
756 if len(headers) > 0 {
757 opts = append(opts, bedrock.WithHeaders(headers))
758 }
759 switch {
760 case apiKey != "":
761 opts = append(opts, bedrock.WithAPIKey(apiKey))
762 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
763 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
764 default:
765 // Skip, let the SDK do authentication.
766 }
767 return bedrock.New(opts...)
768}
769
770func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
771 opts := []google.Option{
772 google.WithBaseURL(baseURL),
773 google.WithGeminiAPIKey(apiKey),
774 }
775 if c.cfg.Config().Options.Debug {
776 httpClient := log.NewHTTPClient()
777 opts = append(opts, google.WithHTTPClient(httpClient))
778 }
779 if len(headers) > 0 {
780 opts = append(opts, google.WithHeaders(headers))
781 }
782 return google.New(opts...)
783}
784
785func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
786 opts := []google.Option{}
787 if c.cfg.Config().Options.Debug {
788 httpClient := log.NewHTTPClient()
789 opts = append(opts, google.WithHTTPClient(httpClient))
790 }
791 if len(headers) > 0 {
792 opts = append(opts, google.WithHeaders(headers))
793 }
794
795 project := options["project"]
796 location := options["location"]
797
798 opts = append(opts, google.WithVertex(project, location))
799
800 return google.New(opts...)
801}
802
803func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
804 if model.Think {
805 return true
806 }
807 opts, err := anthropic.ParseOptions(model.ProviderOptions)
808 return err == nil && opts.Thinking != nil
809}
810
811func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
812 headers := maps.Clone(providerCfg.ExtraHeaders)
813 if headers == nil {
814 headers = make(map[string]string)
815 }
816
817 // handle special headers for anthropic
818 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
819 if v, ok := headers["anthropic-beta"]; ok {
820 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
821 } else {
822 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
823 }
824 }
825
826 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
827 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
828
829 switch providerCfg.Type {
830 case openai.Name:
831 return c.buildOpenaiProvider(baseURL, apiKey, headers)
832 case anthropic.Name:
833 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
834 case openrouter.Name:
835 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
836 case vercel.Name:
837 return c.buildVercelProvider(baseURL, apiKey, headers)
838 case azure.Name:
839 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
840 case bedrock.Name:
841 return c.buildBedrockProvider(apiKey, headers)
842 case google.Name:
843 return c.buildGoogleProvider(baseURL, apiKey, headers)
844 case "google-vertex":
845 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
846 case openaicompat.Name, hyper.Name:
847 switch providerCfg.ID {
848 case hyper.Name:
849 baseURL = hyper.BaseURL() + "/v1"
850 headers["x-crush-id"] = event.GetID()
851 case string(catwalk.InferenceProviderZAI):
852 if providerCfg.ExtraBody == nil {
853 providerCfg.ExtraBody = map[string]any{}
854 }
855 providerCfg.ExtraBody["tool_stream"] = true
856 }
857 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
858 default:
859 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
860 }
861}
862
// isExactoSupported reports whether modelID is exactly one of the model IDs
// in the fixed list below. The comparison is case-sensitive and matches the
// whole string.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
873
// Cancel implements Coordinator by forwarding the request for sessionID to
// the current agent's Cancel.
func (c *coordinator) Cancel(sessionID string) {
	c.currentAgent.Cancel(sessionID)
}
877
// CancelAll implements Coordinator by forwarding to the current agent's
// CancelAll.
func (c *coordinator) CancelAll() {
	c.currentAgent.CancelAll()
}
881
// ClearQueue implements Coordinator by forwarding the request for sessionID
// to the current agent's ClearQueue.
func (c *coordinator) ClearQueue(sessionID string) {
	c.currentAgent.ClearQueue(sessionID)
}
885
// IsBusy implements Coordinator and returns the current agent's IsBusy
// result.
func (c *coordinator) IsBusy() bool {
	return c.currentAgent.IsBusy()
}
889
// IsSessionBusy implements Coordinator and returns the current agent's
// IsSessionBusy result for sessionID.
func (c *coordinator) IsSessionBusy(sessionID string) bool {
	return c.currentAgent.IsSessionBusy(sessionID)
}
893
// Model implements Coordinator and returns the model reported by the current
// agent.
func (c *coordinator) Model() Model {
	return c.currentAgent.Model()
}
897
898func (c *coordinator) UpdateModels(ctx context.Context) error {
899 // build the models again so we make sure we get the latest config
900 large, small, err := c.buildAgentModels(ctx, false)
901 if err != nil {
902 return err
903 }
904 c.currentAgent.SetModels(large, small)
905
906 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
907 if !ok {
908 return errCoderAgentNotConfigured
909 }
910
911 tools, err := c.buildTools(ctx, agentCfg)
912 if err != nil {
913 return err
914 }
915 c.currentAgent.SetTools(tools)
916 return nil
917}
918
// QueuedPrompts implements Coordinator and returns the current agent's
// QueuedPrompts count for sessionID.
func (c *coordinator) QueuedPrompts(sessionID string) int {
	return c.currentAgent.QueuedPrompts(sessionID)
}
922
// QueuedPromptsList implements Coordinator and returns the current agent's
// QueuedPromptsList result for sessionID.
func (c *coordinator) QueuedPromptsList(sessionID string) []string {
	return c.currentAgent.QueuedPromptsList(sessionID)
}
926
927func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
928 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
929 if !ok {
930 return errModelProviderNotConfigured
931 }
932 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
933}
934
935func (c *coordinator) isUnauthorized(err error) bool {
936 var providerErr *fantasy.ProviderError
937 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
938}
939
940func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
941 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
942 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
943 return err
944 }
945 if err := c.UpdateModels(ctx); err != nil {
946 return err
947 }
948 return nil
949}
950
951func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
952 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
953 if err != nil {
954 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
955 return err
956 }
957
958 providerCfg.APIKey = newAPIKey
959 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
960
961 if err := c.UpdateModels(ctx); err != nil {
962 return err
963 }
964 return nil
965}
966
// subAgentParams holds the parameters for running a sub-agent.
// It is consumed by coordinator.runSubAgent.
type subAgentParams struct {
	// Agent is the sub-agent to run; its Model supplies the generation
	// settings used for the run.
	Agent SessionAgent
	// SessionID is the parent session: the sub-session is created under it
	// and the sub-session's cost is added to it afterwards.
	SessionID string
	// AgentMessageID is combined with ToolCallID to derive the sub-session ID.
	AgentMessageID string
	// ToolCallID is combined with AgentMessageID to derive the sub-session ID.
	ToolCallID string
	// Prompt is the prompt passed to the sub-agent.
	Prompt string
	// SessionTitle is the title given to the newly created sub-session.
	SessionTitle string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	// It receives the ID of the newly created sub-session.
	SessionSetup func(sessionID string)
}
979
980// runSubAgent runs a sub-agent and handles session management and cost accumulation.
981// It creates a sub-session, runs the agent with the given prompt, and propagates
982// the cost to the parent session.
983func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
984 // Create sub-session
985 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
986 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
987 if err != nil {
988 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
989 }
990
991 // Call session setup function if provided
992 if params.SessionSetup != nil {
993 params.SessionSetup(session.ID)
994 }
995
996 // Get model configuration
997 model := params.Agent.Model()
998 maxTokens := model.CatwalkCfg.DefaultMaxTokens
999 if model.ModelCfg.MaxTokens != 0 {
1000 maxTokens = model.ModelCfg.MaxTokens
1001 }
1002
1003 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1004 if !ok {
1005 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1006 }
1007
1008 // Run the agent
1009 result, err := params.Agent.Run(ctx, SessionAgentCall{
1010 SessionID: session.ID,
1011 Prompt: params.Prompt,
1012 MaxOutputTokens: maxTokens,
1013 ProviderOptions: getProviderOptions(model, providerCfg),
1014 Temperature: model.ModelCfg.Temperature,
1015 TopP: model.ModelCfg.TopP,
1016 TopK: model.ModelCfg.TopK,
1017 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1018 PresencePenalty: model.ModelCfg.PresencePenalty,
1019 NonInteractive: true,
1020 })
1021 if err != nil {
1022 return fantasy.NewTextErrorResponse("error generating response"), nil
1023 }
1024
1025 // Update parent session cost
1026 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1027 return fantasy.ToolResponse{}, err
1028 }
1029
1030 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1031}
1032
1033// updateParentSessionCost accumulates the cost from a child session to its parent session.
1034func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1035 childSession, err := c.sessions.Get(ctx, childSessionID)
1036 if err != nil {
1037 return fmt.Errorf("get child session: %w", err)
1038 }
1039
1040 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1041 if err != nil {
1042 return fmt.Errorf("get parent session: %w", err)
1043 }
1044
1045 parentSession.Cost += childSession.Cost
1046
1047 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1048 return fmt.Errorf("save parent session: %w", err)
1049 }
1050
1051 return nil
1052}
1053
1054// discoverSkills runs the skill discovery pipeline and returns both the
1055// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1056// It also emits a single diagnostic log line summarising the outcome to
1057// help track skill-loading health over time.
1058func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1059 builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1060 discovered := append([]*skills.Skill(nil), builtin...)
1061
1062 var userStates []*skills.SkillState
1063 var userPaths []string
1064
1065 opts := cfg.Config().Options
1066 if opts != nil && len(opts.SkillsPaths) > 0 {
1067 userPaths = make([]string, 0, len(opts.SkillsPaths))
1068 for _, pth := range opts.SkillsPaths {
1069 expanded := home.Long(pth)
1070 if strings.HasPrefix(expanded, "$") {
1071 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1072 expanded = resolved
1073 }
1074 }
1075 userPaths = append(userPaths, expanded)
1076 }
1077 var userSkills []*skills.Skill
1078 userSkills, userStates = skills.DiscoverWithStates(userPaths)
1079 discovered = append(discovered, userSkills...)
1080 }
1081
1082 allSkills = skills.Deduplicate(discovered)
1083 var disabledSkills []string
1084 if opts != nil {
1085 disabledSkills = opts.DisabledSkills
1086 }
1087 activeSkills = skills.Filter(allSkills, disabledSkills)
1088
1089 logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1090 return allSkills, activeSkills
1091}
1092
1093// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1094// (if any) were loaded during this turn and which looked relevant based on
1095// a cheap keyword match against the user prompt. The goal is to surface
1096// "should-have-loaded but didn't" situations for later analysis.
1097//
1098// Logged at Info level under component=skills; heavy fields are elided when
1099// there is nothing interesting to report.
1100func logTurnSkillUsage(
1101 sessionID string,
1102 prompt string,
1103 activeSkills []*skills.Skill,
1104 tracker *skills.Tracker,
1105 before []string,
1106) {
1107 if tracker == nil || len(activeSkills) == 0 {
1108 return
1109 }
1110
1111 after := tracker.LoadedNames()
1112
1113 beforeSet := make(map[string]bool, len(before))
1114 for _, n := range before {
1115 beforeSet[n] = true
1116 }
1117 var loadedThisTurn []string
1118 for _, n := range after {
1119 if !beforeSet[n] {
1120 loadedThisTurn = append(loadedThisTurn, n)
1121 }
1122 }
1123
1124 slog.Info("Skill turn summary",
1125 "component", "skills",
1126 "session_id", sessionID,
1127 "prompt_len", len(prompt),
1128 "active_total", len(activeSkills),
1129 "loaded_total", len(after),
1130 "loaded_this_turn", loadedThisTurn,
1131 )
1132}
1133
1134// logDiscoveryStats emits a single structured log line summarising skill
1135// discovery for the current session. It is intentionally low-volume: one
1136// line per session start.
1137func logDiscoveryStats(
1138 builtin []*skills.Skill,
1139 builtinStates, userStates []*skills.SkillState,
1140 userPaths []string,
1141 allSkills, activeSkills []*skills.Skill,
1142 disabled []string,
1143) {
1144 countErrors := func(states []*skills.SkillState) int {
1145 n := 0
1146 for _, s := range states {
1147 if s.State == skills.StateError {
1148 n++
1149 }
1150 }
1151 return n
1152 }
1153
1154 userOK := 0
1155 for _, s := range userStates {
1156 if s.State == skills.StateNormal {
1157 userOK++
1158 }
1159 }
1160
1161 activeNames := make([]string, 0, len(activeSkills))
1162 for _, s := range activeSkills {
1163 activeNames = append(activeNames, s.Name)
1164 }
1165
1166 xml := skills.ToPromptXML(activeSkills)
1167
1168 slog.Info("Skill discovery complete",
1169 "component", "skills",
1170 "builtin_ok", len(builtin),
1171 "builtin_errors", countErrors(builtinStates),
1172 "user_ok", userOK,
1173 "user_errors", countErrors(userStates),
1174 "user_paths", len(userPaths),
1175 "deduped_total", len(allSkills),
1176 "active", len(activeSkills),
1177 "disabled", len(disabled),
1178 "prompt_bytes", len(xml),
1179 "prompt_tok_est", skills.ApproxTokenCount(xml),
1180 "active_names", activeNames,
1181 )
1182}