1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/hooks"
31 "github.com/charmbracelet/crush/internal/log"
32 "github.com/charmbracelet/crush/internal/lsp"
33 "github.com/charmbracelet/crush/internal/message"
34 "github.com/charmbracelet/crush/internal/oauth/copilot"
35 "github.com/charmbracelet/crush/internal/permission"
36 "github.com/charmbracelet/crush/internal/pubsub"
37 "github.com/charmbracelet/crush/internal/session"
38 "github.com/charmbracelet/crush/internal/skills"
39 "golang.org/x/sync/errgroup"
40
41 "charm.land/fantasy/providers/anthropic"
42 "charm.land/fantasy/providers/azure"
43 "charm.land/fantasy/providers/bedrock"
44 "charm.land/fantasy/providers/google"
45 "charm.land/fantasy/providers/openai"
46 "charm.land/fantasy/providers/openaicompat"
47 "charm.land/fantasy/providers/openrouter"
48 "charm.land/fantasy/providers/vercel"
49 openaisdk "github.com/charmbracelet/openai-go/option"
50 "github.com/qjebbs/go-jsons"
51)
52
// Coordinator errors that are not tied to a specific model size.
var (
	// errCoderAgentNotConfigured is returned when the config has no entry
	// for config.AgentCoder.
	errCoderAgentNotConfigured = errors.New("coder agent not configured")
	// errModelProviderNotConfigured is returned when the provider of the
	// current model cannot be found in the provider config.
	errModelProviderNotConfigured = errors.New("model provider not configured")
)

// Coordinator errors raised while resolving the large and small models.
var (
	errLargeModelNotSelected           = errors.New("large model not selected")
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	errLargeModelNotFound              = errors.New("large model not found in provider config")

	errSmallModelNotSelected           = errors.New("small model not selected")
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
	errSmallModelNotFound              = errors.New("small model not found in provider config")
)
64
65type Coordinator interface {
66 // INFO: (kujtim) this is not used yet we will use this when we have multiple agents
67 // SetMainAgent(string)
68 Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
69 Cancel(sessionID string)
70 CancelAll()
71 IsSessionBusy(sessionID string) bool
72 IsBusy() bool
73 QueuedPrompts(sessionID string) int
74 QueuedPromptsList(sessionID string) []string
75 ClearQueue(sessionID string)
76 Summarize(context.Context, string) error
77 Model() Model
78 UpdateModels(ctx context.Context) error
79}
80
// coordinator is the Coordinator implementation. Today it builds a single
// agent (the coder agent) and forwards most Coordinator calls to it.
type coordinator struct {
	// Dependencies handed to the session agent and to the tools it uses.
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent receives the forwarded Coordinator calls; agents indexes
	// built agents by config name (only config.AgentCoder is registered).
	currentAgent SessionAgent
	agents       map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg tracks the system-prompt and tool construction that buildAgent
	// starts in the background; Run waits on it before calling the agent.
	readyWg errgroup.Group
}
101
102func NewCoordinator(
103 ctx context.Context,
104 cfg *config.ConfigStore,
105 sessions session.Service,
106 messages message.Service,
107 permissions permission.Service,
108 history history.Service,
109 filetracker filetracker.Service,
110 lspManager *lsp.Manager,
111 notify pubsub.Publisher[notify.Notification],
112) (Coordinator, error) {
113 // Discover skills once at session start.
114 allSkills, activeSkills := discoverSkills(cfg)
115 skillTracker := skills.NewTracker(activeSkills)
116
117 c := &coordinator{
118 cfg: cfg,
119 sessions: sessions,
120 messages: messages,
121 permissions: permissions,
122 history: history,
123 filetracker: filetracker,
124 lspManager: lspManager,
125 notify: notify,
126 agents: make(map[string]SessionAgent),
127 allSkills: allSkills,
128 activeSkills: activeSkills,
129 skillTracker: skillTracker,
130 }
131
132 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
133 if !ok {
134 return nil, errCoderAgentNotConfigured
135 }
136
137 // TODO: make this dynamic when we support multiple agents
138 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
139 if err != nil {
140 return nil, err
141 }
142
143 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
144 if err != nil {
145 return nil, err
146 }
147 c.currentAgent = agent
148 c.agents[config.AgentCoder] = agent
149 return c, nil
150}
151
152// Run implements Coordinator.
153func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
154 if err := c.readyWg.Wait(); err != nil {
155 return nil, err
156 }
157
158 // refresh models before each run
159 if err := c.UpdateModels(ctx); err != nil {
160 return nil, fmt.Errorf("failed to update models: %w", err)
161 }
162
163 model := c.currentAgent.Model()
164 maxTokens := model.CatwalkCfg.DefaultMaxTokens
165 if model.ModelCfg.MaxTokens != 0 {
166 maxTokens = model.ModelCfg.MaxTokens
167 }
168
169 if !model.CatwalkCfg.SupportsImages && attachments != nil {
170 // filter out image attachments
171 filteredAttachments := make([]message.Attachment, 0, len(attachments))
172 for _, att := range attachments {
173 if att.IsText() {
174 filteredAttachments = append(filteredAttachments, att)
175 }
176 }
177 attachments = filteredAttachments
178 }
179
180 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
181 if !ok {
182 return nil, errModelProviderNotConfigured
183 }
184
185 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
186
187 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
188 // NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
189 // depends on the flow below. If refresh fails, proceed with the token we have.
190 slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
191 }
192
193 run := func() (*fantasy.AgentResult, error) {
194 return c.currentAgent.Run(ctx, SessionAgentCall{
195 SessionID: sessionID,
196 Prompt: prompt,
197 Attachments: attachments,
198 MaxOutputTokens: maxTokens,
199 ProviderOptions: mergedOptions,
200 Temperature: temp,
201 TopP: topP,
202 TopK: topK,
203 FrequencyPenalty: freqPenalty,
204 PresencePenalty: presPenalty,
205 })
206 }
207 beforeLoaded := c.skillTracker.LoadedNames()
208 result, originalErr := run()
209 logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
210
211 if c.isUnauthorized(originalErr) {
212 if err := c.retryAfterUnauthorized(ctx, providerCfg); err == nil {
213 return run()
214 }
215 }
216
217 return result, originalErr
218}
219
220func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
221 options := fantasy.ProviderOptions{}
222
223 cfgOpts := []byte("{}")
224 providerCfgOpts := []byte("{}")
225 catwalkOpts := []byte("{}")
226
227 if model.ModelCfg.ProviderOptions != nil {
228 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
229 if err == nil {
230 cfgOpts = data
231 }
232 }
233
234 if providerCfg.ProviderOptions != nil {
235 data, err := json.Marshal(providerCfg.ProviderOptions)
236 if err == nil {
237 providerCfgOpts = data
238 }
239 }
240
241 if model.CatwalkCfg.Options.ProviderOptions != nil {
242 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
243 if err == nil {
244 catwalkOpts = data
245 }
246 }
247
248 readers := []io.Reader{
249 bytes.NewReader(catwalkOpts),
250 bytes.NewReader(providerCfgOpts),
251 bytes.NewReader(cfgOpts),
252 }
253
254 got, err := jsons.Merge(readers)
255 if err != nil {
256 slog.Error("Could not merge call config", "err", err)
257 return options
258 }
259
260 mergedOptions := make(map[string]any)
261
262 err = json.Unmarshal([]byte(got), &mergedOptions)
263 if err != nil {
264 slog.Error("Could not create config for call", "err", err)
265 return options
266 }
267
268 switch providerCfg.Type {
269 case openai.Name, azure.Name:
270 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
271 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
272 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
273 }
274 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
275 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
276 mergedOptions["reasoning_summary"] = "auto"
277 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
278 }
279 parsed, err := openai.ParseResponsesOptions(mergedOptions)
280 if err == nil {
281 options[openai.Name] = parsed
282 }
283 } else {
284 parsed, err := openai.ParseOptions(mergedOptions)
285 if err == nil {
286 options[openai.Name] = parsed
287 }
288 }
289 case anthropic.Name:
290 var (
291 _, hasEffort = mergedOptions["effort"]
292 _, hasThink = mergedOptions["thinking"]
293 )
294 switch {
295 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
296 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
297 case !hasThink && model.ModelCfg.Think:
298 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
299 }
300 parsed, err := anthropic.ParseOptions(mergedOptions)
301 if err == nil {
302 options[anthropic.Name] = parsed
303 }
304
305 case openrouter.Name:
306 _, hasReasoning := mergedOptions["reasoning"]
307 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
308 mergedOptions["reasoning"] = map[string]any{
309 "enabled": true,
310 "effort": model.ModelCfg.ReasoningEffort,
311 }
312 }
313 parsed, err := openrouter.ParseOptions(mergedOptions)
314 if err == nil {
315 options[openrouter.Name] = parsed
316 }
317 case vercel.Name:
318 _, hasReasoning := mergedOptions["reasoning"]
319 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
320 mergedOptions["reasoning"] = map[string]any{
321 "enabled": true,
322 "effort": model.ModelCfg.ReasoningEffort,
323 }
324 }
325 parsed, err := vercel.ParseOptions(mergedOptions)
326 if err == nil {
327 options[vercel.Name] = parsed
328 }
329 case google.Name:
330 _, hasReasoning := mergedOptions["thinking_config"]
331 if !hasReasoning {
332 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
333 mergedOptions["thinking_config"] = map[string]any{
334 "thinking_budget": 2000,
335 "include_thoughts": true,
336 }
337 } else {
338 mergedOptions["thinking_config"] = map[string]any{
339 "thinking_level": model.ModelCfg.ReasoningEffort,
340 "include_thoughts": true,
341 }
342 }
343 }
344 parsed, err := google.ParseOptions(mergedOptions)
345 if err == nil {
346 options[google.Name] = parsed
347 }
348 case openaicompat.Name, hyper.Name:
349 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
350 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
351 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
352 }
353
354 extraBody := make(map[string]any)
355
356 // "reasoning effort" is a standard OpenAI field, but "thinking" is not.
357 // Setting it in the right way for each provider.
358 // TODO: Abstract this in Fantasy somehow?
359 // TODO: Allow custom providers to specify how to set this?
360 switch providerCfg.ID {
361 case hyper.Name:
362 extraBody["thinking"] = model.ModelCfg.Think
363 case string(catwalk.InferenceProviderIoNet):
364 extraBody["chat_template_kwargs"] = map[string]any{
365 "thinking": model.ModelCfg.Think,
366 }
367 case string(catwalk.InferenceProviderZAI):
368 if model.ModelCfg.Think {
369 extraBody["thinking"] = map[string]any{
370 "type": "enabled",
371 }
372 } else {
373 extraBody["thinking"] = map[string]any{
374 "type": "disabled",
375 }
376 }
377 }
378
379 mergedOptions["extra_body"] = extraBody
380
381 parsed, err := openaicompat.ParseOptions(mergedOptions)
382 if err == nil {
383 options[openaicompat.Name] = parsed
384 }
385 }
386
387 return options
388}
389
390func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
391 modelOptions := getProviderOptions(model, cfg)
392 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
393 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
394 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
395 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
396 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
397 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
398}
399
400func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
401 large, small, err := c.buildAgentModels(ctx, isSubAgent)
402 if err != nil {
403 return nil, err
404 }
405
406 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
407 result := NewSessionAgent(SessionAgentOptions{
408 LargeModel: large,
409 SmallModel: small,
410 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
411 SystemPrompt: "",
412 IsSubAgent: isSubAgent,
413 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
414 IsYolo: c.permissions.SkipRequests(),
415 Sessions: c.sessions,
416 Messages: c.messages,
417 Tools: nil,
418 Notify: c.notify,
419 })
420
421 c.readyWg.Go(func() error {
422 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
423 if err != nil {
424 return err
425 }
426 result.SetSystemPrompt(systemPrompt)
427 return nil
428 })
429
430 c.readyWg.Go(func() error {
431 tools, err := c.buildTools(ctx, agent, isSubAgent)
432 if err != nil {
433 return err
434 }
435 result.SetTools(tools)
436 return nil
437 })
438
439 return result, nil
440}
441
442func (c *coordinator) buildTools(ctx context.Context, agent config.Agent, isSubAgent bool) ([]fantasy.AgentTool, error) {
443 var allTools []fantasy.AgentTool
444 if slices.Contains(agent.AllowedTools, AgentToolName) {
445 agentTool, err := c.agentTool(ctx)
446 if err != nil {
447 return nil, err
448 }
449 allTools = append(allTools, agentTool)
450 }
451
452 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
453 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
454 if err != nil {
455 return nil, err
456 }
457 allTools = append(allTools, agenticFetchTool)
458 }
459
460 // Get the model name for the agent
461 modelName := ""
462 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
463 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
464 modelName = model.Name
465 }
466 }
467
468 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
469
470 // Build hook runner if PreToolUse hooks are configured.
471 var hookRunner *hooks.Runner
472 if preToolHooks := c.cfg.Config().Hooks[hooks.EventPreToolUse]; len(preToolHooks) > 0 {
473 hookRunner = hooks.NewRunner(preToolHooks, c.cfg.WorkingDir(), c.cfg.WorkingDir())
474 }
475
476 allTools = append(allTools,
477 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
478 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
479 tools.NewCrushLogsTool(logFile),
480 tools.NewJobOutputTool(),
481 tools.NewJobKillTool(),
482 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
483 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
484 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
485 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
486 tools.NewGlobTool(c.cfg.WorkingDir()),
487 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
488 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
489 tools.NewSourcegraphTool(nil),
490 tools.NewTodosTool(c.sessions),
491 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
492 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
493 )
494
495 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
496 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
497 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
498 }
499
500 if len(c.cfg.Config().MCP) > 0 {
501 allTools = append(
502 allTools,
503 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
504 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
505 )
506 }
507
508 var filteredTools []fantasy.AgentTool
509 for _, tool := range allTools {
510 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
511 filteredTools = append(filteredTools, tool)
512 }
513 }
514
515 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
516 if agent.AllowedMCP == nil {
517 // No MCP restrictions
518 filteredTools = append(filteredTools, tool)
519 continue
520 }
521 if len(agent.AllowedMCP) == 0 {
522 // No MCPs allowed
523 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
524 break
525 }
526
527 for mcp, tools := range agent.AllowedMCP {
528 if mcp != tool.MCP() {
529 continue
530 }
531 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
532 filteredTools = append(filteredTools, tool)
533 break
534 }
535 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
536 }
537 }
538 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
539 return strings.Compare(a.Info().Name, b.Info().Name)
540 })
541
542 // Wrap tools with hook interception for the top-level agent only.
543 // Sub-agents (the `agent` task tool, `agentic_fetch`, etc.) run
544 // without hook interception to avoid firing the user's hook N times
545 // per delegated turn. The top-level invocation of the sub-agent tool
546 // itself is still wrapped from the coder's side.
547 filteredTools = wrapToolsWithHooks(filteredTools, hookRunner, isSubAgent)
548
549 return filteredTools, nil
550}
551
552// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
553func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
554 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
555 if !ok {
556 return Model{}, Model{}, errLargeModelNotSelected
557 }
558 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
559 if !ok {
560 return Model{}, Model{}, errSmallModelNotSelected
561 }
562
563 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
564 if !ok {
565 return Model{}, Model{}, errLargeModelProviderNotConfigured
566 }
567
568 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
569 if err != nil {
570 return Model{}, Model{}, err
571 }
572
573 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
574 if !ok {
575 return Model{}, Model{}, errSmallModelProviderNotConfigured
576 }
577
578 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
579 if err != nil {
580 return Model{}, Model{}, err
581 }
582
583 var largeCatwalkModel *catwalk.Model
584 var smallCatwalkModel *catwalk.Model
585
586 for _, m := range largeProviderCfg.Models {
587 if m.ID == largeModelCfg.Model {
588 largeCatwalkModel = &m
589 }
590 }
591 for _, m := range smallProviderCfg.Models {
592 if m.ID == smallModelCfg.Model {
593 smallCatwalkModel = &m
594 }
595 }
596
597 if largeCatwalkModel == nil {
598 return Model{}, Model{}, errLargeModelNotFound
599 }
600
601 if smallCatwalkModel == nil {
602 return Model{}, Model{}, errSmallModelNotFound
603 }
604
605 largeModelID := largeModelCfg.Model
606 smallModelID := smallModelCfg.Model
607
608 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
609 largeModelID += ":exacto"
610 }
611
612 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
613 smallModelID += ":exacto"
614 }
615
616 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
617 if err != nil {
618 return Model{}, Model{}, err
619 }
620 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
621 if err != nil {
622 return Model{}, Model{}, err
623 }
624
625 return Model{
626 Model: largeModel,
627 CatwalkCfg: *largeCatwalkModel,
628 ModelCfg: largeModelCfg,
629 FlatRate: largeProviderCfg.FlatRate,
630 }, Model{
631 Model: smallModel,
632 CatwalkCfg: *smallCatwalkModel,
633 ModelCfg: smallModelCfg,
634 FlatRate: smallProviderCfg.FlatRate,
635 }, nil
636}
637
638func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
639 var opts []anthropic.Option
640
641 switch {
642 case strings.HasPrefix(apiKey, "Bearer "):
643 // NOTE: Prevent the SDK from picking up the API key from env.
644 os.Setenv("ANTHROPIC_API_KEY", "")
645 headers["Authorization"] = apiKey
646 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
647 // NOTE: Prevent the SDK from picking up the API key from env.
648 os.Setenv("ANTHROPIC_API_KEY", "")
649 headers["Authorization"] = "Bearer " + apiKey
650 case apiKey != "":
651 // X-Api-Key header
652 opts = append(opts, anthropic.WithAPIKey(apiKey))
653 }
654
655 if len(headers) > 0 {
656 opts = append(opts, anthropic.WithHeaders(headers))
657 }
658
659 if baseURL != "" {
660 opts = append(opts, anthropic.WithBaseURL(baseURL))
661 }
662
663 if c.cfg.Config().Options.Debug {
664 httpClient := log.NewHTTPClient()
665 opts = append(opts, anthropic.WithHTTPClient(httpClient))
666 }
667 return anthropic.New(opts...)
668}
669
670func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
671 opts := []openai.Option{
672 openai.WithAPIKey(apiKey),
673 openai.WithUseResponsesAPI(),
674 }
675 if c.cfg.Config().Options.Debug {
676 httpClient := log.NewHTTPClient()
677 opts = append(opts, openai.WithHTTPClient(httpClient))
678 }
679 if len(headers) > 0 {
680 opts = append(opts, openai.WithHeaders(headers))
681 }
682 if baseURL != "" {
683 opts = append(opts, openai.WithBaseURL(baseURL))
684 }
685 return openai.New(opts...)
686}
687
688func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
689 opts := []openrouter.Option{
690 openrouter.WithAPIKey(apiKey),
691 }
692 if c.cfg.Config().Options.Debug {
693 httpClient := log.NewHTTPClient()
694 opts = append(opts, openrouter.WithHTTPClient(httpClient))
695 }
696 if len(headers) > 0 {
697 opts = append(opts, openrouter.WithHeaders(headers))
698 }
699 return openrouter.New(opts...)
700}
701
702func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
703 opts := []vercel.Option{
704 vercel.WithAPIKey(apiKey),
705 }
706 if c.cfg.Config().Options.Debug {
707 httpClient := log.NewHTTPClient()
708 opts = append(opts, vercel.WithHTTPClient(httpClient))
709 }
710 if len(headers) > 0 {
711 opts = append(opts, vercel.WithHeaders(headers))
712 }
713 return vercel.New(opts...)
714}
715
716func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
717 opts := []openaicompat.Option{
718 openaicompat.WithBaseURL(baseURL),
719 openaicompat.WithAPIKey(apiKey),
720 }
721
722 // Set HTTP client based on provider and debug mode.
723 var httpClient *http.Client
724 if providerID == string(catwalk.InferenceProviderCopilot) {
725 opts = append(opts, openaicompat.WithUseResponsesAPI())
726 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
727 } else if c.cfg.Config().Options.Debug {
728 httpClient = log.NewHTTPClient()
729 }
730 if httpClient != nil {
731 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
732 }
733
734 if len(headers) > 0 {
735 opts = append(opts, openaicompat.WithHeaders(headers))
736 }
737
738 for extraKey, extraValue := range extraBody {
739 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
740 }
741
742 return openaicompat.New(opts...)
743}
744
745func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
746 opts := []azure.Option{
747 azure.WithBaseURL(baseURL),
748 azure.WithAPIKey(apiKey),
749 azure.WithUseResponsesAPI(),
750 }
751 if c.cfg.Config().Options.Debug {
752 httpClient := log.NewHTTPClient()
753 opts = append(opts, azure.WithHTTPClient(httpClient))
754 }
755 if options == nil {
756 options = make(map[string]string)
757 }
758 if apiVersion, ok := options["apiVersion"]; ok {
759 opts = append(opts, azure.WithAPIVersion(apiVersion))
760 }
761 if len(headers) > 0 {
762 opts = append(opts, azure.WithHeaders(headers))
763 }
764
765 return azure.New(opts...)
766}
767
768func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
769 var opts []bedrock.Option
770 if c.cfg.Config().Options.Debug {
771 httpClient := log.NewHTTPClient()
772 opts = append(opts, bedrock.WithHTTPClient(httpClient))
773 }
774 if len(headers) > 0 {
775 opts = append(opts, bedrock.WithHeaders(headers))
776 }
777 switch {
778 case apiKey != "":
779 opts = append(opts, bedrock.WithAPIKey(apiKey))
780 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
781 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
782 default:
783 // Skip, let the SDK do authentication.
784 }
785 return bedrock.New(opts...)
786}
787
788func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
789 opts := []google.Option{
790 google.WithBaseURL(baseURL),
791 google.WithGeminiAPIKey(apiKey),
792 }
793 if c.cfg.Config().Options.Debug {
794 httpClient := log.NewHTTPClient()
795 opts = append(opts, google.WithHTTPClient(httpClient))
796 }
797 if len(headers) > 0 {
798 opts = append(opts, google.WithHeaders(headers))
799 }
800 return google.New(opts...)
801}
802
803func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
804 opts := []google.Option{}
805 if c.cfg.Config().Options.Debug {
806 httpClient := log.NewHTTPClient()
807 opts = append(opts, google.WithHTTPClient(httpClient))
808 }
809 if len(headers) > 0 {
810 opts = append(opts, google.WithHeaders(headers))
811 }
812
813 project := options["project"]
814 location := options["location"]
815
816 opts = append(opts, google.WithVertex(project, location))
817
818 return google.New(opts...)
819}
820
821func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
822 if model.Think {
823 return true
824 }
825 opts, err := anthropic.ParseOptions(model.ProviderOptions)
826 return err == nil && opts.Thinking != nil
827}
828
829func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
830 headers := maps.Clone(providerCfg.ExtraHeaders)
831 if headers == nil {
832 headers = make(map[string]string)
833 }
834
835 // handle special headers for anthropic
836 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
837 if v, ok := headers["anthropic-beta"]; ok {
838 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
839 } else {
840 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
841 }
842 }
843
844 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
845 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
846
847 switch providerCfg.Type {
848 case openai.Name:
849 return c.buildOpenaiProvider(baseURL, apiKey, headers)
850 case anthropic.Name:
851 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
852 case openrouter.Name:
853 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
854 case vercel.Name:
855 return c.buildVercelProvider(baseURL, apiKey, headers)
856 case azure.Name:
857 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
858 case bedrock.Name:
859 return c.buildBedrockProvider(apiKey, headers)
860 case google.Name:
861 return c.buildGoogleProvider(baseURL, apiKey, headers)
862 case "google-vertex":
863 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
864 case openaicompat.Name, hyper.Name:
865 switch providerCfg.ID {
866 case hyper.Name:
867 baseURL = hyper.BaseURL() + "/v1"
868 headers["x-crush-id"] = event.GetID()
869 case string(catwalk.InferenceProviderZAI):
870 if providerCfg.ExtraBody == nil {
871 providerCfg.ExtraBody = map[string]any{}
872 }
873 providerCfg.ExtraBody["tool_stream"] = true
874 }
875 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
876 default:
877 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
878 }
879}
880
// isExactoSupported reports whether modelID is on the fixed allow-list of
// OpenRouter models that buildAgentModels switches to their ":exacto"
// variant. The match is exact and case-sensitive.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
891
892func (c *coordinator) Cancel(sessionID string) {
893 c.currentAgent.Cancel(sessionID)
894}
895
896func (c *coordinator) CancelAll() {
897 c.currentAgent.CancelAll()
898}
899
900func (c *coordinator) ClearQueue(sessionID string) {
901 c.currentAgent.ClearQueue(sessionID)
902}
903
904func (c *coordinator) IsBusy() bool {
905 return c.currentAgent.IsBusy()
906}
907
908func (c *coordinator) IsSessionBusy(sessionID string) bool {
909 return c.currentAgent.IsSessionBusy(sessionID)
910}
911
912func (c *coordinator) Model() Model {
913 return c.currentAgent.Model()
914}
915
916func (c *coordinator) UpdateModels(ctx context.Context) error {
917 // build the models again so we make sure we get the latest config
918 large, small, err := c.buildAgentModels(ctx, false)
919 if err != nil {
920 return err
921 }
922 c.currentAgent.SetModels(large, small)
923
924 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
925 if !ok {
926 return errCoderAgentNotConfigured
927 }
928
929 tools, err := c.buildTools(ctx, agentCfg, false)
930 if err != nil {
931 return err
932 }
933 c.currentAgent.SetTools(tools)
934 return nil
935}
936
937func (c *coordinator) QueuedPrompts(sessionID string) int {
938 return c.currentAgent.QueuedPrompts(sessionID)
939}
940
941func (c *coordinator) QueuedPromptsList(sessionID string) []string {
942 return c.currentAgent.QueuedPromptsList(sessionID)
943}
944
945func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
946 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
947 if !ok {
948 return errModelProviderNotConfigured
949 }
950
951 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
952 slog.Error("Failed to refresh OAuth2 token before summarize. Proceeding with existing token.", "error", err)
953 }
954
955 summarize := func() error {
956 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
957 }
958
959 err := summarize()
960 if err != nil && c.isUnauthorized(err) {
961 if retryErr := c.retryAfterUnauthorized(ctx, providerCfg); retryErr == nil {
962 return summarize()
963 }
964 }
965
966 return err
967}
968
969// refreshTokenIfExpired proactively refreshes the OAuth token if it has expired.
970func (c *coordinator) refreshTokenIfExpired(ctx context.Context, providerCfg config.ProviderConfig) error {
971 if providerCfg.OAuthToken == nil || !providerCfg.OAuthToken.IsExpired() {
972 return nil
973 }
974 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
975 return c.refreshOAuth2Token(ctx, providerCfg)
976}
977
978// retryAfterUnauthorized attempts to refresh credentials after receiving a 401
979// and returns nil if retry should be attempted.
980func (c *coordinator) retryAfterUnauthorized(ctx context.Context, providerCfg config.ProviderConfig) error {
981 switch {
982 case providerCfg.OAuthToken != nil:
983 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
984 return c.refreshOAuth2Token(ctx, providerCfg)
985 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
986 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
987 return c.refreshApiKeyTemplate(ctx, providerCfg)
988 default:
989 return nil
990 }
991}
992
993func (c *coordinator) isUnauthorized(err error) bool {
994 var providerErr *fantasy.ProviderError
995 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
996}
997
998func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
999 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
1000 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
1001 return err
1002 }
1003 if err := c.UpdateModels(ctx); err != nil {
1004 return err
1005 }
1006 return nil
1007}
1008
1009func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
1010 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
1011 if err != nil {
1012 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
1013 return err
1014 }
1015
1016 providerCfg.APIKey = newAPIKey
1017 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
1018
1019 if err := c.UpdateModels(ctx); err != nil {
1020 return err
1021 }
1022 return nil
1023}
1024
// subAgentParams holds the parameters for running a sub-agent.
// It is consumed by runSubAgent.
type subAgentParams struct {
	// Agent is the sub-agent to run; its Model() also supplies the
	// generation settings (max tokens, temperature, etc.).
	Agent SessionAgent
	// SessionID is the parent session: the sub-session is created under it
	// and the sub-session's cost is added back to it.
	SessionID string
	// AgentMessageID and ToolCallID together derive the sub-session ID via
	// sessions.CreateAgentToolSessionID.
	AgentMessageID string
	ToolCallID     string
	// Prompt is the prompt sent to the sub-agent.
	Prompt string
	// SessionTitle is passed to CreateTaskSession when creating the sub-session.
	SessionTitle string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
1037
1038// runSubAgent runs a sub-agent and handles session management and cost accumulation.
1039// It creates a sub-session, runs the agent with the given prompt, and propagates
1040// the cost to the parent session.
1041func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
1042 // Create sub-session
1043 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
1044 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
1045 if err != nil {
1046 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
1047 }
1048
1049 // Call session setup function if provided
1050 if params.SessionSetup != nil {
1051 params.SessionSetup(session.ID)
1052 }
1053
1054 // Get model configuration
1055 model := params.Agent.Model()
1056 maxTokens := model.CatwalkCfg.DefaultMaxTokens
1057 if model.ModelCfg.MaxTokens != 0 {
1058 maxTokens = model.ModelCfg.MaxTokens
1059 }
1060
1061 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1062 if !ok {
1063 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1064 }
1065
1066 // Run the agent
1067 result, err := params.Agent.Run(ctx, SessionAgentCall{
1068 SessionID: session.ID,
1069 Prompt: params.Prompt,
1070 MaxOutputTokens: maxTokens,
1071 ProviderOptions: getProviderOptions(model, providerCfg),
1072 Temperature: model.ModelCfg.Temperature,
1073 TopP: model.ModelCfg.TopP,
1074 TopK: model.ModelCfg.TopK,
1075 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1076 PresencePenalty: model.ModelCfg.PresencePenalty,
1077 NonInteractive: true,
1078 })
1079 if err != nil {
1080 return fantasy.NewTextErrorResponse(fmt.Sprintf("Failed to generate response: %s", err)), nil
1081 }
1082
1083 // Update parent session cost
1084 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1085 return fantasy.ToolResponse{}, err
1086 }
1087
1088 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1089}
1090
1091// updateParentSessionCost accumulates the cost from a child session to its parent session.
1092func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1093 childSession, err := c.sessions.Get(ctx, childSessionID)
1094 if err != nil {
1095 return fmt.Errorf("get child session: %w", err)
1096 }
1097
1098 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1099 if err != nil {
1100 return fmt.Errorf("get parent session: %w", err)
1101 }
1102
1103 parentSession.Cost += childSession.Cost
1104
1105 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1106 return fmt.Errorf("save parent session: %w", err)
1107 }
1108
1109 return nil
1110}
1111
1112// discoverSkills runs the skill discovery pipeline and returns both the
1113// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1114// It also emits a single diagnostic log line summarising the outcome to
1115// help track skill-loading health over time.
1116func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1117 builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1118 discovered := append([]*skills.Skill(nil), builtin...)
1119
1120 var userStates []*skills.SkillState
1121 var userPaths []string
1122
1123 opts := cfg.Config().Options
1124 if opts != nil && len(opts.SkillsPaths) > 0 {
1125 userPaths = make([]string, 0, len(opts.SkillsPaths))
1126 for _, pth := range opts.SkillsPaths {
1127 expanded := home.Long(pth)
1128 if strings.HasPrefix(expanded, "$") {
1129 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1130 expanded = resolved
1131 }
1132 }
1133 userPaths = append(userPaths, expanded)
1134 }
1135 var userSkills []*skills.Skill
1136 userSkills, userStates = skills.DiscoverWithStates(userPaths)
1137 discovered = append(discovered, userSkills...)
1138 }
1139
1140 allSkills = skills.Deduplicate(discovered)
1141 var disabledSkills []string
1142 if opts != nil {
1143 disabledSkills = opts.DisabledSkills
1144 }
1145 activeSkills = skills.Filter(allSkills, disabledSkills)
1146
1147 logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1148 return allSkills, activeSkills
1149}
1150
1151// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1152// (if any) were loaded during this turn and which looked relevant based on
1153// a cheap keyword match against the user prompt. The goal is to surface
1154// "should-have-loaded but didn't" situations for later analysis.
1155//
1156// Logged at Info level under component=skills; heavy fields are elided when
1157// there is nothing interesting to report.
1158func logTurnSkillUsage(
1159 sessionID string,
1160 prompt string,
1161 activeSkills []*skills.Skill,
1162 tracker *skills.Tracker,
1163 before []string,
1164) {
1165 if tracker == nil || len(activeSkills) == 0 {
1166 return
1167 }
1168
1169 after := tracker.LoadedNames()
1170
1171 beforeSet := make(map[string]bool, len(before))
1172 for _, n := range before {
1173 beforeSet[n] = true
1174 }
1175 var loadedThisTurn []string
1176 for _, n := range after {
1177 if !beforeSet[n] {
1178 loadedThisTurn = append(loadedThisTurn, n)
1179 }
1180 }
1181
1182 slog.Info("Skill turn summary",
1183 "component", "skills",
1184 "session_id", sessionID,
1185 "prompt_len", len(prompt),
1186 "active_total", len(activeSkills),
1187 "loaded_total", len(after),
1188 "loaded_this_turn", loadedThisTurn,
1189 )
1190}
1191
1192// logDiscoveryStats emits a single structured log line summarising skill
1193// discovery for the current session. It is intentionally low-volume: one
1194// line per session start.
1195func logDiscoveryStats(
1196 builtin []*skills.Skill,
1197 builtinStates, userStates []*skills.SkillState,
1198 userPaths []string,
1199 allSkills, activeSkills []*skills.Skill,
1200 disabled []string,
1201) {
1202 countErrors := func(states []*skills.SkillState) int {
1203 n := 0
1204 for _, s := range states {
1205 if s.State == skills.StateError {
1206 n++
1207 }
1208 }
1209 return n
1210 }
1211
1212 userOK := 0
1213 for _, s := range userStates {
1214 if s.State == skills.StateNormal {
1215 userOK++
1216 }
1217 }
1218
1219 activeNames := make([]string, 0, len(activeSkills))
1220 for _, s := range activeSkills {
1221 activeNames = append(activeNames, s.Name)
1222 }
1223
1224 xml := skills.ToPromptXML(activeSkills)
1225
1226 slog.Info("Skill discovery complete",
1227 "component", "skills",
1228 "builtin_ok", len(builtin),
1229 "builtin_errors", countErrors(builtinStates),
1230 "user_ok", userOK,
1231 "user_errors", countErrors(userStates),
1232 "user_paths", len(userPaths),
1233 "deduped_total", len(allSkills),
1234 "active", len(activeSkills),
1235 "disabled", len(disabled),
1236 "prompt_bytes", len(xml),
1237 "prompt_tok_est", skills.ApproxTokenCount(xml),
1238 "active_names", activeNames,
1239 )
1240}