@@ -294,6 +294,11 @@ func (c *coordinator) run(ctx context.Context, accept *AcceptedRun, sessionID st
if hasLatest && c.runComplete != nil {
c.runComplete.PublishMustDeliver(ctx, pubsub.UpdatedEvent, latest)
+ // Signal to the dispatcher (backend.runAgent) that the
+ // authoritative terminal RunComplete for this run was already
+ // emitted, so it does not publish a duplicate fallback for the
+ // error it is about to receive.
+ MarkRunCompletePublished(ctx)
}
return result, originalErr
}
@@ -0,0 +1,52 @@
+package agent
+
+import (
+ "context"
+ "sync/atomic"
+)
+
+// runCompleteMarkerKey is the unexported context key under which a
+// [runCompleteMarker] travels from the dispatch boundary
+// (backend.runAgent) down into the coordinator. Through it the
+// dispatcher learns whether the coordinator already published the
+// authoritative terminal notify.RunComplete for the run, so a fallback
+// terminal event is emitted only when one is actually missing (e.g. an
+// error returned before sessionAgent.Run ever executed). Using the
+// context avoids a breaking change to the Coordinator interface.
+type runCompleteMarkerKey struct{}
+
+// runCompleteMarker records whether a terminal RunComplete has been
+// published for a run. The context stores a pointer to it, so a flag
+// set deep in the call stack is visible to the dispatcher after the
+// call returns. published is an atomic.Bool accessed via Store/Load.
+type runCompleteMarker struct {
+	published atomic.Bool
+}
+
+// WithRunCompleteMarker derives a context from ctx that carries a new,
+// unset marker. The coordinator flips it with
+// [MarkRunCompletePublished] after emitting the run's terminal
+// RunComplete, and the caller inspects it with [RunCompletePublished].
+// The marker is optional; without one the dedup signal is skipped.
+func WithRunCompleteMarker(ctx context.Context) context.Context {
+	return context.WithValue(ctx, runCompleteMarkerKey{}, new(runCompleteMarker))
+}
+
+// MarkRunCompletePublished flips the marker carried by ctx to record
+// that the run's authoritative terminal RunComplete was published. A
+// ctx without a marker (e.g. the local Run path) makes it a no-op.
+func MarkRunCompletePublished(ctx context.Context) {
+	marker, found := ctx.Value(runCompleteMarkerKey{}).(*runCompleteMarker)
+	if found {
+		marker.published.Store(true)
+	}
+}
+
+// RunCompletePublished reports the state of the marker carried by ctx:
+// true once [MarkRunCompletePublished] has been called on it, false
+// before that. A ctx that carries no marker at all also reports
+// false.
+func RunCompletePublished(ctx context.Context) bool {
+	marker, found := ctx.Value(runCompleteMarkerKey{}).(*runCompleteMarker)
+	return found && marker.published.Load()
+}
@@ -61,14 +61,27 @@ func (b *Backend) SendMessage(workspaceID string, msg proto.AgentMessage) error
// runAgent executes an accepted agent run for the workspace. It owns the
// accept reservation (releasing it on return) and the runWG ticket added
// by SendMessage. The run is bound to the workspace context so its
-// lifetime is independent of any client's HTTP request. On a non-cancel
-// error it surfaces the failure to observers via a notify.TypeAgentError
-// notification; context.Canceled is expected (the FinishReasonCanceled
-// marker is already published by sessionAgent.Run) and swallowed.
+// lifetime is independent of any client's HTTP request.
+//
+// On a non-cancel error it surfaces the failure to observers via a
+// notify.TypeAgentError notification (lossy, best-effort). That alone is
+// not a reliable terminal signal: the agent-event fan-in uses lossy
+// subscribers, so a `crush run` caller blocking on its RunID could hang
+// if the event is dropped. To guarantee termination, when msg.RunID is
+// non-empty and the coordinator did not already publish the run's
+// authoritative terminal RunComplete (e.g. the error was returned before
+// sessionAgent.Run executed, such as a readyWg or UpdateModels failure),
+// runAgent emits an errored RunComplete on the must-deliver
+// runCompletions broker so the waiter observes a deterministic terminal
+// event. context.Canceled is expected (sessionAgent.Run already
+// publishes the cancelled terminal marker) and produces no error
+// terminal event.
//
// When msg.RunID is non-empty it is attached to the context via
// agent.WithRunID so the coordinator can stamp the terminal
-// notify.RunComplete event with that correlator.
+// notify.RunComplete event with that correlator. A run-complete marker
+// is also attached so the coordinator can report whether it published
+// the terminal event, letting runAgent avoid a duplicate fallback.
func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.AcceptedRun) {
defer ws.runWG.Done()
defer accept.Close()
@@ -77,6 +90,7 @@ func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.
if msg.RunID != "" {
ctx = agent.WithRunID(ctx, msg.RunID)
}
+ ctx = agent.WithRunCompleteMarker(ctx)
_, err := ws.AgentCoordinator.RunAccepted(ctx, accept, msg.SessionID, msg.Prompt, proto.AttachmentsToMessage(msg.Attachments)...)
if err == nil || errors.Is(err, context.Canceled) {
@@ -89,6 +103,20 @@ func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.
Type: notify.TypeAgentError,
Message: err.Error(),
})
+
+ // Reliable terminal fallback. Only needed when a RunID waiter
+ // exists and the coordinator has not already emitted the run's
+ // terminal RunComplete; otherwise this would be a duplicate.
+ if msg.RunID == "" || agent.RunCompletePublished(ctx) {
+ return
+ }
+ if rc := ws.RunCompletions(); rc != nil {
+ rc.PublishMustDeliver(ctx, pubsub.UpdatedEvent, notify.RunComplete{
+ SessionID: msg.SessionID,
+ RunID: msg.RunID,
+ Error: err.Error(),
+ })
+ }
}
// GetAgentInfo returns the agent's model and busy status.
@@ -0,0 +1,162 @@
+package backend
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "charm.land/fantasy"
+ "github.com/charmbracelet/crush/internal/agent"
+ "github.com/charmbracelet/crush/internal/app"
+ "github.com/charmbracelet/crush/internal/message"
+ "github.com/charmbracelet/crush/internal/proto"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+)
+
+// errorCoordinator is a stub agent.Coordinator for dispatcher tests.
+// Run and RunAccepted always return err. With markPublished set,
+// RunAccepted first flags the context's run-complete marker, standing
+// in for a coordinator that has already published the terminal
+// RunComplete, in which case runAgent must not emit its fallback.
+type errorCoordinator struct {
+	err           error
+	markPublished bool
+}
+
+func (ec *errorCoordinator) Run(_ context.Context, _, _ string, _ ...message.Attachment) (*fantasy.AgentResult, error) {
+	return nil, ec.err
+}
+
+func (ec *errorCoordinator) RunAccepted(ctx context.Context, _ *agent.AcceptedRun, _, _ string, _ ...message.Attachment) (*fantasy.AgentResult, error) {
+	if ec.markPublished {
+		agent.MarkRunCompletePublished(ctx)
+	}
+	return nil, ec.err
+}
+
+func (ec *errorCoordinator) BeginAccepted(string) *agent.AcceptedRun { return nil }
+func (ec *errorCoordinator) Cancel(string)                           {}
+func (ec *errorCoordinator) CancelAll()                              {}
+func (ec *errorCoordinator) IsBusy() bool                            { return false }
+func (ec *errorCoordinator) IsSessionBusy(string) bool               { return false }
+func (ec *errorCoordinator) QueuedPrompts(string) int                { return 0 }
+func (ec *errorCoordinator) QueuedPromptsList(string) []string       { return nil }
+func (ec *errorCoordinator) ClearQueue(string)                       {}
+func (ec *errorCoordinator) Summarize(context.Context, string) error { return nil }
+func (ec *errorCoordinator) Model() agent.Model                      { return agent.Model{} }
+func (ec *errorCoordinator) UpdateModels(context.Context) error      { return nil }
+
+// insertRunCompleteWorkspace registers a new workspace on b. It is
+// backed by a real app.App (so the runCompletions broker exists), uses
+// coord as its coordinator, and gets a run context derived from base.
+func insertRunCompleteWorkspace(t *testing.T, b *Backend, base context.Context, coord agent.Coordinator) *Workspace {
+	t.Helper()
+	testApp := app.NewForTest(base)
+	testApp.AgentCoordinator = coord
+	t.Cleanup(testApp.ShutdownForTest)
+	ws := &Workspace{
+		ID:           uuid.New().String(),
+		Path:         t.TempDir(),
+		resolvedPath: t.TempDir(),
+		clients:      make(map[string]*clientState),
+		shutdownFn:   func() {},
+		App:          testApp,
+	}
+	ws.ctx, ws.cancel = context.WithCancel(base)
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.workspaces.Set(ws.ID, ws)
+	b.pathIndex[ws.resolvedPath] = ws.ID
+	return ws
+}
+
+// TestRunAgent_PreRunErrorPublishesTerminalRunComplete checks the
+// fallback path: when RunAccepted fails before the coordinator could
+// publish its own terminal event (a readyWg or UpdateModels failure,
+// simulated by a stub coordinator), runAgent must still publish an
+// errored terminal RunComplete for the dispatched RunID. Otherwise a
+// `crush run` caller blocking on that RunID would hang, because the
+// lossy TypeAgentError event is not a guaranteed terminal signal.
+func TestRunAgent_PreRunErrorPublishesTerminalRunComplete(t *testing.T) {
+	t.Parallel()
+	backend, _ := newTestBackend(t)
+	wantErr := errors.New("update models failed")
+	ws := insertRunCompleteWorkspace(t, backend, context.Background(), &errorCoordinator{err: wantErr})
+
+	subCtx, stop := context.WithCancel(context.Background())
+	defer stop()
+	completions := ws.RunCompletions().Subscribe(subCtx)
+
+	dispatch := proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}
+	require.NoError(t, backend.SendMessage(ws.ID, dispatch))
+
+	select {
+	case <-time.After(2 * time.Second):
+		t.Fatal("no terminal RunComplete published for a pre-run error; a run waiter would hang")
+	case got := <-completions:
+		require.Equal(t, "run-1", got.Payload.RunID,
+			"the terminal RunComplete must carry the dispatched RunID")
+		require.Equal(t, "S1", got.Payload.SessionID)
+		require.Equal(t, wantErr.Error(), got.Payload.Error,
+			"the fallback terminal event must be marked errored")
+		require.False(t, got.Payload.Cancelled)
+	}
+}
+
+// TestRunAgent_NoFallbackWhenCoordinatorPublished checks the dedup
+// path: once the coordinator has flagged that it emitted the run's
+// authoritative terminal RunComplete, runAgent must not publish its
+// fallback, so subscribers never see a second terminal event for the
+// same RunID.
+func TestRunAgent_NoFallbackWhenCoordinatorPublished(t *testing.T) {
+	t.Parallel()
+	backend, _ := newTestBackend(t)
+	coord := &errorCoordinator{err: errors.New("stream failed after publishing terminal event"), markPublished: true}
+	ws := insertRunCompleteWorkspace(t, backend, context.Background(), coord)
+
+	subCtx, stop := context.WithCancel(context.Background())
+	defer stop()
+	completions := ws.RunCompletions().Subscribe(subCtx)
+
+	dispatch := proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}
+	require.NoError(t, backend.SendMessage(ws.ID, dispatch))
+
+	// Block until the dispatched run goroutine has returned, so any
+	// publish it was going to make has already been made.
+	ws.runWG.Wait()
+
+	select {
+	case <-time.After(200 * time.Millisecond):
+	case dup := <-completions:
+		t.Fatalf("runAgent published a duplicate terminal RunComplete: %+v", dup.Payload)
+	}
+}
+
+// TestRunAgent_CancellationPublishesNoErrorTerminal checks that when
+// RunAccepted returns context.Canceled, runAgent publishes no errored
+// terminal RunComplete. Cancellation belongs to sessionAgent.Run (it
+// publishes the cancelled marker), so the dispatcher must not
+// synthesize an error terminal of its own.
+func TestRunAgent_CancellationPublishesNoErrorTerminal(t *testing.T) {
+	t.Parallel()
+	backend, _ := newTestBackend(t)
+	coord := &errorCoordinator{err: context.Canceled}
+	ws := insertRunCompleteWorkspace(t, backend, context.Background(), coord)
+
+	subCtx, stop := context.WithCancel(context.Background())
+	defer stop()
+	completions := ws.RunCompletions().Subscribe(subCtx)
+
+	dispatch := proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"}
+	require.NoError(t, backend.SendMessage(ws.ID, dispatch))
+
+	ws.runWG.Wait()
+
+	select {
+	case <-time.After(200 * time.Millisecond):
+	case unexpected := <-completions:
+		t.Fatalf("cancellation must not publish a terminal RunComplete: %+v", unexpected.Payload)
+	}
+}