coordinator.go

   1package agent
   2
   3import (
   4	"bytes"
   5	"cmp"
   6	"context"
   7	"encoding/json"
   8	"errors"
   9	"fmt"
  10	"io"
  11	"log/slog"
  12	"maps"
  13	"net/http"
  14	"os"
  15	"path/filepath"
  16	"slices"
  17	"strings"
  18
  19	"charm.land/catwalk/pkg/catwalk"
  20	"charm.land/fantasy"
  21	"github.com/charmbracelet/crush/internal/agent/hyper"
  22	"github.com/charmbracelet/crush/internal/agent/notify"
  23	"github.com/charmbracelet/crush/internal/agent/prompt"
  24	"github.com/charmbracelet/crush/internal/agent/tools"
  25	"github.com/charmbracelet/crush/internal/config"
  26	"github.com/charmbracelet/crush/internal/event"
  27	"github.com/charmbracelet/crush/internal/filetracker"
  28	"github.com/charmbracelet/crush/internal/history"
  29	"github.com/charmbracelet/crush/internal/hooks"
  30	"github.com/charmbracelet/crush/internal/log"
  31	"github.com/charmbracelet/crush/internal/lsp"
  32	"github.com/charmbracelet/crush/internal/message"
  33	"github.com/charmbracelet/crush/internal/oauth/copilot"
  34	"github.com/charmbracelet/crush/internal/permission"
  35	"github.com/charmbracelet/crush/internal/pubsub"
  36	"github.com/charmbracelet/crush/internal/session"
  37	"github.com/charmbracelet/crush/internal/skills"
  38	"golang.org/x/sync/errgroup"
  39
  40	"charm.land/fantasy/providers/anthropic"
  41	"charm.land/fantasy/providers/azure"
  42	"charm.land/fantasy/providers/bedrock"
  43	"charm.land/fantasy/providers/google"
  44	"charm.land/fantasy/providers/openai"
  45	"charm.land/fantasy/providers/openaicompat"
  46	"charm.land/fantasy/providers/openrouter"
  47	"charm.land/fantasy/providers/vercel"
  48	openaisdk "github.com/charmbracelet/openai-go/option"
  49	"github.com/qjebbs/go-jsons"
  50)
  51
  52// Coordinator errors.
  53var (
  54	errCoderAgentNotConfigured         = errors.New("coder agent not configured")
  55	errModelProviderNotConfigured      = errors.New("model provider not configured")
  56	errLargeModelNotSelected           = errors.New("large model not selected")
  57	errSmallModelNotSelected           = errors.New("small model not selected")
  58	errLargeModelProviderNotConfigured = errors.New("large model provider not configured")
  59	errSmallModelProviderNotConfigured = errors.New("small model provider not configured")
  60	errLargeModelNotFound              = errors.New("large model not found in provider config")
  61	errSmallModelNotFound              = errors.New("small model not found in provider config")
  62)
  63
  64// Copilot models that use the Responses API instead of Chat Completions.
  65var copilotResponsesModels = map[string]bool{
  66	"gpt-5.2":       true,
  67	"gpt-5.2-codex": true,
  68	"gpt-5.3-codex": true,
  69	"gpt-5.4":       true,
  70	"gpt-5.4-mini":  true,
  71	"gpt-5.5":       true,
  72	"gpt-5-mini":    true,
  73}
  74
  75// OpenCode models that user Anthropic Messages API instead of Chat Completions.
  76var opencodeMessagesModels = map[string]bool{
  77	"qwen3.7-max": true,
  78}
  79
  80type Coordinator interface {
  81	// INFO: (kujtim) this is not used yet we will use this when we have multiple agents
  82	// SetMainAgent(string)
  83	Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
  84	// RunAccepted runs a call that was already accepted via
  85	// BeginAccepted on the fire-and-forget dispatch path. The handle is
  86	// the only carrier of accept-state across the backend.runAgent /
  87	// Coordinator / sessionAgent.Run layers: it reaches
  88	// sessionAgent.Run as SessionAgentCall.Accepted, where it is
  89	// consumed under dispatchMu once the accepted -> (cancel-on-entry |
  90	// queued | active) transition is chosen.
  91	RunAccepted(ctx context.Context, accept *AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error)
  92	BeginAccepted(sessionID string) *AcceptedRun
  93	Cancel(sessionID string)
  94	CancelAll()
  95	IsSessionBusy(sessionID string) bool
  96	IsBusy() bool
  97	QueuedPrompts(sessionID string) int
  98	QueuedPromptsList(sessionID string) []string
  99	ClearQueue(sessionID string)
 100	Summarize(context.Context, string) error
 101	Model() Model
 102	UpdateModels(ctx context.Context) error
 103}
 104
 105type coordinator struct {
 106	cfg         *config.ConfigStore
 107	sessions    session.Service
 108	messages    message.Service
 109	permissions permission.Service
 110	history     history.Service
 111	filetracker filetracker.Service
 112	lspManager  *lsp.Manager
 113	notify      pubsub.Publisher[notify.Notification]
 114	runComplete pubsub.Publisher[notify.RunComplete]
 115
 116	currentAgent SessionAgent
 117	agents       map[string]SessionAgent
 118
 119	// Skills discovery results (session-start snapshot).
 120	allSkills    []*skills.Skill // Pre-filter: all discovered after dedup.
 121	activeSkills []*skills.Skill // Post-filter: active skills only.
 122	skillTracker *skills.Tracker
 123
 124	readyWg errgroup.Group
 125}
 126
 127func NewCoordinator(
 128	ctx context.Context,
 129	cfg *config.ConfigStore,
 130	sessions session.Service,
 131	messages message.Service,
 132	permissions permission.Service,
 133	history history.Service,
 134	filetracker filetracker.Service,
 135	lspManager *lsp.Manager,
 136	notify pubsub.Publisher[notify.Notification],
 137	runComplete pubsub.Publisher[notify.RunComplete],
 138	skillsMgr *skills.Manager,
 139) (Coordinator, error) {
 140	// Skills are pre-discovered by the caller (see app.New /
 141	// backend.CreateWorkspace) and passed in via the manager. If no
 142	// manager was provided (legacy callers), fall back to an in-line
 143	// discovery so the coordinator still works.
 144	var allSkills, activeSkills []*skills.Skill
 145	if skillsMgr != nil {
 146		allSkills = skillsMgr.AllSkills()
 147		activeSkills = skillsMgr.ActiveSkills()
 148	} else {
 149		allSkills, activeSkills = discoverSkills(cfg)
 150	}
 151	skillTracker := skills.NewTracker(activeSkills)
 152
 153	c := &coordinator{
 154		cfg:          cfg,
 155		sessions:     sessions,
 156		messages:     messages,
 157		permissions:  permissions,
 158		history:      history,
 159		filetracker:  filetracker,
 160		lspManager:   lspManager,
 161		notify:       notify,
 162		runComplete:  runComplete,
 163		agents:       make(map[string]SessionAgent),
 164		allSkills:    allSkills,
 165		activeSkills: activeSkills,
 166		skillTracker: skillTracker,
 167	}
 168
 169	agentCfg, ok := cfg.Config().Agents[config.AgentCoder]
 170	if !ok {
 171		return nil, errCoderAgentNotConfigured
 172	}
 173
 174	// TODO: make this dynamic when we support multiple agents
 175	prompt, err := coderPrompt(prompt.WithWorkingDir(c.cfg.WorkingDir()))
 176	if err != nil {
 177		return nil, err
 178	}
 179
 180	agent, err := c.buildAgent(ctx, prompt, agentCfg, false)
 181	if err != nil {
 182		return nil, err
 183	}
 184	c.currentAgent = agent
 185	c.agents[config.AgentCoder] = agent
 186	return c, nil
 187}
 188
 189// Run implements Coordinator.
 190func (c *coordinator) Run(ctx context.Context, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 191	return c.run(ctx, nil, sessionID, prompt, attachments...)
 192}
 193
 194// RunAccepted implements Coordinator.
 195func (c *coordinator) RunAccepted(ctx context.Context, accept *AcceptedRun, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 196	return c.run(ctx, accept, sessionID, prompt, attachments...)
 197}
 198
// run is the shared implementation behind Run and RunAccepted. When
// accept is non-nil it is threaded onto the SessionAgentCall as
// Accepted so sessionAgent.Run can consume the accept reservation under
// dispatchMu; when nil (the in-process/local path) no accept tracking
// applies.
//
// Steps, in order:
//  1. wait for the background setup registered on readyWg by buildAgent
//     (system prompt and tools), returning its error if any;
//  2. refresh models via UpdateModels;
//  3. derive per-call settings from the current model: max output
//     tokens, text-only attachments when images are unsupported, and the
//     merged provider/sampling options;
//  4. best-effort refresh of an expired OAuth token;
//  5. run the agent through runWithUnauthorizedRetry;
//  6. publish a re-authenticate notification (hyper provider only) and
//     the single coalesced RunComplete event.
//
// It returns the result stored by the last attempt together with the
// error returned by runWithUnauthorizedRetry.
func (c *coordinator) run(ctx context.Context, accept *AcceptedRun, sessionID string, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
	if err := c.readyWg.Wait(); err != nil {
		return nil, err
	}

	// refresh models before each run
	if err := c.UpdateModels(ctx); err != nil {
		return nil, fmt.Errorf("failed to update models: %w", err)
	}

	// A non-zero per-model MaxTokens overrides the catwalk default.
	model := c.currentAgent.Model()
	maxTokens := model.CatwalkCfg.DefaultMaxTokens
	if model.ModelCfg.MaxTokens != 0 {
		maxTokens = model.ModelCfg.MaxTokens
	}

	if !model.CatwalkCfg.SupportsImages && attachments != nil {
		// filter out image attachments: keep only text attachments when
		// the model cannot accept images. A fresh slice is built, so the
		// caller's slice is left untouched.
		filteredAttachments := make([]message.Attachment, 0, len(attachments))
		for _, att := range attachments {
			if att.IsText() {
				filteredAttachments = append(filteredAttachments, att)
			}
		}
		attachments = filteredAttachments
	}

	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
	if !ok {
		return nil, errModelProviderNotConfigured
	}

	mergedOptions, temp, topP, topK, freqPenalty, presPenalty := mergeCallOptions(model, providerCfg)

	if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
		// NOTE(@andreynering): We don't return here because the event handling to ask the user to reauthenticate
		// depends on the flow below. If refresh fails, proceed with the token we have.
		slog.Error("Failed to refresh OAuth2 token. Proceeding with existing token.", "error", err)
	}

	// Coalesce per-attempt RunComplete payloads so only the final
	// outcome reaches subscribers. Without this, the first attempt's
	// failed RunComplete (unauthorized) would race ahead of the
	// retry's success, and `crush run` would exit on the stale error
	// before ever seeing the retry result. Each attempt's
	// SessionAgentCall.OnComplete hook overwrites latest; we publish
	// exactly once after retries resolve, via PublishMustDeliver, so
	// a momentarily-full subscriber buffer can't silently drop the
	// terminal event.
	//
	// NOTE(review): latest/hasLatest are written without synchronization;
	// this assumes OnComplete runs before currentAgent.Run returns (or on
	// the same goroutine) — TODO confirm in sessionAgent.Run.
	var (
		latest    notify.RunComplete
		hasLatest bool
	)
	onComplete := func(rc notify.RunComplete) {
		latest = rc
		hasLatest = true
	}
	// Propagate the caller-supplied RunID (set via agent.WithRunID
	// at the HTTP boundary in backend.SendMessage) onto the
	// SessionAgentCall so the terminal RunComplete event echoes it
	// back. Both attempts in the retry chain reuse the same RunID;
	// the coalesce closure publishes the final outcome under that
	// same correlator.
	runID := RunIDFromContext(ctx)
	// run performs one attempt; every attempt reuses the same call
	// parameters computed above.
	run := func() (*fantasy.AgentResult, error) {
		return c.currentAgent.Run(ctx, SessionAgentCall{
			SessionID:        sessionID,
			RunID:            runID,
			Prompt:           prompt,
			Attachments:      attachments,
			MaxOutputTokens:  maxTokens,
			ProviderOptions:  mergedOptions,
			Temperature:      temp,
			TopP:             topP,
			TopK:             topK,
			FrequencyPenalty: freqPenalty,
			PresencePenalty:  presPenalty,
			OnComplete:       onComplete,
			Accepted:         accept,
		})
	}
	// Loaded skill names before the turn; passed to logTurnSkillUsage,
	// presumably to diff against the skills loaded during it.
	beforeLoaded := c.skillTracker.LoadedNames()
	var result *fantasy.AgentResult
	originalErr := c.runWithUnauthorizedRetry(ctx, providerCfg, func() error {
		var err error
		result, err = run()
		return err
	})
	logTurnSkillUsage(sessionID, prompt, c.activeSkills, c.skillTracker, beforeLoaded)

	// Notify only if still unauthorized after retry — a successful
	// retry means the user doesn't need to re-authenticate.
	if originalErr != nil && c.isUnauthorized(originalErr) && c.notify != nil && model.ModelCfg.Provider == hyper.Name {
		c.notify.Publish(pubsub.CreatedEvent, notify.Notification{
			Type:       notify.TypeReAuthenticate,
			ProviderID: model.ModelCfg.Provider,
		})
	}

	if hasLatest && c.runComplete != nil {
		c.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, latest)
		// Signal to the dispatcher (backend.runAgent) that the
		// authoritative terminal RunComplete for this run was already
		// emitted, so it does not publish a duplicate fallback for the
		// error it is about to receive.
		MarkRunCompletePublished(ctx)
	}
	return result, originalErr
}
 313
 314func getProviderOptions(model Model, providerCfg config.ProviderConfig) fantasy.ProviderOptions {
 315	options := fantasy.ProviderOptions{}
 316
 317	cfgOpts := []byte("{}")
 318	providerCfgOpts := []byte("{}")
 319	catwalkOpts := []byte("{}")
 320
 321	if model.ModelCfg.ProviderOptions != nil {
 322		data, err := json.Marshal(model.ModelCfg.ProviderOptions)
 323		if err == nil {
 324			cfgOpts = data
 325		}
 326	}
 327
 328	if providerCfg.ProviderOptions != nil {
 329		data, err := json.Marshal(providerCfg.ProviderOptions)
 330		if err == nil {
 331			providerCfgOpts = data
 332		}
 333	}
 334
 335	if model.CatwalkCfg.Options.ProviderOptions != nil {
 336		data, err := json.Marshal(model.CatwalkCfg.Options.ProviderOptions)
 337		if err == nil {
 338			catwalkOpts = data
 339		}
 340	}
 341
 342	readers := []io.Reader{
 343		bytes.NewReader(catwalkOpts),
 344		bytes.NewReader(providerCfgOpts),
 345		bytes.NewReader(cfgOpts),
 346	}
 347
 348	got, err := jsons.Merge(readers)
 349	if err != nil {
 350		slog.Error("Could not merge call config", "err", err)
 351		return options
 352	}
 353
 354	mergedOptions := make(map[string]any)
 355
 356	err = json.Unmarshal([]byte(got), &mergedOptions)
 357	if err != nil {
 358		slog.Error("Could not create config for call", "err", err)
 359		return options
 360	}
 361
 362	shouldSetEffort := model.CatwalkCfg.CanReason &&
 363		slices.Contains(model.CatwalkCfg.ReasoningLevels, model.ModelCfg.ReasoningEffort)
 364
 365	switch providerCfg.Type {
 366	case openai.Name, azure.Name:
 367		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 368		if !hasReasoningEffort && shouldSetEffort {
 369			mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 370		}
 371		if openai.IsResponsesModel(model.CatwalkCfg.ID) {
 372			if openai.IsResponsesReasoningModel(model.CatwalkCfg.ID) {
 373				mergedOptions["reasoning_summary"] = "auto"
 374				mergedOptions["include"] = []openai.IncludeType{openai.IncludeReasoningEncryptedContent}
 375			}
 376			parsed, err := openai.ParseResponsesOptions(mergedOptions)
 377			if err == nil {
 378				options[openai.Name] = parsed
 379			}
 380		} else {
 381			parsed, err := openai.ParseOptions(mergedOptions)
 382			if err == nil {
 383				options[openai.Name] = parsed
 384			}
 385		}
 386	case anthropic.Name, bedrock.Name:
 387		var (
 388			_, hasEffort = mergedOptions["effort"]
 389			_, hasThink  = mergedOptions["thinking"]
 390			extraBody    = make(map[string]any)
 391		)
 392
 393		switch providerCfg.ID {
 394		case string(catwalk.InferenceProviderAlibabaSingapore):
 395			switch {
 396			case !hasEffort && shouldSetEffort:
 397				extraBody["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 398			case !hasThink && model.CatwalkCfg.CanReason:
 399				if model.ModelCfg.Think {
 400					extraBody["thinking"] = map[string]any{"type": "enabled"}
 401				} else {
 402					extraBody["thinking"] = map[string]any{"type": "disabled"}
 403				}
 404			}
 405			mergedOptions["extra_body"] = extraBody
 406
 407		default:
 408			switch {
 409			case !hasEffort && shouldSetEffort:
 410				mergedOptions["effort"] = model.ModelCfg.ReasoningEffort
 411			case !hasThink && model.ModelCfg.Think:
 412				mergedOptions["thinking"] = map[string]any{"budget_tokens": 2000}
 413			}
 414		}
 415
 416		parsed, err := anthropic.ParseOptions(mergedOptions)
 417		if err == nil {
 418			options[anthropic.Name] = parsed
 419		}
 420
 421	case openrouter.Name:
 422		_, hasReasoning := mergedOptions["reasoning"]
 423		if !hasReasoning && shouldSetEffort {
 424			mergedOptions["reasoning"] = map[string]any{
 425				"enabled": true,
 426				"effort":  model.ModelCfg.ReasoningEffort,
 427			}
 428		}
 429		parsed, err := openrouter.ParseOptions(mergedOptions)
 430		if err == nil {
 431			options[openrouter.Name] = parsed
 432		}
 433	case vercel.Name:
 434		_, hasReasoning := mergedOptions["reasoning"]
 435		if !hasReasoning && shouldSetEffort {
 436			mergedOptions["reasoning"] = map[string]any{
 437				"enabled": true,
 438				"effort":  model.ModelCfg.ReasoningEffort,
 439			}
 440		}
 441		parsed, err := vercel.ParseOptions(mergedOptions)
 442		if err == nil {
 443			options[vercel.Name] = parsed
 444		}
 445	case google.Name:
 446		_, hasReasoning := mergedOptions["thinking_config"]
 447		if !hasReasoning {
 448			if strings.HasPrefix(model.CatwalkCfg.ID, "gemini-2") {
 449				mergedOptions["thinking_config"] = map[string]any{
 450					"thinking_budget":  2000,
 451					"include_thoughts": true,
 452				}
 453			} else {
 454				mergedOptions["thinking_config"] = map[string]any{
 455					"thinking_level":   model.ModelCfg.ReasoningEffort,
 456					"include_thoughts": true,
 457				}
 458			}
 459		}
 460		parsed, err := google.ParseOptions(mergedOptions)
 461		if err == nil {
 462			options[google.Name] = parsed
 463		}
 464	case openaicompat.Name, hyper.Name:
 465		extraBody := make(map[string]any)
 466
 467		_, hasReasoningEffort := mergedOptions["reasoning_effort"]
 468		if !hasReasoningEffort && shouldSetEffort {
 469			switch providerCfg.ID {
 470			case string(catwalk.InferenceProviderIoNet):
 471				extraBody["reasoning"] = map[string]string{"effort": model.ModelCfg.ReasoningEffort}
 472			default:
 473				mergedOptions["reasoning_effort"] = model.ModelCfg.ReasoningEffort
 474			}
 475		}
 476
 477		// "reasoning effort" is a standard OpenAI field, but "thinking" is not.
 478		// Setting it in the right way for each provider.
 479		// TODO: Abstract this in Fantasy somehow?
 480		// TODO: Allow custom providers to specify how to set this?
 481		switch providerCfg.ID {
 482		case hyper.Name:
 483			extraBody["thinking"] = model.ModelCfg.Think
 484		case string(catwalk.InferenceProviderIoNet):
 485			if _, ok := extraBody["reasoning"]; !ok && model.CatwalkCfg.CanReason {
 486				if model.ModelCfg.Think {
 487					extraBody["reasoning"] = map[string]string{"effort": "medium"}
 488				} else {
 489					extraBody["reasoning"] = map[string]string{"effort": "none"}
 490				}
 491			}
 492		case string(catwalk.InferenceProviderZAI), string(catwalk.InferenceProviderDeepSeek):
 493			if model.ModelCfg.Think || model.ModelCfg.ReasoningEffort != "" {
 494				extraBody["thinking"] = map[string]any{
 495					"type": "enabled",
 496				}
 497			} else {
 498				extraBody["thinking"] = map[string]any{
 499					"type": "disabled",
 500				}
 501			}
 502		case string(catwalk.InferenceProviderAlibabaSingapore):
 503			if model.CatwalkCfg.CanReason {
 504				extraBody["enable_thinking"] = model.ModelCfg.Think
 505			}
 506		}
 507
 508		mergedOptions["extra_body"] = extraBody
 509
 510		parsed, err := openaicompat.ParseOptions(mergedOptions)
 511		if err == nil {
 512			options[openaicompat.Name] = parsed
 513		}
 514	}
 515
 516	return options
 517}
 518
 519func mergeCallOptions(model Model, cfg config.ProviderConfig) (fantasy.ProviderOptions, *float64, *float64, *int64, *float64, *float64) {
 520	modelOptions := getProviderOptions(model, cfg)
 521	temp := cmp.Or(model.ModelCfg.Temperature, model.CatwalkCfg.Options.Temperature)
 522	topP := cmp.Or(model.ModelCfg.TopP, model.CatwalkCfg.Options.TopP)
 523	topK := cmp.Or(model.ModelCfg.TopK, model.CatwalkCfg.Options.TopK)
 524	freqPenalty := cmp.Or(model.ModelCfg.FrequencyPenalty, model.CatwalkCfg.Options.FrequencyPenalty)
 525	presPenalty := cmp.Or(model.ModelCfg.PresencePenalty, model.CatwalkCfg.Options.PresencePenalty)
 526	return modelOptions, temp, topP, topK, freqPenalty, presPenalty
 527}
 528
 529func (c *coordinator) buildAgent(ctx context.Context, prompt *prompt.Prompt, agent config.Agent, isSubAgent bool) (SessionAgent, error) {
 530	large, small, err := c.buildAgentModels(ctx, isSubAgent)
 531	if err != nil {
 532		return nil, err
 533	}
 534
 535	largeProviderCfg, _ := c.cfg.Config().Providers.Get(large.ModelCfg.Provider)
 536	result := NewSessionAgent(SessionAgentOptions{
 537		LargeModel:           large,
 538		SmallModel:           small,
 539		SystemPromptPrefix:   largeProviderCfg.SystemPromptPrefix,
 540		SystemPrompt:         "",
 541		IsSubAgent:           isSubAgent,
 542		DisableAutoSummarize: c.cfg.Config().Options.DisableAutoSummarize,
 543		IsYolo:               c.permissions.SkipRequests(),
 544		Sessions:             c.sessions,
 545		Messages:             c.messages,
 546		Tools:                nil,
 547		Notify:               c.notify,
 548		RunComplete:          c.runComplete,
 549	})
 550
 551	c.readyWg.Go(func() error {
 552		systemPrompt, err := prompt.Build(ctx, large.Model.Provider(), large.Model.Model(), c.cfg)
 553		if err != nil {
 554			return err
 555		}
 556		result.SetSystemPrompt(systemPrompt)
 557		return nil
 558	})
 559
 560	c.readyWg.Go(func() error {
 561		tools, err := c.buildTools(ctx, agent, isSubAgent)
 562		if err != nil {
 563			return err
 564		}
 565		result.SetTools(tools)
 566		return nil
 567	})
 568
 569	return result, nil
 570}
 571
 572func (c *coordinator) buildTools(ctx context.Context, agent config.Agent, isSubAgent bool) ([]fantasy.AgentTool, error) {
 573	var allTools []fantasy.AgentTool
 574	if slices.Contains(agent.AllowedTools, AgentToolName) {
 575		agentTool, err := c.agentTool(ctx)
 576		if err != nil {
 577			return nil, err
 578		}
 579		allTools = append(allTools, agentTool)
 580	}
 581
 582	if slices.Contains(agent.AllowedTools, tools.AgenticFetchToolName) {
 583		agenticFetchTool, err := c.agenticFetchTool(ctx, nil)
 584		if err != nil {
 585			return nil, err
 586		}
 587		allTools = append(allTools, agenticFetchTool)
 588	}
 589
 590	// Get the model name for the agent
 591	modelID := ""
 592	if modelCfg, ok := c.cfg.Config().Models[agent.Model]; ok {
 593		if model := c.cfg.Config().GetModel(modelCfg.Provider, modelCfg.Model); model != nil {
 594			modelID = model.ID
 595		}
 596	}
 597
 598	logFile := filepath.Join(c.cfg.Config().Options.DataDirectory, "logs", "crush.log")
 599
 600	// Build hook runner if PreToolUse hooks are configured.
 601	var hookRunner *hooks.Runner
 602	if preToolHooks := c.cfg.Config().Hooks[hooks.EventPreToolUse]; len(preToolHooks) > 0 {
 603		hookRunner = hooks.NewRunner(preToolHooks, c.cfg.WorkingDir(), c.cfg.WorkingDir())
 604	}
 605
 606	allTools = append(
 607		allTools,
 608		tools.NewBashTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Options.Attribution, modelID),
 609		tools.NewCrushInfoTool(c.cfg, c.lspManager, c.allSkills, c.activeSkills, c.skillTracker),
 610		tools.NewCrushLogsTool(logFile),
 611		tools.NewJobOutputTool(),
 612		tools.NewJobKillTool(),
 613		tools.NewDownloadTool(c.permissions, c.cfg.WorkingDir(), nil),
 614		tools.NewEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 615		tools.NewMultiEditTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 616		tools.NewFetchTool(c.permissions, c.cfg.WorkingDir(), nil),
 617		tools.NewGlobTool(c.cfg.WorkingDir()),
 618		tools.NewGrepTool(c.cfg.WorkingDir(), c.cfg.Config().Tools.Grep),
 619		tools.NewLsTool(c.permissions, c.cfg.WorkingDir(), c.cfg.Config().Tools.Ls),
 620		tools.NewSourcegraphTool(nil),
 621		tools.NewTodosTool(c.sessions),
 622		tools.NewViewTool(c.lspManager, c.permissions, c.filetracker, c.skillTracker, c.cfg.WorkingDir(), c.cfg.Config().Options.SkillsPaths...),
 623		tools.NewWriteTool(c.lspManager, c.permissions, c.history, c.filetracker, c.cfg.WorkingDir()),
 624	)
 625
 626	// Add LSP tools if user has configured LSPs or auto_lsp is enabled (nil or true).
 627	if len(c.cfg.Config().LSP) > 0 || c.cfg.Config().Options.AutoLSP == nil || *c.cfg.Config().Options.AutoLSP {
 628		allTools = append(allTools, tools.NewDiagnosticsTool(c.lspManager), tools.NewReferencesTool(c.lspManager), tools.NewLSPRestartTool(c.lspManager))
 629	}
 630
 631	if len(c.cfg.Config().MCP) > 0 {
 632		allTools = append(
 633			allTools,
 634			tools.NewListMCPResourcesTool(c.cfg, c.permissions),
 635			tools.NewReadMCPResourceTool(c.cfg, c.permissions),
 636		)
 637	}
 638
 639	var filteredTools []fantasy.AgentTool
 640	for _, tool := range allTools {
 641		if slices.Contains(agent.AllowedTools, tool.Info().Name) {
 642			filteredTools = append(filteredTools, tool)
 643		}
 644	}
 645
 646	for _, tool := range tools.GetMCPTools(c.permissions, c.cfg, c.cfg.WorkingDir()) {
 647		if agent.AllowedMCP == nil {
 648			// No MCP restrictions
 649			filteredTools = append(filteredTools, tool)
 650			continue
 651		}
 652		if len(agent.AllowedMCP) == 0 {
 653			// No MCPs allowed
 654			slog.Debug("No MCPs allowed", "tool", tool.Name(), "agent", agent.Name)
 655			break
 656		}
 657
 658		for mcp, tools := range agent.AllowedMCP {
 659			if mcp != tool.MCP() {
 660				continue
 661			}
 662			if len(tools) == 0 || slices.Contains(tools, tool.MCPToolName()) {
 663				filteredTools = append(filteredTools, tool)
 664				break
 665			}
 666			slog.Debug("MCP not allowed", "tool", tool.Name(), "agent", agent.Name)
 667		}
 668	}
 669	slices.SortFunc(filteredTools, func(a, b fantasy.AgentTool) int {
 670		return strings.Compare(a.Info().Name, b.Info().Name)
 671	})
 672
 673	// Wrap tools with hook interception for the top-level agent only.
 674	// Sub-agents (the `agent` task tool, `agentic_fetch`, etc.) run
 675	// without hook interception to avoid firing the user's hook N times
 676	// per delegated turn. The top-level invocation of the sub-agent tool
 677	// itself is still wrapped from the coder's side.
 678	filteredTools = wrapToolsWithHooks(filteredTools, hookRunner, isSubAgent)
 679
 680	return filteredTools, nil
 681}
 682
 683// TODO: when we support multiple agents we need to change this so that we pass in the agent specific model config
 684func (c *coordinator) buildAgentModels(ctx context.Context, isSubAgent bool) (Model, Model, error) {
 685	largeModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeLarge]
 686	if !ok {
 687		return Model{}, Model{}, errLargeModelNotSelected
 688	}
 689	smallModelCfg, ok := c.cfg.Config().Models[config.SelectedModelTypeSmall]
 690	if !ok {
 691		return Model{}, Model{}, errSmallModelNotSelected
 692	}
 693
 694	largeProviderCfg, ok := c.cfg.Config().Providers.Get(largeModelCfg.Provider)
 695	if !ok {
 696		return Model{}, Model{}, errLargeModelProviderNotConfigured
 697	}
 698
 699	largeProvider, err := c.buildProvider(largeProviderCfg, largeModelCfg, isSubAgent)
 700	if err != nil {
 701		return Model{}, Model{}, err
 702	}
 703
 704	smallProviderCfg, ok := c.cfg.Config().Providers.Get(smallModelCfg.Provider)
 705	if !ok {
 706		return Model{}, Model{}, errSmallModelProviderNotConfigured
 707	}
 708
 709	smallProvider, err := c.buildProvider(smallProviderCfg, smallModelCfg, true)
 710	if err != nil {
 711		return Model{}, Model{}, err
 712	}
 713
 714	var largeCatwalkModel *catwalk.Model
 715	var smallCatwalkModel *catwalk.Model
 716
 717	for _, m := range largeProviderCfg.Models {
 718		if m.ID == largeModelCfg.Model {
 719			largeCatwalkModel = &m
 720		}
 721	}
 722	for _, m := range smallProviderCfg.Models {
 723		if m.ID == smallModelCfg.Model {
 724			smallCatwalkModel = &m
 725		}
 726	}
 727
 728	if largeCatwalkModel == nil {
 729		return Model{}, Model{}, errLargeModelNotFound
 730	}
 731
 732	if smallCatwalkModel == nil {
 733		return Model{}, Model{}, errSmallModelNotFound
 734	}
 735
 736	largeModelID := largeModelCfg.Model
 737	smallModelID := smallModelCfg.Model
 738
 739	if largeModelCfg.Provider == openrouter.Name && isExactoSupported(largeModelID) {
 740		largeModelID += ":exacto"
 741	}
 742
 743	if smallModelCfg.Provider == openrouter.Name && isExactoSupported(smallModelID) {
 744		smallModelID += ":exacto"
 745	}
 746
 747	largeModel, err := largeProvider.LanguageModel(ctx, largeModelID)
 748	if err != nil {
 749		return Model{}, Model{}, err
 750	}
 751	smallModel, err := smallProvider.LanguageModel(ctx, smallModelID)
 752	if err != nil {
 753		return Model{}, Model{}, err
 754	}
 755
 756	return Model{
 757			Model:      largeModel,
 758			CatwalkCfg: *largeCatwalkModel,
 759			ModelCfg:   largeModelCfg,
 760			FlatRate:   largeProviderCfg.FlatRate,
 761		}, Model{
 762			Model:      smallModel,
 763			CatwalkCfg: *smallCatwalkModel,
 764			ModelCfg:   smallModelCfg,
 765			FlatRate:   smallProviderCfg.FlatRate,
 766		}, nil
 767}
 768
 769func (c *coordinator) buildAnthropicProvider(baseURL, apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
 770	var opts []anthropic.Option
 771
 772	switch {
 773	case strings.HasPrefix(apiKey, "Bearer "):
 774		// NOTE: Prevent the SDK from picking up the API key from env.
 775		os.Setenv("ANTHROPIC_API_KEY", "")
 776		headers["Authorization"] = apiKey
 777	case providerID == string(catwalk.InferenceProviderMiniMax) || providerID == string(catwalk.InferenceProviderMiniMaxChina):
 778		// NOTE: Prevent the SDK from picking up the API key from env.
 779		os.Setenv("ANTHROPIC_API_KEY", "")
 780		headers["Authorization"] = "Bearer " + apiKey
 781	case apiKey != "":
 782		// X-Api-Key header
 783		opts = append(opts, anthropic.WithAPIKey(apiKey))
 784	}
 785
 786	if len(headers) > 0 {
 787		opts = append(opts, anthropic.WithHeaders(headers))
 788	}
 789
 790	if baseURL != "" {
 791		opts = append(opts, anthropic.WithBaseURL(baseURL))
 792	}
 793
 794	if c.cfg.Config().Options.Debug {
 795		httpClient := log.NewHTTPClient()
 796		opts = append(opts, anthropic.WithHTTPClient(httpClient))
 797	}
 798	return anthropic.New(opts...)
 799}
 800
 801func (c *coordinator) buildOpenaiProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 802	opts := []openai.Option{
 803		openai.WithAPIKey(apiKey),
 804		openai.WithUseResponsesAPI(),
 805	}
 806	if c.cfg.Config().Options.Debug {
 807		httpClient := log.NewHTTPClient()
 808		opts = append(opts, openai.WithHTTPClient(httpClient))
 809	}
 810	if len(headers) > 0 {
 811		opts = append(opts, openai.WithHeaders(headers))
 812	}
 813	if baseURL != "" {
 814		opts = append(opts, openai.WithBaseURL(baseURL))
 815	}
 816	return openai.New(opts...)
 817}
 818
 819func (c *coordinator) buildOpenrouterProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 820	opts := []openrouter.Option{
 821		openrouter.WithAPIKey(apiKey),
 822	}
 823	if c.cfg.Config().Options.Debug {
 824		httpClient := log.NewHTTPClient()
 825		opts = append(opts, openrouter.WithHTTPClient(httpClient))
 826	}
 827	if len(headers) > 0 {
 828		opts = append(opts, openrouter.WithHeaders(headers))
 829	}
 830	return openrouter.New(opts...)
 831}
 832
 833func (c *coordinator) buildVercelProvider(_, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 834	opts := []vercel.Option{
 835		vercel.WithAPIKey(apiKey),
 836	}
 837	if c.cfg.Config().Options.Debug {
 838		httpClient := log.NewHTTPClient()
 839		opts = append(opts, vercel.WithHTTPClient(httpClient))
 840	}
 841	if len(headers) > 0 {
 842		opts = append(opts, vercel.WithHeaders(headers))
 843	}
 844	return vercel.New(opts...)
 845}
 846
 847func (c *coordinator) buildOpenaiCompatProvider(baseURL, apiKey string, headers map[string]string, extraBody map[string]any, providerID string, isSubAgent bool) (fantasy.Provider, error) {
 848	opts := []openaicompat.Option{
 849		openaicompat.WithBaseURL(baseURL),
 850		openaicompat.WithAPIKey(apiKey),
 851	}
 852
 853	// Set HTTP client based on provider and debug mode.
 854	var httpClient *http.Client
 855	switch providerID {
 856	case string(catwalk.InferenceProviderCopilot):
 857		opts = append(
 858			opts,
 859			openaicompat.WithUseResponsesAPI(),
 860			openaicompat.WithResponsesAPIFunc(func(modelID string) bool {
 861				return copilotResponsesModels[modelID]
 862			}),
 863		)
 864		httpClient = copilot.NewClient(isSubAgent, c.cfg.Config().Options.Debug)
 865	}
 866	if httpClient == nil && c.cfg.Config().Options.Debug {
 867		httpClient = log.NewHTTPClient()
 868	}
 869	if httpClient != nil {
 870		opts = append(opts, openaicompat.WithHTTPClient(httpClient))
 871	}
 872
 873	if len(headers) > 0 {
 874		opts = append(opts, openaicompat.WithHeaders(headers))
 875	}
 876
 877	for extraKey, extraValue := range extraBody {
 878		opts = append(opts, openaicompat.WithSDKOptions(openaisdk.WithJSONSet(extraKey, extraValue)))
 879	}
 880
 881	return openaicompat.New(opts...)
 882}
 883
 884func (c *coordinator) buildAzureProvider(baseURL, apiKey string, headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 885	opts := []azure.Option{
 886		azure.WithBaseURL(baseURL),
 887		azure.WithAPIKey(apiKey),
 888		azure.WithUseResponsesAPI(),
 889	}
 890	if c.cfg.Config().Options.Debug {
 891		httpClient := log.NewHTTPClient()
 892		opts = append(opts, azure.WithHTTPClient(httpClient))
 893	}
 894	if options == nil {
 895		options = make(map[string]string)
 896	}
 897	if apiVersion, ok := options["apiVersion"]; ok {
 898		opts = append(opts, azure.WithAPIVersion(apiVersion))
 899	}
 900	if len(headers) > 0 {
 901		opts = append(opts, azure.WithHeaders(headers))
 902	}
 903
 904	return azure.New(opts...)
 905}
 906
 907func (c *coordinator) buildBedrockProvider(apiKey string, headers map[string]string, providerID string) (fantasy.Provider, error) {
 908	var opts []bedrock.Option
 909	if c.cfg.Config().Options.Debug {
 910		httpClient := log.NewHTTPClient()
 911		opts = append(opts, bedrock.WithHTTPClient(httpClient))
 912	}
 913	if len(headers) > 0 {
 914		opts = append(opts, bedrock.WithHeaders(headers))
 915	}
 916
 917	switch {
 918	case apiKey != "":
 919		opts = append(opts, bedrock.WithAPIKey(apiKey))
 920	case os.Getenv("AWS_BEARER_TOKEN_BEDROCK") != "":
 921		opts = append(opts, bedrock.WithAPIKey(os.Getenv("AWS_BEARER_TOKEN_BEDROCK")))
 922	default:
 923		// Skip, let the SDK do authentication.
 924	}
 925
 926	switch providerID {
 927	case string(catwalk.InferenceProviderBedrockEurope):
 928		opts = append(opts, bedrock.WithRegion("eu-west-1"))
 929	default:
 930		opts = append(opts, bedrock.WithRegion("us-east-1"))
 931	}
 932
 933	return bedrock.New(opts...)
 934}
 935
 936func (c *coordinator) buildGoogleProvider(baseURL, apiKey string, headers map[string]string) (fantasy.Provider, error) {
 937	opts := []google.Option{
 938		google.WithBaseURL(baseURL),
 939		google.WithGeminiAPIKey(apiKey),
 940	}
 941	if c.cfg.Config().Options.Debug {
 942		httpClient := log.NewHTTPClient()
 943		opts = append(opts, google.WithHTTPClient(httpClient))
 944	}
 945	if len(headers) > 0 {
 946		opts = append(opts, google.WithHeaders(headers))
 947	}
 948	return google.New(opts...)
 949}
 950
 951func (c *coordinator) buildGoogleVertexProvider(headers map[string]string, options map[string]string) (fantasy.Provider, error) {
 952	opts := []google.Option{}
 953	if c.cfg.Config().Options.Debug {
 954		httpClient := log.NewHTTPClient()
 955		opts = append(opts, google.WithHTTPClient(httpClient))
 956	}
 957	if len(headers) > 0 {
 958		opts = append(opts, google.WithHeaders(headers))
 959	}
 960
 961	project := options["project"]
 962	location := options["location"]
 963
 964	opts = append(opts, google.WithVertex(project, location))
 965
 966	return google.New(opts...)
 967}
 968
 969func (c *coordinator) isAnthropicThinking(model config.SelectedModel) bool {
 970	if model.Think {
 971		return true
 972	}
 973	opts, err := anthropic.ParseOptions(model.ProviderOptions)
 974	return err == nil && opts.Thinking != nil
 975}
 976
 977func (c *coordinator) buildProvider(providerCfg config.ProviderConfig, model config.SelectedModel, isSubAgent bool) (fantasy.Provider, error) {
 978	headers := maps.Clone(providerCfg.ExtraHeaders)
 979	if headers == nil {
 980		headers = make(map[string]string)
 981	}
 982
 983	// handle special headers for anthropic
 984	if providerCfg.Type == anthropic.Name && c.isAnthropicThinking(model) {
 985		if v, ok := headers["anthropic-beta"]; ok {
 986			headers["anthropic-beta"] = v + ",interleaved-thinking-2025-05-14"
 987		} else {
 988			headers["anthropic-beta"] = "interleaved-thinking-2025-05-14"
 989		}
 990	}
 991
 992	apiKey, _ := c.cfg.Resolve(providerCfg.APIKey)
 993	baseURL, _ := c.cfg.Resolve(providerCfg.BaseURL)
 994
 995	switch providerCfg.ID {
 996	case string(catwalk.InferenceProviderOpenCodeGo), string(catwalk.InferenceProviderOpenCodeZen):
 997		if opencodeMessagesModels[model.Model] {
 998			baseURL = strings.TrimSuffix(baseURL, "/v1")
 999			return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
1000		}
1001	}
1002
1003	switch providerCfg.Type {
1004	case openai.Name:
1005		return c.buildOpenaiProvider(baseURL, apiKey, headers)
1006	case anthropic.Name:
1007		return c.buildAnthropicProvider(baseURL, apiKey, headers, providerCfg.ID)
1008	case openrouter.Name:
1009		return c.buildOpenrouterProvider(baseURL, apiKey, headers)
1010	case vercel.Name:
1011		return c.buildVercelProvider(baseURL, apiKey, headers)
1012	case azure.Name:
1013		return c.buildAzureProvider(baseURL, apiKey, headers, providerCfg.ExtraParams)
1014	case bedrock.Name:
1015		return c.buildBedrockProvider(apiKey, headers, providerCfg.ID)
1016	case google.Name:
1017		return c.buildGoogleProvider(baseURL, apiKey, headers)
1018	case "google-vertex":
1019		return c.buildGoogleVertexProvider(headers, providerCfg.ExtraParams)
1020	case openaicompat.Name, hyper.Name:
1021		switch providerCfg.ID {
1022		case hyper.Name:
1023			baseURL = hyper.BaseURL() + "/v1"
1024			headers["x-crush-id"] = event.GetID()
1025		case string(catwalk.InferenceProviderZAI):
1026			if providerCfg.ExtraBody == nil {
1027				providerCfg.ExtraBody = map[string]any{}
1028			}
1029			providerCfg.ExtraBody["tool_stream"] = true
1030		}
1031		return c.buildOpenaiCompatProvider(baseURL, apiKey, headers, providerCfg.ExtraBody, providerCfg.ID, isSubAgent)
1032	default:
1033		return nil, fmt.Errorf("provider type not supported: %q", providerCfg.Type)
1034	}
1035}
1036
// isExactoSupported reports whether modelID appears, by exact and
// case-sensitive match, on the hard-coded allow-list of models that support
// Exacto.
func isExactoSupported(modelID string) bool {
	exactoModels := [...]string{
		"moonshotai/kimi-k2-0905",
		"deepseek/deepseek-v3.1-terminus",
		"z-ai/glm-4.6",
		"openai/gpt-oss-120b",
		"qwen/qwen3-coder",
	}
	return slices.Index(exactoModels[:], modelID) >= 0
}
1047
1048// BeginAccepted reserves an accept slot for sessionID on the active
1049// agent and returns the ownership handle. It is the fire-and-forget
1050// dispatch path's only way to mark a run as accepted-but-not-yet-active
1051// so a cancel arriving before the run registers in activeRequests is not
1052// lost.
1053func (c *coordinator) BeginAccepted(sessionID string) *AcceptedRun {
1054	return c.currentAgent.BeginAccepted(sessionID)
1055}
1056
1057func (c *coordinator) Cancel(sessionID string) {
1058	c.currentAgent.Cancel(sessionID)
1059}
1060
1061func (c *coordinator) CancelAll() {
1062	c.currentAgent.CancelAll()
1063}
1064
1065func (c *coordinator) ClearQueue(sessionID string) {
1066	c.currentAgent.ClearQueue(sessionID)
1067}
1068
1069func (c *coordinator) IsBusy() bool {
1070	return c.currentAgent.IsBusy()
1071}
1072
1073func (c *coordinator) IsSessionBusy(sessionID string) bool {
1074	return c.currentAgent.IsSessionBusy(sessionID)
1075}
1076
1077func (c *coordinator) Model() Model {
1078	return c.currentAgent.Model()
1079}
1080
1081func (c *coordinator) UpdateModels(ctx context.Context) error {
1082	// build the models again so we make sure we get the latest config
1083	large, small, err := c.buildAgentModels(ctx, false)
1084	if err != nil {
1085		return err
1086	}
1087	c.currentAgent.SetModels(large, small)
1088
1089	agentCfg, ok := c.cfg.Config().Agents[config.AgentCoder]
1090	if !ok {
1091		return errCoderAgentNotConfigured
1092	}
1093
1094	tools, err := c.buildTools(ctx, agentCfg, false)
1095	if err != nil {
1096		return err
1097	}
1098	c.currentAgent.SetTools(tools)
1099	return nil
1100}
1101
1102func (c *coordinator) QueuedPrompts(sessionID string) int {
1103	return c.currentAgent.QueuedPrompts(sessionID)
1104}
1105
1106func (c *coordinator) QueuedPromptsList(sessionID string) []string {
1107	return c.currentAgent.QueuedPromptsList(sessionID)
1108}
1109
1110func (c *coordinator) Summarize(ctx context.Context, sessionID string) error {
1111	providerCfg, ok := c.cfg.Config().Providers.Get(c.currentAgent.Model().ModelCfg.Provider)
1112	if !ok {
1113		return errModelProviderNotConfigured
1114	}
1115
1116	if err := c.refreshTokenIfExpired(ctx, providerCfg); err != nil {
1117		slog.Error("Failed to refresh OAuth2 token before summarize. Proceeding with existing token.", "error", err)
1118	}
1119
1120	summarize := func() error {
1121		return c.currentAgent.Summarize(ctx, sessionID, getProviderOptions(c.currentAgent.Model(), providerCfg))
1122	}
1123
1124	return c.runWithUnauthorizedRetry(ctx, providerCfg, summarize)
1125}
1126
1127// refreshTokenIfExpired proactively refreshes the OAuth token if it has expired.
1128func (c *coordinator) refreshTokenIfExpired(ctx context.Context, providerCfg config.ProviderConfig) error {
1129	if providerCfg.OAuthToken == nil || !providerCfg.OAuthToken.IsExpired() {
1130		return nil
1131	}
1132	slog.Debug("Token needs to be refreshed", "provider", providerCfg.ID)
1133	return c.refreshOAuth2Token(ctx, providerCfg)
1134}
1135
1136// runWithUnauthorizedRetry executes fn. If fn returns a 401 error, it
1137// attempts to refresh credentials and re-runs fn once. Returns the
1138// final error: from the retry if a retry was attempted, otherwise from
1139// the original run. Callers that need to notify the user on persistent
1140// failure should check isUnauthorized on the returned error.
1141func (c *coordinator) runWithUnauthorizedRetry(ctx context.Context, providerCfg config.ProviderConfig, fn func() error) error {
1142	err := fn()
1143	if err != nil && c.isUnauthorized(err) {
1144		if retryErr := c.retryAfterUnauthorized(ctx, providerCfg); retryErr == nil {
1145			return fn()
1146		}
1147	}
1148	return err
1149}
1150
1151// retryAfterUnauthorized attempts to refresh credentials after receiving a 401
1152// and returns nil if retry should be attempted.
1153func (c *coordinator) retryAfterUnauthorized(ctx context.Context, providerCfg config.ProviderConfig) error {
1154	switch {
1155	case providerCfg.OAuthToken != nil:
1156		slog.Debug("Received 401. Refreshing token and retrying", "provider", providerCfg.ID)
1157		return c.refreshOAuth2Token(ctx, providerCfg)
1158	case strings.Contains(providerCfg.APIKeyTemplate, "$"):
1159		slog.Debug("Received 401. Refreshing API Key template and retrying", "provider", providerCfg.ID)
1160		return c.refreshApiKeyTemplate(ctx, providerCfg)
1161	default:
1162		return nil
1163	}
1164}
1165
1166func (c *coordinator) isUnauthorized(err error) bool {
1167	var providerErr *fantasy.ProviderError
1168	return errors.As(err, &providerErr) && providerErr.StatusCode == http.StatusUnauthorized
1169}
1170
1171func (c *coordinator) refreshOAuth2Token(ctx context.Context, providerCfg config.ProviderConfig) error {
1172	if err := c.cfg.RefreshOAuthToken(ctx, config.ScopeGlobal, providerCfg.ID); err != nil {
1173		slog.Error("Failed to refresh OAuth token after 401 error", "provider", providerCfg.ID, "error", err)
1174		return err
1175	}
1176	if err := c.UpdateModels(ctx); err != nil {
1177		return err
1178	}
1179	return nil
1180}
1181
1182func (c *coordinator) refreshApiKeyTemplate(ctx context.Context, providerCfg config.ProviderConfig) error {
1183	newAPIKey, err := c.cfg.Resolve(providerCfg.APIKeyTemplate)
1184	if err != nil {
1185		slog.Error("Failed to re-resolve API key after 401 error", "provider", providerCfg.ID, "error", err)
1186		return err
1187	}
1188
1189	providerCfg.APIKey = newAPIKey
1190	c.cfg.Config().Providers.Set(providerCfg.ID, providerCfg)
1191
1192	if err := c.UpdateModels(ctx); err != nil {
1193		return err
1194	}
1195	return nil
1196}
1197
// subAgentParams holds the parameters for running a sub-agent with
// runSubAgent:
//   - Agent is the sub-agent to run. Its Model() also supplies the provider
//     and the sampling settings used for the call.
//   - SessionID is the parent session. The new task session is created under
//     it, and the child's cost is added back to it.
//   - AgentMessageID and ToolCallID are passed to CreateAgentToolSessionID
//     to derive the child session's ID.
//   - Prompt is the prompt sent to the sub-agent.
//   - SessionTitle is the title given to the child task session.
//   - SessionSetup is described on the field itself.
type subAgentParams struct {
	Agent          SessionAgent
	SessionID      string
	AgentMessageID string
	ToolCallID     string
	Prompt         string
	SessionTitle   string
	// SessionSetup is an optional callback invoked after session creation
	// but before agent execution, for custom session configuration.
	SessionSetup func(sessionID string)
}
1210
1211// runSubAgent runs a sub-agent and handles session management and cost accumulation.
1212// It creates a sub-session, runs the agent with the given prompt, and propagates
1213// the cost to the parent session.
1214func (c *coordinator) runSubAgent(ctx context.Context, params subAgentParams) (fantasy.ToolResponse, error) {
1215	// Create sub-session
1216	agentToolSessionID := c.sessions.CreateAgentToolSessionID(params.AgentMessageID, params.ToolCallID)
1217	session, err := c.sessions.CreateTaskSession(ctx, agentToolSessionID, params.SessionID, params.SessionTitle)
1218	if err != nil {
1219		return fantasy.ToolResponse{}, fmt.Errorf("create session: %w", err)
1220	}
1221
1222	// Call session setup function if provided
1223	if params.SessionSetup != nil {
1224		params.SessionSetup(session.ID)
1225	}
1226
1227	// Get model configuration
1228	model := params.Agent.Model()
1229	maxTokens := model.CatwalkCfg.DefaultMaxTokens
1230	if model.ModelCfg.MaxTokens != 0 {
1231		maxTokens = model.ModelCfg.MaxTokens
1232	}
1233
1234	providerCfg, ok := c.cfg.Config().Providers.Get(model.ModelCfg.Provider)
1235	if !ok {
1236		return fantasy.ToolResponse{}, errModelProviderNotConfigured
1237	}
1238
1239	// Run the agent
1240	run := func() (*fantasy.AgentResult, error) {
1241		return params.Agent.Run(ctx, SessionAgentCall{
1242			SessionID:        session.ID,
1243			Prompt:           params.Prompt,
1244			MaxOutputTokens:  maxTokens,
1245			ProviderOptions:  getProviderOptions(model, providerCfg),
1246			Temperature:      model.ModelCfg.Temperature,
1247			TopP:             model.ModelCfg.TopP,
1248			TopK:             model.ModelCfg.TopK,
1249			FrequencyPenalty: model.ModelCfg.FrequencyPenalty,
1250			PresencePenalty:  model.ModelCfg.PresencePenalty,
1251			NonInteractive:   true,
1252		})
1253	}
1254	var result *fantasy.AgentResult
1255	err = c.runWithUnauthorizedRetry(ctx, providerCfg, func() error {
1256		var runErr error
1257		result, runErr = run()
1258		return runErr
1259	})
1260	// Notify only if still unauthorized after retry.
1261	if err != nil && c.isUnauthorized(err) && c.notify != nil && model.ModelCfg.Provider == hyper.Name {
1262		c.notify.Publish(pubsub.CreatedEvent, notify.Notification{
1263			Type:       notify.TypeReAuthenticate,
1264			ProviderID: model.ModelCfg.Provider,
1265		})
1266	}
1267	if err != nil {
1268		return fantasy.NewTextErrorResponse(fmt.Sprintf("Failed to generate response: %s", err)), nil
1269	}
1270
1271	// Update parent session cost
1272	if err := c.updateParentSessionCost(ctx, session.ID, params.SessionID); err != nil {
1273		return fantasy.ToolResponse{}, err
1274	}
1275
1276	return fantasy.NewTextResponse(result.Response.Content.Text()), nil
1277}
1278
1279// updateParentSessionCost accumulates the cost from a child session to its parent session.
1280func (c *coordinator) updateParentSessionCost(ctx context.Context, childSessionID, parentSessionID string) error {
1281	childSession, err := c.sessions.Get(ctx, childSessionID)
1282	if err != nil {
1283		return fmt.Errorf("get child session: %w", err)
1284	}
1285
1286	parentSession, err := c.sessions.Get(ctx, parentSessionID)
1287	if err != nil {
1288		return fmt.Errorf("get parent session: %w", err)
1289	}
1290
1291	parentSession.Cost += childSession.Cost
1292
1293	if _, err := c.sessions.Save(ctx, parentSession); err != nil {
1294		return fmt.Errorf("save parent session: %w", err)
1295	}
1296
1297	return nil
1298}
1299
1300// discoverSkills is a thin fallback wrapper used only when no
1301// skills.Manager has been threaded through to the coordinator. All
1302// production call sites (backend.CreateWorkspace, setupLocalWorkspace)
1303// run discovery in advance and pass the results via the manager;
1304// reaching this path means a caller bypassed both. It deliberately does
1305// NOT publish to the package-level broker — there are no subscribers in
1306// that case, so doing so would be misleading without delivering the
1307// snapshot anywhere useful.
1308func discoverSkills(cfg *config.ConfigStore) (allSkills, activeSkills []*skills.Skill) {
1309	opts := cfg.Config().Options
1310	var paths, disabled []string
1311	if opts != nil {
1312		paths = opts.SkillsPaths
1313		disabled = opts.DisabledSkills
1314	}
1315	var resolver func(string) (string, error)
1316	if r := cfg.Resolver(); r != nil {
1317		resolver = r.ResolveValue
1318	}
1319	allSkills, activeSkills, states := skills.DiscoverFromConfig(skills.DiscoveryConfig{
1320		SkillsPaths:    paths,
1321		DisabledSkills: disabled,
1322		Resolver:       resolver,
1323	})
1324	logDiscoveryStats(states, paths, allSkills, activeSkills, disabled)
1325	return allSkills, activeSkills
1326}
1327
1328// logTurnSkillUsage emits a per-turn diagnostic line showing which skills
1329// (if any) were loaded during this turn and which looked relevant based on
1330// a cheap keyword match against the user prompt. The goal is to surface
1331// "should-have-loaded but didn't" situations for later analysis.
1332//
1333// Logged at Info level under component=skills; heavy fields are elided when
1334// there is nothing interesting to report.
1335func logTurnSkillUsage(
1336	sessionID string,
1337	prompt string,
1338	activeSkills []*skills.Skill,
1339	tracker *skills.Tracker,
1340	before []string,
1341) {
1342	if tracker == nil || len(activeSkills) == 0 {
1343		return
1344	}
1345
1346	after := tracker.LoadedNames()
1347
1348	beforeSet := make(map[string]bool, len(before))
1349	for _, n := range before {
1350		beforeSet[n] = true
1351	}
1352	var loadedThisTurn []string
1353	for _, n := range after {
1354		if !beforeSet[n] {
1355			loadedThisTurn = append(loadedThisTurn, n)
1356		}
1357	}
1358
1359	slog.Info(
1360		"Skill turn summary",
1361		"component", "skills",
1362		"session_id", sessionID,
1363		"prompt_len", len(prompt),
1364		"active_total", len(activeSkills),
1365		"loaded_total", len(after),
1366		"loaded_this_turn", loadedThisTurn,
1367	)
1368}
1369
1370// logDiscoveryStats emits a single structured log line summarising skill
1371// discovery for the current session. It is intentionally low-volume: one
1372// line per session start. Builtin vs user counts are derived from the
1373// SkillState.Path — builtin states use the "builtin/" embed prefix.
1374func logDiscoveryStats(
1375	states []*skills.SkillState,
1376	userPaths []string,
1377	allSkills, activeSkills []*skills.Skill,
1378	disabled []string,
1379) {
1380	var builtinOK, builtinErr, userOK, userErr int
1381	for _, s := range states {
1382		isBuiltin := strings.HasPrefix(s.Path, "builtin/")
1383		switch {
1384		case isBuiltin && s.State == skills.StateNormal:
1385			builtinOK++
1386		case isBuiltin && s.State == skills.StateError:
1387			builtinErr++
1388		case !isBuiltin && s.State == skills.StateNormal:
1389			userOK++
1390		case !isBuiltin && s.State == skills.StateError:
1391			userErr++
1392		}
1393	}
1394
1395	activeNames := make([]string, 0, len(activeSkills))
1396	for _, s := range activeSkills {
1397		activeNames = append(activeNames, s.Name)
1398	}
1399
1400	xml := skills.ToPromptXML(activeSkills)
1401
1402	slog.Info(
1403		"Skill discovery complete",
1404		"component", "skills",
1405		"builtin_ok", builtinOK,
1406		"builtin_errors", builtinErr,
1407		"user_ok", userOK,
1408		"user_errors", userErr,
1409		"user_paths", len(userPaths),
1410		"deduped_total", len(allSkills),
1411		"active", len(activeSkills),
1412		"disabled", len(disabled),
1413		"prompt_bytes", len(xml),
1414		"prompt_tok_est", skills.ApproxTokenCount(xml),
1415		"active_names", activeNames,
1416	)
1417}