diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 6728864527b2e4a7f970262f25c3953168a0b5bc..b901e162c8a3e0571b097735159d044611254616 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -359,28 +359,96 @@ func (a *sessionAgent) enqueueCall(call SessionAgentCall) { a.messageQueue.Set(call.SessionID, existing) } -// drainUncanceledQueue copies and clears the session's message queue and -// returns the queued calls not covered by a pending cancel. The queue -// copy/delete and the cancel-mark check happen under the per-session -// dispatch mutex so the filtering is atomic against a concurrent Cancel: -// canceledBySeq requires the caller to hold that mutex, and evaluating it -// here (rather than after unlocking) prevents a cancel recorded between -// the drain and the check from being observed inconsistently. The -// returned survivors are processed by the caller without the lock held. -func (a *sessionAgent) drainUncanceledQueue(sessionID string) []SessionAgentCall { +// drainQueueForStep partitions the session's queued calls for the current +// streaming step under the per-session dispatch mutex so the filtering is +// atomic against a concurrent Cancel: canceledBySeq requires the caller to +// hold that mutex, and evaluating it here (rather than after unlocking) +// prevents a cancel recorded between the drain and the check from being +// observed inconsistently. +// +// Calls covered by a pending cancel are dropped; the dropped ones that +// carry a RunID are returned in canceledWithRunID so the caller can +// publish their terminal cancelled RunComplete (a caller waiting on that +// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without +// a RunID are returned in fold to be folded into the active turn, +// preserving the existing follow-up behavior. Uncanceled calls that carry +// a RunID are left in the queue so each runs as its own turn via the +// recursive run path and publishes its own RunComplete, giving every +// RunID-bearing prompt an explicit lifecycle instead of being silently +// absorbed into another turn. fold is processed by the caller without the +// lock held. +func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) { dispatchLock := a.sessionMu(sessionID) dispatchLock.Lock() defer dispatchLock.Unlock() queuedCalls, _ := a.messageQueue.Get(sessionID) - a.messageQueue.Del(sessionID) - survivors := queuedCalls[:0] + var keep []SessionAgentCall for _, queued := range queuedCalls { if a.canceledBySeq(sessionID, queued.acceptSeq) { + if queued.RunID != "" { + canceledWithRunID = append(canceledWithRunID, queued) + } + continue + } + if queued.RunID != "" { + keep = append(keep, queued) + continue + } + fold = append(fold, queued) + } + if len(keep) == 0 { + a.messageQueue.Del(sessionID) + } else { + a.messageQueue.Set(sessionID, keep) + } + return fold, canceledWithRunID +} + +// publishCanceledQueueDrops emits a terminal cancelled RunComplete for +// every dropped queued call that carries a RunID. A queued prompt removed +// from the queue without ever running — covered by a pending cancel, or +// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on +// that RunID: `crush run` ignores live message events and exits only on a +// RunComplete whose RunID matches. Calls without a RunID had no such waiter +// and are dropped silently as before. A detached, bounded context keeps the +// must-deliver publish alive even when the run context that triggered the +// drop is already canceled. +func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) { + var hasRunID bool + for _, d := range drops { + if d.RunID != "" { + hasRunID = true + break + } + } + if !hasRunID { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for _, d := range drops { + if d.RunID == "" { continue } - survivors = append(survivors, queued) + a.publishRunComplete(ctx, d, notify.RunComplete{ + SessionID: d.SessionID, + RunID: d.RunID, + Cancelled: true, + }) } - return survivors +} + +// clearQueueAndNotify removes all queued prompts for the session and +// publishes a terminal cancelled RunComplete for any that carried a RunID, +// so callers waiting on those RunIDs (e.g. `crush run`) are not left +// hanging when their queued prompt is discarded without running. +func (a *sessionAgent) clearQueueAndNotify(sessionID string) { + queued, ok := a.messageQueue.Get(sessionID) + a.messageQueue.Del(sessionID) + if !ok { + return + } + a.publishCanceledQueueDrops(queued) } // clearPendingCancel removes any pending-cancel mark for sessionID. It @@ -735,14 +803,20 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * // Use latest tools (updated by SetTools when MCP tools change). prepared.Tools = a.tools.Copy() - // Drain queued follow-up prompts, but skip any covered by a - // cancel recorded while they sat in the queue: a cancel that - // arrived after a prompt was queued must not let it run as - // part of this step. Coverage is per-call by accept sequence - // so a follow-up queued after the cancel (higher seq) is - // still folded in. - survivors := a.drainUncanceledQueue(call.SessionID) - for _, queued := range survivors { + // Drain queued follow-up prompts for this step. Calls covered + // by a cancel recorded while they sat in the queue are dropped: + // a cancel that arrived after a prompt was queued must not let + // it run as part of this step. Coverage is per-call by accept + // sequence so a follow-up queued after the cancel (higher seq) + // is not dropped. A dropped prompt carrying a RunID still gets + // its terminal cancelled RunComplete so a caller waiting on it + // does not hang. Uncanceled prompts without a RunID are folded + // into this turn; uncanceled prompts with a RunID are left + // queued so each runs as its own turn (with its own + // RunComplete) via the recursive run path below. + fold, canceledRunIDs := a.drainQueueForStep(call.SessionID) + a.publishCanceledQueueDrops(canceledRunIDs) + for _, queued := range fold { userMessage, createErr := a.createUserMessage(callContext, queued) if createErr != nil { return callContext, prepared, createErr @@ -1125,14 +1199,22 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * // mark, or untracked); keep any queued after the cancel (higher // sequence) so they still run. var kept []SessionAgentCall + var canceledRunIDDrops []SessionAgentCall for _, q := range queuedMessages { if q.acceptSeq == 0 || q.acceptSeq <= mark { + if q.RunID != "" { + canceledRunIDDrops = append(canceledRunIDDrops, q) + } continue } kept = append(kept, q) } queuedMessages = kept a.messageQueue.Set(call.SessionID, kept) + // A dropped prompt carrying a RunID must still publish its + // terminal cancelled RunComplete so a caller waiting on that + // RunID does not hang. + a.publishCanceledQueueDrops(canceledRunIDDrops) } if len(queuedMessages) == 0 { // No queued work. Clear the cancel mark only when no accepted @@ -1151,12 +1233,29 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * mu.Unlock() return result, err } - // There are queued messages restart the loop. The recursive Run - // publishes its own RunComplete for the queued prompt, so suppress - // the outer defer's emit to avoid a duplicate event whose Error - // field would belong to the recursive turn but whose MessageID/Text - // would belong to the outer turn. + // There are queued messages, restart the loop. Suppress the outer + // defer's emit: it would otherwise observe the recursive Run's retErr + // (named-return clobbering through the return below) against this + // turn's MessageID/Text and publish a mixed, racing event. skipRunComplete = true + // Decide whether this turn still owes its own terminal RunComplete. + // Each submitted prompt with a RunID has its own lifecycle, so a turn + // that is finished and handing off to a *different* queued prompt must + // publish its own RunComplete here — leaving it to the recursive turn + // (which carries a different RunID) would hang a caller waiting on + // this turn's RunID. The exception is the summarize-continuation path, + // which re-queues this same call (same RunID) to resume after a + // summary; in that case the eventual terminal turn for this RunID + // publishes, so publishing now would double-emit. + outerOwesRunComplete := call.RunID != "" + if outerOwesRunComplete { + for _, q := range queuedMessages { + if q.RunID == call.RunID { + outerOwesRunComplete = false + break + } + } + } firstQueuedMessage := queuedMessages[0] a.messageQueue.Set(call.SessionID, queuedMessages[1:]) // Reserve a fresh accept for the dequeued prompt before dropping the @@ -1167,6 +1266,17 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result * // the recursive Run's accepted path observes as cancel-on-entry. firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID) mu.Unlock() + if outerOwesRunComplete { + complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID} + if currentAssistant != nil { + complete.MessageID = currentAssistant.ID + complete.Text = currentAssistant.Content().String() + } + if ctx.Err() != nil { + complete.Cancelled = true + } + a.publishRunComplete(ctx, call, complete) + } return a.Run(ctx, firstQueuedMessage) } @@ -1770,14 +1880,14 @@ func (a *sessionAgent) Cancel(sessionID string) { if a.QueuedPrompts(sessionID) > 0 { slog.Debug("Clearing queued prompts", "session_id", sessionID) - a.messageQueue.Del(sessionID) + a.clearQueueAndNotify(sessionID) } } func (a *sessionAgent) ClearQueue(sessionID string) { if a.QueuedPrompts(sessionID) > 0 { slog.Debug("Clearing queued prompts", "session_id", sessionID) - a.messageQueue.Del(sessionID) + a.clearQueueAndNotify(sessionID) } } diff --git a/internal/agent/queued_runid_test.go b/internal/agent/queued_runid_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e3e99d4f648e12d5a98be052747841553b1fa8ae --- /dev/null +++ b/internal/agent/queued_runid_test.go @@ -0,0 +1,181 @@ +package agent + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "charm.land/catwalk/pkg/catwalk" + "charm.land/fantasy" + "github.com/charmbracelet/crush/internal/agent/notify" + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/stretchr/testify/require" +) + +// gatedStreamModel streams a single text part followed by a clean finish, +// but blocks the very first Stream call until its gate is released. That +// lets a test hold a run "active" (past PrepareStep, inside Stream) just +// long enough to enqueue a follow-up prompt behind the busy session. +// Subsequent Stream calls (e.g. the recursive run draining the queue) +// proceed immediately. +type gatedStreamModel struct { + text string + gate chan struct{} + entered chan struct{} + calls atomic.Int64 +} + +func (m *gatedStreamModel) Provider() string { return "fake" } +func (m *gatedStreamModel) Model() string { return "fake-model" } + +func (m *gatedStreamModel) Generate(ctx context.Context, call fantasy.Call) (*fantasy.Response, error) { + return &fantasy.Response{ + Content: fantasy.ResponseContent{fantasy.TextContent{Text: m.text}}, + FinishReason: fantasy.FinishReasonStop, + }, nil +} + +func (m *gatedStreamModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.StreamResponse, error) { + if m.calls.Add(1) == 1 { + close(m.entered) + select { + case <-m.gate: + case <-ctx.Done(): + } + } + text := m.text + return func(yield func(fantasy.StreamPart) bool) { + if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextStart, ID: "1"}) { + return + } + if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextDelta, ID: "1", Delta: text}) { + return + } + if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextEnd, ID: "1"}) { + return + } + yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeFinish, FinishReason: fantasy.FinishReasonStop}) + }, nil +} + +func (m *gatedStreamModel) GenerateObject(ctx context.Context, call fantasy.ObjectCall) (*fantasy.ObjectResponse, error) { + return nil, errors.New("not implemented") +} + +func (m *gatedStreamModel) StreamObject(ctx context.Context, call fantasy.ObjectCall) (fantasy.ObjectStreamResponse, error) { + return nil, errors.New("not implemented") +} + +// TestRun_QueuedRunIDPromptRunsRecursivelyAndPublishesRunComplete is the +// end-to-end proof of fix 2: a prompt carrying a RunID that is queued +// behind a busy session must NOT be silently folded into the active turn. +// It runs as its own turn via the recursive run path and publishes its +// own terminal RunComplete, so a `crush run` caller blocking on that +// RunID does not hang. The active turn keeps its own RunComplete too. +func TestRun_QueuedRunIDPromptRunsRecursivelyAndPublishesRunComplete(t *testing.T) { + t.Parallel() + + env := testEnv(t) + broker := pubsub.NewBroker[notify.RunComplete]() + t.Cleanup(broker.Shutdown) + + large := &gatedStreamModel{ + text: "done", + gate: make(chan struct{}), + entered: make(chan struct{}), + } + small := &finishStreamModel{text: "title"} + + sa := NewSessionAgent(SessionAgentOptions{ + LargeModel: Model{Model: large, CatwalkCfg: catwalk.Model{ContextWindow: 200000, DefaultMaxTokens: 10000}}, + SmallModel: Model{Model: small, CatwalkCfg: catwalk.Model{ContextWindow: 200000, DefaultMaxTokens: 10000}}, + IsYolo: true, + Sessions: env.sessions, + Messages: env.messages, + RunComplete: broker, + }).(*sessionAgent) + + sess, err := env.sessions.Create(t.Context(), "session") + require.NoError(t, err) + + subCtx, subCancel := context.WithCancel(t.Context()) + defer subCancel() + ch := broker.Subscribe(subCtx) + + // Start the main turn; it blocks inside Stream once active. + mainDone := make(chan error, 1) + go func() { + _, runErr := sa.Run(t.Context(), SessionAgentCall{ + SessionID: sess.ID, + RunID: "run-main", + Prompt: "main", + }) + mainDone <- runErr + }() + + // Wait until the main turn is active (inside Stream). + select { + case <-large.entered: + case <-time.After(5 * time.Second): + t.Fatal("main run never entered Stream") + } + require.True(t, sa.IsSessionBusy(sess.ID), "main run must be active before enqueueing the follow-up") + + // Enqueue a RunID-bearing follow-up behind the busy session. + res, err := sa.Run(t.Context(), SessionAgentCall{ + SessionID: sess.ID, + RunID: "run-follow", + Prompt: "follow", + }) + require.NoError(t, err) + require.Nil(t, res, "a busy-session follow-up must enqueue and return (nil, nil)") + require.Equal(t, 1, sa.QueuedPrompts(sess.ID), "the follow-up must be queued, not folded") + + // Release the main turn so it completes and hands off to the queue. + close(large.gate) + require.NoError(t, <-mainDone) + + // Both turns must publish their own terminal RunComplete. + got := map[string]notify.RunComplete{} + deadline := time.After(5 * time.Second) + for len(got) < 2 { + select { + case ev := <-ch: + got[ev.Payload.RunID] = ev.Payload + case <-deadline: + t.Fatalf("timed out waiting for both RunCompletes; got %v", got) + } + } + + main, ok := got["run-main"] + require.True(t, ok, "the active turn must publish its own RunComplete") + require.Empty(t, main.Error) + require.False(t, main.Cancelled) + + follow, ok := got["run-follow"] + require.True(t, ok, + "the queued RunID prompt must publish its own RunComplete instead of being folded silently") + require.Empty(t, follow.Error) + require.False(t, follow.Cancelled) + require.Equal(t, "done", follow.Text, "the queued prompt ran as its own turn") + + // Two distinct assistant turns prove the follow-up was not folded. + msgs, err := env.messages.List(t.Context(), sess.ID) + require.NoError(t, err) + var assistants, follows int + for _, m := range msgs { + switch m.Role { + case message.Assistant: + assistants++ + case message.User: + if m.Content().String() == "follow" { + follows++ + } + } + } + require.Equal(t, 2, assistants, "the active turn and the recursive turn each produce one assistant message") + require.Equal(t, 1, follows, "the follow-up prompt is its own user turn") +} diff --git a/internal/agent/run_complete_test.go b/internal/agent/run_complete_test.go index 50c5fdf9355c11123d91d26b7ec2e90b219df9d4..2fb6fbefab436ce97b428e9025ef79142da2ea85 100644 --- a/internal/agent/run_complete_test.go +++ b/internal/agent/run_complete_test.go @@ -58,13 +58,15 @@ func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) { "RunComplete still correlates with the originating SendMessage") } -// TestDrainUncanceledQueue_FiltersUnderDispatchLock verifies that the -// queue drain evaluates the per-session cancel mark while holding the -// dispatch mutex (canceledBySeq's documented precondition). Queued calls -// at or below the cancel high-water mark are dropped, calls queued after -// the cancel (higher seq) survive, untracked enqueues (seq == 0) are -// dropped whenever any mark is present, and the queue is cleared. -func TestDrainUncanceledQueue_FiltersUnderDispatchLock(t *testing.T) { +// TestDrainQueueForStep_FiltersUnderDispatchLock verifies that the queue +// drain evaluates the per-session cancel mark while holding the dispatch +// mutex (canceledBySeq's documented precondition). Queued calls at or +// below the cancel high-water mark are dropped, calls queued after the +// cancel (higher seq) are folded, untracked enqueues (seq == 0) are +// dropped whenever any mark is present, and the queue is cleared. These +// calls carry no RunID, so all foldable survivors are returned for +// folding (the existing follow-up behavior). +func TestDrainQueueForStep_FiltersUnderDispatchLock(t *testing.T) { t.Parallel() env := testEnv(t) @@ -83,19 +85,21 @@ func TestDrainUncanceledQueue_FiltersUnderDispatchLock(t *testing.T) { // Cancel high-water mark at seq 2: seq <= 2 and seq == 0 are covered. a.cancelMark.Set(sessionID, 2) - survivors := a.drainUncanceledQueue(sessionID) + fold, canceledWithRunID := a.drainQueueForStep(sessionID) - require.Len(t, survivors, 1, - "only the follow-up queued after the cancel (seq > mark) must survive") - require.Equal(t, "after", survivors[0].Prompt) + require.Len(t, fold, 1, + "only the follow-up queued after the cancel (seq > mark) must be folded") + require.Equal(t, "after", fold[0].Prompt) + require.Empty(t, canceledWithRunID, + "no dropped call carried a RunID, so none need a terminal RunComplete") _, ok := a.messageQueue.Get(sessionID) - require.False(t, ok, "drain must clear the session message queue") + require.False(t, ok, "drain must clear the session message queue when nothing is kept") } -// TestDrainUncanceledQueue_NoMarkKeepsAll verifies that with no cancel -// mark recorded, every queued call survives the drain. -func TestDrainUncanceledQueue_NoMarkKeepsAll(t *testing.T) { +// TestDrainQueueForStep_NoMarkFoldsAllNonRunID verifies that with no +// cancel mark recorded, every queued call without a RunID is folded. +func TestDrainQueueForStep_NoMarkFoldsAllNonRunID(t *testing.T) { t.Parallel() env := testEnv(t) @@ -110,8 +114,80 @@ func TestDrainUncanceledQueue_NoMarkKeepsAll(t *testing.T) { {SessionID: sessionID, Prompt: "b", acceptSeq: 5}, }) - survivors := a.drainUncanceledQueue(sessionID) - require.Len(t, survivors, 2, "no cancel mark means all queued calls survive") + fold, canceledWithRunID := a.drainQueueForStep(sessionID) + require.Len(t, fold, 2, "no cancel mark means all non-RunID queued calls are folded") + require.Empty(t, canceledWithRunID) +} + +// TestDrainQueueForStep_KeepsRunIDPromptsQueued is the core of fix 2: a +// queued prompt that carries a RunID must NOT be folded into the active +// turn. Folding it would silently absorb it into another turn and never +// publish a RunComplete for its RunID, hanging a `crush run` caller that +// blocks on that event. Such prompts are left in the queue so the +// recursive run path gives each its own turn and its own RunComplete. +// Non-RunID prompts are still folded. +func TestDrainQueueForStep_KeepsRunIDPromptsQueued(t *testing.T) { + t.Parallel() + + env := testEnv(t) + a := NewSessionAgent(SessionAgentOptions{ + Sessions: env.sessions, + Messages: env.messages, + }).(*sessionAgent) + + const sessionID = "drain-runid" + a.messageQueue.Set(sessionID, []SessionAgentCall{ + {SessionID: sessionID, Prompt: "fold-me", acceptSeq: 1}, + {SessionID: sessionID, RunID: "run-a", Prompt: "keep-me", acceptSeq: 2}, + {SessionID: sessionID, RunID: "run-b", Prompt: "keep-me-too", acceptSeq: 3}, + }) + + fold, canceledWithRunID := a.drainQueueForStep(sessionID) + + require.Len(t, fold, 1, "only the non-RunID prompt is folded into the active turn") + require.Equal(t, "fold-me", fold[0].Prompt) + require.Empty(t, canceledWithRunID) + + kept, ok := a.messageQueue.Get(sessionID) + require.True(t, ok, "RunID-bearing prompts must remain queued for the recursive run path") + require.Len(t, kept, 2) + require.Equal(t, "run-a", kept[0].RunID) + require.Equal(t, "run-b", kept[1].RunID) +} + +// TestDrainQueueForStep_ReportsCanceledRunIDDrops verifies that a queued +// prompt carrying a RunID that is dropped because a cancel covers it is +// reported in canceledWithRunID so the caller can publish its terminal +// cancelled RunComplete. A canceled prompt without a RunID is dropped +// silently as before. +func TestDrainQueueForStep_ReportsCanceledRunIDDrops(t *testing.T) { + t.Parallel() + + env := testEnv(t) + a := NewSessionAgent(SessionAgentOptions{ + Sessions: env.sessions, + Messages: env.messages, + }).(*sessionAgent) + + const sessionID = "drain-cancel-runid" + a.messageQueue.Set(sessionID, []SessionAgentCall{ + {SessionID: sessionID, RunID: "run-canceled", Prompt: "canceled", acceptSeq: 1}, + {SessionID: sessionID, Prompt: "canceled-no-runid", acceptSeq: 1}, + {SessionID: sessionID, RunID: "run-survives", Prompt: "survives", acceptSeq: 5}, + }) + a.cancelMark.Set(sessionID, 2) + + fold, canceledWithRunID := a.drainQueueForStep(sessionID) + + require.Empty(t, fold, "no uncanceled non-RunID prompts to fold") + require.Len(t, canceledWithRunID, 1, + "only the dropped RunID-bearing prompt needs a terminal RunComplete") + require.Equal(t, "run-canceled", canceledWithRunID[0].RunID) + + kept, ok := a.messageQueue.Get(sessionID) + require.True(t, ok) + require.Len(t, kept, 1, "the uncanceled RunID prompt stays queued") + require.Equal(t, "run-survives", kept[0].RunID) } // TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the @@ -143,3 +219,104 @@ func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) { t.Fatal("PublishMustDeliver did not deliver event") } } + +// requireSingleCancelledRunComplete reads exactly one RunComplete from ch, +// asserts it is the cancelled terminal event for runID, and verifies no +// second event arrives. This observes the published pubsub event rather +// than internal bookkeeping, which is the contract a `crush run` caller +// blocking on the broker actually relies on. +func requireSingleCancelledRunComplete(t *testing.T, ch <-chan pubsub.Event[notify.RunComplete], sessionID, runID string) { + t.Helper() + select { + case ev := <-ch: + require.Equal(t, runID, ev.Payload.RunID, + "the published RunComplete must carry the dropped queued prompt's RunID") + require.Equal(t, sessionID, ev.Payload.SessionID) + require.True(t, ev.Payload.Cancelled, + "a dropped queued prompt must publish a cancelled RunComplete") + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for the cancelled RunComplete") + } + select { + case extra := <-ch: + t.Fatalf("expected exactly one RunComplete, got a second: %+v", extra.Payload) + case <-time.After(100 * time.Millisecond): + } +} + +// TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete proves the +// terminal-event behavior end-to-end: a RunID-bearing prompt sitting in +// the queue that is canceled while queued (via the public Cancel path, +// which routes through clearQueueAndNotify -> publishCanceledQueueDrops) +// must emit exactly one cancelled RunComplete on the broker for its +// RunID. A queued prompt without a RunID is dropped silently. This is the +// coverage the earlier drain test lacked: it asserted the returned +// bookkeeping slice, not the published event a `crush run` caller awaits. +func TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete(t *testing.T) { + t.Parallel() + + env := testEnv(t) + broker := pubsub.NewBroker[notify.RunComplete]() + t.Cleanup(broker.Shutdown) + + a := NewSessionAgent(SessionAgentOptions{ + Sessions: env.sessions, + Messages: env.messages, + RunComplete: broker, + }).(*sessionAgent) + + subCtx, subCancel := context.WithCancel(t.Context()) + defer subCancel() + ch := broker.Subscribe(subCtx) + + const sessionID = "cancel-queued-runid" + a.messageQueue.Set(sessionID, []SessionAgentCall{ + {SessionID: sessionID, Prompt: "no-runid", acceptSeq: 1}, + {SessionID: sessionID, RunID: "run-queued", Prompt: "queued", acceptSeq: 2}, + }) + + a.Cancel(sessionID) + + requireSingleCancelledRunComplete(t, ch, sessionID, "run-queued") + + _, ok := a.messageQueue.Get(sessionID) + require.False(t, ok, "Cancel must clear the queue") +} + +// TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete drives +// the production drain sequence (drainQueueForStep then +// publishCanceledQueueDrops, mirroring the PrepareStep handoff) and +// asserts the dropped RunID-bearing prompt actually publishes exactly one +// cancelled RunComplete on the broker. The companion bookkeeping test +// covers the returned slice; this one covers the observable terminal +// event. +func TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete(t *testing.T) { + t.Parallel() + + env := testEnv(t) + broker := pubsub.NewBroker[notify.RunComplete]() + t.Cleanup(broker.Shutdown) + + a := NewSessionAgent(SessionAgentOptions{ + Sessions: env.sessions, + Messages: env.messages, + RunComplete: broker, + }).(*sessionAgent) + + subCtx, subCancel := context.WithCancel(t.Context()) + defer subCancel() + ch := broker.Subscribe(subCtx) + + const sessionID = "drain-drop-runid" + a.messageQueue.Set(sessionID, []SessionAgentCall{ + {SessionID: sessionID, RunID: "run-dropped", Prompt: "dropped", acceptSeq: 1}, + {SessionID: sessionID, Prompt: "dropped-no-runid", acceptSeq: 1}, + }) + a.cancelMark.Set(sessionID, 2) + + _, canceledWithRunID := a.drainQueueForStep(sessionID) + require.Len(t, canceledWithRunID, 1) + a.publishCanceledQueueDrops(canceledWithRunID) + + requireSingleCancelledRunComplete(t, ch, sessionID, "run-dropped") +}