1package agent
2
3import (
4 "bytes"
5 "cmp"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "maps"
13 "net/http"
14 "os"
15 "path/filepath"
16 "slices"
17 "strings"
18
19 "charm.land/catwalk/pkg/catwalk"
20 "charm.land/fantasy"
21 "github.com/charmbracelet/crush/internal/agent/hyper"
22 "github.com/charmbracelet/crush/internal/agent/notify"
23 "github.com/charmbracelet/crush/internal/agent/prompt"
24 "github.com/charmbracelet/crush/internal/agent/tools"
25 "github.com/charmbracelet/crush/internal/config"
26 "github.com/charmbracelet/crush/internal/event"
27 "github.com/charmbracelet/crush/internal/filetracker"
28 "github.com/charmbracelet/crush/internal/history"
29 "github.com/charmbracelet/crush/internal/home"
30 "github.com/charmbracelet/crush/internal/hooks"
31 "github.com/charmbracelet/crush/internal/log"
32 "github.com/charmbracelet/crush/internal/lsp"
33 "github.com/charmbracelet/crush/internal/message"
34 "github.com/charmbracelet/crush/internal/oauth/copilot"
35 "github.com/charmbracelet/crush/internal/permission"
36 "github.com/charmbracelet/crush/internal/pubsub"
37 "github.com/charmbracelet/crush/internal/session"
38 "github.com/charmbracelet/crush/internal/skills"
39 "golang.org/x/sync/errgroup"
40
41 "charm.land/fantasy/providers/anthropic"
42 "charm.land/fantasy/providers/azure"
43 "charm.land/fantasy/providers/bedrock"
44 "charm.land/fantasy/providers/google"
45 "charm.land/fantasy/providers/openai"
46 "charm.land/fantasy/providers/openaicompat"
47 "charm.land/fantasy/providers/openrouter"
48 "charm.land/fantasy/providers/vercel"
49 openaisdk "github.com/charmbracelet/openai-go/option"
50 "github.com/qjebbs/go-jsons"
51)
52
// Coordinator errors.
//
// Sentinel errors returned while resolving the coder agent, its provider, and
// the selected large/small models from the config.
var (
	// Agent and provider lookup.
	errCoderAgentNotConfigured    = errors.New("coder agent not configured")
	errModelProviderNotConfigured = errors.New("model provider not configured")

	// Large model resolution.
	errLargeModelNotSelected           = errors.New("large model not selected")
	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
	errLargeModelNotFound              = errors.New("large model not found in provider config")

	// Small model resolution.
	errSmallModelNotSelected           = errors.New("small model not selected")
	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
	errSmallModelNotFound              = errors.New("small model not found in provider config")
)
64
65type Coordinator interface {
66 // INFO: (kujtim) this is not used yet we will use this when we have multiple agents
67 // SetMainAgent(string)
68 Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
69 Cancel(sessionID string)
70 CancelAll()
71 IsSessionBusy(sessionID string) bool
72 IsBusy() bool
73 QueuedPrompts(sessionID string) int
74 QueuedPromptsList(sessionID string) []string
75 ClearQueue(sessionID string)
76 Summarize(context.Context, string) error
77 Model() Model
78 UpdateModels(ctx context.Context) error
79}
80
// coordinator is the Coordinator implementation. It holds the services used
// to build agents, the agents built so far (currently only the coder agent,
// which is also currentAgent), and the skill state shared with their tools.
type coordinator struct {
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent receives every delegated call (Run, Cancel, ...).
	currentAgent SessionAgent
	// agents indexes built agents by config agent name.
	agents map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg tracks the background work started by buildAgent (system prompt
	// and tool construction); Run waits on it before running a prompt.
	readyWg errgroup.Group
}
101
102func NewCoordinator(
103 ctx context.Context,
104 cfg *config.ConfigStore,
105 sessions session.Service,
106 messages message.Service,
107 permissions permission.Service,
108 history history.Service,
109 filetracker filetracker.Service,
110 lspManager *lsp.Manager,
111 notify pubsub.Publisher[notify.Notification],
112) (Coordinator, error) {
113 // Discover skills once at session start.
114 allSkills, activeSkills := discoverSkills(cfg)
115 skillTracker := skills.NewTracker(activeSkills)
116
117 c := &coordinator{
118 cfg: cfg,
119 sessions: sessions,
120 messages: messages,
121 permissions: permissions,
122 history: history,
123 filetracker: filetracker,
124 lspManager: lspManager,
125 notify: notify,
126 agents: make(map[string]SessionAgent),
127 allSkills: allSkills,
128 activeSkills: activeSkills,
129 skillTracker: skillTracker,
130 }
131
132 agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
133 if !ok {
134 return nil, errCoderAgentNotConfigured
135 }
136
137 // TODO: make this dynamic when we support multiple agents
138 prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
139 if err != nil {
140 return nil, err
141 }
142
143 agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
144 if err != nil {
145 return nil, err
146 }
147 c.currentAgent = agent
148 c.agents[config.AgentCoder] = agent
149 return c, nil
150}
151
152// Run implements Coordinator.
153func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
154 if err := c.readyWg.Wait(); err != nil {
155 return nil, err
156 }
157
158 // refresh models before each run
159 if err := c.UpdateModels(ctx); err != nil {
160 return nil, fmt.Errorf("failed to update models: %w", err)
161 }
162
163 model := c.currentAgent.Model()
164 maxTokens := model.CatwalkCfg.DefaultMaxTokens
165 if model.ModelCfg.MaxTokens != 0 {
166 maxTokens = model.ModelCfg.MaxTokens
167 }
168
169 if !model.CatwalkCfg.SupportsImages && attachments != nil {
170 // filter out image attachments
171 filteredAttachments := make([]message.Attachment, 0, len(attachments))
172 for _, att := range attachments {
173 if att.IsText() {
174 filteredAttachments = append(filteredAttachments, att)
175 }
176 }
177 attachments = filteredAttachments
178 }
179
180 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
181 if !ok {
182 return nil, errModelProviderNotConfigured
183 }
184
185 mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
186
187 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
188 // NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
189 // depends on the flow below. If refresh fails, proceed with the token we have.
190 slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
191 }
192
193 run := func() (*fantasy.AgentResult, error) {
194 return c.currentAgent.Run(ctx, SessionAgentCall{
195 SessionID: sessionID,
196 Prompt: prompt,
197 Attachments: attachments,
198 MaxOutputTokens: maxTokens,
199 ProviderOptions: mergedOptions,
200 Temperature: temp,
201 TopP: topP,
202 TopK: topK,
203 FrequencyPenalty: freqPenalty,
204 PresencePenalty: presPenalty,
205 })
206 }
207 beforeLoaded := c.skillTracker.LoadedNames()
208 result, originalErr := run()
209 logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
210
211 if c.isUnauthorized(originalErr) {
212 if err := c.retryAfterUnauthorized(ctx, providerCfg); err == nil {
213 return run()
214 }
215 }
216
217 return result, originalErr
218}
219
220func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
221 options := fantasy.ProviderOptions{}
222
223 cfgOpts := []byte("{}")
224 providerCfgOpts := []byte("{}")
225 catwalkOpts := []byte("{}")
226
227 if model.ModelCfg.ProviderOptions != nil {
228 data, err := json.Marshal(model.ModelCfg.ProviderOptions)
229 if err == nil {
230 cfgOpts = data
231 }
232 }
233
234 if providerCfg.ProviderOptions != nil {
235 data, err := json.Marshal(providerCfg.ProviderOptions)
236 if err == nil {
237 providerCfgOpts = data
238 }
239 }
240
241 if model.CatwalkCfg.Options.ProviderOptions != nil {
242 data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
243 if err == nil {
244 catwalkOpts = data
245 }
246 }
247
248 readers := []io.Reader{
249 bytes.NewReader(catwalkOpts),
250 bytes.NewReader(providerCfgOpts),
251 bytes.NewReader(cfgOpts),
252 }
253
254 got, err := jsons.Merge(readers)
255 if err != nil {
256 slog.Error("Could not merge call config", "err", err)
257 return options
258 }
259
260 mergedOptions := make(map[string]any)
261
262 err = json.Unmarshal([]byte(got), &mergedOptions)
263 if err != nil {
264 slog.Error("Could not create config for call", "err", err)
265 return options
266 }
267
268 switch providerCfg.Type {
269 case openai.Name, azure.Name:
270 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
271 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
272 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
273 }
274 if openai.IsResponsesModel(model.CatwalkCfg.ID) {
275 if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
276 mergedOptions["reasoning_summary"] = "auto"
277 mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
278 }
279 parsed, err := openai.ParseResponsesOptions(mergedOptions)
280 if err == nil {
281 options[openai.Name] = parsed
282 }
283 } else {
284 parsed, err := openai.ParseOptions(mergedOptions)
285 if err == nil {
286 options[openai.Name] = parsed
287 }
288 }
289 case anthropic.Name:
290 var (
291 _, hasEffort = mergedOptions["effort"]
292 _, hasThink = mergedOptions["thinking"]
293 )
294 switch {
295 case !hasEffort && model.ModelCfg.ReasoningEffort != "":
296 mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
297 case !hasThink && model.ModelCfg.Think:
298 mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
299 }
300 parsed, err := anthropic.ParseOptions(mergedOptions)
301 if err == nil {
302 options[anthropic.Name] = parsed
303 }
304
305 case openrouter.Name:
306 _, hasReasoning := mergedOptions["reasoning"]
307 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
308 mergedOptions["reasoning"] = map[string]any{
309 "enabled": true,
310 "effort": model.ModelCfg.ReasoningEffort,
311 }
312 }
313 parsed, err := openrouter.ParseOptions(mergedOptions)
314 if err == nil {
315 options[openrouter.Name] = parsed
316 }
317 case vercel.Name:
318 _, hasReasoning := mergedOptions["reasoning"]
319 if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
320 mergedOptions["reasoning"] = map[string]any{
321 "enabled": true,
322 "effort": model.ModelCfg.ReasoningEffort,
323 }
324 }
325 parsed, err := vercel.ParseOptions(mergedOptions)
326 if err == nil {
327 options[vercel.Name] = parsed
328 }
329 case google.Name:
330 _, hasReasoning := mergedOptions["thinking_config"]
331 if !hasReasoning {
332 if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
333 mergedOptions["thinking_config"] = map[string]any{
334 "thinking_budget": 2000,
335 "include_thoughts": true,
336 }
337 } else {
338 mergedOptions["thinking_config"] = map[string]any{
339 "thinking_level": model.ModelCfg.ReasoningEffort,
340 "include_thoughts": true,
341 }
342 }
343 }
344 parsed, err := google.ParseOptions(mergedOptions)
345 if err == nil {
346 options[google.Name] = parsed
347 }
348 case openaicompat.Name, hyper.Name:
349 _, hasReasoningEffort := mergedOptions["reasoning_effort"]
350 if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
351 mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
352 }
353
354 extraBody := make(map[string]any)
355
356 // "reasoning effort" is a standard OpenAI field, but "thinking" is not.
357 // Setting it in the right way for each provider.
358 // TODO: Abstract this in Fantasy somehow?
359 // TODO: Allow custom providers to specify how to set this?
360 switch providerCfg.ID {
361 case hyper.Name:
362 extraBody["thinking"] = model.ModelCfg.Think
363 case string(catwalk.InferenceProviderIoNet):
364 extraBody["chat_template_kwargs"] = map[string]any{
365 "thinking": model.ModelCfg.Think,
366 }
367 case string(catwalk.InferenceProviderZAI):
368 if model.ModelCfg.Think {
369 extraBody["thinking"] = map[string]any{
370 "type": "enabled",
371 }
372 } else {
373 extraBody["thinking"] = map[string]any{
374 "type": "disabled",
375 }
376 }
377 }
378
379 mergedOptions["extra_body"] = extraBody
380
381 parsed, err := openaicompat.ParseOptions(mergedOptions)
382 if err == nil {
383 options[openaicompat.Name] = parsed
384 }
385 }
386
387 return options
388}
389
390func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
391 modelOptions := getProviderOptions(model, cfg)
392 temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
393 topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
394 topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
395 freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
396 presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
397 return modelOptions, temp, topP, topK, freqPenalty, presPenalty
398}
399
400func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
401 large, small, err := c.buildAgentModels(ctx, isSubAgent)
402 if err != nil {
403 return nil, err
404 }
405
406 largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
407 result := NewSessionAgent(SessionAgentOptions{
408 LargeModel: large,
409 SmallModel: small,
410 SystemPromptPrefix: largeProviderCfg.SystemPromptPrefix,
411 SystemPrompt: "",
412 IsSubAgent: isSubAgent,
413 DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
414 IsYolo: c.permissions.SkipRequests(),
415 Sessions: c.sessions,
416 Messages: c.messages,
417 Tools: nil,
418 Notify: c.notify,
419 })
420
421 c.readyWg.Go(func() error {
422 systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
423 if err != nil {
424 return err
425 }
426 result.SetSystemPrompt(systemPrompt)
427 return nil
428 })
429
430 c.readyWg.Go(func() error {
431 tools, err := c.buildTools(ctx, agent, isSubAgent)
432 if err != nil {
433 return err
434 }
435 result.SetTools(tools)
436 return nil
437 })
438
439 return result, nil
440}
441
// buildTools assembles the tool set for agent.
//
// Built-in tools are created first (plus LSP tools when LSPs are configured or
// auto-LSP is enabled, and MCP-resource tools when MCP servers are
// configured). They are then filtered down to agent.AllowedTools.
//
// MCP tools from tools.GetMCPTools are filtered separately using
// agent.AllowedMCP.
//
// The final list is sorted by tool name and passed to wrapToolsWithHooks with
// hookRunner, which is nil when no PreToolUse hooks are configured, and with
// isSubAgent. It returns an error only when constructing the agent or
// agentic-fetch tool fails.
func (c *coordinator) buildTools(ctx context.Context, agent config.Agent, isSubAgent bool) ([]fantasy.AgentTool, error) {
	var allTools []fantasy.AgentTool
	// The sub-agent tools are built only when allowed; unlike the built-in
	// tool constructors below, their constructors can return an error.
	if slices.Contains(agent.AllowedTools, AgentToolName) {
		agentTool, err := c.agentTool(ctx)
		if err != nil {
			return nil, err
		}
		allTools = append(allTools, agentTool)
	}

	if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
		agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
		if err != nil {
			return nil, err
		}
		allTools = append(allTools, agenticFetchTool)
	}

	// Get the model name for the agent. It is passed to the bash tool and
	// stays "" when the model cannot be resolved from the config.
	modelName := ""
	if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
		if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
			modelName = model.Name
		}
	}

	logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")

	// Build hook runner if PreToolUse hooks are configured.
	var hookRunner *hooks.Runner
	if preToolHooks := c.cfg.Config().Hooks[hooks.EventPreToolUse]; len(preToolHooks) > 0 {
		hookRunner = hooks.NewRunner(preToolHooks, c.cfg.WorkingDir(), c.cfg.WorkingDir())
	}

	// Built-in tools; filtered against AllowedTools further down.
	allTools = append(allTools,
		tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
		tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
		tools.NewCrushLogsTool(logFile),
		tools.NewJobOutputTool(),
		tools.NewJobKillTool(),
		tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
		tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
		tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
		tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
		tools.NewGlobTool(c.cfg.WorkingDir()),
		tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
		tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
		tools.NewSourcegraphTool(nil),
		tools.NewTodosTool(c.sessions),
		tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
		tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
	)

	// Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
	if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
		allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
	}

	if len(c.cfg.Config().MCP) > 0 {
		allTools = append(
			allTools,
			tools.NewListMCPResourcesTool(c.cfg, c.permissions),
			tools.NewReadMCPResourceTool(c.cfg, c.permissions),
		)
	}

	// Keep only the built-in tools named in AllowedTools.
	var filteredTools []fantasy.AgentTool
	for _, tool := range allTools {
		if slices.Contains(agent.AllowedTools, tool.Info().Name) {
			filteredTools = append(filteredTools, tool)
		}
	}

	// MCP tools are filtered by AllowedMCP rather than AllowedTools:
	//   - nil map: every MCP tool is allowed;
	//   - empty map: no MCP tool is allowed (the loop stops at the first tool);
	//   - otherwise a tool is kept when its MCP server has an entry whose tool
	//     list is empty or contains the tool's MCP tool name.
	// Note: the inner `tools` variable shadows the tools package in that loop.
	for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
		if agent.AllowedMCP == nil {
			// No MCP restrictions
			filteredTools = append(filteredTools, tool)
			continue
		}
		if len(agent.AllowedMCP) == 0 {
			// No MCPs allowed
			slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
			break
		}

		for mcp, tools := range agent.AllowedMCP {
			if mcp != tool.MCP() {
				continue
			}
			if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
				filteredTools = append(filteredTools, tool)
				break
			}
			slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
		}
	}
	// Sort the final tool list by tool name.
	slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
		return strings.Compare(a.Info().Name, b.Info().Name)
	})

	// Wrap tools with hook interception for the top-level agent only.
	// Sub-agents (the `agent` task tool, `agentic_fetch`, etc.) run
	// without hook interception to avoid firing the user's hook N times
	// per delegated turn. The top-level invocation of the sub-agent tool
	// itself is still wrapped from the coder's side.
	filteredTools = wrapToolsWithHooks(filteredTools, hookRunner, isSubAgent)

	return filteredTools, nil
}
551
552// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
553func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
554 largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
555 if !ok {
556 return Model{}, Model{}, errLargeModelNotSelected
557 }
558 smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
559 if !ok {
560 return Model{}, Model{}, errSmallModelNotSelected
561 }
562
563 largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
564 if !ok {
565 return Model{}, Model{}, errLargeModelProviderNotConfigured
566 }
567
568 largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
569 if err != nil {
570 return Model{}, Model{}, err
571 }
572
573 smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
574 if !ok {
575 return Model{}, Model{}, errSmallModelProviderNotConfigured
576 }
577
578 smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
579 if err != nil {
580 return Model{}, Model{}, err
581 }
582
583 var largeCatwalkModel *catwalk.Model
584 var smallCatwalkModel *catwalk.Model
585
586 for _, m := range largeProviderCfg.Models {
587 if m.ID == largeModelCfg.Model {
588 largeCatwalkModel = &m
589 }
590 }
591 for _, m := range smallProviderCfg.Models {
592 if m.ID == smallModelCfg.Model {
593 smallCatwalkModel = &m
594 }
595 }
596
597 if largeCatwalkModel == nil {
598 return Model{}, Model{}, errLargeModelNotFound
599 }
600
601 if smallCatwalkModel == nil {
602 return Model{}, Model{}, errSmallModelNotFound
603 }
604
605 largeModelID := largeModelCfg.Model
606 smallModelID := smallModelCfg.Model
607
608 if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
609 largeModelID += ":exacto"
610 }
611
612 if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
613 smallModelID += ":exacto"
614 }
615
616 largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
617 if err != nil {
618 return Model{}, Model{}, err
619 }
620 smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
621 if err != nil {
622 return Model{}, Model{}, err
623 }
624
625 return Model{
626 Model: largeModel,
627 CatwalkCfg: *largeCatwalkModel,
628 ModelCfg: largeModelCfg,
629 }, Model{
630 Model: smallModel,
631 CatwalkCfg: *smallCatwalkModel,
632 ModelCfg: smallModelCfg,
633 }, nil
634}
635
636func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
637 var opts []anthropic.Option
638
639 switch {
640 case strings.HasPrefix(apiKey, "Bearer "):
641 // NOTE: Prevent the SDK from picking up the API key from env.
642 os.Setenv("ANTHROPIC_API_KEY", "")
643 headers["Authorization"] = apiKey
644 case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
645 // NOTE: Prevent the SDK from picking up the API key from env.
646 os.Setenv("ANTHROPIC_API_KEY", "")
647 headers["Authorization"] = "Bearer " + apiKey
648 case apiKey != "":
649 // X-Api-Key header
650 opts = append(opts, anthropic.WithAPIKey(apiKey))
651 }
652
653 if len(headers) > 0 {
654 opts = append(opts, anthropic.WithHeaders(headers))
655 }
656
657 if baseURL != "" {
658 opts = append(opts, anthropic.WithBaseURL(baseURL))
659 }
660
661 if c.cfg.Config().Options.Debug {
662 httpClient := log.NewHTTPClient()
663 opts = append(opts, anthropic.WithHTTPClient(httpClient))
664 }
665 return anthropic.New(opts...)
666}
667
668func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
669 opts := []openai.Option{
670 openai.WithAPIKey(apiKey),
671 openai.WithUseResponsesAPI(),
672 }
673 if c.cfg.Config().Options.Debug {
674 httpClient := log.NewHTTPClient()
675 opts = append(opts, openai.WithHTTPClient(httpClient))
676 }
677 if len(headers) > 0 {
678 opts = append(opts, openai.WithHeaders(headers))
679 }
680 if baseURL != "" {
681 opts = append(opts, openai.WithBaseURL(baseURL))
682 }
683 return openai.New(opts...)
684}
685
686func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
687 opts := []openrouter.Option{
688 openrouter.WithAPIKey(apiKey),
689 }
690 if c.cfg.Config().Options.Debug {
691 httpClient := log.NewHTTPClient()
692 opts = append(opts, openrouter.WithHTTPClient(httpClient))
693 }
694 if len(headers) > 0 {
695 opts = append(opts, openrouter.WithHeaders(headers))
696 }
697 return openrouter.New(opts...)
698}
699
700func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
701 opts := []vercel.Option{
702 vercel.WithAPIKey(apiKey),
703 }
704 if c.cfg.Config().Options.Debug {
705 httpClient := log.NewHTTPClient()
706 opts = append(opts, vercel.WithHTTPClient(httpClient))
707 }
708 if len(headers) > 0 {
709 opts = append(opts, vercel.WithHeaders(headers))
710 }
711 return vercel.New(opts...)
712}
713
714func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
715 opts := []openaicompat.Option{
716 openaicompat.WithBaseURL(baseURL),
717 openaicompat.WithAPIKey(apiKey),
718 }
719
720 // Set HTTP client based on provider and debug mode.
721 var httpClient *http.Client
722 if providerID == string(catwalk.InferenceProviderCopilot) {
723 opts = append(opts, openaicompat.WithUseResponsesAPI())
724 httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
725 } else if c.cfg.Config().Options.Debug {
726 httpClient = log.NewHTTPClient()
727 }
728 if httpClient != nil {
729 opts = append(opts, openaicompat.WithHTTPClient(httpClient))
730 }
731
732 if len(headers) > 0 {
733 opts = append(opts, openaicompat.WithHeaders(headers))
734 }
735
736 for extraKey, extraValue := range extraBody {
737 opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
738 }
739
740 return openaicompat.New(opts...)
741}
742
743func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
744 opts := []azure.Option{
745 azure.WithBaseURL(baseURL),
746 azure.WithAPIKey(apiKey),
747 azure.WithUseResponsesAPI(),
748 }
749 if c.cfg.Config().Options.Debug {
750 httpClient := log.NewHTTPClient()
751 opts = append(opts, azure.WithHTTPClient(httpClient))
752 }
753 if options == nil {
754 options = make(map[string]string)
755 }
756 if apiVersion, ok := options["apiVersion"]; ok {
757 opts = append(opts, azure.WithAPIVersion(apiVersion))
758 }
759 if len(headers) > 0 {
760 opts = append(opts, azure.WithHeaders(headers))
761 }
762
763 return azure.New(opts...)
764}
765
766func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
767 var opts []bedrock.Option
768 if c.cfg.Config().Options.Debug {
769 httpClient := log.NewHTTPClient()
770 opts = append(opts, bedrock.WithHTTPClient(httpClient))
771 }
772 if len(headers) > 0 {
773 opts = append(opts, bedrock.WithHeaders(headers))
774 }
775 switch {
776 case apiKey != "":
777 opts = append(opts, bedrock.WithAPIKey(apiKey))
778 case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
779 opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
780 default:
781 // Skip, let the SDK do authentication.
782 }
783 return bedrock.New(opts...)
784}
785
786func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
787 opts := []google.Option{
788 google.WithBaseURL(baseURL),
789 google.WithGeminiAPIKey(apiKey),
790 }
791 if c.cfg.Config().Options.Debug {
792 httpClient := log.NewHTTPClient()
793 opts = append(opts, google.WithHTTPClient(httpClient))
794 }
795 if len(headers) > 0 {
796 opts = append(opts, google.WithHeaders(headers))
797 }
798 return google.New(opts...)
799}
800
801func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
802 opts := []google.Option{}
803 if c.cfg.Config().Options.Debug {
804 httpClient := log.NewHTTPClient()
805 opts = append(opts, google.WithHTTPClient(httpClient))
806 }
807 if len(headers) > 0 {
808 opts = append(opts, google.WithHeaders(headers))
809 }
810
811 project := options["project"]
812 location := options["location"]
813
814 opts = append(opts, google.WithVertex(project, location))
815
816 return google.New(opts...)
817}
818
819func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
820 if model.Think {
821 return true
822 }
823 opts, err := anthropic.ParseOptions(model.ProviderOptions)
824 return err == nil && opts.Thinking != nil
825}
826
827func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
828 headers := maps.Clone(providerCfg.ExtraHeaders)
829 if headers == nil {
830 headers = make(map[string]string)
831 }
832
833 // handle special headers for anthropic
834 if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
835 if v, ok := headers["anthropic-beta"]; ok {
836 headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
837 } else {
838 headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
839 }
840 }
841
842 apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
843 baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
844
845 switch providerCfg.Type {
846 case openai.Name:
847 return c.buildOpenaiProvider(baseURL, apiKey, headers)
848 case anthropic.Name:
849 return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
850 case openrouter.Name:
851 return c.buildOpenrouterProvider(baseURL, apiKey, headers)
852 case vercel.Name:
853 return c.buildVercelProvider(baseURL, apiKey, headers)
854 case azure.Name:
855 return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
856 case bedrock.Name:
857 return c.buildBedrockProvider(apiKey, headers)
858 case google.Name:
859 return c.buildGoogleProvider(baseURL, apiKey, headers)
860 case "google-vertex":
861 return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
862 case openaicompat.Name, hyper.Name:
863 switch providerCfg.ID {
864 case hyper.Name:
865 baseURL = hyper.BaseURL() + "/v1"
866 headers["x-crush-id"] = event.GetID()
867 case string(catwalk.InferenceProviderZAI):
868 if providerCfg.ExtraBody == nil {
869 providerCfg.ExtraBody = map[string]any{}
870 }
871 providerCfg.ExtraBody["tool_stream"] = true
872 }
873 return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
874 default:
875 return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
876 }
877}
878
// isExactoSupported reports whether modelID is in the fixed list of models
// that buildAgentModels requests with an ":exacto" suffix on OpenRouter. The
// comparison is exact and case-sensitive.
func isExactoSupported(modelID string) bool {
	return slices.Index([]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}, modelID) != -1
}
889
890func (c *coordinator) Cancel(sessionID string) {
891 c.currentAgent.Cancel(sessionID)
892}
893
894func (c *coordinator) CancelAll() {
895 c.currentAgent.CancelAll()
896}
897
898func (c *coordinator) ClearQueue(sessionID string) {
899 c.currentAgent.ClearQueue(sessionID)
900}
901
902func (c *coordinator) IsBusy() bool {
903 return c.currentAgent.IsBusy()
904}
905
906func (c *coordinator) IsSessionBusy(sessionID string) bool {
907 return c.currentAgent.IsSessionBusy(sessionID)
908}
909
910func (c *coordinator) Model() Model {
911 return c.currentAgent.Model()
912}
913
914func (c *coordinator) UpdateModels(ctx context.Context) error {
915 // build the models again so we make sure we get the latest config
916 large, small, err := c.buildAgentModels(ctx, false)
917 if err != nil {
918 return err
919 }
920 c.currentAgent.SetModels(large, small)
921
922 agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
923 if !ok {
924 return errCoderAgentNotConfigured
925 }
926
927 tools, err := c.buildTools(ctx, agentCfg, false)
928 if err != nil {
929 return err
930 }
931 c.currentAgent.SetTools(tools)
932 return nil
933}
934
935func (c *coordinator) QueuedPrompts(sessionID string) int {
936 return c.currentAgent.QueuedPrompts(sessionID)
937}
938
939func (c *coordinator) QueuedPromptsList(sessionID string) []string {
940 return c.currentAgent.QueuedPromptsList(sessionID)
941}
942
943func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
944 providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
945 if !ok {
946 return errModelProviderNotConfigured
947 }
948
949 if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
950 slog.Error("Failed to refresh OAuth2 token before summarize. Proceeding with existing token.", "error", err)
951 }
952
953 summarize := func() error {
954 return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
955 }
956
957 err := summarize()
958 if err != nil && c.isUnauthorized(err) {
959 if retryErr := c.retryAfterUnauthorized(ctx, providerCfg); retryErr == nil {
960 return summarize()
961 }
962 }
963
964 return err
965}
966
967// refreshTokenIfExpired proactively refreshes the OAuth token if it has expired.
968func (c *coordinator) refreshTokenIfExpired(ctx context.Context, providerCfg config.ProviderConfig) error {
969 if providerCfg.OAuthToken == nil || !providerCfg.OAuthToken.IsExpired() {
970 return nil
971 }
972 slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
973 return c.refreshOAuth2Token(ctx, providerCfg)
974}
975
976// retryAfterUnauthorized attempts to refresh credentials after receiving a 401
977// and returns nil if retry should be attempted.
978func (c *coordinator) retryAfterUnauthorized(ctx context.Context, providerCfg config.ProviderConfig) error {
979 switch {
980 case providerCfg.OAuthToken != nil:
981 slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
982 return c.refreshOAuth2Token(ctx, providerCfg)
983 case strings.Contains(providerCfg.APIKeyTemplate, "$"):
984 slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
985 return c.refreshApiKeyTemplate(ctx, providerCfg)
986 default:
987 return nil
988 }
989}
990
991func (c *coordinator) isUnauthorized(err error) bool {
992 var providerErr *fantasy.ProviderError
993 return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
994}
995
996func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
997 if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
998 slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
999 return err
1000 }
1001 if err := c.UpdateModels(ctx); err != nil {
1002 return err
1003 }
1004 return nil
1005}
1006
1007func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
1008 newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
1009 if err != nil {
1010 slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
1011 return err
1012 }
1013
1014 providerCfg.APIKey = newAPIKey
1015 c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
1016
1017 if err := c.UpdateModels(ctx); err != nil {
1018 return err
1019 }
1020 return nil
1021}
1022
// subAgentParams holds the parameters for running a sub-agent.
// It is consumed by runSubAgent.
type subAgentParams struct {
	// Agent is the sub-agent to run. Its Model() supplies the token limit,
	// sampling parameters and provider used for the run.
	Agent SessionAgent
	// SessionID is the parent session. The sub-session is created under it,
	// and the sub-session's cost is added to it.
	SessionID string
	// AgentMessageID and ToolCallID are combined by
	// sessions.CreateAgentToolSessionID to derive the sub-session's ID.
	AgentMessageID string
	ToolCallID     string
	// Prompt is sent to the sub-agent as the run's prompt.
	Prompt string
	// SessionTitle is passed to CreateTaskSession, presumably as the
	// sub-session's title.
	SessionTitle string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
1035
1036// runSubAgent runs a sub-agent and handles session management and cost accumulation.
1037// It creates a sub-session, runs the agent with the given prompt, and propagates
1038// the cost to the parent session.
1039func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
1040 // Create sub-session
1041 agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
1042 session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
1043 if err != nil {
1044 return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
1045 }
1046
1047 // Call session setup function if provided
1048 if params.SessionSetup != nil {
1049 params.SessionSetup(session.ID)
1050 }
1051
1052 // Get model configuration
1053 model := params.Agent.Model()
1054 maxTokens := model.CatwalkCfg.DefaultMaxTokens
1055 if model.ModelCfg.MaxTokens != 0 {
1056 maxTokens = model.ModelCfg.MaxTokens
1057 }
1058
1059 providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1060 if !ok {
1061 return fantasy.ToolResponse{}, errModelProviderNotConfigured
1062 }
1063
1064 // Run the agent
1065 result, err := params.Agent.Run(ctx, SessionAgentCall{
1066 SessionID: session.ID,
1067 Prompt: params.Prompt,
1068 MaxOutputTokens: maxTokens,
1069 ProviderOptions: getProviderOptions(model, providerCfg),
1070 Temperature: model.ModelCfg.Temperature,
1071 TopP: model.ModelCfg.TopP,
1072 TopK: model.ModelCfg.TopK,
1073 FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1074 PresencePenalty: model.ModelCfg.PresencePenalty,
1075 NonInteractive: true,
1076 })
1077 if err != nil {
1078 return fantasy.NewTextErrorResponse("error generating response"), nil
1079 }
1080
1081 // Update parent session cost
1082 if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1083 return fantasy.ToolResponse{}, err
1084 }
1085
1086 return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1087}
1088
1089// updateParentSessionCost accumulates the cost from a child session to its parent session.
1090func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1091 childSession, err := c.sessions.Get(ctx, childSessionID)
1092 if err != nil {
1093 return fmt.Errorf("get child session: %w", err)
1094 }
1095
1096 parentSession, err := c.sessions.Get(ctx, parentSessionID)
1097 if err != nil {
1098 return fmt.Errorf("get parent session: %w", err)
1099 }
1100
1101 parentSession.Cost += childSession.Cost
1102
1103 if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1104 return fmt.Errorf("save parent session: %w", err)
1105 }
1106
1107 return nil
1108}
1109
1110// discoverSkills runs the skill discovery pipeline and returns both the
1111// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1112// It also emits a single diagnostic log line summarising the outcome to
1113// help track skill-loading health over time.
1114func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1115 builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1116 discovered := append([]*skills.Skill(nil), builtin...)
1117
1118 var userStates []*skills.SkillState
1119 var userPaths []string
1120
1121 opts := cfg.Config().Options
1122 if opts != nil && len(opts.SkillsPaths) > 0 {
1123 userPaths = make([]string, 0, len(opts.SkillsPaths))
1124 for _, pth := range opts.SkillsPaths {
1125 expanded := home.Long(pth)
1126 if strings.HasPrefix(expanded, "$") {
1127 if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1128 expanded = resolved
1129 }
1130 }
1131 userPaths = append(userPaths, expanded)
1132 }
1133 var userSkills []*skills.Skill
1134 userSkills, userStates = skills.DiscoverWithStates(userPaths)
1135 discovered = append(discovered, userSkills...)
1136 }
1137
1138 allSkills = skills.Deduplicate(discovered)
1139 var disabledSkills []string
1140 if opts != nil {
1141 disabledSkills = opts.DisabledSkills
1142 }
1143 activeSkills = skills.Filter(allSkills, disabledSkills)
1144
1145 logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1146 return allSkills, activeSkills
1147}
1148
1149// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1150// (if any) were loaded during this turn and which looked relevant based on
1151// a cheap keyword match against the user prompt. The goal is to surface
1152// "should-have-loaded but didn't" situations for later analysis.
1153//
1154// Logged at Info level under component=skills; heavy fields are elided when
1155// there is nothing interesting to report.
1156func logTurnSkillUsage(
1157 sessionID string,
1158 prompt string,
1159 activeSkills []*skills.Skill,
1160 tracker *skills.Tracker,
1161 before []string,
1162) {
1163 if tracker == nil || len(activeSkills) == 0 {
1164 return
1165 }
1166
1167 after := tracker.LoadedNames()
1168
1169 beforeSet := make(map[string]bool, len(before))
1170 for _, n := range before {
1171 beforeSet[n] = true
1172 }
1173 var loadedThisTurn []string
1174 for _, n := range after {
1175 if !beforeSet[n] {
1176 loadedThisTurn = append(loadedThisTurn, n)
1177 }
1178 }
1179
1180 slog.Info("Skill turn summary",
1181 "component", "skills",
1182 "session_id", sessionID,
1183 "prompt_len", len(prompt),
1184 "active_total", len(activeSkills),
1185 "loaded_total", len(after),
1186 "loaded_this_turn", loadedThisTurn,
1187 )
1188}
1189
1190// logDiscoveryStats emits a single structured log line summarising skill
1191// discovery for the current session. It is intentionally low-volume: one
1192// line per session start.
1193func logDiscoveryStats(
1194 builtin []*skills.Skill,
1195 builtinStates, userStates []*skills.SkillState,
1196 userPaths []string,
1197 allSkills, activeSkills []*skills.Skill,
1198 disabled []string,
1199) {
1200 countErrors := func(states []*skills.SkillState) int {
1201 n := 0
1202 for _, s := range states {
1203 if s.State == skills.StateError {
1204 n++
1205 }
1206 }
1207 return n
1208 }
1209
1210 userOK := 0
1211 for _, s := range userStates {
1212 if s.State == skills.StateNormal {
1213 userOK++
1214 }
1215 }
1216
1217 activeNames := make([]string, 0, len(activeSkills))
1218 for _, s := range activeSkills {
1219 activeNames = append(activeNames, s.Name)
1220 }
1221
1222 xml := skills.ToPromptXML(activeSkills)
1223
1224 slog.Info("Skill discovery complete",
1225 "component", "skills",
1226 "builtin_ok", len(builtin),
1227 "builtin_errors", countErrors(builtinStates),
1228 "user_ok", userOK,
1229 "user_errors", countErrors(userStates),
1230 "user_paths", len(userPaths),
1231 "deduped_total", len(allSkills),
1232 "active", len(activeSkills),
1233 "disabled", len(disabled),
1234 "prompt_bytes", len(xml),
1235 "prompt_tok_est", skills.ApproxTokenCount(xml),
1236 "active_names", activeNames,
1237 )
1238}