diff --git a/internal/agent/agenttest/coordinator.go b/internal/agent/agenttest/coordinator.go index 9cb1e139b20c4f54daf9ed89b2a2bf43ff72bfce..fdacb7e1292f8fcddcc903a0e70aba544d25fdd3 100644 --- a/internal/agent/agenttest/coordinator.go +++ b/internal/agent/agenttest/coordinator.go @@ -57,6 +57,7 @@ func NewCoordinator( selected := config.SelectedModel{Provider: providerID, Model: modelID} cfg.Config().Models[config.SelectedModelTypeLarge] = selected cfg.Config().Models[config.SelectedModelTypeSmall] = selected + cfg.SetupAgents() // Keep buildTools light: no sub-agent or agentic-fetch construction. coderCfg := cfg.Config().Agents[config.AgentCoder] diff --git a/internal/agent/coordinator.go b/internal/agent/coordinator.go index f5ca831e60cdb54edf0c0d7bfde83702a79701f1..bf05baa39b970a05922def09ab6c5ddb3b17dce1 100644 --- a/internal/agent/coordinator.go +++ b/internal/agent/coordinator.go @@ -294,6 +294,11 @@ func (c *coordinator) run(ctx context.Context, accept *AcceptedRun, sessionID st if hasLatest && c.runComplete != nil { c.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, latest) + // Set the run-complete marker carried by ctx (a no-op when none is + // attached) so the dispatcher (backend.runAgent) knows the + // authoritative terminal RunComplete was already emitted and does + // not publish a duplicate fallback for the error returned below. + MarkRunCompletePublished(ctx) } return result, originalErr } diff --git a/internal/agent/run_marker.go b/internal/agent/run_marker.go new file mode 100644 index 0000000000000000000000000000000000000000..404cca1e8c41bb9179deb886552f3580a977fdfc --- /dev/null +++ b/internal/agent/run_marker.go @@ -0,0 +1,52 @@ +package agent + +import ( + "context" + "sync/atomic" +) + +// runCompleteMarkerKey is the unexported context key carrying a +// [runCompleteMarker] from the dispatch boundary (backend.runAgent) +// down into the coordinator.
It lets the dispatcher learn whether the +// coordinator already published the authoritative terminal +// notify.RunComplete for the run, so a fallback terminal event is only +// emitted when one is actually missing (e.g. an error returned before +// sessionAgent.Run ever executed). Carrying it in the context avoids a +// breaking change to the Coordinator interface. +type runCompleteMarkerKey struct{} + +// runCompleteMarker records whether a terminal RunComplete has been +// published for a run. The context holds a pointer to it, so a publish +// deep in the call stack is observable by the dispatcher after the call +// returns; the atomic flag tolerates access from several goroutines. +type runCompleteMarker struct { + published atomic.Bool +} + +// WithRunCompleteMarker returns a child of ctx carrying a fresh, unset +// marker (shadowing any marker already on ctx) that the coordinator can +// flag via [MarkRunCompletePublished] once it emits the run's terminal +// RunComplete. Callers read the result with [RunCompletePublished]. +// Attaching it is optional: paths without one skip the dedup signal. +func WithRunCompleteMarker(ctx context.Context) context.Context { + return context.WithValue(ctx, runCompleteMarkerKey{}, &runCompleteMarker{}) +} + +// MarkRunCompletePublished records that the authoritative terminal +// RunComplete has been published for the run carried by ctx. It is a +// no-op when no marker is present (e.g. the in-process/local Run path, +// which is not dispatched through backend.runAgent). +func MarkRunCompletePublished(ctx context.Context) { + if m, ok := ctx.Value(runCompleteMarkerKey{}).(*runCompleteMarker); ok { + m.published.Store(true) + } +} + +// RunCompletePublished reports whether [MarkRunCompletePublished] was +// called on ctx's marker. It returns false when no marker is present.
+func RunCompletePublished(ctx context.Context) bool { + if m, ok := ctx.Value(runCompleteMarkerKey{}).(*runCompleteMarker); ok { + return m.published.Load() + } + return false +} diff --git a/internal/app/app.go b/internal/app/app.go index 9509fa3a9dc778d507d38f60c8ca523031b7ecb7..d8a3abc63b9901c528ec3cee7465eaa0b892349f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -185,6 +185,14 @@ func (app *App) AgentNotifications() *pubsub.Broker[notify.Notification] { return app.agentNotifications } +// RunCompletions returns the broker for the authoritative per-run +// terminal RunComplete events. The dispatcher (backend.runAgent) uses +// it to emit a reliable terminal event when a run fails before the +// coordinator could publish one of its own. +func (app *App) RunCompletions() *pubsub.Broker[notify.RunComplete] { + return app.runCompletions +} + // resolveSession resolves which session to use for a non-interactive run // If continueSessionID is set, it looks up that session by ID // If useLast is set, it returns the most recently updated top-level session diff --git a/internal/backend/agent.go b/internal/backend/agent.go index 4af3b8f0d2f88ad5daff41f40664b303c948b263..3d08746ed35c07ab21221c8a6aa0df3941944fd9 100644 --- a/internal/backend/agent.go +++ b/internal/backend/agent.go @@ -61,14 +61,27 @@ func (b *Backend) SendMessage(workspaceID string, msg proto.AgentMessage) error // runAgent executes an accepted agent run for the workspace. It owns the // accept reservation (releasing it on return) and the runWG ticket added // by SendMessage. The run is bound to the workspace context so its -// lifetime is independent of any client's HTTP request. On a non-cancel -// error it surfaces the failure to observers via a notify.TypeAgentError -// notification; context.Canceled is expected (the FinishReasonCanceled -// marker is already published by sessionAgent.Run) and swallowed. +// lifetime is independent of any client's HTTP request.
+// +// On a non-cancel error it surfaces the failure to observers via a +// notify.TypeAgentError notification (lossy, best-effort). That alone is +// not a reliable terminal signal: the agent-event fan-in uses lossy +// subscribers, so a `crush run` caller blocking on its RunID could hang +// if the event is dropped. So when msg.RunID is non-empty and the +// coordinator did not already publish the run's terminal RunComplete +// (e.g. the error came before sessionAgent.Run executed, such as a +// readyWg or UpdateModels failure), runAgent emits an errored +// RunComplete on the must-deliver runCompletions broker so the waiter +// sees a terminal event. context.Canceled is expected (sessionAgent.Run +// publishes the cancelled terminal marker) and emits no fallback. +// NOTE(review): a context.Canceled returned before sessionAgent.Run +// runs gets no terminal event here; confirm a waiter cannot hang. // // When msg.RunID is non-empty it is attached to the context via // agent.WithRunID so the coordinator can stamp the terminal -// notify.RunComplete event with that correlator. +// notify.RunComplete event with that correlator. A run-complete marker +// is always attached, RunID or not, so the coordinator can report that +// it published the terminal event and runAgent skips its fallback. func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.AcceptedRun) { defer ws.runWG.Done() defer accept.Close() @@ -77,6 +90,7 @@ func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent. if msg.RunID != "" { ctx = agent.WithRunID(ctx, msg.RunID) } + ctx = agent.WithRunCompleteMarker(ctx) _, err := ws.AgentCoordinator.RunAccepted(ctx, accept, msg.SessionID, msg.Prompt, proto.AttachmentsToMessage(msg.Attachments)...) if err == nil || errors.Is(err, context.Canceled) { @@ -89,6 +103,20 @@ func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent. Type: notify.TypeAgentError, Message: err.Error(), }) + + // Reliable terminal fallback.
Only needed when a RunID waiter + // exists and the coordinator has not already emitted the run's + // terminal RunComplete; otherwise this would be a duplicate. + if msg.RunID == "" || agent.RunCompletePublished(ctx) { + return + } + if rc := ws.RunCompletions(); rc != nil { + rc.PublishMustDeliver(ctx, pubsub.UpdatedEvent, notify.RunComplete{ + SessionID: msg.SessionID, + RunID: msg.RunID, + Error: err.Error(), + }) + } } // GetAgentInfo returns the agent's model and busy status. diff --git a/internal/backend/agent_runcomplete_test.go b/internal/backend/agent_runcomplete_test.go new file mode 100644 index 0000000000000000000000000000000000000000..be3df103e66b539685e42269d7ced0c7e7e94d86 --- /dev/null +++ b/internal/backend/agent_runcomplete_test.go @@ -0,0 +1,162 @@ +package backend + +import ( + "context" + "errors" + "testing" + "time" + + "charm.land/fantasy" + "github.com/charmbracelet/crush/internal/agent" + "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/proto" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +// errorCoordinator is a minimal agent.Coordinator whose RunAccepted +// returns a configurable error. When markPublished is true it stamps +// the run-complete marker on the context before returning, simulating a +// real coordinator that already published the run's authoritative +// terminal RunComplete (so runAgent must not emit a duplicate fallback).
+type errorCoordinator struct { + err error + markPublished bool +} + +func (c *errorCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) { + return nil, c.err +} + +func (c *errorCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) { + if c.markPublished { + agent.MarkRunCompletePublished(ctx) + } + return nil, c.err +} + +func (c *errorCoordinator) BeginAccepted(sessionID string) *agent.AcceptedRun { return nil } +func (c *errorCoordinator) Cancel(string) {} +func (c *errorCoordinator) CancelAll() {} +func (c *errorCoordinator) IsBusy() bool { return false } +func (c *errorCoordinator) IsSessionBusy(string) bool { return false } +func (c *errorCoordinator) QueuedPrompts(string) int { return 0 } +func (c *errorCoordinator) QueuedPromptsList(string) []string { return nil } +func (c *errorCoordinator) ClearQueue(string) {} +func (c *errorCoordinator) Summarize(context.Context, string) error { return nil } +func (c *errorCoordinator) Model() agent.Model { return agent.Model{} } +func (c *errorCoordinator) UpdateModels(context.Context) error { return nil } + +// insertRunCompleteWorkspace registers with b a workspace backed by a +// real app.App (so the runCompletions broker exists; it is shut down at +// test cleanup), the given coordinator, and a context derived from base.
+func insertRunCompleteWorkspace(t *testing.T, b *Backend, base context.Context, coord agent.Coordinator) *Workspace { + t.Helper() + a := app.NewForTest(base) + a.AgentCoordinator = coord + t.Cleanup(a.ShutdownForTest) + ws := &Workspace{ + ID: uuid.New().String(), + Path: t.TempDir(), + resolvedPath: t.TempDir(), + clients: make(map[string]*clientState), + shutdownFn: func() {}, + } + ws.App = a + ws.ctx, ws.cancel = context.WithCancel(base) + b.mu.Lock() + b.workspaces.Set(ws.ID, ws) + b.pathIndex[ws.resolvedPath] = ws.ID + b.mu.Unlock() + return ws +} + +// TestRunAgent_PreRunErrorPublishesTerminalRunComplete proves that an +// error returned from RunAccepted before the coordinator could publish +// its own terminal event (e.g. a readyWg or UpdateModels failure, +// modeled here by a stub coordinator) still yields a reliable terminal +// RunComplete for the run's RunID. Without it, a `crush run` caller +// blocking on that RunID would hang because the lossy TypeAgentError +// event is not a guaranteed terminal signal.
+func TestRunAgent_PreRunErrorPublishesTerminalRunComplete(t *testing.T) { + t.Parallel() + b, _ := newTestBackend(t) + runErr := errors.New("update models failed") + ws := insertRunCompleteWorkspace(t, b, context.Background(), &errorCoordinator{err: runErr}) + + subCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + ch := ws.RunCompletions().Subscribe(subCtx) + + err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}) + require.NoError(t, err) + + select { + case ev := <-ch: + require.Equal(t, "run-1", ev.Payload.RunID, + "the terminal RunComplete must carry the dispatched RunID") + require.Equal(t, "S1", ev.Payload.SessionID) + require.Equal(t, runErr.Error(), ev.Payload.Error, + "the fallback terminal event must be marked errored") + require.False(t, ev.Payload.Cancelled) + case <-time.After(2 * time.Second): + t.Fatal("no terminal RunComplete published for a pre-run error; a run waiter would hang") + } +} + +// TestRunAgent_NoFallbackWhenCoordinatorPublished ensures the fallback +// is suppressed when the coordinator already emitted the run's +// authoritative terminal RunComplete, so callers never observe a +// duplicate terminal event for the same RunID. +func TestRunAgent_NoFallbackWhenCoordinatorPublished(t *testing.T) { + t.Parallel() + b, _ := newTestBackend(t) + runErr := errors.New("stream failed after publishing terminal event") + ws := insertRunCompleteWorkspace(t, b, context.Background(), + &errorCoordinator{err: runErr, markPublished: true}) + + subCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + ch := ws.RunCompletions().Subscribe(subCtx) + + err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}) + require.NoError(t, err) + + // Wait for the dispatched run goroutine to return so a fallback + // publish, if any, has already been attempted.
+ ws.runWG.Wait() + + select { + case ev := <-ch: + t.Fatalf("runAgent published a duplicate terminal RunComplete: %+v", ev.Payload) + case <-time.After(200 * time.Millisecond): + } +} + +// TestRunAgent_CancellationPublishesNoErrorTerminal verifies that a +// context.Canceled result from RunAccepted produces no errored terminal +// RunComplete from runAgent: cancellation is sessionAgent.Run's +// responsibility (it publishes the cancelled marker) and the dispatcher +// must not synthesize an error terminal for it. +func TestRunAgent_CancellationPublishesNoErrorTerminal(t *testing.T) { + t.Parallel() + b, _ := newTestBackend(t) + ws := insertRunCompleteWorkspace(t, b, context.Background(), + &errorCoordinator{err: context.Canceled}) + + subCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + ch := ws.RunCompletions().Subscribe(subCtx) + + err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}) + require.NoError(t, err) + + ws.runWG.Wait() + + select { + case ev := <-ch: + t.Fatalf("cancellation must not publish a terminal RunComplete: %+v", ev.Payload) + case <-time.After(200 * time.Millisecond): + } +}