@@ -0,0 +1,594 @@
+package server
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/charmbracelet/crush/internal/app"
+ "github.com/charmbracelet/crush/internal/backend"
+ "github.com/charmbracelet/crush/internal/message"
+ "github.com/charmbracelet/crush/internal/permission"
+ "github.com/charmbracelet/crush/internal/proto"
+ "github.com/charmbracelet/crush/internal/pubsub"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/require"
+)
+
+// e2eHarness bundles everything a multi-client end-to-end scenario
+// (PLAN item 6) needs: the Server under test, the Backend behind it
+// (built with a shutdown callback the tests can observe), the
+// httptest.Server exposing it over HTTP, and, for the synthetic
+// variant, a Workspace together with the App embedded in it.
+type e2eHarness struct {
+	// srv, backend and httpSrv are populated by installServer.
+	srv     *Server
+	backend *backend.Backend
+	httpSrv *httptest.Server
+
+	// workspace and app are populated by newE2EHarness only; a
+	// harness built by newRealCreateHarness leaves them nil.
+	workspace *backend.Workspace
+	app       *app.App
+
+	// shutdownHit is flipped to true by the shutdown callback that
+	// installServer hands to the Backend.
+	shutdownHit atomic.Bool
+
+	// sseWG counts the reader goroutines started by
+	// [e2eHarness.subscribeSSE]. A cleanup hook waits on it once the
+	// httptest server has been closed, so a test cannot return while
+	// background readers (and their still-open response bodies) are
+	// alive.
+	sseWG sync.WaitGroup
+}
+
+// installServer builds a Server around a fresh Backend, exposes it
+// through an [httptest.Server], and stores all three on h. The
+// Backend's shutdown callback only records that it ran by setting
+// [e2eHarness.shutdownHit].
+//
+// Two cleanup hooks are registered here: closing the httptest server
+// and waiting on the SSE reader WaitGroup. Their registration order
+// is deliberate; see the comment next to the t.Cleanup calls.
+//
+// Use [newE2EHarness] for a fully synthetic workspace, or
+// [newRealCreateHarness] followed by [e2eHarness.postWorkspace] to
+// go through the real CreateWorkspace HTTP path.
+func (h *e2eHarness) installServer(t *testing.T) {
+	t.Helper()
+
+	onShutdown := func() { h.shutdownHit.Store(true) }
+
+	server := &Server{}
+	server.backend = backend.New(context.Background(), nil, onShutdown)
+	server.installHandler()
+
+	h.srv = server
+	h.backend = server.backend
+	h.httpSrv = httptest.NewServer(server.Handler())
+
+	// t.Cleanup is LIFO. Hooks a test registers afterwards (its
+	// per-stream cancels) therefore run first; then the server is
+	// closed; then sseWG.Wait blocks until every reader goroutine
+	// has exited and closed its response body. Hooks registered by
+	// a caller BEFORE installServer (for example the App teardown
+	// in the synthetic harness) run last, after the readers have
+	// drained.
+	t.Cleanup(h.sseWG.Wait)
+	t.Cleanup(h.httpSrv.Close)
+}
+
+// newE2EHarness returns a harness with an in-process server and one
+// synthetic Workspace already inserted into the backend. The
+// workspace's App comes from [app.NewForTest], so events published
+// on it reach the SSE pipeline. Scenarios that do not need the
+// path-dedupe behavior of [backend.CreateWorkspace] use this
+// constructor.
+//
+// The App teardown is registered before the server hooks, so under
+// LIFO ordering it runs only after the httptest server has been
+// closed and the SSE readers have drained.
+func newE2EHarness(t *testing.T) *e2eHarness {
+	t.Helper()
+
+	appCtx, stopApp := context.WithCancel(context.Background())
+	testApp := app.NewForTest(appCtx)
+	// Must be registered ahead of installServer's hooks; see the
+	// function comment.
+	t.Cleanup(func() {
+		stopApp()
+		testApp.ShutdownForTest()
+	})
+
+	h := &e2eHarness{app: testApp}
+	h.installServer(t)
+
+	h.workspace = &backend.Workspace{
+		ID:   uuid.New().String(),
+		Path: t.TempDir(),
+		App:  testApp,
+	}
+	// The synthetic App is incomplete, so replace the workspace's
+	// shutdown function with a no-op: the "last workspace removed"
+	// path can then run without panicking in [app.App.Shutdown].
+	backend.SetWorkspaceShutdownFnForTest(h.workspace, func() {})
+	backend.InsertWorkspaceForTest(h.backend, h.workspace)
+
+	return h
+}
+
+// newRealCreateHarness returns a harness with an in-process server
+// and NO pre-inserted workspace. It is meant for tests that create
+// the workspace through the real [backend.CreateWorkspace] HTTP path
+// (the path-dedupe scenario).
+//
+// HOME and the XDG_* directories are each pointed at a fresh temp
+// directory via [t.Setenv] so that [config.Init] does not read the
+// host machine's config. Because t.Setenv is used, callers MUST NOT
+// mark the test as parallel.
+func newRealCreateHarness(t *testing.T) *e2eHarness {
+	t.Helper()
+
+	isolated := []string{"HOME", "XDG_CACHE_HOME", "XDG_CONFIG_HOME", "XDG_DATA_HOME"}
+	for _, name := range isolated {
+		t.Setenv(name, t.TempDir())
+	}
+
+	harness := &e2eHarness{}
+	harness.installServer(t)
+	return harness
+}
+
+// postWorkspace sends args as JSON to the real POST /v1/workspaces
+// handler and returns the workspace proto the server answers with.
+// It fails the test unless the request succeeds with 200 OK, the
+// body decodes, and the returned ID is non-empty.
+//
+// Scenario 1 relies on this to exercise the path-dedupe behavior
+// from PLAN item 1: two calls with the same Path and distinct
+// ClientIDs must come back with the same workspace ID.
+func (h *e2eHarness) postWorkspace(t *testing.T, args proto.Workspace) proto.Workspace {
+	t.Helper()
+
+	payload, err := json.Marshal(args)
+	require.NoError(t, err)
+
+	endpoint := h.httpSrv.URL + "/v1/workspaces"
+	httpReq, err := http.NewRequestWithContext(t.Context(), http.MethodPost, endpoint, bytes.NewReader(payload))
+	require.NoError(t, err)
+	httpReq.Header.Set("Content-Type", "application/json")
+
+	resp, err := h.httpSrv.Client().Do(httpReq)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+	require.Equal(t, http.StatusOK, resp.StatusCode, "POST /v1/workspaces must succeed")
+
+	var created proto.Workspace
+	require.NoError(t, json.NewDecoder(resp.Body).Decode(&created))
+	require.NotEmpty(t, created.ID, "server must return a workspace id")
+	return created
+}
+
+// subscribeSSE opens an SSE stream against the test server for the
+// given workspace and client ID. It returns a channel of decoded
+// envelopes plus a cancel function that closes the stream. The
+// returned channel is closed when the stream ends.
+//
+// The stream's context is derived from ctx, so cancelling either ctx
+// or the returned function ends the stream. The reader goroutine is
+// tracked on h.sseWG and closes the response body when it exits.
+//
+// If setup fails (request construction, the HTTP round trip, or a
+// non-200 status) the test is aborted by require; the derived
+// context is cancelled and any response body is closed before the
+// goroutine unwinds, so a failed subscribe leaks nothing.
+func (h *e2eHarness) subscribeSSE(t *testing.T, ctx context.Context, workspaceID, clientID string) (<-chan any, context.CancelFunc) {
+	t.Helper()
+	streamCtx, cancel := context.WithCancel(ctx)
+
+	// require.* aborts via t.FailNow, which still runs deferred
+	// calls. Until ownership of the stream has been handed to the
+	// reader goroutine, this defer releases whatever was acquired.
+	var resp *http.Response
+	handedOff := false
+	defer func() {
+		if handedOff {
+			return
+		}
+		if resp != nil {
+			_ = resp.Body.Close()
+		}
+		cancel()
+	}()
+
+	q := url.Values{"client_id": []string{clientID}}
+	reqURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/events?" + q.Encode()
+	req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, reqURL, nil)
+	require.NoError(t, err)
+	req.Header.Set("Accept", "text/event-stream")
+
+	// Plain assignment: resp must be the variable the defer sees.
+	resp, err = h.httpSrv.Client().Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode, "SSE subscribe should return 200")
+
+	out := make(chan any, 64)
+	h.sseWG.Go(func() {
+		defer resp.Body.Close()
+		defer close(out)
+		reader := bufio.NewReader(resp.Body)
+		for {
+			// A read error (including the one caused by
+			// cancelling streamCtx) ends the stream.
+			line, err := reader.ReadBytes('\n')
+			if err != nil {
+				return
+			}
+			line = bytes.TrimSpace(line)
+			if len(line) == 0 {
+				continue
+			}
+			// Only "data:" lines carry a payload; every
+			// other SSE field is ignored.
+			data, ok := bytes.CutPrefix(line, []byte("data:"))
+			if !ok {
+				continue
+			}
+			data = bytes.TrimSpace(data)
+			var p pubsub.Payload
+			if err := json.Unmarshal(data, &p); err != nil {
+				continue
+			}
+			ev, decoded := decodeSSEEnvelope(p)
+			if !decoded {
+				continue
+			}
+			// Do not block forever on a consumer that has
+			// stopped reading after cancelling the stream.
+			select {
+			case out <- ev:
+			case <-streamCtx.Done():
+				return
+			}
+		}
+	})
+	handedOff = true
+	return out, cancel
+}
+
+// decodeSSEEnvelope turns the discriminated SSE envelope p into the
+// concrete pubsub.Event[proto.X] value the e2e tests match on. The
+// second result is false when the payload type is not one of the
+// three handled here or when its JSON does not decode, so callers
+// can rely on plain type assertions without filtering envelope
+// noise themselves.
+func decodeSSEEnvelope(p pubsub.Payload) (any, bool) {
+	switch p.Type {
+	case pubsub.PayloadTypePermissionRequest:
+		return unmarshalSSEEvent[proto.PermissionRequest](p.Payload)
+	case pubsub.PayloadTypePermissionNotification:
+		return unmarshalSSEEvent[proto.PermissionNotification](p.Payload)
+	case pubsub.PayloadTypeMessage:
+		return unmarshalSSEEvent[proto.Message](p.Payload)
+	default:
+		return nil, false
+	}
+}
+
+// unmarshalSSEEvent decodes raw into a pubsub.Event[T] and returns
+// it boxed as any; it reports (nil, false) when decoding fails.
+func unmarshalSSEEvent[T any](raw []byte) (any, bool) {
+	var ev pubsub.Event[T]
+	if err := json.Unmarshal(raw, &ev); err != nil {
+		return nil, false
+	}
+	return ev, true
+}
+
+// grantPermission POSTs req to the workspace's permissions/grant
+// endpoint and returns the Resolved field of the server's response.
+// It fails the test unless the call returns 200 OK with a decodable
+// body.
+//
+// This mirrors the client-side GrantPermission flow without
+// importing internal/client, which would create an import cycle
+// from this in-package test.
+func (h *e2eHarness) grantPermission(t *testing.T, ctx context.Context, workspaceID string, req proto.PermissionGrant) bool {
+	t.Helper()
+
+	payload, err := json.Marshal(req)
+	require.NoError(t, err)
+
+	endpoint := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/permissions/grant"
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
+	require.NoError(t, err)
+	httpReq.Header.Set("Content-Type", "application/json")
+
+	resp, err := h.httpSrv.Client().Do(httpReq)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+
+	var verdict proto.PermissionGrantResponse
+	require.NoError(t, json.NewDecoder(resp.Body).Decode(&verdict))
+	return verdict.Resolved
+}
+
+// waitForAttached blocks until the harness's own workspace reports
+// at least n live streams, failing the test if that does not happen
+// in time. Tests call it before publishing so that events are not
+// sent before the server-side AttachClient has completed.
+func (h *e2eHarness) waitForAttached(t *testing.T, n int) {
+	t.Helper()
+	ws := h.workspace
+	h.waitForAttachedOn(t, ws, n)
+}
+
+// waitForAttachedOn is the workspace-explicit form of waitForAttached.
+// Tests that drive a workspace whose pointer is not stored on the
+// harness (e.g. the real CreateWorkspace path) pass the workspace in.
+func (h *e2eHarness) waitForAttachedOn(t *testing.T, ws *backend.Workspace, n int) {
+ t.Helper()
+ deadline := time.Now().Add(2 * time.Second)
+ for time.Now().Before(deadline) {
+ if backend.WorkspaceLiveStreamCountForTest(ws) >= n {
+ return
+ }
+ time.Sleep(5 * time.Millisecond)
+ }
+ t.Fatalf("expected %d attached streams, have %d", n,
+ backend.WorkspaceLiveStreamCountForTest(ws))
+}
+
+// drainUntil consumes evc until it finds an event whose dynamic type
+// is T and for which match returns true; a nil match accepts any
+// event of type T. Events of other types, and T events that match
+// rejects, are discarded.
+//
+// It returns the event and true on success. It returns the zero
+// value of T and false when ctx is done or evc is closed first.
+func drainUntil[T any](ctx context.Context, evc <-chan any, match func(T) bool) (T, bool) {
+	var none T
+
+	accept := match
+	if accept == nil {
+		accept = func(T) bool { return true }
+	}
+
+	for {
+		var (
+			ev   any
+			open bool
+		)
+		select {
+		case <-ctx.Done():
+			return none, false
+		case ev, open = <-evc:
+		}
+		if !open {
+			return none, false
+		}
+		if typed, isT := ev.(T); isT && accept(typed) {
+			return typed, true
+		}
+	}
+}
+
+// TestE2E_TwoClientsReceiveSameMessage is PLAN item 6 scenario 1.
+// Two clients POST /v1/workspaces with the same Path; the server
+// must hand both the same workspace (path-dedupe, PLAN item 1), and
+// an event published on that workspace must reach both SSE streams.
+//
+// Not parallel: the harness isolates HOME/XDG_* via t.Setenv so
+// config.Init does not read the host machine's real config.
+func TestE2E_TwoClientsReceiveSameMessage(t *testing.T) {
+	h := newRealCreateHarness(t)
+	// A short create-grace window lets the workspace's pending
+	// creation holds release quickly during cleanup, once both SSE
+	// streams have detached.
+	h.backend.SetCreateGrace(200 * time.Millisecond)
+
+	ctx, cancel := context.WithCancel(t.Context())
+	t.Cleanup(cancel)
+
+	clientA := uuid.New().String()
+	clientB := uuid.New().String()
+
+	// Both POSTs share this path, so the backend's pathIndex must
+	// deduplicate them into a single workspace id.
+	shared := proto.Workspace{Path: t.TempDir(), DataDir: t.TempDir()}
+
+	reqA := shared
+	reqA.ClientID = clientA
+	respA := h.postWorkspace(t, reqA)
+
+	reqB := shared
+	reqB.ClientID = clientB
+	respB := h.postWorkspace(t, reqB)
+
+	require.Equal(t, respA.ID, respB.ID,
+		"POST /v1/workspaces with the same Path must return the same workspace id")
+
+	// Fetch the backend's workspace so events can be published
+	// through its real [app.App] event broker.
+	ws, err := h.backend.GetWorkspace(respA.ID)
+	require.NoError(t, err)
+	// Swap in a no-op shutdown so cleanup skips the full
+	// app.Shutdown path (LSP/MCP/DB teardown this test does not
+	// need to exercise).
+	backend.SetWorkspaceShutdownFnForTest(ws, func() {})
+
+	streamA, stopA := h.subscribeSSE(t, ctx, ws.ID, clientA)
+	t.Cleanup(stopA)
+	streamB, stopB := h.subscribeSSE(t, ctx, ws.ID, clientB)
+	t.Cleanup(stopB)
+
+	h.waitForAttachedOn(t, ws, 2)
+
+	const (
+		sessionID = "s-e2e-1"
+		messageID = "m-1"
+	)
+	ws.SendEvent(pubsub.Event[message.Message]{
+		Type: pubsub.CreatedEvent,
+		Payload: message.Message{
+			ID:        messageID,
+			SessionID: sessionID,
+			Role:      message.Assistant,
+			Parts:     []message.ContentPart{message.TextContent{Text: "hello multi-client"}},
+		},
+	})
+
+	waitCtx, stopWait := context.WithTimeout(ctx, 3*time.Second)
+	defer stopWait()
+	isPublished := func(e pubsub.Event[proto.Message]) bool {
+		return e.Payload.ID == messageID
+	}
+
+	seenByA, okA := drainUntil(waitCtx, streamA, isPublished)
+	require.True(t, okA, "client A must receive the MessageEvent")
+	require.Equal(t, sessionID, seenByA.Payload.SessionID)
+
+	seenByB, okB := drainUntil(waitCtx, streamB, isPublished)
+	require.True(t, okB, "client B must receive the same MessageEvent")
+	require.Equal(t, sessionID, seenByB.Payload.SessionID)
+}
+
+// TestE2E_PermissionFlowCrossClient is PLAN item 6 scenario 2. A
+// tool-driven permission request is granted by client A; client B
+// then sees a PermissionNotification; and a redundant grant from B
+// reports "already resolved" (resolved=false, via the bool plumbing
+// landed in item 3).
+func TestE2E_PermissionFlowCrossClient(t *testing.T) {
+	t.Parallel()
+	h := newE2EHarness(t)
+	ctx, cancel := context.WithCancel(t.Context())
+	t.Cleanup(cancel)
+
+	clientA := uuid.New().String()
+	clientB := uuid.New().String()
+
+	streamA, stopA := h.subscribeSSE(t, ctx, h.workspace.ID, clientA)
+	t.Cleanup(stopA)
+	streamB, stopB := h.subscribeSSE(t, ctx, h.workspace.ID, clientB)
+	t.Cleanup(stopB)
+
+	h.waitForAttached(t, 2)
+
+	const (
+		sessionID  = "s-perm"
+		toolCallID = "tc-1"
+	)
+
+	// Stand in for the tool path: Request blocks until the
+	// permission is resolved, so it runs on its own goroutine and
+	// reports its outcome over a buffered channel.
+	type outcome struct {
+		granted bool
+		err     error
+	}
+	outcomes := make(chan outcome, 1)
+	go func() {
+		var o outcome
+		o.granted, o.err = h.app.Permissions.Request(ctx, permission.CreatePermissionRequest{
+			SessionID:   sessionID,
+			ToolCallID:  toolCallID,
+			ToolName:    "view",
+			Description: "read a file",
+			Action:      "read",
+			Path:        h.workspace.Path,
+		})
+		outcomes <- o
+	}()
+
+	// The grant needs the request as the server published it, so
+	// pick it off client A's stream first.
+	waitCtx, stopWait := context.WithTimeout(ctx, 3*time.Second)
+	defer stopWait()
+	pending, ok := drainUntil(waitCtx, streamA, func(e pubsub.Event[proto.PermissionRequest]) bool {
+		return e.Payload.ToolCallID == toolCallID
+	})
+	require.True(t, ok, "client A must receive the PermissionRequest")
+
+	grant := proto.PermissionGrant{
+		Permission: pending.Payload,
+		Action:     proto.PermissionAllow,
+	}
+
+	// First grant (client A) must report resolved=true.
+	grantedByA := h.grantPermission(t, ctx, h.workspace.ID, grant)
+	require.True(t, grantedByA, "client A's grant must resolve the pending request")
+
+	// The blocked Request call must now come back with granted=true.
+	select {
+	case o := <-outcomes:
+		require.NoError(t, o.err)
+		require.True(t, o.granted)
+	case <-waitCtx.Done():
+		t.Fatal("permission Request did not return after grant")
+	}
+
+	// B's stream also carries the initial neither-granted-nor-
+	// denied notification published when Request started, so match
+	// specifically on the granted one for this tool call.
+	notice, ok := drainUntil(waitCtx, streamB, func(e pubsub.Event[proto.PermissionNotification]) bool {
+		sameCall := e.Payload.ToolCallID == toolCallID
+		return sameCall && e.Payload.Granted
+	})
+	require.True(t, ok, "client B must receive a granting PermissionNotification")
+	require.True(t, notice.Payload.Granted)
+	require.False(t, notice.Payload.Denied)
+
+	// A second grant (client B) must report resolved=false because
+	// A already resolved the request.
+	grantedByB := h.grantPermission(t, ctx, h.workspace.ID, grant)
+	require.False(t, grantedByB, "client B's follow-up grant must report already resolved")
+}
+
+// TestE2E_KillingClientASSEDoesNotBreakClientB is PLAN item 6
+// scenario 3: tearing down client A's SSE stream must leave client
+// B's stream intact, and B must keep receiving events.
+func TestE2E_KillingClientASSEDoesNotBreakClientB(t *testing.T) {
+	t.Parallel()
+	h := newE2EHarness(t)
+	survivorCtx, cancelSurvivor := context.WithCancel(t.Context())
+	t.Cleanup(cancelSurvivor)
+	victimCtx, cancelVictim := context.WithCancel(t.Context())
+
+	clientA := uuid.New().String()
+	clientB := uuid.New().String()
+
+	_, stopA := h.subscribeSSE(t, victimCtx, h.workspace.ID, clientA)
+	t.Cleanup(stopA)
+	streamB, stopB := h.subscribeSSE(t, survivorCtx, h.workspace.ID, clientB)
+	t.Cleanup(stopB)
+
+	h.waitForAttached(t, 2)
+
+	// End A's stream. The server's deferred DetachClient should
+	// release A's claim and leave B as the only attached client.
+	cancelVictim()
+	stopA()
+
+	onlyOneLeft := func() bool {
+		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
+	}
+	require.Eventually(t, onlyOneLeft, 3*time.Second, 10*time.Millisecond,
+		"expected client A's stream to drop the attached count to 1")
+
+	// B still holds the workspace open, so it must exist and the
+	// shutdown callback must not have run.
+	_, err := h.backend.GetWorkspace(h.workspace.ID)
+	require.NoError(t, err, "workspace must still exist while B is attached")
+	require.False(t, h.shutdownHit.Load(),
+		"shutdown callback must not fire while B is still attached")
+
+	// A fresh event must still reach B.
+	const (
+		sessionID = "s-after-a-died"
+		messageID = "m-after"
+	)
+	h.app.SendEvent(pubsub.Event[message.Message]{
+		Type: pubsub.CreatedEvent,
+		Payload: message.Message{
+			ID:        messageID,
+			SessionID: sessionID,
+			Role:      message.Assistant,
+			Parts:     []message.ContentPart{message.TextContent{Text: "still alive"}},
+		},
+	})
+
+	waitCtx, stopWait := context.WithTimeout(survivorCtx, 3*time.Second)
+	defer stopWait()
+	received, ok := drainUntil(waitCtx, streamB, func(e pubsub.Event[proto.Message]) bool {
+		return e.Payload.ID == messageID
+	})
+	require.True(t, ok, "client B must still receive events after A's stream is killed")
+	require.Equal(t, sessionID, received.Payload.SessionID)
+}
+
+// TestE2E_ShutdownCallbackFiresWhenLastClientLeaves is PLAN item 6
+// scenario 4: after both clients disconnect, the backend must run
+// its "last workspace removed -> server shutdown" path.
+func TestE2E_ShutdownCallbackFiresWhenLastClientLeaves(t *testing.T) {
+	t.Parallel()
+	h := newE2EHarness(t)
+
+	firstCtx, cancelFirst := context.WithCancel(t.Context())
+	secondCtx, cancelSecond := context.WithCancel(t.Context())
+	t.Cleanup(cancelFirst)
+	t.Cleanup(cancelSecond)
+
+	clientA := uuid.New().String()
+	clientB := uuid.New().String()
+	_, stopA := h.subscribeSSE(t, firstCtx, h.workspace.ID, clientA)
+	t.Cleanup(stopA)
+	_, stopB := h.subscribeSSE(t, secondCtx, h.workspace.ID, clientB)
+	t.Cleanup(stopB)
+
+	h.waitForAttached(t, 2)
+	require.False(t, h.shutdownHit.Load(), "shutdown must not fire while clients are attached")
+
+	// Drop the first client; one stream must remain and the
+	// shutdown callback must stay untouched.
+	cancelFirst()
+	stopA()
+	oneStreamLeft := func() bool {
+		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
+	}
+	require.Eventually(t, oneStreamLeft, 3*time.Second, 10*time.Millisecond)
+	require.False(t, h.shutdownHit.Load(),
+		"shutdown must not fire after only one client disconnects")
+
+	// Drop the last client; now the callback must run.
+	cancelSecond()
+	stopB()
+	require.Eventually(t, h.shutdownHit.Load,
+		3*time.Second, 10*time.Millisecond,
+		"shutdown callback must fire once the last client disconnects")
+
+	// The workspace must no longer be in the backend's index.
+	_, err := h.backend.GetWorkspace(h.workspace.ID)
+	require.ErrorIs(t, err, backend.ErrWorkspaceNotFound)
+
+	// The HTTP surface must reflect the teardown as well: a GET
+	// for the defunct workspace answers 404.
+	target := h.httpSrv.URL + "/v1/workspaces/" + h.workspace.ID
+	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, target, nil)
+	require.NoError(t, err)
+	resp, err := h.httpSrv.Client().Do(req)
+	require.NoError(t, err)
+	// Drain before closing; the body content itself is irrelevant.
+	_, _ = io.Copy(io.Discard, resp.Body)
+	resp.Body.Close()
+	require.Equal(t, http.StatusNotFound, resp.StatusCode)
+}