Detailed changes
@@ -507,6 +507,36 @@ func (b *Backend) SetCurrentSession(workspaceID, clientID, sessionID string) err
return nil
}
+// AttachedClients returns the number of clients currently viewing
+// sessionID in the given workspace. Only clients with at least one live
+// SSE stream (streams > 0) AND a matching currentSessionID are counted;
+// pure creation holds do not contribute. Returns [ErrWorkspaceNotFound]
+// if the workspace is unknown.
+func (b *Backend) AttachedClients(workspaceID, sessionID string) (int, error) {
+ ws, ok := b.workspaces.Get(workspaceID)
+ if !ok {
+ return 0, ErrWorkspaceNotFound
+ }
+ return ws.AttachedClientsForSession(sessionID), nil
+}
+
+// AttachedClientsForSession returns the number of clients in this
+// workspace whose currentSessionID equals sessionID and which have at
+// least one live SSE stream. Hold-only clients (streams == 0) do not
+// contribute. Acquires the workspace's [clientsMu] briefly; the
+// returned count is a point-in-time snapshot.
+func (w *Workspace) AttachedClientsForSession(sessionID string) int {
+ w.clientsMu.Lock()
+ defer w.clientsMu.Unlock()
+ n := 0
+ for _, cs := range w.clients {
+ if cs.streams > 0 && cs.currentSessionID == sessionID {
+ n++
+ }
+ }
+ return n
+}
+
// GetWorkspaceProto returns the proto representation of a workspace.
func (b *Backend) GetWorkspaceProto(id string) (proto.Workspace, error) {
ws, err := b.GetWorkspace(id)
@@ -1132,3 +1132,91 @@ func TestSetCurrentSession_RaceWithDetach(t *testing.T) {
require.Contains(t, ws.clients, cidB, "remaining client must still be present")
require.Equal(t, "SB", ws.clients[cidB].currentSessionID, "remaining client must keep its last set session")
}
+
+// TestAttachedClients_BasicLifecycle walks one session's count through
+// attach -> set -> second client joins -> switch -> detach. It also
+// confirms hold-only and unselected clients do not contribute.
+func TestAttachedClients_BasicLifecycle(t *testing.T) {
+ t.Parallel()
+
+ b, _ := newTestBackend(t)
+ // Keep the grace window long so the hold-only client survives.
+ b.createGrace = time.Hour
+ ws, _ := insertTestWorkspace(t, b, "/tmp/attached-clients-basic")
+
+ // No clients yet.
+ n, err := b.AttachedClients(ws.ID, "S1")
+ require.NoError(t, err)
+ require.Zero(t, n)
+
+ // Attach A, set to S1. Count for S1 is 1; count for S2 is 0.
+ cidA := newClientID(t)
+ require.NoError(t, b.AttachClient(ws.ID, cidA))
+ require.NoError(t, b.SetCurrentSession(ws.ID, cidA, "S1"))
+
+ n, err = b.AttachedClients(ws.ID, "S1")
+ require.NoError(t, err)
+ require.Equal(t, 1, n)
+ n, err = b.AttachedClients(ws.ID, "S2")
+ require.NoError(t, err)
+ require.Zero(t, n)
+
+ // Attach B, set to S1. Count for S1 is 2.
+ cidB := newClientID(t)
+ require.NoError(t, b.AttachClient(ws.ID, cidB))
+ require.NoError(t, b.SetCurrentSession(ws.ID, cidB, "S1"))
+
+ n, _ = b.AttachedClients(ws.ID, "S1")
+ require.Equal(t, 2, n)
+
+ // B switches to S2; counts redistribute.
+ require.NoError(t, b.SetCurrentSession(ws.ID, cidB, "S2"))
+ n, _ = b.AttachedClients(ws.ID, "S1")
+ require.Equal(t, 1, n)
+ n, _ = b.AttachedClients(ws.ID, "S2")
+ require.Equal(t, 1, n)
+
+ // A hold-only client must NOT be counted, even if we were able to
+ // imagine a currentSessionID on it. registerClient leaves
+ // currentSessionID empty by construction, and SetCurrentSession
+ // rejects hold-only writers — so the contract holds two ways.
+ cidHold := newClientID(t)
+ b.registerClient(ws, cidHold)
+ t.Cleanup(func() { _ = b.releaseHold(ws.ID, cidHold) })
+ n, _ = b.AttachedClients(ws.ID, "S1")
+ require.Equal(t, 1, n, "hold-only client must not contribute")
+ n, _ = b.AttachedClients(ws.ID, "")
+ require.Equal(t, 0, n,
+ "empty sessionID must not match the hold-only entry (streams==0)")
+
+ // A client with streams > 0 but currentSessionID == "" is NOT
+ // counted toward any non-empty session, and is matched only
+ // against the empty session id (which represents the landing
+ // screen).
+ cidC := newClientID(t)
+ require.NoError(t, b.AttachClient(ws.ID, cidC))
+ n, _ = b.AttachedClients(ws.ID, "S1")
+ require.Equal(t, 1, n, "stream-only client with empty currentSessionID must not be counted toward S1")
+ n, _ = b.AttachedClients(ws.ID, "")
+ require.Equal(t, 1, n, "stream-only client with empty currentSessionID matches the empty session id")
+
+ // B detaches: count for S2 drops to 0.
+ b.DetachClient(ws.ID, cidB)
+ n, _ = b.AttachedClients(ws.ID, "S2")
+ require.Zero(t, n)
+ n, _ = b.AttachedClients(ws.ID, "S1")
+ require.Equal(t, 1, n, "A still on S1")
+
+ // Final cleanup.
+ b.DetachClient(ws.ID, cidA)
+ b.DetachClient(ws.ID, cidC)
+}
+
+// TestAttachedClients_UnknownWorkspace verifies the error surface.
+func TestAttachedClients_UnknownWorkspace(t *testing.T) {
+ t.Parallel()
+
+ b, _ := newTestBackend(t)
+ _, err := b.AttachedClients("00000000-0000-0000-0000-000000000000", "S1")
+ require.ErrorIs(t, err, ErrWorkspaceNotFound)
+}
@@ -30,3 +30,11 @@ func RegisterClientForTesting(b *Backend, ws *Workspace, clientID string) error
b.registerClient(ws, clientID)
return nil
}
+
+// SetWorkspaceShutdownFnForTest overrides the workspace teardown
+// callback. Useful for tests in other packages that drive synthetic
+// workspaces (where the embedded [app.App] is incomplete) through
+// detach paths that would otherwise crash inside App.Shutdown.
+func SetWorkspaceShutdownFnForTest(ws *Workspace, fn func()) {
+ ws.shutdownFn = fn
+}
@@ -7,6 +7,12 @@ package proto
// It is populated by REST handlers in internal/server/proto.go from the
// workspace's AgentCoordinator. The Session SSE event path does not set
// it, since SSE consumers can compute presence from other agent signals.
+//
+// AttachedClients counts the number of clients currently viewing this
+// session — i.e. entries in the workspace's clients map whose
+// currentSessionID equals this session's ID and which have at least one
+// live SSE stream. Hold-only clients (streams == 0) do not contribute.
+// Like IsBusy, it is computed on read by REST handlers.
type Session struct {
ID string `json:"id"`
ParentSessionID string `json:"parent_session_id"`
@@ -20,6 +26,7 @@ type Session struct {
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
IsBusy bool `json:"is_busy"`
+ AttachedClients int `json:"attached_clients"`
}
// Todo represents a single todo entry on a session in the proto layer.
@@ -156,6 +156,17 @@ func isSessionBusy(ws *backend.Workspace, sessionID string) bool {
return ws.AgentCoordinator.IsSessionBusy(sessionID)
}
+// attachedClients returns the number of clients currently viewing
+// sessionID in ws. Hold-only clients (streams == 0) do not contribute.
+// A nil workspace is treated as zero so handlers can pass GetWorkspace's
+// result through without an extra guard.
+func attachedClients(ws *backend.Workspace, sessionID string) int {
+ if ws == nil {
+ return 0
+ }
+ return ws.AttachedClientsForSession(sessionID)
+}
+
func todosToProto(todos []session.Todo) []proto.Todo {
if len(todos) == 0 {
return nil
@@ -381,6 +381,7 @@ func (c *controllerV1) handleGetWorkspaceSessions(w http.ResponseWriter, r *http
for i, s := range sessions {
result[i] = sessionToProto(s)
result[i].IsBusy = isSessionBusy(ws, s.ID)
+ result[i].AttachedClients = attachedClients(ws, s.ID)
}
jsonEncode(w, result)
}
@@ -416,6 +417,7 @@ func (c *controllerV1) handlePostWorkspaceSessions(w http.ResponseWriter, r *htt
ws, _ := c.backend.GetWorkspace(id)
out := sessionToProto(sess)
out.IsBusy = isSessionBusy(ws, sess.ID)
+ out.AttachedClients = attachedClients(ws, sess.ID)
jsonEncode(w, out)
}
@@ -441,6 +443,7 @@ func (c *controllerV1) handleGetWorkspaceSession(w http.ResponseWriter, r *http.
ws, _ := c.backend.GetWorkspace(id)
out := sessionToProto(sess)
out.IsBusy = isSessionBusy(ws, sess.ID)
+ out.AttachedClients = attachedClients(ws, sess.ID)
jsonEncode(w, out)
}
@@ -520,6 +523,7 @@ func (c *controllerV1) handlePutWorkspaceSession(w http.ResponseWriter, r *http.
ws, _ := c.backend.GetWorkspace(id)
out := sessionToProto(saved)
out.IsBusy = isSessionBusy(ws, saved.ID)
+ out.AttachedClients = attachedClients(ws, saved.ID)
jsonEncode(w, out)
}
@@ -174,3 +174,152 @@ func TestProtoSessionIsBusyBackwardCompat(t *testing.T) {
require.Equal(t, "s1", old.ID)
require.Equal(t, "t", old.Title)
}
+
+// buildMultiSessionWorkspace returns a controller wired to a backend
+// that owns a workspace with the given session IDs. Used to exercise
+// AttachedClients counts across more than one session.
+func buildMultiSessionWorkspace(t *testing.T, sessionIDs ...string) (*controllerV1, *backend.Workspace) {
+ t.Helper()
+
+ b := backend.New(context.Background(), nil, nil)
+ a := &app.App{AgentCoordinator: &stubCoordinator{}}
+ sessions := make([]session.Session, len(sessionIDs))
+ for i, sid := range sessionIDs {
+ sessions[i] = session.Session{ID: sid, Title: sid}
+ }
+ a.Sessions = &stubSessions{all: sessions}
+
+ ws := &backend.Workspace{
+ ID: uuid.New().String(),
+ Path: t.TempDir(),
+ App: a,
+ }
+ backend.InsertWorkspaceForTest(b, ws)
+ // Synthetic workspaces have an incomplete App; bypass the
+ // default teardown to avoid panics when the last client detaches.
+ backend.SetWorkspaceShutdownFnForTest(ws, func() {})
+
+ s := &Server{backend: b}
+ return &controllerV1{backend: b, server: s}, ws
+}
+
+// listSessions invokes handleGetWorkspaceSessions and returns the
+// decoded response so tests can assert per-session counts.
+func listSessions(t *testing.T, c *controllerV1, wsID string) []proto.Session {
+ t.Helper()
+ req := httptest.NewRequestWithContext(t.Context(), http.MethodGet, "/v1/workspaces/"+wsID+"/sessions", nil)
+ req.SetPathValue("id", wsID)
+ rec := httptest.NewRecorder()
+ c.handleGetWorkspaceSessions(rec, req)
+ require.Equal(t, http.StatusOK, rec.Code)
+ var got []proto.Session
+ require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got))
+ return got
+}
+
+func countsBySessionID(sessions []proto.Session) map[string]int {
+ out := make(map[string]int, len(sessions))
+ for _, s := range sessions {
+ out[s.ID] = s.AttachedClients
+ }
+ return out
+}
+
+// TestSessionListIncludesAttachedClients walks two sessions through
+// the same lifecycle covered by TestAttachedClients_BasicLifecycle in
+// the backend package, but observed at the handler boundary.
+func TestSessionListIncludesAttachedClients(t *testing.T) {
+ t.Parallel()
+ c, ws := buildMultiSessionWorkspace(t, "S1", "S2")
+
+ // No attached clients yet.
+ counts := countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 0, counts["S1"])
+ require.Equal(t, 0, counts["S2"])
+
+ // Attach A, set to S1: S1=1.
+ cidA := uuid.New().String()
+ require.NoError(t, c.backend.AttachClient(ws.ID, cidA))
+ t.Cleanup(func() { c.backend.DetachClient(ws.ID, cidA) })
+ require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidA, "S1"))
+ counts = countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 1, counts["S1"])
+ require.Equal(t, 0, counts["S2"])
+
+ // Attach B, set to S1: S1=2.
+ cidB := uuid.New().String()
+ require.NoError(t, c.backend.AttachClient(ws.ID, cidB))
+ require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidB, "S1"))
+ counts = countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 2, counts["S1"])
+ require.Equal(t, 0, counts["S2"])
+
+ // B switches to S2: counts redistribute.
+ require.NoError(t, c.backend.SetCurrentSession(ws.ID, cidB, "S2"))
+ counts = countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 1, counts["S1"])
+ require.Equal(t, 1, counts["S2"])
+
+ // B detaches: S2 drops to 0.
+ c.backend.DetachClient(ws.ID, cidB)
+ counts = countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 1, counts["S1"])
+ require.Equal(t, 0, counts["S2"])
+}
+
+// TestSessionListExcludesHoldOnlyClient verifies that a registered
+// client without an SSE stream (streams == 0) does not contribute to
+// AttachedClients, even though it has an entry in the workspace's
+// clients map.
+func TestSessionListExcludesHoldOnlyClient(t *testing.T) {
+ t.Parallel()
+ c, ws := buildMultiSessionWorkspace(t, "S1")
+
+ cid := uuid.New().String()
+ require.NoError(t, backend.RegisterClientForTesting(c.backend, ws, cid))
+ t.Cleanup(func() { _ = c.backend.DeleteWorkspace(ws.ID, cid) })
+
+ counts := countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 0, counts["S1"], "hold-only client must not be counted")
+}
+
+// TestSessionListExcludesUnselectedAttachedClient verifies that a
+// client with a live SSE stream but no current session
+// (currentSessionID == "") does not show up under any session's count.
+func TestSessionListExcludesUnselectedAttachedClient(t *testing.T) {
+ t.Parallel()
+ c, ws := buildMultiSessionWorkspace(t, "S1")
+
+ cid := uuid.New().String()
+ require.NoError(t, c.backend.AttachClient(ws.ID, cid))
+ t.Cleanup(func() { c.backend.DetachClient(ws.ID, cid) })
+ // Intentionally do NOT call SetCurrentSession.
+
+ counts := countsBySessionID(listSessions(t, c, ws.ID))
+ require.Equal(t, 0, counts["S1"],
+ "attached client with no current session must not contribute to S1")
+}
+
+// TestSessionGetIncludesAttachedClients verifies the single-session
+// handler also populates AttachedClients.
+func TestSessionGetIncludesAttachedClients(t *testing.T) {
+ t.Parallel()
+ c, ws := buildMultiSessionWorkspace(t, "S1")
+
+ cid := uuid.New().String()
+ require.NoError(t, c.backend.AttachClient(ws.ID, cid))
+ t.Cleanup(func() { c.backend.DetachClient(ws.ID, cid) })
+ require.NoError(t, c.backend.SetCurrentSession(ws.ID, cid, "S1"))
+
+ req := httptest.NewRequestWithContext(t.Context(), http.MethodGet,
+ "/v1/workspaces/"+ws.ID+"/sessions/S1", nil)
+ req.SetPathValue("id", ws.ID)
+ req.SetPathValue("sid", "S1")
+ rec := httptest.NewRecorder()
+ c.handleGetWorkspaceSession(rec, req)
+ require.Equal(t, http.StatusOK, rec.Code)
+
+ var got proto.Session
+ require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &got))
+ require.Equal(t, 1, got.AttachedClients)
+}