diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 53e63af3b95e8bb4fba6144675d97c3686e78546..ef37f164d711c608916e98dc15ddedcaf1694033 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -305,7 +305,13 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * // (the pubsub broker fan-in does not serialize publishes from // different upstream brokers). defer func() { - if flushErr := a.messages.FlushAll(ctx); flushErr != nil { + // Use a context detached from the run context: workspace + // shutdown cancels ctx before this goroutine returns, but the + // buffered streaming deltas must still land before the DB is + // closed. A short timeout bounds the flush. + flushCtx, flushCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer flushCancel() + if flushErr := a.messages.FlushAll(flushCtx); flushErr != nil { slog.Error("Failed to flush pending message updates after run", "error", flushErr) } if skipRunComplete { @@ -577,11 +583,20 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * if currentAssistant == nil { return result, err } + // Persist final state with a context detached from the run + // context. The run context (ctx) is derived from the + // workspace context, which workspace shutdown cancels before + // agent goroutines finish; using ctx here would drop the + // final assistant state. WithoutCancel keeps the values + // (e.g. session ID) while ignoring cancellation, and a short + // timeout bounds the cleanup writes. + cleanupCtx, cleanupCancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer cleanupCancel() // Ensure we finish thinking on error to close the reasoning state. currentAssistant.FinishThinking() toolCalls := currentAssistant.ToolCalls() - // INFO: we use the parent context here because the genCtx has been cancelled. 
- msgs, createErr := a.messages.List(ctx, currentAssistant.SessionID) + // INFO: we use the cleanup context here because genCtx has been cancelled and ctx may be cancelled too. + msgs, createErr := a.messages.List(cleanupCtx, currentAssistant.SessionID) if createErr != nil { return nil, createErr } @@ -590,7 +605,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * tc.Finished = true tc.Input = "{}" currentAssistant.AddToolCall(tc) - updateErr := a.messages.Update(ctx, *currentAssistant) + updateErr := a.messages.Update(cleanupCtx, *currentAssistant) if updateErr != nil { return nil, updateErr } @@ -623,7 +638,7 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * Content: content, IsError: true, } - _, createErr = a.messages.Create(ctx, currentAssistant.SessionID, message.CreateMessageParams{ + _, createErr = a.messages.Create(cleanupCtx, currentAssistant.SessionID, message.CreateMessageParams{ Role: message.Tool, Parts: []message.ContentPart{ toolResult, @@ -670,9 +685,9 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * } else { currentAssistant.AddFinish(message.FinishReasonError, defaultTitle, err.Error()) } - // Note: we use the parent context here because the genCtx has been + // Note: we use the cleanup context here because genCtx (and possibly ctx) has been // cancelled. 
- updateErr := a.messages.Update(ctx, *currentAssistant) + updateErr := a.messages.Update(cleanupCtx, *currentAssistant) if updateErr != nil { return nil, updateErr } diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 9c0f47f7ab8dfd48f90a31004d637dd6e54fd912..2ea24a86c71fbc7c4f58c5de680aeb83d421584b 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -34,6 +34,7 @@ var ( ErrUnknownCommand = errors.New("unknown command") ErrInvalidClientID = errors.New("invalid client_id") ErrClientNotAttached = errors.New("client not attached") + ErrWorkspaceClosing = errors.New("workspace closing") ) // DefaultCreateGrace is the window in which a client must open an SSE @@ -108,6 +109,23 @@ type Workspace struct { // with fallback to the cleaned absolute path. resolvedPath string + // ctx is the workspace-scoped run context. It is derived from + // the backend context in CreateWorkspace and lives for the + // lifetime of the workspace; cancel tears it down. Agent runs + // dispatched on behalf of this workspace are bound to ctx so + // their lifetime is owned by the workspace, not by any single + // client's HTTP request. + ctx context.Context + cancel context.CancelFunc + + // runMu guards closing and gates dispatch of new agent runs. + // closing is set by Shutdown so no new runs are accepted once + // teardown has begun. runWG tracks dispatched agent goroutines + // so Shutdown can wait for them to return before app cleanup. + runMu sync.Mutex + closing bool + runWG sync.WaitGroup + // clientsMu guards clients. It is held only briefly (no IO). clientsMu sync.Mutex // clients tracks each client's claim on this workspace. Refcount @@ -122,7 +140,7 @@ type Workspace struct { } // invokeShutdown calls the workspace shutdown hook if set, falling -// back to the embedded [app.App.Shutdown] when not. +// back to the workspace [Workspace.Shutdown] wrapper when unset. 
func (w *Workspace) invokeShutdown() { if w.shutdownFn != nil { w.shutdownFn() @@ -133,6 +151,40 @@ func (w *Workspace) invokeShutdown() { } } +// Shutdown tears the workspace down in an order that is safe for +// agent runs whose lifetime is bound to the workspace context. It +// shadows the promoted [app.App.Shutdown] so callers reaching +// ws.Shutdown() always observe this ordering: +// +// 1. Mark the workspace closing so no new agent runs are accepted. +// 2. Cancel the workspace run context so any dispatched goroutine +// that has not yet registered its per-session cancel still +// observes cancellation. +// 3. Cancel active coordinator work for runs that already +// registered their per-session cancel function. +// 4. Wait for dispatched agent goroutines to return. +// 5. Run the embedded [app.App.Shutdown] cleanup (DB, LSP, etc). +// +// CancelAll is idempotent, so the second call inside app.App.Shutdown +// is harmless; the important guarantee is that cancel -> CancelAll -> +// runWG.Wait completes before the embedded cleanup touches the DB. +func (w *Workspace) Shutdown() { + w.runMu.Lock() + w.closing = true + w.runMu.Unlock() + + if w.cancel != nil { + w.cancel() + } + if w.App != nil && w.AgentCoordinator != nil { + w.AgentCoordinator.CancelAll() + } + w.runWG.Wait() + if w.App != nil { + w.App.Shutdown() + } +} + // New creates a new [Backend]. func New(ctx context.Context, cfg *config.ConfigStore, shutdownFn ShutdownFunc) *Backend { return &Backend{ @@ -247,6 +299,7 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works return nil, proto.Workspace{}, fmt.Errorf("failed to create app workspace: %w", err) } + wsCtx, wsCancel := context.WithCancel(b.ctx) ws := &Workspace{ App: appWorkspace, ID: id, @@ -255,6 +308,8 @@ func (b *Backend) CreateWorkspace(args proto.Workspace) (*Workspace, proto.Works Env: args.Env, Skills: skillsMgr, resolvedPath: key, + ctx: wsCtx, + cancel: wsCancel, clients: make(map[string]*clientState), }