e2e_test.go

  1package server
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"encoding/json"
  8	"io"
  9	"net/http"
 10	"net/http/httptest"
 11	"net/url"
 12	"sync"
 13	"sync/atomic"
 14	"testing"
 15	"time"
 16
 17	"github.com/charmbracelet/crush/internal/app"
 18	"github.com/charmbracelet/crush/internal/backend"
 19	"github.com/charmbracelet/crush/internal/message"
 20	"github.com/charmbracelet/crush/internal/permission"
 21	"github.com/charmbracelet/crush/internal/proto"
 22	"github.com/charmbracelet/crush/internal/pubsub"
 23	"github.com/google/uuid"
 24	"github.com/stretchr/testify/require"
 25)
 26
 27// e2eHarness wires a Server, its Backend (with a custom shutdownFn we
 28// can observe), an httptest.NewServer, and a synthetic Workspace whose
 29// embedded App has a live event broker. It is the minimum scaffolding
 30// the multi-client end-to-end scenarios in PLAN item 6 need.
 31type e2eHarness struct {
 32	httpSrv     *httptest.Server
 33	srv         *Server
 34	backend     *backend.Backend
 35	workspace   *backend.Workspace
 36	app         *app.App
 37	shutdownHit atomic.Bool
 38
 39	// sseWG tracks every SSE reader goroutine spawned by
 40	// [e2eHarness.subscribeSSE]. The harness's cleanup hook waits on
 41	// it after the httptest server has been closed so that the test
 42	// cannot leave behind background readers (and therefore unclosed
 43	// response bodies) after returning.
 44	sseWG sync.WaitGroup
 45}
 46
 47// installServer attaches a fresh Server (with a custom shutdown
 48// callback that flips [e2eHarness.shutdownHit]) wrapped in an
 49// [httptest.Server] onto h. It registers the cleanup hooks for the
 50// httptest server and the SSE reader WaitGroup in the order required
 51// by the LIFO contract documented on [newE2EHarness].
 52//
 53// Callers that want a fully synthetic workspace use [newE2EHarness];
 54// callers that want to drive the real CreateWorkspace HTTP path use
 55// [newRealCreateHarness] and then [e2eHarness.postWorkspace].
 56func (h *e2eHarness) installServer(t *testing.T) {
 57	t.Helper()
 58	srv := &Server{}
 59	srv.backend = backend.New(context.Background(), nil, func() {
 60		h.shutdownHit.Store(true)
 61	})
 62	srv.installHandler()
 63
 64	hs := httptest.NewServer(srv.Handler())
 65	// Order matters: t.Cleanup is LIFO and the test's own per-
 66	// stream cancels (cancelA/cancelB) run first. After those, we
 67	// want hs.Close to fire first (so any handler still parked in
 68	// its `select` returns), THEN sseWG.Wait so every reader
 69	// goroutine exits and closes its response body. Any caller-
 70	// owned cleanups registered *before* installServer (e.g. App
 71	// teardown for the synthetic harness) therefore run LAST,
 72	// after the readers have drained.
 73	t.Cleanup(h.sseWG.Wait)
 74	t.Cleanup(hs.Close)
 75
 76	h.httpSrv = hs
 77	h.srv = srv
 78	h.backend = srv.backend
 79}
 80
 81// newE2EHarness builds an in-process server + a synthetic Workspace
 82// whose embedded App is a real [app.App] constructed via
 83// [app.NewForTest], so its event broker delivers everything the SSE
 84// pipeline expects. Used by the scenarios that do not need to
 85// exercise the path-dedupe behavior of [backend.CreateWorkspace].
 86//
 87// Cleanup tears down the App's broker only after sseWG.Wait and
 88// hs.Close have run, so SSE readers cannot observe a dead broker.
 89func newE2EHarness(t *testing.T) *e2eHarness {
 90	t.Helper()
 91
 92	h := &e2eHarness{}
 93
 94	// Register the App teardown FIRST so LIFO order puts it AFTER
 95	// the cleanups that installServer registers below (hs.Close +
 96	// sseWG.Wait).
 97	appCtx, cancel := context.WithCancel(context.Background())
 98	a := app.NewForTest(appCtx)
 99	t.Cleanup(func() {
100		cancel()
101		a.ShutdownForTest()
102	})
103
104	h.installServer(t)
105
106	ws := &backend.Workspace{
107		ID:   uuid.New().String(),
108		Path: t.TempDir(),
109		App:  a,
110	}
111	// Synthetic workspaces have an incomplete App; bypass the
112	// default teardown so the "last workspace removed" path can run
113	// without panicking inside [app.App.Shutdown].
114	backend.SetWorkspaceShutdownFnForTest(ws, func() {})
115	backend.InsertWorkspaceForTest(h.backend, ws)
116
117	h.workspace = ws
118	h.app = a
119	return h
120}
121
// newRealCreateHarness returns a harness with an in-process server and
// no pre-inserted workspace. It is meant for tests that drive the real
// backend.CreateWorkspace HTTP path (the path-dedupe scenario).
//
// HOME and the XDG_CACHE/CONFIG/DATA_HOME variables are pointed at
// fresh temp dirs with t.Setenv, so config.Init never reads the host
// machine's configuration. Because of t.Setenv, callers must not mark
// their test as parallel.
func newRealCreateHarness(t *testing.T) *e2eHarness {
	t.Helper()
	for _, name := range []string{"HOME", "XDG_CACHE_HOME", "XDG_CONFIG_HOME", "XDG_DATA_HOME"} {
		t.Setenv(name, t.TempDir())
	}

	h := new(e2eHarness)
	h.installServer(t)
	return h
}
139
// postWorkspace sends args as JSON to the real POST /v1/workspaces
// handler and returns the workspace the server resolved. Scenario 1
// uses it to exercise the path-dedupe behavior from PLAN item 1: two
// calls with the same Path and distinct ClientIDs must return the same
// workspace ID.
//
// The test fails if the request cannot be sent, if the status is not
// 200 (the response body is included in the failure message), if the
// body does not decode, or if the returned ID is empty.
func (h *e2eHarness) postWorkspace(t *testing.T, args proto.Workspace) proto.Workspace {
	t.Helper()
	body, err := json.Marshal(args)
	require.NoError(t, err)
	req, err := http.NewRequestWithContext(t.Context(), http.MethodPost,
		h.httpSrv.URL+"/v1/workspaces", bytes.NewReader(body))
	require.NoError(t, err)
	req.Header.Set("Content-Type", "application/json")
	resp, err := h.httpSrv.Client().Do(req)
	require.NoError(t, err)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the failure message, so a
		// read error just leaves it empty.
		errBody, _ := io.ReadAll(resp.Body)
		require.Equal(t, http.StatusOK, resp.StatusCode,
			"POST /v1/workspaces must succeed; response body: %s", errBody)
	}
	var out proto.Workspace
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&out))
	require.NotEmpty(t, out.ID, "server must return a workspace id")
	return out
}
162
// subscribeSSE opens an SSE stream on the test server for workspaceID,
// identifying itself with clientID through the client_id query
// parameter. It returns a channel of decoded events (see
// decodeSSEEnvelope) and a cancel func that tears the stream down.
//
// A goroutine tracked by h.sseWG reads the stream and owns the response
// body. The returned channel is closed when that goroutine exits: on a
// read error (including the one caused by cancelling the stream or
// closing the server), or when the stream context is done while a send
// is pending. Lines without a "data:" prefix, undecodable JSON and
// unrecognized envelope types are skipped.
//
// The cancel func is also registered with t.Cleanup, so the stream
// context is released even if this helper fails part-way. Calling it
// again from the caller is harmless. The hook is registered after
// installServer's, so LIFO order still runs it before hs.Close and
// sseWG.Wait.
func (h *e2eHarness) subscribeSSE(t *testing.T, ctx context.Context, workspaceID, clientID string) (<-chan any, context.CancelFunc) {
	t.Helper()
	streamCtx, cancel := context.WithCancel(ctx)
	t.Cleanup(cancel)

	q := url.Values{"client_id": []string{clientID}}
	reqURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/events?" + q.Encode()
	req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, reqURL, nil)
	require.NoError(t, err)
	req.Header.Set("Accept", "text/event-stream")

	resp, err := h.httpSrv.Client().Do(req)
	require.NoError(t, err)
	if resp.StatusCode != http.StatusOK {
		// No reader goroutine will take ownership of this body, so close
		// it here before failing the test.
		resp.Body.Close()
		require.Equal(t, http.StatusOK, resp.StatusCode, "SSE subscribe should return 200")
	}

	out := make(chan any, 64)
	h.sseWG.Go(func() {
		defer resp.Body.Close()
		defer close(out)
		reader := bufio.NewReader(resp.Body)
		for {
			line, err := reader.ReadBytes('\n')
			if err != nil {
				return
			}
			line = bytes.TrimSpace(line)
			if len(line) == 0 {
				continue
			}
			data, ok := bytes.CutPrefix(line, []byte("data:"))
			if !ok {
				continue
			}
			data = bytes.TrimSpace(data)
			var p pubsub.Payload
			if err := json.Unmarshal(data, &p); err != nil {
				continue
			}
			ev, decoded := decodeSSEEnvelope(p)
			if !decoded {
				continue
			}
			select {
			case out <- ev:
			case <-streamCtx.Done():
				return
			}
		}
	})
	return out, cancel
}
217
// decodeSSEEnvelope turns one SSE envelope into the concrete
// pubsub.Event[proto.X] value the e2e tests match on. Only permission
// requests, permission notifications and messages are recognized. Any
// other envelope type, or a payload that fails to unmarshal, yields
// (nil, false), so tests can type-assert on results without having to
// filter envelope noise.
func decodeSSEEnvelope(p pubsub.Payload) (any, bool) {
	switch p.Type {
	case pubsub.PayloadTypePermissionRequest:
		return decodeSSEEventAs[proto.PermissionRequest](p.Payload)
	case pubsub.PayloadTypePermissionNotification:
		return decodeSSEEventAs[proto.PermissionNotification](p.Payload)
	case pubsub.PayloadTypeMessage:
		return decodeSSEEventAs[proto.Message](p.Payload)
	default:
		return nil, false
	}
}

// decodeSSEEventAs unmarshals raw into a pubsub.Event[T]. It returns
// (nil, false) if the JSON does not decode.
func decodeSSEEventAs[T any](raw []byte) (any, bool) {
	var ev pubsub.Event[T]
	if err := json.Unmarshal(raw, &ev); err != nil {
		return nil, false
	}
	return ev, true
}
245
// grantPermission POSTs req as JSON to
// /v1/workspaces/{workspaceID}/permissions/grant and returns the
// Resolved flag from the server's PermissionGrantResponse. It mirrors
// the client-side GrantPermission flow without importing
// internal/client, which would create an import cycle from this
// in-package test.
//
// The test fails if the request cannot be sent, if the status is not
// 200 (the response body is included in the failure message), or if
// the response does not decode.
func (h *e2eHarness) grantPermission(t *testing.T, ctx context.Context, workspaceID string, req proto.PermissionGrant) bool {
	t.Helper()
	body, err := json.Marshal(req)
	require.NoError(t, err)
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
		h.httpSrv.URL+"/v1/workspaces/"+workspaceID+"/permissions/grant",
		bytes.NewReader(body))
	require.NoError(t, err)
	httpReq.Header.Set("Content-Type", "application/json")
	resp, err := h.httpSrv.Client().Do(httpReq)
	require.NoError(t, err)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the failure message, so a
		// read error just leaves it empty.
		errBody, _ := io.ReadAll(resp.Body)
		require.Equal(t, http.StatusOK, resp.StatusCode,
			"permission grant must succeed; response body: %s", errBody)
	}
	var out proto.PermissionGrantResponse
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&out))
	return out.Resolved
}
267
// waitForAttached blocks until the harness's own workspace
// (h.workspace) reports at least n live SSE streams, and fails the test
// otherwise. Call it before publishing events. It closes the race in
// which a test publishes before the server-side AttachClient has
// completed for every subscriber.
func (h *e2eHarness) waitForAttached(t *testing.T, n int) {
	t.Helper()
	target := h.workspace
	h.waitForAttachedOn(t, target, n)
}
275
276// waitForAttachedOn is the workspace-explicit form of waitForAttached.
277// Tests that drive a workspace whose pointer is not stored on the
278// harness (e.g. the real CreateWorkspace path) pass the workspace in.
279func (h *e2eHarness) waitForAttachedOn(t *testing.T, ws *backend.Workspace, n int) {
280	t.Helper()
281	deadline := time.Now().Add(2 * time.Second)
282	for time.Now().Before(deadline) {
283		if backend.WorkspaceLiveStreamCountForTest(ws) >= n {
284			return
285		}
286		time.Sleep(5 * time.Millisecond)
287	}
288	t.Fatalf("expected %d attached streams, have %d", n,
289		backend.WorkspaceLiveStreamCountForTest(ws))
290}
291
// drainUntil consumes events from evc, skipping any that are not of
// type T, until it finds a T that match accepts (a nil match accepts
// the first T). It then returns that event and true. If ctx is done
// first, or evc is closed, it returns T's zero value and false. Every
// event read before the match is discarded.
func drainUntil[T any](ctx context.Context, evc <-chan any, match func(T) bool) (T, bool) {
	accepts := func(v T) bool { return match == nil || match(v) }
	for {
		var (
			raw  any
			open bool
		)
		select {
		case <-ctx.Done():
			var none T
			return none, false
		case raw, open = <-evc:
		}
		if !open {
			var none T
			return none, false
		}
		if v, isT := raw.(T); isT && accepts(v) {
			return v, true
		}
	}
}
315
// TestE2E_TwoClientsReceiveSameMessage covers PLAN item 6 scenario 1.
// Two clients POST /v1/workspaces with the same Path. The server must
// return a single workspace (path-dedupe from PLAN item 1), and an
// event published on that workspace must fan out to both SSE streams.
//
// It cannot run in parallel: newRealCreateHarness isolates HOME/XDG_*
// with t.Setenv so config.Init does not read the host's real config.
func TestE2E_TwoClientsReceiveSameMessage(t *testing.T) {
	h := newRealCreateHarness(t)
	// A short create-grace window lets the workspace's pending creation
	// holds expire quickly during cleanup, once both SSE streams have
	// detached.
	h.backend.SetCreateGrace(200 * time.Millisecond)

	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	cidA, cidB := uuid.New().String(), uuid.New().String()

	// Both POSTs share Path (and DataDir). The backend's pathIndex must
	// deduplicate them to a single workspace id.
	shared := proto.Workspace{Path: t.TempDir(), DataDir: t.TempDir()}
	createAs := func(clientID string) proto.Workspace {
		t.Helper()
		args := shared
		args.ClientID = clientID
		return h.postWorkspace(t, args)
	}
	wsRespA := createAs(cidA)
	wsRespB := createAs(cidB)
	require.Equal(t, wsRespA.ID, wsRespB.ID,
		"POST /v1/workspaces with the same Path must return the same workspace id")

	// Fetch the backend workspace so events can be published through
	// its real app.App broker.
	ws, err := h.backend.GetWorkspace(wsRespA.ID)
	require.NoError(t, err)
	// A no-op shutdown callback keeps cleanup from running the full
	// app.Shutdown path (LSP/MCP/DB teardown) this test does not need.
	backend.SetWorkspaceShutdownFnForTest(ws, func() {})

	evcA, cancelA := h.subscribeSSE(t, ctx, ws.ID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, ws.ID, cidB)
	t.Cleanup(cancelB)
	h.waitForAttachedOn(t, ws, 2)

	const (
		sessionID = "s-e2e-1"
		messageID = "m-1"
	)
	ws.SendEvent(pubsub.Event[message.Message]{
		Type: pubsub.CreatedEvent,
		Payload: message.Message{
			ID:        messageID,
			SessionID: sessionID,
			Role:      message.Assistant,
			Parts:     []message.ContentPart{message.TextContent{Text: "hello multi-client"}},
		},
	})

	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	isTarget := func(e pubsub.Event[proto.Message]) bool { return e.Payload.ID == messageID }

	subscribers := []struct {
		evc      <-chan any
		whyFatal string
	}{
		{evcA, "client A must receive the MessageEvent"},
		{evcB, "client B must receive the same MessageEvent"},
	}
	for _, sub := range subscribers {
		got, ok := drainUntil(pickCtx, sub.evc, isTarget)
		require.True(t, ok, sub.whyFatal)
		require.Equal(t, sessionID, got.Payload.SessionID)
	}
}
397
// TestE2E_PermissionFlowCrossClient covers PLAN item 6 scenario 2.
// Client A grants a tool-driven permission request. Client B then
// observes a granting PermissionNotification. A redundant grant from B
// reports "already resolved", i.e. resolved=false from the bool
// plumbing landed in item 3.
func TestE2E_PermissionFlowCrossClient(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	wsID := h.workspace.ID
	cidA, cidB := uuid.New().String(), uuid.New().String()

	evcA, cancelA := h.subscribeSSE(t, ctx, wsID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, wsID, cidB)
	t.Cleanup(cancelB)
	h.waitForAttached(t, 2)

	const (
		sessionID  = "s-perm"
		toolCallID = "tc-1"
	)

	// Simulate the tool path in the background. Request blocks until the
	// permission is resolved, so its outcome is collected on a channel.
	type outcome struct {
		granted bool
		err     error
	}
	outcomes := make(chan outcome, 1)
	go func() {
		granted, err := h.app.Permissions.Request(ctx, permission.CreatePermissionRequest{
			SessionID:   sessionID,
			ToolCallID:  toolCallID,
			ToolName:    "view",
			Description: "read a file",
			Action:      "read",
			Path:        h.workspace.Path,
		})
		outcomes <- outcome{granted: granted, err: err}
	}()

	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()

	// The grant needs the request as the server published it; take it
	// from client A's stream.
	reqEv, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.PermissionRequest]) bool {
		return e.Payload.ToolCallID == toolCallID
	})
	require.True(t, ok, "client A must receive the PermissionRequest")

	allow := proto.PermissionGrant{
		Permission: reqEv.Payload,
		Action:     proto.PermissionAllow,
	}

	// First grant (client A) must resolve the pending request...
	require.True(t, h.grantPermission(t, ctx, wsID, allow),
		"client A's grant must resolve the pending request")

	// ...which unblocks Request with granted=true.
	select {
	case <-pickCtx.Done():
		t.Fatal("permission Request did not return after grant")
	case res := <-outcomes:
		require.NoError(t, res.err)
		require.True(t, res.granted)
	}

	// B also sees the initial neither-granted-nor-denied notification
	// published when Request starts, so match only the granting one.
	notif, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.PermissionNotification]) bool {
		return e.Payload.ToolCallID == toolCallID && e.Payload.Granted
	})
	require.True(t, ok, "client B must receive a granting PermissionNotification")
	require.True(t, notif.Payload.Granted)
	require.False(t, notif.Payload.Denied)

	// A follow-up grant from B finds the request already resolved by A.
	require.False(t, h.grantPermission(t, ctx, wsID, allow),
		"client B's follow-up grant must report already resolved")
}
484
// TestE2E_KillingClientASSEDoesNotBreakClientB covers PLAN item 6
// scenario 3. Terminating client A's SSE stream does not affect client
// B: the attached stream count drops to 1, the workspace stays
// registered, the shutdown callback does not fire, and B keeps
// receiving events.
func TestE2E_KillingClientASSEDoesNotBreakClientB(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	ctxB, cancelB := context.WithCancel(t.Context())
	t.Cleanup(cancelB)
	ctxA, cancelA := context.WithCancel(t.Context())
	// Registered like cancelB so ctxA is released by the test itself
	// even if it fails before A's stream is deliberately killed below.
	t.Cleanup(cancelA)

	cidA := uuid.New().String()
	cidB := uuid.New().String()

	_, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA)
	t.Cleanup(killA)
	evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB)
	t.Cleanup(killB)

	h.waitForAttached(t, 2)

	// Kill A's stream. The server's deferred DetachClient should drop
	// A's claim, leaving B as the sole attached client.
	cancelA()
	killA()

	require.Eventually(t, func() bool {
		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
	}, 3*time.Second, 10*time.Millisecond,
		"expected client A's stream to drop the attached count to 1")

	// B still holds the workspace open, so it must exist and the
	// shutdown callback must not have fired.
	_, err := h.backend.GetWorkspace(h.workspace.ID)
	require.NoError(t, err, "workspace must still exist while B is attached")
	require.False(t, h.shutdownHit.Load(),
		"shutdown callback must not fire while B is still attached")

	// Publish a fresh event; B must still receive it.
	const sessionID = "s-after-a-died"
	msg := message.Message{
		ID:        "m-after",
		SessionID: sessionID,
		Role:      message.Assistant,
		Parts:     []message.ContentPart{message.TextContent{Text: "still alive"}},
	}
	h.app.SendEvent(pubsub.Event[message.Message]{
		Type:    pubsub.CreatedEvent,
		Payload: msg,
	})

	pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second)
	defer pickCancel()
	got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
		return e.Payload.ID == "m-after"
	})
	require.True(t, ok, "client B must still receive events after A's stream is killed")
	require.Equal(t, sessionID, got.Payload.SessionID)
}
543
// TestE2E_ShutdownCallbackFiresWhenLastClientLeaves covers PLAN item 6
// scenario 4. The backend's "last workspace removed -> server shutdown"
// path must stay dormant while any client is attached, fire once both
// clients have disconnected, and leave the workspace unreachable via
// both the backend index and the HTTP surface.
func TestE2E_ShutdownCallbackFiresWhenLastClientLeaves(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	wsID := h.workspace.ID

	ctxA, cancelA := context.WithCancel(t.Context())
	ctxB, cancelB := context.WithCancel(t.Context())
	t.Cleanup(cancelA)
	t.Cleanup(cancelB)

	cidA, cidB := uuid.New().String(), uuid.New().String()
	_, killA := h.subscribeSSE(t, ctxA, wsID, cidA)
	t.Cleanup(killA)
	_, killB := h.subscribeSSE(t, ctxB, wsID, cidB)
	t.Cleanup(killB)

	h.waitForAttached(t, 2)
	require.False(t, h.shutdownHit.Load(), "shutdown must not fire while clients are attached")

	// disconnect ends one client's stream by cancelling its parent
	// context and then the stream's own cancel func.
	disconnect := func(cancelCtx, kill context.CancelFunc) {
		cancelCtx()
		kill()
	}

	disconnect(cancelA, killA)
	require.Eventually(t, func() bool {
		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
	}, 3*time.Second, 10*time.Millisecond)
	require.False(t, h.shutdownHit.Load(),
		"shutdown must not fire after only one client disconnects")

	disconnect(cancelB, killB)
	require.Eventually(t, h.shutdownHit.Load,
		3*time.Second, 10*time.Millisecond,
		"shutdown callback must fire once the last client disconnects")

	// The workspace must be gone from the backend's index...
	_, err := h.backend.GetWorkspace(wsID)
	require.ErrorIs(t, err, backend.ErrWorkspaceNotFound)

	// ...and the HTTP surface must agree: a GET on it now returns 404.
	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet,
		h.httpSrv.URL+"/v1/workspaces/"+wsID, nil)
	require.NoError(t, err)
	resp, err := h.httpSrv.Client().Do(req)
	require.NoError(t, err)
	// Only the status code matters here, so the drain result is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	resp.Body.Close()
	require.Equal(t, http.StatusNotFound, resp.StatusCode)
}