run_complete_test.go

  1package agent
  2
  3import (
  4	"context"
  5	"testing"
  6	"time"
  7
  8	"github.com/charmbracelet/crush/internal/agent/notify"
  9	"github.com/charmbracelet/crush/internal/pubsub"
 10	"github.com/stretchr/testify/require"
 11)
 12
 13// TestSessionAgentRun_QueueStripsOnComplete verifies that when a Run
 14// call is enqueued (because the session is already busy), the
 15// OnComplete hook is NOT propagated onto the queued copy. The hook
 16// belongs to the caller's retry/coalesce scope (typically
 17// coordinator.Run) which has already returned by the time the queue
 18// drains; carrying it forward would silently funnel the terminal
 19// event into a closure nobody reads, and subscribers (`crush run`)
 20// would hang waiting for a RunComplete that never publishes.
 21func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) {
 22	t.Parallel()
 23
 24	env := testEnv(t)
 25	a := NewSessionAgent(SessionAgentOptions{
 26		Sessions: env.sessions,
 27		Messages: env.messages,
 28	}).(*sessionAgent)
 29
 30	const sessionID = "queued-session"
 31	// Mark the session as busy so Run takes the queue branch
 32	// without needing a real model.
 33	a.activeRequests.Set(sessionID, func() {})
 34
 35	var called bool
 36	hook := func(notify.RunComplete) { called = true }
 37
 38	res, err := a.Run(t.Context(), SessionAgentCall{
 39		SessionID:  sessionID,
 40		RunID:      "run-xyz",
 41		Prompt:     "queued prompt",
 42		OnComplete: hook,
 43	})
 44	require.NoError(t, err)
 45	require.Nil(t, res, "queued Run must return (nil, nil)")
 46	require.False(t, called,
 47		"OnComplete must not fire on the enqueue path; the caller's scope is still live")
 48
 49	queued, ok := a.messageQueue.Get(sessionID)
 50	require.True(t, ok)
 51	require.Len(t, queued, 1)
 52	require.Nil(t, queued[0].OnComplete,
 53		"queued SessionAgentCall must have OnComplete stripped so the drain falls back to the default broker publish")
 54	require.Equal(t, "queued prompt", queued[0].Prompt,
 55		"all other fields must be preserved on the queued copy")
 56	require.Equal(t, "run-xyz", queued[0].RunID,
 57		"RunID must be preserved on the queued copy so the drained turn's "+
 58			"RunComplete still correlates with the originating SendMessage")
 59}
 60
 61// TestDrainQueueForStep_FiltersUnderDispatchLock verifies that the queue
 62// drain evaluates the per-session cancel mark while holding the dispatch
 63// mutex (canceledBySeq's documented precondition). Queued calls at or
 64// below the cancel high-water mark are dropped, calls queued after the
 65// cancel (higher seq) are folded, untracked enqueues (seq == 0) are
 66// dropped whenever any mark is present, and the queue is cleared. These
 67// calls carry no RunID, so all foldable survivors are returned for
 68// folding (the existing follow-up behavior).
 69func TestDrainQueueForStep_FiltersUnderDispatchLock(t *testing.T) {
 70	t.Parallel()
 71
 72	env := testEnv(t)
 73	a := NewSessionAgent(SessionAgentOptions{
 74		Sessions: env.sessions,
 75		Messages: env.messages,
 76	}).(*sessionAgent)
 77
 78	const sessionID = "drain-session"
 79	a.messageQueue.Set(sessionID, []SessionAgentCall{
 80		{SessionID: sessionID, Prompt: "below", acceptSeq: 1},
 81		{SessionID: sessionID, Prompt: "at-mark", acceptSeq: 2},
 82		{SessionID: sessionID, Prompt: "after", acceptSeq: 3},
 83		{SessionID: sessionID, Prompt: "untracked", acceptSeq: 0},
 84	})
 85	// Cancel high-water mark at seq 2: seq <= 2 and seq == 0 are covered.
 86	a.cancelMark.Set(sessionID, 2)
 87
 88	fold, canceledWithRunID := a.drainQueueForStep(sessionID)
 89
 90	require.Len(t, fold, 1,
 91		"only the follow-up queued after the cancel (seq > mark) must be folded")
 92	require.Equal(t, "after", fold[0].Prompt)
 93	require.Empty(t, canceledWithRunID,
 94		"no dropped call carried a RunID, so none need a terminal RunComplete")
 95
 96	_, ok := a.messageQueue.Get(sessionID)
 97	require.False(t, ok, "drain must clear the session message queue when nothing is kept")
 98}
 99
100// TestDrainQueueForStep_NoMarkFoldsAllNonRunID verifies that with no
101// cancel mark recorded, every queued call without a RunID is folded.
102func TestDrainQueueForStep_NoMarkFoldsAllNonRunID(t *testing.T) {
103	t.Parallel()
104
105	env := testEnv(t)
106	a := NewSessionAgent(SessionAgentOptions{
107		Sessions: env.sessions,
108		Messages: env.messages,
109	}).(*sessionAgent)
110
111	const sessionID = "drain-nomark"
112	a.messageQueue.Set(sessionID, []SessionAgentCall{
113		{SessionID: sessionID, Prompt: "a", acceptSeq: 0},
114		{SessionID: sessionID, Prompt: "b", acceptSeq: 5},
115	})
116
117	fold, canceledWithRunID := a.drainQueueForStep(sessionID)
118	require.Len(t, fold, 2, "no cancel mark means all non-RunID queued calls are folded")
119	require.Empty(t, canceledWithRunID)
120}
121
122// TestDrainQueueForStep_KeepsRunIDPromptsQueued is the core of fix 2: a
123// queued prompt that carries a RunID must NOT be folded into the active
124// turn. Folding it would silently absorb it into another turn and never
125// publish a RunComplete for its RunID, hanging a `crush run` caller that
126// blocks on that event. Such prompts are left in the queue so the
127// recursive run path gives each its own turn and its own RunComplete.
128// Non-RunID prompts are still folded.
129func TestDrainQueueForStep_KeepsRunIDPromptsQueued(t *testing.T) {
130	t.Parallel()
131
132	env := testEnv(t)
133	a := NewSessionAgent(SessionAgentOptions{
134		Sessions: env.sessions,
135		Messages: env.messages,
136	}).(*sessionAgent)
137
138	const sessionID = "drain-runid"
139	a.messageQueue.Set(sessionID, []SessionAgentCall{
140		{SessionID: sessionID, Prompt: "fold-me", acceptSeq: 1},
141		{SessionID: sessionID, RunID: "run-a", Prompt: "keep-me", acceptSeq: 2},
142		{SessionID: sessionID, RunID: "run-b", Prompt: "keep-me-too", acceptSeq: 3},
143	})
144
145	fold, canceledWithRunID := a.drainQueueForStep(sessionID)
146
147	require.Len(t, fold, 1, "only the non-RunID prompt is folded into the active turn")
148	require.Equal(t, "fold-me", fold[0].Prompt)
149	require.Empty(t, canceledWithRunID)
150
151	kept, ok := a.messageQueue.Get(sessionID)
152	require.True(t, ok, "RunID-bearing prompts must remain queued for the recursive run path")
153	require.Len(t, kept, 2)
154	require.Equal(t, "run-a", kept[0].RunID)
155	require.Equal(t, "run-b", kept[1].RunID)
156}
157
158// TestDrainQueueForStep_ReportsCanceledRunIDDrops verifies that a queued
159// prompt carrying a RunID that is dropped because a cancel covers it is
160// reported in canceledWithRunID so the caller can publish its terminal
161// cancelled RunComplete. A canceled prompt without a RunID is dropped
162// silently as before.
163func TestDrainQueueForStep_ReportsCanceledRunIDDrops(t *testing.T) {
164	t.Parallel()
165
166	env := testEnv(t)
167	a := NewSessionAgent(SessionAgentOptions{
168		Sessions: env.sessions,
169		Messages: env.messages,
170	}).(*sessionAgent)
171
172	const sessionID = "drain-cancel-runid"
173	a.messageQueue.Set(sessionID, []SessionAgentCall{
174		{SessionID: sessionID, RunID: "run-canceled", Prompt: "canceled", acceptSeq: 1},
175		{SessionID: sessionID, Prompt: "canceled-no-runid", acceptSeq: 1},
176		{SessionID: sessionID, RunID: "run-survives", Prompt: "survives", acceptSeq: 5},
177	})
178	a.cancelMark.Set(sessionID, 2)
179
180	fold, canceledWithRunID := a.drainQueueForStep(sessionID)
181
182	require.Empty(t, fold, "no uncanceled non-RunID prompts to fold")
183	require.Len(t, canceledWithRunID, 1,
184		"only the dropped RunID-bearing prompt needs a terminal RunComplete")
185	require.Equal(t, "run-canceled", canceledWithRunID[0].RunID)
186
187	kept, ok := a.messageQueue.Get(sessionID)
188	require.True(t, ok)
189	require.Len(t, kept, 1, "the uncanceled RunID prompt stays queued")
190	require.Equal(t, "run-survives", kept[0].RunID)
191}
192
193// TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the
194// pubsub.Publisher interface change end-to-end: a Broker is the only
195// concrete Publisher implementation and must satisfy both Publish and
196// PublishMustDeliver. The coordinator's final RunComplete emit relies
197// on PublishMustDeliver to apply bounded-blocking semantics so a
198// momentarily-full subscriber buffer can't silently drop the
199// authoritative end-of-run event.
200func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) {
201	t.Parallel()
202
203	broker := pubsub.NewBroker[notify.RunComplete]()
204	t.Cleanup(broker.Shutdown)
205
206	// Subscribe before publishing so the event is delivered.
207	ctx, cancel := context.WithCancel(t.Context())
208	defer cancel()
209	ch := broker.Subscribe(ctx)
210
211	rc := notify.RunComplete{SessionID: "S", MessageID: "m", Text: "ok"}
212	var pub pubsub.Publisher[notify.RunComplete] = broker
213	pub.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, rc)
214
215	select {
216	case got := <-ch:
217		require.Equal(t, rc, got.Payload)
218	case <-time.After(time.Second):
219		t.Fatal("PublishMustDeliver did not deliver event")
220	}
221}
222
223// requireSingleCancelledRunComplete reads exactly one RunComplete from ch,
224// asserts it is the cancelled terminal event for runID, and verifies no
225// second event arrives. This observes the published pubsub event rather
226// than internal bookkeeping, which is the contract a `crush run` caller
227// blocking on the broker actually relies on.
228func requireSingleCancelledRunComplete(t *testing.T, ch <-chan pubsub.Event[notify.RunComplete], sessionID, runID string) {
229	t.Helper()
230	select {
231	case ev := <-ch:
232		require.Equal(t, runID, ev.Payload.RunID,
233			"the published RunComplete must carry the dropped queued prompt's RunID")
234		require.Equal(t, sessionID, ev.Payload.SessionID)
235		require.True(t, ev.Payload.Cancelled,
236			"a dropped queued prompt must publish a cancelled RunComplete")
237	case <-time.After(5 * time.Second):
238		t.Fatal("timed out waiting for the cancelled RunComplete")
239	}
240	select {
241	case extra := <-ch:
242		t.Fatalf("expected exactly one RunComplete, got a second: %+v", extra.Payload)
243	case <-time.After(100 * time.Millisecond):
244	}
245}
246
247// TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete proves the
248// terminal-event behavior end-to-end: a RunID-bearing prompt sitting in
249// the queue that is canceled while queued (via the public Cancel path,
250// which routes through clearQueueAndNotify -> publishCanceledQueueDrops)
251// must emit exactly one cancelled RunComplete on the broker for its
252// RunID. A queued prompt without a RunID is dropped silently. This is the
253// coverage the earlier drain test lacked: it asserted the returned
254// bookkeeping slice, not the published event a `crush run` caller awaits.
255func TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete(t *testing.T) {
256	t.Parallel()
257
258	env := testEnv(t)
259	broker := pubsub.NewBroker[notify.RunComplete]()
260	t.Cleanup(broker.Shutdown)
261
262	a := NewSessionAgent(SessionAgentOptions{
263		Sessions:    env.sessions,
264		Messages:    env.messages,
265		RunComplete: broker,
266	}).(*sessionAgent)
267
268	subCtx, subCancel := context.WithCancel(t.Context())
269	defer subCancel()
270	ch := broker.Subscribe(subCtx)
271
272	const sessionID = "cancel-queued-runid"
273	a.messageQueue.Set(sessionID, []SessionAgentCall{
274		{SessionID: sessionID, Prompt: "no-runid", acceptSeq: 1},
275		{SessionID: sessionID, RunID: "run-queued", Prompt: "queued", acceptSeq: 2},
276	})
277
278	a.Cancel(sessionID)
279
280	requireSingleCancelledRunComplete(t, ch, sessionID, "run-queued")
281
282	_, ok := a.messageQueue.Get(sessionID)
283	require.False(t, ok, "Cancel must clear the queue")
284}
285
286// TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete drives
287// the production drain sequence (drainQueueForStep then
288// publishCanceledQueueDrops, mirroring the PrepareStep handoff) and
289// asserts the dropped RunID-bearing prompt actually publishes exactly one
290// cancelled RunComplete on the broker. The companion bookkeeping test
291// covers the returned slice; this one covers the observable terminal
292// event.
293func TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete(t *testing.T) {
294	t.Parallel()
295
296	env := testEnv(t)
297	broker := pubsub.NewBroker[notify.RunComplete]()
298	t.Cleanup(broker.Shutdown)
299
300	a := NewSessionAgent(SessionAgentOptions{
301		Sessions:    env.sessions,
302		Messages:    env.messages,
303		RunComplete: broker,
304	}).(*sessionAgent)
305
306	subCtx, subCancel := context.WithCancel(t.Context())
307	defer subCancel()
308	ch := broker.Subscribe(subCtx)
309
310	const sessionID = "drain-drop-runid"
311	a.messageQueue.Set(sessionID, []SessionAgentCall{
312		{SessionID: sessionID, RunID: "run-dropped", Prompt: "dropped", acceptSeq: 1},
313		{SessionID: sessionID, Prompt: "dropped-no-runid", acceptSeq: 1},
314	})
315	a.cancelMark.Set(sessionID, 2)
316
317	_, canceledWithRunID := a.drainQueueForStep(sessionID)
318	require.Len(t, canceledWithRunID, 1)
319	a.publishCanceledQueueDrops(canceledWithRunID)
320
321	requireSingleCancelledRunComplete(t, ch, sessionID, "run-dropped")
322}