coordinator.go

   1package agent
   2
   3import (
   4	"bytes"
   5	"cmp"
   6	"context"
   7	"encoding/json"
   8	"errors"
   9	"fmt"
  10	"io"
  11	"log/slog"
  12	"maps"
  13	"net/http"
  14	"os"
  15	"path/filepath"
  16	"slices"
  17	"strings"
  18
  19	"charm.land/catwalk/pkg/catwalk"
  20	"charm.land/fantasy"
  21	"github.com/charmbracelet/crush/internal/agent/hyper"
  22	"github.com/charmbracelet/crush/internal/agent/notify"
  23	"github.com/charmbracelet/crush/internal/agent/prompt"
  24	"github.com/charmbracelet/crush/internal/agent/tools"
  25	"github.com/charmbracelet/crush/internal/config"
  26	"github.com/charmbracelet/crush/internal/event"
  27	"github.com/charmbracelet/crush/internal/filetracker"
  28	"github.com/charmbracelet/crush/internal/history"
  29	"github.com/charmbracelet/crush/internal/home"
  30	"github.com/charmbracelet/crush/internal/log"
  31	"github.com/charmbracelet/crush/internal/lsp"
  32	"github.com/charmbracelet/crush/internal/message"
  33	"github.com/charmbracelet/crush/internal/oauth/copilot"
  34	"github.com/charmbracelet/crush/internal/permission"
  35	"github.com/charmbracelet/crush/internal/pubsub"
  36	"github.com/charmbracelet/crush/internal/session"
  37	"github.com/charmbracelet/crush/internal/skills"
  38	"golang.org/x/sync/errgroup"
  39
  40	"charm.land/fantasy/providers/anthropic"
  41	"charm.land/fantasy/providers/azure"
  42	"charm.land/fantasy/providers/bedrock"
  43	"charm.land/fantasy/providers/google"
  44	"charm.land/fantasy/providers/openai"
  45	"charm.land/fantasy/providers/openaicompat"
  46	"charm.land/fantasy/providers/openrouter"
  47	"charm.land/fantasy/providers/vercel"
  48	openaisdk "github.com/charmbracelet/openai-go/option"
  49	"github.com/qjebbs/go-jsons"
  50)
  51
  52// Coordinator errors.
  53var (
  54	errCoderAgentNotConfigured         = errors.New("coder agent not configured")
  55	errModelProviderNotConfigured      = errors.New("model provider not configured")
  56	errLargeModelNotSelected           = errors.New("large model not selected")
  57	errSmallModelNotSelected           = errors.New("small model not selected")
  58	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
  59	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
  60	errLargeModelNotFound              = errors.New("large model not found in provider config")
  61	errSmallModelNotFound              = errors.New("small model not found in provider config")
  62)
  63
  64type Coordinator interface {
  65	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
  66	// SetMainAgent(string)
  67	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
  68	Cancel(sessionID string)
  69	CancelAll()
  70	IsSessionBusy(sessionID string) bool
  71	IsBusy() bool
  72	QueuedPrompts(sessionID string) int
  73	QueuedPromptsList(sessionID string) []string
  74	ClearQueue(sessionID string)
  75	Summarize(context.Context, string) error
  76	Model() Model
  77	UpdateModels(ctx context.Context) error
  78}
  79
// coordinator is the Coordinator implementation returned by NewCoordinator.
// It holds the services needed to build agents and their tools, and forwards
// session operations to currentAgent.
type coordinator struct {
	cfg         *config.ConfigStore
	sessions    session.Service
	messages    message.Service
	permissions permission.Service
	history     history.Service
	filetracker filetracker.Service
	lspManager  *lsp.Manager
	notify      pubsub.Publisher[notify.Notification]

	// currentAgent receives every delegated Coordinator call; agents indexes
	// built agents by name (NewCoordinator stores the coder agent under
	// config.AgentCoder).
	currentAgent SessionAgent
	agents       map[string]SessionAgent

	// Skills discovery results (session-start snapshot).
	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
	activeSkills []*skills.Skill // Post-filter: active skills only.
	skillTracker *skills.Tracker

	// readyWg collects the background setup started by buildAgent (system
	// prompt and tool construction); Run waits on it before doing any work.
	// Despite the "Wg" name it is an errgroup.Group, so Wait also returns the
	// first error produced by that setup.
	readyWg errgroup.Group
}
 100
 101func NewCoordinator(
 102	ctx context.Context,
 103	cfg *config.ConfigStore,
 104	sessions session.Service,
 105	messages message.Service,
 106	permissions permission.Service,
 107	history history.Service,
 108	filetracker filetracker.Service,
 109	lspManager *lsp.Manager,
 110	notify pubsub.Publisher[notify.Notification],
 111) (Coordinator, error) {
 112	// Discover skills once at session start.
 113	allSkills, activeSkills := discoverSkills(cfg)
 114	skillTracker := skills.NewTracker(activeSkills)
 115
 116	c := &coordinator{
 117		cfg:          cfg,
 118		sessions:     sessions,
 119		messages:     messages,
 120		permissions:  permissions,
 121		history:      history,
 122		filetracker:  filetracker,
 123		lspManager:   lspManager,
 124		notify:       notify,
 125		agents:       make(map[string]SessionAgent),
 126		allSkills:    allSkills,
 127		activeSkills: activeSkills,
 128		skillTracker: skillTracker,
 129	}
 130
 131	agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
 132	if !ok {
 133		return nil, errCoderAgentNotConfigured
 134	}
 135
 136	// TODO: make this dynamic when we support multiple agents
 137	prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
 138	if err != nil {
 139		return nil, err
 140	}
 141
 142	agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
 143	if err != nil {
 144		return nil, err
 145	}
 146	c.currentAgent = agent
 147	c.agents[config.AgentCoder] = agent
 148	return c, nil
 149}
 150
 151// Run implements Coordinator.
 152func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 153	if err := c.readyWg.Wait(); err != nil {
 154		return nil, err
 155	}
 156
 157	// refresh models before each run
 158	if err := c.UpdateModels(ctx); err != nil {
 159		return nil, fmt.Errorf("failed to update models: %w", err)
 160	}
 161
 162	model := c.currentAgent.Model()
 163	maxTokens := model.CatwalkCfg.DefaultMaxTokens
 164	if model.ModelCfg.MaxTokens != 0 {
 165		maxTokens = model.ModelCfg.MaxTokens
 166	}
 167
 168	if !model.CatwalkCfg.SupportsImages && attachments != nil {
 169		// filter out image attachments
 170		filteredAttachments := make([]message.Attachment, 0, len(attachments))
 171		for _, att := range attachments {
 172			if att.IsText() {
 173				filteredAttachments = append(filteredAttachments, att)
 174			}
 175		}
 176		attachments = filteredAttachments
 177	}
 178
 179	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
 180	if !ok {
 181		return nil, errModelProviderNotConfigured
 182	}
 183
 184	mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
 185
 186	if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
 187		slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
 188		if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 189			// NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
 190			// depends on the flow below. If refresh fails, proceed with the token we have.
 191			slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
 192		}
 193	}
 194
 195	run := func() (*fantasy.AgentResult, error) {
 196		return c.currentAgent.Run(ctx, SessionAgentCall{
 197			SessionID:        sessionID,
 198			Prompt:           prompt,
 199			Attachments:      attachments,
 200			MaxOutputTokens:  maxTokens,
 201			ProviderOptions:  mergedOptions,
 202			Temperature:      temp,
 203			TopP:             topP,
 204			TopK:             topK,
 205			FrequencyPenalty: freqPenalty,
 206			PresencePenalty:  presPenalty,
 207		})
 208	}
 209	beforeLoaded := c.skillTracker.LoadedNames()
 210	result, originalErr := run()
 211	logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
 212
 213	if c.isUnauthorized(originalErr) {
 214		switch {
 215		case providerCfg.OAuthToken != nil:
 216			slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
 217			if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 218				return nil, originalErr
 219			}
 220			slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
 221			return run()
 222		case strings.Contains(providerCfg.APIKeyTemplate, "$"):
 223			slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
 224			if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
 225				return nil, originalErr
 226			}
 227			slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
 228			return run()
 229		}
 230	}
 231
 232	return result, originalErr
 233}
 234
 235func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
 236	options := fantasy.ProviderOptions{}
 237
 238	cfgOpts := []byte("{}")
 239	providerCfgOpts := []byte("{}")
 240	catwalkOpts := []byte("{}")
 241
 242	if model.ModelCfg.ProviderOptions != nil {
 243		data, err := json.Marshal(model.ModelCfg.ProviderOptions)
 244		if err == nil {
 245			cfgOpts = data
 246		}
 247	}
 248
 249	if providerCfg.ProviderOptions != nil {
 250		data, err := json.Marshal(providerCfg.ProviderOptions)
 251		if err == nil {
 252			providerCfgOpts = data
 253		}
 254	}
 255
 256	if model.CatwalkCfg.Options.ProviderOptions != nil {
 257		data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
 258		if err == nil {
 259			catwalkOpts = data
 260		}
 261	}
 262
 263	readers := []io.Reader{
 264		bytes.NewReader(catwalkOpts),
 265		bytes.NewReader(providerCfgOpts),
 266		bytes.NewReader(cfgOpts),
 267	}
 268
 269	got, err := jsons.Merge(readers)
 270	if err != nil {
 271		slog.Error("Could not merge call config", "err", err)
 272		return options
 273	}
 274
 275	mergedOptions := make(map[string]any)
 276
 277	err = json.Unmarshal([]byte(got), &mergedOptions)
 278	if err != nil {
 279		slog.Error("Could not create config for call", "err", err)
 280		return options
 281	}
 282
 283	providerType := providerCfg.Type
 284	if providerType == "hyper" {
 285		if strings.Contains(model.CatwalkCfg.ID, "claude") {
 286			providerType = anthropic.Name
 287		} else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
 288			providerType = openai.Name
 289		} else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
 290			providerType = google.Name
 291		} else {
 292			providerType = openaicompat.Name
 293		}
 294	}
 295
 296	switch providerType {
 297	case openai.Name, azure.Name:
 298		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 299		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 300			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 301		}
 302		if openai.IsResponsesModel(model.CatwalkCfg.ID) {
 303			if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
 304				mergedOptions["reasoning_summary"] = "auto"
 305				mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
 306			}
 307			parsed, err := openai.ParseResponsesOptions(mergedOptions)
 308			if err == nil {
 309				options[openai.Name] = parsed
 310			}
 311		} else {
 312			parsed, err := openai.ParseOptions(mergedOptions)
 313			if err == nil {
 314				options[openai.Name] = parsed
 315			}
 316		}
 317	case anthropic.Name:
 318		var (
 319			_, hasEffort = mergedOptions["effort"]
 320			_, hasThink  = mergedOptions["thinking"]
 321		)
 322		switch {
 323		case !hasEffort && model.ModelCfg.ReasoningEffort != "":
 324			mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
 325		case !hasThink && model.ModelCfg.Think:
 326			mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
 327		}
 328		parsed, err := anthropic.ParseOptions(mergedOptions)
 329		if err == nil {
 330			options[anthropic.Name] = parsed
 331		}
 332
 333	case openrouter.Name:
 334		_, hasReasoning := mergedOptions["reasoning"]
 335		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 336			mergedOptions["reasoning"] = map[string]any{
 337				"enabled": true,
 338				"effort":  model.ModelCfg.ReasoningEffort,
 339			}
 340		}
 341		parsed, err := openrouter.ParseOptions(mergedOptions)
 342		if err == nil {
 343			options[openrouter.Name] = parsed
 344		}
 345	case vercel.Name:
 346		_, hasReasoning := mergedOptions["reasoning"]
 347		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 348			mergedOptions["reasoning"] = map[string]any{
 349				"enabled": true,
 350				"effort":  model.ModelCfg.ReasoningEffort,
 351			}
 352		}
 353		parsed, err := vercel.ParseOptions(mergedOptions)
 354		if err == nil {
 355			options[vercel.Name] = parsed
 356		}
 357	case google.Name:
 358		_, hasReasoning := mergedOptions["thinking_config"]
 359		if !hasReasoning {
 360			if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
 361				mergedOptions["thinking_config"] = map[string]any{
 362					"thinking_budget":  2000,
 363					"include_thoughts": true,
 364				}
 365			} else {
 366				mergedOptions["thinking_config"] = map[string]any{
 367					"thinking_level":   model.ModelCfg.ReasoningEffort,
 368					"include_thoughts": true,
 369				}
 370			}
 371		}
 372		parsed, err := google.ParseOptions(mergedOptions)
 373		if err == nil {
 374			options[google.Name] = parsed
 375		}
 376	case openaicompat.Name:
 377		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 378		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 379			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 380		}
 381		parsed, err := openaicompat.ParseOptions(mergedOptions)
 382		if err == nil {
 383			options[openaicompat.Name] = parsed
 384		}
 385	}
 386
 387	return options
 388}
 389
 390func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
 391	modelOptions := getProviderOptions(model, cfg)
 392	temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
 393	topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
 394	topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
 395	freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
 396	presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
 397	return modelOptions, temp, topP, topK, freqPenalty, presPenalty
 398}
 399
 400func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
 401	large, small, err := c.buildAgentModels(ctx, isSubAgent)
 402	if err != nil {
 403		return nil, err
 404	}
 405
 406	largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
 407	result := NewSessionAgent(SessionAgentOptions{
 408		LargeModel:           large,
 409		SmallModel:           small,
 410		SystemPromptPrefix:   largeProviderCfg.SystemPromptPrefix,
 411		SystemPrompt:         "",
 412		IsSubAgent:           isSubAgent,
 413		DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
 414		IsYolo:               c.permissions.SkipRequests(),
 415		Sessions:             c.sessions,
 416		Messages:             c.messages,
 417		Tools:                nil,
 418		Notify:               c.notify,
 419	})
 420
 421	c.readyWg.Go(func() error {
 422		systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
 423		if err != nil {
 424			return err
 425		}
 426		result.SetSystemPrompt(systemPrompt)
 427		return nil
 428	})
 429
 430	c.readyWg.Go(func() error {
 431		tools, err := c.buildTools(ctx, agent)
 432		if err != nil {
 433			return err
 434		}
 435		result.SetTools(tools)
 436		return nil
 437	})
 438
 439	return result, nil
 440}
 441
 442func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
 443	var allTools []fantasy.AgentTool
 444	if slices.Contains(agent.AllowedTools, AgentToolName) {
 445		agentTool, err := c.agentTool(ctx)
 446		if err != nil {
 447			return nil, err
 448		}
 449		allTools = append(allTools, agentTool)
 450	}
 451
 452	if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
 453		agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
 454		if err != nil {
 455			return nil, err
 456		}
 457		allTools = append(allTools, agenticFetchTool)
 458	}
 459
 460	// Get the model name for the agent
 461	modelName := ""
 462	if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
 463		if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
 464			modelName = model.Name
 465		}
 466	}
 467
 468	logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
 469
 470	allTools = append(allTools,
 471		tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
 472		tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
 473		tools.NewCrushLogsTool(logFile),
 474		tools.NewJobOutputTool(),
 475		tools.NewJobKillTool(),
 476		tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
 477		tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 478		tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 479		tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
 480		tools.NewGlobTool(c.cfg.WorkingDir()),
 481		tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
 482		tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
 483		tools.NewSourcegraphTool(nil),
 484		tools.NewTodosTool(c.sessions),
 485		tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
 486		tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 487	)
 488
 489	// Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
 490	if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
 491		allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
 492	}
 493
 494	if len(c.cfg.Config().MCP) > 0 {
 495		allTools = append(
 496			allTools,
 497			tools.NewListMCPResourcesTool(c.cfg, c.permissions),
 498			tools.NewReadMCPResourceTool(c.cfg, c.permissions),
 499		)
 500	}
 501
 502	var filteredTools []fantasy.AgentTool
 503	for _, tool := range allTools {
 504		if slices.Contains(agent.AllowedTools, tool.Info().Name) {
 505			filteredTools = append(filteredTools, tool)
 506		}
 507	}
 508
 509	for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
 510		if agent.AllowedMCP == nil {
 511			// No MCP restrictions
 512			filteredTools = append(filteredTools, tool)
 513			continue
 514		}
 515		if len(agent.AllowedMCP) == 0 {
 516			// No MCPs allowed
 517			slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
 518			break
 519		}
 520
 521		for mcp, tools := range agent.AllowedMCP {
 522			if mcp != tool.MCP() {
 523				continue
 524			}
 525			if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
 526				filteredTools = append(filteredTools, tool)
 527				break
 528			}
 529			slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
 530		}
 531	}
 532	slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
 533		return strings.Compare(a.Info().Name, b.Info().Name)
 534	})
 535	return filteredTools, nil
 536}
 537
 538// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
 539func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
 540	largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
 541	if !ok {
 542		return Model{}, Model{}, errLargeModelNotSelected
 543	}
 544	smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
 545	if !ok {
 546		return Model{}, Model{}, errSmallModelNotSelected
 547	}
 548
 549	largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
 550	if !ok {
 551		return Model{}, Model{}, errLargeModelProviderNotConfigured
 552	}
 553
 554	largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
 555	if err != nil {
 556		return Model{}, Model{}, err
 557	}
 558
 559	smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
 560	if !ok {
 561		return Model{}, Model{}, errSmallModelProviderNotConfigured
 562	}
 563
 564	smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
 565	if err != nil {
 566		return Model{}, Model{}, err
 567	}
 568
 569	var largeCatwalkModel *catwalk.Model
 570	var smallCatwalkModel *catwalk.Model
 571
 572	for _, m := range largeProviderCfg.Models {
 573		if m.ID == largeModelCfg.Model {
 574			largeCatwalkModel = &m
 575		}
 576	}
 577	for _, m := range smallProviderCfg.Models {
 578		if m.ID == smallModelCfg.Model {
 579			smallCatwalkModel = &m
 580		}
 581	}
 582
 583	if largeCatwalkModel == nil {
 584		return Model{}, Model{}, errLargeModelNotFound
 585	}
 586
 587	if smallCatwalkModel == nil {
 588		return Model{}, Model{}, errSmallModelNotFound
 589	}
 590
 591	largeModelID := largeModelCfg.Model
 592	smallModelID := smallModelCfg.Model
 593
 594	if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
 595		largeModelID += ":exacto"
 596	}
 597
 598	if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
 599		smallModelID += ":exacto"
 600	}
 601
 602	largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
 603	if err != nil {
 604		return Model{}, Model{}, err
 605	}
 606	smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
 607	if err != nil {
 608		return Model{}, Model{}, err
 609	}
 610
 611	return Model{
 612			Model:      largeModel,
 613			CatwalkCfg: *largeCatwalkModel,
 614			ModelCfg:   largeModelCfg,
 615		}, Model{
 616			Model:      smallModel,
 617			CatwalkCfg: *smallCatwalkModel,
 618			ModelCfg:   smallModelCfg,
 619		}, nil
 620}
 621
 622func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
 623	var opts []anthropic.Option
 624
 625	switch {
 626	case strings.HasPrefix(apiKey, "Bearer "):
 627		// NOTE: Prevent the SDK from picking up the API key from env.
 628		os.Setenv("ANTHROPIC_API_KEY", "")
 629		headers["Authorization"] = apiKey
 630	case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
 631		// NOTE: Prevent the SDK from picking up the API key from env.
 632		os.Setenv("ANTHROPIC_API_KEY", "")
 633		headers["Authorization"] = "Bearer " + apiKey
 634	case apiKey != "":
 635		// X-Api-Key header
 636		opts = append(opts, anthropic.WithAPIKey(apiKey))
 637	}
 638
 639	if len(headers) > 0 {
 640		opts = append(opts, anthropic.WithHeaders(headers))
 641	}
 642
 643	if baseURL != "" {
 644		opts = append(opts, anthropic.WithBaseURL(baseURL))
 645	}
 646
 647	if c.cfg.Config().Options.Debug {
 648		httpClient := log.NewHTTPClient()
 649		opts = append(opts, anthropic.WithHTTPClient(httpClient))
 650	}
 651	return anthropic.New(opts...)
 652}
 653
 654func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 655	opts := []openai.Option{
 656		openai.WithAPIKey(apiKey),
 657		openai.WithUseResponsesAPI(),
 658	}
 659	if c.cfg.Config().Options.Debug {
 660		httpClient := log.NewHTTPClient()
 661		opts = append(opts, openai.WithHTTPClient(httpClient))
 662	}
 663	if len(headers) > 0 {
 664		opts = append(opts, openai.WithHeaders(headers))
 665	}
 666	if baseURL != "" {
 667		opts = append(opts, openai.WithBaseURL(baseURL))
 668	}
 669	return openai.New(opts...)
 670}
 671
 672func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 673	opts := []openrouter.Option{
 674		openrouter.WithAPIKey(apiKey),
 675	}
 676	if c.cfg.Config().Options.Debug {
 677		httpClient := log.NewHTTPClient()
 678		opts = append(opts, openrouter.WithHTTPClient(httpClient))
 679	}
 680	if len(headers) > 0 {
 681		opts = append(opts, openrouter.WithHeaders(headers))
 682	}
 683	return openrouter.New(opts...)
 684}
 685
 686func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 687	opts := []vercel.Option{
 688		vercel.WithAPIKey(apiKey),
 689	}
 690	if c.cfg.Config().Options.Debug {
 691		httpClient := log.NewHTTPClient()
 692		opts = append(opts, vercel.WithHTTPClient(httpClient))
 693	}
 694	if len(headers) > 0 {
 695		opts = append(opts, vercel.WithHeaders(headers))
 696	}
 697	return vercel.New(opts...)
 698}
 699
 700func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
 701	opts := []openaicompat.Option{
 702		openaicompat.WithBaseURL(baseURL),
 703		openaicompat.WithAPIKey(apiKey),
 704	}
 705
 706	// Set HTTP client based on provider and debug mode.
 707	var httpClient *http.Client
 708	if providerID == string(catwalk.InferenceProviderCopilot) {
 709		opts = append(opts, openaicompat.WithUseResponsesAPI())
 710		httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
 711	} else if c.cfg.Config().Options.Debug {
 712		httpClient = log.NewHTTPClient()
 713	}
 714	if httpClient != nil {
 715		opts = append(opts, openaicompat.WithHTTPClient(httpClient))
 716	}
 717
 718	if len(headers) > 0 {
 719		opts = append(opts, openaicompat.WithHeaders(headers))
 720	}
 721
 722	for extraKey, extraValue := range extraBody {
 723		opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
 724	}
 725
 726	return openaicompat.New(opts...)
 727}
 728
 729func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 730	opts := []azure.Option{
 731		azure.WithBaseURL(baseURL),
 732		azure.WithAPIKey(apiKey),
 733		azure.WithUseResponsesAPI(),
 734	}
 735	if c.cfg.Config().Options.Debug {
 736		httpClient := log.NewHTTPClient()
 737		opts = append(opts, azure.WithHTTPClient(httpClient))
 738	}
 739	if options == nil {
 740		options = make(map[string]string)
 741	}
 742	if apiVersion, ok := options["apiVersion"]; ok {
 743		opts = append(opts, azure.WithAPIVersion(apiVersion))
 744	}
 745	if len(headers) > 0 {
 746		opts = append(opts, azure.WithHeaders(headers))
 747	}
 748
 749	return azure.New(opts...)
 750}
 751
 752func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
 753	var opts []bedrock.Option
 754	if c.cfg.Config().Options.Debug {
 755		httpClient := log.NewHTTPClient()
 756		opts = append(opts, bedrock.WithHTTPClient(httpClient))
 757	}
 758	if len(headers) > 0 {
 759		opts = append(opts, bedrock.WithHeaders(headers))
 760	}
 761	switch {
 762	case apiKey != "":
 763		opts = append(opts, bedrock.WithAPIKey(apiKey))
 764	case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
 765		opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
 766	default:
 767		// Skip, let the SDK do authentication.
 768	}
 769	return bedrock.New(opts...)
 770}
 771
 772func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 773	opts := []google.Option{
 774		google.WithBaseURL(baseURL),
 775		google.WithGeminiAPIKey(apiKey),
 776	}
 777	if c.cfg.Config().Options.Debug {
 778		httpClient := log.NewHTTPClient()
 779		opts = append(opts, google.WithHTTPClient(httpClient))
 780	}
 781	if len(headers) > 0 {
 782		opts = append(opts, google.WithHeaders(headers))
 783	}
 784	return google.New(opts...)
 785}
 786
 787func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 788	opts := []google.Option{}
 789	if c.cfg.Config().Options.Debug {
 790		httpClient := log.NewHTTPClient()
 791		opts = append(opts, google.WithHTTPClient(httpClient))
 792	}
 793	if len(headers) > 0 {
 794		opts = append(opts, google.WithHeaders(headers))
 795	}
 796
 797	project := options["project"]
 798	location := options["location"]
 799
 800	opts = append(opts, google.WithVertex(project, location))
 801
 802	return google.New(opts...)
 803}
 804
 805func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
 806	if model.Think {
 807		return true
 808	}
 809	opts, err := anthropic.ParseOptions(model.ProviderOptions)
 810	return err == nil && opts.Thinking != nil
 811}
 812
 813func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
 814	headers := maps.Clone(providerCfg.ExtraHeaders)
 815	if headers == nil {
 816		headers = make(map[string]string)
 817	}
 818
 819	// handle special headers for anthropic
 820	if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
 821		if v, ok := headers["anthropic-beta"]; ok {
 822			headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
 823		} else {
 824			headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
 825		}
 826	}
 827
 828	apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
 829	baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
 830
 831	switch providerCfg.Type {
 832	case openai.Name:
 833		return c.buildOpenaiProvider(baseURL, apiKey, headers)
 834	case anthropic.Name:
 835		return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
 836	case openrouter.Name:
 837		return c.buildOpenrouterProvider(baseURL, apiKey, headers)
 838	case vercel.Name:
 839		return c.buildVercelProvider(baseURL, apiKey, headers)
 840	case azure.Name:
 841		return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
 842	case bedrock.Name:
 843		return c.buildBedrockProvider(apiKey, headers)
 844	case google.Name:
 845		return c.buildGoogleProvider(baseURL, apiKey, headers)
 846	case "google-vertex":
 847		return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
 848	case openaicompat.Name, hyper.Name:
 849		switch providerCfg.ID {
 850		case hyper.Name:
 851			baseURL = hyper.BaseURL() + "/v1"
 852			headers["x-crush-id"] = event.GetID()
 853		case string(catwalk.InferenceProviderZAI):
 854			if providerCfg.ExtraBody == nil {
 855				providerCfg.ExtraBody = map[string]any{}
 856			}
 857			providerCfg.ExtraBody["tool_stream"] = true
 858		}
 859		return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
 860	default:
 861		return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
 862	}
 863}
 864
 865func isExactoSupported(modelID string) bool {
 866	supportedModels := []string{
 867		"moonshotai/kimi-k2-0905",
 868		"deepseek/deepseek-v3.1-terminus",
 869		"z-ai/glm-4.6",
 870		"openai/gpt-oss-120b",
 871		"qwen/qwen3-coder",
 872	}
 873	return slices.Contains(supportedModels, modelID)
 874}
 875
 876func (c *coordinator) Cancel(sessionID string) {
 877	c.currentAgent.Cancel(sessionID)
 878}
 879
 880func (c *coordinator) CancelAll() {
 881	c.currentAgent.CancelAll()
 882}
 883
 884func (c *coordinator) ClearQueue(sessionID string) {
 885	c.currentAgent.ClearQueue(sessionID)
 886}
 887
 888func (c *coordinator) IsBusy() bool {
 889	return c.currentAgent.IsBusy()
 890}
 891
 892func (c *coordinator) IsSessionBusy(sessionID string) bool {
 893	return c.currentAgent.IsSessionBusy(sessionID)
 894}
 895
 896func (c *coordinator) Model() Model {
 897	return c.currentAgent.Model()
 898}
 899
 900func (c *coordinator) UpdateModels(ctx context.Context) error {
 901	// build the models again so we make sure we get the latest config
 902	large, small, err := c.buildAgentModels(ctx, false)
 903	if err != nil {
 904		return err
 905	}
 906	c.currentAgent.SetModels(large, small)
 907
 908	agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
 909	if !ok {
 910		return errCoderAgentNotConfigured
 911	}
 912
 913	tools, err := c.buildTools(ctx, agentCfg)
 914	if err != nil {
 915		return err
 916	}
 917	c.currentAgent.SetTools(tools)
 918	return nil
 919}
 920
 921func (c *coordinator) QueuedPrompts(sessionID string) int {
 922	return c.currentAgent.QueuedPrompts(sessionID)
 923}
 924
 925func (c *coordinator) QueuedPromptsList(sessionID string) []string {
 926	return c.currentAgent.QueuedPromptsList(sessionID)
 927}
 928
 929func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
 930	providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
 931	if !ok {
 932		return errModelProviderNotConfigured
 933	}
 934	return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
 935}
 936
 937func (c *coordinator) isUnauthorized(err error) bool {
 938	var providerErr *fantasy.ProviderError
 939	return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
 940}
 941
 942func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
 943	if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
 944		slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
 945		return err
 946	}
 947	if err := c.UpdateModels(ctx); err != nil {
 948		return err
 949	}
 950	return nil
 951}
 952
 953func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
 954	newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
 955	if err != nil {
 956		slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
 957		return err
 958	}
 959
 960	providerCfg.APIKey = newAPIKey
 961	c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
 962
 963	if err := c.UpdateModels(ctx); err != nil {
 964		return err
 965	}
 966	return nil
 967}
 968
// subAgentParams holds the parameters for runSubAgent, which runs a sub-agent
// inside its own task sub-session of a parent session.
type subAgentParams struct {
	Agent          SessionAgent // sub-agent whose Model and Run are used
	SessionID      string       // parent session: owns the sub-session and receives its cost
	AgentMessageID string       // together with ToolCallID, derives the sub-session ID
	ToolCallID     string       // together with AgentMessageID, derives the sub-session ID
	Prompt         string       // prompt sent to the sub-agent
	SessionTitle   string       // title given to the created sub-session
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
 981
 982// runSubAgent runs a sub-agent and handles session management and cost accumulation.
 983// It creates a sub-session, runs the agent with the given prompt, and propagates
 984// the cost to the parent session.
 985func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
 986	// Create sub-session
 987	agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
 988	session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
 989	if err != nil {
 990		return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
 991	}
 992
 993	// Call session setup function if provided
 994	if params.SessionSetup != nil {
 995		params.SessionSetup(session.ID)
 996	}
 997
 998	// Get model configuration
 999	model := params.Agent.Model()
1000	maxTokens := model.CatwalkCfg.DefaultMaxTokens
1001	if model.ModelCfg.MaxTokens != 0 {
1002		maxTokens = model.ModelCfg.MaxTokens
1003	}
1004
1005	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1006	if !ok {
1007		return fantasy.ToolResponse{}, errModelProviderNotConfigured
1008	}
1009
1010	// Run the agent
1011	result, err := params.Agent.Run(ctx, SessionAgentCall{
1012		SessionID:        session.ID,
1013		Prompt:           params.Prompt,
1014		MaxOutputTokens:  maxTokens,
1015		ProviderOptions:  getProviderOptions(model, providerCfg),
1016		Temperature:      model.ModelCfg.Temperature,
1017		TopP:             model.ModelCfg.TopP,
1018		TopK:             model.ModelCfg.TopK,
1019		FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1020		PresencePenalty:  model.ModelCfg.PresencePenalty,
1021		NonInteractive:   true,
1022	})
1023	if err != nil {
1024		return fantasy.NewTextErrorResponse("error generating response"), nil
1025	}
1026
1027	// Update parent session cost
1028	if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1029		return fantasy.ToolResponse{}, err
1030	}
1031
1032	return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1033}
1034
1035// updateParentSessionCost accumulates the cost from a child session to its parent session.
1036func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1037	childSession, err := c.sessions.Get(ctx, childSessionID)
1038	if err != nil {
1039		return fmt.Errorf("get child session: %w", err)
1040	}
1041
1042	parentSession, err := c.sessions.Get(ctx, parentSessionID)
1043	if err != nil {
1044		return fmt.Errorf("get parent session: %w", err)
1045	}
1046
1047	parentSession.Cost += childSession.Cost
1048
1049	if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1050		return fmt.Errorf("save parent session: %w", err)
1051	}
1052
1053	return nil
1054}
1055
1056// discoverSkills runs the skill discovery pipeline and returns both the
1057// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1058// It also emits a single diagnostic log line summarising the outcome to
1059// help track skill-loading health over time.
1060func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1061	builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1062	discovered := append([]*skills.Skill(nil), builtin...)
1063
1064	var userStates []*skills.SkillState
1065	var userPaths []string
1066
1067	opts := cfg.Config().Options
1068	if opts != nil && len(opts.SkillsPaths) > 0 {
1069		userPaths = make([]string, 0, len(opts.SkillsPaths))
1070		for _, pth := range opts.SkillsPaths {
1071			expanded := home.Long(pth)
1072			if strings.HasPrefix(expanded, "$") {
1073				if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1074					expanded = resolved
1075				}
1076			}
1077			userPaths = append(userPaths, expanded)
1078		}
1079		var userSkills []*skills.Skill
1080		userSkills, userStates = skills.DiscoverWithStates(userPaths)
1081		discovered = append(discovered, userSkills...)
1082	}
1083
1084	allSkills = skills.Deduplicate(discovered)
1085	var disabledSkills []string
1086	if opts != nil {
1087		disabledSkills = opts.DisabledSkills
1088	}
1089	activeSkills = skills.Filter(allSkills, disabledSkills)
1090
1091	logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1092	return allSkills, activeSkills
1093}
1094
1095// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1096// (if any) were loaded during this turn and which looked relevant based on
1097// a cheap keyword match against the user prompt. The goal is to surface
1098// "should-have-loaded but didn't" situations for later analysis.
1099//
1100// Logged at Info level under component=skills; heavy fields are elided when
1101// there is nothing interesting to report.
1102func logTurnSkillUsage(
1103	sessionID string,
1104	prompt string,
1105	activeSkills []*skills.Skill,
1106	tracker *skills.Tracker,
1107	before []string,
1108) {
1109	if tracker == nil || len(activeSkills) == 0 {
1110		return
1111	}
1112
1113	after := tracker.LoadedNames()
1114
1115	beforeSet := make(map[string]bool, len(before))
1116	for _, n := range before {
1117		beforeSet[n] = true
1118	}
1119	var loadedThisTurn []string
1120	for _, n := range after {
1121		if !beforeSet[n] {
1122			loadedThisTurn = append(loadedThisTurn, n)
1123		}
1124	}
1125
1126	slog.Info("Skill turn summary",
1127		"component", "skills",
1128		"session_id", sessionID,
1129		"prompt_len", len(prompt),
1130		"active_total", len(activeSkills),
1131		"loaded_total", len(after),
1132		"loaded_this_turn", loadedThisTurn,
1133	)
1134}
1135
1136// logDiscoveryStats emits a single structured log line summarising skill
1137// discovery for the current session. It is intentionally low-volume: one
1138// line per session start.
1139func logDiscoveryStats(
1140	builtin []*skills.Skill,
1141	builtinStates, userStates []*skills.SkillState,
1142	userPaths []string,
1143	allSkills, activeSkills []*skills.Skill,
1144	disabled []string,
1145) {
1146	countErrors := func(states []*skills.SkillState) int {
1147		n := 0
1148		for _, s := range states {
1149			if s.State == skills.StateError {
1150				n++
1151			}
1152		}
1153		return n
1154	}
1155
1156	userOK := 0
1157	for _, s := range userStates {
1158		if s.State == skills.StateNormal {
1159			userOK++
1160		}
1161	}
1162
1163	activeNames := make([]string, 0, len(activeSkills))
1164	for _, s := range activeSkills {
1165		activeNames = append(activeNames, s.Name)
1166	}
1167
1168	xml := skills.ToPromptXML(activeSkills)
1169
1170	slog.Info("Skill discovery complete",
1171		"component", "skills",
1172		"builtin_ok", len(builtin),
1173		"builtin_errors", countErrors(builtinStates),
1174		"user_ok", userOK,
1175		"user_errors", countErrors(userStates),
1176		"user_paths", len(userPaths),
1177		"deduped_total", len(allSkills),
1178		"active", len(activeSkills),
1179		"disabled", len(disabled),
1180		"prompt_bytes", len(xml),
1181		"prompt_tok_est", skills.ApproxTokenCount(xml),
1182		"active_names", activeNames,
1183	)
1184}