1package agent
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/charmbracelet/crush/internal/agent/notify"
9 "github.com/charmbracelet/crush/internal/pubsub"
10 "github.com/stretchr/testify/require"
11)
12
13// TestSessionAgentRun_QueueStripsOnComplete verifies that when a Run
14// call is enqueued (because the session is already busy), the
15// OnComplete hook is NOT propagated onto the queued copy. The hook
16// belongs to the caller's retry/coalesce scope (typically
17// coordinator.Run) which has already returned by the time the queue
18// drains; carrying it forward would silently funnel the terminal
19// event into a closure nobody reads, and subscribers (`crush run`)
20// would hang waiting for a RunComplete that never publishes.
21func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) {
22 t.Parallel()
23
24 env := testEnv(t)
25 a := NewSessionAgent(SessionAgentOptions{
26 Sessions: env.sessions,
27 Messages: env.messages,
28 }).(*sessionAgent)
29
30 const sessionID = "queued-session"
31 // Mark the session as busy so Run takes the queue branch
32 // without needing a real model.
33 a.activeRequests.Set(sessionID, func() {})
34
35 var called bool
36 hook := func(notify.RunComplete) { called = true }
37
38 res, err := a.Run(t.Context(), SessionAgentCall{
39 SessionID: sessionID,
40 RunID: "run-xyz",
41 Prompt: "queued prompt",
42 OnComplete: hook,
43 })
44 require.NoError(t, err)
45 require.Nil(t, res, "queued Run must return (nil, nil)")
46 require.False(t, called,
47 "OnComplete must not fire on the enqueue path; the caller's scope is still live")
48
49 queued, ok := a.messageQueue.Get(sessionID)
50 require.True(t, ok)
51 require.Len(t, queued, 1)
52 require.Nil(t, queued[0].OnComplete,
53 "queued SessionAgentCall must have OnComplete stripped so the drain falls back to the default broker publish")
54 require.Equal(t, "queued prompt", queued[0].Prompt,
55 "all other fields must be preserved on the queued copy")
56 require.Equal(t, "run-xyz", queued[0].RunID,
57 "RunID must be preserved on the queued copy so the drained turn's "+
58 "RunComplete still correlates with the originating SendMessage")
59}
60
61// TestDrainQueueForStep_FiltersUnderDispatchLock verifies that the queue
62// drain evaluates the per-session cancel mark while holding the dispatch
63// mutex (canceledBySeq's documented precondition). Queued calls at or
64// below the cancel high-water mark are dropped, calls queued after the
65// cancel (higher seq) are folded, untracked enqueues (seq == 0) are
66// dropped whenever any mark is present, and the queue is cleared. These
67// calls carry no RunID, so all foldable survivors are returned for
68// folding (the existing follow-up behavior).
69func TestDrainQueueForStep_FiltersUnderDispatchLock(t *testing.T) {
70 t.Parallel()
71
72 env := testEnv(t)
73 a := NewSessionAgent(SessionAgentOptions{
74 Sessions: env.sessions,
75 Messages: env.messages,
76 }).(*sessionAgent)
77
78 const sessionID = "drain-session"
79 a.messageQueue.Set(sessionID, []SessionAgentCall{
80 {SessionID: sessionID, Prompt: "below", acceptSeq: 1},
81 {SessionID: sessionID, Prompt: "at-mark", acceptSeq: 2},
82 {SessionID: sessionID, Prompt: "after", acceptSeq: 3},
83 {SessionID: sessionID, Prompt: "untracked", acceptSeq: 0},
84 })
85 // Cancel high-water mark at seq 2: seq <= 2 and seq == 0 are covered.
86 a.cancelMark.Set(sessionID, 2)
87
88 fold, canceledWithRunID := a.drainQueueForStep(sessionID)
89
90 require.Len(t, fold, 1,
91 "only the follow-up queued after the cancel (seq > mark) must be folded")
92 require.Equal(t, "after", fold[0].Prompt)
93 require.Empty(t, canceledWithRunID,
94 "no dropped call carried a RunID, so none need a terminal RunComplete")
95
96 _, ok := a.messageQueue.Get(sessionID)
97 require.False(t, ok, "drain must clear the session message queue when nothing is kept")
98}
99
100// TestDrainQueueForStep_NoMarkFoldsAllNonRunID verifies that with no
101// cancel mark recorded, every queued call without a RunID is folded.
102func TestDrainQueueForStep_NoMarkFoldsAllNonRunID(t *testing.T) {
103 t.Parallel()
104
105 env := testEnv(t)
106 a := NewSessionAgent(SessionAgentOptions{
107 Sessions: env.sessions,
108 Messages: env.messages,
109 }).(*sessionAgent)
110
111 const sessionID = "drain-nomark"
112 a.messageQueue.Set(sessionID, []SessionAgentCall{
113 {SessionID: sessionID, Prompt: "a", acceptSeq: 0},
114 {SessionID: sessionID, Prompt: "b", acceptSeq: 5},
115 })
116
117 fold, canceledWithRunID := a.drainQueueForStep(sessionID)
118 require.Len(t, fold, 2, "no cancel mark means all non-RunID queued calls are folded")
119 require.Empty(t, canceledWithRunID)
120}
121
122// TestDrainQueueForStep_KeepsRunIDPromptsQueued is the core of fix 2: a
123// queued prompt that carries a RunID must NOT be folded into the active
124// turn. Folding it would silently absorb it into another turn and never
125// publish a RunComplete for its RunID, hanging a `crush run` caller that
126// blocks on that event. Such prompts are left in the queue so the
127// recursive run path gives each its own turn and its own RunComplete.
128// Non-RunID prompts are still folded.
129func TestDrainQueueForStep_KeepsRunIDPromptsQueued(t *testing.T) {
130 t.Parallel()
131
132 env := testEnv(t)
133 a := NewSessionAgent(SessionAgentOptions{
134 Sessions: env.sessions,
135 Messages: env.messages,
136 }).(*sessionAgent)
137
138 const sessionID = "drain-runid"
139 a.messageQueue.Set(sessionID, []SessionAgentCall{
140 {SessionID: sessionID, Prompt: "fold-me", acceptSeq: 1},
141 {SessionID: sessionID, RunID: "run-a", Prompt: "keep-me", acceptSeq: 2},
142 {SessionID: sessionID, RunID: "run-b", Prompt: "keep-me-too", acceptSeq: 3},
143 })
144
145 fold, canceledWithRunID := a.drainQueueForStep(sessionID)
146
147 require.Len(t, fold, 1, "only the non-RunID prompt is folded into the active turn")
148 require.Equal(t, "fold-me", fold[0].Prompt)
149 require.Empty(t, canceledWithRunID)
150
151 kept, ok := a.messageQueue.Get(sessionID)
152 require.True(t, ok, "RunID-bearing prompts must remain queued for the recursive run path")
153 require.Len(t, kept, 2)
154 require.Equal(t, "run-a", kept[0].RunID)
155 require.Equal(t, "run-b", kept[1].RunID)
156}
157
158// TestDrainQueueForStep_ReportsCanceledRunIDDrops verifies that a queued
159// prompt carrying a RunID that is dropped because a cancel covers it is
160// reported in canceledWithRunID so the caller can publish its terminal
161// cancelled RunComplete. A canceled prompt without a RunID is dropped
162// silently as before.
163func TestDrainQueueForStep_ReportsCanceledRunIDDrops(t *testing.T) {
164 t.Parallel()
165
166 env := testEnv(t)
167 a := NewSessionAgent(SessionAgentOptions{
168 Sessions: env.sessions,
169 Messages: env.messages,
170 }).(*sessionAgent)
171
172 const sessionID = "drain-cancel-runid"
173 a.messageQueue.Set(sessionID, []SessionAgentCall{
174 {SessionID: sessionID, RunID: "run-canceled", Prompt: "canceled", acceptSeq: 1},
175 {SessionID: sessionID, Prompt: "canceled-no-runid", acceptSeq: 1},
176 {SessionID: sessionID, RunID: "run-survives", Prompt: "survives", acceptSeq: 5},
177 })
178 a.cancelMark.Set(sessionID, 2)
179
180 fold, canceledWithRunID := a.drainQueueForStep(sessionID)
181
182 require.Empty(t, fold, "no uncanceled non-RunID prompts to fold")
183 require.Len(t, canceledWithRunID, 1,
184 "only the dropped RunID-bearing prompt needs a terminal RunComplete")
185 require.Equal(t, "run-canceled", canceledWithRunID[0].RunID)
186
187 kept, ok := a.messageQueue.Get(sessionID)
188 require.True(t, ok)
189 require.Len(t, kept, 1, "the uncanceled RunID prompt stays queued")
190 require.Equal(t, "run-survives", kept[0].RunID)
191}
192
193// TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the
194// pubsub.Publisher interface change end-to-end: a Broker is the only
195// concrete Publisher implementation and must satisfy both Publish and
196// PublishMustDeliver. The coordinator's final RunComplete emit relies
197// on PublishMustDeliver to apply bounded-blocking semantics so a
198// momentarily-full subscriber buffer can't silently drop the
199// authoritative end-of-run event.
200func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) {
201 t.Parallel()
202
203 broker := pubsub.NewBroker[notify.RunComplete]()
204 t.Cleanup(broker.Shutdown)
205
206 // Subscribe before publishing so the event is delivered.
207 ctx, cancel := context.WithCancel(t.Context())
208 defer cancel()
209 ch := broker.Subscribe(ctx)
210
211 rc := notify.RunComplete{SessionID: "S", MessageID: "m", Text: "ok"}
212 var pub pubsub.Publisher[notify.RunComplete] = broker
213 pub.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, rc)
214
215 select {
216 case got := <-ch:
217 require.Equal(t, rc, got.Payload)
218 case <-time.After(time.Second):
219 t.Fatal("PublishMustDeliver did not deliver event")
220 }
221}
222
223// requireSingleCancelledRunComplete reads exactly one RunComplete from ch,
224// asserts it is the cancelled terminal event for runID, and verifies no
225// second event arrives. This observes the published pubsub event rather
226// than internal bookkeeping, which is the contract a `crush run` caller
227// blocking on the broker actually relies on.
228func requireSingleCancelledRunComplete(t *testing.T, ch <-chan pubsub.Event[notify.RunComplete], sessionID, runID string) {
229 t.Helper()
230 select {
231 case ev := <-ch:
232 require.Equal(t, runID, ev.Payload.RunID,
233 "the published RunComplete must carry the dropped queued prompt's RunID")
234 require.Equal(t, sessionID, ev.Payload.SessionID)
235 require.True(t, ev.Payload.Cancelled,
236 "a dropped queued prompt must publish a cancelled RunComplete")
237 case <-time.After(5 * time.Second):
238 t.Fatal("timed out waiting for the cancelled RunComplete")
239 }
240 select {
241 case extra := <-ch:
242 t.Fatalf("expected exactly one RunComplete, got a second: %+v", extra.Payload)
243 case <-time.After(100 * time.Millisecond):
244 }
245}
246
247// TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete proves the
248// terminal-event behavior end-to-end: a RunID-bearing prompt sitting in
249// the queue that is canceled while queued (via the public Cancel path,
250// which routes through clearQueueAndNotify -> publishCanceledQueueDrops)
251// must emit exactly one cancelled RunComplete on the broker for its
252// RunID. A queued prompt without a RunID is dropped silently. This is the
253// coverage the earlier drain test lacked: it asserted the returned
254// bookkeeping slice, not the published event a `crush run` caller awaits.
255func TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete(t *testing.T) {
256 t.Parallel()
257
258 env := testEnv(t)
259 broker := pubsub.NewBroker[notify.RunComplete]()
260 t.Cleanup(broker.Shutdown)
261
262 a := NewSessionAgent(SessionAgentOptions{
263 Sessions: env.sessions,
264 Messages: env.messages,
265 RunComplete: broker,
266 }).(*sessionAgent)
267
268 subCtx, subCancel := context.WithCancel(t.Context())
269 defer subCancel()
270 ch := broker.Subscribe(subCtx)
271
272 const sessionID = "cancel-queued-runid"
273 a.messageQueue.Set(sessionID, []SessionAgentCall{
274 {SessionID: sessionID, Prompt: "no-runid", acceptSeq: 1},
275 {SessionID: sessionID, RunID: "run-queued", Prompt: "queued", acceptSeq: 2},
276 })
277
278 a.Cancel(sessionID)
279
280 requireSingleCancelledRunComplete(t, ch, sessionID, "run-queued")
281
282 _, ok := a.messageQueue.Get(sessionID)
283 require.False(t, ok, "Cancel must clear the queue")
284}
285
286// TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete drives
287// the production drain sequence (drainQueueForStep then
288// publishCanceledQueueDrops, mirroring the PrepareStep handoff) and
289// asserts the dropped RunID-bearing prompt actually publishes exactly one
290// cancelled RunComplete on the broker. The companion bookkeeping test
291// covers the returned slice; this one covers the observable terminal
292// event.
293func TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete(t *testing.T) {
294 t.Parallel()
295
296 env := testEnv(t)
297 broker := pubsub.NewBroker[notify.RunComplete]()
298 t.Cleanup(broker.Shutdown)
299
300 a := NewSessionAgent(SessionAgentOptions{
301 Sessions: env.sessions,
302 Messages: env.messages,
303 RunComplete: broker,
304 }).(*sessionAgent)
305
306 subCtx, subCancel := context.WithCancel(t.Context())
307 defer subCancel()
308 ch := broker.Subscribe(subCtx)
309
310 const sessionID = "drain-drop-runid"
311 a.messageQueue.Set(sessionID, []SessionAgentCall{
312 {SessionID: sessionID, RunID: "run-dropped", Prompt: "dropped", acceptSeq: 1},
313 {SessionID: sessionID, Prompt: "dropped-no-runid", acceptSeq: 1},
314 })
315 a.cancelMark.Set(sessionID, 2)
316
317 _, canceledWithRunID := a.drainQueueForStep(sessionID)
318 require.Len(t, canceledWithRunID, 1)
319 a.publishCanceledQueueDrops(canceledWithRunID)
320
321 requireSingleCancelledRunComplete(t, ch, sessionID, "run-dropped")
322}