1package server
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "io"
9 "net/http"
10 "net/http/httptest"
11 "net/url"
12 "sync"
13 "sync/atomic"
14 "testing"
15 "time"
16
17 "github.com/charmbracelet/crush/internal/app"
18 "github.com/charmbracelet/crush/internal/backend"
19 "github.com/charmbracelet/crush/internal/message"
20 "github.com/charmbracelet/crush/internal/permission"
21 "github.com/charmbracelet/crush/internal/proto"
22 "github.com/charmbracelet/crush/internal/pubsub"
23 "github.com/google/uuid"
24 "github.com/stretchr/testify/require"
25)
26
27// e2eHarness wires a Server, its Backend (with a custom shutdownFn we
28// can observe), an httptest.NewServer, and a synthetic Workspace whose
29// embedded App has a live event broker. It is the minimum scaffolding
30// the multi-client end-to-end scenarios in PLAN item 6 need.
31type e2eHarness struct {
32 httpSrv *httptest.Server
33 srv *Server
34 backend *backend.Backend
35 workspace *backend.Workspace
36 app *app.App
37 shutdownHit atomic.Bool
38
39 // sseWG tracks every SSE reader goroutine spawned by
40 // [e2eHarness.subscribeSSE]. The harness's cleanup hook waits on
41 // it after the httptest server has been closed so that the test
42 // cannot leave behind background readers (and therefore unclosed
43 // response bodies) after returning.
44 sseWG sync.WaitGroup
45}
46
// installServer attaches a fresh Server, wrapped in an
// [httptest.Server], onto h and records it in h.httpSrv, h.srv and
// h.backend. The Server's backend is built with a shutdown callback
// that flips [e2eHarness.shutdownHit], so tests can observe the
// "last workspace removed" path.
//
// It also registers two cleanups, hs.Close and h.sseWG.Wait, in the
// order explained inside the body. Cleanups a caller registered
// before calling installServer therefore run after both of them.
//
// Callers that want a fully synthetic workspace use [newE2EHarness];
// callers that want to drive the real CreateWorkspace HTTP path use
// [newRealCreateHarness] and then [e2eHarness.postWorkspace].
func (h *e2eHarness) installServer(t *testing.T) {
	t.Helper()
	srv := &Server{}
	srv.backend = backend.New(context.Background(), nil, func() {
		h.shutdownHit.Store(true)
	})
	srv.installHandler()

	hs := httptest.NewServer(srv.Handler())
	// Order matters: t.Cleanup is LIFO, so teardown runs as follows.
	//
	//  1. Cleanups the test registers after this call. These include
	//     the per-stream cancels returned by subscribeSSE
	//     (cancelA/cancelB). Canceling a stream aborts its client
	//     request, which presumably ends the SSE handler's request
	//     context and lets it return.
	//  2. hs.Close. It does NOT force-close active connections. It
	//     blocks until every outstanding request has finished, so it
	//     relies on step 1 having released the SSE handlers.
	//  3. sseWG.Wait, so every reader goroutine has exited and closed
	//     its response body.
	//  4. Caller-owned cleanups registered *before* installServer
	//     (e.g. App teardown for the synthetic harness). These run
	//     LAST, after the readers have drained.
	t.Cleanup(h.sseWG.Wait)
	t.Cleanup(hs.Close)

	h.httpSrv = hs
	h.srv = srv
	h.backend = srv.backend
}
80
// newE2EHarness returns a harness whose backend already holds one
// synthetic Workspace. The Workspace's App is a real [app.App] built
// with [app.NewForTest], so its event broker delivers everything the
// SSE pipeline expects. Scenarios that do not exercise the
// path-dedupe behavior of [backend.CreateWorkspace] start here.
//
// The App teardown is registered before installServer adds its own
// cleanups. Because t.Cleanup runs LIFO, the App and its broker are
// torn down only after hs.Close and sseWG.Wait have completed, so SSE
// readers cannot observe a dead broker.
func newE2EHarness(t *testing.T) *e2eHarness {
	t.Helper()

	appCtx, stopApp := context.WithCancel(context.Background())
	testApp := app.NewForTest(appCtx)
	t.Cleanup(func() {
		stopApp()
		testApp.ShutdownForTest()
	})

	h := &e2eHarness{app: testApp}
	// Must run after the App teardown registration above (LIFO).
	h.installServer(t)

	h.workspace = &backend.Workspace{
		ID:   uuid.New().String(),
		Path: t.TempDir(),
		App:  testApp,
	}
	// The synthetic App is incomplete. Swap in a no-op teardown so the
	// "last workspace removed" path can run without panicking inside
	// [app.App.Shutdown].
	backend.SetWorkspaceShutdownFnForTest(h.workspace, func() {})
	backend.InsertWorkspaceForTest(h.backend, h.workspace)
	return h
}
121
// newRealCreateHarness returns a harness with a running server but no
// pre-inserted workspace. It is meant for tests that drive the real
// [backend.CreateWorkspace] HTTP path (the path-dedupe scenario).
//
// HOME and the XDG_* base directories are pointed at fresh temp dirs
// via [t.Setenv], so config.Init does not pick up the host machine's
// configuration. t.Setenv is incompatible with t.Parallel, so callers
// MUST NOT mark the test as parallel.
func newRealCreateHarness(t *testing.T) *e2eHarness {
	t.Helper()
	isolatedVars := []string{"HOME", "XDG_CACHE_HOME", "XDG_CONFIG_HOME", "XDG_DATA_HOME"}
	for _, name := range isolatedVars {
		t.Setenv(name, t.TempDir())
	}

	h := new(e2eHarness)
	h.installServer(t)
	return h
}
139
// postWorkspace sends args to the real POST /v1/workspaces handler and
// returns the workspace the server resolved. It fails the test on any
// transport error, a non-200 status, an undecodable body, or an empty
// workspace ID. Scenario 1 uses it to exercise the path-dedupe
// behavior from PLAN item 1: two calls with the same Path and distinct
// ClientIDs must return the same workspace ID.
func (h *e2eHarness) postWorkspace(t *testing.T, args proto.Workspace) proto.Workspace {
	t.Helper()

	payload, err := json.Marshal(args)
	require.NoError(t, err)

	endpoint := h.httpSrv.URL + "/v1/workspaces"
	req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, endpoint, bytes.NewReader(payload))
	require.NoError(t, err)
	req.Header.Set("Content-Type", "application/json")

	client := h.httpSrv.Client()
	resp, err := client.Do(req)
	require.NoError(t, err)
	defer resp.Body.Close()

	require.Equal(t, http.StatusOK, resp.StatusCode, "POST /v1/workspaces must succeed")

	var created proto.Workspace
	err = json.NewDecoder(resp.Body).Decode(&created)
	require.NoError(t, err)
	require.NotEmpty(t, created.ID, "server must return a workspace id")
	return created
}
162
// subscribeSSE opens an SSE stream against the test server for the
// given workspace and client ID. It starts a reader goroutine, tracked
// by h.sseWG, that turns each `data:` line into a typed event via
// decodeSSEEnvelope. It returns the channel those events arrive on and
// a cancel function that ends the stream. The channel is closed when
// the reader goroutine exits.
//
// Blank lines, non-`data:` lines, and payloads that fail to decode or
// have an unrecognized type are skipped. If the subscribe request
// cannot be built or sent, or returns a non-200 status, the stream
// context is canceled and any response body is closed before the test
// is failed. Without this, the failure would leak both.
func (h *e2eHarness) subscribeSSE(t *testing.T, ctx context.Context, workspaceID, clientID string) (<-chan any, context.CancelFunc) {
	t.Helper()
	streamCtx, cancel := context.WithCancel(ctx)

	q := url.Values{"client_id": []string{clientID}}
	reqURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/events?" + q.Encode()
	req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, reqURL, nil)
	if err != nil {
		cancel()
		require.NoError(t, err)
	}
	req.Header.Set("Accept", "text/event-stream")

	resp, err := h.httpSrv.Client().Do(req)
	if err != nil {
		cancel()
		require.NoError(t, err)
	}
	if resp.StatusCode != http.StatusOK {
		// require fails via t.FailNow, which stops this goroutine
		// before any reader takes ownership of the body and before the
		// caller can register cancel. Release both here first.
		resp.Body.Close()
		cancel()
		require.Equal(t, http.StatusOK, resp.StatusCode, "SSE subscribe should return 200")
	}

	out := make(chan any, 64)
	h.sseWG.Go(func() {
		defer resp.Body.Close()
		defer close(out)
		reader := bufio.NewReader(resp.Body)
		for {
			// A read error (including the one caused by cancel()
			// aborting the request) ends the stream.
			line, err := reader.ReadBytes('\n')
			if err != nil {
				return
			}
			line = bytes.TrimSpace(line)
			if len(line) == 0 {
				continue
			}
			data, ok := bytes.CutPrefix(line, []byte("data:"))
			if !ok {
				continue
			}
			data = bytes.TrimSpace(data)
			var p pubsub.Payload
			if err := json.Unmarshal(data, &p); err != nil {
				continue
			}
			ev, decoded := decodeSSEEnvelope(p)
			if !decoded {
				continue
			}
			// Do not block forever on a full channel once the
			// stream has been canceled.
			select {
			case out <- ev:
			case <-streamCtx.Done():
				return
			}
		}
	})
	return out, cancel
}
217
// decodeSSEEnvelope unwraps a discriminated SSE envelope into the
// concrete pubsub.Event[proto.X] value the e2e tests match on. Only
// permission requests, permission notifications and messages are
// recognized. Any other type, or a payload that fails to unmarshal,
// yields (nil, false) so callers can skip it and tests can use type
// assertions without worrying about envelope noise.
func decodeSSEEnvelope(p pubsub.Payload) (any, bool) {
	switch p.Type {
	case pubsub.PayloadTypePermissionRequest:
		return decodeE2EEvent[proto.PermissionRequest](p.Payload)
	case pubsub.PayloadTypePermissionNotification:
		return decodeE2EEvent[proto.PermissionNotification](p.Payload)
	case pubsub.PayloadTypeMessage:
		return decodeE2EEvent[proto.Message](p.Payload)
	default:
		return nil, false
	}
}

// decodeE2EEvent unmarshals raw into a pubsub.Event[T] and returns it
// boxed as any. It returns (nil, false) if raw does not decode into
// that type.
func decodeE2EEvent[T any](raw []byte) (any, bool) {
	var ev pubsub.Event[T]
	if err := json.Unmarshal(raw, &ev); err != nil {
		return nil, false
	}
	return ev, true
}
245
// grantPermission POSTs req to the workspace's
// /permissions/grant endpoint and returns the server's Resolved
// verdict. Any transport error, non-200 status, or undecodable
// response fails the test. It mirrors the client-side GrantPermission
// flow without importing internal/client, which would create an import
// cycle from this in-package test.
func (h *e2eHarness) grantPermission(t *testing.T, ctx context.Context, workspaceID string, req proto.PermissionGrant) bool {
	t.Helper()

	encoded, err := json.Marshal(req)
	require.NoError(t, err)

	grantURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/permissions/grant"
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, grantURL, bytes.NewReader(encoded))
	require.NoError(t, err)
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := h.httpSrv.Client().Do(httpReq)
	require.NoError(t, err)
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)

	var verdict proto.PermissionGrantResponse
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&verdict))
	return verdict.Resolved
}
267
// waitForAttached blocks until the harness's own workspace reports at
// least n live SSE streams, failing the test on timeout. It guards
// against publishing events before the server-side AttachClient has
// completed. See waitForAttachedOn for workspaces not stored on h.
func (h *e2eHarness) waitForAttached(t *testing.T, n int) {
	t.Helper()
	target := h.workspace
	h.waitForAttachedOn(t, target, n)
}
275
// waitForAttachedOn is the workspace-explicit form of waitForAttached.
// Tests that drive a workspace whose pointer is not stored on the
// harness (e.g. the real CreateWorkspace path) pass the workspace in.
//
// It polls backend.WorkspaceLiveStreamCountForTest(ws) every 5ms for
// up to 2s and returns as soon as the count is at least n. Otherwise
// it fails the test, reporting the last sampled count.
func (h *e2eHarness) waitForAttachedOn(t *testing.T, ws *backend.Workspace, n int) {
	t.Helper()
	deadline := time.Now().Add(2 * time.Second)
	for {
		// Sample once per iteration, including after the sleep that
		// crosses the deadline. Report that same sample on failure,
		// so the message never shows a count that already satisfies n.
		have := backend.WorkspaceLiveStreamCountForTest(ws)
		if have >= n {
			return
		}
		if !time.Now().Before(deadline) {
			t.Fatalf("expected %d attached streams, have %d", n, have)
		}
		time.Sleep(5 * time.Millisecond)
	}
}
291
// drainUntil consumes evc until it yields a value whose dynamic type
// is T and which satisfies match. A nil match accepts the first T.
// Values of other types, and T values rejected by match, are
// discarded. It returns (event, true) on success. It returns
// (zero, false) if ctx is done or evc is closed first.
func drainUntil[T any](ctx context.Context, evc <-chan any, match func(T) bool) (T, bool) {
	for {
		var (
			ev   any
			open bool
		)
		select {
		case <-ctx.Done():
			var none T
			return none, false
		case ev, open = <-evc:
		}
		if !open {
			var none T
			return none, false
		}
		if typed, isT := ev.(T); isT && (match == nil || match(typed)) {
			return typed, true
		}
	}
}
315
// TestE2E_TwoClientsReceiveSameMessage covers PLAN item 6 scenario 1.
// Two clients POST /v1/workspaces with the same Path, and the server
// must hand both of them a single workspace (path-dedupe from PLAN
// item 1). A message event published on that workspace must then fan
// out to both SSE streams.
//
// Cannot run in parallel: it isolates HOME/XDG_* via t.Setenv so
// config.Init does not read the host machine's real config.
func TestE2E_TwoClientsReceiveSameMessage(t *testing.T) {
	h := newRealCreateHarness(t)
	// A short create-grace window lets the workspace's pending
	// creation holds release quickly during cleanup, once both SSE
	// streams have detached.
	h.backend.SetCreateGrace(200 * time.Millisecond)

	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	cidA, cidB := uuid.New().String(), uuid.New().String()

	// Both requests share Path and DataDir and differ only in
	// ClientID, so the backend's pathIndex must collapse them onto one
	// workspace.
	base := proto.Workspace{Path: t.TempDir(), DataDir: t.TempDir()}
	createAs := func(clientID string) proto.Workspace {
		t.Helper()
		req := base
		req.ClientID = clientID
		return h.postWorkspace(t, req)
	}
	wsRespA := createAs(cidA)
	wsRespB := createAs(cidB)
	require.Equal(t, wsRespA.ID, wsRespB.ID,
		"POST /v1/workspaces with the same Path must return the same workspace id")

	// Publish through the workspace's real [app.App] event broker.
	// Replace its shutdown callback so cleanup skips the full
	// app.Shutdown path (LSP/MCP/DB teardown this test does not need).
	ws, err := h.backend.GetWorkspace(wsRespA.ID)
	require.NoError(t, err)
	backend.SetWorkspaceShutdownFnForTest(ws, func() {})

	evcA, cancelA := h.subscribeSSE(t, ctx, ws.ID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, ws.ID, cidB)
	t.Cleanup(cancelB)
	h.waitForAttachedOn(t, ws, 2)

	const (
		sessionID = "s-e2e-1"
		messageID = "m-1"
	)
	ws.SendEvent(pubsub.Event[message.Message]{
		Type: pubsub.CreatedEvent,
		Payload: message.Message{
			ID:        messageID,
			SessionID: sessionID,
			Role:      message.Assistant,
			Parts:     []message.ContentPart{message.TextContent{Text: "hello multi-client"}},
		},
	})

	// One 3s budget covers both reads.
	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	isOurMessage := func(e pubsub.Event[proto.Message]) bool {
		return e.Payload.ID == messageID
	}

	gotA, okA := drainUntil(pickCtx, evcA, isOurMessage)
	require.True(t, okA, "client A must receive the MessageEvent")
	require.Equal(t, sessionID, gotA.Payload.SessionID)

	gotB, okB := drainUntil(pickCtx, evcB, isOurMessage)
	require.True(t, okB, "client B must receive the same MessageEvent")
	require.Equal(t, sessionID, gotB.Payload.SessionID)
}
397
// TestE2E_PermissionFlowCrossClient covers PLAN item 6 scenario 2:
// a tool-driven permission request is granted by client A; client B
// observes a PermissionNotification; a redundant grant from B
// returns the "already resolved" indicator (resolved=false from the
// bool plumbing landed in item 3).
//
// The test proceeds in six steps:
//  1. Both clients attach.
//  2. A goroutine blocks in Permissions.Request.
//  3. A reads the PermissionRequest off its stream and grants it over
//     HTTP.
//  4. The blocked Request returns granted=true.
//  5. B sees the granting notification.
//  6. B's duplicate grant reports that it resolved nothing.
func TestE2E_PermissionFlowCrossClient(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	// ctx scopes both SSE streams and the Request goroutine below. It
	// is registered after the harness, so its cancel runs before the
	// harness's own cleanups (LIFO). Presumably this also unblocks
	// Request if the test fails before the grant; confirm against the
	// permission service.
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	cidA := uuid.New().String()
	cidB := uuid.New().String()

	evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB)
	t.Cleanup(cancelB)

	// Both streams must be attached before Request publishes anything.
	h.waitForAttached(t, 2)

	// Drive the permission request from a goroutine simulating the
	// tool path. Request blocks until resolved; capture the outcome.
	const sessionID = "s-perm"
	const toolCallID = "tc-1"
	type result struct {
		granted bool
		err     error
	}
	// Buffered so the goroutine's send never blocks, even if the test
	// has already stopped reading from done.
	done := make(chan result, 1)
	go func() {
		granted, err := h.app.Permissions.Request(ctx, permission.CreatePermissionRequest{
			SessionID:   sessionID,
			ToolCallID:  toolCallID,
			ToolName:    "view",
			Description: "read a file",
			Action:      "read",
			Path:        h.workspace.Path,
		})
		done <- result{granted: granted, err: err}
	}()

	// Wait for the PermissionRequest to arrive on client A's SSE
	// stream. We need its ID to drive the grant. pickCtx is one shared
	// 3s budget for this read, the Request return, and B's
	// notification below.
	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	reqEv, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.PermissionRequest]) bool {
		return e.Payload.ToolCallID == toolCallID
	})
	require.True(t, ok, "client A must receive the PermissionRequest")

	// Client A grants — first grant must report resolved=true.
	resolvedA := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{
		Permission: reqEv.Payload,
		Action:     proto.PermissionAllow,
	})
	require.True(t, resolvedA, "client A's grant must resolve the pending request")

	// The blocked Request call must now return granted=true.
	select {
	case r := <-done:
		require.NoError(t, r.err)
		require.True(t, r.granted)
	case <-pickCtx.Done():
		t.Fatal("permission Request did not return after grant")
	}

	// Client B must receive a PermissionNotification with
	// Granted=true for the same ToolCallID. The initial neither-
	// granted-nor-denied notification published at the start of
	// Request also lands on B's stream — match on the granted one.
	notif, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.PermissionNotification]) bool {
		return e.Payload.ToolCallID == toolCallID && e.Payload.Granted
	})
	require.True(t, ok, "client B must receive a granting PermissionNotification")
	require.True(t, notif.Payload.Granted)
	require.False(t, notif.Payload.Denied)

	// A follow-up grant from client B must report resolved=false
	// (the request was already resolved by A). It reuses the same
	// request payload A received.
	resolvedB := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{
		Permission: reqEv.Payload,
		Action:     proto.PermissionAllow,
	})
	require.False(t, resolvedB, "client B's follow-up grant must report already resolved")
}
484
// TestE2E_KillingClientASSEDoesNotBreakClientB covers PLAN item 6
// scenario 3. Tearing down client A's SSE stream must leave client
// B's stream intact and the workspace alive, and B must keep
// receiving events.
func TestE2E_KillingClientASSEDoesNotBreakClientB(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	wsID := h.workspace.ID

	ctxB, cancelB := context.WithCancel(t.Context())
	t.Cleanup(cancelB)
	// ctxA is canceled explicitly mid-test rather than via t.Cleanup.
	ctxA, cancelA := context.WithCancel(t.Context())

	cidA := uuid.New().String()
	cidB := uuid.New().String()

	// A's events are never inspected. Its stream only needs to be
	// attached so there is something to kill.
	_, killA := h.subscribeSSE(t, ctxA, wsID, cidA)
	t.Cleanup(killA)
	evcB, killB := h.subscribeSSE(t, ctxB, wsID, cidB)
	t.Cleanup(killB)

	h.waitForAttached(t, 2)

	// Drop A. The server's deferred DetachClient should release A's
	// claim, leaving B as the only attached client.
	cancelA()
	killA()
	liveStreams := func() int {
		return backend.WorkspaceLiveStreamCountForTest(h.workspace)
	}
	require.Eventually(t, func() bool { return liveStreams() == 1 },
		3*time.Second, 10*time.Millisecond,
		"expected client A's stream to drop the attached count to 1")

	// B still holds the workspace open. The workspace must therefore
	// exist, and the shutdown callback must not have fired.
	_, err := h.backend.GetWorkspace(wsID)
	require.NoError(t, err, "workspace must still exist while B is attached")
	require.False(t, h.shutdownHit.Load(),
		"shutdown callback must not fire while B is still attached")

	// A freshly published event must still reach B.
	const (
		sessionID = "s-after-a-died"
		messageID = "m-after"
	)
	h.app.SendEvent(pubsub.Event[message.Message]{
		Type: pubsub.CreatedEvent,
		Payload: message.Message{
			ID:        messageID,
			SessionID: sessionID,
			Role:      message.Assistant,
			Parts:     []message.ContentPart{message.TextContent{Text: "still alive"}},
		},
	})

	pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second)
	defer pickCancel()
	got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
		return e.Payload.ID == messageID
	})
	require.True(t, ok, "client B must still receive events after A's stream is killed")
	require.Equal(t, sessionID, got.Payload.SessionID)
}
543
// TestE2E_ShutdownCallbackFiresWhenLastClientLeaves covers PLAN item
// 6 scenario 4. The backend's "last workspace removed -> server
// shutdown" path must stay quiet while any client is attached. It must
// fire once both clients disconnect, removing the workspace from the
// index and from the HTTP surface.
func TestE2E_ShutdownCallbackFiresWhenLastClientLeaves(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	wsID := h.workspace.ID

	ctxA, cancelA := context.WithCancel(t.Context())
	ctxB, cancelB := context.WithCancel(t.Context())
	t.Cleanup(cancelA)
	t.Cleanup(cancelB)

	cidA, cidB := uuid.New().String(), uuid.New().String()
	_, killA := h.subscribeSSE(t, ctxA, wsID, cidA)
	t.Cleanup(killA)
	_, killB := h.subscribeSSE(t, ctxB, wsID, cidB)
	t.Cleanup(killB)

	h.waitForAttached(t, 2)
	require.False(t, h.shutdownHit.Load(), "shutdown must not fire while clients are attached")

	// First disconnect: one stream remains, so no shutdown yet.
	cancelA()
	killA()
	require.Eventually(t, func() bool {
		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
	}, 3*time.Second, 10*time.Millisecond)
	require.False(t, h.shutdownHit.Load(),
		"shutdown must not fire after only one client disconnects")

	// Last disconnect: the shutdown callback must run.
	cancelB()
	killB()
	require.Eventually(t, h.shutdownHit.Load,
		3*time.Second, 10*time.Millisecond,
		"shutdown callback must fire once the last client disconnects")

	// The workspace is gone from the backend index...
	_, err := h.backend.GetWorkspace(wsID)
	require.ErrorIs(t, err, backend.ErrWorkspaceNotFound)

	// ...and a GET over HTTP reports 404.
	getURL := h.httpSrv.URL + "/v1/workspaces/" + wsID
	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, getURL, nil)
	require.NoError(t, err)
	resp, err := h.httpSrv.Client().Do(req)
	require.NoError(t, err)
	_, _ = io.Copy(io.Discard, resp.Body)
	resp.Body.Close()
	require.Equal(t, http.StatusNotFound, resp.StatusCode)
}