From 90b1a2a67bf70a2f5fc1f998ccd84317cab5d68b Mon Sep 17 00:00:00 2001 From: Christian Rocha Date: Sun, 31 May 2026 21:48:57 -0400 Subject: [PATCH] feat(server): make server prompts independent of client connections Give each workspace its own run lifetime and shutdown gate so agent work is not tied to the HTTP request that submitted it. Final state can still be saved during shutdown, avoiding lost completion or error records when a workspace is canceled. Co-Authored-By: Charm Crush --- internal/agent/agent.go | 29 ++++++++++++++----- internal/backend/backend.go | 57 ++++++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 53e63af3b95e8bb4fba6144675d97c3686e78546..ef37f164d711c608916e98dc15ddedcaf1694033 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -305,7 +305,13 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * // (the pubsub broker fan-in does not serialize publishes from // different upstream brokers). defer func() { - if flushErr := a.messages.FlushAll(ctx); flushErr != nil { + // Use a context detached from the run context: workspace + // shutdown cancels ctx before this goroutine returns, but the + // buffered streaming deltas must still land before the DB is + // closed. A short timeout bounds the flush. + flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer flushCancel() + if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil { slog.Error("Failed to flush pending message updates after run", "error", flushErr) } if skipRunComplete { @@ -577,11 +583,20 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * if currentAssistant == nil { return result, err } + // Persist final state with a context detached from the run + // context. The run context (ctx) is derived from the + // workspace context, which workspace shutdown cancels before + // agent goroutines finish; using ctx here would drop the + // final assistant state. WithoutCancel keeps the values + // (e.g. session ID) while ignoring cancellation, and a short + // timeout bounds the cleanup writes. + cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer cleanupCancel() // Ensure we finish thinking on error to close the reasoning state. currentAssistant.FinishThinking() toolCalls := currentAssistant.ToolCalls() - // INFO: we use the parent context here because the genCtx has been cancelled. - msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID) + // INFO: we use the cleanup context here because the genCtx has been cancelled. + msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID) if createErr != nil { return nil, createErr } @@ -590,7 +605,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * tc.Finished = true tc.Input = "{}" currentAssistant.AddToolCall(tc) - updateErr := a.messages.Update(ctx, *currentAssistant) + updateErr := a.messages.Update(cleanupCtx, *currentAssistant) if updateErr != nil { return nil, updateErr } @@ -623,7 +638,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * Content: content, IsError: true, } - _, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{ + _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{ Role: message.Tool, Parts: []message.ContentPart{ toolResult, @@ -670,9 +685,9 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * } else { currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error()) } - // Note: we use the parent context here because the genCtx has been + // Note: we use the cleanup context here because the genCtx has been // cancelled. - updateErr := a.messages.Update(ctx, *currentAssistant) + updateErr := a.messages.Update(cleanupCtx, *currentAssistant) if updateErr != nil { return nil, updateErr } diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 9c0f47f7ab8dfd48f90a31004d637dd6e54fd912..2ea24a86c71fbc7c4f58c5de680aeb83d421584b 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -34,6 +34,7 @@ var ( ErrUnknownCommand = errors.New("unknown command") ErrInvalidClientID = errors.New("invalid client_id") ErrClientNotAttached = errors.New("client not attached") + ErrWorkspaceClosing = errors.New("workspace closing") ) // DefaultCreateGrace is the window in which a client must open an SSE @@ -108,6 +109,23 @@ type Workspace struct { // with fallback to the cleaned absolute path. resolvedPath string + // ctx is the workspace-scoped run context. It is derived from + // the backend context in CreateWorkspace and lives for the + // lifetime of the workspace; cancel tears it down. Agent runs + // dispatched on behalf of this workspace are bound to ctx so + // their lifetime is owned by the workspace, not by any single + // client's HTTP request. + ctx context.Context + cancel context.CancelFunc + + // runMu guards closing and gates dispatch of new agent runs. + // closing is set by Shutdown so no new runs are accepted once + // teardown has begun. runWG tracks dispatched agent goroutines + // so Shutdown can wait for them to return before app cleanup. + runMu sync.Mutex + closing bool + runWG sync.WaitGroup + // clientsMu guards clients. It is held only briefly (no IO). clientsMu sync.Mutex // clients tracks each client's claim on this workspace. Refcount @@ -122,7 +140,7 @@ type Workspace struct { } // invokeShutdown calls the workspace shutdown hook if set, falling -// back to the embedded [app.App.Shutdown] when not. +// back to the workspace [Workspace.Shutdown] wrapper when not. func (w *Workspace) invokeShutdown() { if w.shutdownFn != nil { w.shutdownFn() @@ -133,6 +151,40 @@ func (w *Workspace) invokeShutdown() { } } +// Shutdown tears the workspace down in an order that is safe for +// agent runs whose lifetime is bound to the workspace context. It +// shadows the promoted [app.App.Shutdown] so callers reaching +// ws.Shutdown() always observe this ordering: +// +// 1. Mark the workspace closing so no new agent runs are accepted. +// 2. Cancel the workspace run context so any dispatched goroutine +// that has not yet registered its per-session cancel still +// observes cancellation. +// 3. Cancel active coordinator work for runs that already +// registered their per-session cancel function. +// 4. Wait for dispatched agent goroutines to return. +// 5. Run the embedded [app.App.Shutdown] cleanup (DB, LSP, etc). +// +// CancelAll is idempotent, so the second call inside app.App.Shutdown +// is harmless; the important guarantee is that cancel -> CancelAll -> +// runWG.Wait completes before the embedded cleanup touches the DB. +func (w *Workspace) Shutdown() { + w.runMu.Lock() + w.closing = true + w.runMu.Unlock() + + if w.cancel != nil { + w.cancel() + } + if w.App != nil && w.AgentCoordinator != nil { + w.AgentCoordinator.CancelAll() + } + w.runWG.Wait() + if w.App != nil { + w.App.Shutdown() + } +} + // New creates a new [Backend]. func New(ctx context.Context, cfg *config.ConfigStore, shutdownFn ShutdownFunc) *Backend { return &Backend{ @@ -247,6 +299,7 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works return nil, proto.Workspace{}, fmt.Errorf("failed to create app workspace: %w", err) } + wsCtx, wsCancel := context.WithCancel(b.ctx) ws := &Workspace{ App: appWorkspace, ID: id, @@ -255,6 +308,8 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works Env: args.Env, Skills: skillsMgr, resolvedPath: key, + ctx: wsCtx, + cancel: wsCancel, clients: make(map[string]*clientState), }