agent_runcomplete_test.go

  1package backend
  2
  3import (
  4	"context"
  5	"errors"
  6	"testing"
  7	"time"
  8
  9	"charm.land/fantasy"
 10	"github.com/charmbracelet/crush/internal/agent"
 11	"github.com/charmbracelet/crush/internal/app"
 12	"github.com/charmbracelet/crush/internal/message"
 13	"github.com/charmbracelet/crush/internal/proto"
 14	"github.com/google/uuid"
 15	"github.com/stretchr/testify/require"
 16)
 17
 18// errorCoordinator is a minimal agent.Coordinator whose RunAccepted
 19// returns a configurable error. When markPublished is true it stamps
 20// the run-complete marker on the context before returning, simulating a
 21// real coordinator that already published the run's authoritative
 22// terminal RunComplete (so runAgent must not emit a duplicate fallback).
 23type errorCoordinator struct {
 24	err           error
 25	markPublished bool
 26}
 27
 28func (c *errorCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 29	return nil, c.err
 30}
 31
 32func (c *errorCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 33	if c.markPublished {
 34		agent.MarkRunCompletePublished(ctx)
 35	}
 36	return nil, c.err
 37}
 38
 39func (c *errorCoordinator) BeginAccepted(sessionID string) *agent.AcceptedRun { return nil }
 40func (c *errorCoordinator) Cancel(string)                                     {}
 41func (c *errorCoordinator) CancelAll()                                        {}
 42func (c *errorCoordinator) IsBusy() bool                                      { return false }
 43func (c *errorCoordinator) IsSessionBusy(string) bool                         { return false }
 44func (c *errorCoordinator) QueuedPrompts(string) int                          { return 0 }
 45func (c *errorCoordinator) QueuedPromptsList(string) []string                 { return nil }
 46func (c *errorCoordinator) ClearQueue(string)                                 {}
 47func (c *errorCoordinator) Summarize(context.Context, string) error           { return nil }
 48func (c *errorCoordinator) Model() agent.Model                                { return agent.Model{} }
 49func (c *errorCoordinator) UpdateModels(context.Context) error                { return nil }
 50
 51// insertRunCompleteWorkspace installs a workspace backed by a real
 52// app.App (so the runCompletions broker exists) with the given
 53// coordinator and a workspace run context derived from base.
 54func insertRunCompleteWorkspace(t *testing.T, b *Backend, base context.Context, coord agent.Coordinator) *Workspace {
 55	t.Helper()
 56	a := app.NewForTest(base)
 57	a.AgentCoordinator = coord
 58	t.Cleanup(a.ShutdownForTest)
 59	ws := &Workspace{
 60		ID:           uuid.New().String(),
 61		Path:         t.TempDir(),
 62		resolvedPath: t.TempDir(),
 63		clients:      make(map[string]*clientState),
 64		shutdownFn:   func() {},
 65	}
 66	ws.App = a
 67	ws.ctx, ws.cancel = context.WithCancel(base)
 68	b.mu.Lock()
 69	b.workspaces.Set(ws.ID, ws)
 70	b.pathIndex[ws.resolvedPath] = ws.ID
 71	b.mu.Unlock()
 72	return ws
 73}
 74
 75// TestRunAgent_PreRunErrorPublishesTerminalRunComplete proves that an
 76// error returned from RunAccepted before the coordinator could publish
 77// its own terminal event (e.g. a readyWg or UpdateModels failure,
 78// modeled here by a stub coordinator) still yields a reliable terminal
 79// RunComplete for the run's RunID. Without it, a `crush run` caller
 80// blocking on that RunID would hang because the lossy TypeAgentError
 81// event is not a guaranteed terminal signal.
 82func TestRunAgent_PreRunErrorPublishesTerminalRunComplete(t *testing.T) {
 83	t.Parallel()
 84	b, _ := newTestBackend(t)
 85	runErr := errors.New("update models failed")
 86	ws := insertRunCompleteWorkspace(t, b, context.Background(), &errorCoordinator{err: runErr})
 87
 88	subCtx, cancel := context.WithCancel(context.Background())
 89	defer cancel()
 90	ch := ws.RunCompletions().Subscribe(subCtx)
 91
 92	err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
 93	require.NoError(t, err)
 94
 95	select {
 96	case ev := <-ch:
 97		require.Equal(t, "run-1", ev.Payload.RunID,
 98			"the terminal RunComplete must carry the dispatched RunID")
 99		require.Equal(t, "S1", ev.Payload.SessionID)
100		require.Equal(t, runErr.Error(), ev.Payload.Error,
101			"the fallback terminal event must be marked errored")
102		require.False(t, ev.Payload.Cancelled)
103	case <-time.After(2 * time.Second):
104		t.Fatal("no terminal RunComplete published for a pre-run error; a run waiter would hang")
105	}
106}
107
108// TestRunAgent_NoFallbackWhenCoordinatorPublished ensures the fallback
109// is suppressed when the coordinator already emitted the run's
110// authoritative terminal RunComplete, so callers never observe a
111// duplicate terminal event for the same RunID.
112func TestRunAgent_NoFallbackWhenCoordinatorPublished(t *testing.T) {
113	t.Parallel()
114	b, _ := newTestBackend(t)
115	runErr := errors.New("stream failed after publishing terminal event")
116	ws := insertRunCompleteWorkspace(t, b, context.Background(),
117		&errorCoordinator{err: runErr, markPublished: true})
118
119	subCtx, cancel := context.WithCancel(context.Background())
120	defer cancel()
121	ch := ws.RunCompletions().Subscribe(subCtx)
122
123	err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
124	require.NoError(t, err)
125
126	// Wait for the dispatched run goroutine to return so any publish
127	// has already happened.
128	ws.runWG.Wait()
129
130	select {
131	case ev := <-ch:
132		t.Fatalf("runAgent published a duplicate terminal RunComplete: %+v", ev.Payload)
133	case <-time.After(200 * time.Millisecond):
134	}
135}
136
137// TestRunAgent_CancellationPublishesNoErrorTerminal verifies that a
138// context.Canceled result from RunAccepted produces no errored terminal
139// RunComplete from runAgent: cancellation is sessionAgent.Run's
140// responsibility (it publishes the cancelled marker) and the dispatcher
141// must not synthesize an error terminal for it.
142func TestRunAgent_CancellationPublishesNoErrorTerminal(t *testing.T) {
143	t.Parallel()
144	b, _ := newTestBackend(t)
145	ws := insertRunCompleteWorkspace(t, b, context.Background(),
146		&errorCoordinator{err: context.Canceled})
147
148	subCtx, cancel := context.WithCancel(context.Background())
149	defer cancel()
150	ch := ws.RunCompletions().Subscribe(subCtx)
151
152	err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
153	require.NoError(t, err)
154
155	ws.runWG.Wait()
156
157	select {
158	case ev := <-ch:
159		t.Fatalf("cancellation must not publish a terminal RunComplete: %+v", ev.Payload)
160	case <-time.After(200 * time.Millisecond):
161	}
162}