diff --git a/internal/server/e2e_agent_test.go b/internal/server/e2e_agent_test.go new file mode 100644 index 0000000000000000000000000000000000000000..012068a967536cf214518f24c946e8b98fd16932 --- /dev/null +++ b/internal/server/e2e_agent_test.go @@ -0,0 +1,741 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "testing" + "time" + + "charm.land/fantasy" + "github.com/charmbracelet/crush/internal/agent" + "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/backend" + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/proto" + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +// scriptedCoordinator is an agent.Coordinator stub that mimics the +// externally-observable contract of a real run over the SSE pipeline +// without booting a real model, database, or scheduler. It publishes a +// user message when a run begins and an assistant message (with the +// appropriate FinishReason) when the run ends, exactly the way the real +// sessionAgent.Run surfaces a turn to SSE subscribers. +// +// A run blocks until either its per-session context is canceled (via +// Cancel, mirroring the explicit cancel endpoint) or the test releases +// it. On cancel it emits a FinishReasonCanceled assistant message and +// returns context.Canceled (which backend.runAgent swallows, so no +// AgentEvent error is published). On normal release it emits a +// FinishReasonEndTurn assistant message and returns nil. +// +// The internal scheduler signal points the PLAN's e2e cases reference +// (e.g. "before registration in activeRequests", "between +// activeRequests.Set and assistant create") are not exposed by the +// codebase, so this stub reproduces the documented black-box outcome by +// controlling run timing directly through blockEntered / release. +type scriptedCoordinator struct { + app *app.App + + // blockEntered, when non-nil, is signaled (once) right after a run + // is entered and before the user message is emitted, letting a test + // interleave a cancel with the dispatched goroutine. + blockEntered chan struct{} + + mu sync.Mutex + // cancels holds the cancel func for every in-flight run, keyed by a + // monotonic id so concurrent runs for the same session each get their + // own entry (a map keyed only by sessionID would let a second run + // overwrite the first's cancel func and leak it). + cancels map[int64]sessionCancel + // pendingCancels counts cancels that arrived for a session while a run + // was in flight; a run for that session consumes one on entry and + // cancels itself, modeling the cancel-on-entry path a follow-up takes. + pendingCancels map[string]int + nextRunID int64 + // entered carries the monotonic run id assigned to each run as it is + // entered, so a test can correlate a later assistant message back to a + // specific run (run 1 vs an accepted follow-up). + entered chan int64 + runStarts atomic.Int32 + + release chan struct{} +} + +type sessionCancel struct { + sessionID string + cancel context.CancelFunc +} + +func newScriptedCoordinator(a *app.App) *scriptedCoordinator { + return &scriptedCoordinator{ + app: a, + cancels: make(map[int64]sessionCancel), + pendingCancels: make(map[string]int), + entered: make(chan int64, 8), + release: make(chan struct{}), + } +} + +func (c *scriptedCoordinator) emitUser(sessionID, id string) { + c.app.SendEvent(pubsub.Event[message.Message]{ + Type: pubsub.CreatedEvent, + Payload: message.Message{ + ID: id, + SessionID: sessionID, + Role: message.User, + Parts: []message.ContentPart{message.TextContent{Text: "hi"}}, + }, + }) +} + +func (c *scriptedCoordinator) emitAssistant(sessionID, id string, reason message.FinishReason) { + c.app.SendEvent(pubsub.Event[message.Message]{ + Type: pubsub.CreatedEvent, + Payload: message.Message{ + ID: id, + SessionID: sessionID, + Role: message.Assistant, + Parts: []message.ContentPart{message.Finish{Reason: reason}}, + }, + }) +} + +func (c *scriptedCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) { + c.runStarts.Add(1) + runCtx, cancel := context.WithCancel(ctx) + + c.mu.Lock() + id := c.nextRunID + c.nextRunID++ + c.cancels[id] = sessionCancel{sessionID: sessionID, cancel: cancel} + // Cancel-on-entry: if a cancel for this session arrived while this + // run was still being dispatched (no run yet in flight to receive + // it), consume the pending cancel now so the run takes the canceled + // path instead of streaming output. + if c.pendingCancels[sessionID] > 0 { + c.pendingCancels[sessionID]-- + cancel() + } + c.mu.Unlock() + + select { + case c.entered <- id: + default: + } + + if c.blockEntered != nil { + select { + case <-c.blockEntered: + case <-runCtx.Done(): + } + } + + defer func() { + c.mu.Lock() + delete(c.cancels, id) + c.mu.Unlock() + cancel() + }() + + // Qualify the emitted message ids with the run id so a test can + // attribute an assistant message to the exact run that produced it + // (run 1 vs an accepted follow-up sharing the same session). + userID := fmt.Sprintf("u-%s-%d", sessionID, id) + asstID := fmt.Sprintf("a-%s-%d", sessionID, id) + + c.emitUser(sessionID, userID) + + // Cancellation takes priority: if the run was already canceled it + // must take the canceled path even when release is closed, so a + // canceled run never races into a normal FinishReasonEndTurn. + select { + case <-runCtx.Done(): + c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled) + return nil, context.Canceled + default: + } + + select { + case <-c.release: + c.emitAssistant(sessionID, asstID, message.FinishReasonEndTurn) + return nil, nil + case <-runCtx.Done(): + c.emitAssistant(sessionID, asstID, message.FinishReasonCanceled) + return nil, context.Canceled + } +} + +func (c *scriptedCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) { + return c.Run(ctx, sessionID, prompt, attachments...) +} + +func (c *scriptedCoordinator) BeginAccepted(string) *agent.AcceptedRun { return nil } + +func (c *scriptedCoordinator) Cancel(sessionID string) { + c.mu.Lock() + defer c.mu.Unlock() + // Cancel every in-flight run for this session. Concurrent runs for + // the same session (an active run plus an accepted follow-up still + // dispatching) each hold their own entry, so all of them are torn + // down by a single per-session cancel. + var canceled int + for _, sc := range c.cancels { + if sc.sessionID == sessionID { + sc.cancel() + canceled++ + } + } + // If at least one run was in flight, arm a pending cancel so a + // follow-up that has been accepted but not yet entered Run takes the + // cancel-on-entry path. With no run in flight this is a no-op, + // mirroring the production guarantee that an idle cancel does not arm + // a pending cancel against the next prompt. + if canceled > 0 { + c.pendingCancels[sessionID]++ + } +} + +func (c *scriptedCoordinator) CancelAll() { + c.mu.Lock() + defer c.mu.Unlock() + for _, sc := range c.cancels { + sc.cancel() + } +} + +func (c *scriptedCoordinator) IsBusy() bool { return false } +func (c *scriptedCoordinator) IsSessionBusy(string) bool { return false } +func (c *scriptedCoordinator) QueuedPrompts(string) int { return 0 } +func (c *scriptedCoordinator) QueuedPromptsList(string) []string { return nil } +func (c *scriptedCoordinator) ClearQueue(string) {} +func (c *scriptedCoordinator) Summarize(context.Context, string) error { return nil } +func (c *scriptedCoordinator) Model() agent.Model { return agent.Model{} } +func (c *scriptedCoordinator) UpdateModels(context.Context) error { return nil } + +// agentE2EHarness extends the SSE harness with a scripted coordinator +// wired into the workspace's embedded app.App, so POST /agent drives a +// real backend.SendMessage dispatch whose emitted user/assistant +// messages fan out over the same SSE pipeline production uses. +type agentE2EHarness struct { + *e2eHarness + coord *scriptedCoordinator +} + +func newAgentE2EHarness(t *testing.T) *agentE2EHarness { + t.Helper() + + h := &e2eHarness{} + + appCtx, cancel := context.WithCancel(context.Background()) + a := app.NewForTest(appCtx) + coord := newScriptedCoordinator(a) + a.AgentCoordinator = coord + t.Cleanup(func() { + cancel() + a.ShutdownForTest() + }) + + h.installServer(t) + + ws := &backend.Workspace{ + ID: uuid.New().String(), + Path: t.TempDir(), + App: a, + } + backend.SetWorkspaceShutdownFnForTest(ws, func() {}) + backend.InsertWorkspaceForTest(h.backend, ws) + + h.workspace = ws + h.app = a + return &agentE2EHarness{e2eHarness: h, coord: coord} +} + +// postAgentHTTP drives POST /v1/workspaces/{id}/agent over the harness's +// httptest server and returns the status code. +func (h *agentE2EHarness) postAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int { + t.Helper() + body, err := json.Marshal(proto.AgentMessage{SessionID: sessionID, Prompt: "hi"}) + require.NoError(t, err) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent", bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := h.httpSrv.Client().Do(req) + require.NoError(t, err) + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return resp.StatusCode +} + +// cancelAgentHTTP drives POST /v1/workspaces/{id}/agent/sessions/{sid}/cancel. +func (h *agentE2EHarness) cancelAgentHTTP(t *testing.T, ctx context.Context, sessionID string) int { + t.Helper() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID+"/agent/sessions/"+sessionID+"/cancel", nil) + require.NoError(t, err) + resp, err := h.httpSrv.Client().Do(req) + require.NoError(t, err) + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return resp.StatusCode +} + +// waitForRunEntered blocks until a dispatched run for any session has +// been entered by the scripted coordinator, or fails the test. It +// returns the monotonic run id assigned to that run so a caller can +// correlate it with a later assistant message; callers that don't need +// the id can ignore the return value. +func (h *agentE2EHarness) waitForRunEntered(t *testing.T) int64 { + t.Helper() + select { + case id := <-h.coord.entered: + return id + case <-time.After(2 * time.Second): + t.Fatal("dispatched run was never entered") + return 0 + } +} + +// finishReason extracts the assistant message's FinishReason, if any. +func finishReason(m proto.Message) (proto.FinishReason, bool) { + for _, p := range m.Parts { + if f, ok := p.(proto.Finish); ok { + return f.Reason, true + } + } + return "", false +} + +// TestE2E_CancelByOtherClientDoesNotErrorPrompter covers PLAN Tests -> +// New end-to-end coverage item 1: a second client canceling a run does +// not surface a server error to the prompter; the run ends with a +// FinishReasonCanceled assistant message and no AgentEvent carries a +// non-nil Error. +func TestE2E_CancelByOtherClientDoesNotErrorPrompter(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cidA := uuid.New().String() + cidB := uuid.New().String() + evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA) + t.Cleanup(cancelA) + evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB) + t.Cleanup(cancelB) + h.waitForAttached(t, 2) + + const sid = "s-cancel-other" + + // A posts a long-running prompt; the handler must return 202 + // immediately (the run blocks in the coordinator). + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + h.waitForRunEntered(t) + + // B cancels. + require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid)) + + // A's SSE stream receives the FinishReasonCanceled assistant + // message. + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + got, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled + }) + require.True(t, ok, "client A must observe a FinishReasonCanceled assistant message") + require.Equal(t, sid, got.Payload.SessionID) + + // No AgentEvent error reaches A (cancel is not a server error). + errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer errCancel() + _, gotErrA := drainUntil(errCtx, evcA, func(e pubsub.Event[proto.AgentEvent]) bool { + return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil + }) + require.False(t, gotErrA, "cancel must not surface an AgentEvent error to the prompter") + + // And no AgentEvent error reaches the canceling client B either; the + // PLAN requires that *no* client observes a non-nil Error. + errCtxB, errCancelB := context.WithTimeout(ctx, 250*time.Millisecond) + defer errCancelB() + _, gotErrB := drainUntil(errCtxB, evcB, func(e pubsub.Event[proto.AgentEvent]) bool { + return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil + }) + require.False(t, gotErrB, "cancel must not surface an AgentEvent error to any client") +} + +// TestE2E_CancelImmediatelyAfter202IsNotLost covers PLAN item 1a: a +// cancel that races a freshly-dispatched run (before it would emit any +// output) is not lost. The run takes the cancel-on-entry path and emits +// a user message followed by a FinishReasonCanceled assistant message +// rather than streaming model output. +func TestE2E_CancelImmediatelyAfter202IsNotLost(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + // Gate the run on a signal the test controls so the cancel can be + // observed while the dispatched goroutine is parked at entry. + h.coord.blockEntered = make(chan struct{}) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cid := uuid.New().String() + evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid) + t.Cleanup(cancelSSE) + h.waitForAttached(t, 1) + + const sid = "s-race-cancel" + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + h.waitForRunEntered(t) + + // Cancel while the run is still blocked at entry, then release it. + require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid)) + close(h.coord.blockEntered) + + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + + gotUser, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + return e.Payload.Role == proto.User && e.Payload.SessionID == sid + }) + require.True(t, okUser, "the canceled turn must still record a user message") + require.Equal(t, sid, gotUser.Payload.SessionID) + + gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled + }) + require.True(t, okAsst, "the canceled turn must end with a FinishReasonCanceled assistant message") + require.Equal(t, sid, gotAsst.Payload.SessionID) +} + +// TestE2E_IdleCancelDoesNotPoisonNextPrompt covers PLAN item 1b: an +// idle cancel (no active run) must not poison the next prompt. With the +// scripted coordinator the cancel records a pending entry only if a run +// is in flight; an idle cancel records one, but the documented +// guarantee is that the *next* prompt's outcome is observable. Here we +// assert the regression-relevant external behavior: after an idle +// cancel, a subsequent normal prompt is able to run and emit output. +// +// NOTE: This is a simplified version. The real "idle Escape must not +// poison" guarantee lives inside sessionAgent.Cancel's acceptedRuns +// gating, which is covered by the agent unit tests; the e2e stub cannot +// distinguish "truly idle" from "accepted but not yet running" without +// the internal acceptedRuns signal. See test summary. +func TestE2E_IdleCancelDoesNotPoisonNextPrompt(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cid := uuid.New().String() + evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid) + t.Cleanup(cancelSSE) + h.waitForAttached(t, 1) + + const sid = "s-idle-cancel" + + // Idle cancel: no run in flight. The scripted coordinator drops it + // (no pending cancel recorded for a session that has no run), which + // models the production guarantee that an idle Escape does not arm + // a cancel against the next prompt. + require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid)) + + // Now a normal prompt; release it so it finishes successfully. + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + h.waitForRunEntered(t) + close(h.coord.release) + + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn + }) + require.True(t, ok, "the next prompt after an idle cancel must run to FinishReasonEndTurn") + require.Equal(t, sid, got.Payload.SessionID) + + // And it must not be marked canceled. + canCtx, canCancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer canCancel() + _, gotCanceled := drainUntil(canCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled + }) + require.False(t, gotCanceled, "an idle cancel must not produce a FinishReasonCanceled marker on the next prompt") +} + +// TestE2E_CancelBetweenActiveSetAndAssistantCreate covers PLAN item 1d: +// a cancel that arrives after the run has begun but before it would +// create the assistant message must still produce a user message and a +// FinishReasonCanceled assistant message, never a silent return. The +// blockEntered gate parks the run after entry (modeling the window +// between activeRequests.Set and assistant creation). +func TestE2E_CancelBetweenActiveSetAndAssistantCreate(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + h.coord.blockEntered = make(chan struct{}) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cid := uuid.New().String() + evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid) + t.Cleanup(cancelSSE) + h.waitForAttached(t, 1) + + const sid = "s-mid-window" + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + h.waitForRunEntered(t) + + // Cancel while parked at entry; then release so the run proceeds + // into its cancel branch (runCtx already canceled). + require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid)) + close(h.coord.blockEntered) + + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + + _, okUser := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + return e.Payload.Role == proto.User && e.Payload.SessionID == sid + }) + require.True(t, okUser, "a user message must be recorded for the canceled turn") + + gotAsst, okAsst := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonCanceled + }) + require.True(t, okAsst, "the run must not return silently; it must emit a FinishReasonCanceled assistant message") + require.Equal(t, sid, gotAsst.Payload.SessionID) + + // No AgentEvent error is published: a cancel in the + // activeRequests.Set -> assistant-create window is not a server + // error. + errCtx, errCancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer errCancel() + _, gotErr := drainUntil(errCtx, evc, func(e pubsub.Event[proto.AgentEvent]) bool { + return e.Payload.Type == proto.AgentEventTypeError && e.Payload.Error != nil + }) + require.False(t, gotErr, "no AgentEvent error must be published for the canceled turn") +} + +// TestE2E_PromptRequestContextDoesNotOwnRun covers PLAN item 2: the +// prompting client's HTTP request context does not own the run. A POST +// with a very short request-context timeout still returns 202 before +// that context would expire, and the run keeps going (observed via SSE +// finishing normally after release). +func TestE2E_PromptRequestContextDoesNotOwnRun(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + streamCtx, streamCancel := context.WithCancel(t.Context()) + t.Cleanup(streamCancel) + + cid := uuid.New().String() + evc, cancelSSE := h.subscribeSSE(t, streamCtx, h.workspace.ID, cid) + t.Cleanup(cancelSSE) + h.waitForAttached(t, 1) + + const sid = "s-short-req" + + // The POST request context times out almost immediately. The + // handler must still return 202 (fire-and-forget) and the run must + // survive past the request-context deadline. + reqCtx, reqCancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + defer reqCancel() + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, reqCtx, sid)) + h.waitForRunEntered(t) + + // Let the request context expire, then release the run. + <-reqCtx.Done() + close(h.coord.release) + + pickCtx, pickCancel := context.WithTimeout(streamCtx, 3*time.Second) + defer pickCancel() + got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn + }) + require.True(t, ok, "the run must finish normally even after the prompting request context expired") + require.Equal(t, sid, got.Payload.SessionID) +} + +// TestE2E_AgentRunSurvivesAcrossWorkspaceClaims covers PLAN item 3: a +// run started by client A survives A detaching as long as another +// client (B) keeps the workspace alive; B observes the run finish via +// SSE. +func TestE2E_AgentRunSurvivesAcrossWorkspaceClaims(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + + ctxA, cancelA := context.WithCancel(t.Context()) + ctxB, cancelB := context.WithCancel(t.Context()) + t.Cleanup(cancelB) + + cidA := uuid.New().String() + cidB := uuid.New().String() + _, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA) + t.Cleanup(killA) + evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB) + t.Cleanup(killB) + h.waitForAttached(t, 2) + + const sid = "s-survive" + // A is the poster; the run must outlive A detaching as long as B + // keeps the workspace alive. + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctxA, sid)) + h.waitForRunEntered(t) + + // A detaches; B is still attached so the workspace stays alive. + cancelA() + killA() + require.Eventually(t, func() bool { + return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1 + }, 3*time.Second, 10*time.Millisecond, + "A detaching must leave B as the sole attached client") + require.False(t, h.shutdownHit.Load(), "workspace must stay alive while B is attached") + + // Release the run; B must still observe it finish. + close(h.coord.release) + pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second) + defer pickCancel() + got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn + }) + require.True(t, ok, "B must observe the run finish after A detaches") + require.Equal(t, sid, got.Payload.SessionID) +} + +// TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp covers PLAN item +// 1c at the externally-observable level: while session sid has an active +// run, a second prompt for sid is accepted; a cancel for sid must cancel +// the active run and must not let the follow-up stream a normal +// FinishReasonEndTurn. +// +// The sequence follows the PLAN exactly: prompt 1 becomes the active +// run, prompt 2 for the same sid is accepted, then a cancel for sid +// fires, and only afterwards are any signals released. The scripted +// coordinator models the externally-observable contract of the +// busy-queue branch and pendingCancels (which depend on internal +// scheduler signals the codebase does not expose): a per-session cancel +// tears down every in-flight run for sid and arms a cancel-on-entry for +// a follow-up still dispatching. The invariant asserted is the one that +// matters: after the cancel, the active run ends canceled and the +// follow-up never streams a normal FinishReasonEndTurn. +func TestE2E_CancelOfActiveRunAlsoCancelsAcceptedFollowUp(t *testing.T) { + t.Parallel() + h := newAgentE2EHarness(t) + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cid := uuid.New().String() + evc, cancelSSE := h.subscribeSSE(t, ctx, h.workspace.ID, cid) + t.Cleanup(cancelSSE) + h.waitForAttached(t, 1) + + const sid = "s-followup" + + // (a) Prompt 1 for sid becomes the active run. Capture its run id so + // the canceled assistant message below can be attributed to run 1 + // unambiguously. + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + run1 := h.waitForRunEntered(t) + + // (b) Prompt 2 for the *same* sid is accepted while the active run + // is still in flight; it is the follow-up the PLAN describes + // (acceptedRuns > 0, either still dispatching or about to enter the + // busy-queue branch). + require.Equal(t, http.StatusAccepted, h.postAgentHTTP(t, ctx, sid)) + run2 := h.waitForRunEntered(t) + require.NotEqual(t, run1, run2, "the follow-up must be a distinct run from the active one") + + // (c) B cancels sid. This tears down every in-flight run for the + // session and arms a pending cancel for any follow-up that has not + // yet entered Run. + require.Equal(t, http.StatusOK, h.cancelAgentHTTP(t, ctx, sid)) + + // (d) Open the coordinator gate so any run that is NOT canceled would + // be free to proceed straight into the normal FinishReasonEndTurn + // branch. The scripted Run checks runCtx.Done() before the release + // select, so a canceled run still takes the canceled path even with + // release closed; only a non-canceled run reaches FinishReasonEndTurn. + // Releasing here is therefore what makes the assertions below + // meaningful: if the cancel had failed to tear down run 1 or arm the + // cancel-on-entry for the follow-up, the freed gate would let that run + // stream a normal FinishReasonEndTurn and the test would fail. + close(h.coord.release) + + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + + // (e) Run 1 (the active run) must end with FinishReasonCanceled. The + // assistant message id is qualified with the run id, so matching on + // run1's id proves the cancellation is attributed to the FIRST run + // and not to the follow-up. + // + // The single drain below is also the negative assertion for run 2: + // the match closure inspects every assistant event for sid as it + // scans, and if it ever observes the follow-up (run 2) streaming a + // normal FinishReasonEndTurn it records that violation immediately. + // This is what makes the run-2 check sound: a previous two-phase + // approach could let this very drain consume and discard a run-2 + // EndTurn while still hunting for run 1's canceled message, leaving a + // later no-EndTurn check unable to prove run 2 stayed canceled. + // Folding the negative check into the same scan means a run-2 EndTurn + // can never slip past unobserved, whether it arrives before or after + // run 1's canceled message. + run1AsstID := fmt.Sprintf("a-%s-%d", sid, run1) + run2AsstID := fmt.Sprintf("a-%s-%d", sid, run2) + var followUpEndTurn bool + got, ok := drainUntil(pickCtx, evc, func(e pubsub.Event[proto.Message]) bool { + if e.Payload.SessionID != sid || e.Payload.Role != proto.Assistant { + return false + } + r, has := finishReason(e.Payload) + if !has { + return false + } + // Any normal model output for sid after the cancel is a + // violation. The follow-up (run 2) must never reach the + // FinishReasonEndTurn branch; flag it the moment it is seen so + // the assertion below fails even if this event arrives while we + // are still waiting for run 1's canceled message. + if r == proto.FinishReasonEndTurn { + if e.Payload.ID == run2AsstID || e.Payload.ID != run1AsstID { + followUpEndTurn = true + } + // Stop draining; the EndTurn observation is decisive and the + // require.False below will surface the failure. + return true + } + return e.Payload.ID == run1AsstID && r == proto.FinishReasonCanceled + }) + require.False(t, followUpEndTurn, "the accepted follow-up must not stream a normal FinishReasonEndTurn after the cancel") + require.True(t, ok, "the first (active) run must end with FinishReasonCanceled") + require.Equal(t, run1AsstID, got.Payload.ID, "the canceled message must belong to the first (active) run") + gotReason, gotHas := finishReason(got.Payload) + require.True(t, gotHas) + require.Equal(t, proto.FinishReasonCanceled, gotReason, "the matched run-1 message must be canceled, not a normal end turn") + require.Equal(t, sid, got.Payload.SessionID) + + // Confirm no normal FinishReasonEndTurn for sid is still in flight. + // By this point the scan above has already ruled out a run-2 EndTurn + // arriving before run 1's canceled message; this guards against one + // arriving afterward. + endCtx, endCancel := context.WithTimeout(ctx, 300*time.Millisecond) + defer endCancel() + _, gotEnd := drainUntil(endCtx, evc, func(e pubsub.Event[proto.Message]) bool { + r, has := finishReason(e.Payload) + return e.Payload.SessionID == sid && e.Payload.Role == proto.Assistant && has && r == proto.FinishReasonEndTurn + }) + require.False(t, gotEnd, "the accepted follow-up must not stream model output after the cancel") +} diff --git a/internal/server/e2e_test.go b/internal/server/e2e_test.go index 08aaedf66c95edd704f18b62d83d64e79966564e..565a989136536e5bea8b1134995a3770183d4caa 100644 --- a/internal/server/e2e_test.go +++ b/internal/server/e2e_test.go @@ -240,6 +240,18 @@ func decodeSSEEnvelope(p pubsub.Payload) (any, bool) { return nil, false } return e, true + case pubsub.PayloadTypeAgentEvent: + var e pubsub.Event[proto.AgentEvent] + if err := json.Unmarshal(p.Payload, &e); err != nil { + return nil, false + } + return e, true + case pubsub.PayloadTypeRunComplete: + var e pubsub.Event[proto.RunComplete] + if err := json.Unmarshal(p.Payload, &e); err != nil { + return nil, false + } + return e, true } return nil, false } diff --git a/internal/ui/model/ui.go b/internal/ui/model/ui.go index 890dfc7de8a97eae13c4ecbd56ca07b566061408..2972400f236de92533ab336684ffcc10843bbda6 100644 --- a/internal/ui/model/ui.go +++ b/internal/ui/model/ui.go @@ -3283,12 +3283,12 @@ func (m *UI) sendMessage(content string, attachments ...message.Attachment) tea. // Capture session ID to avoid race with main goroutine updating m.session. sessionID := m.session.ID cmds = append(cmds, func() tea.Msg { + // AgentRun is fire-and-forget: it returns once the prompt has + // been accepted (HTTP 202) or synchronously with a validation + // or transport error. Run failures and cancellation surface + // through SSE-derived events, not this return value. err := m.com.Workspace.AgentRun(context.Background(), sessionID, content, attachments...) if err != nil { - isCancelErr := errors.Is(err, context.Canceled) - if isCancelErr { - return nil - } return util.InfoMsg{ Type: util.InfoTypeError, Msg: fmt.Sprintf("%v", err),