1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/filetracker"
27 "github.com/charmbracelet/crush/internal/history"
28 "github.com/charmbracelet/crush/internal/home"
29 "github.com/charmbracelet/crush/internal/log"
30 "github.com/charmbracelet/crush/internal/lsp"
31 "github.com/charmbracelet/crush/internal/message"
32 "github.com/charmbracelet/crush/internal/oauth/copilot"
33 "github.com/charmbracelet/crush/internal/permission"
34 "github.com/charmbracelet/crush/internal/pubsub"
35 "github.com/charmbracelet/crush/internal/session"
36 "github.com/charmbracelet/crush/internal/skills"
37 "golang.org/x/sync/errgroup"
38
39 "charm.land/fantasy/providers/anthropic"
40 "charm.land/fantasy/providers/azure"
41 "charm.land/fantasy/providers/bedrock"
42 "charm.land/fantasy/providers/google"
43 "charm.land/fantasy/providers/openai"
44 "charm.land/fantasy/providers/openaicompat"
45 "charm.land/fantasy/providers/openrouter"
46 "charm.land/fantasy/providers/vercel"
47 openaisdk "github.com/charmbracelet/openai-go/option"
48 "github.com/qjebbs/go-jsons"
49)
50
// Coordinator errors.
//
// Sentinel errors returned while building or running the coder agent. Each is
// a distinct errors.New value, so callers can match them with errors.Is even
// when they are wrapped.

// errCoderAgentNotConfigured is returned when the config has no coder agent.
var errCoderAgentNotConfigured = errors.New("coder agent not configured")

// errModelProviderNotConfigured is returned when the provider of the model in
// use cannot be found in the config.
var errModelProviderNotConfigured = errors.New("model provider not configured")

// Selection errors: no large/small model is selected in the config.
var (
	errLargeModelNotSelected = errors.New("large model not selected")
	errSmallModelNotSelected = errors.New("small model not selected")
)

// Provider errors: the selected model's provider is not configured.
var (
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
)

// Lookup errors: the selected model ID is absent from its provider's models.
var (
	errLargeModelNotFound = errors.New("large model not found in provider config")
	errSmallModelNotFound = errors.New("small model not found in provider config")
)
62
63type Coordinator interface {
64 // INFO: (kujtim) this is not used yet we will use this when we have multiple agents
65 // SetMainAgent(string)
66 Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
67 Cancel(sessionID string)
68 CancelAll()
69 IsSessionBusy(sessionID string) bool
70 IsBusy() bool
71 QueuedPrompts(sessionID string) int
72 QueuedPromptsList(sessionID string) []string
73 ClearQueue(sessionID string)
74 Summarize(context.Context, string) error
75 Model() Model
76 UpdateModels(ctx context.Context) error
77}
78
79type coordinator struct {
80 cfg *config.ConfigStore
81 sessions session.Service
82 messages message.Service
83 permissions permission.Service
84 history history.Service
85 filetracker filetracker.Service
86 lspManager *lsp.Manager
87 notify pubsub.Publisher[notify.Notification]
88
89 currentAgent SessionAgent
90 agents map[string]SessionAgent
91
92 // Skills discovery results (session-start snapshot).
93 allSkills []*skills.Skill // Pre-filter: all discovered after dedup.
94 activeSkills []*skills.Skill // Post-filter: active skills only.
95 skillTracker *skills.Tracker
96
97 readyWg errgroup.Group
98}
99
100func NewCoordinator(
101 ctx context.Context,
102 cfg *config.ConfigStore,
103 sessions session.Service,
104 messages message.Service,
105 permissions permission.Service,
106 history history.Service,
107 filetracker filetracker.Service,
108 lspManager *lsp.Manager,
109 notify pubsub.Publisher[notify.Notification],
110) (Coordinator, error) {
111 // Discover skills once at session start.
112 allSkills, activeSkills := discoverSkills(cfg)
113 skillTracker := skills.NewTracker(activeSkills)
114
115 c := &coordinator{
116 cfg: cfg,
117 sessions: sessions,
118 messages: messages,
119 permissions: permissions,
120 history: history,
121 filetracker: filetracker,
122 lspManager: lspManager,
123 notify: notify,
124 agents: make(map[string]SessionAgent),
125 allSkills: allSkills,
126 activeSkills: activeSkills,
127 skillTracker: skillTracker,
128 }
129
130 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
131 if !ok {
132 return nil, errCoderAgentNotConfigured
133 }
134
135 // TODO: make this dynamic when we support multiple agents
136 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
137 if err != nil {
138 return nil, err
139 }
140
141 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
142 if err != nil {
143 return nil, err
144 }
145 c.currentAgent = agent
146 c.agents[config.AgentCoder] = agent
147 return c, nil
148}
149
150// Run implements Coordinator.
151func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
152 if err := c.readyWg.Wait(); err != nil {
153 return nil, err
154 }
155
156 // refresh models before each run
157 if err := c.UpdateModels(ctx); err != nil {
158 return nil, fmt.Errorf("failed to update models: %w", err)
159 }
160
161 model := c.currentAgent.Model()
162 maxTokens := model.CatwalkCfg.DefaultMaxTokens
163 if model.ModelCfg.MaxTokens != 0 {
164 maxTokens = model.ModelCfg.MaxTokens
165 }
166
167 if !model.CatwalkCfg.SupportsImages && attachments != nil {
168 // filter out image attachments
169 filteredAttachments := make([]message.Attachment, 0, len(attachments))
170 for _, att := range attachments {
171 if att.IsText() {
172 filteredAttachments = append(filteredAttachments, att)
173 }
174 }
175 attachments = filteredAttachments
176 }
177
178 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
179 if !ok {
180 return nil, errModelProviderNotConfigured
181 }
182
183 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
184
185 if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
186 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
187 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
188 return nil, err
189 }
190 }
191
192 run := func() (*fantasy.AgentResult, error) {
193 return c.currentAgent.Run(ctx, SessionAgentCall{
194 SessionID: sessionID,
195 Prompt: prompt,
196 Attachments: attachments,
197 MaxOutputTokens: maxTokens,
198 ProviderOptions: mergedOptions,
199 Temperature: temp,
200 TopP: topP,
201 TopK: topK,
202 FrequencyPenalty: freqPenalty,
203 PresencePenalty: presPenalty,
204 })
205 }
206 result, originalErr := run()
207
208 if c.isUnauthorized(originalErr) {
209 switch {
210 case providerCfg.OAuthToken != nil:
211 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
212 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
213 return nil, originalErr
214 }
215 slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
216 return run()
217 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
218 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
219 if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
220 return nil, originalErr
221 }
222 slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
223 return run()
224 }
225 }
226
227 return result, originalErr
228}
229
230func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
231 options := fantasy.ProviderOptions{}
232
233 cfgOpts := []byte("{}")
234 providerCfgOpts := []byte("{}")
235 catwalkOpts := []byte("{}")
236
237 if model.ModelCfg.ProviderOptions != nil {
238 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
239 if err == nil {
240 cfgOpts = data
241 }
242 }
243
244 if providerCfg.ProviderOptions != nil {
245 data, err := json.Marshal(providerCfg.ProviderOptions)
246 if err == nil {
247 providerCfgOpts = data
248 }
249 }
250
251 if model.CatwalkCfg.Options.ProviderOptions != nil {
252 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
253 if err == nil {
254 catwalkOpts = data
255 }
256 }
257
258 readers := []io.Reader{
259 bytes.NewReader(catwalkOpts),
260 bytes.NewReader(providerCfgOpts),
261 bytes.NewReader(cfgOpts),
262 }
263
264 got, err := jsons.Merge(readers)
265 if err != nil {
266 slog.Error("Could not merge call config", "err", err)
267 return options
268 }
269
270 mergedOptions := make(map[string]any)
271
272 err = json.Unmarshal([]byte(got), &mergedOptions)
273 if err != nil {
274 slog.Error("Could not create config for call", "err", err)
275 return options
276 }
277
278 providerType := providerCfg.Type
279 if providerType == "hyper" {
280 if strings.Contains(model.CatwalkCfg.ID, "claude") {
281 providerType = anthropic.Name
282 } else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
283 providerType = openai.Name
284 } else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
285 providerType = google.Name
286 } else {
287 providerType = openaicompat.Name
288 }
289 }
290
291 switch providerType {
292 case openai.Name, azure.Name:
293 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
294 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
295 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
296 }
297 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
298 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
299 mergedOptions["reasoning_summary"] = "auto"
300 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
301 }
302 parsed, err := openai.ParseResponsesOptions(mergedOptions)
303 if err == nil {
304 options[openai.Name] = parsed
305 }
306 } else {
307 parsed, err := openai.ParseOptions(mergedOptions)
308 if err == nil {
309 options[openai.Name] = parsed
310 }
311 }
312 case anthropic.Name:
313 var (
314 _, hasEffort = mergedOptions["effort"]
315 _, hasThink = mergedOptions["thinking"]
316 )
317 switch {
318 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
319 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
320 case !hasThink && model.ModelCfg.Think:
321 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
322 }
323 parsed, err := anthropic.ParseOptions(mergedOptions)
324 if err == nil {
325 options[anthropic.Name] = parsed
326 }
327
328 case openrouter.Name:
329 _, hasReasoning := mergedOptions["reasoning"]
330 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
331 mergedOptions["reasoning"] = map[string]any{
332 "enabled": true,
333 "effort": model.ModelCfg.ReasoningEffort,
334 }
335 }
336 parsed, err := openrouter.ParseOptions(mergedOptions)
337 if err == nil {
338 options[openrouter.Name] = parsed
339 }
340 case vercel.Name:
341 _, hasReasoning := mergedOptions["reasoning"]
342 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
343 mergedOptions["reasoning"] = map[string]any{
344 "enabled": true,
345 "effort": model.ModelCfg.ReasoningEffort,
346 }
347 }
348 parsed, err := vercel.ParseOptions(mergedOptions)
349 if err == nil {
350 options[vercel.Name] = parsed
351 }
352 case google.Name:
353 _, hasReasoning := mergedOptions["thinking_config"]
354 if !hasReasoning {
355 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
356 mergedOptions["thinking_config"] = map[string]any{
357 "thinking_budget": 2000,
358 "include_thoughts": true,
359 }
360 } else {
361 mergedOptions["thinking_config"] = map[string]any{
362 "thinking_level": model.ModelCfg.ReasoningEffort,
363 "include_thoughts": true,
364 }
365 }
366 }
367 parsed, err := google.ParseOptions(mergedOptions)
368 if err == nil {
369 options[google.Name] = parsed
370 }
371 case openaicompat.Name:
372 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
373 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
374 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
375 }
376 parsed, err := openaicompat.ParseOptions(mergedOptions)
377 if err == nil {
378 options[openaicompat.Name] = parsed
379 }
380 }
381
382 return options
383}
384
385func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
386 modelOptions := getProviderOptions(model, cfg)
387 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
388 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
389 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
390 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
391 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
392 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
393}
394
395func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
396 large, small, err := c.buildAgentModels(ctx, isSubAgent)
397 if err != nil {
398 return nil, err
399 }
400
401 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
402 result := NewSessionAgent(SessionAgentOptions{
403 LargeModel: large,
404 SmallModel: small,
405 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
406 SystemPrompt: "",
407 IsSubAgent: isSubAgent,
408 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
409 IsYolo: c.permissions.SkipRequests(),
410 Sessions: c.sessions,
411 Messages: c.messages,
412 Tools: nil,
413 Notify: c.notify,
414 })
415
416 c.readyWg.Go(func() error {
417 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
418 if err != nil {
419 return err
420 }
421 result.SetSystemPrompt(systemPrompt)
422 return nil
423 })
424
425 c.readyWg.Go(func() error {
426 tools, err := c.buildTools(ctx, agent)
427 if err != nil {
428 return err
429 }
430 result.SetTools(tools)
431 return nil
432 })
433
434 return result, nil
435}
436
437func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
438 var allTools []fantasy.AgentTool
439 if slices.Contains(agent.AllowedTools, AgentToolName) {
440 agentTool, err := c.agentTool(ctx)
441 if err != nil {
442 return nil, err
443 }
444 allTools = append(allTools, agentTool)
445 }
446
447 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
448 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
449 if err != nil {
450 return nil, err
451 }
452 allTools = append(allTools, agenticFetchTool)
453 }
454
455 // Get the model name for the agent
456 modelName := ""
457 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
458 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
459 modelName = model.Name
460 }
461 }
462
463 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
464
465 allTools = append(allTools,
466 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
467 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
468 tools.NewCrushLogsTool(logFile),
469 tools.NewJobOutputTool(),
470 tools.NewJobKillTool(),
471 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
472 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
473 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
474 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
475 tools.NewGlobTool(c.cfg.WorkingDir()),
476 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
477 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
478 tools.NewSourcegraphTool(nil),
479 tools.NewTodosTool(c.sessions),
480 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
481 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
482 )
483
484 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
485 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
486 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
487 }
488
489 if len(c.cfg.Config().MCP) > 0 {
490 allTools = append(
491 allTools,
492 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
493 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
494 )
495 }
496
497 var filteredTools []fantasy.AgentTool
498 for _, tool := range allTools {
499 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
500 filteredTools = append(filteredTools, tool)
501 }
502 }
503
504 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
505 if agent.AllowedMCP == nil {
506 // No MCP restrictions
507 filteredTools = append(filteredTools, tool)
508 continue
509 }
510 if len(agent.AllowedMCP) == 0 {
511 // No MCPs allowed
512 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
513 break
514 }
515
516 for mcp, tools := range agent.AllowedMCP {
517 if mcp != tool.MCP() {
518 continue
519 }
520 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
521 filteredTools = append(filteredTools, tool)
522 break
523 }
524 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
525 }
526 }
527 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
528 return strings.Compare(a.Info().Name, b.Info().Name)
529 })
530 return filteredTools, nil
531}
532
533// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
534func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
535 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
536 if !ok {
537 return Model{}, Model{}, errLargeModelNotSelected
538 }
539 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
540 if !ok {
541 return Model{}, Model{}, errSmallModelNotSelected
542 }
543
544 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
545 if !ok {
546 return Model{}, Model{}, errLargeModelProviderNotConfigured
547 }
548
549 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
550 if err != nil {
551 return Model{}, Model{}, err
552 }
553
554 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
555 if !ok {
556 return Model{}, Model{}, errSmallModelProviderNotConfigured
557 }
558
559 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
560 if err != nil {
561 return Model{}, Model{}, err
562 }
563
564 var largeCatwalkModel *catwalk.Model
565 var smallCatwalkModel *catwalk.Model
566
567 for _, m := range largeProviderCfg.Models {
568 if m.ID == largeModelCfg.Model {
569 largeCatwalkModel = &m
570 }
571 }
572 for _, m := range smallProviderCfg.Models {
573 if m.ID == smallModelCfg.Model {
574 smallCatwalkModel = &m
575 }
576 }
577
578 if largeCatwalkModel == nil {
579 return Model{}, Model{}, errLargeModelNotFound
580 }
581
582 if smallCatwalkModel == nil {
583 return Model{}, Model{}, errSmallModelNotFound
584 }
585
586 largeModelID := largeModelCfg.Model
587 smallModelID := smallModelCfg.Model
588
589 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
590 largeModelID += ":exacto"
591 }
592
593 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
594 smallModelID += ":exacto"
595 }
596
597 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
598 if err != nil {
599 return Model{}, Model{}, err
600 }
601 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
602 if err != nil {
603 return Model{}, Model{}, err
604 }
605
606 return Model{
607 Model: largeModel,
608 CatwalkCfg: *largeCatwalkModel,
609 ModelCfg: largeModelCfg,
610 }, Model{
611 Model: smallModel,
612 CatwalkCfg: *smallCatwalkModel,
613 ModelCfg: smallModelCfg,
614 }, nil
615}
616
617func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
618 var opts []anthropic.Option
619
620 switch {
621 case strings.HasPrefix(apiKey, "Bearer "):
622 // NOTE: Prevent the SDK from picking up the API key from env.
623 os.Setenv("ANTHROPIC_API_KEY", "")
624 headers["Authorization"] = apiKey
625 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
626 // NOTE: Prevent the SDK from picking up the API key from env.
627 os.Setenv("ANTHROPIC_API_KEY", "")
628 headers["Authorization"] = "Bearer " + apiKey
629 case apiKey != "":
630 // X-Api-Key header
631 opts = append(opts, anthropic.WithAPIKey(apiKey))
632 }
633
634 if len(headers) > 0 {
635 opts = append(opts, anthropic.WithHeaders(headers))
636 }
637
638 if baseURL != "" {
639 opts = append(opts, anthropic.WithBaseURL(baseURL))
640 }
641
642 if c.cfg.Config().Options.Debug {
643 httpClient := log.NewHTTPClient()
644 opts = append(opts, anthropic.WithHTTPClient(httpClient))
645 }
646 return anthropic.New(opts...)
647}
648
649func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
650 opts := []openai.Option{
651 openai.WithAPIKey(apiKey),
652 openai.WithUseResponsesAPI(),
653 }
654 if c.cfg.Config().Options.Debug {
655 httpClient := log.NewHTTPClient()
656 opts = append(opts, openai.WithHTTPClient(httpClient))
657 }
658 if len(headers) > 0 {
659 opts = append(opts, openai.WithHeaders(headers))
660 }
661 if baseURL != "" {
662 opts = append(opts, openai.WithBaseURL(baseURL))
663 }
664 return openai.New(opts...)
665}
666
667func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
668 opts := []openrouter.Option{
669 openrouter.WithAPIKey(apiKey),
670 }
671 if c.cfg.Config().Options.Debug {
672 httpClient := log.NewHTTPClient()
673 opts = append(opts, openrouter.WithHTTPClient(httpClient))
674 }
675 if len(headers) > 0 {
676 opts = append(opts, openrouter.WithHeaders(headers))
677 }
678 return openrouter.New(opts...)
679}
680
681func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
682 opts := []vercel.Option{
683 vercel.WithAPIKey(apiKey),
684 }
685 if c.cfg.Config().Options.Debug {
686 httpClient := log.NewHTTPClient()
687 opts = append(opts, vercel.WithHTTPClient(httpClient))
688 }
689 if len(headers) > 0 {
690 opts = append(opts, vercel.WithHeaders(headers))
691 }
692 return vercel.New(opts...)
693}
694
695func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
696 opts := []openaicompat.Option{
697 openaicompat.WithBaseURL(baseURL),
698 openaicompat.WithAPIKey(apiKey),
699 }
700
701 // Set HTTP client based on provider and debug mode.
702 var httpClient *http.Client
703 if providerID == string(catwalk.InferenceProviderCopilot) {
704 opts = append(opts, openaicompat.WithUseResponsesAPI())
705 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
706 } else if c.cfg.Config().Options.Debug {
707 httpClient = log.NewHTTPClient()
708 }
709 if httpClient != nil {
710 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
711 }
712
713 if len(headers) > 0 {
714 opts = append(opts, openaicompat.WithHeaders(headers))
715 }
716
717 for extraKey, extraValue := range extraBody {
718 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
719 }
720
721 return openaicompat.New(opts...)
722}
723
724func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
725 opts := []azure.Option{
726 azure.WithBaseURL(baseURL),
727 azure.WithAPIKey(apiKey),
728 azure.WithUseResponsesAPI(),
729 }
730 if c.cfg.Config().Options.Debug {
731 httpClient := log.NewHTTPClient()
732 opts = append(opts, azure.WithHTTPClient(httpClient))
733 }
734 if options == nil {
735 options = make(map[string]string)
736 }
737 if apiVersion, ok := options["apiVersion"]; ok {
738 opts = append(opts, azure.WithAPIVersion(apiVersion))
739 }
740 if len(headers) > 0 {
741 opts = append(opts, azure.WithHeaders(headers))
742 }
743
744 return azure.New(opts...)
745}
746
747func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
748 var opts []bedrock.Option
749 if c.cfg.Config().Options.Debug {
750 httpClient := log.NewHTTPClient()
751 opts = append(opts, bedrock.WithHTTPClient(httpClient))
752 }
753 if len(headers) > 0 {
754 opts = append(opts, bedrock.WithHeaders(headers))
755 }
756 switch {
757 case apiKey != "":
758 opts = append(opts, bedrock.WithAPIKey(apiKey))
759 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
760 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
761 default:
762 // Skip, let the SDK do authentication.
763 }
764 return bedrock.New(opts...)
765}
766
767func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
768 opts := []google.Option{
769 google.WithBaseURL(baseURL),
770 google.WithGeminiAPIKey(apiKey),
771 }
772 if c.cfg.Config().Options.Debug {
773 httpClient := log.NewHTTPClient()
774 opts = append(opts, google.WithHTTPClient(httpClient))
775 }
776 if len(headers) > 0 {
777 opts = append(opts, google.WithHeaders(headers))
778 }
779 return google.New(opts...)
780}
781
782func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
783 opts := []google.Option{}
784 if c.cfg.Config().Options.Debug {
785 httpClient := log.NewHTTPClient()
786 opts = append(opts, google.WithHTTPClient(httpClient))
787 }
788 if len(headers) > 0 {
789 opts = append(opts, google.WithHeaders(headers))
790 }
791
792 project := options["project"]
793 location := options["location"]
794
795 opts = append(opts, google.WithVertex(project, location))
796
797 return google.New(opts...)
798}
799
800func (c *coordinator) buildHyperProvider(apiKey string) (fantasy.Provider, error) {
801 opts := []hyper.Option{
802 hyper.WithAPIKey(apiKey),
803 }
804 if c.cfg.Config().Options.Debug {
805 httpClient := log.NewHTTPClient()
806 opts = append(opts, hyper.WithHTTPClient(httpClient))
807 }
808 return hyper.New(opts...)
809}
810
811func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
812 if model.Think {
813 return true
814 }
815 opts, err := anthropic.ParseOptions(model.ProviderOptions)
816 return err == nil && opts.Thinking != nil
817}
818
819func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
820 headers := maps.Clone(providerCfg.ExtraHeaders)
821 if headers == nil {
822 headers = make(map[string]string)
823 }
824
825 // handle special headers for anthropic
826 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
827 if v, ok := headers["anthropic-beta"]; ok {
828 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
829 } else {
830 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
831 }
832 }
833
834 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
835 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
836
837 switch providerCfg.Type {
838 case openai.Name:
839 return c.buildOpenaiProvider(baseURL, apiKey, headers)
840 case anthropic.Name:
841 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
842 case openrouter.Name:
843 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
844 case vercel.Name:
845 return c.buildVercelProvider(baseURL, apiKey, headers)
846 case azure.Name:
847 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
848 case bedrock.Name:
849 return c.buildBedrockProvider(apiKey, headers)
850 case google.Name:
851 return c.buildGoogleProvider(baseURL, apiKey, headers)
852 case "google-vertex":
853 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
854 case openaicompat.Name:
855 if providerCfg.ID == string(catwalk.InferenceProviderZAI) {
856 if providerCfg.ExtraBody == nil {
857 providerCfg.ExtraBody = map[string]any{}
858 }
859 providerCfg.ExtraBody["tool_stream"] = true
860 }
861 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
862 case hyper.Name:
863 return c.buildHyperProvider(apiKey)
864 default:
865 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
866 }
867}
868
// isExactoSupported reports whether modelID is on the allow-list of model IDs
// for which buildAgentModels requests the OpenRouter ":exacto" variant. The
// match is exact and case-sensitive.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
879
880func (c *coordinator) Cancel(sessionID string) {
881 c.currentAgent.Cancel(sessionID)
882}
883
884func (c *coordinator) CancelAll() {
885 c.currentAgent.CancelAll()
886}
887
888func (c *coordinator) ClearQueue(sessionID string) {
889 c.currentAgent.ClearQueue(sessionID)
890}
891
892func (c *coordinator) IsBusy() bool {
893 return c.currentAgent.IsBusy()
894}
895
896func (c *coordinator) IsSessionBusy(sessionID string) bool {
897 return c.currentAgent.IsSessionBusy(sessionID)
898}
899
900func (c *coordinator) Model() Model {
901 return c.currentAgent.Model()
902}
903
904func (c *coordinator) UpdateModels(ctx context.Context) error {
905 // build the models again so we make sure we get the latest config
906 large, small, err := c.buildAgentModels(ctx, false)
907 if err != nil {
908 return err
909 }
910 c.currentAgent.SetModels(large, small)
911
912 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
913 if !ok {
914 return errCoderAgentNotConfigured
915 }
916
917 tools, err := c.buildTools(ctx, agentCfg)
918 if err != nil {
919 return err
920 }
921 c.currentAgent.SetTools(tools)
922 return nil
923}
924
925func (c *coordinator) QueuedPrompts(sessionID string) int {
926 return c.currentAgent.QueuedPrompts(sessionID)
927}
928
929func (c *coordinator) QueuedPromptsList(sessionID string) []string {
930 return c.currentAgent.QueuedPromptsList(sessionID)
931}
932
933func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
934 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
935 if !ok {
936 return errModelProviderNotConfigured
937 }
938 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
939}
940
941func (c *coordinator) isUnauthorized(err error) bool {
942 var providerErr *fantasy.ProviderError
943 return (errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized) ||
944 errors.Is(err, hyper.ErrUnauthorized)
945}
946
947func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
948 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
949 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
950 return err
951 }
952 if err := c.UpdateModels(ctx); err != nil {
953 return err
954 }
955 return nil
956}
957
958func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
959 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
960 if err != nil {
961 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
962 return err
963 }
964
965 providerCfg.APIKey = newAPIKey
966 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
967
968 if err := c.UpdateModels(ctx); err != nil {
969 return err
970 }
971 return nil
972}
973
// subAgentParams holds the parameters for running a sub-agent.
type subAgentParams struct {
	// Agent is the session agent that runs the sub-task; its Model() also
	// supplies the generation settings for the run.
	Agent SessionAgent
	// SessionID is the parent session. The sub-session is created under it
	// and the sub-session's cost is added to it.
	SessionID string
	// AgentMessageID and ToolCallID identify the originating tool invocation
	// and are combined to derive the sub-session's ID.
	AgentMessageID string
	ToolCallID     string
	// Prompt is the input sent to the sub-agent.
	Prompt string
	// SessionTitle is the title given to the created sub-session.
	SessionTitle string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
986
987// runSubAgent runs a sub-agent and handles session management and cost accumulation.
988// It creates a sub-session, runs the agent with the given prompt, and propagates
989// the cost to the parent session.
990func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
991 // Create sub-session
992 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
993 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
994 if err != nil {
995 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
996 }
997
998 // Call session setup function if provided
999 if params.SessionSetup != nil {
1000 params.SessionSetup(session.ID)
1001 }
1002
1003 // Get model configuration
1004 model := params.Agent.Model()
1005 maxTokens := model.CatwalkCfg.DefaultMaxTokens
1006 if model.ModelCfg.MaxTokens != 0 {
1007 maxTokens = model.ModelCfg.MaxTokens
1008 }
1009
1010 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1011 if !ok {
1012 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1013 }
1014
1015 // Run the agent
1016 result, err := params.Agent.Run(ctx, SessionAgentCall{
1017 SessionID: session.ID,
1018 Prompt: params.Prompt,
1019 MaxOutputTokens: maxTokens,
1020 ProviderOptions: getProviderOptions(model, providerCfg),
1021 Temperature: model.ModelCfg.Temperature,
1022 TopP: model.ModelCfg.TopP,
1023 TopK: model.ModelCfg.TopK,
1024 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1025 PresencePenalty: model.ModelCfg.PresencePenalty,
1026 NonInteractive: true,
1027 })
1028 if err != nil {
1029 return fantasy.NewTextErrorResponse("error generating response"), nil
1030 }
1031
1032 // Update parent session cost
1033 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1034 return fantasy.ToolResponse{}, err
1035 }
1036
1037 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1038}
1039
1040// updateParentSessionCost accumulates the cost from a child session to its parent session.
1041func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1042 childSession, err := c.sessions.Get(ctx, childSessionID)
1043 if err != nil {
1044 return fmt.Errorf("get child session: %w", err)
1045 }
1046
1047 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1048 if err != nil {
1049 return fmt.Errorf("get parent session: %w", err)
1050 }
1051
1052 parentSession.Cost += childSession.Cost
1053
1054 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1055 return fmt.Errorf("save parent session: %w", err)
1056 }
1057
1058 return nil
1059}
1060
1061// discoverSkills runs the skill discovery pipeline and returns both the
1062// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1063func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1064 discovered := skills.DiscoverBuiltin()
1065
1066 opts := cfg.Config().Options
1067 if opts != nil && len(opts.SkillsPaths) > 0 {
1068 expandedPaths := make([]string, 0, len(opts.SkillsPaths))
1069 for _, pth := range opts.SkillsPaths {
1070 expanded := home.Long(pth)
1071 if strings.HasPrefix(expanded, "$") {
1072 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1073 expanded = resolved
1074 }
1075 }
1076 expandedPaths = append(expandedPaths, expanded)
1077 }
1078 discovered = append(discovered, skills.Discover(expandedPaths)...)
1079 }
1080
1081 allSkills = skills.Deduplicate(discovered)
1082 var disabledSkills []string
1083 if opts != nil {
1084 disabledSkills = opts.DisabledSkills
1085 }
1086 activeSkills = skills.Filter(allSkills, disabledSkills)
1087 return allSkills, activeSkills
1088}