From ec85e9ef686c23ce3366c46589cdfabc05cd2199 Mon Sep 17 00:00:00 2001 From: Christian Rocha Date: Tue, 19 May 2026 23:53:10 -0400 Subject: [PATCH] feat(api): report how many clients are watching each session Session list and detail responses now include a count of how many connected clients are currently looking at each session. Clients that have only reserved a workspace but have not yet opened an event stream are excluded, and so are clients that are connected but not actively viewing the session. This count enables UI features like a live indicator showing whether someone else is already in a session. Co-Authored-By: Charm Crush --- internal/backend/backend.go | 30 +++++ internal/backend/backend_test.go | 88 ++++++++++++++ internal/backend/testing.go | 8 ++ internal/proto/session.go | 7 ++ internal/server/events.go | 11 ++ internal/server/proto.go | 4 + internal/server/sessions_isbusy_test.go | 149 ++++++++++++++++++++++++ 7 files changed, 297 insertions(+) diff --git a/internal/backend/backend.go b/internal/backend/backend.go index dbda67f95130da304e37e0376b2bc0690ffbfb3d..8b78f85fb1c06114ec4bafc55371c82f623e2bf7 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -507,6 +507,36 @@ func (b *Backend) SetCurrentSession(workspaceID, clientID, sessionID string) err return nil } +// AttachedClients returns the number of clients currently viewing +// sessionID in the given workspace. Only clients with at least one live +// SSE stream (streams > 0) AND a matching currentSessionID are counted; +// pure creation holds do not contribute. Returns [ErrWorkspaceNotFound] +// if the workspace is unknown. +func (b *Backend) AttachedClients(workspaceID, sessionID string) (int, error) { + ws, ok := b.workspaces.Get(workspaceID) + if !ok { + return 0, ErrWorkspaceNotFound + } + return ws.AttachedClientsForSession(sessionID), nil +} + +// AttachedClientsForSession returns the number of clients in this +// workspace whose currentSessionID equals sessionID and which have at +// least one live SSE stream. Hold-only clients (streams == 0) do not +// contribute. Acquires the workspace's [clientsMu] briefly; the +// returned count is a point-in-time snapshot. +func (w *Workspace) AttachedClientsForSession(sessionID string) int { + w.clientsMu.Lock() + defer w.clientsMu.Unlock() + n := 0 + for _, cs := range w.clients { + if cs.streams > 0 && cs.currentSessionID == sessionID { + n++ + } + } + return n +} + // GetWorkspaceProto returns the proto representation of a workspace. func (b *Backend) GetWorkspaceProto(id string) (proto.Workspace, error) { ws, err := b.GetWorkspace(id) diff --git a/internal/backend/backend_test.go b/internal/backend/backend_test.go index 0ab01b5c54f6622b2d3997ca21e59e1bb2b0ff54..8ee69c9e574bda73351a3ddaf1b10fafa36a8ec7 100644 --- a/internal/backend/backend_test.go +++ b/internal/backend/backend_test.go @@ -1132,3 +1132,91 @@ func TestSetCurrentSession_RaceWithDetach(t *testing.T) { require.Contains(t, ws.clients, cidB, "remaining client must still be present") require.Equal(t, "SB", ws.clients[cidB].currentSessionID, "remaining client must keep its last set session") } + +// TestAttachedClients_BasicLifecycle walks one session's count through +// attach -> set -> second client joins -> switch -> detach. It also +// confirms hold-only and unselected clients do not contribute. +func TestAttachedClients_BasicLifecycle(t *testing.T) { + t.Parallel() + + b, _ := newTestBackend(t) + // Keep the grace window long so the hold-only client survives. + b.createGrace = time.Hour + ws, _ := insertTestWorkspace(t, b, "/tmp/attached-clients-basic") + + // No clients yet. + n, err := b.AttachedClients(ws.ID, "S1") + require.NoError(t, err) + require.Zero(t, n) + + // Attach A, set to S1. Count for S1 is 1; count for S2 is 0. + cidA := newClientID(t) + require.NoError(t, b.AttachClient(ws.ID, cidA)) + require.NoError(t, b.SetCurrentSession(ws.ID, cidA, "S1")) + + n, err = b.AttachedClients(ws.ID, "S1") + require.NoError(t, err) + require.Equal(t, 1, n) + n, err = b.AttachedClients(ws.ID, "S2") + require.NoError(t, err) + require.Zero(t, n) + + // Attach B, set to S1. Count for S1 is 2. + cidB := newClientID(t) + require.NoError(t, b.AttachClient(ws.ID, cidB)) + require.NoError(t, b.SetCurrentSession(ws.ID, cidB, "S1")) + + n, _ = b.AttachedClients(ws.ID, "S1") + require.Equal(t, 2, n) + + // B switches to S2; counts redistribute. + require.NoError(t, b.SetCurrentSession(ws.ID, cidB, "S2")) + n, _ = b.AttachedClients(ws.ID, "S1") + require.Equal(t, 1, n) + n, _ = b.AttachedClients(ws.ID, "S2") + require.Equal(t, 1, n) + + // A hold-only client must NOT be counted, even if we were able to + // imagine a currentSessionID on it. registerClient leaves + // currentSessionID empty by construction, and SetCurrentSession + // rejects hold-only writers — so the contract holds two ways. + cidHold := newClientID(t) + b.registerClient(ws, cidHold) + t.Cleanup(func() { _ = b.releaseHold(ws.ID, cidHold) }) + n, _ = b.AttachedClients(ws.ID, "S1") + require.Equal(t, 1, n, "hold-only client must not contribute") + n, _ = b.AttachedClients(ws.ID, "") + require.Equal(t, 0, n, + "empty sessionID must not match the hold-only entry (streams==0)") + + // A client with streams > 0 but currentSessionID == "" is NOT + // counted toward any non-empty session, and is matched only + // against the empty session id (which represents the landing + // screen). + cidC := newClientID(t) + require.NoError(t, b.AttachClient(ws.ID, cidC)) + n, _ = b.AttachedClients(ws.ID, "S1") + require.Equal(t, 1, n, "stream-only client with empty currentSessionID must not be counted toward S1") + n, _ = b.AttachedClients(ws.ID, "") + require.Equal(t, 1, n, "stream-only client with empty currentSessionID matches the empty session id") + + // B detaches: count for S2 drops to 0. + b.DetachClient(ws.ID, cidB) + n, _ = b.AttachedClients(ws.ID, "S2") + require.Zero(t, n) + n, _ = b.AttachedClients(ws.ID, "S1") + require.Equal(t, 1, n, "A still on S1") + + // Final cleanup. + b.DetachClient(ws.ID, cidA) + b.DetachClient(ws.ID, cidC) +} + +// TestAttachedClients_UnknownWorkspace verifies the error surface. +func TestAttachedClients_UnknownWorkspace(t *testing.T) { + t.Parallel() + + b, _ := newTestBackend(t) + _, err := b.AttachedClients("00000000-0000-0000-0000-000000000000", "S1") + require.ErrorIs(t, err, ErrWorkspaceNotFound) +} diff --git a/internal/backend/testing.go b/internal/backend/testing.go index 7863877b1cfeae464184aa4cd921c301cddfabae..9ffbc58edbc8b1ff86f50bd9633ac65f9564394c 100644 --- a/internal/backend/testing.go +++ b/internal/backend/testing.go @@ -30,3 +30,11 @@ func RegisterClientForTesting(b *Backend, ws *Workspace, clientID string) error b.registerClient(ws, clientID) return nil } + +// SetWorkspaceShutdownFnForTest overrides the workspace teardown +// callback. Useful for tests in other packages that drive synthetic +// workspaces (where the embedded [app.App] is incomplete) through +// detach paths that would otherwise crash inside App.Shutdown. +func SetWorkspaceShutdownFnForTest(ws *Workspace, fn func()) { + ws.shutdownFn = fn +} diff --git a/internal/proto/session.go b/internal/proto/session.go index 4652065ac881f4fe06bfbc019164cf5cdcaf8caf..9c49e439ccdfda35144740835bd7e3a25741ecb7 100644 --- a/internal/proto/session.go +++ b/internal/proto/session.go @@ -7,6 +7,12 @@ package proto // It is populated by REST handlers in internal/server/proto.go from the // workspace's AgentCoordinator. The Session SSE event path does not set // it, since SSE consumers can compute presence from other agent signals. +// +// AttachedClients counts the number of clients currently viewing this +// session — i.e. entries in the workspace's clients map whose +// currentSessionID equals this session's ID and which have at least one +// live SSE stream. Hold-only clients (streams == 0) do not contribute. +// Like IsBusy, it is computed on read by REST handlers. type Session struct { ID string `json:"id"` ParentSessionID string `json:"parent_session_id"` @@ -20,6 +26,7 @@ type Session struct { CreatedAt int64 `json:"created_at"` UpdatedAt int64 `json:"updated_at"` IsBusy bool `json:"is_busy"` + AttachedClients int `json:"attached_clients"` } // Todo represents a single todo entry on a session in the proto layer. diff --git a/internal/server/events.go b/internal/server/events.go index 2e6fcd92b6f982b7c1f597b049908cd295826a3e..8a0ab777d77dca9ece44cbaca579676b520708ca 100644 --- a/internal/server/events.go +++ b/internal/server/events.go @@ -156,6 +156,17 @@ func isSessionBusy(ws *backend.Workspace, sessionID string) bool { return ws.AgentCoordinator.IsSessionBusy(sessionID) } +// attachedClients returns the number of clients currently viewing +// sessionID in ws. Hold-only clients (streams == 0) do not contribute. +// A nil workspace is treated as zero so handlers can pass GetWorkspace's +// result through without an extra guard. +func attachedClients(ws *backend.Workspace, sessionID string) int { + if ws == nil { + return 0 + } + return ws.AttachedClientsForSession(sessionID) +} + func todosToProto(todos []session.Todo) []proto.Todo { if len(todos) == 0 { return nil diff --git a/internal/server/proto.go b/internal/server/proto.go index b6e7077fe0aded2481dc3e241f44870f1ce76c01..6d3eebb562784adb377eebfb480852dcda53642a 100644 --- a/internal/server/proto.go +++ b/internal/server/proto.go @@ -381,6 +381,7 @@ func (c *controllerV1) handleGetWorkspaceSessions(w http.ResponseWriter, r *http for i, s := range sessions { result[i] = sessionToProto(s) result[i].IsBusy = isSessionBusy(ws, s.ID) + result[i].AttachedClients = attachedClients(ws, s.ID) } jsonEncode(w, result) } @@ -416,6 +417,7 @@ func (c *controllerV1) handlePostWorkspaceSessions(w http.ResponseWriter, r *htt ws, _ := c.backend.GetWorkspace(id) out := sessionToProto(sess) out.IsBusy = isSessionBusy(ws, sess.ID) + out.AttachedClients = attachedClients(ws, sess.ID) jsonEncode(w, out) } @@ -441,6 +443,7 @@ func (c *controllerV1) handleGetWorkspaceSession(w http.ResponseWriter, r *http. ws, _ := c.backend.GetWorkspace(id) out := sessionToProto(sess) out.IsBusy = isSessionBusy(ws, sess.ID) + out.AttachedClients = attachedClients(ws, sess.ID) jsonEncode(w, out) } @@ -520,6 +523,7 @@ func (c *controllerV1) handlePutWorkspaceSession(w http.ResponseWriter, r *http. ws, _ := c.backend.GetWorkspace(id) out := sessionToProto(saved) out.IsBusy = isSessionBusy(ws, saved.ID) + out.AttachedClients = attachedClients(ws, saved.ID) jsonEncode(w, out) } diff --git a/internal/server/sessions_isbusy_test.go b/internal/server/sessions_isbusy_test.go index f5127ef6e4fec7b524781d6444c2cf3b5c7f3ea0..060c00abe9367dc7162bdb50dd77fe951041aa51 100644 --- a/internal/server/sessions_isbusy_test.go +++ b/internal/server/sessions_isbusy_test.go @@ -174,3 +174,152 @@ func TestProtoSessionIsBusyBackwardCompat(t *testing.T) { require.Equal(t, "s1", old.ID) require.Equal(t, "t", old.Title) } + +// buildMultiSessionWorkspace returns a controller wired to a backend +// that owns a workspace with the given session IDs. Used to exercise +// AttachedClients counts across more than one session. +func buildMultiSessionWorkspace(t *testing.T, sessionIDs ...string) (*controllerV1, *backend.Workspace) { + t.Helper() + + b := backend.New(context.Background(), nil, nil) + a := &app.App{AgentCoordinator: &stubCoordinator{}} + sessions := make([]session.Session, len(sessionIDs)) + for i, sid := range sessionIDs { + sessions[i] = session.Session{ID: sid, Title: sid} + } + a.Sessions = &stubSessions{all: sessions} + + ws := &backend.Workspace{ + ID: uuid.New().String(), + Path: t.TempDir(), + App: a, + } + backend.InsertWorkspaceForTest(b, ws) + // Synthetic workspaces have an incomplete App; bypass the + // default teardown to avoid panics when the last client detaches. + backend.SetWorkspaceShutdownFnForTest(ws, func() {}) + + s := &Server{backend: b} + return &controllerV1{backend: b, server: s}, ws +} + +// listSessions invokes handleGetWorkspaceSessions and returns the +// decoded response so tests can assert per-session counts. +func listSessions(t *testing.T, c *controllerV1, wsID string) []proto.Session { + t.Helper() + req := httptest.NewRequestWithContext(t.Context(), http.MethodGet, "/v1/workspaces/"+wsID+"/sessions", nil) + req.SetPathValue("id", wsID) + rec := httptest.NewRecorder() + c.handleGetWorkspaceSessions(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + var got []proto.Session + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + return got +} + +func countsBySessionID(sessions []proto.Session) map[string]int { + out := make(map[string]int, len(sessions)) + for _, s := range sessions { + out[s.ID] = s.AttachedClients + } + return out +} + +// TestSessionListIncludesAttachedClients walks two sessions through +// the same lifecycle covered by TestAttachedClients_BasicLifecycle in +// the backend package, but observed at the handler boundary. +func TestSessionListIncludesAttachedClients(t *testing.T) { + t.Parallel() + c, ws := buildMultiSessionWorkspace(t, "S1", "S2") + + // No attached clients yet. + counts := countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 0, counts["S1"]) + require.Equal(t, 0, counts["S2"]) + + // Attach A, set to S1: S1=1. + cidA := uuid.New().String() + require.NoError(t, c.backend.AttachClient(ws.ID, cidA)) + t.Cleanup(func() { c.backend.DetachClient(ws.ID, cidA) }) + require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidA, "S1")) + counts = countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 1, counts["S1"]) + require.Equal(t, 0, counts["S2"]) + + // Attach B, set to S1: S1=2. + cidB := uuid.New().String() + require.NoError(t, c.backend.AttachClient(ws.ID, cidB)) + require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidB, "S1")) + counts = countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 2, counts["S1"]) + require.Equal(t, 0, counts["S2"]) + + // B switches to S2: counts redistribute. + require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidB, "S2")) + counts = countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 1, counts["S1"]) + require.Equal(t, 1, counts["S2"]) + + // B detaches: S2 drops to 0. + c.backend.DetachClient(ws.ID, cidB) + counts = countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 1, counts["S1"]) + require.Equal(t, 0, counts["S2"]) +} + +// TestSessionListExcludesHoldOnlyClient verifies that a registered +// client without an SSE stream (streams == 0) does not contribute to +// AttachedClients, even though it has an entry in the workspace's +// clients map. +func TestSessionListExcludesHoldOnlyClient(t *testing.T) { + t.Parallel() + c, ws := buildMultiSessionWorkspace(t, "S1") + + cid := uuid.New().String() + require.NoError(t, backend.RegisterClientForTesting(c.backend, ws, cid)) + t.Cleanup(func() { _ = c.backend.DeleteWorkspace(ws.ID, cid) }) + + counts := countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 0, counts["S1"], "hold-only client must not be counted") +} + +// TestSessionListExcludesUnselectedAttachedClient verifies that a +// client with a live SSE stream but no current session +// (currentSessionID == "") does not show up under any session's count. +func TestSessionListExcludesUnselectedAttachedClient(t *testing.T) { + t.Parallel() + c, ws := buildMultiSessionWorkspace(t, "S1") + + cid := uuid.New().String() + require.NoError(t, c.backend.AttachClient(ws.ID, cid)) + t.Cleanup(func() { c.backend.DetachClient(ws.ID, cid) }) + // Intentionally do NOT call SetCurrentSession. + + counts := countsBySessionID(listSessions(t, c, ws.ID)) + require.Equal(t, 0, counts["S1"], + "attached client with no current session must not contribute to S1") +} + +// TestSessionGetIncludesAttachedClients verifies the single-session +// handler also populates AttachedClients. +func TestSessionGetIncludesAttachedClients(t *testing.T) { + t.Parallel() + c, ws := buildMultiSessionWorkspace(t, "S1") + + cid := uuid.New().String() + require.NoError(t, c.backend.AttachClient(ws.ID, cid)) + t.Cleanup(func() { c.backend.DetachClient(ws.ID, cid) }) + require.NoError(t, c.backend.SetCurrentSession(ws.ID, cid, "S1")) + + req := httptest.NewRequestWithContext(t.Context(), http.MethodGet, + "/v1/workspaces/"+ws.ID+"/sessions/S1", nil) + req.SetPathValue("id", ws.ID) + req.SetPathValue("sid", "S1") + rec := httptest.NewRecorder() + c.handleGetWorkspaceSession(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + + var got proto.Session + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got)) + require.Equal(t, 1, got.AttachedClients) +}