1package server
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "sync"
11 "sync/atomic"
12 "testing"
13 "time"
14
15 "charm.land/fantasy"
16 "github.com/charmbracelet/crush/internal/agent"
17 "github.com/charmbracelet/crush/internal/app"
18 "github.com/charmbracelet/crush/internal/backend"
19 "github.com/charmbracelet/crush/internal/message"
20 "github.com/charmbracelet/crush/internal/proto"
21 "github.com/charmbracelet/crush/internal/pubsub"
22 "github.com/google/uuid"
23 "github.com/stretchr/testify/require"
24)
25
26// scriptedCoordinator is an agent.Coordinator stub that mimics the
27// externally-observable contract of a real run over the SSE pipeline
28// without booting a real model, database, or scheduler. It publishes a
29// user message when a run begins and an assistant message (with the
30// appropriate FinishReason) when the run ends, exactly the way the real
31// sessionAgent.Run surfaces a turn to SSE subscribers.
32//
33// A run blocks until either its per-session context is canceled (via
34// Cancel, mirroring the explicit cancel endpoint) or the test releases
35// it. On cancel it emits a FinishReasonCanceled assistant message and
36// returns context.Canceled (which backend.runAgent swallows, so no
37// AgentEvent error is published). On normal release it emits a
38// FinishReasonEndTurn assistant message and returns nil.
39//
40// The internal scheduler signal points the PLAN's e2e cases reference
41// (e.g. "before registration in activeRequests", "between
42// activeRequests.Set and assistant create") are not exposed by the
43// codebase, so this stub reproduces the documented black-box outcome by
44// controlling run timing directly through blockEntered / release.
45type scriptedCoordinator struct {
46 app *app.App
47
48 // blockEntered, when non-nil, is signaled (once) right after a run
49 // is entered and before the user message is emitted, letting a test
50 // interleave a cancel with the dispatched goroutine.
51 blockEntered chan struct{}
52
53 mu sync.Mutex
54 // cancels holds the cancel func for every in-flight run, keyed by a
55 // monotonic id so concurrent runs for the same session each get their
56 // own entry (a map keyed only by sessionID would let a second run
57 // overwrite the first's cancel func and leak it).
58 cancels map[int64]sessionCancel
59 // pendingCancels counts cancels that arrived for a session while a run
60 // was in flight; a run for that session consumes one on entry and
61 // cancels itself, modeling the cancel-on-entry path a follow-up takes.
62 pendingCancels map[string]int
63 nextRunID int64
64 // entered carries the monotonic run id assigned to each run as it is
65 // entered, so a test can correlate a later assistant message back to a
66 // specific run (run 1 vs an accepted follow-up).
67 entered chan int64
68 runStarts atomic.Int32
69
70 release chan struct{}
71}
72
// sessionCancel pairs the cancel func of one in-flight run with the
// session that run belongs to, so a per-session cancel can pick out the
// matching entries.
type sessionCancel struct {
	// cancel stops the context derived for the run.
	cancel context.CancelFunc
	// sessionID is the session the run was started for.
	sessionID string
}
77
78func newScriptedCoordinator(a *app.App) *scriptedCoordinator {
79 return &scriptedCoordinator{
80 app: a,
81 cancels: make(map[int64]sessionCancel),
82 pendingCancels: make(map[string]int),
83 entered: make(chan int64, 8),
84 release: make(chan struct{}),
85 }
86}
87
88func (c *scriptedCoordinator) emitUser(sessionID, id string) {
89 c.app.SendEvent(pubsub.Event[message.Message]{
90 Type: pubsub.CreatedEvent,
91 Payload: message.Message{
92 ID: id,
93 SessionID: sessionID,
94 Role: message.User,
95 Parts: []message.ContentPart{message.TextContent{Text: "hi"}},
96 },
97 })
98}
99
100func (c *scriptedCoordinator) emitAssistant(sessionID, id string, reason message.FinishReason) {
101 c.app.SendEvent(pubsub.Event[message.Message]{
102 Type: pubsub.CreatedEvent,
103 Payload: message.Message{
104 ID: id,
105 SessionID: sessionID,
106 Role: message.Assistant,
107 Parts: []message.ContentPart{message.Finish{Reason: reason}},
108 },
109 })
110}
111
112func (c *scriptedCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
113 c.runStarts.Add(1)
114 runCtx, cancel := context.WithCancel(ctx)
115
116 c.mu.Lock()
117 id := c.nextRunID
118 c.nextRunID++
119 c.cancels[id] = sessionCancel{sessionID: sessionID, cancel: cancel}
120 // Cancel-on-entry: if a cancel for this session arrived while this
121 // run was still being dispatched (no run yet in flight to receive
122 // it), consume the pending cancel now so the run takes the canceled
123 // path instead of streaming output.
124 if c.pendingCancels[sessionID] > 0 {
125 c.pendingCancels[sessionID]--
126 cancel()
127 }
128 c.mu.Unlock()
129
130 select {
131 case c.entered <- id:
132 default:
133 }
134
135 if c.blockEntered != nil {
136 select {
137 case <-c.blockEntered:
138 case <-runCtx.Done():
139 }
140 }
141
142 defer func() {
143 c.mu.Lock()
144 delete(c.cancels, id)
145 c.mu.Unlock()
146 cancel()
147 }()
148
149 // Qualify the emitted message ids with the run id so a test can
150 // attribute an assistant message to the exact run that produced it
151 // (run 1 vs an accepted follow-up sharing the same session).
152 userID := fmt.Sprintf("u-%s-%d", sessionID, id)
153 asstID := fmt.Sprintf("a-%s-%d", sessionID, id)
154
155 c.emitUser(sessionID, userID)
156
157 // Cancellation takes priority: if the run was already canceled it
158 // must take the canceled path even when release is closed, so a
159 // canceled run never races into a normal FinishReasonEndTurn.
160 select {
161 case <-runCtx.Done():
162 c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled)
163 return nil, context.Canceled
164 default:
165 }
166
167 select {
168 case <-c.release:
169 c.emitAssistant(sessionID, asstID, message.FinishReasonEndTurn)
170 return nil, nil
171 case <-runCtx.Done():
172 c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled)
173 return nil, context.Canceled
174 }
175}
176
177func (c *scriptedCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
178 return c.Run(ctx, sessionID, prompt, attachments...)
179}
180
181func (c *scriptedCoordinator) BeginAccepted(string) *agent.AcceptedRun { return nil }
182
183func (c *scriptedCoordinator) Cancel(sessionID string) {
184 c.mu.Lock()
185 defer c.mu.Unlock()
186 // Cancel every in-flight run for this session. Concurrent runs for
187 // the same session (an active run plus an accepted follow-up still
188 // dispatching) each hold their own entry, so all of them are torn
189 // down by a single per-session cancel.
190 var canceled int
191 for _, sc := range c.cancels {
192 if sc.sessionID == sessionID {
193 sc.cancel()
194 canceled++
195 }
196 }
197 // If at least one run was in flight, arm a pending cancel so a
198 // follow-up that has been accepted but not yet entered Run takes the
199 // cancel-on-entry path. With no run in flight this is a no-op,
200 // mirroring the production guarantee that an idle cancel does not arm
201 // a pending cancel against the next prompt.
202 if canceled > 0 {
203 c.pendingCancels[sessionID]++
204 }
205}
206
207func (c *scriptedCoordinator) CancelAll() {
208 c.mu.Lock()
209 defer c.mu.Unlock()
210 for _, sc := range c.cancels {
211 sc.cancel()
212 }
213}
214
215func (c *scriptedCoordinator) IsBusy() bool { return false }
216func (c *scriptedCoordinator) IsSessionBusy(string) bool { return false }
217func (c *scriptedCoordinator) QueuedPrompts(string) int { return 0 }
218func (c *scriptedCoordinator) QueuedPromptsList(string) []string { return nil }
219func (c *scriptedCoordinator) ClearQueue(string) {}
220func (c *scriptedCoordinator) Summarize(context.Context, string) error { return nil }
221func (c *scriptedCoordinator) Model() agent.Model { return agent.Model{} }
222func (c *scriptedCoordinator) UpdateModels(context.Context) error { return nil }
223
// agentE2EHarness extends the SSE harness with a scripted coordinator
// wired into the workspace's embedded app.App, so POST /agent drives a
// real backend.SendMessage dispatch whose emitted user/assistant
// messages fan out over the same SSE pipeline production uses.
type agentE2EHarness struct {
	// e2eHarness is the embedded base harness; its helpers and fields
	// (server, workspace, app) are used directly through h.
	*e2eHarness
	// coord is the scripted coordinator installed as the app's
	// AgentCoordinator. Tests reach into it to gate and release runs.
	coord *scriptedCoordinator
}
232
233func newAgentE2EHarness(t *testing.T) *agentE2EHarness {
234 t.Helper()
235
236 h := &e2eHarness{}
237
238 appCtx, cancel := context.WithCancel(context.Background())
239 a := app.NewForTest(appCtx)
240 coord := newScriptedCoordinator(a)
241 a.AgentCoordinator = coord
242 t.Cleanup(func() {
243 cancel()
244 a.ShutdownForTest()
245 })
246
247 h.installServer(t)
248
249 ws := &backend.Workspace{
250 ID: uuid.New().String(),
251 Path: t.TempDir(),
252 App: a,
253 }
254 backend.SetWorkspaceShutdownFnForTest(ws, func() {})
255 backend.InsertWorkspaceForTest(h.backend, ws)
256
257 h.workspace = ws
258 h.app = a
259 return &agentE2EHarness{e2eHarness: h, coord: coord}
260}
261
262// postAgentHTTP drives POST /v1/workspaces/{id}/agent over the harness's
263// httptest server and returns the status code.
264func (h *agentE2EHarness) postAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int {
265 t.Helper()
266 body, err := json.Marshal(proto.AgentMessage{SessionID: sessionID, Prompt: "hi"})
267 require.NoError(t, err)
268 req, err := http.NewRequestWithContext(ctx, http.MethodPost,
269 h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent", bytes.NewReader(body))
270 require.NoError(t, err)
271 req.Header.Set("Content-Type", "application/json")
272 resp, err := h.httpSrv.Client().Do(req)
273 require.NoError(t, err)
274 _, _ = io.Copy(io.Discard, resp.Body)
275 resp.Body.Close()
276 return resp.StatusCode
277}
278
279// cancelAgentHTTP drives POST /v1/workspaces/{id}/agent/sessions/{sid}/cancel.
280func (h *agentE2EHarness) cancelAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int {
281 t.Helper()
282 req, err := http.NewRequestWithContext(ctx, http.MethodPost,
283 h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent/sessions/"+sessionID+"/cancel", nil)
284 require.NoError(t, err)
285 resp, err := h.httpSrv.Client().Do(req)
286 require.NoError(t, err)
287 _, _ = io.Copy(io.Discard, resp.Body)
288 resp.Body.Close()
289 return resp.StatusCode
290}
291
292// waitForRunEntered blocks until a dispatched run for any session has
293// been entered by the scripted coordinator, or fails the test. It
294// returns the monotonic run id assigned to that run so a caller can
295// correlate it with a later assistant message; callers that don't need
296// the id can ignore the return value.
297func (h *agentE2EHarness) waitForRunEntered(t *testing.T) int64 {
298 t.Helper()
299 select {
300 case id := <-h.coord.entered:
301 return id
302 case <-time.After(2 * time.Second):
303 t.Fatal("dispatched run was never entered")
304 return 0
305 }
306}
307
308// finishReason extracts the assistant message's FinishReason, if any.
309func finishReason(m proto.Message) (proto.FinishReason, bool) {
310 for _, p := range m.Parts {
311 if f, ok := p.(proto.Finish); ok {
312 return f.Reason, true
313 }
314 }
315 return "", false
316}
317
318// TestE2E_CancelByOtherClientDoesNotErrorPrompter covers PLAN Tests ->
319// New end-to-end coverage item 1: a second client canceling a run does
320// not surface a server error to the prompter; the run ends with a
321// FinishReasonCanceled assistant message and no AgentEvent carries a
322// non-nil Error.
323func TestE2E_CancelByOtherClientDoesNotErrorPrompter(t *testing.T) {
324 t.Parallel()
325 h := newAgentE2EHarness(t)
326 ctx, cancel := context.WithCancel(t.Context())
327 t.Cleanup(cancel)
328
329 cidA := uuid.New().String()
330 cidB := uuid.New().String()
331 evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA)
332 t.Cleanup(cancelA)
333 evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB)
334 t.Cleanup(cancelB)
335 h.waitForAttached(t, 2)
336
337 const sid = "s-cancel-other"
338
339 // A posts a long-running prompt; the handler must return 202
340 // immediately (the run blocks in the coordinator).
341 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
342 h.waitForRunEntered(t)
343
344 // B cancels.
345 require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
346
347 // A's SSE stream receives the FinishReasonCanceled assistant
348 // message.
349 pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
350 defer pickCancel()
351 got, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.Message]) bool {
352 r, has := finishReason(e.Payload)
353 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
354 })
355 require.True(t, ok, "client A must observe a FinishReasonCanceled assistant message")
356 require.Equal(t, sid, got.Payload.SessionID)
357
358 // No AgentEvent error reaches A (cancel is not a server error).
359 errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond)
360 defer errCancel()
361 _, gotErrA := drainUntil(errCtx, evcA, func(e pubsub.Event[proto.AgentEvent]) bool {
362 return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
363 })
364 require.False(t, gotErrA, "cancel must not surface an AgentEvent error to the prompter")
365
366 // And no AgentEvent error reaches the canceling client B either; the
367 // PLAN requires that *no* client observes a non-nil Error.
368 errCtxB, errCancelB := context.WithTimeout(ctx, 250*time.Millisecond)
369 defer errCancelB()
370 _, gotErrB := drainUntil(errCtxB, evcB, func(e pubsub.Event[proto.AgentEvent]) bool {
371 return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
372 })
373 require.False(t, gotErrB, "cancel must not surface an AgentEvent error to any client")
374}
375
376// TestE2E_CancelImmediatelyAfter202IsNotLost covers PLAN item 1a: a
377// cancel that races a freshly-dispatched run (before it would emit any
378// output) is not lost. The run takes the cancel-on-entry path and emits
379// a user message followed by a FinishReasonCanceled assistant message
380// rather than streaming model output.
381func TestE2E_CancelImmediatelyAfter202IsNotLost(t *testing.T) {
382 t.Parallel()
383 h := newAgentE2EHarness(t)
384 // Gate the run on a signal the test controls so the cancel can be
385 // observed while the dispatched goroutine is parked at entry.
386 h.coord.blockEntered = make(chan struct{})
387
388 ctx, cancel := context.WithCancel(t.Context())
389 t.Cleanup(cancel)
390
391 cid := uuid.New().String()
392 evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
393 t.Cleanup(cancelSSE)
394 h.waitForAttached(t, 1)
395
396 const sid = "s-race-cancel"
397 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
398 h.waitForRunEntered(t)
399
400 // Cancel while the run is still blocked at entry, then release it.
401 require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
402 close(h.coord.blockEntered)
403
404 pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
405 defer pickCancel()
406
407 gotUser, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
408 return e.Payload.Role == proto.User && e.Payload.SessionID == sid
409 })
410 require.True(t, okUser, "the canceled turn must still record a user message")
411 require.Equal(t, sid, gotUser.Payload.SessionID)
412
413 gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
414 r, has := finishReason(e.Payload)
415 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
416 })
417 require.True(t, okAsst, "the canceled turn must end with a FinishReasonCanceled assistant message")
418 require.Equal(t, sid, gotAsst.Payload.SessionID)
419}
420
421// TestE2E_IdleCancelDoesNotPoisonNextPrompt covers PLAN item 1b: an
422// idle cancel (no active run) must not poison the next prompt. With the
423// scripted coordinator the cancel records a pending entry only if a run
424// is in flight; an idle cancel records one, but the documented
425// guarantee is that the *next* prompt's outcome is observable. Here we
426// assert the regression-relevant external behavior: after an idle
427// cancel, a subsequent normal prompt is able to run and emit output.
428//
429// NOTE: This is a simplified version. The real "idle Escape must not
430// poison" guarantee lives inside sessionAgent.Cancel's acceptedRuns
431// gating, which is covered by the agent unit tests; the e2e stub cannot
432// distinguish "truly idle" from "accepted but not yet running" without
433// the internal acceptedRuns signal. See test summary.
434func TestE2E_IdleCancelDoesNotPoisonNextPrompt(t *testing.T) {
435 t.Parallel()
436 h := newAgentE2EHarness(t)
437 ctx, cancel := context.WithCancel(t.Context())
438 t.Cleanup(cancel)
439
440 cid := uuid.New().String()
441 evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
442 t.Cleanup(cancelSSE)
443 h.waitForAttached(t, 1)
444
445 const sid = "s-idle-cancel"
446
447 // Idle cancel: no run in flight. The scripted coordinator drops it
448 // (no pending cancel recorded for a session that has no run), which
449 // models the production guarantee that an idle Escape does not arm
450 // a cancel against the next prompt.
451 require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
452
453 // Now a normal prompt; release it so it finishes successfully.
454 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
455 h.waitForRunEntered(t)
456 close(h.coord.release)
457
458 pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
459 defer pickCancel()
460 got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
461 r, has := finishReason(e.Payload)
462 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
463 })
464 require.True(t, ok, "the next prompt after an idle cancel must run to FinishReasonEndTurn")
465 require.Equal(t, sid, got.Payload.SessionID)
466
467 // And it must not be marked canceled.
468 canCtx, canCancel := context.WithTimeout(ctx, 200*time.Millisecond)
469 defer canCancel()
470 _, gotCanceled := drainUntil(canCtx, evc, func(e pubsub.Event[proto.Message]) bool {
471 r, has := finishReason(e.Payload)
472 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
473 })
474 require.False(t, gotCanceled, "an idle cancel must not produce a FinishReasonCanceled marker on the next prompt")
475}
476
477// TestE2E_CancelBetweenActiveSetAndAssistantCreate covers PLAN item 1d:
478// a cancel that arrives after the run has begun but before it would
479// create the assistant message must still produce a user message and a
480// FinishReasonCanceled assistant message, never a silent return. The
481// blockEntered gate parks the run after entry (modeling the window
482// between activeRequests.Set and assistant creation).
483func TestE2E_CancelBetweenActiveSetAndAssistantCreate(t *testing.T) {
484 t.Parallel()
485 h := newAgentE2EHarness(t)
486 h.coord.blockEntered = make(chan struct{})
487
488 ctx, cancel := context.WithCancel(t.Context())
489 t.Cleanup(cancel)
490
491 cid := uuid.New().String()
492 evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
493 t.Cleanup(cancelSSE)
494 h.waitForAttached(t, 1)
495
496 const sid = "s-mid-window"
497 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
498 h.waitForRunEntered(t)
499
500 // Cancel while parked at entry; then release so the run proceeds
501 // into its cancel branch (runCtx already canceled).
502 require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
503 close(h.coord.blockEntered)
504
505 pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
506 defer pickCancel()
507
508 _, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
509 return e.Payload.Role == proto.User && e.Payload.SessionID == sid
510 })
511 require.True(t, okUser, "a user message must be recorded for the canceled turn")
512
513 gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
514 r, has := finishReason(e.Payload)
515 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
516 })
517 require.True(t, okAsst, "the run must not return silently; it must emit a FinishReasonCanceled assistant message")
518 require.Equal(t, sid, gotAsst.Payload.SessionID)
519
520 // No AgentEvent error is published: a cancel in the
521 // activeRequests.Set -> assistant-create window is not a server
522 // error.
523 errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond)
524 defer errCancel()
525 _, gotErr := drainUntil(errCtx, evc, func(e pubsub.Event[proto.AgentEvent]) bool {
526 return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
527 })
528 require.False(t, gotErr, "no AgentEvent error must be published for the canceled turn")
529}
530
531// TestE2E_PromptRequestContextDoesNotOwnRun covers PLAN item 2: the
532// prompting client's HTTP request context does not own the run. A POST
533// with a very short request-context timeout still returns 202 before
534// that context would expire, and the run keeps going (observed via SSE
535// finishing normally after release).
536func TestE2E_PromptRequestContextDoesNotOwnRun(t *testing.T) {
537 t.Parallel()
538 h := newAgentE2EHarness(t)
539 streamCtx, streamCancel := context.WithCancel(t.Context())
540 t.Cleanup(streamCancel)
541
542 cid := uuid.New().String()
543 evc, cancelSSE := h.subscribeSSE(t, streamCtx, h.workspace.ID, cid)
544 t.Cleanup(cancelSSE)
545 h.waitForAttached(t, 1)
546
547 const sid = "s-short-req"
548
549 // The POST request context times out almost immediately. The
550 // handler must still return 202 (fire-and-forget) and the run must
551 // survive past the request-context deadline.
552 reqCtx, reqCancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
553 defer reqCancel()
554 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, reqCtx, sid))
555 h.waitForRunEntered(t)
556
557 // Let the request context expire, then release the run.
558 <-reqCtx.Done()
559 close(h.coord.release)
560
561 pickCtx, pickCancel := context.WithTimeout(streamCtx, 3*time.Second)
562 defer pickCancel()
563 got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
564 r, has := finishReason(e.Payload)
565 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
566 })
567 require.True(t, ok, "the run must finish normally even after the prompting request context expired")
568 require.Equal(t, sid, got.Payload.SessionID)
569}
570
571// TestE2E_AgentRunSurvivesAcrossWorkspaceClaims covers PLAN item 3: a
572// run started by client A survives A detaching as long as another
573// client (B) keeps the workspace alive; B observes the run finish via
574// SSE.
575func TestE2E_AgentRunSurvivesAcrossWorkspaceClaims(t *testing.T) {
576 t.Parallel()
577 h := newAgentE2EHarness(t)
578
579 ctxA, cancelA := context.WithCancel(t.Context())
580 ctxB, cancelB := context.WithCancel(t.Context())
581 t.Cleanup(cancelB)
582
583 cidA := uuid.New().String()
584 cidB := uuid.New().String()
585 _, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA)
586 t.Cleanup(killA)
587 evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB)
588 t.Cleanup(killB)
589 h.waitForAttached(t, 2)
590
591 const sid = "s-survive"
592 // A is the poster; the run must outlive A detaching as long as B
593 // keeps the workspace alive.
594 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctxA, sid))
595 h.waitForRunEntered(t)
596
597 // A detaches; B is still attached so the workspace stays alive.
598 cancelA()
599 killA()
600 require.Eventually(t, func() bool {
601 return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
602 }, 3*time.Second, 10*time.Millisecond,
603 "A detaching must leave B as the sole attached client")
604 require.False(t, h.shutdownHit.Load(), "workspace must stay alive while B is attached")
605
606 // Release the run; B must still observe it finish.
607 close(h.coord.release)
608 pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second)
609 defer pickCancel()
610 got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
611 r, has := finishReason(e.Payload)
612 return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
613 })
614 require.True(t, ok, "B must observe the run finish after A detaches")
615 require.Equal(t, sid, got.Payload.SessionID)
616}
617
618// TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp covers PLAN item
619// 1c at the externally-observable level: while session sid has an active
620// run, a second prompt for sid is accepted; a cancel for sid must cancel
621// the active run and must not let the follow-up stream a normal
622// FinishReasonEndTurn.
623//
624// The sequence follows the PLAN exactly: prompt 1 becomes the active
625// run, prompt 2 for the same sid is accepted, then a cancel for sid
626// fires, and only afterwards are any signals released. The scripted
627// coordinator models the externally-observable contract of the
628// busy-queue branch and pendingCancels (which depend on internal
629// scheduler signals the codebase does not expose): a per-session cancel
630// tears down every in-flight run for sid and arms a cancel-on-entry for
631// a follow-up still dispatching. The invariant asserted is the one that
632// matters: after the cancel, the active run ends canceled and the
633// follow-up never streams a normal FinishReasonEndTurn.
634func TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp(t *testing.T) {
635 t.Parallel()
636 h := newAgentE2EHarness(t)
637 ctx, cancel := context.WithCancel(t.Context())
638 t.Cleanup(cancel)
639
640 cid := uuid.New().String()
641 evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
642 t.Cleanup(cancelSSE)
643 h.waitForAttached(t, 1)
644
645 const sid = "s-followup"
646
647 // (a) Prompt 1 for sid becomes the active run. Capture its run id so
648 // the canceled assistant message below can be attributed to run 1
649 // unambiguously.
650 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
651 run1 := h.waitForRunEntered(t)
652
653 // (b) Prompt 2 for the *same* sid is accepted while the active run
654 // is still in flight; it is the follow-up the PLAN describes
655 // (acceptedRuns > 0, either still dispatching or about to enter the
656 // busy-queue branch).
657 require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
658 run2 := h.waitForRunEntered(t)
659 require.NotEqual(t, run1, run2, "the follow-up must be a distinct run from the active one")
660
661 // (c) B cancels sid. This tears down every in-flight run for the
662 // session and arms a pending cancel for any follow-up that has not
663 // yet entered Run.
664 require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
665
666 // (d) Open the coordinator gate so any run that is NOT canceled would
667 // be free to proceed straight into the normal FinishReasonEndTurn
668 // branch. The scripted Run checks runCtx.Done() before the release
669 // select, so a canceled run still takes the canceled path even with
670 // release closed; only a non-canceled run reaches FinishReasonEndTurn.
671 // Releasing here is therefore what makes the assertions below
672 // meaningful: if the cancel had failed to tear down run 1 or arm the
673 // cancel-on-entry for the follow-up, the freed gate would let that run
674 // stream a normal FinishReasonEndTurn and the test would fail.
675 close(h.coord.release)
676
677 pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
678 defer pickCancel()
679
680 // (e) Run 1 (the active run) must end with FinishReasonCanceled. The
681 // assistant message id is qualified with the run id, so matching on
682 // run1's id proves the cancellation is attributed to the FIRST run
683 // and not to the follow-up.
684 //
685 // The single drain below is also the negative assertion for run 2:
686 // the match closure inspects every assistant event for sid as it
687 // scans, and if it ever observes the follow-up (run 2) streaming a
688 // normal FinishReasonEndTurn it records that violation immediately.
689 // This is what makes the run-2 check sound: a previous two-phase
690 // approach could let this very drain consume and discard a run-2
691 // EndTurn while still hunting for run 1's canceled message, leaving a
692 // later no-EndTurn check unable to prove run 2 stayed canceled.
693 // Folding the negative check into the same scan means a run-2 EndTurn
694 // can never slip past unobserved, whether it arrives before or after
695 // run 1's canceled message.
696 run1AsstID := fmt.Sprintf("a-%s-%d", sid, run1)
697 run2AsstID := fmt.Sprintf("a-%s-%d", sid, run2)
698 var followUpEndTurn bool
699 got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
700 if e.Payload.SessionID != sid || e.Payload.Role != proto.Assistant {
701 return false
702 }
703 r, has := finishReason(e.Payload)
704 if !has {
705 return false
706 }
707 // Any normal model output for sid after the cancel is a
708 // violation. The follow-up (run 2) must never reach the
709 // FinishReasonEndTurn branch; flag it the moment it is seen so
710 // the assertion below fails even if this event arrives while we
711 // are still waiting for run 1's canceled message.
712 if r == proto.FinishReasonEndTurn {
713 if e.Payload.ID == run2AsstID || e.Payload.ID != run1AsstID {
714 followUpEndTurn = true
715 }
716 // Stop draining; the EndTurn observation is decisive and the
717 // require.False below will surface the failure.
718 return true
719 }
720 return e.Payload.ID == run1AsstID && r == proto.FinishReasonCanceled
721 })
722 require.False(t, followUpEndTurn, "the accepted follow-up must not stream a normal FinishReasonEndTurn after the cancel")
723 require.True(t, ok, "the first (active) run must end with FinishReasonCanceled")
724 require.Equal(t, run1AsstID, got.Payload.ID, "the canceled message must belong to the first (active) run")
725 gotReason, gotHas := finishReason(got.Payload)
726 require.True(t, gotHas)
727 require.Equal(t, proto.FinishReasonCanceled, gotReason, "the matched run-1 message must be canceled, not a normal end turn")
728 require.Equal(t, sid, got.Payload.SessionID)
729
730 // Confirm no normal FinishReasonEndTurn for sid is still in flight.
731 // By this point the scan above has already ruled out a run-2 EndTurn
732 // arriving before run 1's canceled message; this guards against one
733 // arriving afterward.
734 endCtx, endCancel := context.WithTimeout(ctx, 300*time.Millisecond)
735 defer endCancel()
736 _, gotEnd := drainUntil(endCtx, evc, func(e pubsub.Event[proto.Message]) bool {
737 r, has := finishReason(e.Payload)
738 return e.Payload.SessionID == sid && e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
739 })
740 require.False(t, gotEnd, "the accepted follow-up must not stream model output after the cancel")
741}