e2e_agent_test.go

  1package server
  2
  3import (
  4	"bytes"
  5	"context"
  6	"encoding/json"
  7	"fmt"
  8	"io"
  9	"net/http"
 10	"sync"
 11	"sync/atomic"
 12	"testing"
 13	"time"
 14
 15	"charm.land/fantasy"
 16	"github.com/charmbracelet/crush/internal/agent"
 17	"github.com/charmbracelet/crush/internal/app"
 18	"github.com/charmbracelet/crush/internal/backend"
 19	"github.com/charmbracelet/crush/internal/message"
 20	"github.com/charmbracelet/crush/internal/proto"
 21	"github.com/charmbracelet/crush/internal/pubsub"
 22	"github.com/google/uuid"
 23	"github.com/stretchr/testify/require"
 24)
 25
// scriptedCoordinator is an agent.Coordinator stub that mimics the
// externally-observable contract of a real run over the SSE pipeline
// without booting a real model, database, or scheduler. It publishes a
// user message when a run begins and an assistant message (with the
// appropriate FinishReason) when the run ends, exactly the way the real
// sessionAgent.Run surfaces a turn to SSE subscribers.
//
// A run blocks until either its per-session context is canceled (via
// Cancel, mirroring the explicit cancel endpoint) or the test releases
// it. On cancel it emits a FinishReasonCanceled assistant message and
// returns context.Canceled (which backend.runAgent swallows, so no
// AgentEvent error is published). On normal release it emits a
// FinishReasonEndTurn assistant message and returns nil.
//
// The internal scheduler signal points the PLAN's e2e cases reference
// (e.g. "before registration in activeRequests", "between
// activeRequests.Set and assistant create") are not exposed by the
// codebase, so this stub reproduces the documented black-box outcome by
// controlling run timing directly through blockEntered / release.
type scriptedCoordinator struct {
	// app is the application whose SendEvent publishes the scripted
	// user and assistant messages.
	app *app.App

	// blockEntered, when non-nil, parks every run right after it has
	// been entered and before its user message is emitted. A parked run
	// resumes when the test closes the channel or when the run's context
	// is canceled, letting a test interleave a cancel with the
	// dispatched goroutine. Tests set it before posting any prompt.
	blockEntered chan struct{}

	// mu guards cancels, pendingCancels, and nextRunID.
	mu sync.Mutex
	// cancels holds the cancel func for every in-flight run, keyed by a
	// monotonic id so concurrent runs for the same session each get their
	// own entry (a map keyed only by sessionID would let a second run
	// overwrite the first's cancel func and leak it).
	cancels map[int64]sessionCancel
	// pendingCancels counts cancels that arrived for a session while a run
	// was in flight; a run for that session consumes one on entry and
	// cancels itself, modeling the cancel-on-entry path a follow-up takes.
	pendingCancels map[string]int
	// nextRunID is the id handed to the next run; it only ever grows.
	nextRunID int64
	// entered carries the monotonic run id assigned to each run as it is
	// entered, so a test can correlate a later assistant message back to a
	// specific run (run 1 vs an accepted follow-up). Sends are
	// non-blocking, so an id is dropped if the buffer is full.
	entered chan int64
	// runStarts counts how many times Run has been invoked.
	runStarts atomic.Int32

	// release is closed by a test to let every run that is not canceled
	// finish with a FinishReasonEndTurn assistant message.
	release chan struct{}
}
 72
// sessionCancel pairs the cancel func of one in-flight run with the
// session that run belongs to, so a per-session cancel can find every
// run it has to tear down.
type sessionCancel struct {
	// sessionID is the session the run was started for.
	sessionID string
	// cancel cancels the run's context.
	cancel context.CancelFunc
}
 77
 78func newScriptedCoordinator(a *app.App) *scriptedCoordinator {
 79	return &scriptedCoordinator{
 80		app:            a,
 81		cancels:        make(map[int64]sessionCancel),
 82		pendingCancels: make(map[string]int),
 83		entered:        make(chan int64, 8),
 84		release:        make(chan struct{}),
 85	}
 86}
 87
 88func (c *scriptedCoordinator) emitUser(sessionID, id string) {
 89	c.app.SendEvent(pubsub.Event[message.Message]{
 90		Type: pubsub.CreatedEvent,
 91		Payload: message.Message{
 92			ID:        id,
 93			SessionID: sessionID,
 94			Role:      message.User,
 95			Parts:     []message.ContentPart{message.TextContent{Text: "hi"}},
 96		},
 97	})
 98}
 99
100func (c *scriptedCoordinator) emitAssistant(sessionID, id string, reason message.FinishReason) {
101	c.app.SendEvent(pubsub.Event[message.Message]{
102		Type: pubsub.CreatedEvent,
103		Payload: message.Message{
104			ID:        id,
105			SessionID: sessionID,
106			Role:      message.Assistant,
107			Parts:     []message.ContentPart{message.Finish{Reason: reason}},
108		},
109	})
110}
111
112func (c *scriptedCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
113	c.runStarts.Add(1)
114	runCtx, cancel := context.WithCancel(ctx)
115
116	c.mu.Lock()
117	id := c.nextRunID
118	c.nextRunID++
119	c.cancels[id] = sessionCancel{sessionID: sessionID, cancel: cancel}
120	// Cancel-on-entry: if a cancel for this session arrived while this
121	// run was still being dispatched (no run yet in flight to receive
122	// it), consume the pending cancel now so the run takes the canceled
123	// path instead of streaming output.
124	if c.pendingCancels[sessionID] > 0 {
125		c.pendingCancels[sessionID]--
126		cancel()
127	}
128	c.mu.Unlock()
129
130	select {
131	case c.entered <- id:
132	default:
133	}
134
135	if c.blockEntered != nil {
136		select {
137		case <-c.blockEntered:
138		case <-runCtx.Done():
139		}
140	}
141
142	defer func() {
143		c.mu.Lock()
144		delete(c.cancels, id)
145		c.mu.Unlock()
146		cancel()
147	}()
148
149	// Qualify the emitted message ids with the run id so a test can
150	// attribute an assistant message to the exact run that produced it
151	// (run 1 vs an accepted follow-up sharing the same session).
152	userID := fmt.Sprintf("u-%s-%d", sessionID, id)
153	asstID := fmt.Sprintf("a-%s-%d", sessionID, id)
154
155	c.emitUser(sessionID, userID)
156
157	// Cancellation takes priority: if the run was already canceled it
158	// must take the canceled path even when release is closed, so a
159	// canceled run never races into a normal FinishReasonEndTurn.
160	select {
161	case <-runCtx.Done():
162		c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled)
163		return nil, context.Canceled
164	default:
165	}
166
167	select {
168	case <-c.release:
169		c.emitAssistant(sessionID, asstID, message.FinishReasonEndTurn)
170		return nil, nil
171	case <-runCtx.Done():
172		c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled)
173		return nil, context.Canceled
174	}
175}
176
// RunAccepted behaves exactly like Run: the accept handle is ignored and
// the call is forwarded with the same context, session, prompt, and
// attachments.
func (c *scriptedCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
	return c.Run(ctx, sessionID, prompt, attachments...)
}
180
// BeginAccepted always returns nil; the stub does not track accepted
// runs (RunAccepted ignores the handle it is given).
func (c *scriptedCoordinator) BeginAccepted(string) *agent.AcceptedRun { return nil }
182
183func (c *scriptedCoordinator) Cancel(sessionID string) {
184	c.mu.Lock()
185	defer c.mu.Unlock()
186	// Cancel every in-flight run for this session. Concurrent runs for
187	// the same session (an active run plus an accepted follow-up still
188	// dispatching) each hold their own entry, so all of them are torn
189	// down by a single per-session cancel.
190	var canceled int
191	for _, sc := range c.cancels {
192		if sc.sessionID == sessionID {
193			sc.cancel()
194			canceled++
195		}
196	}
197	// If at least one run was in flight, arm a pending cancel so a
198	// follow-up that has been accepted but not yet entered Run takes the
199	// cancel-on-entry path. With no run in flight this is a no-op,
200	// mirroring the production guarantee that an idle cancel does not arm
201	// a pending cancel against the next prompt.
202	if canceled > 0 {
203		c.pendingCancels[sessionID]++
204	}
205}
206
207func (c *scriptedCoordinator) CancelAll() {
208	c.mu.Lock()
209	defer c.mu.Unlock()
210	for _, sc := range c.cancels {
211		sc.cancel()
212	}
213}
214
// The methods below are inert: each returns a fixed zero value (false,
// 0, nil, or an empty agent.Model) or does nothing, no matter what the
// coordinator's runs are doing. Presumably they exist only to satisfy
// the agent.Coordinator interface; none of the tests in this file rely
// on them.
func (c *scriptedCoordinator) IsBusy() bool                            { return false }
func (c *scriptedCoordinator) IsSessionBusy(string) bool               { return false }
func (c *scriptedCoordinator) QueuedPrompts(string) int                { return 0 }
func (c *scriptedCoordinator) QueuedPromptsList(string) []string       { return nil }
func (c *scriptedCoordinator) ClearQueue(string)                       {}
func (c *scriptedCoordinator) Summarize(context.Context, string) error { return nil }
func (c *scriptedCoordinator) Model() agent.Model                      { return agent.Model{} }
func (c *scriptedCoordinator) UpdateModels(context.Context) error      { return nil }
223
// agentE2EHarness extends the SSE harness with a scripted coordinator
// wired into the workspace's embedded app.App, so POST /agent drives a
// real backend.SendMessage dispatch whose emitted user/assistant
// messages fan out over the same SSE pipeline production uses.
type agentE2EHarness struct {
	// e2eHarness supplies the HTTP server, workspace, and SSE helpers.
	*e2eHarness
	// coord is the scripted coordinator installed as the app's
	// AgentCoordinator; tests drive run timing through its blockEntered
	// and release channels.
	coord *scriptedCoordinator
}
232
233func newAgentE2EHarness(t *testing.T) *agentE2EHarness {
234	t.Helper()
235
236	h := &e2eHarness{}
237
238	appCtx, cancel := context.WithCancel(context.Background())
239	a := app.NewForTest(appCtx)
240	coord := newScriptedCoordinator(a)
241	a.AgentCoordinator = coord
242	t.Cleanup(func() {
243		cancel()
244		a.ShutdownForTest()
245	})
246
247	h.installServer(t)
248
249	ws := &backend.Workspace{
250		ID:   uuid.New().String(),
251		Path: t.TempDir(),
252		App:  a,
253	}
254	backend.SetWorkspaceShutdownFnForTest(ws, func() {})
255	backend.InsertWorkspaceForTest(h.backend, ws)
256
257	h.workspace = ws
258	h.app = a
259	return &agentE2EHarness{e2eHarness: h, coord: coord}
260}
261
262// postAgentHTTP drives POST /v1/workspaces/{id}/agent over the harness's
263// httptest server and returns the status code.
264func (h *agentE2EHarness) postAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int {
265	t.Helper()
266	body, err := json.Marshal(proto.AgentMessage{SessionID: sessionID, Prompt: "hi"})
267	require.NoError(t, err)
268	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
269		h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent", bytes.NewReader(body))
270	require.NoError(t, err)
271	req.Header.Set("Content-Type", "application/json")
272	resp, err := h.httpSrv.Client().Do(req)
273	require.NoError(t, err)
274	_, _ = io.Copy(io.Discard, resp.Body)
275	resp.Body.Close()
276	return resp.StatusCode
277}
278
279// cancelAgentHTTP drives POST /v1/workspaces/{id}/agent/sessions/{sid}/cancel.
280func (h *agentE2EHarness) cancelAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int {
281	t.Helper()
282	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
283		h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent/sessions/"+sessionID+"/cancel", nil)
284	require.NoError(t, err)
285	resp, err := h.httpSrv.Client().Do(req)
286	require.NoError(t, err)
287	_, _ = io.Copy(io.Discard, resp.Body)
288	resp.Body.Close()
289	return resp.StatusCode
290}
291
292// waitForRunEntered blocks until a dispatched run for any session has
293// been entered by the scripted coordinator, or fails the test. It
294// returns the monotonic run id assigned to that run so a caller can
295// correlate it with a later assistant message; callers that don't need
296// the id can ignore the return value.
297func (h *agentE2EHarness) waitForRunEntered(t *testing.T) int64 {
298	t.Helper()
299	select {
300	case id := <-h.coord.entered:
301		return id
302	case <-time.After(2 * time.Second):
303		t.Fatal("dispatched run was never entered")
304		return 0
305	}
306}
307
308// finishReason extracts the assistant message's FinishReason, if any.
309func finishReason(m proto.Message) (proto.FinishReason, bool) {
310	for _, p := range m.Parts {
311		if f, ok := p.(proto.Finish); ok {
312			return f.Reason, true
313		}
314	}
315	return "", false
316}
317
318// TestE2E_CancelByOtherClientDoesNotErrorPrompter covers PLAN Tests ->
319// New end-to-end coverage item 1: a second client canceling a run does
320// not surface a server error to the prompter; the run ends with a
321// FinishReasonCanceled assistant message and no AgentEvent carries a
322// non-nil Error.
323func TestE2E_CancelByOtherClientDoesNotErrorPrompter(t *testing.T) {
324	t.Parallel()
325	h := newAgentE2EHarness(t)
326	ctx, cancel := context.WithCancel(t.Context())
327	t.Cleanup(cancel)
328
329	cidA := uuid.New().String()
330	cidB := uuid.New().String()
331	evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA)
332	t.Cleanup(cancelA)
333	evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB)
334	t.Cleanup(cancelB)
335	h.waitForAttached(t, 2)
336
337	const sid = "s-cancel-other"
338
339	// A posts a long-running prompt; the handler must return 202
340	// immediately (the run blocks in the coordinator).
341	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
342	h.waitForRunEntered(t)
343
344	// B cancels.
345	require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
346
347	// A's SSE stream receives the FinishReasonCanceled assistant
348	// message.
349	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
350	defer pickCancel()
351	got, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.Message]) bool {
352		r, has := finishReason(e.Payload)
353		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
354	})
355	require.True(t, ok, "client A must observe a FinishReasonCanceled assistant message")
356	require.Equal(t, sid, got.Payload.SessionID)
357
358	// No AgentEvent error reaches A (cancel is not a server error).
359	errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond)
360	defer errCancel()
361	_, gotErrA := drainUntil(errCtx, evcA, func(e pubsub.Event[proto.AgentEvent]) bool {
362		return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
363	})
364	require.False(t, gotErrA, "cancel must not surface an AgentEvent error to the prompter")
365
366	// And no AgentEvent error reaches the canceling client B either; the
367	// PLAN requires that *no* client observes a non-nil Error.
368	errCtxB, errCancelB := context.WithTimeout(ctx, 250*time.Millisecond)
369	defer errCancelB()
370	_, gotErrB := drainUntil(errCtxB, evcB, func(e pubsub.Event[proto.AgentEvent]) bool {
371		return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
372	})
373	require.False(t, gotErrB, "cancel must not surface an AgentEvent error to any client")
374}
375
376// TestE2E_CancelImmediatelyAfter202IsNotLost covers PLAN item 1a: a
377// cancel that races a freshly-dispatched run (before it would emit any
378// output) is not lost. The run takes the cancel-on-entry path and emits
379// a user message followed by a FinishReasonCanceled assistant message
380// rather than streaming model output.
381func TestE2E_CancelImmediatelyAfter202IsNotLost(t *testing.T) {
382	t.Parallel()
383	h := newAgentE2EHarness(t)
384	// Gate the run on a signal the test controls so the cancel can be
385	// observed while the dispatched goroutine is parked at entry.
386	h.coord.blockEntered = make(chan struct{})
387
388	ctx, cancel := context.WithCancel(t.Context())
389	t.Cleanup(cancel)
390
391	cid := uuid.New().String()
392	evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
393	t.Cleanup(cancelSSE)
394	h.waitForAttached(t, 1)
395
396	const sid = "s-race-cancel"
397	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
398	h.waitForRunEntered(t)
399
400	// Cancel while the run is still blocked at entry, then release it.
401	require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
402	close(h.coord.blockEntered)
403
404	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
405	defer pickCancel()
406
407	gotUser, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
408		return e.Payload.Role == proto.User && e.Payload.SessionID == sid
409	})
410	require.True(t, okUser, "the canceled turn must still record a user message")
411	require.Equal(t, sid, gotUser.Payload.SessionID)
412
413	gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
414		r, has := finishReason(e.Payload)
415		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
416	})
417	require.True(t, okAsst, "the canceled turn must end with a FinishReasonCanceled assistant message")
418	require.Equal(t, sid, gotAsst.Payload.SessionID)
419}
420
// TestE2E_IdleCancelDoesNotPoisonNextPrompt covers PLAN item 1b: an
// idle cancel (no active run) must not poison the next prompt. The
// scripted coordinator arms a pending cancel only when at least one run
// for the session is in flight, so an idle cancel records nothing. Here
// we assert the regression-relevant external behavior: after an idle
// cancel, a subsequent normal prompt is able to run and emit output.
//
// NOTE: This is a simplified version. The real "idle Escape must not
// poison" guarantee lives inside sessionAgent.Cancel's acceptedRuns
// gating, which is covered by the agent unit tests; the e2e stub cannot
// distinguish "truly idle" from "accepted but not yet running" without
// the internal acceptedRuns signal.
func TestE2E_IdleCancelDoesNotPoisonNextPrompt(t *testing.T) {
	t.Parallel()
	h := newAgentE2EHarness(t)
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	cid := uuid.New().String()
	evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
	t.Cleanup(cancelSSE)
	h.waitForAttached(t, 1)

	const sid = "s-idle-cancel"

	// Idle cancel: no run in flight. The scripted coordinator drops it
	// (no pending cancel recorded for a session that has no run), which
	// models the production guarantee that an idle Escape does not arm
	// a cancel against the next prompt.
	require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))

	// Now a normal prompt; release it so it finishes successfully.
	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
	h.waitForRunEntered(t)
	close(h.coord.release)

	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
		r, has := finishReason(e.Payload)
		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
	})
	require.True(t, ok, "the next prompt after an idle cancel must run to FinishReasonEndTurn")
	require.Equal(t, sid, got.Payload.SessionID)

	// And it must not be marked canceled. This is a bounded negative
	// check: it only proves no canceled marker shows up within the
	// 200ms window after the EndTurn message.
	canCtx, canCancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer canCancel()
	_, gotCanceled := drainUntil(canCtx, evc, func(e pubsub.Event[proto.Message]) bool {
		r, has := finishReason(e.Payload)
		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
	})
	require.False(t, gotCanceled, "an idle cancel must not produce a FinishReasonCanceled marker on the next prompt")
}
476
477// TestE2E_CancelBetweenActiveSetAndAssistantCreate covers PLAN item 1d:
478// a cancel that arrives after the run has begun but before it would
479// create the assistant message must still produce a user message and a
480// FinishReasonCanceled assistant message, never a silent return. The
481// blockEntered gate parks the run after entry (modeling the window
482// between activeRequests.Set and assistant creation).
483func TestE2E_CancelBetweenActiveSetAndAssistantCreate(t *testing.T) {
484	t.Parallel()
485	h := newAgentE2EHarness(t)
486	h.coord.blockEntered = make(chan struct{})
487
488	ctx, cancel := context.WithCancel(t.Context())
489	t.Cleanup(cancel)
490
491	cid := uuid.New().String()
492	evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
493	t.Cleanup(cancelSSE)
494	h.waitForAttached(t, 1)
495
496	const sid = "s-mid-window"
497	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
498	h.waitForRunEntered(t)
499
500	// Cancel while parked at entry; then release so the run proceeds
501	// into its cancel branch (runCtx already canceled).
502	require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
503	close(h.coord.blockEntered)
504
505	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
506	defer pickCancel()
507
508	_, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
509		return e.Payload.Role == proto.User && e.Payload.SessionID == sid
510	})
511	require.True(t, okUser, "a user message must be recorded for the canceled turn")
512
513	gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
514		r, has := finishReason(e.Payload)
515		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled
516	})
517	require.True(t, okAsst, "the run must not return silently; it must emit a FinishReasonCanceled assistant message")
518	require.Equal(t, sid, gotAsst.Payload.SessionID)
519
520	// No AgentEvent error is published: a cancel in the
521	// activeRequests.Set -> assistant-create window is not a server
522	// error.
523	errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond)
524	defer errCancel()
525	_, gotErr := drainUntil(errCtx, evc, func(e pubsub.Event[proto.AgentEvent]) bool {
526		return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil
527	})
528	require.False(t, gotErr, "no AgentEvent error must be published for the canceled turn")
529}
530
531// TestE2E_PromptRequestContextDoesNotOwnRun covers PLAN item 2: the
532// prompting client's HTTP request context does not own the run. A POST
533// with a very short request-context timeout still returns 202 before
534// that context would expire, and the run keeps going (observed via SSE
535// finishing normally after release).
536func TestE2E_PromptRequestContextDoesNotOwnRun(t *testing.T) {
537	t.Parallel()
538	h := newAgentE2EHarness(t)
539	streamCtx, streamCancel := context.WithCancel(t.Context())
540	t.Cleanup(streamCancel)
541
542	cid := uuid.New().String()
543	evc, cancelSSE := h.subscribeSSE(t, streamCtx, h.workspace.ID, cid)
544	t.Cleanup(cancelSSE)
545	h.waitForAttached(t, 1)
546
547	const sid = "s-short-req"
548
549	// The POST request context times out almost immediately. The
550	// handler must still return 202 (fire-and-forget) and the run must
551	// survive past the request-context deadline.
552	reqCtx, reqCancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
553	defer reqCancel()
554	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, reqCtx, sid))
555	h.waitForRunEntered(t)
556
557	// Let the request context expire, then release the run.
558	<-reqCtx.Done()
559	close(h.coord.release)
560
561	pickCtx, pickCancel := context.WithTimeout(streamCtx, 3*time.Second)
562	defer pickCancel()
563	got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
564		r, has := finishReason(e.Payload)
565		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
566	})
567	require.True(t, ok, "the run must finish normally even after the prompting request context expired")
568	require.Equal(t, sid, got.Payload.SessionID)
569}
570
571// TestE2E_AgentRunSurvivesAcrossWorkspaceClaims covers PLAN item 3: a
572// run started by client A survives A detaching as long as another
573// client (B) keeps the workspace alive; B observes the run finish via
574// SSE.
575func TestE2E_AgentRunSurvivesAcrossWorkspaceClaims(t *testing.T) {
576	t.Parallel()
577	h := newAgentE2EHarness(t)
578
579	ctxA, cancelA := context.WithCancel(t.Context())
580	ctxB, cancelB := context.WithCancel(t.Context())
581	t.Cleanup(cancelB)
582
583	cidA := uuid.New().String()
584	cidB := uuid.New().String()
585	_, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA)
586	t.Cleanup(killA)
587	evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB)
588	t.Cleanup(killB)
589	h.waitForAttached(t, 2)
590
591	const sid = "s-survive"
592	// A is the poster; the run must outlive A detaching as long as B
593	// keeps the workspace alive.
594	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctxA, sid))
595	h.waitForRunEntered(t)
596
597	// A detaches; B is still attached so the workspace stays alive.
598	cancelA()
599	killA()
600	require.Eventually(t, func() bool {
601		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
602	}, 3*time.Second, 10*time.Millisecond,
603		"A detaching must leave B as the sole attached client")
604	require.False(t, h.shutdownHit.Load(), "workspace must stay alive while B is attached")
605
606	// Release the run; B must still observe it finish.
607	close(h.coord.release)
608	pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second)
609	defer pickCancel()
610	got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
611		r, has := finishReason(e.Payload)
612		return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
613	})
614	require.True(t, ok, "B must observe the run finish after A detaches")
615	require.Equal(t, sid, got.Payload.SessionID)
616}
617
618// TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp covers PLAN item
619// 1c at the externally-observable level: while session sid has an active
620// run, a second prompt for sid is accepted; a cancel for sid must cancel
621// the active run and must not let the follow-up stream a normal
622// FinishReasonEndTurn.
623//
624// The sequence follows the PLAN exactly: prompt 1 becomes the active
625// run, prompt 2 for the same sid is accepted, then a cancel for sid
626// fires, and only afterwards are any signals released. The scripted
627// coordinator models the externally-observable contract of the
628// busy-queue branch and pendingCancels (which depend on internal
629// scheduler signals the codebase does not expose): a per-session cancel
630// tears down every in-flight run for sid and arms a cancel-on-entry for
631// a follow-up still dispatching. The invariant asserted is the one that
632// matters: after the cancel, the active run ends canceled and the
633// follow-up never streams a normal FinishReasonEndTurn.
634func TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp(t *testing.T) {
635	t.Parallel()
636	h := newAgentE2EHarness(t)
637	ctx, cancel := context.WithCancel(t.Context())
638	t.Cleanup(cancel)
639
640	cid := uuid.New().String()
641	evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid)
642	t.Cleanup(cancelSSE)
643	h.waitForAttached(t, 1)
644
645	const sid = "s-followup"
646
647	// (a) Prompt 1 for sid becomes the active run. Capture its run id so
648	// the canceled assistant message below can be attributed to run 1
649	// unambiguously.
650	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
651	run1 := h.waitForRunEntered(t)
652
653	// (b) Prompt 2 for the *same* sid is accepted while the active run
654	// is still in flight; it is the follow-up the PLAN describes
655	// (acceptedRuns > 0, either still dispatching or about to enter the
656	// busy-queue branch).
657	require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid))
658	run2 := h.waitForRunEntered(t)
659	require.NotEqual(t, run1, run2, "the follow-up must be a distinct run from the active one")
660
661	// (c) B cancels sid. This tears down every in-flight run for the
662	// session and arms a pending cancel for any follow-up that has not
663	// yet entered Run.
664	require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid))
665
666	// (d) Open the coordinator gate so any run that is NOT canceled would
667	// be free to proceed straight into the normal FinishReasonEndTurn
668	// branch. The scripted Run checks runCtx.Done() before the release
669	// select, so a canceled run still takes the canceled path even with
670	// release closed; only a non-canceled run reaches FinishReasonEndTurn.
671	// Releasing here is therefore what makes the assertions below
672	// meaningful: if the cancel had failed to tear down run 1 or arm the
673	// cancel-on-entry for the follow-up, the freed gate would let that run
674	// stream a normal FinishReasonEndTurn and the test would fail.
675	close(h.coord.release)
676
677	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
678	defer pickCancel()
679
680	// (e) Run 1 (the active run) must end with FinishReasonCanceled. The
681	// assistant message id is qualified with the run id, so matching on
682	// run1's id proves the cancellation is attributed to the FIRST run
683	// and not to the follow-up.
684	//
685	// The single drain below is also the negative assertion for run 2:
686	// the match closure inspects every assistant event for sid as it
687	// scans, and if it ever observes the follow-up (run 2) streaming a
688	// normal FinishReasonEndTurn it records that violation immediately.
689	// This is what makes the run-2 check sound: a previous two-phase
690	// approach could let this very drain consume and discard a run-2
691	// EndTurn while still hunting for run 1's canceled message, leaving a
692	// later no-EndTurn check unable to prove run 2 stayed canceled.
693	// Folding the negative check into the same scan means a run-2 EndTurn
694	// can never slip past unobserved, whether it arrives before or after
695	// run 1's canceled message.
696	run1AsstID := fmt.Sprintf("a-%s-%d", sid, run1)
697	run2AsstID := fmt.Sprintf("a-%s-%d", sid, run2)
698	var followUpEndTurn bool
699	got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool {
700		if e.Payload.SessionID != sid || e.Payload.Role != proto.Assistant {
701			return false
702		}
703		r, has := finishReason(e.Payload)
704		if !has {
705			return false
706		}
707		// Any normal model output for sid after the cancel is a
708		// violation. The follow-up (run 2) must never reach the
709		// FinishReasonEndTurn branch; flag it the moment it is seen so
710		// the assertion below fails even if this event arrives while we
711		// are still waiting for run 1's canceled message.
712		if r == proto.FinishReasonEndTurn {
713			if e.Payload.ID == run2AsstID || e.Payload.ID != run1AsstID {
714				followUpEndTurn = true
715			}
716			// Stop draining; the EndTurn observation is decisive and the
717			// require.False below will surface the failure.
718			return true
719		}
720		return e.Payload.ID == run1AsstID && r == proto.FinishReasonCanceled
721	})
722	require.False(t, followUpEndTurn, "the accepted follow-up must not stream a normal FinishReasonEndTurn after the cancel")
723	require.True(t, ok, "the first (active) run must end with FinishReasonCanceled")
724	require.Equal(t, run1AsstID, got.Payload.ID, "the canceled message must belong to the first (active) run")
725	gotReason, gotHas := finishReason(got.Payload)
726	require.True(t, gotHas)
727	require.Equal(t, proto.FinishReasonCanceled, gotReason, "the matched run-1 message must be canceled, not a normal end turn")
728	require.Equal(t, sid, got.Payload.SessionID)
729
730	// Confirm no normal FinishReasonEndTurn for sid is still in flight.
731	// By this point the scan above has already ruled out a run-2 EndTurn
732	// arriving before run 1's canceled message; this guards against one
733	// arriving afterward.
734	endCtx, endCancel := context.WithTimeout(ctx, 300*time.Millisecond)
735	defer endCancel()
736	_, gotEnd := drainUntil(endCtx, evc, func(e pubsub.Event[proto.Message]) bool {
737		r, has := finishReason(e.Payload)
738		return e.Payload.SessionID == sid && e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn
739	})
740	require.False(t, gotEnd, "the accepted follow-up must not stream model output after the cancel")
741}