1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/hooks"
31 "github.com/charmbracelet/crush/internal/log"
32 "github.com/charmbracelet/crush/internal/lsp"
33 "github.com/charmbracelet/crush/internal/message"
34 "github.com/charmbracelet/crush/internal/oauth/copilot"
35 "github.com/charmbracelet/crush/internal/permission"
36 "github.com/charmbracelet/crush/internal/pubsub"
37 "github.com/charmbracelet/crush/internal/session"
38 "github.com/charmbracelet/crush/internal/skills"
39 "golang.org/x/sync/errgroup"
40
41 "charm.land/fantasy/providers/anthropic"
42 "charm.land/fantasy/providers/azure"
43 "charm.land/fantasy/providers/bedrock"
44 "charm.land/fantasy/providers/google"
45 "charm.land/fantasy/providers/openai"
46 "charm.land/fantasy/providers/openaicompat"
47 "charm.land/fantasy/providers/openrouter"
48 "charm.land/fantasy/providers/vercel"
49 openaisdk "github.com/charmbracelet/openai-go/option"
50 "github.com/qjebbs/go-jsons"
51)
52
// Coordinator errors.
//
// Sentinel errors returned while constructing the coordinator, rebuilding its
// models, or resolving the provider for a run.
var (
	// errCoderAgentNotConfigured: the config has no agent entry for
	// config.AgentCoder.
	errCoderAgentNotConfigured = errors.New("coder agent not configured")
	// errModelProviderNotConfigured: the provider of the current agent's model
	// is not present in the provider config.
	errModelProviderNotConfigured = errors.New("model provider not configured")
	// errLargeModelNotSelected: no model is selected for the large slot.
	errLargeModelNotSelected = errors.New("large model not selected")
	// errSmallModelNotSelected: no model is selected for the small slot.
	errSmallModelNotSelected = errors.New("small model not selected")
	// errLargeModelProviderNotConfigured: the large model's provider is not
	// present in the provider config.
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	// errSmallModelProviderNotConfigured: the small model's provider is not
	// present in the provider config.
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
	// errLargeModelNotFound: the selected large model ID is not listed among
	// its provider's models.
	errLargeModelNotFound = errors.New("large model not found in provider config")
	// errSmallModelNotFound: the selected small model ID is not listed among
	// its provider's models.
	errSmallModelNotFound = errors.New("small model not found in provider config")
)
64
// Coordinator is the entry point for running prompts against the current
// agent and for inspecting or controlling that agent's per-session work.
//
// The descriptions below reflect the coordinator implementation in this file,
// which forwards most calls to its current SessionAgent.
type Coordinator interface {
	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
	// SetMainAgent(string)
	// Run sends prompt, with optional attachments, to the current agent for
	// the given session and returns the agent's result.
	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
	// Cancel asks the current agent to cancel the given session.
	Cancel(sessionID string)
	// CancelAll asks the current agent to cancel everything it is running.
	CancelAll()
	// IsSessionBusy reports whether the current agent is busy with the session.
	IsSessionBusy(sessionID string) bool
	// IsBusy reports whether the current agent is busy at all.
	IsBusy() bool
	// QueuedPrompts returns the number of prompts queued for the session.
	QueuedPrompts(sessionID string) int
	// QueuedPromptsList returns the prompts queued for the session.
	QueuedPromptsList(sessionID string) []string
	// ClearQueue asks the current agent to clear the session's queue.
	ClearQueue(sessionID string)
	// Summarize asks the current agent to summarize the session whose ID is
	// passed as the string argument.
	Summarize(context.Context, string) error
	// Model returns the model reported by the current agent.
	Model() Model
	// UpdateModels rebuilds the models and tools from the current config and
	// applies them to the current agent.
	UpdateModels(ctx context.Context) error
}
80
// coordinator is the Coordinator implementation. It holds the services that
// agents and tools are built from, the agents built so far, and the skills
// snapshot taken when the coordinator was created.
type coordinator struct {
	// Dependencies handed to NewCoordinator; they are passed on to the
	// session agent and to the tool constructors.
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent receives every Coordinator call; agents indexes the built
	// agents by name (only config.AgentCoder is registered in this file).
	currentAgent SessionAgent
	agents       map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg tracks the background system-prompt and tool builds started by
	// buildAgent; Run waits on it before doing anything else.
	readyWg errgroup.Group
}
101
102func NewCoordinator(
103 ctx context.Context,
104 cfg *config.ConfigStore,
105 sessions session.Service,
106 messages message.Service,
107 permissions permission.Service,
108 history history.Service,
109 filetracker filetracker.Service,
110 lspManager *lsp.Manager,
111 notify pubsub.Publisher[notify.Notification],
112) (Coordinator, error) {
113 // Discover skills once at session start.
114 allSkills, activeSkills := discoverSkills(cfg)
115 skillTracker := skills.NewTracker(activeSkills)
116
117 c := &coordinator{
118 cfg: cfg,
119 sessions: sessions,
120 messages: messages,
121 permissions: permissions,
122 history: history,
123 filetracker: filetracker,
124 lspManager: lspManager,
125 notify: notify,
126 agents: make(map[string]SessionAgent),
127 allSkills: allSkills,
128 activeSkills: activeSkills,
129 skillTracker: skillTracker,
130 }
131
132 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
133 if !ok {
134 return nil, errCoderAgentNotConfigured
135 }
136
137 // TODO: make this dynamic when we support multiple agents
138 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
139 if err != nil {
140 return nil, err
141 }
142
143 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
144 if err != nil {
145 return nil, err
146 }
147 c.currentAgent = agent
148 c.agents[config.AgentCoder] = agent
149 return c, nil
150}
151
152// Run implements Coordinator.
153func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
154 if err := c.readyWg.Wait(); err != nil {
155 return nil, err
156 }
157
158 // refresh models before each run
159 if err := c.UpdateModels(ctx); err != nil {
160 return nil, fmt.Errorf("failed to update models: %w", err)
161 }
162
163 model := c.currentAgent.Model()
164 maxTokens := model.CatwalkCfg.DefaultMaxTokens
165 if model.ModelCfg.MaxTokens != 0 {
166 maxTokens = model.ModelCfg.MaxTokens
167 }
168
169 if !model.CatwalkCfg.SupportsImages && attachments != nil {
170 // filter out image attachments
171 filteredAttachments := make([]message.Attachment, 0, len(attachments))
172 for _, att := range attachments {
173 if att.IsText() {
174 filteredAttachments = append(filteredAttachments, att)
175 }
176 }
177 attachments = filteredAttachments
178 }
179
180 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
181 if !ok {
182 return nil, errModelProviderNotConfigured
183 }
184
185 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
186
187 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
188 // NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
189 // depends on the flow below. If refresh fails, proceed with the token we have.
190 slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
191 }
192
193 run := func() (*fantasy.AgentResult, error) {
194 return c.currentAgent.Run(ctx, SessionAgentCall{
195 SessionID: sessionID,
196 Prompt: prompt,
197 Attachments: attachments,
198 MaxOutputTokens: maxTokens,
199 ProviderOptions: mergedOptions,
200 Temperature: temp,
201 TopP: topP,
202 TopK: topK,
203 FrequencyPenalty: freqPenalty,
204 PresencePenalty: presPenalty,
205 })
206 }
207 beforeLoaded := c.skillTracker.LoadedNames()
208 result, originalErr := run()
209 logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
210
211 if c.isUnauthorized(originalErr) {
212 if err := c.retryAfterUnauthorized(ctx, providerCfg); err == nil {
213 return run()
214 }
215 }
216
217 return result, originalErr
218}
219
220func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
221 options := fantasy.ProviderOptions{}
222
223 cfgOpts := []byte("{}")
224 providerCfgOpts := []byte("{}")
225 catwalkOpts := []byte("{}")
226
227 if model.ModelCfg.ProviderOptions != nil {
228 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
229 if err == nil {
230 cfgOpts = data
231 }
232 }
233
234 if providerCfg.ProviderOptions != nil {
235 data, err := json.Marshal(providerCfg.ProviderOptions)
236 if err == nil {
237 providerCfgOpts = data
238 }
239 }
240
241 if model.CatwalkCfg.Options.ProviderOptions != nil {
242 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
243 if err == nil {
244 catwalkOpts = data
245 }
246 }
247
248 readers := []io.Reader{
249 bytes.NewReader(catwalkOpts),
250 bytes.NewReader(providerCfgOpts),
251 bytes.NewReader(cfgOpts),
252 }
253
254 got, err := jsons.Merge(readers)
255 if err != nil {
256 slog.Error("Could not merge call config", "err", err)
257 return options
258 }
259
260 mergedOptions := make(map[string]any)
261
262 err = json.Unmarshal([]byte(got), &mergedOptions)
263 if err != nil {
264 slog.Error("Could not create config for call", "err", err)
265 return options
266 }
267
268 switch providerCfg.Type {
269 case openai.Name, azure.Name:
270 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
271 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
272 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
273 }
274 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
275 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
276 mergedOptions["reasoning_summary"] = "auto"
277 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
278 }
279 parsed, err := openai.ParseResponsesOptions(mergedOptions)
280 if err == nil {
281 options[openai.Name] = parsed
282 }
283 } else {
284 parsed, err := openai.ParseOptions(mergedOptions)
285 if err == nil {
286 options[openai.Name] = parsed
287 }
288 }
289 case anthropic.Name:
290 var (
291 _, hasEffort = mergedOptions["effort"]
292 _, hasThink = mergedOptions["thinking"]
293 )
294 switch {
295 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
296 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
297 case !hasThink && model.ModelCfg.Think:
298 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
299 }
300 parsed, err := anthropic.ParseOptions(mergedOptions)
301 if err == nil {
302 options[anthropic.Name] = parsed
303 }
304
305 case openrouter.Name:
306 _, hasReasoning := mergedOptions["reasoning"]
307 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
308 mergedOptions["reasoning"] = map[string]any{
309 "enabled": true,
310 "effort": model.ModelCfg.ReasoningEffort,
311 }
312 }
313 parsed, err := openrouter.ParseOptions(mergedOptions)
314 if err == nil {
315 options[openrouter.Name] = parsed
316 }
317 case vercel.Name:
318 _, hasReasoning := mergedOptions["reasoning"]
319 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
320 mergedOptions["reasoning"] = map[string]any{
321 "enabled": true,
322 "effort": model.ModelCfg.ReasoningEffort,
323 }
324 }
325 parsed, err := vercel.ParseOptions(mergedOptions)
326 if err == nil {
327 options[vercel.Name] = parsed
328 }
329 case google.Name:
330 _, hasReasoning := mergedOptions["thinking_config"]
331 if !hasReasoning {
332 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
333 mergedOptions["thinking_config"] = map[string]any{
334 "thinking_budget": 2000,
335 "include_thoughts": true,
336 }
337 } else {
338 mergedOptions["thinking_config"] = map[string]any{
339 "thinking_level": model.ModelCfg.ReasoningEffort,
340 "include_thoughts": true,
341 }
342 }
343 }
344 parsed, err := google.ParseOptions(mergedOptions)
345 if err == nil {
346 options[google.Name] = parsed
347 }
348 case openaicompat.Name, hyper.Name:
349 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
350 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
351 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
352 }
353
354 extraBody := make(map[string]any)
355
356 // "reasoning effort" is a standard OpenAI field, but "thinking" is not.
357 // Setting it in the right way for each provider.
358 // TODO: Abstract this in Fantasy somehow?
359 // TODO: Allow custom providers to specify how to set this?
360 switch providerCfg.ID {
361 case hyper.Name:
362 extraBody["thinking"] = model.ModelCfg.Think
363 case string(catwalk.InferenceProviderIoNet):
364 extraBody["chat_template_kwargs"] = map[string]any{
365 "thinking": model.ModelCfg.Think,
366 }
367 case string(catwalk.InferenceProviderZAI):
368 if model.ModelCfg.Think {
369 extraBody["thinking"] = map[string]any{
370 "type": "enabled",
371 }
372 } else {
373 extraBody["thinking"] = map[string]any{
374 "type": "disabled",
375 }
376 }
377 }
378
379 mergedOptions["extra_body"] = extraBody
380
381 parsed, err := openaicompat.ParseOptions(mergedOptions)
382 if err == nil {
383 options[openaicompat.Name] = parsed
384 }
385 }
386
387 return options
388}
389
390func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
391 modelOptions := getProviderOptions(model, cfg)
392 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
393 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
394 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
395 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
396 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
397 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
398}
399
400func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
401 large, small, err := c.buildAgentModels(ctx, isSubAgent)
402 if err != nil {
403 return nil, err
404 }
405
406 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
407 result := NewSessionAgent(SessionAgentOptions{
408 LargeModel: large,
409 SmallModel: small,
410 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
411 SystemPrompt: "",
412 IsSubAgent: isSubAgent,
413 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
414 IsYolo: c.permissions.SkipRequests(),
415 Sessions: c.sessions,
416 Messages: c.messages,
417 Tools: nil,
418 Notify: c.notify,
419 })
420
421 c.readyWg.Go(func() error {
422 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
423 if err != nil {
424 return err
425 }
426 result.SetSystemPrompt(systemPrompt)
427 return nil
428 })
429
430 c.readyWg.Go(func() error {
431 tools, err := c.buildTools(ctx, agent, isSubAgent)
432 if err != nil {
433 return err
434 }
435 result.SetTools(tools)
436 return nil
437 })
438
439 return result, nil
440}
441
442func (c *coordinator) buildTools(ctx context.Context, agent config.Agent, isSubAgent bool) ([]fantasy.AgentTool, error) {
443 var allTools []fantasy.AgentTool
444 if slices.Contains(agent.AllowedTools, AgentToolName) {
445 agentTool, err := c.agentTool(ctx)
446 if err != nil {
447 return nil, err
448 }
449 allTools = append(allTools, agentTool)
450 }
451
452 if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
453 agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
454 if err != nil {
455 return nil, err
456 }
457 allTools = append(allTools, agenticFetchTool)
458 }
459
460 // Get the model name for the agent
461 modelName := ""
462 if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
463 if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
464 modelName = model.Name
465 }
466 }
467
468 logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
469
470 // Build hook runner if PreToolUse hooks are configured.
471 var hookRunner *hooks.Runner
472 if preToolHooks := c.cfg.Config().Hooks[hooks.EventPreToolUse]; len(preToolHooks) > 0 {
473 hookRunner = hooks.NewRunner(preToolHooks, c.cfg.WorkingDir(), c.cfg.WorkingDir())
474 }
475
476 allTools = append(allTools,
477 tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
478 tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
479 tools.NewCrushLogsTool(logFile),
480 tools.NewJobOutputTool(),
481 tools.NewJobKillTool(),
482 tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
483 tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
484 tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
485 tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
486 tools.NewGlobTool(c.cfg.WorkingDir()),
487 tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
488 tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
489 tools.NewSourcegraphTool(nil),
490 tools.NewTodosTool(c.sessions),
491 tools.NewTouchTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
492 tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
493 tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
494 )
495
496 // Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
497 if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
498 allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
499 }
500
501 if len(c.cfg.Config().MCP) > 0 {
502 allTools = append(
503 allTools,
504 tools.NewListMCPResourcesTool(c.cfg, c.permissions),
505 tools.NewReadMCPResourceTool(c.cfg, c.permissions),
506 )
507 }
508
509 var filteredTools []fantasy.AgentTool
510 for _, tool := range allTools {
511 if slices.Contains(agent.AllowedTools, tool.Info().Name) {
512 filteredTools = append(filteredTools, tool)
513 }
514 }
515
516 for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
517 if agent.AllowedMCP == nil {
518 // No MCP restrictions
519 filteredTools = append(filteredTools, tool)
520 continue
521 }
522 if len(agent.AllowedMCP) == 0 {
523 // No MCPs allowed
524 slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
525 break
526 }
527
528 for mcp, tools := range agent.AllowedMCP {
529 if mcp != tool.MCP() {
530 continue
531 }
532 if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
533 filteredTools = append(filteredTools, tool)
534 break
535 }
536 slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
537 }
538 }
539 slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
540 return strings.Compare(a.Info().Name, b.Info().Name)
541 })
542
543 // Wrap tools with hook interception for the top-level agent only.
544 // Sub-agents (the `agent` task tool, `agentic_fetch`, etc.) run
545 // without hook interception to avoid firing the user's hook N times
546 // per delegated turn. The top-level invocation of the sub-agent tool
547 // itself is still wrapped from the coder's side.
548 filteredTools = wrapToolsWithHooks(filteredTools, hookRunner, isSubAgent)
549
550 return filteredTools, nil
551}
552
553// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
554func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
555 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
556 if !ok {
557 return Model{}, Model{}, errLargeModelNotSelected
558 }
559 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
560 if !ok {
561 return Model{}, Model{}, errSmallModelNotSelected
562 }
563
564 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
565 if !ok {
566 return Model{}, Model{}, errLargeModelProviderNotConfigured
567 }
568
569 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
570 if err != nil {
571 return Model{}, Model{}, err
572 }
573
574 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
575 if !ok {
576 return Model{}, Model{}, errSmallModelProviderNotConfigured
577 }
578
579 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
580 if err != nil {
581 return Model{}, Model{}, err
582 }
583
584 var largeCatwalkModel *catwalk.Model
585 var smallCatwalkModel *catwalk.Model
586
587 for _, m := range largeProviderCfg.Models {
588 if m.ID == largeModelCfg.Model {
589 largeCatwalkModel = &m
590 }
591 }
592 for _, m := range smallProviderCfg.Models {
593 if m.ID == smallModelCfg.Model {
594 smallCatwalkModel = &m
595 }
596 }
597
598 if largeCatwalkModel == nil {
599 return Model{}, Model{}, errLargeModelNotFound
600 }
601
602 if smallCatwalkModel == nil {
603 return Model{}, Model{}, errSmallModelNotFound
604 }
605
606 largeModelID := largeModelCfg.Model
607 smallModelID := smallModelCfg.Model
608
609 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
610 largeModelID += ":exacto"
611 }
612
613 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
614 smallModelID += ":exacto"
615 }
616
617 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
618 if err != nil {
619 return Model{}, Model{}, err
620 }
621 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
622 if err != nil {
623 return Model{}, Model{}, err
624 }
625
626 return Model{
627 Model: largeModel,
628 CatwalkCfg: *largeCatwalkModel,
629 ModelCfg: largeModelCfg,
630 FlatRate: largeProviderCfg.FlatRate,
631 }, Model{
632 Model: smallModel,
633 CatwalkCfg: *smallCatwalkModel,
634 ModelCfg: smallModelCfg,
635 FlatRate: smallProviderCfg.FlatRate,
636 }, nil
637}
638
639func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
640 var opts []anthropic.Option
641
642 switch {
643 case strings.HasPrefix(apiKey, "Bearer "):
644 // NOTE: Prevent the SDK from picking up the API key from env.
645 os.Setenv("ANTHROPIC_API_KEY", "")
646 headers["Authorization"] = apiKey
647 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
648 // NOTE: Prevent the SDK from picking up the API key from env.
649 os.Setenv("ANTHROPIC_API_KEY", "")
650 headers["Authorization"] = "Bearer " + apiKey
651 case apiKey != "":
652 // X-Api-Key header
653 opts = append(opts, anthropic.WithAPIKey(apiKey))
654 }
655
656 if len(headers) > 0 {
657 opts = append(opts, anthropic.WithHeaders(headers))
658 }
659
660 if baseURL != "" {
661 opts = append(opts, anthropic.WithBaseURL(baseURL))
662 }
663
664 if c.cfg.Config().Options.Debug {
665 httpClient := log.NewHTTPClient()
666 opts = append(opts, anthropic.WithHTTPClient(httpClient))
667 }
668 return anthropic.New(opts...)
669}
670
671func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
672 opts := []openai.Option{
673 openai.WithAPIKey(apiKey),
674 openai.WithUseResponsesAPI(),
675 }
676 if c.cfg.Config().Options.Debug {
677 httpClient := log.NewHTTPClient()
678 opts = append(opts, openai.WithHTTPClient(httpClient))
679 }
680 if len(headers) > 0 {
681 opts = append(opts, openai.WithHeaders(headers))
682 }
683 if baseURL != "" {
684 opts = append(opts, openai.WithBaseURL(baseURL))
685 }
686 return openai.New(opts...)
687}
688
689func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
690 opts := []openrouter.Option{
691 openrouter.WithAPIKey(apiKey),
692 }
693 if c.cfg.Config().Options.Debug {
694 httpClient := log.NewHTTPClient()
695 opts = append(opts, openrouter.WithHTTPClient(httpClient))
696 }
697 if len(headers) > 0 {
698 opts = append(opts, openrouter.WithHeaders(headers))
699 }
700 return openrouter.New(opts...)
701}
702
703func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
704 opts := []vercel.Option{
705 vercel.WithAPIKey(apiKey),
706 }
707 if c.cfg.Config().Options.Debug {
708 httpClient := log.NewHTTPClient()
709 opts = append(opts, vercel.WithHTTPClient(httpClient))
710 }
711 if len(headers) > 0 {
712 opts = append(opts, vercel.WithHeaders(headers))
713 }
714 return vercel.New(opts...)
715}
716
717func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
718 opts := []openaicompat.Option{
719 openaicompat.WithBaseURL(baseURL),
720 openaicompat.WithAPIKey(apiKey),
721 }
722
723 // Set HTTP client based on provider and debug mode.
724 var httpClient *http.Client
725 if providerID == string(catwalk.InferenceProviderCopilot) {
726 opts = append(opts, openaicompat.WithUseResponsesAPI())
727 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
728 } else if c.cfg.Config().Options.Debug {
729 httpClient = log.NewHTTPClient()
730 }
731 if httpClient != nil {
732 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
733 }
734
735 if len(headers) > 0 {
736 opts = append(opts, openaicompat.WithHeaders(headers))
737 }
738
739 for extraKey, extraValue := range extraBody {
740 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
741 }
742
743 return openaicompat.New(opts...)
744}
745
746func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
747 opts := []azure.Option{
748 azure.WithBaseURL(baseURL),
749 azure.WithAPIKey(apiKey),
750 azure.WithUseResponsesAPI(),
751 }
752 if c.cfg.Config().Options.Debug {
753 httpClient := log.NewHTTPClient()
754 opts = append(opts, azure.WithHTTPClient(httpClient))
755 }
756 if options == nil {
757 options = make(map[string]string)
758 }
759 if apiVersion, ok := options["apiVersion"]; ok {
760 opts = append(opts, azure.WithAPIVersion(apiVersion))
761 }
762 if len(headers) > 0 {
763 opts = append(opts, azure.WithHeaders(headers))
764 }
765
766 return azure.New(opts...)
767}
768
769func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
770 var opts []bedrock.Option
771 if c.cfg.Config().Options.Debug {
772 httpClient := log.NewHTTPClient()
773 opts = append(opts, bedrock.WithHTTPClient(httpClient))
774 }
775 if len(headers) > 0 {
776 opts = append(opts, bedrock.WithHeaders(headers))
777 }
778 switch {
779 case apiKey != "":
780 opts = append(opts, bedrock.WithAPIKey(apiKey))
781 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
782 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
783 default:
784 // Skip, let the SDK do authentication.
785 }
786 return bedrock.New(opts...)
787}
788
789func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
790 opts := []google.Option{
791 google.WithBaseURL(baseURL),
792 google.WithGeminiAPIKey(apiKey),
793 }
794 if c.cfg.Config().Options.Debug {
795 httpClient := log.NewHTTPClient()
796 opts = append(opts, google.WithHTTPClient(httpClient))
797 }
798 if len(headers) > 0 {
799 opts = append(opts, google.WithHeaders(headers))
800 }
801 return google.New(opts...)
802}
803
804func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
805 opts := []google.Option{}
806 if c.cfg.Config().Options.Debug {
807 httpClient := log.NewHTTPClient()
808 opts = append(opts, google.WithHTTPClient(httpClient))
809 }
810 if len(headers) > 0 {
811 opts = append(opts, google.WithHeaders(headers))
812 }
813
814 project := options["project"]
815 location := options["location"]
816
817 opts = append(opts, google.WithVertex(project, location))
818
819 return google.New(opts...)
820}
821
822func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
823 if model.Think {
824 return true
825 }
826 opts, err := anthropic.ParseOptions(model.ProviderOptions)
827 return err == nil && opts.Thinking != nil
828}
829
830func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
831 headers := maps.Clone(providerCfg.ExtraHeaders)
832 if headers == nil {
833 headers = make(map[string]string)
834 }
835
836 // handle special headers for anthropic
837 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
838 if v, ok := headers["anthropic-beta"]; ok {
839 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
840 } else {
841 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
842 }
843 }
844
845 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
846 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
847
848 switch providerCfg.Type {
849 case openai.Name:
850 return c.buildOpenaiProvider(baseURL, apiKey, headers)
851 case anthropic.Name:
852 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
853 case openrouter.Name:
854 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
855 case vercel.Name:
856 return c.buildVercelProvider(baseURL, apiKey, headers)
857 case azure.Name:
858 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
859 case bedrock.Name:
860 return c.buildBedrockProvider(apiKey, headers)
861 case google.Name:
862 return c.buildGoogleProvider(baseURL, apiKey, headers)
863 case "google-vertex":
864 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
865 case openaicompat.Name, hyper.Name:
866 switch providerCfg.ID {
867 case hyper.Name:
868 baseURL = hyper.BaseURL() + "/v1"
869 headers["x-crush-id"] = event.GetID()
870 case string(catwalk.InferenceProviderZAI):
871 if providerCfg.ExtraBody == nil {
872 providerCfg.ExtraBody = map[string]any{}
873 }
874 providerCfg.ExtraBody["tool_stream"] = true
875 }
876 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
877 default:
878 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
879 }
880}
881
// isExactoSupported reports whether modelID is one of the model IDs for which
// the ":exacto" variant is requested. The match is exact and case-sensitive.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
892
// Cancel implements Coordinator by forwarding the request to the current
// agent.
func (c *coordinator) Cancel(sessionID string) {
	c.currentAgent.Cancel(sessionID)
}
896
// CancelAll implements Coordinator by forwarding the request to the current
// agent.
func (c *coordinator) CancelAll() {
	c.currentAgent.CancelAll()
}
900
// ClearQueue implements Coordinator by forwarding the request for sessionID
// to the current agent.
func (c *coordinator) ClearQueue(sessionID string) {
	c.currentAgent.ClearQueue(sessionID)
}
904
// IsBusy implements Coordinator; it returns what the current agent reports.
func (c *coordinator) IsBusy() bool {
	return c.currentAgent.IsBusy()
}
908
// IsSessionBusy implements Coordinator; it returns what the current agent
// reports for sessionID.
func (c *coordinator) IsSessionBusy(sessionID string) bool {
	return c.currentAgent.IsSessionBusy(sessionID)
}
912
// Model implements Coordinator; it returns the model reported by the current
// agent.
func (c *coordinator) Model() Model {
	return c.currentAgent.Model()
}
916
917func (c *coordinator) UpdateModels(ctx context.Context) error {
918 // build the models again so we make sure we get the latest config
919 large, small, err := c.buildAgentModels(ctx, false)
920 if err != nil {
921 return err
922 }
923 c.currentAgent.SetModels(large, small)
924
925 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
926 if !ok {
927 return errCoderAgentNotConfigured
928 }
929
930 tools, err := c.buildTools(ctx, agentCfg, false)
931 if err != nil {
932 return err
933 }
934 c.currentAgent.SetTools(tools)
935 return nil
936}
937
// QueuedPrompts implements Coordinator; it returns the count the current
// agent reports for sessionID.
func (c *coordinator) QueuedPrompts(sessionID string) int {
	return c.currentAgent.QueuedPrompts(sessionID)
}
941
// QueuedPromptsList implements Coordinator; it returns the list the current
// agent reports for sessionID.
func (c *coordinator) QueuedPromptsList(sessionID string) []string {
	return c.currentAgent.QueuedPromptsList(sessionID)
}
945
946func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
947 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
948 if !ok {
949 return errModelProviderNotConfigured
950 }
951
952 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
953 slog.Error("Failed to refresh OAuth2 token before summarize. Proceeding with existing token.", "error", err)
954 }
955
956 summarize := func() error {
957 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
958 }
959
960 err := summarize()
961 if err != nil && c.isUnauthorized(err) {
962 if retryErr := c.retryAfterUnauthorized(ctx, providerCfg); retryErr == nil {
963 return summarize()
964 }
965 }
966
967 return err
968}
969
970// refreshTokenIfExpired proactively refreshes the OAuth token if it has expired.
971func (c *coordinator) refreshTokenIfExpired(ctx context.Context, providerCfg config.ProviderConfig) error {
972 if providerCfg.OAuthToken == nil || !providerCfg.OAuthToken.IsExpired() {
973 return nil
974 }
975 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
976 return c.refreshOAuth2Token(ctx, providerCfg)
977}
978
979// retryAfterUnauthorized attempts to refresh credentials after receiving a 401
980// and returns nil if retry should be attempted.
981func (c *coordinator) retryAfterUnauthorized(ctx context.Context, providerCfg config.ProviderConfig) error {
982 switch {
983 case providerCfg.OAuthToken != nil:
984 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
985 return c.refreshOAuth2Token(ctx, providerCfg)
986 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
987 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
988 return c.refreshApiKeyTemplate(ctx, providerCfg)
989 default:
990 return nil
991 }
992}
993
994func (c *coordinator) isUnauthorized(err error) bool {
995 var providerErr *fantasy.ProviderError
996 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
997}
998
999func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
1000 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
1001 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
1002 return err
1003 }
1004 if err := c.UpdateModels(ctx); err != nil {
1005 return err
1006 }
1007 return nil
1008}
1009
1010func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
1011 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
1012 if err != nil {
1013 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
1014 return err
1015 }
1016
1017 providerCfg.APIKey = newAPIKey
1018 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
1019
1020 if err := c.UpdateModels(ctx); err != nil {
1021 return err
1022 }
1023 return nil
1024}
1025
1026// subAgentParams holds the parameters for running a sub-agent.
1027type subAgentParams struct {
1028 Agent SessionAgent
1029 SessionID string
1030 AgentMessageID string
1031 ToolCallID string
1032 Prompt string
1033 SessionTitle string
1034 // SessionSetup is an optional callback invoked after session creation
1035 // but before agent execution, for custom session configuration.
1036 SessionSetup func(sessionID string)
1037}
1038
1039// runSubAgent runs a sub-agent and handles session management and cost accumulation.
1040// It creates a sub-session, runs the agent with the given prompt, and propagates
1041// the cost to the parent session.
1042func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
1043 // Create sub-session
1044 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
1045 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
1046 if err != nil {
1047 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
1048 }
1049
1050 // Call session setup function if provided
1051 if params.SessionSetup != nil {
1052 params.SessionSetup(session.ID)
1053 }
1054
1055 // Get model configuration
1056 model := params.Agent.Model()
1057 maxTokens := model.CatwalkCfg.DefaultMaxTokens
1058 if model.ModelCfg.MaxTokens != 0 {
1059 maxTokens = model.ModelCfg.MaxTokens
1060 }
1061
1062 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1063 if !ok {
1064 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1065 }
1066
1067 // Run the agent
1068 result, err := params.Agent.Run(ctx, SessionAgentCall{
1069 SessionID: session.ID,
1070 Prompt: params.Prompt,
1071 MaxOutputTokens: maxTokens,
1072 ProviderOptions: getProviderOptions(model, providerCfg),
1073 Temperature: model.ModelCfg.Temperature,
1074 TopP: model.ModelCfg.TopP,
1075 TopK: model.ModelCfg.TopK,
1076 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1077 PresencePenalty: model.ModelCfg.PresencePenalty,
1078 NonInteractive: true,
1079 })
1080 if err != nil {
1081 return fantasy.NewTextErrorResponse(fmt.Sprintf("Failed to generate response: %s", err)), nil
1082 }
1083
1084 // Update parent session cost
1085 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1086 return fantasy.ToolResponse{}, err
1087 }
1088
1089 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1090}
1091
1092// updateParentSessionCost accumulates the cost from a child session to its parent session.
1093func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1094 childSession, err := c.sessions.Get(ctx, childSessionID)
1095 if err != nil {
1096 return fmt.Errorf("get child session: %w", err)
1097 }
1098
1099 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1100 if err != nil {
1101 return fmt.Errorf("get parent session: %w", err)
1102 }
1103
1104 parentSession.Cost += childSession.Cost
1105
1106 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1107 return fmt.Errorf("save parent session: %w", err)
1108 }
1109
1110 return nil
1111}
1112
1113// discoverSkills runs the skill discovery pipeline and returns both the
1114// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1115// It also emits a single diagnostic log line summarising the outcome to
1116// help track skill-loading health over time.
1117func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1118 builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1119 discovered := append([]*skills.Skill(nil), builtin...)
1120
1121 var userStates []*skills.SkillState
1122 var userPaths []string
1123
1124 opts := cfg.Config().Options
1125 if opts != nil && len(opts.SkillsPaths) > 0 {
1126 userPaths = make([]string, 0, len(opts.SkillsPaths))
1127 for _, pth := range opts.SkillsPaths {
1128 expanded := home.Long(pth)
1129 if strings.HasPrefix(expanded, "$") {
1130 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1131 expanded = resolved
1132 }
1133 }
1134 userPaths = append(userPaths, expanded)
1135 }
1136 var userSkills []*skills.Skill
1137 userSkills, userStates = skills.DiscoverWithStates(userPaths)
1138 discovered = append(discovered, userSkills...)
1139 }
1140
1141 allSkills = skills.Deduplicate(discovered)
1142 var disabledSkills []string
1143 if opts != nil {
1144 disabledSkills = opts.DisabledSkills
1145 }
1146 activeSkills = skills.Filter(allSkills, disabledSkills)
1147
1148 logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1149 return allSkills, activeSkills
1150}
1151
1152// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1153// (if any) were loaded during this turn and which looked relevant based on
1154// a cheap keyword match against the user prompt. The goal is to surface
1155// "should-have-loaded but didn't" situations for later analysis.
1156//
1157// Logged at Info level under component=skills; heavy fields are elided when
1158// there is nothing interesting to report.
1159func logTurnSkillUsage(
1160 sessionID string,
1161 prompt string,
1162 activeSkills []*skills.Skill,
1163 tracker *skills.Tracker,
1164 before []string,
1165) {
1166 if tracker == nil || len(activeSkills) == 0 {
1167 return
1168 }
1169
1170 after := tracker.LoadedNames()
1171
1172 beforeSet := make(map[string]bool, len(before))
1173 for _, n := range before {
1174 beforeSet[n] = true
1175 }
1176 var loadedThisTurn []string
1177 for _, n := range after {
1178 if !beforeSet[n] {
1179 loadedThisTurn = append(loadedThisTurn, n)
1180 }
1181 }
1182
1183 slog.Info("Skill turn summary",
1184 "component", "skills",
1185 "session_id", sessionID,
1186 "prompt_len", len(prompt),
1187 "active_total", len(activeSkills),
1188 "loaded_total", len(after),
1189 "loaded_this_turn", loadedThisTurn,
1190 )
1191}
1192
1193// logDiscoveryStats emits a single structured log line summarising skill
1194// discovery for the current session. It is intentionally low-volume: one
1195// line per session start.
1196func logDiscoveryStats(
1197 builtin []*skills.Skill,
1198 builtinStates, userStates []*skills.SkillState,
1199 userPaths []string,
1200 allSkills, activeSkills []*skills.Skill,
1201 disabled []string,
1202) {
1203 countErrors := func(states []*skills.SkillState) int {
1204 n := 0
1205 for _, s := range states {
1206 if s.State == skills.StateError {
1207 n++
1208 }
1209 }
1210 return n
1211 }
1212
1213 userOK := 0
1214 for _, s := range userStates {
1215 if s.State == skills.StateNormal {
1216 userOK++
1217 }
1218 }
1219
1220 activeNames := make([]string, 0, len(activeSkills))
1221 for _, s := range activeSkills {
1222 activeNames = append(activeNames, s.Name)
1223 }
1224
1225 xml := skills.ToPromptXML(activeSkills)
1226
1227 slog.Info("Skill discovery complete",
1228 "component", "skills",
1229 "builtin_ok", len(builtin),
1230 "builtin_errors", countErrors(builtinStates),
1231 "user_ok", userOK,
1232 "user_errors", countErrors(userStates),
1233 "user_paths", len(userPaths),
1234 "deduped_total", len(allSkills),
1235 "active", len(activeSkills),
1236 "disabled", len(disabled),
1237 "prompt_bytes", len(xml),
1238 "prompt_tok_est", skills.ApproxTokenCount(xml),
1239 "active_names", activeNames,
1240 )
1241}