@@ -305,7 +305,13 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
// (the pubsub broker fan-in does not serialize publishes from
// different upstream brokers).
defer func() {
- if flushErr := a.messages.FlushAll(ctx); flushErr != nil {
+ // Use a context detached from the run context: workspace
+ // shutdown cancels ctx before this goroutine returns, but the
+ // buffered streaming deltas must still land before the DB is
+ // closed. A short timeout bounds the flush.
+ flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
+ defer flushCancel()
+ if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil {
slog.Error("Failed to flush pending message updates after run", "error", flushErr)
}
if skipRunComplete {
@@ -577,11 +583,20 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
if currentAssistant == nil {
return result, err
}
+ // Persist final state with a context detached from the run
+ // context. The run context (ctx) is derived from the
+ // workspace context, which workspace shutdown cancels before
+ // agent goroutines finish; using ctx here would drop the
+ // final assistant state. WithoutCancel keeps the values
+ // (e.g. session ID) while ignoring cancellation, and a short
+ // timeout bounds the cleanup writes.
+ cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
+ defer cleanupCancel()
// Ensure we finish thinking on error to close the reasoning state.
currentAssistant.FinishThinking()
toolCalls := currentAssistant.ToolCalls()
- // INFO: we use the parent context here because the genCtx has been cancelled.
- msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID)
+ // INFO: we use the cleanup context here because the genCtx has been cancelled.
+ msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID)
if createErr != nil {
return nil, createErr
}
@@ -590,7 +605,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
tc.Finished = true
tc.Input = "{}"
currentAssistant.AddToolCall(tc)
- updateErr := a.messages.Update(ctx, *currentAssistant)
+ updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
if updateErr != nil {
return nil, updateErr
}
@@ -623,7 +638,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
Content: content,
IsError: true,
}
- _, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{
+ _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{
Role: message.Tool,
Parts: []message.ContentPart{
toolResult,
@@ -670,9 +685,9 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
} else {
currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error())
}
- // Note: we use the parent context here because the genCtx has been
+ // Note: we use the cleanup context here because the genCtx has been
// cancelled.
- updateErr := a.messages.Update(ctx, *currentAssistant)
+ updateErr := a.messages.Update(cleanupCtx, *currentAssistant)
if updateErr != nil {
return nil, updateErr
}
@@ -34,6 +34,7 @@ var (
ErrUnknownCommand = errors.New("unknown command")
ErrInvalidClientID = errors.New("invalid client_id")
ErrClientNotAttached = errors.New("client not attached")
+ ErrWorkspaceClosing = errors.New("workspace closing")
)
// DefaultCreateGrace is the window in which a client must open an SSE
@@ -108,6 +109,23 @@ type Workspace struct {
// with fallback to the cleaned absolute path.
resolvedPath string
+ // ctx is the workspace-scoped run context. It is derived from
+ // the backend context in CreateWorkspace and lives for the
+ // lifetime of the workspace; cancel tears it down. Agent runs
+ // dispatched on behalf of this workspace are bound to ctx so
+ // their lifetime is owned by the workspace, not by any single
+ // client's HTTP request.
+ ctx context.Context
+ cancel context.CancelFunc
+
+ // runMu guards closing and gates dispatch of new agent runs.
+ // closing is set by Shutdown so no new runs are accepted once
+ // teardown has begun. runWG tracks dispatched agent goroutines
+ // so Shutdown can wait for them to return before app cleanup.
+ runMu sync.Mutex
+ closing bool
+ runWG sync.WaitGroup
+
// clientsMu guards clients. It is held only briefly (no IO).
clientsMu sync.Mutex
// clients tracks each client's claim on this workspace. Refcount
@@ -122,7 +140,7 @@ type Workspace struct {
}
// invokeShutdown calls the workspace shutdown hook if set, falling
-// back to the embedded [app.App.Shutdown] when not.
+// back to the workspace [Workspace.Shutdown] wrapper when not.
func (w *Workspace) invokeShutdown() {
if w.shutdownFn != nil {
w.shutdownFn()
@@ -133,6 +151,40 @@ func (w *Workspace) invokeShutdown() {
}
}
+// Shutdown tears the workspace down in an order that is safe for
+// agent runs whose lifetime is bound to the workspace context. It
+// shadows the promoted [app.App.Shutdown] so callers reaching
+// ws.Shutdown() always observe this ordering:
+//
+// 1. Mark the workspace closing so no new agent runs are accepted.
+// 2. Cancel the workspace run context so any dispatched goroutine
+// that has not yet registered its per-session cancel still
+// observes cancellation.
+// 3. Cancel active coordinator work for runs that already
+// registered their per-session cancel function.
+// 4. Wait for dispatched agent goroutines to return.
+// 5. Run the embedded [app.App.Shutdown] cleanup (DB, LSP, etc).
+//
+// CancelAll is idempotent, so the second call inside app.App.Shutdown
+// is harmless; the important guarantee is that cancel -> CancelAll ->
+// runWG.Wait completes before the embedded cleanup touches the DB.
+func (w *Workspace) Shutdown() {
+ w.runMu.Lock()
+ w.closing = true
+ w.runMu.Unlock()
+
+ if w.cancel != nil {
+ w.cancel()
+ }
+ if w.App != nil && w.AgentCoordinator != nil {
+ w.AgentCoordinator.CancelAll()
+ }
+ w.runWG.Wait()
+ if w.App != nil {
+ w.App.Shutdown()
+ }
+}
+
// New creates a new [Backend].
func New(ctx context.Context, cfg *config.ConfigStore, shutdownFn ShutdownFunc) *Backend {
return &Backend{
@@ -247,6 +299,7 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works
return nil, proto.Workspace{}, fmt.Errorf("failed to create app workspace: %w", err)
}
+ wsCtx, wsCancel := context.WithCancel(b.ctx)
ws := &Workspace{
App: appWorkspace,
ID: id,
@@ -255,6 +308,8 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works
Env: args.Env,
Skills: skillsMgr,
resolvedPath: key,
+ ctx: wsCtx,
+ cancel: wsCancel,
clients: make(map[string]*clientState),
}