1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/log"
31 "github.com/charmbracelet/crush/internal/lsp"
32 "github.com/charmbracelet/crush/internal/message"
33 "github.com/charmbracelet/crush/internal/oauth/copilot"
34 "github.com/charmbracelet/crush/internal/permission"
35 "github.com/charmbracelet/crush/internal/pubsub"
36 "github.com/charmbracelet/crush/internal/session"
37 "github.com/charmbracelet/crush/internal/skills"
38 "golang.org/x/sync/errgroup"
39
40 "charm.land/fantasy/providers/anthropic"
41 "charm.land/fantasy/providers/azure"
42 "charm.land/fantasy/providers/bedrock"
43 "charm.land/fantasy/providers/google"
44 "charm.land/fantasy/providers/openai"
45 "charm.land/fantasy/providers/openaicompat"
46 "charm.land/fantasy/providers/openrouter"
47 "charm.land/fantasy/providers/vercel"
48 openaisdk "github.com/charmbracelet/openai-go/option"
49 "github.com/qjebbs/go-jsons"
50)
51
// Coordinator errors.
//
// These sentinels are returned when the coder agent, the selected large/small
// models, or the providers backing them cannot be found in the configuration.
var (
	errCoderAgentNotConfigured         = errors.New("coder agent not configured")
	errModelProviderNotConfigured      = errors.New("model provider not configured")
	errLargeModelNotSelected           = errors.New("large model not selected")
	errSmallModelNotSelected           = errors.New("small model not selected")
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
	errLargeModelNotFound              = errors.New("large model not found in provider config")
	errSmallModelNotFound              = errors.New("small model not found in provider config")
)
63
// Coordinator is the entry point used to run prompts against the configured
// agent and to inspect or control its in-flight and queued work.
type Coordinator interface {
	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
	// SetMainAgent(string)

	// Run executes prompt, with optional attachments, in the given session.
	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
	// Cancel asks the agent to cancel work for the given session.
	Cancel(sessionID string)
	// CancelAll asks the agent to cancel work for every session.
	CancelAll()
	// IsSessionBusy reports whether the agent considers the given session busy.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports whether the agent considers itself busy.
	IsBusy() bool
	// QueuedPrompts returns a count for the session's prompt queue
	// (presumably the number of prompts waiting — confirm against SessionAgent).
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList returns the session's queued prompts as strings.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue asks the agent to clear the session's prompt queue.
	ClearQueue(sessionID string)
	// Summarize asks the agent to summarize the session with the given ID.
	Summarize(context.Context, string) error
	// Model returns the model reported by the current agent.
	Model() Model
	// UpdateModels rebuilds the models and tools from the current config.
	UpdateModels(ctx context.Context) error
}
79
// coordinator is the Coordinator implementation returned by NewCoordinator.
// It holds the services handed to the tools it builds and delegates the
// Coordinator methods to currentAgent.
type coordinator struct {
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent is the agent every Coordinator method forwards to.
	currentAgent SessionAgent
	// agents holds the built agents by agent key (only config.AgentCoder is
	// registered by NewCoordinator).
	agents map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg tracks the background system-prompt and tool builds started by
	// buildAgent; Run waits on it before doing any work.
	readyWg errgroup.Group
}
100
101func NewCoordinator(
102 ctx context.Context,
103 cfg *config.ConfigStore,
104 sessions session.Service,
105 messages message.Service,
106 permissions permission.Service,
107 history history.Service,
108 filetracker filetracker.Service,
109 lspManager *lsp.Manager,
110 notify pubsub.Publisher[notify.Notification],
111) (Coordinator, error) {
112 // Discover skills once at session start.
113 allSkills, activeSkills := discoverSkills(cfg)
114 skillTracker := skills.NewTracker(activeSkills)
115
116 c := &coordinator{
117 cfg: cfg,
118 sessions: sessions,
119 messages: messages,
120 permissions: permissions,
121 history: history,
122 filetracker: filetracker,
123 lspManager: lspManager,
124 notify: notify,
125 agents: make(map[string]SessionAgent),
126 allSkills: allSkills,
127 activeSkills: activeSkills,
128 skillTracker: skillTracker,
129 }
130
131 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
132 if !ok {
133 return nil, errCoderAgentNotConfigured
134 }
135
136 // TODO: make this dynamic when we support multiple agents
137 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
138 if err != nil {
139 return nil, err
140 }
141
142 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
143 if err != nil {
144 return nil, err
145 }
146 c.currentAgent = agent
147 c.agents[config.AgentCoder] = agent
148 return c, nil
149}
150
151// Run implements Coordinator.
152func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
153 if err := c.readyWg.Wait(); err != nil {
154 return nil, err
155 }
156
157 // refresh models before each run
158 if err := c.UpdateModels(ctx); err != nil {
159 return nil, fmt.Errorf("failed to update models: %w", err)
160 }
161
162 model := c.currentAgent.Model()
163 maxTokens := model.CatwalkCfg.DefaultMaxTokens
164 if model.ModelCfg.MaxTokens != 0 {
165 maxTokens = model.ModelCfg.MaxTokens
166 }
167
168 if !model.CatwalkCfg.SupportsImages && attachments != nil {
169 // filter out image attachments
170 filteredAttachments := make([]message.Attachment, 0, len(attachments))
171 for _, att := range attachments {
172 if att.IsText() {
173 filteredAttachments = append(filteredAttachments, att)
174 }
175 }
176 attachments = filteredAttachments
177 }
178
179 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
180 if !ok {
181 return nil, errModelProviderNotConfigured
182 }
183
184 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
185
186 if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
187 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
188 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
189 // NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
190 // depends on the flow below. If refresh fails, proceed with the token we have.
191 slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
192 }
193 }
194
195 run := func() (*fantasy.AgentResult, error) {
196 return c.currentAgent.Run(ctx, SessionAgentCall{
197 SessionID: sessionID,
198 Prompt: prompt,
199 Attachments: attachments,
200 MaxOutputTokens: maxTokens,
201 ProviderOptions: mergedOptions,
202 Temperature: temp,
203 TopP: topP,
204 TopK: topK,
205 FrequencyPenalty: freqPenalty,
206 PresencePenalty: presPenalty,
207 })
208 }
209 beforeLoaded := c.skillTracker.LoadedNames()
210 result, originalErr := run()
211 logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
212
213 if c.isUnauthorized(originalErr) {
214 switch {
215 case providerCfg.OAuthToken != nil:
216 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
217 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
218 return nil, originalErr
219 }
220 slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
221 return run()
222 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
223 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
224 if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
225 return nil, originalErr
226 }
227 slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
228 return run()
229 }
230 }
231
232 return result, originalErr
233}
234
235func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
236 options := fantasy.ProviderOptions{}
237
238 cfgOpts := []byte("{}")
239 providerCfgOpts := []byte("{}")
240 catwalkOpts := []byte("{}")
241
242 if model.ModelCfg.ProviderOptions != nil {
243 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
244 if err == nil {
245 cfgOpts = data
246 }
247 }
248
249 if providerCfg.ProviderOptions != nil {
250 data, err := json.Marshal(providerCfg.ProviderOptions)
251 if err == nil {
252 providerCfgOpts = data
253 }
254 }
255
256 if model.CatwalkCfg.Options.ProviderOptions != nil {
257 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
258 if err == nil {
259 catwalkOpts = data
260 }
261 }
262
263 readers := []io.Reader{
264 bytes.NewReader(catwalkOpts),
265 bytes.NewReader(providerCfgOpts),
266 bytes.NewReader(cfgOpts),
267 }
268
269 got, err := jsons.Merge(readers)
270 if err != nil {
271 slog.Error("Could not merge call config", "err", err)
272 return options
273 }
274
275 mergedOptions := make(map[string]any)
276
277 err = json.Unmarshal([]byte(got), &mergedOptions)
278 if err != nil {
279 slog.Error("Could not create config for call", "err", err)
280 return options
281 }
282
283 providerType := providerCfg.Type
284 if providerType == "hyper" {
285 if strings.Contains(model.CatwalkCfg.ID, "claude") {
286 providerType = anthropic.Name
287 } else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
288 providerType = openai.Name
289 } else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
290 providerType = google.Name
291 } else {
292 providerType = openaicompat.Name
293 }
294 }
295
296 switch providerType {
297 case openai.Name, azure.Name:
298 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
299 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
300 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
301 }
302 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
303 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
304 mergedOptions["reasoning_summary"] = "auto"
305 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
306 }
307 parsed, err := openai.ParseResponsesOptions(mergedOptions)
308 if err == nil {
309 options[openai.Name] = parsed
310 }
311 } else {
312 parsed, err := openai.ParseOptions(mergedOptions)
313 if err == nil {
314 options[openai.Name] = parsed
315 }
316 }
317 case anthropic.Name:
318 var (
319 _, hasEffort = mergedOptions["effort"]
320 _, hasThink = mergedOptions["thinking"]
321 )
322 switch {
323 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
324 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
325 case !hasThink && model.ModelCfg.Think:
326 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
327 }
328 parsed, err := anthropic.ParseOptions(mergedOptions)
329 if err == nil {
330 options[anthropic.Name] = parsed
331 }
332
333 case openrouter.Name:
334 _, hasReasoning := mergedOptions["reasoning"]
335 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
336 mergedOptions["reasoning"] = map[string]any{
337 "enabled": true,
338 "effort": model.ModelCfg.ReasoningEffort,
339 }
340 }
341 parsed, err := openrouter.ParseOptions(mergedOptions)
342 if err == nil {
343 options[openrouter.Name] = parsed
344 }
345 case vercel.Name:
346 _, hasReasoning := mergedOptions["reasoning"]
347 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
348 mergedOptions["reasoning"] = map[string]any{
349 "enabled": true,
350 "effort": model.ModelCfg.ReasoningEffort,
351 }
352 }
353 parsed, err := vercel.ParseOptions(mergedOptions)
354 if err == nil {
355 options[vercel.Name] = parsed
356 }
357 case google.Name:
358 _, hasReasoning := mergedOptions["thinking_config"]
359 if !hasReasoning {
360 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
361 mergedOptions["thinking_config"] = map[string]any{
362 "thinking_budget": 2000,
363 "include_thoughts": true,
364 }
365 } else {
366 mergedOptions["thinking_config"] = map[string]any{
367 "thinking_level": model.ModelCfg.ReasoningEffort,
368 "include_thoughts": true,
369 }
370 }
371 }
372 parsed, err := google.ParseOptions(mergedOptions)
373 if err == nil {
374 options[google.Name] = parsed
375 }
376 case openaicompat.Name:
377 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
378 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
379 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
380 }
381 parsed, err := openaicompat.ParseOptions(mergedOptions)
382 if err == nil {
383 options[openaicompat.Name] = parsed
384 }
385 }
386
387 return options
388}
389
390func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
391 modelOptions := getProviderOptions(model, cfg)
392 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
393 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
394 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
395 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
396 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
397 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
398}
399
400func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
401 large, small, err := c.buildAgentModels(ctx, isSubAgent)
402 if err != nil {
403 return nil, err
404 }
405
406 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
407 result := NewSessionAgent(SessionAgentOptions{
408 LargeModel: large,
409 SmallModel: small,
410 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
411 SystemPrompt: "",
412 IsSubAgent: isSubAgent,
413 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
414 IsYolo: c.permissions.SkipRequests(),
415 Sessions: c.sessions,
416 Messages: c.messages,
417 Tools: nil,
418 Notify: c.notify,
419 })
420
421 c.readyWg.Go(func() error {
422 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
423 if err != nil {
424 return err
425 }
426 result.SetSystemPrompt(systemPrompt)
427 return nil
428 })
429
430 c.readyWg.Go(func() error {
431 tools, err := c.buildTools(ctx, agent)
432 if err != nil {
433 return err
434 }
435 result.SetTools(tools)
436 return nil
437 })
438
439 return result, nil
440}
441
442func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
443 var allTools []fantasy.AgentTool
444 if slices.Contains(agent.AllowedTools, AgentToolName) {
445 agentTool, err := c.agentTool(ctx)
446 if err != nil {
447 return nil, err
448 }
449 allTools = append(allTools, agentTool)
450 }
451
452 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
453 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
454 if err != nil {
455 return nil, err
456 }
457 allTools = append(allTools, agenticFetchTool)
458 }
459
460 // Get the model name for the agent
461 modelName := ""
462 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
463 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
464 modelName = model.Name
465 }
466 }
467
468 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
469
470 allTools = append(allTools,
471 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
472 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
473 tools.NewCrushLogsTool(logFile),
474 tools.NewJobOutputTool(),
475 tools.NewJobKillTool(),
476 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
477 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
478 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
479 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
480 tools.NewGlobTool(c.cfg.WorkingDir()),
481 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
482 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
483 tools.NewSourcegraphTool(nil),
484 tools.NewTodosTool(c.sessions),
485 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
486 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
487 )
488
489 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
490 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
491 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
492 }
493
494 if len(c.cfg.Config().MCP) > 0 {
495 allTools = append(
496 allTools,
497 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
498 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
499 )
500 }
501
502 var filteredTools []fantasy.AgentTool
503 for _, tool := range allTools {
504 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
505 filteredTools = append(filteredTools, tool)
506 }
507 }
508
509 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
510 if agent.AllowedMCP == nil {
511 // No MCP restrictions
512 filteredTools = append(filteredTools, tool)
513 continue
514 }
515 if len(agent.AllowedMCP) == 0 {
516 // No MCPs allowed
517 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
518 break
519 }
520
521 for mcp, tools := range agent.AllowedMCP {
522 if mcp != tool.MCP() {
523 continue
524 }
525 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
526 filteredTools = append(filteredTools, tool)
527 break
528 }
529 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
530 }
531 }
532 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
533 return strings.Compare(a.Info().Name, b.Info().Name)
534 })
535 return filteredTools, nil
536}
537
538// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
539func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
540 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
541 if !ok {
542 return Model{}, Model{}, errLargeModelNotSelected
543 }
544 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
545 if !ok {
546 return Model{}, Model{}, errSmallModelNotSelected
547 }
548
549 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
550 if !ok {
551 return Model{}, Model{}, errLargeModelProviderNotConfigured
552 }
553
554 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
555 if err != nil {
556 return Model{}, Model{}, err
557 }
558
559 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
560 if !ok {
561 return Model{}, Model{}, errSmallModelProviderNotConfigured
562 }
563
564 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
565 if err != nil {
566 return Model{}, Model{}, err
567 }
568
569 var largeCatwalkModel *catwalk.Model
570 var smallCatwalkModel *catwalk.Model
571
572 for _, m := range largeProviderCfg.Models {
573 if m.ID == largeModelCfg.Model {
574 largeCatwalkModel = &m
575 }
576 }
577 for _, m := range smallProviderCfg.Models {
578 if m.ID == smallModelCfg.Model {
579 smallCatwalkModel = &m
580 }
581 }
582
583 if largeCatwalkModel == nil {
584 return Model{}, Model{}, errLargeModelNotFound
585 }
586
587 if smallCatwalkModel == nil {
588 return Model{}, Model{}, errSmallModelNotFound
589 }
590
591 largeModelID := largeModelCfg.Model
592 smallModelID := smallModelCfg.Model
593
594 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
595 largeModelID += ":exacto"
596 }
597
598 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
599 smallModelID += ":exacto"
600 }
601
602 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
603 if err != nil {
604 return Model{}, Model{}, err
605 }
606 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
607 if err != nil {
608 return Model{}, Model{}, err
609 }
610
611 return Model{
612 Model: largeModel,
613 CatwalkCfg: *largeCatwalkModel,
614 ModelCfg: largeModelCfg,
615 }, Model{
616 Model: smallModel,
617 CatwalkCfg: *smallCatwalkModel,
618 ModelCfg: smallModelCfg,
619 }, nil
620}
621
622func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
623 var opts []anthropic.Option
624
625 switch {
626 case strings.HasPrefix(apiKey, "Bearer "):
627 // NOTE: Prevent the SDK from picking up the API key from env.
628 os.Setenv("ANTHROPIC_API_KEY", "")
629 headers["Authorization"] = apiKey
630 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
631 // NOTE: Prevent the SDK from picking up the API key from env.
632 os.Setenv("ANTHROPIC_API_KEY", "")
633 headers["Authorization"] = "Bearer " + apiKey
634 case apiKey != "":
635 // X-Api-Key header
636 opts = append(opts, anthropic.WithAPIKey(apiKey))
637 }
638
639 if len(headers) > 0 {
640 opts = append(opts, anthropic.WithHeaders(headers))
641 }
642
643 if baseURL != "" {
644 opts = append(opts, anthropic.WithBaseURL(baseURL))
645 }
646
647 if c.cfg.Config().Options.Debug {
648 httpClient := log.NewHTTPClient()
649 opts = append(opts, anthropic.WithHTTPClient(httpClient))
650 }
651 return anthropic.New(opts...)
652}
653
654func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
655 opts := []openai.Option{
656 openai.WithAPIKey(apiKey),
657 openai.WithUseResponsesAPI(),
658 }
659 if c.cfg.Config().Options.Debug {
660 httpClient := log.NewHTTPClient()
661 opts = append(opts, openai.WithHTTPClient(httpClient))
662 }
663 if len(headers) > 0 {
664 opts = append(opts, openai.WithHeaders(headers))
665 }
666 if baseURL != "" {
667 opts = append(opts, openai.WithBaseURL(baseURL))
668 }
669 return openai.New(opts...)
670}
671
672func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
673 opts := []openrouter.Option{
674 openrouter.WithAPIKey(apiKey),
675 }
676 if c.cfg.Config().Options.Debug {
677 httpClient := log.NewHTTPClient()
678 opts = append(opts, openrouter.WithHTTPClient(httpClient))
679 }
680 if len(headers) > 0 {
681 opts = append(opts, openrouter.WithHeaders(headers))
682 }
683 return openrouter.New(opts...)
684}
685
686func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
687 opts := []vercel.Option{
688 vercel.WithAPIKey(apiKey),
689 }
690 if c.cfg.Config().Options.Debug {
691 httpClient := log.NewHTTPClient()
692 opts = append(opts, vercel.WithHTTPClient(httpClient))
693 }
694 if len(headers) > 0 {
695 opts = append(opts, vercel.WithHeaders(headers))
696 }
697 return vercel.New(opts...)
698}
699
700func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
701 opts := []openaicompat.Option{
702 openaicompat.WithBaseURL(baseURL),
703 openaicompat.WithAPIKey(apiKey),
704 }
705
706 // Set HTTP client based on provider and debug mode.
707 var httpClient *http.Client
708 if providerID == string(catwalk.InferenceProviderCopilot) {
709 opts = append(opts, openaicompat.WithUseResponsesAPI())
710 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
711 } else if c.cfg.Config().Options.Debug {
712 httpClient = log.NewHTTPClient()
713 }
714 if httpClient != nil {
715 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
716 }
717
718 if len(headers) > 0 {
719 opts = append(opts, openaicompat.WithHeaders(headers))
720 }
721
722 for extraKey, extraValue := range extraBody {
723 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
724 }
725
726 return openaicompat.New(opts...)
727}
728
729func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
730 opts := []azure.Option{
731 azure.WithBaseURL(baseURL),
732 azure.WithAPIKey(apiKey),
733 azure.WithUseResponsesAPI(),
734 }
735 if c.cfg.Config().Options.Debug {
736 httpClient := log.NewHTTPClient()
737 opts = append(opts, azure.WithHTTPClient(httpClient))
738 }
739 if options == nil {
740 options = make(map[string]string)
741 }
742 if apiVersion, ok := options["apiVersion"]; ok {
743 opts = append(opts, azure.WithAPIVersion(apiVersion))
744 }
745 if len(headers) > 0 {
746 opts = append(opts, azure.WithHeaders(headers))
747 }
748
749 return azure.New(opts...)
750}
751
752func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
753 var opts []bedrock.Option
754 if c.cfg.Config().Options.Debug {
755 httpClient := log.NewHTTPClient()
756 opts = append(opts, bedrock.WithHTTPClient(httpClient))
757 }
758 if len(headers) > 0 {
759 opts = append(opts, bedrock.WithHeaders(headers))
760 }
761 switch {
762 case apiKey != "":
763 opts = append(opts, bedrock.WithAPIKey(apiKey))
764 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
765 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
766 default:
767 // Skip, let the SDK do authentication.
768 }
769 return bedrock.New(opts...)
770}
771
772func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
773 opts := []google.Option{
774 google.WithBaseURL(baseURL),
775 google.WithGeminiAPIKey(apiKey),
776 }
777 if c.cfg.Config().Options.Debug {
778 httpClient := log.NewHTTPClient()
779 opts = append(opts, google.WithHTTPClient(httpClient))
780 }
781 if len(headers) > 0 {
782 opts = append(opts, google.WithHeaders(headers))
783 }
784 return google.New(opts...)
785}
786
787func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
788 opts := []google.Option{}
789 if c.cfg.Config().Options.Debug {
790 httpClient := log.NewHTTPClient()
791 opts = append(opts, google.WithHTTPClient(httpClient))
792 }
793 if len(headers) > 0 {
794 opts = append(opts, google.WithHeaders(headers))
795 }
796
797 project := options["project"]
798 location := options["location"]
799
800 opts = append(opts, google.WithVertex(project, location))
801
802 return google.New(opts...)
803}
804
805func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
806 if model.Think {
807 return true
808 }
809 opts, err := anthropic.ParseOptions(model.ProviderOptions)
810 return err == nil && opts.Thinking != nil
811}
812
813func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
814 headers := maps.Clone(providerCfg.ExtraHeaders)
815 if headers == nil {
816 headers = make(map[string]string)
817 }
818
819 // handle special headers for anthropic
820 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
821 if v, ok := headers["anthropic-beta"]; ok {
822 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
823 } else {
824 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
825 }
826 }
827
828 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
829 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
830
831 switch providerCfg.Type {
832 case openai.Name:
833 return c.buildOpenaiProvider(baseURL, apiKey, headers)
834 case anthropic.Name:
835 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
836 case openrouter.Name:
837 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
838 case vercel.Name:
839 return c.buildVercelProvider(baseURL, apiKey, headers)
840 case azure.Name:
841 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
842 case bedrock.Name:
843 return c.buildBedrockProvider(apiKey, headers)
844 case google.Name:
845 return c.buildGoogleProvider(baseURL, apiKey, headers)
846 case "google-vertex":
847 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
848 case openaicompat.Name, hyper.Name:
849 switch providerCfg.ID {
850 case hyper.Name:
851 baseURL = hyper.BaseURL() + "/v1"
852 headers["x-crush-id"] = event.GetID()
853 case string(catwalk.InferenceProviderZAI):
854 if providerCfg.ExtraBody == nil {
855 providerCfg.ExtraBody = map[string]any{}
856 }
857 providerCfg.ExtraBody["tool_stream"] = true
858 }
859 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
860 default:
861 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
862 }
863}
864
// isExactoSupported reports whether modelID is one of the models for which
// the ":exacto" variant is requested. The match is exact and case-sensitive.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
875
// Cancel implements Coordinator by forwarding the cancellation of sessionID
// to the current agent.
func (c *coordinator) Cancel(sessionID string) {
	c.currentAgent.Cancel(sessionID)
}
879
// CancelAll implements Coordinator by forwarding to the current agent's
// CancelAll.
func (c *coordinator) CancelAll() {
	c.currentAgent.CancelAll()
}
883
// ClearQueue implements Coordinator by asking the current agent to clear the
// queue of sessionID.
func (c *coordinator) ClearQueue(sessionID string) {
	c.currentAgent.ClearQueue(sessionID)
}
887
// IsBusy implements Coordinator and returns whatever the current agent
// reports from IsBusy.
func (c *coordinator) IsBusy() bool {
	return c.currentAgent.IsBusy()
}
891
// IsSessionBusy implements Coordinator and returns whatever the current agent
// reports for sessionID.
func (c *coordinator) IsSessionBusy(sessionID string) bool {
	return c.currentAgent.IsSessionBusy(sessionID)
}
895
// Model implements Coordinator and returns the model reported by the current
// agent.
func (c *coordinator) Model() Model {
	return c.currentAgent.Model()
}
899
900func (c *coordinator) UpdateModels(ctx context.Context) error {
901 // build the models again so we make sure we get the latest config
902 large, small, err := c.buildAgentModels(ctx, false)
903 if err != nil {
904 return err
905 }
906 c.currentAgent.SetModels(large, small)
907
908 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
909 if !ok {
910 return errCoderAgentNotConfigured
911 }
912
913 tools, err := c.buildTools(ctx, agentCfg)
914 if err != nil {
915 return err
916 }
917 c.currentAgent.SetTools(tools)
918 return nil
919}
920
// QueuedPrompts implements Coordinator and returns the value the current
// agent reports for sessionID.
func (c *coordinator) QueuedPrompts(sessionID string) int {
	return c.currentAgent.QueuedPrompts(sessionID)
}
924
// QueuedPromptsList implements Coordinator and returns the list the current
// agent reports for sessionID.
func (c *coordinator) QueuedPromptsList(sessionID string) []string {
	return c.currentAgent.QueuedPromptsList(sessionID)
}
928
929func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
930 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
931 if !ok {
932 return errModelProviderNotConfigured
933 }
934 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
935}
936
937func (c *coordinator) isUnauthorized(err error) bool {
938 var providerErr *fantasy.ProviderError
939 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
940}
941
942func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
943 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
944 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
945 return err
946 }
947 if err := c.UpdateModels(ctx); err != nil {
948 return err
949 }
950 return nil
951}
952
953func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
954 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
955 if err != nil {
956 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
957 return err
958 }
959
960 providerCfg.APIKey = newAPIKey
961 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
962
963 if err := c.UpdateModels(ctx); err != nil {
964 return err
965 }
966 return nil
967}
968
// subAgentParams holds the parameters for running a sub-agent with
// runSubAgent.
type subAgentParams struct {
	// Agent is the sub-agent to run; its Model supplies the generation
	// settings used for the run.
	Agent SessionAgent
	// SessionID is the parent session: the sub-session is created under it
	// and the sub-session's cost is added to it afterwards.
	SessionID string
	// AgentMessageID is combined with ToolCallID to derive the sub-session ID.
	AgentMessageID string
	// ToolCallID is combined with AgentMessageID to derive the sub-session ID.
	ToolCallID string
	// Prompt is the prompt passed to the sub-agent.
	Prompt string
	// SessionTitle is the title passed when creating the sub-session.
	SessionTitle string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration. It
	// receives the ID of the newly created sub-session.
	SessionSetup func(sessionID string)
}
981
982// runSubAgent runs a sub-agent and handles session management and cost accumulation.
983// It creates a sub-session, runs the agent with the given prompt, and propagates
984// the cost to the parent session.
985func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
986 // Create sub-session
987 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
988 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
989 if err != nil {
990 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
991 }
992
993 // Call session setup function if provided
994 if params.SessionSetup != nil {
995 params.SessionSetup(session.ID)
996 }
997
998 // Get model configuration
999 model := params.Agent.Model()
1000 maxTokens := model.CatwalkCfg.DefaultMaxTokens
1001 if model.ModelCfg.MaxTokens != 0 {
1002 maxTokens = model.ModelCfg.MaxTokens
1003 }
1004
1005 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1006 if !ok {
1007 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1008 }
1009
1010 // Run the agent
1011 result, err := params.Agent.Run(ctx, SessionAgentCall{
1012 SessionID: session.ID,
1013 Prompt: params.Prompt,
1014 MaxOutputTokens: maxTokens,
1015 ProviderOptions: getProviderOptions(model, providerCfg),
1016 Temperature: model.ModelCfg.Temperature,
1017 TopP: model.ModelCfg.TopP,
1018 TopK: model.ModelCfg.TopK,
1019 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1020 PresencePenalty: model.ModelCfg.PresencePenalty,
1021 NonInteractive: true,
1022 })
1023 if err != nil {
1024 return fantasy.NewTextErrorResponse("error generating response"), nil
1025 }
1026
1027 // Update parent session cost
1028 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1029 return fantasy.ToolResponse{}, err
1030 }
1031
1032 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1033}
1034
1035// updateParentSessionCost accumulates the cost from a child session to its parent session.
1036func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1037 childSession, err := c.sessions.Get(ctx, childSessionID)
1038 if err != nil {
1039 return fmt.Errorf("get child session: %w", err)
1040 }
1041
1042 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1043 if err != nil {
1044 return fmt.Errorf("get parent session: %w", err)
1045 }
1046
1047 parentSession.Cost += childSession.Cost
1048
1049 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1050 return fmt.Errorf("save parent session: %w", err)
1051 }
1052
1053 return nil
1054}
1055
1056// discoverSkills runs the skill discovery pipeline and returns both the
1057// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1058// It also emits a single diagnostic log line summarising the outcome to
1059// help track skill-loading health over time.
1060func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1061 builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1062 discovered := append([]*skills.Skill(nil), builtin...)
1063
1064 var userStates []*skills.SkillState
1065 var userPaths []string
1066
1067 opts := cfg.Config().Options
1068 if opts != nil && len(opts.SkillsPaths) > 0 {
1069 userPaths = make([]string, 0, len(opts.SkillsPaths))
1070 for _, pth := range opts.SkillsPaths {
1071 expanded := home.Long(pth)
1072 if strings.HasPrefix(expanded, "$") {
1073 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1074 expanded = resolved
1075 }
1076 }
1077 userPaths = append(userPaths, expanded)
1078 }
1079 var userSkills []*skills.Skill
1080 userSkills, userStates = skills.DiscoverWithStates(userPaths)
1081 discovered = append(discovered, userSkills...)
1082 }
1083
1084 allSkills = skills.Deduplicate(discovered)
1085 var disabledSkills []string
1086 if opts != nil {
1087 disabledSkills = opts.DisabledSkills
1088 }
1089 activeSkills = skills.Filter(allSkills, disabledSkills)
1090
1091 logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1092 return allSkills, activeSkills
1093}
1094
1095// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1096// (if any) were loaded during this turn and which looked relevant based on
1097// a cheap keyword match against the user prompt. The goal is to surface
1098// "should-have-loaded but didn't" situations for later analysis.
1099//
1100// Logged at Info level under component=skills; heavy fields are elided when
1101// there is nothing interesting to report.
1102func logTurnSkillUsage(
1103 sessionID string,
1104 prompt string,
1105 activeSkills []*skills.Skill,
1106 tracker *skills.Tracker,
1107 before []string,
1108) {
1109 if tracker == nil || len(activeSkills) == 0 {
1110 return
1111 }
1112
1113 after := tracker.LoadedNames()
1114
1115 beforeSet := make(map[string]bool, len(before))
1116 for _, n := range before {
1117 beforeSet[n] = true
1118 }
1119 var loadedThisTurn []string
1120 for _, n := range after {
1121 if !beforeSet[n] {
1122 loadedThisTurn = append(loadedThisTurn, n)
1123 }
1124 }
1125
1126 slog.Info("Skill turn summary",
1127 "component", "skills",
1128 "session_id", sessionID,
1129 "prompt_len", len(prompt),
1130 "active_total", len(activeSkills),
1131 "loaded_total", len(after),
1132 "loaded_this_turn", loadedThisTurn,
1133 )
1134}
1135
1136// logDiscoveryStats emits a single structured log line summarising skill
1137// discovery for the current session. It is intentionally low-volume: one
1138// line per session start.
1139func logDiscoveryStats(
1140 builtin []*skills.Skill,
1141 builtinStates, userStates []*skills.SkillState,
1142 userPaths []string,
1143 allSkills, activeSkills []*skills.Skill,
1144 disabled []string,
1145) {
1146 countErrors := func(states []*skills.SkillState) int {
1147 n := 0
1148 for _, s := range states {
1149 if s.State == skills.StateError {
1150 n++
1151 }
1152 }
1153 return n
1154 }
1155
1156 userOK := 0
1157 for _, s := range userStates {
1158 if s.State == skills.StateNormal {
1159 userOK++
1160 }
1161 }
1162
1163 activeNames := make([]string, 0, len(activeSkills))
1164 for _, s := range activeSkills {
1165 activeNames = append(activeNames, s.Name)
1166 }
1167
1168 xml := skills.ToPromptXML(activeSkills)
1169
1170 slog.Info("Skill discovery complete",
1171 "component", "skills",
1172 "builtin_ok", len(builtin),
1173 "builtin_errors", countErrors(builtinStates),
1174 "user_ok", userOK,
1175 "user_errors", countErrors(userStates),
1176 "user_paths", len(userPaths),
1177 "deduped_total", len(allSkills),
1178 "active", len(activeSkills),
1179 "disabled", len(disabled),
1180 "prompt_bytes", len(xml),
1181 "prompt_tok_est", skills.ApproxTokenCount(xml),
1182 "active_names", activeNames,
1183 )
1184}