1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/log"
31 "github.com/charmbracelet/crush/internal/lsp"
32 "github.com/charmbracelet/crush/internal/message"
33 "github.com/charmbracelet/crush/internal/oauth/copilot"
34 "github.com/charmbracelet/crush/internal/permission"
35 "github.com/charmbracelet/crush/internal/pubsub"
36 "github.com/charmbracelet/crush/internal/session"
37 "github.com/charmbracelet/crush/internal/skills"
38 "golang.org/x/sync/errgroup"
39
40 "charm.land/fantasy/providers/anthropic"
41 "charm.land/fantasy/providers/azure"
42 "charm.land/fantasy/providers/bedrock"
43 "charm.land/fantasy/providers/google"
44 "charm.land/fantasy/providers/openai"
45 "charm.land/fantasy/providers/openaicompat"
46 "charm.land/fantasy/providers/openrouter"
47 "charm.land/fantasy/providers/vercel"
48 openaisdk "github.com/charmbracelet/openai-go/option"
49 "github.com/qjebbs/go-jsons"
50)
51
52// Coordinator errors.
53var (
54 errCoderAgentNotConfigured = errors.New("coder agent not configured")
55 errModelProviderNotConfigured = errors.New("model provider not configured")
56 errLargeModelNotSelected = errors.New("large model not selected")
57 errSmallModelNotSelected = errors.New("small model not selected")
58 errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
59 errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
60 errLargeModelNotFound = errors.New("large model not found in provider config")
61 errSmallModelNotFound = errors.New("small model not found in provider config")
62)
63
64type Coordinator interface {
65 // INFO: (kujtim) this is not used yet we will use this when we have multiple agents
66 // SetMainAgent(string)
67 Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
68 Cancel(sessionID string)
69 CancelAll()
70 IsSessionBusy(sessionID string) bool
71 IsBusy() bool
72 QueuedPrompts(sessionID string) int
73 QueuedPromptsList(sessionID string) []string
74 ClearQueue(sessionID string)
75 Summarize(context.Context, string) error
76 Model() Model
77 UpdateModels(ctx context.Context) error
78}
79
80type coordinator struct {
81 cfg *config.ConfigStore
82 sessions session.Service
83 messages message.Service
84 permissions permission.Service
85 history history.Service
86 filetracker filetracker.Service
87 lspManager *lsp.Manager
88 notify pubsub.Publisher[notify.Notification]
89
90 currentAgent SessionAgent
91 agents map[string]SessionAgent
92
93 // Skills discovery results (session-start snapshot).
94 allSkills []*skills.Skill // Pre-filter: all discovered after dedup.
95 activeSkills []*skills.Skill // Post-filter: active skills only.
96 skillTracker *skills.Tracker
97
98 readyWg errgroup.Group
99}
100
101func NewCoordinator(
102 ctx context.Context,
103 cfg *config.ConfigStore,
104 sessions session.Service,
105 messages message.Service,
106 permissions permission.Service,
107 history history.Service,
108 filetracker filetracker.Service,
109 lspManager *lsp.Manager,
110 notify pubsub.Publisher[notify.Notification],
111) (Coordinator, error) {
112 // Discover skills once at session start.
113 allSkills, activeSkills := discoverSkills(cfg)
114 skillTracker := skills.NewTracker(activeSkills)
115
116 c := &coordinator{
117 cfg: cfg,
118 sessions: sessions,
119 messages: messages,
120 permissions: permissions,
121 history: history,
122 filetracker: filetracker,
123 lspManager: lspManager,
124 notify: notify,
125 agents: make(map[string]SessionAgent),
126 allSkills: allSkills,
127 activeSkills: activeSkills,
128 skillTracker: skillTracker,
129 }
130
131 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
132 if !ok {
133 return nil, errCoderAgentNotConfigured
134 }
135
136 // TODO: make this dynamic when we support multiple agents
137 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
138 if err != nil {
139 return nil, err
140 }
141
142 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
143 if err != nil {
144 return nil, err
145 }
146 c.currentAgent = agent
147 c.agents[config.AgentCoder] = agent
148 return c, nil
149}
150
151// Run implements Coordinator.
152func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
153 if err := c.readyWg.Wait(); err != nil {
154 return nil, err
155 }
156
157 // refresh models before each run
158 if err := c.UpdateModels(ctx); err != nil {
159 return nil, fmt.Errorf("failed to update models: %w", err)
160 }
161
162 model := c.currentAgent.Model()
163 maxTokens := model.CatwalkCfg.DefaultMaxTokens
164 if model.ModelCfg.MaxTokens != 0 {
165 maxTokens = model.ModelCfg.MaxTokens
166 }
167
168 if !model.CatwalkCfg.SupportsImages && attachments != nil {
169 // filter out image attachments
170 filteredAttachments := make([]message.Attachment, 0, len(attachments))
171 for _, att := range attachments {
172 if att.IsText() {
173 filteredAttachments = append(filteredAttachments, att)
174 }
175 }
176 attachments = filteredAttachments
177 }
178
179 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
180 if !ok {
181 return nil, errModelProviderNotConfigured
182 }
183
184 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
185
186 if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
187 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
188 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
189 return nil, err
190 }
191 }
192
193 run := func() (*fantasy.AgentResult, error) {
194 return c.currentAgent.Run(ctx, SessionAgentCall{
195 SessionID: sessionID,
196 Prompt: prompt,
197 Attachments: attachments,
198 MaxOutputTokens: maxTokens,
199 ProviderOptions: mergedOptions,
200 Temperature: temp,
201 TopP: topP,
202 TopK: topK,
203 FrequencyPenalty: freqPenalty,
204 PresencePenalty: presPenalty,
205 })
206 }
207 result, originalErr := run()
208
209 if c.isUnauthorized(originalErr) {
210 switch {
211 case providerCfg.OAuthToken != nil:
212 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
213 if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
214 return nil, originalErr
215 }
216 slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
217 return run()
218 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
219 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
220 if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
221 return nil, originalErr
222 }
223 slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
224 return run()
225 }
226 }
227
228 return result, originalErr
229}
230
231func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
232 options := fantasy.ProviderOptions{}
233
234 cfgOpts := []byte("{}")
235 providerCfgOpts := []byte("{}")
236 catwalkOpts := []byte("{}")
237
238 if model.ModelCfg.ProviderOptions != nil {
239 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
240 if err == nil {
241 cfgOpts = data
242 }
243 }
244
245 if providerCfg.ProviderOptions != nil {
246 data, err := json.Marshal(providerCfg.ProviderOptions)
247 if err == nil {
248 providerCfgOpts = data
249 }
250 }
251
252 if model.CatwalkCfg.Options.ProviderOptions != nil {
253 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
254 if err == nil {
255 catwalkOpts = data
256 }
257 }
258
259 readers := []io.Reader{
260 bytes.NewReader(catwalkOpts),
261 bytes.NewReader(providerCfgOpts),
262 bytes.NewReader(cfgOpts),
263 }
264
265 got, err := jsons.Merge(readers)
266 if err != nil {
267 slog.Error("Could not merge call config", "err", err)
268 return options
269 }
270
271 mergedOptions := make(map[string]any)
272
273 err = json.Unmarshal([]byte(got), &mergedOptions)
274 if err != nil {
275 slog.Error("Could not create config for call", "err", err)
276 return options
277 }
278
279 providerType := providerCfg.Type
280 if providerType == "hyper" {
281 if strings.Contains(model.CatwalkCfg.ID, "claude") {
282 providerType = anthropic.Name
283 } else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
284 providerType = openai.Name
285 } else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
286 providerType = google.Name
287 } else {
288 providerType = openaicompat.Name
289 }
290 }
291
292 switch providerType {
293 case openai.Name, azure.Name:
294 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
295 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
296 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
297 }
298 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
299 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
300 mergedOptions["reasoning_summary"] = "auto"
301 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
302 }
303 parsed, err := openai.ParseResponsesOptions(mergedOptions)
304 if err == nil {
305 options[openai.Name] = parsed
306 }
307 } else {
308 parsed, err := openai.ParseOptions(mergedOptions)
309 if err == nil {
310 options[openai.Name] = parsed
311 }
312 }
313 case anthropic.Name:
314 var (
315 _, hasEffort = mergedOptions["effort"]
316 _, hasThink = mergedOptions["thinking"]
317 )
318 switch {
319 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
320 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
321 case !hasThink && model.ModelCfg.Think:
322 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
323 }
324 parsed, err := anthropic.ParseOptions(mergedOptions)
325 if err == nil {
326 options[anthropic.Name] = parsed
327 }
328
329 case openrouter.Name:
330 _, hasReasoning := mergedOptions["reasoning"]
331 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
332 mergedOptions["reasoning"] = map[string]any{
333 "enabled": true,
334 "effort": model.ModelCfg.ReasoningEffort,
335 }
336 }
337 parsed, err := openrouter.ParseOptions(mergedOptions)
338 if err == nil {
339 options[openrouter.Name] = parsed
340 }
341 case vercel.Name:
342 _, hasReasoning := mergedOptions["reasoning"]
343 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
344 mergedOptions["reasoning"] = map[string]any{
345 "enabled": true,
346 "effort": model.ModelCfg.ReasoningEffort,
347 }
348 }
349 parsed, err := vercel.ParseOptions(mergedOptions)
350 if err == nil {
351 options[vercel.Name] = parsed
352 }
353 case google.Name:
354 _, hasReasoning := mergedOptions["thinking_config"]
355 if !hasReasoning {
356 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
357 mergedOptions["thinking_config"] = map[string]any{
358 "thinking_budget": 2000,
359 "include_thoughts": true,
360 }
361 } else {
362 mergedOptions["thinking_config"] = map[string]any{
363 "thinking_level": model.ModelCfg.ReasoningEffort,
364 "include_thoughts": true,
365 }
366 }
367 }
368 parsed, err := google.ParseOptions(mergedOptions)
369 if err == nil {
370 options[google.Name] = parsed
371 }
372 case openaicompat.Name:
373 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
374 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
375 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
376 }
377 parsed, err := openaicompat.ParseOptions(mergedOptions)
378 if err == nil {
379 options[openaicompat.Name] = parsed
380 }
381 }
382
383 return options
384}
385
386func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
387 modelOptions := getProviderOptions(model, cfg)
388 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
389 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
390 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
391 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
392 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
393 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
394}
395
396func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
397 large, small, err := c.buildAgentModels(ctx, isSubAgent)
398 if err != nil {
399 return nil, err
400 }
401
402 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
403 result := NewSessionAgent(SessionAgentOptions{
404 LargeModel: large,
405 SmallModel: small,
406 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
407 SystemPrompt: "",
408 IsSubAgent: isSubAgent,
409 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
410 IsYolo: c.permissions.SkipRequests(),
411 Sessions: c.sessions,
412 Messages: c.messages,
413 Tools: nil,
414 Notify: c.notify,
415 })
416
417 c.readyWg.Go(func() error {
418 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
419 if err != nil {
420 return err
421 }
422 result.SetSystemPrompt(systemPrompt)
423 return nil
424 })
425
426 c.readyWg.Go(func() error {
427 tools, err := c.buildTools(ctx, agent)
428 if err != nil {
429 return err
430 }
431 result.SetTools(tools)
432 return nil
433 })
434
435 return result, nil
436}
437
438func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
439 var allTools []fantasy.AgentTool
440 if slices.Contains(agent.AllowedTools, AgentToolName) {
441 agentTool, err := c.agentTool(ctx)
442 if err != nil {
443 return nil, err
444 }
445 allTools = append(allTools, agentTool)
446 }
447
448 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
449 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
450 if err != nil {
451 return nil, err
452 }
453 allTools = append(allTools, agenticFetchTool)
454 }
455
456 // Get the model name for the agent
457 modelName := ""
458 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
459 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
460 modelName = model.Name
461 }
462 }
463
464 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
465
466 allTools = append(allTools,
467 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
468 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
469 tools.NewCrushLogsTool(logFile),
470 tools.NewJobOutputTool(),
471 tools.NewJobKillTool(),
472 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
473 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
474 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
475 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
476 tools.NewGlobTool(c.cfg.WorkingDir()),
477 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
478 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
479 tools.NewSourcegraphTool(nil),
480 tools.NewTodosTool(c.sessions),
481 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
482 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
483 )
484
485 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
486 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
487 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
488 }
489
490 if len(c.cfg.Config().MCP) > 0 {
491 allTools = append(
492 allTools,
493 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
494 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
495 )
496 }
497
498 var filteredTools []fantasy.AgentTool
499 for _, tool := range allTools {
500 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
501 filteredTools = append(filteredTools, tool)
502 }
503 }
504
505 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
506 if agent.AllowedMCP == nil {
507 // No MCP restrictions
508 filteredTools = append(filteredTools, tool)
509 continue
510 }
511 if len(agent.AllowedMCP) == 0 {
512 // No MCPs allowed
513 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
514 break
515 }
516
517 for mcp, tools := range agent.AllowedMCP {
518 if mcp != tool.MCP() {
519 continue
520 }
521 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
522 filteredTools = append(filteredTools, tool)
523 break
524 }
525 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
526 }
527 }
528 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
529 return strings.Compare(a.Info().Name, b.Info().Name)
530 })
531 return filteredTools, nil
532}
533
534// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
535func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
536 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
537 if !ok {
538 return Model{}, Model{}, errLargeModelNotSelected
539 }
540 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
541 if !ok {
542 return Model{}, Model{}, errSmallModelNotSelected
543 }
544
545 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
546 if !ok {
547 return Model{}, Model{}, errLargeModelProviderNotConfigured
548 }
549
550 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
551 if err != nil {
552 return Model{}, Model{}, err
553 }
554
555 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
556 if !ok {
557 return Model{}, Model{}, errSmallModelProviderNotConfigured
558 }
559
560 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
561 if err != nil {
562 return Model{}, Model{}, err
563 }
564
565 var largeCatwalkModel *catwalk.Model
566 var smallCatwalkModel *catwalk.Model
567
568 for _, m := range largeProviderCfg.Models {
569 if m.ID == largeModelCfg.Model {
570 largeCatwalkModel = &m
571 }
572 }
573 for _, m := range smallProviderCfg.Models {
574 if m.ID == smallModelCfg.Model {
575 smallCatwalkModel = &m
576 }
577 }
578
579 if largeCatwalkModel == nil {
580 return Model{}, Model{}, errLargeModelNotFound
581 }
582
583 if smallCatwalkModel == nil {
584 return Model{}, Model{}, errSmallModelNotFound
585 }
586
587 largeModelID := largeModelCfg.Model
588 smallModelID := smallModelCfg.Model
589
590 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
591 largeModelID += ":exacto"
592 }
593
594 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
595 smallModelID += ":exacto"
596 }
597
598 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
599 if err != nil {
600 return Model{}, Model{}, err
601 }
602 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
603 if err != nil {
604 return Model{}, Model{}, err
605 }
606
607 return Model{
608 Model: largeModel,
609 CatwalkCfg: *largeCatwalkModel,
610 ModelCfg: largeModelCfg,
611 }, Model{
612 Model: smallModel,
613 CatwalkCfg: *smallCatwalkModel,
614 ModelCfg: smallModelCfg,
615 }, nil
616}
617
618func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
619 var opts []anthropic.Option
620
621 switch {
622 case strings.HasPrefix(apiKey, "Bearer "):
623 // NOTE: Prevent the SDK from picking up the API key from env.
624 os.Setenv("ANTHROPIC_API_KEY", "")
625 headers["Authorization"] = apiKey
626 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
627 // NOTE: Prevent the SDK from picking up the API key from env.
628 os.Setenv("ANTHROPIC_API_KEY", "")
629 headers["Authorization"] = "Bearer " + apiKey
630 case apiKey != "":
631 // X-Api-Key header
632 opts = append(opts, anthropic.WithAPIKey(apiKey))
633 }
634
635 if len(headers) > 0 {
636 opts = append(opts, anthropic.WithHeaders(headers))
637 }
638
639 if baseURL != "" {
640 opts = append(opts, anthropic.WithBaseURL(baseURL))
641 }
642
643 if c.cfg.Config().Options.Debug {
644 httpClient := log.NewHTTPClient()
645 opts = append(opts, anthropic.WithHTTPClient(httpClient))
646 }
647 return anthropic.New(opts...)
648}
649
650func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
651 opts := []openai.Option{
652 openai.WithAPIKey(apiKey),
653 openai.WithUseResponsesAPI(),
654 }
655 if c.cfg.Config().Options.Debug {
656 httpClient := log.NewHTTPClient()
657 opts = append(opts, openai.WithHTTPClient(httpClient))
658 }
659 if len(headers) > 0 {
660 opts = append(opts, openai.WithHeaders(headers))
661 }
662 if baseURL != "" {
663 opts = append(opts, openai.WithBaseURL(baseURL))
664 }
665 return openai.New(opts...)
666}
667
668func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
669 opts := []openrouter.Option{
670 openrouter.WithAPIKey(apiKey),
671 }
672 if c.cfg.Config().Options.Debug {
673 httpClient := log.NewHTTPClient()
674 opts = append(opts, openrouter.WithHTTPClient(httpClient))
675 }
676 if len(headers) > 0 {
677 opts = append(opts, openrouter.WithHeaders(headers))
678 }
679 return openrouter.New(opts...)
680}
681
682func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
683 opts := []vercel.Option{
684 vercel.WithAPIKey(apiKey),
685 }
686 if c.cfg.Config().Options.Debug {
687 httpClient := log.NewHTTPClient()
688 opts = append(opts, vercel.WithHTTPClient(httpClient))
689 }
690 if len(headers) > 0 {
691 opts = append(opts, vercel.WithHeaders(headers))
692 }
693 return vercel.New(opts...)
694}
695
696func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
697 opts := []openaicompat.Option{
698 openaicompat.WithBaseURL(baseURL),
699 openaicompat.WithAPIKey(apiKey),
700 }
701
702 // Set HTTP client based on provider and debug mode.
703 var httpClient *http.Client
704 if providerID == string(catwalk.InferenceProviderCopilot) {
705 opts = append(opts, openaicompat.WithUseResponsesAPI())
706 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
707 } else if c.cfg.Config().Options.Debug {
708 httpClient = log.NewHTTPClient()
709 }
710 if httpClient != nil {
711 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
712 }
713
714 if len(headers) > 0 {
715 opts = append(opts, openaicompat.WithHeaders(headers))
716 }
717
718 for extraKey, extraValue := range extraBody {
719 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
720 }
721
722 return openaicompat.New(opts...)
723}
724
725func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
726 opts := []azure.Option{
727 azure.WithBaseURL(baseURL),
728 azure.WithAPIKey(apiKey),
729 azure.WithUseResponsesAPI(),
730 }
731 if c.cfg.Config().Options.Debug {
732 httpClient := log.NewHTTPClient()
733 opts = append(opts, azure.WithHTTPClient(httpClient))
734 }
735 if options == nil {
736 options = make(map[string]string)
737 }
738 if apiVersion, ok := options["apiVersion"]; ok {
739 opts = append(opts, azure.WithAPIVersion(apiVersion))
740 }
741 if len(headers) > 0 {
742 opts = append(opts, azure.WithHeaders(headers))
743 }
744
745 return azure.New(opts...)
746}
747
748func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
749 var opts []bedrock.Option
750 if c.cfg.Config().Options.Debug {
751 httpClient := log.NewHTTPClient()
752 opts = append(opts, bedrock.WithHTTPClient(httpClient))
753 }
754 if len(headers) > 0 {
755 opts = append(opts, bedrock.WithHeaders(headers))
756 }
757 switch {
758 case apiKey != "":
759 opts = append(opts, bedrock.WithAPIKey(apiKey))
760 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
761 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
762 default:
763 // Skip, let the SDK do authentication.
764 }
765 return bedrock.New(opts...)
766}
767
768func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
769 opts := []google.Option{
770 google.WithBaseURL(baseURL),
771 google.WithGeminiAPIKey(apiKey),
772 }
773 if c.cfg.Config().Options.Debug {
774 httpClient := log.NewHTTPClient()
775 opts = append(opts, google.WithHTTPClient(httpClient))
776 }
777 if len(headers) > 0 {
778 opts = append(opts, google.WithHeaders(headers))
779 }
780 return google.New(opts...)
781}
782
783func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
784 opts := []google.Option{}
785 if c.cfg.Config().Options.Debug {
786 httpClient := log.NewHTTPClient()
787 opts = append(opts, google.WithHTTPClient(httpClient))
788 }
789 if len(headers) > 0 {
790 opts = append(opts, google.WithHeaders(headers))
791 }
792
793 project := options["project"]
794 location := options["location"]
795
796 opts = append(opts, google.WithVertex(project, location))
797
798 return google.New(opts...)
799}
800
801func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
802 if model.Think {
803 return true
804 }
805 opts, err := anthropic.ParseOptions(model.ProviderOptions)
806 return err == nil && opts.Thinking != nil
807}
808
809func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
810 headers := maps.Clone(providerCfg.ExtraHeaders)
811 if headers == nil {
812 headers = make(map[string]string)
813 }
814
815 // handle special headers for anthropic
816 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
817 if v, ok := headers["anthropic-beta"]; ok {
818 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
819 } else {
820 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
821 }
822 }
823
824 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
825 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
826
827 switch providerCfg.Type {
828 case openai.Name:
829 return c.buildOpenaiProvider(baseURL, apiKey, headers)
830 case anthropic.Name:
831 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
832 case openrouter.Name:
833 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
834 case vercel.Name:
835 return c.buildVercelProvider(baseURL, apiKey, headers)
836 case azure.Name:
837 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
838 case bedrock.Name:
839 return c.buildBedrockProvider(apiKey, headers)
840 case google.Name:
841 return c.buildGoogleProvider(baseURL, apiKey, headers)
842 case "google-vertex":
843 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
844 case openaicompat.Name, hyper.Name:
845 switch providerCfg.ID {
846 case hyper.Name:
847 baseURL = hyper.BaseURL() + "/v1"
848 headers["x-crush-id"] = event.GetID()
849 case string(catwalk.InferenceProviderZAI):
850 if providerCfg.ExtraBody == nil {
851 providerCfg.ExtraBody = map[string]any{}
852 }
853 providerCfg.ExtraBody["tool_stream"] = true
854 }
855 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
856 default:
857 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
858 }
859}
860
861func isExactoSupported(modelID string) bool {
862 supportedModels := []string{
863 "moonshotai/kimi-k2-0905",
864 "deepseek/deepseek-v3.1-terminus",
865 "z-ai/glm-4.6",
866 "openai/gpt-oss-120b",
867 "qwen/qwen3-coder",
868 }
869 return slices.Contains(supportedModels, modelID)
870}
871
872func (c *coordinator) Cancel(sessionID string) {
873 c.currentAgent.Cancel(sessionID)
874}
875
876func (c *coordinator) CancelAll() {
877 c.currentAgent.CancelAll()
878}
879
880func (c *coordinator) ClearQueue(sessionID string) {
881 c.currentAgent.ClearQueue(sessionID)
882}
883
884func (c *coordinator) IsBusy() bool {
885 return c.currentAgent.IsBusy()
886}
887
888func (c *coordinator) IsSessionBusy(sessionID string) bool {
889 return c.currentAgent.IsSessionBusy(sessionID)
890}
891
892func (c *coordinator) Model() Model {
893 return c.currentAgent.Model()
894}
895
896func (c *coordinator) UpdateModels(ctx context.Context) error {
897 // build the models again so we make sure we get the latest config
898 large, small, err := c.buildAgentModels(ctx, false)
899 if err != nil {
900 return err
901 }
902 c.currentAgent.SetModels(large, small)
903
904 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
905 if !ok {
906 return errCoderAgentNotConfigured
907 }
908
909 tools, err := c.buildTools(ctx, agentCfg)
910 if err != nil {
911 return err
912 }
913 c.currentAgent.SetTools(tools)
914 return nil
915}
916
917func (c *coordinator) QueuedPrompts(sessionID string) int {
918 return c.currentAgent.QueuedPrompts(sessionID)
919}
920
921func (c *coordinator) QueuedPromptsList(sessionID string) []string {
922 return c.currentAgent.QueuedPromptsList(sessionID)
923}
924
925func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
926 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
927 if !ok {
928 return errModelProviderNotConfigured
929 }
930 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
931}
932
933func (c *coordinator) isUnauthorized(err error) bool {
934 var providerErr *fantasy.ProviderError
935 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
936}
937
938func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
939 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
940 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
941 return err
942 }
943 if err := c.UpdateModels(ctx); err != nil {
944 return err
945 }
946 return nil
947}
948
949func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
950 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
951 if err != nil {
952 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
953 return err
954 }
955
956 providerCfg.APIKey = newAPIKey
957 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
958
959 if err := c.UpdateModels(ctx); err != nil {
960 return err
961 }
962 return nil
963}
964
965// subAgentParams holds the parameters for running a sub-agent.
966type subAgentParams struct {
967 Agent SessionAgent
968 SessionID string
969 AgentMessageID string
970 ToolCallID string
971 Prompt string
972 SessionTitle string
973 // SessionSetup is an optional callback invoked after session creation
974 // but before agent execution, for custom session configuration.
975 SessionSetup func(sessionID string)
976}
977
978// runSubAgent runs a sub-agent and handles session management and cost accumulation.
979// It creates a sub-session, runs the agent with the given prompt, and propagates
980// the cost to the parent session.
981func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
982 // Create sub-session
983 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
984 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
985 if err != nil {
986 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
987 }
988
989 // Call session setup function if provided
990 if params.SessionSetup != nil {
991 params.SessionSetup(session.ID)
992 }
993
994 // Get model configuration
995 model := params.Agent.Model()
996 maxTokens := model.CatwalkCfg.DefaultMaxTokens
997 if model.ModelCfg.MaxTokens != 0 {
998 maxTokens = model.ModelCfg.MaxTokens
999 }
1000
1001 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1002 if !ok {
1003 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1004 }
1005
1006 // Run the agent
1007 result, err := params.Agent.Run(ctx, SessionAgentCall{
1008 SessionID: session.ID,
1009 Prompt: params.Prompt,
1010 MaxOutputTokens: maxTokens,
1011 ProviderOptions: getProviderOptions(model, providerCfg),
1012 Temperature: model.ModelCfg.Temperature,
1013 TopP: model.ModelCfg.TopP,
1014 TopK: model.ModelCfg.TopK,
1015 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1016 PresencePenalty: model.ModelCfg.PresencePenalty,
1017 NonInteractive: true,
1018 })
1019 if err != nil {
1020 return fantasy.NewTextErrorResponse("error generating response"), nil
1021 }
1022
1023 // Update parent session cost
1024 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1025 return fantasy.ToolResponse{}, err
1026 }
1027
1028 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1029}
1030
1031// updateParentSessionCost accumulates the cost from a child session to its parent session.
1032func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1033 childSession, err := c.sessions.Get(ctx, childSessionID)
1034 if err != nil {
1035 return fmt.Errorf("get child session: %w", err)
1036 }
1037
1038 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1039 if err != nil {
1040 return fmt.Errorf("get parent session: %w", err)
1041 }
1042
1043 parentSession.Cost += childSession.Cost
1044
1045 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1046 return fmt.Errorf("save parent session: %w", err)
1047 }
1048
1049 return nil
1050}
1051
1052// discoverSkills runs the skill discovery pipeline and returns both the
1053// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1054func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1055 discovered := skills.DiscoverBuiltin()
1056
1057 opts := cfg.Config().Options
1058 if opts != nil && len(opts.SkillsPaths) > 0 {
1059 expandedPaths := make([]string, 0, len(opts.SkillsPaths))
1060 for _, pth := range opts.SkillsPaths {
1061 expanded := home.Long(pth)
1062 if strings.HasPrefix(expanded, "$") {
1063 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1064 expanded = resolved
1065 }
1066 }
1067 expandedPaths = append(expandedPaths, expanded)
1068 }
1069 discovered = append(discovered, skills.Discover(expandedPaths)...)
1070 }
1071
1072 allSkills = skills.Deduplicate(discovered)
1073 var disabledSkills []string
1074 if opts != nil {
1075 disabledSkills = opts.DisabledSkills
1076 }
1077 activeSkills = skills.Filter(allSkills, disabledSkills)
1078 return allSkills, activeSkills
1079}