coordinator.go

   1package agent
   2
   3import (
   4	"bytes"
   5	"cmp"
   6	"context"
   7	"encoding/json"
   8	"errors"
   9	"fmt"
  10	"io"
  11	"log/slog"
  12	"maps"
  13	"net/http"
  14	"os"
  15	"path/filepath"
  16	"slices"
  17	"strings"
  18
  19	"charm.land/catwalk/pkg/catwalk"
  20	"charm.land/fantasy"
  21	"github.com/charmbracelet/crush/internal/agent/hyper"
  22	"github.com/charmbracelet/crush/internal/agent/notify"
  23	"github.com/charmbracelet/crush/internal/agent/prompt"
  24	"github.com/charmbracelet/crush/internal/agent/tools"
  25	"github.com/charmbracelet/crush/internal/config"
  26	"github.com/charmbracelet/crush/internal/event"
  27	"github.com/charmbracelet/crush/internal/filetracker"
  28	"github.com/charmbracelet/crush/internal/history"
  29	"github.com/charmbracelet/crush/internal/home"
  30	"github.com/charmbracelet/crush/internal/log"
  31	"github.com/charmbracelet/crush/internal/lsp"
  32	"github.com/charmbracelet/crush/internal/message"
  33	"github.com/charmbracelet/crush/internal/oauth/copilot"
  34	"github.com/charmbracelet/crush/internal/permission"
  35	"github.com/charmbracelet/crush/internal/pubsub"
  36	"github.com/charmbracelet/crush/internal/session"
  37	"github.com/charmbracelet/crush/internal/skills"
  38	"golang.org/x/sync/errgroup"
  39
  40	"charm.land/fantasy/providers/anthropic"
  41	"charm.land/fantasy/providers/azure"
  42	"charm.land/fantasy/providers/bedrock"
  43	"charm.land/fantasy/providers/google"
  44	"charm.land/fantasy/providers/openai"
  45	"charm.land/fantasy/providers/openaicompat"
  46	"charm.land/fantasy/providers/openrouter"
  47	"charm.land/fantasy/providers/vercel"
  48	openaisdk "github.com/charmbracelet/openai-go/option"
  49	"github.com/qjebbs/go-jsons"
  50)
  51
  52// Coordinator errors.
  53var (
  54	errCoderAgentNotConfigured         = errors.New("coder agent not configured")
  55	errModelProviderNotConfigured      = errors.New("model provider not configured")
  56	errLargeModelNotSelected           = errors.New("large model not selected")
  57	errSmallModelNotSelected           = errors.New("small model not selected")
  58	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
  59	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
  60	errLargeModelNotFound              = errors.New("large model not found in provider config")
  61	errSmallModelNotFound              = errors.New("small model not found in provider config")
  62)
  63
  64type Coordinator interface {
  65	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
  66	// SetMainAgent(string)
  67	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
  68	Cancel(sessionID string)
  69	CancelAll()
  70	IsSessionBusy(sessionID string) bool
  71	IsBusy() bool
  72	QueuedPrompts(sessionID string) int
  73	QueuedPromptsList(sessionID string) []string
  74	ClearQueue(sessionID string)
  75	Summarize(context.Context, string) error
  76	Model() Model
  77	UpdateModels(ctx context.Context) error
  78}
  79
  80type coordinator struct {
  81	cfg         *config.ConfigStore
  82	sessions    session.Service
  83	messages    message.Service
  84	permissions permission.Service
  85	history     history.Service
  86	filetracker filetracker.Service
  87	lspManager  *lsp.Manager
  88	notify      pubsub.Publisher[notify.Notification]
  89
  90	currentAgent SessionAgent
  91	agents       map[string]SessionAgent
  92
  93	// Skills discovery results (session-start snapshot).
  94	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
  95	activeSkills []*skills.Skill // Post-filter: active skills only.
  96	skillTracker *skills.Tracker
  97
  98	readyWg errgroup.Group
  99}
 100
 101func NewCoordinator(
 102	ctx context.Context,
 103	cfg *config.ConfigStore,
 104	sessions session.Service,
 105	messages message.Service,
 106	permissions permission.Service,
 107	history history.Service,
 108	filetracker filetracker.Service,
 109	lspManager *lsp.Manager,
 110	notify pubsub.Publisher[notify.Notification],
 111) (Coordinator, error) {
 112	// Discover skills once at session start.
 113	allSkills, activeSkills := discoverSkills(cfg)
 114	skillTracker := skills.NewTracker(activeSkills)
 115
 116	c := &coordinator{
 117		cfg:          cfg,
 118		sessions:     sessions,
 119		messages:     messages,
 120		permissions:  permissions,
 121		history:      history,
 122		filetracker:  filetracker,
 123		lspManager:   lspManager,
 124		notify:       notify,
 125		agents:       make(map[string]SessionAgent),
 126		allSkills:    allSkills,
 127		activeSkills: activeSkills,
 128		skillTracker: skillTracker,
 129	}
 130
 131	agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
 132	if !ok {
 133		return nil, errCoderAgentNotConfigured
 134	}
 135
 136	// TODO: make this dynamic when we support multiple agents
 137	prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
 138	if err != nil {
 139		return nil, err
 140	}
 141
 142	agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
 143	if err != nil {
 144		return nil, err
 145	}
 146	c.currentAgent = agent
 147	c.agents[config.AgentCoder] = agent
 148	return c, nil
 149}
 150
 151// Run implements Coordinator.
 152func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 153	if err := c.readyWg.Wait(); err != nil {
 154		return nil, err
 155	}
 156
 157	// refresh models before each run
 158	if err := c.UpdateModels(ctx); err != nil {
 159		return nil, fmt.Errorf("failed to update models: %w", err)
 160	}
 161
 162	model := c.currentAgent.Model()
 163	maxTokens := model.CatwalkCfg.DefaultMaxTokens
 164	if model.ModelCfg.MaxTokens != 0 {
 165		maxTokens = model.ModelCfg.MaxTokens
 166	}
 167
 168	if !model.CatwalkCfg.SupportsImages && attachments != nil {
 169		// filter out image attachments
 170		filteredAttachments := make([]message.Attachment, 0, len(attachments))
 171		for _, att := range attachments {
 172			if att.IsText() {
 173				filteredAttachments = append(filteredAttachments, att)
 174			}
 175		}
 176		attachments = filteredAttachments
 177	}
 178
 179	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
 180	if !ok {
 181		return nil, errModelProviderNotConfigured
 182	}
 183
 184	mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
 185
 186	if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
 187		slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
 188		if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 189			return nil, err
 190		}
 191	}
 192
 193	run := func() (*fantasy.AgentResult, error) {
 194		return c.currentAgent.Run(ctx, SessionAgentCall{
 195			SessionID:        sessionID,
 196			Prompt:           prompt,
 197			Attachments:      attachments,
 198			MaxOutputTokens:  maxTokens,
 199			ProviderOptions:  mergedOptions,
 200			Temperature:      temp,
 201			TopP:             topP,
 202			TopK:             topK,
 203			FrequencyPenalty: freqPenalty,
 204			PresencePenalty:  presPenalty,
 205		})
 206	}
 207	result, originalErr := run()
 208
 209	if c.isUnauthorized(originalErr) {
 210		switch {
 211		case providerCfg.OAuthToken != nil:
 212			slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
 213			if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 214				return nil, originalErr
 215			}
 216			slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
 217			return run()
 218		case strings.Contains(providerCfg.APIKeyTemplate, "$"):
 219			slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
 220			if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
 221				return nil, originalErr
 222			}
 223			slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
 224			return run()
 225		}
 226	}
 227
 228	return result, originalErr
 229}
 230
 231func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
 232	options := fantasy.ProviderOptions{}
 233
 234	cfgOpts := []byte("{}")
 235	providerCfgOpts := []byte("{}")
 236	catwalkOpts := []byte("{}")
 237
 238	if model.ModelCfg.ProviderOptions != nil {
 239		data, err := json.Marshal(model.ModelCfg.ProviderOptions)
 240		if err == nil {
 241			cfgOpts = data
 242		}
 243	}
 244
 245	if providerCfg.ProviderOptions != nil {
 246		data, err := json.Marshal(providerCfg.ProviderOptions)
 247		if err == nil {
 248			providerCfgOpts = data
 249		}
 250	}
 251
 252	if model.CatwalkCfg.Options.ProviderOptions != nil {
 253		data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
 254		if err == nil {
 255			catwalkOpts = data
 256		}
 257	}
 258
 259	readers := []io.Reader{
 260		bytes.NewReader(catwalkOpts),
 261		bytes.NewReader(providerCfgOpts),
 262		bytes.NewReader(cfgOpts),
 263	}
 264
 265	got, err := jsons.Merge(readers)
 266	if err != nil {
 267		slog.Error("Could not merge call config", "err", err)
 268		return options
 269	}
 270
 271	mergedOptions := make(map[string]any)
 272
 273	err = json.Unmarshal([]byte(got), &mergedOptions)
 274	if err != nil {
 275		slog.Error("Could not create config for call", "err", err)
 276		return options
 277	}
 278
 279	providerType := providerCfg.Type
 280	if providerType == "hyper" {
 281		if strings.Contains(model.CatwalkCfg.ID, "claude") {
 282			providerType = anthropic.Name
 283		} else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
 284			providerType = openai.Name
 285		} else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
 286			providerType = google.Name
 287		} else {
 288			providerType = openaicompat.Name
 289		}
 290	}
 291
 292	switch providerType {
 293	case openai.Name, azure.Name:
 294		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 295		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 296			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 297		}
 298		if openai.IsResponsesModel(model.CatwalkCfg.ID) {
 299			if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
 300				mergedOptions["reasoning_summary"] = "auto"
 301				mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
 302			}
 303			parsed, err := openai.ParseResponsesOptions(mergedOptions)
 304			if err == nil {
 305				options[openai.Name] = parsed
 306			}
 307		} else {
 308			parsed, err := openai.ParseOptions(mergedOptions)
 309			if err == nil {
 310				options[openai.Name] = parsed
 311			}
 312		}
 313	case anthropic.Name:
 314		var (
 315			_, hasEffort = mergedOptions["effort"]
 316			_, hasThink  = mergedOptions["thinking"]
 317		)
 318		switch {
 319		case !hasEffort && model.ModelCfg.ReasoningEffort != "":
 320			mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
 321		case !hasThink && model.ModelCfg.Think:
 322			mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
 323		}
 324		parsed, err := anthropic.ParseOptions(mergedOptions)
 325		if err == nil {
 326			options[anthropic.Name] = parsed
 327		}
 328
 329	case openrouter.Name:
 330		_, hasReasoning := mergedOptions["reasoning"]
 331		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 332			mergedOptions["reasoning"] = map[string]any{
 333				"enabled": true,
 334				"effort":  model.ModelCfg.ReasoningEffort,
 335			}
 336		}
 337		parsed, err := openrouter.ParseOptions(mergedOptions)
 338		if err == nil {
 339			options[openrouter.Name] = parsed
 340		}
 341	case vercel.Name:
 342		_, hasReasoning := mergedOptions["reasoning"]
 343		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 344			mergedOptions["reasoning"] = map[string]any{
 345				"enabled": true,
 346				"effort":  model.ModelCfg.ReasoningEffort,
 347			}
 348		}
 349		parsed, err := vercel.ParseOptions(mergedOptions)
 350		if err == nil {
 351			options[vercel.Name] = parsed
 352		}
 353	case google.Name:
 354		_, hasReasoning := mergedOptions["thinking_config"]
 355		if !hasReasoning {
 356			if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
 357				mergedOptions["thinking_config"] = map[string]any{
 358					"thinking_budget":  2000,
 359					"include_thoughts": true,
 360				}
 361			} else {
 362				mergedOptions["thinking_config"] = map[string]any{
 363					"thinking_level":   model.ModelCfg.ReasoningEffort,
 364					"include_thoughts": true,
 365				}
 366			}
 367		}
 368		parsed, err := google.ParseOptions(mergedOptions)
 369		if err == nil {
 370			options[google.Name] = parsed
 371		}
 372	case openaicompat.Name:
 373		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 374		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 375			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 376		}
 377		parsed, err := openaicompat.ParseOptions(mergedOptions)
 378		if err == nil {
 379			options[openaicompat.Name] = parsed
 380		}
 381	}
 382
 383	return options
 384}
 385
 386func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
 387	modelOptions := getProviderOptions(model, cfg)
 388	temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
 389	topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
 390	topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
 391	freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
 392	presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
 393	return modelOptions, temp, topP, topK, freqPenalty, presPenalty
 394}
 395
 396func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
 397	large, small, err := c.buildAgentModels(ctx, isSubAgent)
 398	if err != nil {
 399		return nil, err
 400	}
 401
 402	largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
 403	result := NewSessionAgent(SessionAgentOptions{
 404		LargeModel:           large,
 405		SmallModel:           small,
 406		SystemPromptPrefix:   largeProviderCfg.SystemPromptPrefix,
 407		SystemPrompt:         "",
 408		IsSubAgent:           isSubAgent,
 409		DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
 410		IsYolo:               c.permissions.SkipRequests(),
 411		Sessions:             c.sessions,
 412		Messages:             c.messages,
 413		Tools:                nil,
 414		Notify:               c.notify,
 415	})
 416
 417	c.readyWg.Go(func() error {
 418		systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
 419		if err != nil {
 420			return err
 421		}
 422		result.SetSystemPrompt(systemPrompt)
 423		return nil
 424	})
 425
 426	c.readyWg.Go(func() error {
 427		tools, err := c.buildTools(ctx, agent)
 428		if err != nil {
 429			return err
 430		}
 431		result.SetTools(tools)
 432		return nil
 433	})
 434
 435	return result, nil
 436}
 437
 438func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
 439	var allTools []fantasy.AgentTool
 440	if slices.Contains(agent.AllowedTools, AgentToolName) {
 441		agentTool, err := c.agentTool(ctx)
 442		if err != nil {
 443			return nil, err
 444		}
 445		allTools = append(allTools, agentTool)
 446	}
 447
 448	if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
 449		agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
 450		if err != nil {
 451			return nil, err
 452		}
 453		allTools = append(allTools, agenticFetchTool)
 454	}
 455
 456	// Get the model name for the agent
 457	modelName := ""
 458	if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
 459		if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
 460			modelName = model.Name
 461		}
 462	}
 463
 464	logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
 465
 466	allTools = append(allTools,
 467		tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
 468		tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
 469		tools.NewCrushLogsTool(logFile),
 470		tools.NewJobOutputTool(),
 471		tools.NewJobKillTool(),
 472		tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
 473		tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 474		tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 475		tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
 476		tools.NewGlobTool(c.cfg.WorkingDir()),
 477		tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
 478		tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
 479		tools.NewSourcegraphTool(nil),
 480		tools.NewTodosTool(c.sessions),
 481		tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
 482		tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 483	)
 484
 485	// Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
 486	if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
 487		allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
 488	}
 489
 490	if len(c.cfg.Config().MCP) > 0 {
 491		allTools = append(
 492			allTools,
 493			tools.NewListMCPResourcesTool(c.cfg, c.permissions),
 494			tools.NewReadMCPResourceTool(c.cfg, c.permissions),
 495		)
 496	}
 497
 498	var filteredTools []fantasy.AgentTool
 499	for _, tool := range allTools {
 500		if slices.Contains(agent.AllowedTools, tool.Info().Name) {
 501			filteredTools = append(filteredTools, tool)
 502		}
 503	}
 504
 505	for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
 506		if agent.AllowedMCP == nil {
 507			// No MCP restrictions
 508			filteredTools = append(filteredTools, tool)
 509			continue
 510		}
 511		if len(agent.AllowedMCP) == 0 {
 512			// No MCPs allowed
 513			slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
 514			break
 515		}
 516
 517		for mcp, tools := range agent.AllowedMCP {
 518			if mcp != tool.MCP() {
 519				continue
 520			}
 521			if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
 522				filteredTools = append(filteredTools, tool)
 523				break
 524			}
 525			slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
 526		}
 527	}
 528	slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
 529		return strings.Compare(a.Info().Name, b.Info().Name)
 530	})
 531	return filteredTools, nil
 532}
 533
 534// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
 535func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
 536	largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
 537	if !ok {
 538		return Model{}, Model{}, errLargeModelNotSelected
 539	}
 540	smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
 541	if !ok {
 542		return Model{}, Model{}, errSmallModelNotSelected
 543	}
 544
 545	largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
 546	if !ok {
 547		return Model{}, Model{}, errLargeModelProviderNotConfigured
 548	}
 549
 550	largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
 551	if err != nil {
 552		return Model{}, Model{}, err
 553	}
 554
 555	smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
 556	if !ok {
 557		return Model{}, Model{}, errSmallModelProviderNotConfigured
 558	}
 559
 560	smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
 561	if err != nil {
 562		return Model{}, Model{}, err
 563	}
 564
 565	var largeCatwalkModel *catwalk.Model
 566	var smallCatwalkModel *catwalk.Model
 567
 568	for _, m := range largeProviderCfg.Models {
 569		if m.ID == largeModelCfg.Model {
 570			largeCatwalkModel = &m
 571		}
 572	}
 573	for _, m := range smallProviderCfg.Models {
 574		if m.ID == smallModelCfg.Model {
 575			smallCatwalkModel = &m
 576		}
 577	}
 578
 579	if largeCatwalkModel == nil {
 580		return Model{}, Model{}, errLargeModelNotFound
 581	}
 582
 583	if smallCatwalkModel == nil {
 584		return Model{}, Model{}, errSmallModelNotFound
 585	}
 586
 587	largeModelID := largeModelCfg.Model
 588	smallModelID := smallModelCfg.Model
 589
 590	if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
 591		largeModelID += ":exacto"
 592	}
 593
 594	if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
 595		smallModelID += ":exacto"
 596	}
 597
 598	largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
 599	if err != nil {
 600		return Model{}, Model{}, err
 601	}
 602	smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
 603	if err != nil {
 604		return Model{}, Model{}, err
 605	}
 606
 607	return Model{
 608			Model:      largeModel,
 609			CatwalkCfg: *largeCatwalkModel,
 610			ModelCfg:   largeModelCfg,
 611		}, Model{
 612			Model:      smallModel,
 613			CatwalkCfg: *smallCatwalkModel,
 614			ModelCfg:   smallModelCfg,
 615		}, nil
 616}
 617
 618func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
 619	var opts []anthropic.Option
 620
 621	switch {
 622	case strings.HasPrefix(apiKey, "Bearer "):
 623		// NOTE: Prevent the SDK from picking up the API key from env.
 624		os.Setenv("ANTHROPIC_API_KEY", "")
 625		headers["Authorization"] = apiKey
 626	case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
 627		// NOTE: Prevent the SDK from picking up the API key from env.
 628		os.Setenv("ANTHROPIC_API_KEY", "")
 629		headers["Authorization"] = "Bearer " + apiKey
 630	case apiKey != "":
 631		// X-Api-Key header
 632		opts = append(opts, anthropic.WithAPIKey(apiKey))
 633	}
 634
 635	if len(headers) > 0 {
 636		opts = append(opts, anthropic.WithHeaders(headers))
 637	}
 638
 639	if baseURL != "" {
 640		opts = append(opts, anthropic.WithBaseURL(baseURL))
 641	}
 642
 643	if c.cfg.Config().Options.Debug {
 644		httpClient := log.NewHTTPClient()
 645		opts = append(opts, anthropic.WithHTTPClient(httpClient))
 646	}
 647	return anthropic.New(opts...)
 648}
 649
 650func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 651	opts := []openai.Option{
 652		openai.WithAPIKey(apiKey),
 653		openai.WithUseResponsesAPI(),
 654	}
 655	if c.cfg.Config().Options.Debug {
 656		httpClient := log.NewHTTPClient()
 657		opts = append(opts, openai.WithHTTPClient(httpClient))
 658	}
 659	if len(headers) > 0 {
 660		opts = append(opts, openai.WithHeaders(headers))
 661	}
 662	if baseURL != "" {
 663		opts = append(opts, openai.WithBaseURL(baseURL))
 664	}
 665	return openai.New(opts...)
 666}
 667
 668func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 669	opts := []openrouter.Option{
 670		openrouter.WithAPIKey(apiKey),
 671	}
 672	if c.cfg.Config().Options.Debug {
 673		httpClient := log.NewHTTPClient()
 674		opts = append(opts, openrouter.WithHTTPClient(httpClient))
 675	}
 676	if len(headers) > 0 {
 677		opts = append(opts, openrouter.WithHeaders(headers))
 678	}
 679	return openrouter.New(opts...)
 680}
 681
 682func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 683	opts := []vercel.Option{
 684		vercel.WithAPIKey(apiKey),
 685	}
 686	if c.cfg.Config().Options.Debug {
 687		httpClient := log.NewHTTPClient()
 688		opts = append(opts, vercel.WithHTTPClient(httpClient))
 689	}
 690	if len(headers) > 0 {
 691		opts = append(opts, vercel.WithHeaders(headers))
 692	}
 693	return vercel.New(opts...)
 694}
 695
 696func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
 697	opts := []openaicompat.Option{
 698		openaicompat.WithBaseURL(baseURL),
 699		openaicompat.WithAPIKey(apiKey),
 700	}
 701
 702	// Set HTTP client based on provider and debug mode.
 703	var httpClient *http.Client
 704	if providerID == string(catwalk.InferenceProviderCopilot) {
 705		opts = append(opts, openaicompat.WithUseResponsesAPI())
 706		httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
 707	} else if c.cfg.Config().Options.Debug {
 708		httpClient = log.NewHTTPClient()
 709	}
 710	if httpClient != nil {
 711		opts = append(opts, openaicompat.WithHTTPClient(httpClient))
 712	}
 713
 714	if len(headers) > 0 {
 715		opts = append(opts, openaicompat.WithHeaders(headers))
 716	}
 717
 718	for extraKey, extraValue := range extraBody {
 719		opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
 720	}
 721
 722	return openaicompat.New(opts...)
 723}
 724
 725func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 726	opts := []azure.Option{
 727		azure.WithBaseURL(baseURL),
 728		azure.WithAPIKey(apiKey),
 729		azure.WithUseResponsesAPI(),
 730	}
 731	if c.cfg.Config().Options.Debug {
 732		httpClient := log.NewHTTPClient()
 733		opts = append(opts, azure.WithHTTPClient(httpClient))
 734	}
 735	if options == nil {
 736		options = make(map[string]string)
 737	}
 738	if apiVersion, ok := options["apiVersion"]; ok {
 739		opts = append(opts, azure.WithAPIVersion(apiVersion))
 740	}
 741	if len(headers) > 0 {
 742		opts = append(opts, azure.WithHeaders(headers))
 743	}
 744
 745	return azure.New(opts...)
 746}
 747
 748func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
 749	var opts []bedrock.Option
 750	if c.cfg.Config().Options.Debug {
 751		httpClient := log.NewHTTPClient()
 752		opts = append(opts, bedrock.WithHTTPClient(httpClient))
 753	}
 754	if len(headers) > 0 {
 755		opts = append(opts, bedrock.WithHeaders(headers))
 756	}
 757	switch {
 758	case apiKey != "":
 759		opts = append(opts, bedrock.WithAPIKey(apiKey))
 760	case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
 761		opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
 762	default:
 763		// Skip, let the SDK do authentication.
 764	}
 765	return bedrock.New(opts...)
 766}
 767
 768func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 769	opts := []google.Option{
 770		google.WithBaseURL(baseURL),
 771		google.WithGeminiAPIKey(apiKey),
 772	}
 773	if c.cfg.Config().Options.Debug {
 774		httpClient := log.NewHTTPClient()
 775		opts = append(opts, google.WithHTTPClient(httpClient))
 776	}
 777	if len(headers) > 0 {
 778		opts = append(opts, google.WithHeaders(headers))
 779	}
 780	return google.New(opts...)
 781}
 782
 783func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 784	opts := []google.Option{}
 785	if c.cfg.Config().Options.Debug {
 786		httpClient := log.NewHTTPClient()
 787		opts = append(opts, google.WithHTTPClient(httpClient))
 788	}
 789	if len(headers) > 0 {
 790		opts = append(opts, google.WithHeaders(headers))
 791	}
 792
 793	project := options["project"]
 794	location := options["location"]
 795
 796	opts = append(opts, google.WithVertex(project, location))
 797
 798	return google.New(opts...)
 799}
 800
 801func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
 802	if model.Think {
 803		return true
 804	}
 805	opts, err := anthropic.ParseOptions(model.ProviderOptions)
 806	return err == nil && opts.Thinking != nil
 807}
 808
 809func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
 810	headers := maps.Clone(providerCfg.ExtraHeaders)
 811	if headers == nil {
 812		headers = make(map[string]string)
 813	}
 814
 815	// handle special headers for anthropic
 816	if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
 817		if v, ok := headers["anthropic-beta"]; ok {
 818			headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
 819		} else {
 820			headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
 821		}
 822	}
 823
 824	apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
 825	baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
 826
 827	switch providerCfg.Type {
 828	case openai.Name:
 829		return c.buildOpenaiProvider(baseURL, apiKey, headers)
 830	case anthropic.Name:
 831		return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
 832	case openrouter.Name:
 833		return c.buildOpenrouterProvider(baseURL, apiKey, headers)
 834	case vercel.Name:
 835		return c.buildVercelProvider(baseURL, apiKey, headers)
 836	case azure.Name:
 837		return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
 838	case bedrock.Name:
 839		return c.buildBedrockProvider(apiKey, headers)
 840	case google.Name:
 841		return c.buildGoogleProvider(baseURL, apiKey, headers)
 842	case "google-vertex":
 843		return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
 844	case openaicompat.Name, hyper.Name:
 845		switch providerCfg.ID {
 846		case hyper.Name:
 847			baseURL = hyper.BaseURL() + "/v1"
 848			headers["x-crush-id"] = event.GetID()
 849		case string(catwalk.InferenceProviderZAI):
 850			if providerCfg.ExtraBody == nil {
 851				providerCfg.ExtraBody = map[string]any{}
 852			}
 853			providerCfg.ExtraBody["tool_stream"] = true
 854		}
 855		return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
 856	default:
 857		return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
 858	}
 859}
 860
 861func isExactoSupported(modelID string) bool {
 862	supportedModels := []string{
 863		"moonshotai/kimi-k2-0905",
 864		"deepseek/deepseek-v3.1-terminus",
 865		"z-ai/glm-4.6",
 866		"openai/gpt-oss-120b",
 867		"qwen/qwen3-coder",
 868	}
 869	return slices.Contains(supportedModels, modelID)
 870}
 871
 872func (c *coordinator) Cancel(sessionID string) {
 873	c.currentAgent.Cancel(sessionID)
 874}
 875
 876func (c *coordinator) CancelAll() {
 877	c.currentAgent.CancelAll()
 878}
 879
 880func (c *coordinator) ClearQueue(sessionID string) {
 881	c.currentAgent.ClearQueue(sessionID)
 882}
 883
 884func (c *coordinator) IsBusy() bool {
 885	return c.currentAgent.IsBusy()
 886}
 887
 888func (c *coordinator) IsSessionBusy(sessionID string) bool {
 889	return c.currentAgent.IsSessionBusy(sessionID)
 890}
 891
 892func (c *coordinator) Model() Model {
 893	return c.currentAgent.Model()
 894}
 895
 896func (c *coordinator) UpdateModels(ctx context.Context) error {
 897	// build the models again so we make sure we get the latest config
 898	large, small, err := c.buildAgentModels(ctx, false)
 899	if err != nil {
 900		return err
 901	}
 902	c.currentAgent.SetModels(large, small)
 903
 904	agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
 905	if !ok {
 906		return errCoderAgentNotConfigured
 907	}
 908
 909	tools, err := c.buildTools(ctx, agentCfg)
 910	if err != nil {
 911		return err
 912	}
 913	c.currentAgent.SetTools(tools)
 914	return nil
 915}
 916
// QueuedPrompts returns the count reported by the current agent's
// QueuedPrompts for sessionID.
func (c *coordinator) QueuedPrompts(sessionID string) int {
	return c.currentAgent.QueuedPrompts(sessionID)
}
 920
// QueuedPromptsList returns the list reported by the current agent's
// QueuedPromptsList for sessionID.
func (c *coordinator) QueuedPromptsList(sessionID string) []string {
	return c.currentAgent.QueuedPromptsList(sessionID)
}
 924
 925func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
 926	providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
 927	if !ok {
 928		return errModelProviderNotConfigured
 929	}
 930	return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
 931}
 932
 933func (c *coordinator) isUnauthorized(err error) bool {
 934	var providerErr *fantasy.ProviderError
 935	return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
 936}
 937
 938func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
 939	if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
 940		slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
 941		return err
 942	}
 943	if err := c.UpdateModels(ctx); err != nil {
 944		return err
 945	}
 946	return nil
 947}
 948
 949func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
 950	newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
 951	if err != nil {
 952		slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
 953		return err
 954	}
 955
 956	providerCfg.APIKey = newAPIKey
 957	c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
 958
 959	if err := c.UpdateModels(ctx); err != nil {
 960		return err
 961	}
 962	return nil
 963}
 964
// subAgentParams holds the parameters for running a sub-agent.
//
// As consumed by runSubAgent:
//   - Agent is the agent that is run; its Model() supplies the model settings
//     for the call.
//   - SessionID is the parent session: the sub-session is created under it and
//     the sub-session's cost is added to it afterwards.
//   - AgentMessageID and ToolCallID are combined to derive the sub-session ID.
//   - Prompt is the prompt passed to the agent.
//   - SessionTitle is the title given to the created sub-session.
type subAgentParams struct {
	Agent          SessionAgent
	SessionID      string
	AgentMessageID string
	ToolCallID     string
	Prompt         string
	SessionTitle   string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	// It receives the ID of the newly created sub-session and is skipped
	// when nil.
	SessionSetup func(sessionID string)
}
 977
 978// runSubAgent runs a sub-agent and handles session management and cost accumulation.
 979// It creates a sub-session, runs the agent with the given prompt, and propagates
 980// the cost to the parent session.
 981func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
 982	// Create sub-session
 983	agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
 984	session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
 985	if err != nil {
 986		return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
 987	}
 988
 989	// Call session setup function if provided
 990	if params.SessionSetup != nil {
 991		params.SessionSetup(session.ID)
 992	}
 993
 994	// Get model configuration
 995	model := params.Agent.Model()
 996	maxTokens := model.CatwalkCfg.DefaultMaxTokens
 997	if model.ModelCfg.MaxTokens != 0 {
 998		maxTokens = model.ModelCfg.MaxTokens
 999	}
1000
1001	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1002	if !ok {
1003		return fantasy.ToolResponse{}, errModelProviderNotConfigured
1004	}
1005
1006	// Run the agent
1007	result, err := params.Agent.Run(ctx, SessionAgentCall{
1008		SessionID:        session.ID,
1009		Prompt:           params.Prompt,
1010		MaxOutputTokens:  maxTokens,
1011		ProviderOptions:  getProviderOptions(model, providerCfg),
1012		Temperature:      model.ModelCfg.Temperature,
1013		TopP:             model.ModelCfg.TopP,
1014		TopK:             model.ModelCfg.TopK,
1015		FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1016		PresencePenalty:  model.ModelCfg.PresencePenalty,
1017		NonInteractive:   true,
1018	})
1019	if err != nil {
1020		return fantasy.NewTextErrorResponse("error generating response"), nil
1021	}
1022
1023	// Update parent session cost
1024	if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1025		return fantasy.ToolResponse{}, err
1026	}
1027
1028	return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1029}
1030
1031// updateParentSessionCost accumulates the cost from a child session to its parent session.
1032func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1033	childSession, err := c.sessions.Get(ctx, childSessionID)
1034	if err != nil {
1035		return fmt.Errorf("get child session: %w", err)
1036	}
1037
1038	parentSession, err := c.sessions.Get(ctx, parentSessionID)
1039	if err != nil {
1040		return fmt.Errorf("get parent session: %w", err)
1041	}
1042
1043	parentSession.Cost += childSession.Cost
1044
1045	if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1046		return fmt.Errorf("save parent session: %w", err)
1047	}
1048
1049	return nil
1050}
1051
1052// discoverSkills runs the skill discovery pipeline and returns both the
1053// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1054func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1055	discovered := skills.DiscoverBuiltin()
1056
1057	opts := cfg.Config().Options
1058	if opts != nil && len(opts.SkillsPaths) > 0 {
1059		expandedPaths := make([]string, 0, len(opts.SkillsPaths))
1060		for _, pth := range opts.SkillsPaths {
1061			expanded := home.Long(pth)
1062			if strings.HasPrefix(expanded, "$") {
1063				if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1064					expanded = resolved
1065				}
1066			}
1067			expandedPaths = append(expandedPaths, expanded)
1068		}
1069		discovered = append(discovered, skills.Discover(expandedPaths)...)
1070	}
1071
1072	allSkills = skills.Deduplicate(discovered)
1073	var disabledSkills []string
1074	if opts != nil {
1075		disabledSkills = opts.DisabledSkills
1076	}
1077	activeSkills = skills.Filter(allSkills, disabledSkills)
1078	return allSkills, activeSkills
1079}