diff --git a/internal/app/testing.go b/internal/app/testing.go new file mode 100644 index 0000000000000000000000000000000000000000..f17e94cfa99411b4594fce72bd894cc5fba4c4fd --- /dev/null +++ b/internal/app/testing.go @@ -0,0 +1,69 @@ +package app + +import ( + "context" + "sync" + + tea "charm.land/bubbletea/v2" + "github.com/charmbracelet/crush/internal/agent/notify" + "github.com/charmbracelet/crush/internal/permission" + "github.com/charmbracelet/crush/internal/pubsub" +) + +// NewForTest constructs a minimal [App] suitable for in-process tests +// that need a working event broker and permission service without +// booting a real config, database, LSP, MCP, or agent coordinator. +// +// The returned App has: +// +// - A live `events` broker that [App.SendEvent] publishes to and +// [App.Events] subscribes from. +// - A real [permission.Service] whose request and notification +// brokers are fanned into the events broker, so subscribers to +// [App.Events] observe the same permission events the production +// wiring would deliver to SSE clients. +// - An [App.agentNotifications] broker. +// +// The caller owns lifetime: cancel ctx (or call [App.Shutdown]) to +// tear down the fan-in goroutines and the events broker. +func NewForTest(ctx context.Context) *App { + app := &App{ + Permissions: permission.NewPermissionService("", false, nil), + globalCtx: ctx, + events: pubsub.NewBroker[tea.Msg](), + serviceEventsWG: &sync.WaitGroup{}, + tuiWG: &sync.WaitGroup{}, + agentNotifications: pubsub.NewBroker[notify.Notification](), + } + + eventsCtx, cancel := context.WithCancel(ctx) + app.eventsCtx = eventsCtx + setupSubscriber(eventsCtx, app.serviceEventsWG, "permissions", + app.Permissions.Subscribe, app.events) + setupSubscriber(eventsCtx, app.serviceEventsWG, "permissions-notifications", + app.Permissions.SubscribeNotifications, app.events) + setupSubscriber(eventsCtx, app.serviceEventsWG, "agent-notifications", + app.agentNotifications.Subscribe, app.events) + app.cleanupFuncs = append(app.cleanupFuncs, func(context.Context) error { + cancel() + app.serviceEventsWG.Wait() + app.events.Shutdown() + return nil + }) + return app +} + +// ShutdownForTest tears down the App's event broker and fan-in +// goroutines. It is safe to call multiple times. +// +// Use this in tests instead of [App.Shutdown], which drives a full +// production shutdown path (database release, LSP teardown, MCP +// shutdown) that synthetic test apps cannot satisfy. +func (app *App) ShutdownForTest() { + for _, cleanup := range app.cleanupFuncs { + if cleanup != nil { + _ = cleanup(context.Background()) + } + } + app.cleanupFuncs = nil +} diff --git a/internal/backend/testing.go b/internal/backend/testing.go index 9ffbc58edbc8b1ff86f50bd9633ac65f9564394c..6616e0f19e06595fac68808b484394d960d7f79f 100644 --- a/internal/backend/testing.go +++ b/internal/backend/testing.go @@ -38,3 +38,18 @@ func RegisterClientForTesting(b *Backend, ws *Workspace, clientID string) error func SetWorkspaceShutdownFnForTest(ws *Workspace, fn func()) { ws.shutdownFn = fn } + +// WorkspaceLiveStreamCountForTest returns the number of clients on ws +// that have at least one live SSE stream. Used by integration tests +// in other packages to wait for SSE attaches before publishing events. +func WorkspaceLiveStreamCountForTest(ws *Workspace) int { + ws.clientsMu.Lock() + defer ws.clientsMu.Unlock() + n := 0 + for _, cs := range ws.clients { + if cs.streams > 0 { + n++ + } + } + return n +} diff --git a/internal/server/e2e_test.go b/internal/server/e2e_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2798e460a51627f8c0534042da4a65deb13706dd --- /dev/null +++ b/internal/server/e2e_test.go @@ -0,0 +1,594 @@ +package server + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/backend" + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/permission" + "github.com/charmbracelet/crush/internal/proto" + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +// e2eHarness wires a Server, its Backend (with a custom shutdownFn we +// can observe), an httptest.NewServer, and a synthetic Workspace whose +// embedded App has a live event broker. It is the minimum scaffolding +// the multi-client end-to-end scenarios in PLAN item 6 need. +type e2eHarness struct { + httpSrv *httptest.Server + srv *Server + backend *backend.Backend + workspace *backend.Workspace + app *app.App + shutdownHit atomic.Bool + + // sseWG tracks every SSE reader goroutine spawned by + // [e2eHarness.subscribeSSE]. The harness's cleanup hook waits on + // it after the httptest server has been closed so that the test + // cannot leave behind background readers (and therefore unclosed + // response bodies) after returning. + sseWG sync.WaitGroup +} + +// installServer attaches a fresh Server (with a custom shutdown +// callback that flips [e2eHarness.shutdownHit]) wrapped in an +// [httptest.Server] onto h. It registers the cleanup hooks for the +// httptest server and the SSE reader WaitGroup in the order required +// by the LIFO contract documented on [newE2EHarness]. +// +// Callers that want a fully synthetic workspace use [newE2EHarness]; +// callers that want to drive the real CreateWorkspace HTTP path use +// [newRealCreateHarness] and then [e2eHarness.postWorkspace]. +func (h *e2eHarness) installServer(t *testing.T) { + t.Helper() + srv := &Server{} + srv.backend = backend.New(context.Background(), nil, func() { + h.shutdownHit.Store(true) + }) + srv.installHandler() + + hs := httptest.NewServer(srv.Handler()) + // Order matters: t.Cleanup is LIFO and the test's own per- + // stream cancels (cancelA/cancelB) run first. After those, we + // want hs.Close to fire first (so any handler still parked in + // its `select` returns), THEN sseWG.Wait so every reader + // goroutine exits and closes its response body. Any caller- + // owned cleanups registered *before* installServer (e.g. App + // teardown for the synthetic harness) therefore run LAST, + // after the readers have drained. + t.Cleanup(h.sseWG.Wait) + t.Cleanup(hs.Close) + + h.httpSrv = hs + h.srv = srv + h.backend = srv.backend +} + +// newE2EHarness builds an in-process server + a synthetic Workspace +// whose embedded App is a real [app.App] constructed via +// [app.NewForTest], so its event broker delivers everything the SSE +// pipeline expects. Used by the scenarios that do not need to +// exercise the path-dedupe behavior of [backend.CreateWorkspace]. +// +// Cleanup tears down the App's broker only after sseWG.Wait and +// hs.Close have run, so SSE readers cannot observe a dead broker. +func newE2EHarness(t *testing.T) *e2eHarness { + t.Helper() + + h := &e2eHarness{} + + // Register the App teardown FIRST so LIFO order puts it AFTER + // the cleanups that installServer registers below (hs.Close + + // sseWG.Wait). + appCtx, cancel := context.WithCancel(context.Background()) + a := app.NewForTest(appCtx) + t.Cleanup(func() { + cancel() + a.ShutdownForTest() + }) + + h.installServer(t) + + ws := &backend.Workspace{ + ID: uuid.New().String(), + Path: t.TempDir(), + App: a, + } + // Synthetic workspaces have an incomplete App; bypass the + // default teardown so the "last workspace removed" path can run + // without panicking inside [app.App.Shutdown]. + backend.SetWorkspaceShutdownFnForTest(ws, func() {}) + backend.InsertWorkspaceForTest(h.backend, ws) + + h.workspace = ws + h.app = a + return h +} + +// newRealCreateHarness builds an in-process server WITHOUT any +// pre-inserted workspace, intended for tests that drive the real +// [backend.CreateWorkspace] HTTP path (path-dedupe scenario). It +// isolates HOME/XDG_* via [t.Setenv] so [config.Init] doesn't read +// the host machine's config, which means callers MUST NOT mark the +// test as parallel. +func newRealCreateHarness(t *testing.T) *e2eHarness { + t.Helper() + t.Setenv("HOME", t.TempDir()) + t.Setenv("XDG_CACHE_HOME", t.TempDir()) + t.Setenv("XDG_CONFIG_HOME", t.TempDir()) + t.Setenv("XDG_DATA_HOME", t.TempDir()) + + h := &e2eHarness{} + h.installServer(t) + return h +} + +// postWorkspace drives the real POST /v1/workspaces handler and +// returns the resolved workspace proto. This is how scenario 1 +// exercises the path-dedupe behavior from PLAN item 1: two calls +// with the same Path and distinct ClientIDs must return the same +// workspace ID. +func (h *e2eHarness) postWorkspace(t *testing.T, args proto.Workspace) proto.Workspace { + t.Helper() + body, err := json.Marshal(args) + require.NoError(t, err) + req, err := http.NewRequestWithContext(t.Context(), http.MethodPost, + h.httpSrv.URL+"/v1/workspaces", bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := h.httpSrv.Client().Do(req) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode, "POST /v1/workspaces must succeed") + var out proto.Workspace + require.NoError(t, json.NewDecoder(resp.Body).Decode(&out)) + require.NotEmpty(t, out.ID, "server must return a workspace id") + return out +} + +// subscribeSSE opens an SSE stream against the test server for the +// given workspace and client ID. It returns a channel of decoded +// envelopes plus a cancel function that closes the stream. The +// returned channel is closed when the stream ends. +func (h *e2eHarness) subscribeSSE(t *testing.T, ctx context.Context, workspaceID, clientID string) (<-chan any, context.CancelFunc) { + t.Helper() + streamCtx, cancel := context.WithCancel(ctx) + + q := url.Values{"client_id": []string{clientID}} + reqURL := h.httpSrv.URL + "/v1/workspaces/" + workspaceID + "/events?" + q.Encode() + req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, reqURL, nil) + require.NoError(t, err) + req.Header.Set("Accept", "text/event-stream") + + resp, err := h.httpSrv.Client().Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode, "SSE subscribe should return 200") + + out := make(chan any, 64) + h.sseWG.Go(func() { + defer resp.Body.Close() + defer close(out) + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadBytes('\n') + if err != nil { + return + } + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + data, ok := bytes.CutPrefix(line, []byte("data:")) + if !ok { + continue + } + data = bytes.TrimSpace(data) + var p pubsub.Payload + if err := json.Unmarshal(data, &p); err != nil { + continue + } + ev, decoded := decodeSSEEnvelope(p) + if !decoded { + continue + } + select { + case out <- ev: + case <-streamCtx.Done(): + return + } + } + }) + return out, cancel +} + +// decodeSSEEnvelope decodes the discriminated SSE envelope into the +// concrete pubsub.Event[proto.X] payload the e2e tests care about. +// Unknown payload types are skipped so tests can match on type +// assertions without worrying about envelope noise. +func decodeSSEEnvelope(p pubsub.Payload) (any, bool) { + switch p.Type { + case pubsub.PayloadTypePermissionRequest: + var e pubsub.Event[proto.PermissionRequest] + if err := json.Unmarshal(p.Payload, &e); err != nil { + return nil, false + } + return e, true + case pubsub.PayloadTypePermissionNotification: + var e pubsub.Event[proto.PermissionNotification] + if err := json.Unmarshal(p.Payload, &e); err != nil { + return nil, false + } + return e, true + case pubsub.PayloadTypeMessage: + var e pubsub.Event[proto.Message] + if err := json.Unmarshal(p.Payload, &e); err != nil { + return nil, false + } + return e, true + } + return nil, false +} + +// grantPermission posts a permission grant via the HTTP surface and +// returns the server's "resolved" verdict. Mirrors the client-side +// GrantPermission flow without importing internal/client (which +// would create an import cycle from this in-package test). +func (h *e2eHarness) grantPermission(t *testing.T, ctx context.Context, workspaceID string, req proto.PermissionGrant) bool { + t.Helper() + body, err := json.Marshal(req) + require.NoError(t, err) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.httpSrv.URL+"/v1/workspaces/"+workspaceID+"/permissions/grant", + bytes.NewReader(body)) + require.NoError(t, err) + httpReq.Header.Set("Content-Type", "application/json") + resp, err := h.httpSrv.Client().Do(httpReq) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) + var out proto.PermissionGrantResponse + require.NoError(t, json.NewDecoder(resp.Body).Decode(&out)) + return out.Resolved +} + +// waitForAttached spins until the workspace's clients map reports at +// least n entries with streams > 0. Catches the race where a test +// publishes events before the server-side AttachClient has completed. +func (h *e2eHarness) waitForAttached(t *testing.T, n int) { + t.Helper() + h.waitForAttachedOn(t, h.workspace, n) +} + +// waitForAttachedOn is the workspace-explicit form of waitForAttached. +// Tests that drive a workspace whose pointer is not stored on the +// harness (e.g. the real CreateWorkspace path) pass the workspace in. +func (h *e2eHarness) waitForAttachedOn(t *testing.T, ws *backend.Workspace, n int) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if backend.WorkspaceLiveStreamCountForTest(ws) >= n { + return + } + time.Sleep(5 * time.Millisecond) + } + t.Fatalf("expected %d attached streams, have %d", n, + backend.WorkspaceLiveStreamCountForTest(ws)) +} + +// drainUntil reads from evc until it sees an event of type T that +// satisfies match, or ctx expires. Returns the matching event and +// ok=true, or the zero value and ok=false on timeout. +func drainUntil[T any](ctx context.Context, evc <-chan any, match func(T) bool) (T, bool) { + var zero T + for { + select { + case <-ctx.Done(): + return zero, false + case ev, ok := <-evc: + if !ok { + return zero, false + } + typed, isT := ev.(T) + if !isT { + continue + } + if match == nil || match(typed) { + return typed, true + } + } + } +} + +// TestE2E_TwoClientsReceiveSameMessage covers PLAN item 6 scenario 1: +// two clients POST /v1/workspaces with the same Path and observe +// that the server returns a single workspace (path-dedupe from PLAN +// item 1) and that an event published on that workspace fans out to +// both SSE streams. +// +// Cannot run in parallel: it isolates HOME/XDG_* via t.Setenv so +// config.Init does not read the host machine's real config. +func TestE2E_TwoClientsReceiveSameMessage(t *testing.T) { + h := newRealCreateHarness(t) + // Shorten the create-grace window so the workspace's pending + // creation holds release quickly during test cleanup once both + // SSE streams have been detached. + h.backend.SetCreateGrace(200 * time.Millisecond) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cidA := uuid.New().String() + cidB := uuid.New().String() + + // Shared workspace path. Two POSTs with this path must + // deduplicate at the backend's pathIndex and return the same + // workspace id. + wsPath := t.TempDir() + dataDir := t.TempDir() + args := proto.Workspace{Path: wsPath, DataDir: dataDir} + + argsA := args + argsA.ClientID = cidA + wsRespA := h.postWorkspace(t, argsA) + + argsB := args + argsB.ClientID = cidB + wsRespB := h.postWorkspace(t, argsB) + + require.Equal(t, wsRespA.ID, wsRespB.ID, + "POST /v1/workspaces with the same Path must return the same workspace id") + + // Look up the resulting workspace on the backend so the test + // can publish events through its real [app.App] event broker. + ws, err := h.backend.GetWorkspace(wsRespA.ID) + require.NoError(t, err) + // Override the shutdown callback so test cleanup doesn't run + // the full app.Shutdown path (which would tear down LSP/MCP/DB + // resources the test doesn't need to exercise). + backend.SetWorkspaceShutdownFnForTest(ws, func() {}) + + evcA, cancelA := h.subscribeSSE(t, ctx, ws.ID, cidA) + t.Cleanup(cancelA) + evcB, cancelB := h.subscribeSSE(t, ctx, ws.ID, cidB) + t.Cleanup(cancelB) + + h.waitForAttachedOn(t, ws, 2) + + const sessionID = "s-e2e-1" + msg := message.Message{ + ID: "m-1", + SessionID: sessionID, + Role: message.Assistant, + Parts: []message.ContentPart{message.TextContent{Text: "hello multi-client"}}, + } + ws.SendEvent(pubsub.Event[message.Message]{ + Type: pubsub.CreatedEvent, + Payload: msg, + }) + + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + gotA, okA := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.Message]) bool { + return e.Payload.ID == "m-1" + }) + require.True(t, okA, "client A must receive the MessageEvent") + require.Equal(t, sessionID, gotA.Payload.SessionID) + + gotB, okB := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool { + return e.Payload.ID == "m-1" + }) + require.True(t, okB, "client B must receive the same MessageEvent") + require.Equal(t, sessionID, gotB.Payload.SessionID) +} + +// TestE2E_PermissionFlowCrossClient covers PLAN item 6 scenario 2: +// a tool-driven permission request is granted by client A; client B +// observes a PermissionNotification; a redundant grant from B +// returns the "already resolved" indicator (resolved=false from the +// bool plumbing landed in item 3). +func TestE2E_PermissionFlowCrossClient(t *testing.T) { + t.Parallel() + h := newE2EHarness(t) + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + cidA := uuid.New().String() + cidB := uuid.New().String() + + evcA, cancelA := h.subscribeSSE(t, ctx, h.workspace.ID, cidA) + t.Cleanup(cancelA) + evcB, cancelB := h.subscribeSSE(t, ctx, h.workspace.ID, cidB) + t.Cleanup(cancelB) + + h.waitForAttached(t, 2) + + // Drive the permission request from a goroutine simulating the + // tool path. Request blocks until resolved; capture the outcome. + const sessionID = "s-perm" + const toolCallID = "tc-1" + type result struct { + granted bool + err error + } + done := make(chan result, 1) + go func() { + granted, err := h.app.Permissions.Request(ctx, permission.CreatePermissionRequest{ + SessionID: sessionID, + ToolCallID: toolCallID, + ToolName: "view", + Description: "read a file", + Action: "read", + Path: h.workspace.Path, + }) + done <- result{granted: granted, err: err} + }() + + // Wait for the PermissionRequest to arrive on client A's SSE + // stream. We need its ID to drive the grant. + pickCtx, pickCancel := context.WithTimeout(ctx, 3*time.Second) + defer pickCancel() + reqEv, ok := drainUntil(pickCtx, evcA, func(e pubsub.Event[proto.PermissionRequest]) bool { + return e.Payload.ToolCallID == toolCallID + }) + require.True(t, ok, "client A must receive the PermissionRequest") + + // Client A grants — first grant must report resolved=true. + resolvedA := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{ + Permission: reqEv.Payload, + Action: proto.PermissionAllow, + }) + require.True(t, resolvedA, "client A's grant must resolve the pending request") + + // The blocked Request call must now return granted=true. + select { + case r := <-done: + require.NoError(t, r.err) + require.True(t, r.granted) + case <-pickCtx.Done(): + t.Fatal("permission Request did not return after grant") + } + + // Client B must receive a PermissionNotification with + // Granted=true for the same ToolCallID. The initial neither- + // granted-nor-denied notification published at the start of + // Request also lands on B's stream — match on the granted one. + notif, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.PermissionNotification]) bool { + return e.Payload.ToolCallID == toolCallID && e.Payload.Granted + }) + require.True(t, ok, "client B must receive a granting PermissionNotification") + require.True(t, notif.Payload.Granted) + require.False(t, notif.Payload.Denied) + + // A follow-up grant from client B must report resolved=false + // (the request was already resolved by A). + resolvedB := h.grantPermission(t, ctx, h.workspace.ID, proto.PermissionGrant{ + Permission: reqEv.Payload, + Action: proto.PermissionAllow, + }) + require.False(t, resolvedB, "client B's follow-up grant must report already resolved") +} + +// TestE2E_KillingClientASSEDoesNotBreakClientB covers PLAN item 6 +// scenario 3: terminating client A's SSE stream does not affect +// client B's stream; client B continues to receive events. +func TestE2E_KillingClientASSEDoesNotBreakClientB(t *testing.T) { + t.Parallel() + h := newE2EHarness(t) + ctxB, cancelB := context.WithCancel(t.Context()) + t.Cleanup(cancelB) + ctxA, cancelA := context.WithCancel(t.Context()) + + cidA := uuid.New().String() + cidB := uuid.New().String() + + _, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA) + t.Cleanup(killA) + evcB, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB) + t.Cleanup(killB) + + h.waitForAttached(t, 2) + + // Kill A's stream. The server's deferred DetachClient should + // drop A's claim, leaving B as the sole attached client. + cancelA() + killA() + + require.Eventually(t, func() bool { + return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1 + }, 3*time.Second, 10*time.Millisecond, + "expected client A's stream to drop the attached count to 1") + + // Workspace must still exist (B is holding it open) and + // shutdown callback must not have fired yet. + _, err := h.backend.GetWorkspace(h.workspace.ID) + require.NoError(t, err, "workspace must still exist while B is attached") + require.False(t, h.shutdownHit.Load(), + "shutdown callback must not fire while B is still attached") + + // Publish a fresh event; B must still receive it. + const sessionID = "s-after-a-died" + msg := message.Message{ + ID: "m-after", + SessionID: sessionID, + Role: message.Assistant, + Parts: []message.ContentPart{message.TextContent{Text: "still alive"}}, + } + h.app.SendEvent(pubsub.Event[message.Message]{ + Type: pubsub.CreatedEvent, + Payload: msg, + }) + + pickCtx, pickCancel := context.WithTimeout(ctxB, 3*time.Second) + defer pickCancel() + got, ok := drainUntil(pickCtx, evcB, func(e pubsub.Event[proto.Message]) bool { + return e.Payload.ID == "m-after" + }) + require.True(t, ok, "client B must still receive events after A's stream is killed") + require.Equal(t, sessionID, got.Payload.SessionID) +} + +// TestE2E_ShutdownCallbackFiresWhenLastClientLeaves covers PLAN +// item 6 scenario 4: once both clients disconnect, the backend +// runs its "last workspace removed -> server shutdown" path. +func TestE2E_ShutdownCallbackFiresWhenLastClientLeaves(t *testing.T) { + t.Parallel() + h := newE2EHarness(t) + + ctxA, cancelA := context.WithCancel(t.Context()) + ctxB, cancelB := context.WithCancel(t.Context()) + t.Cleanup(cancelA) + t.Cleanup(cancelB) + + cidA := uuid.New().String() + cidB := uuid.New().String() + _, killA := h.subscribeSSE(t, ctxA, h.workspace.ID, cidA) + t.Cleanup(killA) + _, killB := h.subscribeSSE(t, ctxB, h.workspace.ID, cidB) + t.Cleanup(killB) + + h.waitForAttached(t, 2) + require.False(t, h.shutdownHit.Load(), "shutdown must not fire while clients are attached") + + cancelA() + killA() + require.Eventually(t, func() bool { + return backend.WorkspaceLiveStreamCountForTest(h.workspace) == 1 + }, 3*time.Second, 10*time.Millisecond) + require.False(t, h.shutdownHit.Load(), + "shutdown must not fire after only one client disconnects") + + cancelB() + killB() + require.Eventually(t, h.shutdownHit.Load, + 3*time.Second, 10*time.Millisecond, + "shutdown callback must fire once the last client disconnects") + + // Workspace must be gone from the index. + _, err := h.backend.GetWorkspace(h.workspace.ID) + require.ErrorIs(t, err, backend.ErrWorkspaceNotFound) + + // Subsequent GETs against the now-defunct workspace return + // 404, confirming the http surface still reflects the teardown. + req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, + h.httpSrv.URL+"/v1/workspaces/"+h.workspace.ID, nil) + require.NoError(t, err) + r, err := h.httpSrv.Client().Do(req) + require.NoError(t, err) + _, _ = io.Copy(io.Discard, r.Body) + r.Body.Close() + require.Equal(t, http.StatusNotFound, r.StatusCode) +} diff --git a/internal/server/server.go b/internal/server/server.go index 7b05db719fdfbdabfebde6a6e91f3da1fb843d3a..314b8e922cb86acf5fabaa92eb8bbd112a90db94 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -100,7 +100,18 @@ func NewServer(cfg *config.ConfigStore, network, address string) *Server { } }() }) + s.installHandler() + if network == "tcp" { + s.h.Addr = address + } + return s +} +// installHandler builds the protocol/router around s.backend and +// assigns the resulting http.Server to s.h. Extracted from +// [NewServer] so test harnesses can wire a Server around a +// pre-constructed backend. +func (s *Server) installHandler() { var p http.Protocols p.SetHTTP1(true) p.SetUnencryptedHTTP2(true) @@ -171,10 +182,6 @@ func NewServer(cfg *config.ConfigStore, network, address string) *Server { Protocols: &p, Handler: s.recoverHandler(s.loggingHandler(mux)), } - if network == "tcp" { - s.h.Addr = address - } - return s } // Handler returns the server's HTTP handler. Exposed so test harnesses