coordinator.go

   1package agent
   2
   3import (
   4	"bytes"
   5	"cmp"
   6	"context"
   7	"encoding/json"
   8	"errors"
   9	"fmt"
  10	"io"
  11	"log/slog"
  12	"maps"
  13	"net/http"
  14	"os"
  15	"path/filepath"
  16	"slices"
  17	"strings"
  18
  19	"charm.land/catwalk/pkg/catwalk"
  20	"charm.land/fantasy"
  21	"github.com/charmbracelet/crush/internal/agent/hyper"
  22	"github.com/charmbracelet/crush/internal/agent/notify"
  23	"github.com/charmbracelet/crush/internal/agent/prompt"
  24	"github.com/charmbracelet/crush/internal/agent/tools"
  25	"github.com/charmbracelet/crush/internal/config"
  26	"github.com/charmbracelet/crush/internal/event"
  27	"github.com/charmbracelet/crush/internal/filetracker"
  28	"github.com/charmbracelet/crush/internal/history"
  29	"github.com/charmbracelet/crush/internal/home"
  30	"github.com/charmbracelet/crush/internal/log"
  31	"github.com/charmbracelet/crush/internal/lsp"
  32	"github.com/charmbracelet/crush/internal/message"
  33	"github.com/charmbracelet/crush/internal/oauth/copilot"
  34	"github.com/charmbracelet/crush/internal/permission"
  35	"github.com/charmbracelet/crush/internal/pubsub"
  36	"github.com/charmbracelet/crush/internal/session"
  37	"github.com/charmbracelet/crush/internal/skills"
  38	"golang.org/x/sync/errgroup"
  39
  40	"charm.land/fantasy/providers/anthropic"
  41	"charm.land/fantasy/providers/azure"
  42	"charm.land/fantasy/providers/bedrock"
  43	"charm.land/fantasy/providers/google"
  44	"charm.land/fantasy/providers/openai"
  45	"charm.land/fantasy/providers/openaicompat"
  46	"charm.land/fantasy/providers/openrouter"
  47	"charm.land/fantasy/providers/vercel"
  48	openaisdk "github.com/charmbracelet/openai-go/option"
  49	"github.com/qjebbs/go-jsons"
  50)
  51
  52// Coordinator errors.
  53var (
  54	errCoderAgentNotConfigured         = errors.New("coder agent not configured")
  55	errModelProviderNotConfigured      = errors.New("model provider not configured")
  56	errLargeModelNotSelected           = errors.New("large model not selected")
  57	errSmallModelNotSelected           = errors.New("small model not selected")
  58	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
  59	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
  60	errLargeModelNotFound              = errors.New("large model not found in provider config")
  61	errSmallModelNotFound              = errors.New("small model not found in provider config")
  62)
  63
  64type Coordinator interface {
  65	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
  66	// SetMainAgent(string)
  67	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
  68	Cancel(sessionID string)
  69	CancelAll()
  70	IsSessionBusy(sessionID string) bool
  71	IsBusy() bool
  72	QueuedPrompts(sessionID string) int
  73	QueuedPromptsList(sessionID string) []string
  74	ClearQueue(sessionID string)
  75	Summarize(context.Context, string) error
  76	Model() Model
  77	UpdateModels(ctx context.Context) error
  78}
  79
  80type coordinator struct {
  81	cfg         *config.ConfigStore
  82	sessions    session.Service
  83	messages    message.Service
  84	permissions permission.Service
  85	history     history.Service
  86	filetracker filetracker.Service
  87	lspManager  *lsp.Manager
  88	notify      pubsub.Publisher[notify.Notification]
  89
  90	currentAgent SessionAgent
  91	agents       map[string]SessionAgent
  92
  93	// Skills discovery results (session-start snapshot).
  94	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
  95	activeSkills []*skills.Skill // Post-filter: active skills only.
  96	skillTracker *skills.Tracker
  97
  98	readyWg errgroup.Group
  99}
 100
 101func NewCoordinator(
 102	ctx context.Context,
 103	cfg *config.ConfigStore,
 104	sessions session.Service,
 105	messages message.Service,
 106	permissions permission.Service,
 107	history history.Service,
 108	filetracker filetracker.Service,
 109	lspManager *lsp.Manager,
 110	notify pubsub.Publisher[notify.Notification],
 111) (Coordinator, error) {
 112	// Discover skills once at session start.
 113	allSkills, activeSkills := discoverSkills(cfg)
 114	skillTracker := skills.NewTracker(activeSkills)
 115
 116	c := &coordinator{
 117		cfg:          cfg,
 118		sessions:     sessions,
 119		messages:     messages,
 120		permissions:  permissions,
 121		history:      history,
 122		filetracker:  filetracker,
 123		lspManager:   lspManager,
 124		notify:       notify,
 125		agents:       make(map[string]SessionAgent),
 126		allSkills:    allSkills,
 127		activeSkills: activeSkills,
 128		skillTracker: skillTracker,
 129	}
 130
 131	agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
 132	if !ok {
 133		return nil, errCoderAgentNotConfigured
 134	}
 135
 136	// TODO: make this dynamic when we support multiple agents
 137	prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
 138	if err != nil {
 139		return nil, err
 140	}
 141
 142	agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
 143	if err != nil {
 144		return nil, err
 145	}
 146	c.currentAgent = agent
 147	c.agents[config.AgentCoder] = agent
 148	return c, nil
 149}
 150
 151// Run implements Coordinator.
 152func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 153	if err := c.readyWg.Wait(); err != nil {
 154		return nil, err
 155	}
 156
 157	// refresh models before each run
 158	if err := c.UpdateModels(ctx); err != nil {
 159		return nil, fmt.Errorf("failed to update models: %w", err)
 160	}
 161
 162	model := c.currentAgent.Model()
 163	maxTokens := model.CatwalkCfg.DefaultMaxTokens
 164	if model.ModelCfg.MaxTokens != 0 {
 165		maxTokens = model.ModelCfg.MaxTokens
 166	}
 167
 168	if !model.CatwalkCfg.SupportsImages && attachments != nil {
 169		// filter out image attachments
 170		filteredAttachments := make([]message.Attachment, 0, len(attachments))
 171		for _, att := range attachments {
 172			if att.IsText() {
 173				filteredAttachments = append(filteredAttachments, att)
 174			}
 175		}
 176		attachments = filteredAttachments
 177	}
 178
 179	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
 180	if !ok {
 181		return nil, errModelProviderNotConfigured
 182	}
 183
 184	mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)
 185
 186	if providerCfg.OAuthToken != nil && providerCfg.OAuthToken.IsExpired() {
 187		slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
 188		if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 189			return nil, err
 190		}
 191	}
 192
 193	run := func() (*fantasy.AgentResult, error) {
 194		return c.currentAgent.Run(ctx, SessionAgentCall{
 195			SessionID:        sessionID,
 196			Prompt:           prompt,
 197			Attachments:      attachments,
 198			MaxOutputTokens:  maxTokens,
 199			ProviderOptions:  mergedOptions,
 200			Temperature:      temp,
 201			TopP:             topP,
 202			TopK:             topK,
 203			FrequencyPenalty: freqPenalty,
 204			PresencePenalty:  presPenalty,
 205		})
 206	}
 207	beforeLoaded := c.skillTracker.LoadedNames()
 208	result, originalErr := run()
 209	logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)
 210
 211	if c.isUnauthorized(originalErr) {
 212		switch {
 213		case providerCfg.OAuthToken != nil:
 214			slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
 215			if err := c.refreshOAuth2Token(ctx, providerCfg); err != nil {
 216				return nil, originalErr
 217			}
 218			slog.Debug("Retrying request with refreshed OAuth token", "provider", providerCfg.ID)
 219			return run()
 220		case strings.Contains(providerCfg.APIKeyTemplate, "$"):
 221			slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
 222			if err := c.refreshApiKeyTemplate(ctx, providerCfg); err != nil {
 223				return nil, originalErr
 224			}
 225			slog.Debug("Retrying request with refreshed API key", "provider", providerCfg.ID)
 226			return run()
 227		}
 228	}
 229
 230	return result, originalErr
 231}
 232
 233func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
 234	options := fantasy.ProviderOptions{}
 235
 236	cfgOpts := []byte("{}")
 237	providerCfgOpts := []byte("{}")
 238	catwalkOpts := []byte("{}")
 239
 240	if model.ModelCfg.ProviderOptions != nil {
 241		data, err := json.Marshal(model.ModelCfg.ProviderOptions)
 242		if err == nil {
 243			cfgOpts = data
 244		}
 245	}
 246
 247	if providerCfg.ProviderOptions != nil {
 248		data, err := json.Marshal(providerCfg.ProviderOptions)
 249		if err == nil {
 250			providerCfgOpts = data
 251		}
 252	}
 253
 254	if model.CatwalkCfg.Options.ProviderOptions != nil {
 255		data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
 256		if err == nil {
 257			catwalkOpts = data
 258		}
 259	}
 260
 261	readers := []io.Reader{
 262		bytes.NewReader(catwalkOpts),
 263		bytes.NewReader(providerCfgOpts),
 264		bytes.NewReader(cfgOpts),
 265	}
 266
 267	got, err := jsons.Merge(readers)
 268	if err != nil {
 269		slog.Error("Could not merge call config", "err", err)
 270		return options
 271	}
 272
 273	mergedOptions := make(map[string]any)
 274
 275	err = json.Unmarshal([]byte(got), &mergedOptions)
 276	if err != nil {
 277		slog.Error("Could not create config for call", "err", err)
 278		return options
 279	}
 280
 281	providerType := providerCfg.Type
 282	if providerType == "hyper" {
 283		if strings.Contains(model.CatwalkCfg.ID, "claude") {
 284			providerType = anthropic.Name
 285		} else if strings.Contains(model.CatwalkCfg.ID, "gpt") {
 286			providerType = openai.Name
 287		} else if strings.Contains(model.CatwalkCfg.ID, "gemini") {
 288			providerType = google.Name
 289		} else {
 290			providerType = openaicompat.Name
 291		}
 292	}
 293
 294	switch providerType {
 295	case openai.Name, azure.Name:
 296		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 297		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 298			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 299		}
 300		if openai.IsResponsesModel(model.CatwalkCfg.ID) {
 301			if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
 302				mergedOptions["reasoning_summary"] = "auto"
 303				mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
 304			}
 305			parsed, err := openai.ParseResponsesOptions(mergedOptions)
 306			if err == nil {
 307				options[openai.Name] = parsed
 308			}
 309		} else {
 310			parsed, err := openai.ParseOptions(mergedOptions)
 311			if err == nil {
 312				options[openai.Name] = parsed
 313			}
 314		}
 315	case anthropic.Name:
 316		var (
 317			_, hasEffort = mergedOptions["effort"]
 318			_, hasThink  = mergedOptions["thinking"]
 319		)
 320		switch {
 321		case !hasEffort && model.ModelCfg.ReasoningEffort != "":
 322			mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
 323		case !hasThink && model.ModelCfg.Think:
 324			mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
 325		}
 326		parsed, err := anthropic.ParseOptions(mergedOptions)
 327		if err == nil {
 328			options[anthropic.Name] = parsed
 329		}
 330
 331	case openrouter.Name:
 332		_, hasReasoning := mergedOptions["reasoning"]
 333		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 334			mergedOptions["reasoning"] = map[string]any{
 335				"enabled": true,
 336				"effort":  model.ModelCfg.ReasoningEffort,
 337			}
 338		}
 339		parsed, err := openrouter.ParseOptions(mergedOptions)
 340		if err == nil {
 341			options[openrouter.Name] = parsed
 342		}
 343	case vercel.Name:
 344		_, hasReasoning := mergedOptions["reasoning"]
 345		if !hasReasoning && model.ModelCfg.ReasoningEffort != "" {
 346			mergedOptions["reasoning"] = map[string]any{
 347				"enabled": true,
 348				"effort":  model.ModelCfg.ReasoningEffort,
 349			}
 350		}
 351		parsed, err := vercel.ParseOptions(mergedOptions)
 352		if err == nil {
 353			options[vercel.Name] = parsed
 354		}
 355	case google.Name:
 356		_, hasReasoning := mergedOptions["thinking_config"]
 357		if !hasReasoning {
 358			if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
 359				mergedOptions["thinking_config"] = map[string]any{
 360					"thinking_budget":  2000,
 361					"include_thoughts": true,
 362				}
 363			} else {
 364				mergedOptions["thinking_config"] = map[string]any{
 365					"thinking_level":   model.ModelCfg.ReasoningEffort,
 366					"include_thoughts": true,
 367				}
 368			}
 369		}
 370		parsed, err := google.ParseOptions(mergedOptions)
 371		if err == nil {
 372			options[google.Name] = parsed
 373		}
 374	case openaicompat.Name:
 375		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 376		if !hasReasoningEffort && model.ModelCfg.ReasoningEffort != "" {
 377			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 378		}
 379		parsed, err := openaicompat.ParseOptions(mergedOptions)
 380		if err == nil {
 381			options[openaicompat.Name] = parsed
 382		}
 383	}
 384
 385	return options
 386}
 387
 388func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
 389	modelOptions := getProviderOptions(model, cfg)
 390	temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
 391	topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
 392	topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
 393	freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
 394	presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
 395	return modelOptions, temp, topP, topK, freqPenalty, presPenalty
 396}
 397
 398func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
 399	large, small, err := c.buildAgentModels(ctx, isSubAgent)
 400	if err != nil {
 401		return nil, err
 402	}
 403
 404	largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
 405	result := NewSessionAgent(SessionAgentOptions{
 406		LargeModel:           large,
 407		SmallModel:           small,
 408		SystemPromptPrefix:   largeProviderCfg.SystemPromptPrefix,
 409		SystemPrompt:         "",
 410		IsSubAgent:           isSubAgent,
 411		DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
 412		IsYolo:               c.permissions.SkipRequests(),
 413		Sessions:             c.sessions,
 414		Messages:             c.messages,
 415		Tools:                nil,
 416		Notify:               c.notify,
 417	})
 418
 419	c.readyWg.Go(func() error {
 420		systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
 421		if err != nil {
 422			return err
 423		}
 424		result.SetSystemPrompt(systemPrompt)
 425		return nil
 426	})
 427
 428	c.readyWg.Go(func() error {
 429		tools, err := c.buildTools(ctx, agent)
 430		if err != nil {
 431			return err
 432		}
 433		result.SetTools(tools)
 434		return nil
 435	})
 436
 437	return result, nil
 438}
 439
 440func (c *coordinator) buildTools(ctx context.Context, agent config.Agent) ([]fantasy.AgentTool, error) {
 441	var allTools []fantasy.AgentTool
 442	if slices.Contains(agent.AllowedTools, AgentToolName) {
 443		agentTool, err := c.agentTool(ctx)
 444		if err != nil {
 445			return nil, err
 446		}
 447		allTools = append(allTools, agentTool)
 448	}
 449
 450	if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
 451		agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
 452		if err != nil {
 453			return nil, err
 454		}
 455		allTools = append(allTools, agenticFetchTool)
 456	}
 457
 458	// Get the model name for the agent
 459	modelName := ""
 460	if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
 461		if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
 462			modelName = model.Name
 463		}
 464	}
 465
 466	logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
 467
 468	allTools = append(allTools,
 469		tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelName),
 470		tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
 471		tools.NewCrushLogsTool(logFile),
 472		tools.NewJobOutputTool(),
 473		tools.NewJobKillTool(),
 474		tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
 475		tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 476		tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 477		tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
 478		tools.NewGlobTool(c.cfg.WorkingDir()),
 479		tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
 480		tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
 481		tools.NewSourcegraphTool(nil),
 482		tools.NewTodosTool(c.sessions),
 483		tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
 484		tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 485	)
 486
 487	// Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
 488	if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
 489		allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
 490	}
 491
 492	if len(c.cfg.Config().MCP) > 0 {
 493		allTools = append(
 494			allTools,
 495			tools.NewListMCPResourcesTool(c.cfg, c.permissions),
 496			tools.NewReadMCPResourceTool(c.cfg, c.permissions),
 497		)
 498	}
 499
 500	var filteredTools []fantasy.AgentTool
 501	for _, tool := range allTools {
 502		if slices.Contains(agent.AllowedTools, tool.Info().Name) {
 503			filteredTools = append(filteredTools, tool)
 504		}
 505	}
 506
 507	for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
 508		if agent.AllowedMCP == nil {
 509			// No MCP restrictions
 510			filteredTools = append(filteredTools, tool)
 511			continue
 512		}
 513		if len(agent.AllowedMCP) == 0 {
 514			// No MCPs allowed
 515			slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
 516			break
 517		}
 518
 519		for mcp, tools := range agent.AllowedMCP {
 520			if mcp != tool.MCP() {
 521				continue
 522			}
 523			if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
 524				filteredTools = append(filteredTools, tool)
 525				break
 526			}
 527			slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
 528		}
 529	}
 530	slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
 531		return strings.Compare(a.Info().Name, b.Info().Name)
 532	})
 533	return filteredTools, nil
 534}
 535
 536// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
 537func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
 538	largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
 539	if !ok {
 540		return Model{}, Model{}, errLargeModelNotSelected
 541	}
 542	smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
 543	if !ok {
 544		return Model{}, Model{}, errSmallModelNotSelected
 545	}
 546
 547	largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
 548	if !ok {
 549		return Model{}, Model{}, errLargeModelProviderNotConfigured
 550	}
 551
 552	largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
 553	if err != nil {
 554		return Model{}, Model{}, err
 555	}
 556
 557	smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
 558	if !ok {
 559		return Model{}, Model{}, errSmallModelProviderNotConfigured
 560	}
 561
 562	smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
 563	if err != nil {
 564		return Model{}, Model{}, err
 565	}
 566
 567	var largeCatwalkModel *catwalk.Model
 568	var smallCatwalkModel *catwalk.Model
 569
 570	for _, m := range largeProviderCfg.Models {
 571		if m.ID == largeModelCfg.Model {
 572			largeCatwalkModel = &m
 573		}
 574	}
 575	for _, m := range smallProviderCfg.Models {
 576		if m.ID == smallModelCfg.Model {
 577			smallCatwalkModel = &m
 578		}
 579	}
 580
 581	if largeCatwalkModel == nil {
 582		return Model{}, Model{}, errLargeModelNotFound
 583	}
 584
 585	if smallCatwalkModel == nil {
 586		return Model{}, Model{}, errSmallModelNotFound
 587	}
 588
 589	largeModelID := largeModelCfg.Model
 590	smallModelID := smallModelCfg.Model
 591
 592	if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
 593		largeModelID += ":exacto"
 594	}
 595
 596	if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
 597		smallModelID += ":exacto"
 598	}
 599
 600	largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
 601	if err != nil {
 602		return Model{}, Model{}, err
 603	}
 604	smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
 605	if err != nil {
 606		return Model{}, Model{}, err
 607	}
 608
 609	return Model{
 610			Model:      largeModel,
 611			CatwalkCfg: *largeCatwalkModel,
 612			ModelCfg:   largeModelCfg,
 613		}, Model{
 614			Model:      smallModel,
 615			CatwalkCfg: *smallCatwalkModel,
 616			ModelCfg:   smallModelCfg,
 617		}, nil
 618}
 619
 620func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
 621	var opts []anthropic.Option
 622
 623	switch {
 624	case strings.HasPrefix(apiKey, "Bearer "):
 625		// NOTE: Prevent the SDK from picking up the API key from env.
 626		os.Setenv("ANTHROPIC_API_KEY", "")
 627		headers["Authorization"] = apiKey
 628	case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
 629		// NOTE: Prevent the SDK from picking up the API key from env.
 630		os.Setenv("ANTHROPIC_API_KEY", "")
 631		headers["Authorization"] = "Bearer " + apiKey
 632	case apiKey != "":
 633		// X-Api-Key header
 634		opts = append(opts, anthropic.WithAPIKey(apiKey))
 635	}
 636
 637	if len(headers) > 0 {
 638		opts = append(opts, anthropic.WithHeaders(headers))
 639	}
 640
 641	if baseURL != "" {
 642		opts = append(opts, anthropic.WithBaseURL(baseURL))
 643	}
 644
 645	if c.cfg.Config().Options.Debug {
 646		httpClient := log.NewHTTPClient()
 647		opts = append(opts, anthropic.WithHTTPClient(httpClient))
 648	}
 649	return anthropic.New(opts...)
 650}
 651
 652func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 653	opts := []openai.Option{
 654		openai.WithAPIKey(apiKey),
 655		openai.WithUseResponsesAPI(),
 656	}
 657	if c.cfg.Config().Options.Debug {
 658		httpClient := log.NewHTTPClient()
 659		opts = append(opts, openai.WithHTTPClient(httpClient))
 660	}
 661	if len(headers) > 0 {
 662		opts = append(opts, openai.WithHeaders(headers))
 663	}
 664	if baseURL != "" {
 665		opts = append(opts, openai.WithBaseURL(baseURL))
 666	}
 667	return openai.New(opts...)
 668}
 669
 670func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 671	opts := []openrouter.Option{
 672		openrouter.WithAPIKey(apiKey),
 673	}
 674	if c.cfg.Config().Options.Debug {
 675		httpClient := log.NewHTTPClient()
 676		opts = append(opts, openrouter.WithHTTPClient(httpClient))
 677	}
 678	if len(headers) > 0 {
 679		opts = append(opts, openrouter.WithHeaders(headers))
 680	}
 681	return openrouter.New(opts...)
 682}
 683
 684func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 685	opts := []vercel.Option{
 686		vercel.WithAPIKey(apiKey),
 687	}
 688	if c.cfg.Config().Options.Debug {
 689		httpClient := log.NewHTTPClient()
 690		opts = append(opts, vercel.WithHTTPClient(httpClient))
 691	}
 692	if len(headers) > 0 {
 693		opts = append(opts, vercel.WithHeaders(headers))
 694	}
 695	return vercel.New(opts...)
 696}
 697
 698func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
 699	opts := []openaicompat.Option{
 700		openaicompat.WithBaseURL(baseURL),
 701		openaicompat.WithAPIKey(apiKey),
 702	}
 703
 704	// Set HTTP client based on provider and debug mode.
 705	var httpClient *http.Client
 706	if providerID == string(catwalk.InferenceProviderCopilot) {
 707		opts = append(opts, openaicompat.WithUseResponsesAPI())
 708		httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
 709	} else if c.cfg.Config().Options.Debug {
 710		httpClient = log.NewHTTPClient()
 711	}
 712	if httpClient != nil {
 713		opts = append(opts, openaicompat.WithHTTPClient(httpClient))
 714	}
 715
 716	if len(headers) > 0 {
 717		opts = append(opts, openaicompat.WithHeaders(headers))
 718	}
 719
 720	for extraKey, extraValue := range extraBody {
 721		opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
 722	}
 723
 724	return openaicompat.New(opts...)
 725}
 726
 727func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 728	opts := []azure.Option{
 729		azure.WithBaseURL(baseURL),
 730		azure.WithAPIKey(apiKey),
 731		azure.WithUseResponsesAPI(),
 732	}
 733	if c.cfg.Config().Options.Debug {
 734		httpClient := log.NewHTTPClient()
 735		opts = append(opts, azure.WithHTTPClient(httpClient))
 736	}
 737	if options == nil {
 738		options = make(map[string]string)
 739	}
 740	if apiVersion, ok := options["apiVersion"]; ok {
 741		opts = append(opts, azure.WithAPIVersion(apiVersion))
 742	}
 743	if len(headers) > 0 {
 744		opts = append(opts, azure.WithHeaders(headers))
 745	}
 746
 747	return azure.New(opts...)
 748}
 749
 750func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string) (fantasy.Provider, error) {
 751	var opts []bedrock.Option
 752	if c.cfg.Config().Options.Debug {
 753		httpClient := log.NewHTTPClient()
 754		opts = append(opts, bedrock.WithHTTPClient(httpClient))
 755	}
 756	if len(headers) > 0 {
 757		opts = append(opts, bedrock.WithHeaders(headers))
 758	}
 759	switch {
 760	case apiKey != "":
 761		opts = append(opts, bedrock.WithAPIKey(apiKey))
 762	case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
 763		opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
 764	default:
 765		// Skip, let the SDK do authentication.
 766	}
 767	return bedrock.New(opts...)
 768}
 769
 770func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 771	opts := []google.Option{
 772		google.WithBaseURL(baseURL),
 773		google.WithGeminiAPIKey(apiKey),
 774	}
 775	if c.cfg.Config().Options.Debug {
 776		httpClient := log.NewHTTPClient()
 777		opts = append(opts, google.WithHTTPClient(httpClient))
 778	}
 779	if len(headers) > 0 {
 780		opts = append(opts, google.WithHeaders(headers))
 781	}
 782	return google.New(opts...)
 783}
 784
 785func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 786	opts := []google.Option{}
 787	if c.cfg.Config().Options.Debug {
 788		httpClient := log.NewHTTPClient()
 789		opts = append(opts, google.WithHTTPClient(httpClient))
 790	}
 791	if len(headers) > 0 {
 792		opts = append(opts, google.WithHeaders(headers))
 793	}
 794
 795	project := options["project"]
 796	location := options["location"]
 797
 798	opts = append(opts, google.WithVertex(project, location))
 799
 800	return google.New(opts...)
 801}
 802
 803func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
 804	if model.Think {
 805		return true
 806	}
 807	opts, err := anthropic.ParseOptions(model.ProviderOptions)
 808	return err == nil && opts.Thinking != nil
 809}
 810
 811func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
 812	headers := maps.Clone(providerCfg.ExtraHeaders)
 813	if headers == nil {
 814		headers = make(map[string]string)
 815	}
 816
 817	// handle special headers for anthropic
 818	if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
 819		if v, ok := headers["anthropic-beta"]; ok {
 820			headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
 821		} else {
 822			headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
 823		}
 824	}
 825
 826	apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
 827	baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
 828
 829	switch providerCfg.Type {
 830	case openai.Name:
 831		return c.buildOpenaiProvider(baseURL, apiKey, headers)
 832	case anthropic.Name:
 833		return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
 834	case openrouter.Name:
 835		return c.buildOpenrouterProvider(baseURL, apiKey, headers)
 836	case vercel.Name:
 837		return c.buildVercelProvider(baseURL, apiKey, headers)
 838	case azure.Name:
 839		return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
 840	case bedrock.Name:
 841		return c.buildBedrockProvider(apiKey, headers)
 842	case google.Name:
 843		return c.buildGoogleProvider(baseURL, apiKey, headers)
 844	case "google-vertex":
 845		return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
 846	case openaicompat.Name, hyper.Name:
 847		switch providerCfg.ID {
 848		case hyper.Name:
 849			baseURL = hyper.BaseURL() + "/v1"
 850			headers["x-crush-id"] = event.GetID()
 851		case string(catwalk.InferenceProviderZAI):
 852			if providerCfg.ExtraBody == nil {
 853				providerCfg.ExtraBody = map[string]any{}
 854			}
 855			providerCfg.ExtraBody["tool_stream"] = true
 856		}
 857		return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
 858	default:
 859		return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
 860	}
 861}
 862
 863func isExactoSupported(modelID string) bool {
 864	supportedModels := []string{
 865		"moonshotai/kimi-k2-0905",
 866		"deepseek/deepseek-v3.1-terminus",
 867		"z-ai/glm-4.6",
 868		"openai/gpt-oss-120b",
 869		"qwen/qwen3-coder",
 870	}
 871	return slices.Contains(supportedModels, modelID)
 872}
 873
 874func (c *coordinator) Cancel(sessionID string) {
 875	c.currentAgent.Cancel(sessionID)
 876}
 877
 878func (c *coordinator) CancelAll() {
 879	c.currentAgent.CancelAll()
 880}
 881
 882func (c *coordinator) ClearQueue(sessionID string) {
 883	c.currentAgent.ClearQueue(sessionID)
 884}
 885
 886func (c *coordinator) IsBusy() bool {
 887	return c.currentAgent.IsBusy()
 888}
 889
 890func (c *coordinator) IsSessionBusy(sessionID string) bool {
 891	return c.currentAgent.IsSessionBusy(sessionID)
 892}
 893
 894func (c *coordinator) Model() Model {
 895	return c.currentAgent.Model()
 896}
 897
 898func (c *coordinator) UpdateModels(ctx context.Context) error {
 899	// build the models again so we make sure we get the latest config
 900	large, small, err := c.buildAgentModels(ctx, false)
 901	if err != nil {
 902		return err
 903	}
 904	c.currentAgent.SetModels(large, small)
 905
 906	agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
 907	if !ok {
 908		return errCoderAgentNotConfigured
 909	}
 910
 911	tools, err := c.buildTools(ctx, agentCfg)
 912	if err != nil {
 913		return err
 914	}
 915	c.currentAgent.SetTools(tools)
 916	return nil
 917}
 918
// QueuedPrompts returns the current agent's QueuedPrompts count for sessionID.
func (c *coordinator) QueuedPrompts(sessionID string) int {
	return c.currentAgent.QueuedPrompts(sessionID)
}

// QueuedPromptsList returns the current agent's QueuedPromptsList for
// sessionID.
func (c *coordinator) QueuedPromptsList(sessionID string) []string {
	return c.currentAgent.QueuedPromptsList(sessionID)
}
 926
 927func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
 928	providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
 929	if !ok {
 930		return errModelProviderNotConfigured
 931	}
 932	return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
 933}
 934
 935func (c *coordinator) isUnauthorized(err error) bool {
 936	var providerErr *fantasy.ProviderError
 937	return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
 938}
 939
 940func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
 941	if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
 942		slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
 943		return err
 944	}
 945	if err := c.UpdateModels(ctx); err != nil {
 946		return err
 947	}
 948	return nil
 949}
 950
 951func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
 952	newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
 953	if err != nil {
 954		slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
 955		return err
 956	}
 957
 958	providerCfg.APIKey = newAPIKey
 959	c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
 960
 961	if err := c.UpdateModels(ctx); err != nil {
 962		return err
 963	}
 964	return nil
 965}
 966
// subAgentParams holds the parameters for running a sub-agent.
//
// Fields, as consumed by runSubAgent:
//   - Agent: the agent to run. Its Model() supplies the max output tokens,
//     sampling settings, and provider used to build provider options.
//   - SessionID: the parent session. The sub-agent's task session is created
//     under it, and the sub-session's cost is added to it afterwards.
//   - AgentMessageID, ToolCallID: used together to derive the sub-session ID.
//   - Prompt: the prompt sent to the sub-agent.
//   - SessionTitle: passed to CreateTaskSession as the sub-session's title.
//   - SessionSetup: optional hook; see its field comment.
type subAgentParams struct {
	Agent          SessionAgent
	SessionID      string
	AgentMessageID string
	ToolCallID     string
	Prompt         string
	SessionTitle   string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
 979
 980// runSubAgent runs a sub-agent and handles session management and cost accumulation.
 981// It creates a sub-session, runs the agent with the given prompt, and propagates
 982// the cost to the parent session.
 983func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
 984	// Create sub-session
 985	agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
 986	session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
 987	if err != nil {
 988		return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
 989	}
 990
 991	// Call session setup function if provided
 992	if params.SessionSetup != nil {
 993		params.SessionSetup(session.ID)
 994	}
 995
 996	// Get model configuration
 997	model := params.Agent.Model()
 998	maxTokens := model.CatwalkCfg.DefaultMaxTokens
 999	if model.ModelCfg.MaxTokens != 0 {
1000		maxTokens = model.ModelCfg.MaxTokens
1001	}
1002
1003	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1004	if !ok {
1005		return fantasy.ToolResponse{}, errModelProviderNotConfigured
1006	}
1007
1008	// Run the agent
1009	result, err := params.Agent.Run(ctx, SessionAgentCall{
1010		SessionID:        session.ID,
1011		Prompt:           params.Prompt,
1012		MaxOutputTokens:  maxTokens,
1013		ProviderOptions:  getProviderOptions(model, providerCfg),
1014		Temperature:      model.ModelCfg.Temperature,
1015		TopP:             model.ModelCfg.TopP,
1016		TopK:             model.ModelCfg.TopK,
1017		FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1018		PresencePenalty:  model.ModelCfg.PresencePenalty,
1019		NonInteractive:   true,
1020	})
1021	if err != nil {
1022		return fantasy.NewTextErrorResponse("error generating response"), nil
1023	}
1024
1025	// Update parent session cost
1026	if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1027		return fantasy.ToolResponse{}, err
1028	}
1029
1030	return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1031}
1032
1033// updateParentSessionCost accumulates the cost from a child session to its parent session.
1034func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1035	childSession, err := c.sessions.Get(ctx, childSessionID)
1036	if err != nil {
1037		return fmt.Errorf("get child session: %w", err)
1038	}
1039
1040	parentSession, err := c.sessions.Get(ctx, parentSessionID)
1041	if err != nil {
1042		return fmt.Errorf("get parent session: %w", err)
1043	}
1044
1045	parentSession.Cost += childSession.Cost
1046
1047	if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1048		return fmt.Errorf("save parent session: %w", err)
1049	}
1050
1051	return nil
1052}
1053
1054// discoverSkills runs the skill discovery pipeline and returns both the
1055// pre-filter (all discovered, after dedup) and post-filter (active) lists.
1056// It also emits a single diagnostic log line summarising the outcome to
1057// help track skill-loading health over time.
1058func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1059	builtin, builtinStates := skills.DiscoverBuiltinWithStates()
1060	discovered := append([]*skills.Skill(nil), builtin...)
1061
1062	var userStates []*skills.SkillState
1063	var userPaths []string
1064
1065	opts := cfg.Config().Options
1066	if opts != nil && len(opts.SkillsPaths) > 0 {
1067		userPaths = make([]string, 0, len(opts.SkillsPaths))
1068		for _, pth := range opts.SkillsPaths {
1069			expanded := home.Long(pth)
1070			if strings.HasPrefix(expanded, "$") {
1071				if resolved, err := cfg.Resolver().ResolveValue(expanded); err == nil {
1072					expanded = resolved
1073				}
1074			}
1075			userPaths = append(userPaths, expanded)
1076		}
1077		var userSkills []*skills.Skill
1078		userSkills, userStates = skills.DiscoverWithStates(userPaths)
1079		discovered = append(discovered, userSkills...)
1080	}
1081
1082	allSkills = skills.Deduplicate(discovered)
1083	var disabledSkills []string
1084	if opts != nil {
1085		disabledSkills = opts.DisabledSkills
1086	}
1087	activeSkills = skills.Filter(allSkills, disabledSkills)
1088
1089	logDiscoveryStats(builtin, builtinStates, userStates, userPaths, allSkills, activeSkills, disabledSkills)
1090	return allSkills, activeSkills
1091}
1092
1093// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1094// (if any) were loaded during this turn and which looked relevant based on
1095// a cheap keyword match against the user prompt. The goal is to surface
1096// "should-have-loaded but didn't" situations for later analysis.
1097//
1098// Logged at Info level under component=skills; heavy fields are elided when
1099// there is nothing interesting to report.
1100func logTurnSkillUsage(
1101	sessionID string,
1102	prompt string,
1103	activeSkills []*skills.Skill,
1104	tracker *skills.Tracker,
1105	before []string,
1106) {
1107	if tracker == nil || len(activeSkills) == 0 {
1108		return
1109	}
1110
1111	after := tracker.LoadedNames()
1112
1113	beforeSet := make(map[string]bool, len(before))
1114	for _, n := range before {
1115		beforeSet[n] = true
1116	}
1117	var loadedThisTurn []string
1118	for _, n := range after {
1119		if !beforeSet[n] {
1120			loadedThisTurn = append(loadedThisTurn, n)
1121		}
1122	}
1123
1124	slog.Info("Skill turn summary",
1125		"component", "skills",
1126		"session_id", sessionID,
1127		"prompt_len", len(prompt),
1128		"active_total", len(activeSkills),
1129		"loaded_total", len(after),
1130		"loaded_this_turn", loadedThisTurn,
1131	)
1132}
1133
1134// logDiscoveryStats emits a single structured log line summarising skill
1135// discovery for the current session. It is intentionally low-volume: one
1136// line per session start.
1137func logDiscoveryStats(
1138	builtin []*skills.Skill,
1139	builtinStates, userStates []*skills.SkillState,
1140	userPaths []string,
1141	allSkills, activeSkills []*skills.Skill,
1142	disabled []string,
1143) {
1144	countErrors := func(states []*skills.SkillState) int {
1145		n := 0
1146		for _, s := range states {
1147			if s.State == skills.StateError {
1148				n++
1149			}
1150		}
1151		return n
1152	}
1153
1154	userOK := 0
1155	for _, s := range userStates {
1156		if s.State == skills.StateNormal {
1157			userOK++
1158		}
1159	}
1160
1161	activeNames := make([]string, 0, len(activeSkills))
1162	for _, s := range activeSkills {
1163		activeNames = append(activeNames, s.Name)
1164	}
1165
1166	xml := skills.ToPromptXML(activeSkills)
1167
1168	slog.Info("Skill discovery complete",
1169		"component", "skills",
1170		"builtin_ok", len(builtin),
1171		"builtin_errors", countErrors(builtinStates),
1172		"user_ok", userOK,
1173		"user_errors", countErrors(userStates),
1174		"user_paths", len(userPaths),
1175		"deduped_total", len(allSkills),
1176		"active", len(activeSkills),
1177		"disabled", len(disabled),
1178		"prompt_bytes", len(xml),
1179		"prompt_tok_est", skills.ApproxTokenCount(xml),
1180		"active_names", activeNames,
1181	)
1182}