1package server
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "io"
9 "net/http"
10 "net/http/httptest"
11 "net/url"
12 "sync"
13 "sync/atomic"
14 "testing"
15 "time"
16
17 "github.com/charmbracelet/crush/internal/app"
18 "github.com/charmbracelet/crush/internal/backend"
19 "github.com/charmbracelet/crush/internal/db"
20 "github.com/charmbracelet/crush/internal/message"
21 "github.com/charmbracelet/crush/internal/permission"
22 "github.com/charmbracelet/crush/internal/proto"
23 "github.com/charmbracelet/crush/internal/pubsub"
24 "github.com/google/uuid"
25 "github.com/stretchr/testify/require"
26)
27
28// e2eHarness wires a Server, its Backend (with a custom shutdownFn we
29// can observe), an httptest.NewServer, and a synthetic Workspace whose
30// embedded App has a live event broker. It is the minimum scaffolding
31// the multi-client end-to-end scenarios in PLAN item 6 need.
32type e2eHarness struct {
33 httpSrv *httptest.Server
34 srv *Server
35 backend *backend.Backend
36 workspace *backend.Workspace
37 app *app.App
38 shutdownHit atomic.Bool
39
40 // sseWG tracks every SSE reader goroutine spawned by
41 // [e2eHarness.subscribeSSE]. The harness's cleanup hook waits on
42 // it after the httptest server has been closed so that the test
43 // cannot leave behind background readers (and therefore unclosed
44 // response bodies) after returning.
45 sseWG sync.WaitGroup
46}
47
48// installServer attaches a fresh Server (with a custom shutdown
49// callback that flips [e2eHarness.shutdownHit]) wrapped in an
50// [httptest.Server] onto h. It registers the cleanup hooks for the
51// httptest server and the SSE reader WaitGroup in the order required
52// by the LIFO contract documented on [newE2EHarness].
53//
54// Callers that want a fully synthetic workspace use [newE2EHarness];
55// callers that want to drive the real CreateWorkspace HTTP path use
56// [newRealCreateHarness] and then [e2eHarness.postWorkspace].
57func (h *e2eHarness) installServer(t *testing.T) {
58 t.Helper()
59 srv := &Server{}
60 srv.backend = backend.New(context.Background(), nil, func() {
61 h.shutdownHit.Store(true)
62 })
63 srv.installHandler()
64
65 hs := httptest.NewServer(srv.Handler())
66 // Order matters: t.Cleanup is LIFO and the test's own per-
67 // stream cancels (cancelA/cancelB) run first. After those, we
68 // want hs.Close to fire first (so any handler still parked in
69 // its `select` returns), THEN sseWG.Wait so every reader
70 // goroutine exits and closes its response body. Any caller-
71 // owned cleanups registered *before* installServer (e.g. App
72 // teardown for the synthetic harness) therefore run LAST,
73 // after the readers have drained.
74 t.Cleanup(h.sseWG.Wait)
75 t.Cleanup(hs.Close)
76
77 h.httpSrv = hs
78 h.srv = srv
79 h.backend = srv.backend
80}
81
82// newE2EHarness builds an in-process server + a synthetic Workspace
83// whose embedded App is a real [app.App] constructed via
84// [app.NewForTest], so its event broker delivers everything the SSE
85// pipeline expects. Used by the scenarios that do not need to
86// exercise the path-dedupe behavior of [backend.CreateWorkspace].
87//
88// Cleanup tears down the App's broker only after sseWG.Wait and
89// hs.Close have run, so SSE readers cannot observe a dead broker.
90func newE2EHarness(t *testing.T) *e2eHarness {
91 t.Helper()
92
93 h := &e2eHarness{}
94
95 // Register the App teardown FIRST so LIFO order puts it AFTER
96 // the cleanups that installServer registers below (hs.Close +
97 // sseWG.Wait).
98 appCtx, cancel := context.WithCancel(context.Background())
99 a := app.NewForTest(appCtx)
100 t.Cleanup(func() {
101 cancel()
102 a.ShutdownForTest()
103 })
104
105 h.installServer(t)
106
107 ws := &backend.Workspace{
108 ID: uuid.New().String(),
109 Path: t.TempDir(),
110 App: a,
111 }
112 // Synthetic workspaces have an incomplete App; bypass the
113 // default teardown so the "last workspace removed" path can run
114 // without panicking inside [app.App.Shutdown].
115 backend.SetWorkspaceShutdownFnForTest(ws, func() {})
116 backend.InsertWorkspaceForTest(h.backend, ws)
117
118 h.workspace = ws
119 h.app = a
120 return h
121}
122
123// newRealCreateHarness builds an in-process server WITHOUT any
124// pre-inserted workspace, intended for tests that drive the real
125// [backend.CreateWorkspace] HTTP path (path-dedupe scenario). It
126// isolates HOME/XDG_* via [t.Setenv] so [config.Init] doesn't read
127// the host machine's config, which means callers MUST NOT mark the
128// test as parallel.
129func newRealCreateHarness(t *testing.T) *e2eHarness {
130 t.Helper()
131 t.Setenv("HOME", t.TempDir())
132 t.Setenv("XDG_CACHE_HOME", t.TempDir())
133 t.Setenv("XDG_CONFIG_HOME", t.TempDir())
134 t.Setenv("XDG_DATA_HOME", t.TempDir())
135
136 h := &e2eHarness{}
137 h.installServer(t)
138 return h
139}
140
141// postWorkspace drives the real POST /v1/workspaces handler and
142// returns the resolved workspace proto. This is how scenario 1
143// exercises the path-dedupe behavior from PLAN item 1: two calls
144// with the same Path and distinct ClientIDs must return the same
145// workspace ID.
146func (h *e2eHarness) postWorkspace(t *testing.T, args proto.Workspace) proto.Workspace {
147 t.Helper()
148 body, err := json.Marshal(args)
149 require.NoError(t, err)
150 req, err := http.NewRequestWithContext(t.Context(), http.MethodPost,
151 h.httpSrv.URL+"/v1/workspaces", bytes.NewReader(body))
152 require.NoError(t, err)
153 req.Header.Set("Content-Type", "application/json")
154 resp, err := h.httpSrv.Client().Do(req)
155 require.NoError(t, err)
156 defer resp.Body.Close()
157 require.Equal(t, http.StatusOK, resp.StatusCode, "POST /v1/workspaces must succeed")
158 var out proto.Workspace
159 require.NoError(t, json.NewDecoder(resp.Body).Decode(&out))
160 require.NotEmpty(t, out.ID, "server must return a workspace id")
161 return out
162}
163
164// subscribeSSE opens an SSE stream against the test server for the
165// given workspace and client ID. It returns a channel of decoded
166// envelopes plus a cancel function that closes the stream. The
167// returned channel is closed when the stream ends.
168func (h *e2eHarness) subscribeSSE(t *testing.T, ctx context.Context, workspaceID, clientID string) (<-chan any, context.CancelFunc) {
169 t.Helper()
170 streamCtx, cancel := context.WithCancel(ctx)
171
172 q := url.Values{"client_id": []string{clientID}}
173 reqURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/events?" + q.Encode()
174 req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, reqURL, nil)
175 require.NoError(t, err)
176 req.Header.Set("Accept", "text/event-stream")
177
178 resp, err := h.httpSrv.Client().Do(req)
179 require.NoError(t, err)
180 require.Equal(t, http.StatusOK, resp.StatusCode, "SSE subscribe should return 200")
181
182 out := make(chan any, 64)
183 h.sseWG.Go(func() {
184 defer resp.Body.Close()
185 defer close(out)
186 reader := bufio.NewReader(resp.Body)
187 for {
188 line, err := reader.ReadBytes('\n')
189 if err != nil {
190 return
191 }
192 line = bytes.TrimSpace(line)
193 if len(line) == 0 {
194 continue
195 }
196 data, ok := bytes.CutPrefix(line, []byte("data:"))
197 if !ok {
198 continue
199 }
200 data = bytes.TrimSpace(data)
201 var p pubsub.Payload
202 if err := json.Unmarshal(data, &p); err != nil {
203 continue
204 }
205 ev, decoded := decodeSSEEnvelope(p)
206 if !decoded {
207 continue
208 }
209 select {
210 case out <- ev:
211 case <-streamCtx.Done():
212 return
213 }
214 }
215 })
216 return out, cancel
217}
218
219// decodeSSEEnvelope decodes the discriminated SSE envelope into the
220// concrete pubsub.Event[proto.X] payload the e2e tests care about.
221// Unknown payload types are skipped so tests can match on type
222// assertions without worrying about envelope noise.
223func decodeSSEEnvelope(p pubsub.Payload) (any, bool) {
224 switch p.Type {
225 case pubsub.PayloadTypePermissionRequest:
226 var e pubsub.Event[proto.PermissionRequest]
227 if err := json.Unmarshal(p.Payload, &e); err != nil {
228 return nil, false
229 }
230 return e, true
231 case pubsub.PayloadTypePermissionNotification:
232 var e pubsub.Event[proto.PermissionNotification]
233 if err := json.Unmarshal(p.Payload, &e); err != nil {
234 return nil, false
235 }
236 return e, true
237 case pubsub.PayloadTypeMessage:
238 var e pubsub.Event[proto.Message]
239 if err := json.Unmarshal(p.Payload, &e); err != nil {
240 return nil, false
241 }
242 return e, true
243 }
244 return nil, false
245}
246
247// grantPermission posts a permission grant via the HTTP surface and
248// returns the server's "resolved" verdict. Mirrors the client-side
249// GrantPermission flow without importing internal/client (which
250// would create an import cycle from this in-package test).
251func (h *e2eHarness) grantPermission(t *testing.T, ctx context.Context, workspaceID string, req proto.PermissionGrant) bool {
252 t.Helper()
253 body, err := json.Marshal(req)
254 require.NoError(t, err)
255 httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
256 h.httpSrv.URL+"/v1/workspaces/"+workspaceID+"/permissions/grant",
257 bytes.NewReader(body))
258 require.NoError(t, err)
259 httpReq.Header.Set("Content-Type", "application/json")
260 resp, err := h.httpSrv.Client().Do(httpReq)
261 require.NoError(t, err)
262 defer resp.Body.Close()
263 require.Equal(t, http.StatusOK, resp.StatusCode)
264 var out proto.PermissionGrantResponse
265 require.NoError(t, json.NewDecoder(resp.Body).Decode(&out))
266 return out.Resolved
267}
268
269// waitForAttached spins until the workspace's clients map reports at
270// least n entries with streams > 0. Catches the race where a test
271// publishes events before the server-side AttachClient has completed.
272func (h *e2eHarness) waitForAttached(t *testing.T, n int) {
273 t.Helper()
274 h.waitForAttachedOn(t, h.workspace, n)
275}
276
277// waitForAttachedOn is the workspace-explicit form of waitForAttached.
278// Tests that drive a workspace whose pointer is not stored on the
279// harness (e.g. the real CreateWorkspace path) pass the workspace in.
280func (h *e2eHarness) waitForAttachedOn(t *testing.T, ws *backend.Workspace, n int) {
281 t.Helper()
282 deadline := time.Now().Add(2 * time.Second)
283 for time.Now().Before(deadline) {
284 if backend.WorkspaceLiveStreamCountForTest(ws) >= n {
285 return
286 }
287 time.Sleep(5 * time.Millisecond)
288 }
289 t.Fatalf("expected %d attached streams, have %d", n,
290 backend.WorkspaceLiveStreamCountForTest(ws))
291}
292
// drainUntil consumes evc until it receives a value of dynamic type T
// that satisfies match (a nil match accepts the first T). Values of
// any other type, and T values rejected by match, are discarded.
//
// It returns (event, true) on success. It returns T's zero value and
// false when ctx is done or evc is closed first.
func drainUntil[T any](ctx context.Context, evc <-chan any, match func(T) bool) (got T, found bool) {
	accept := func(v T) bool { return match == nil || match(v) }
	for {
		select {
		case <-ctx.Done():
			return got, false
		case raw, open := <-evc:
			if !open {
				return got, false
			}
			if v, ok := raw.(T); ok && accept(v) {
				return v, true
			}
		}
	}
}
316
// TestE2E_TwoClientsReceiveSameMessage covers PLAN item 6 scenario 1:
// two clients POST /v1/workspaces with the same Path and observe
// that the server returns a single workspace (path-dedupe from PLAN
// item 1) and that an event published on that workspace fans out to
// both SSE streams.
//
// Cannot run in parallel: newRealCreateHarness isolates HOME/XDG_* via
// t.Setenv so config.Init does not read the host machine's real
// config, and t.Setenv may not be used in parallel tests.
func TestE2E_TwoClientsReceiveSameMessage(t *testing.T) {
	h := newRealCreateHarness(t)
	// Shorten the create-grace window so the workspace's pending
	// creation holds release quickly during test cleanup once both
	// SSE streams have been detached.
	h.backend.SetCreateGrace(200 * time.Millisecond)

	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	// Two distinct client IDs: the dedupe must hold across clients.
	cidA := uuid.New().String()
	cidB := uuid.New().String()

	// Shared workspace path. Two POSTs with this path must
	// deduplicate at the backend's pathIndex and return the same
	// workspace id.
	//
	// NOTE(review): these t.TempDir removals are registered after
	// installServer's cleanups, so (LIFO) they run before hs.Close and
	// sseWG.Wait. Nothing in this test waits for the workspace shutdown
	// callback (the db.Release installed below) to fire. Removing the
	// data directory therefore presumably relies on the backend having
	// torn the workspace down already, or on the testing package
	// retrying removal on Windows. Confirm.
	wsPath := t.TempDir()
	dataDir := t.TempDir()
	args := proto.Workspace{Path: wsPath, DataDir: dataDir}

	argsA := args
	argsA.ClientID = cidA
	wsRespA := h.postWorkspace(t, argsA)

	argsB := args
	argsB.ClientID = cidB
	wsRespB := h.postWorkspace(t, argsB)

	require.Equal(t, wsRespA.ID, wsRespB.ID,
		"POST /v1/workspaces with the same Path must return the same workspace id")

	// Look up the resulting workspace on the backend so the test
	// can publish events through its real [app.App] event broker.
	ws, err := h.backend.GetWorkspace(wsRespA.ID)
	require.NoError(t, err)
	// Override the shutdown callback so test cleanup doesn't run
	// the full app.Shutdown path (which would tear down LSP/MCP
	// resources the test doesn't need to exercise), but still
	// release the pooled DB connection so Windows can clean up
	// the temp data directory.
	wsDataDir := ws.Cfg.Config().Options.DataDirectory
	backend.SetWorkspaceShutdownFnForTest(ws, func() {
		_ = db.Release(wsDataDir)
	})

	// Both clients subscribe to the shared workspace; each stream's
	// cancel is registered last so it runs first during cleanup.
	evcA, cancelA := h.subscribeSSE(t, ctx, ws.ID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, ws.ID, cidB)
	t.Cleanup(cancelB)

	// Do not publish until both streams are attached server-side,
	// otherwise the event could be missed by a late subscriber.
	h.waitForAttachedOn(t, ws, 2)

	const sessionID = "s-e2e-1"
	msg := message.Message{
		ID:        "m-1",
		SessionID: sessionID,
		Role:      message.Assistant,
		Parts:     []message.ContentPart{message.TextContent{Text: "hello multi-client"}},
	}
	ws.SendEvent(pubsub.Event[message.Message]{
		Type:    pubsub.CreatedEvent,
		Payload: msg,
	})

	// One 3s budget shared by both drains below.
	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	gotA, okA := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.Message]) bool {
		return e.Payload.ID == "m-1"
	})
	require.True(t, okA, "client A must receive the MessageEvent")
	require.Equal(t, sessionID, gotA.Payload.SessionID)

	gotB, okB := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
		return e.Payload.ID == "m-1"
	})
	require.True(t, okB, "client B must receive the same MessageEvent")
	require.Equal(t, sessionID, gotB.Payload.SessionID)
}
403
// TestE2E_PermissionFlowCrossClient covers PLAN item 6 scenario 2:
// a tool-driven permission request is granted by client A; client B
// observes a PermissionNotification; a redundant grant from B
// returns the "already resolved" indicator (resolved=false from the
// bool plumbing landed in item 3).
func TestE2E_PermissionFlowCrossClient(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)
	ctx, cancel := context.WithCancel(t.Context())
	t.Cleanup(cancel)

	cidA := uuid.New().String()
	cidB := uuid.New().String()

	evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA)
	t.Cleanup(cancelA)
	evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB)
	t.Cleanup(cancelB)

	// Both streams must be attached before the request is published,
	// or one of them could miss it.
	h.waitForAttached(t, 2)

	// Drive the permission request from a goroutine simulating the
	// tool path. Request blocks until resolved; capture the outcome.
	const sessionID = "s-perm"
	const toolCallID = "tc-1"
	type result struct {
		granted bool
		err     error
	}
	// Buffered so the goroutine can always deliver its single result
	// and exit, even if the test has stopped receiving (e.g. after a
	// failed assertion). Request is also bound to ctx, which is
	// cancelled at cleanup.
	done := make(chan result, 1)
	go func() {
		granted, err := h.app.Permissions.Request(ctx, permission.CreatePermissionRequest{
			SessionID:   sessionID,
			ToolCallID:  toolCallID,
			ToolName:    "view",
			Description: "read a file",
			Action:      "read",
			Path:        h.workspace.Path,
		})
		done <- result{granted: granted, err: err}
	}()

	// Wait for the PermissionRequest to arrive on client A's SSE
	// stream. We need its ID to drive the grant. pickCtx is a single
	// 3s budget shared by every wait in the rest of the test.
	pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second)
	defer pickCancel()
	reqEv, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.PermissionRequest]) bool {
		return e.Payload.ToolCallID == toolCallID
	})
	require.True(t, ok, "client A must receive the PermissionRequest")

	// Client A grants — first grant must report resolved=true.
	resolvedA := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{
		Permission: reqEv.Payload,
		Action:     proto.PermissionAllow,
	})
	require.True(t, resolvedA, "client A's grant must resolve the pending request")

	// The blocked Request call must now return granted=true.
	select {
	case r := <-done:
		require.NoError(t, r.err)
		require.True(t, r.granted)
	case <-pickCtx.Done():
		t.Fatal("permission Request did not return after grant")
	}

	// Client B must receive a PermissionNotification with
	// Granted=true for the same ToolCallID. The initial neither-
	// granted-nor-denied notification published at the start of
	// Request also lands on B's stream — match on the granted one.
	notif, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.PermissionNotification]) bool {
		return e.Payload.ToolCallID == toolCallID && e.Payload.Granted
	})
	require.True(t, ok, "client B must receive a granting PermissionNotification")
	require.True(t, notif.Payload.Granted)
	require.False(t, notif.Payload.Denied)

	// A follow-up grant from client B must report resolved=false
	// (the request was already resolved by A).
	resolvedB := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{
		Permission: reqEv.Payload,
		Action:     proto.PermissionAllow,
	})
	require.False(t, resolvedB, "client B's follow-up grant must report already resolved")
}
490
491// TestE2E_KillingClientASSEDoesNotBreakClientB covers PLAN item 6
492// scenario 3: terminating client A's SSE stream does not affect
493// client B's stream; client B continues to receive events.
494func TestE2E_KillingClientASSEDoesNotBreakClientB(t *testing.T) {
495 t.Parallel()
496 h := newE2EHarness(t)
497 ctxB, cancelB := context.WithCancel(t.Context())
498 t.Cleanup(cancelB)
499 ctxA, cancelA := context.WithCancel(t.Context())
500
501 cidA := uuid.New().String()
502 cidB := uuid.New().String()
503
504 _, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA)
505 t.Cleanup(killA)
506 evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB)
507 t.Cleanup(killB)
508
509 h.waitForAttached(t, 2)
510
511 // Kill A's stream. The server's deferred DetachClient should
512 // drop A's claim, leaving B as the sole attached client.
513 cancelA()
514 killA()
515
516 require.Eventually(t, func() bool {
517 return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
518 }, 3*time.Second, 10*time.Millisecond,
519 "expected client A's stream to drop the attached count to 1")
520
521 // Workspace must still exist (B is holding it open) and
522 // shutdown callback must not have fired yet.
523 _, err := h.backend.GetWorkspace(h.workspace.ID)
524 require.NoError(t, err, "workspace must still exist while B is attached")
525 require.False(t, h.shutdownHit.Load(),
526 "shutdown callback must not fire while B is still attached")
527
528 // Publish a fresh event; B must still receive it.
529 const sessionID = "s-after-a-died"
530 msg := message.Message{
531 ID: "m-after",
532 SessionID: sessionID,
533 Role: message.Assistant,
534 Parts: []message.ContentPart{message.TextContent{Text: "still alive"}},
535 }
536 h.app.SendEvent(pubsub.Event[message.Message]{
537 Type: pubsub.CreatedEvent,
538 Payload: msg,
539 })
540
541 pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second)
542 defer pickCancel()
543 got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool {
544 return e.Payload.ID == "m-after"
545 })
546 require.True(t, ok, "client B must still receive events after A's stream is killed")
547 require.Equal(t, sessionID, got.Payload.SessionID)
548}
549
// TestE2E_ShutdownCallbackFiresWhenLastClientLeaves covers PLAN
// item 6 scenario 4: once both clients disconnect, the backend
// runs its "last workspace removed -> server shutdown" path.
// It asserts the callback does not fire after only one client leaves,
// that it does fire after the second leaves, and that the workspace
// is then gone from both the backend and the HTTP surface.
func TestE2E_ShutdownCallbackFiresWhenLastClientLeaves(t *testing.T) {
	t.Parallel()
	h := newE2EHarness(t)

	ctxA, cancelA := context.WithCancel(t.Context())
	ctxB, cancelB := context.WithCancel(t.Context())
	t.Cleanup(cancelA)
	t.Cleanup(cancelB)

	cidA := uuid.New().String()
	cidB := uuid.New().String()
	_, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA)
	t.Cleanup(killA)
	_, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB)
	t.Cleanup(killB)

	h.waitForAttached(t, 2)
	require.False(t, h.shutdownHit.Load(), "shutdown must not fire while clients are attached")

	// Disconnect A first and wait until only B's stream remains.
	cancelA()
	killA()
	require.Eventually(t, func() bool {
		return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1
	}, 3*time.Second, 10*time.Millisecond)
	require.False(t, h.shutdownHit.Load(),
		"shutdown must not fire after only one client disconnects")

	// Disconnect B: the last client leaving should trigger shutdown.
	cancelB()
	killB()
	require.Eventually(t, h.shutdownHit.Load,
		3*time.Second, 10*time.Millisecond,
		"shutdown callback must fire once the last client disconnects")

	// Workspace must be gone from the index.
	_, err := h.backend.GetWorkspace(h.workspace.ID)
	require.ErrorIs(t, err, backend.ErrWorkspaceNotFound)

	// Subsequent GETs against the now-defunct workspace return
	// 404, confirming the http surface still reflects the teardown.
	req, err := http.NewRequestWithContext(t.Context(), http.MethodGet,
		h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID, nil)
	require.NoError(t, err)
	r, err := h.httpSrv.Client().Do(req)
	require.NoError(t, err)
	// Drain and close the body before asserting so the connection is
	// released regardless of the status check's outcome.
	_, _ = io.Copy(io.Discard, r.Body)
	r.Body.Close()
	require.Equal(t, http.StatusNotFound, r.StatusCode)
}