@@ -359,28 +359,96 @@ func (a *sessionAgent) enqueueCall(call SessionAgentCall) {
a.messageQueue.Set(call.SessionID, existing)
}
-// drainUncanceledQueue copies and clears the session's message queue and
-// returns the queued calls not covered by a pending cancel. The queue
-// copy/delete and the cancel-mark check happen under the per-session
-// dispatch mutex so the filtering is atomic against a concurrent Cancel:
-// canceledBySeq requires the caller to hold that mutex, and evaluating it
-// here (rather than after unlocking) prevents a cancel recorded between
-// the drain and the check from being observed inconsistently. The
-// returned survivors are processed by the caller without the lock held.
-func (a *sessionAgent) drainUncanceledQueue(sessionID string) []SessionAgentCall {
+// drainQueueForStep partitions the session's queued calls for the current
+// streaming step under the per-session dispatch mutex so the filtering is
+// atomic against a concurrent Cancel: canceledBySeq requires the caller to
+// hold that mutex, and evaluating it here (rather than after unlocking)
+// prevents a cancel recorded between the drain and the check from being
+// observed inconsistently.
+//
+// Calls covered by a pending cancel are dropped; the dropped ones that
+// carry a RunID are returned in canceledWithRunID so the caller can
+// publish their terminal cancelled RunComplete (a caller waiting on that
+// RunID, e.g. `crush run`, would otherwise hang). Uncanceled calls without
+// a RunID are returned in fold to be folded into the active turn,
+// preserving the existing follow-up behavior. Uncanceled calls that carry
+// a RunID are left in the queue so each runs as its own turn via the
+// recursive run path and publishes its own RunComplete, giving every
+// RunID-bearing prompt an explicit lifecycle instead of being silently
+// absorbed into another turn. fold is processed by the caller without the
+// lock held.
+func (a *sessionAgent) drainQueueForStep(sessionID string) (fold, canceledWithRunID []SessionAgentCall) {
dispatchLock := a.sessionMu(sessionID)
dispatchLock.Lock()
defer dispatchLock.Unlock()
queuedCalls, _ := a.messageQueue.Get(sessionID)
- a.messageQueue.Del(sessionID)
- survivors := queuedCalls[:0]
+ var keep []SessionAgentCall
for _, queued := range queuedCalls {
if a.canceledBySeq(sessionID, queued.acceptSeq) {
+ if queued.RunID != "" {
+ canceledWithRunID = append(canceledWithRunID, queued)
+ }
+ continue
+ }
+ if queued.RunID != "" {
+ keep = append(keep, queued)
+ continue
+ }
+ fold = append(fold, queued)
+ }
+ if len(keep) == 0 {
+ a.messageQueue.Del(sessionID)
+ } else {
+ a.messageQueue.Set(sessionID, keep)
+ }
+ return fold, canceledWithRunID
+}
+
+// publishCanceledQueueDrops emits a terminal cancelled RunComplete for
+// every dropped queued call that carries a RunID. A queued prompt removed
+// from the queue without ever running — covered by a pending cancel, or
+// cleared by Cancel/ClearQueue — would otherwise leave a caller blocked on
+// that RunID: `crush run` ignores live message events and exits only on a
+// RunComplete whose RunID matches. Calls without a RunID had no such waiter
+// and are dropped silently as before. A detached, bounded context keeps the
+// must-deliver publish alive even when the run context that triggered the
+// drop is already canceled.
+func (a *sessionAgent) publishCanceledQueueDrops(drops []SessionAgentCall) {
+ var hasRunID bool
+ for _, d := range drops {
+ if d.RunID != "" {
+ hasRunID = true
+ break
+ }
+ }
+ if !hasRunID {
+ return
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ for _, d := range drops {
+ if d.RunID == "" {
continue
}
- survivors = append(survivors, queued)
+ a.publishRunComplete(ctx, d, notify.RunComplete{
+ SessionID: d.SessionID,
+ RunID: d.RunID,
+ Cancelled: true,
+ })
}
- return survivors
+}
+
+// clearQueueAndNotify removes all queued prompts for the session and
+// publishes a terminal cancelled RunComplete for any that carried a RunID,
+// so callers waiting on those RunIDs (e.g. `crush run`) are not left
+// hanging when their queued prompt is discarded without running.
+func (a *sessionAgent) clearQueueAndNotify(sessionID string) {
+ queued, ok := a.messageQueue.Get(sessionID)
+ a.messageQueue.Del(sessionID)
+ if !ok {
+ return
+ }
+ a.publishCanceledQueueDrops(queued)
}
// clearPendingCancel removes any pending-cancel mark for sessionID. It
@@ -735,14 +803,20 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
// Use latest tools (updated by SetTools when MCP tools change).
prepared.Tools = a.tools.Copy()
- // Drain queued follow-up prompts, but skip any covered by a
- // cancel recorded while they sat in the queue: a cancel that
- // arrived after a prompt was queued must not let it run as
- // part of this step. Coverage is per-call by accept sequence
- // so a follow-up queued after the cancel (higher seq) is
- // still folded in.
- survivors := a.drainUncanceledQueue(call.SessionID)
- for _, queued := range survivors {
+ // Drain queued follow-up prompts for this step. Calls covered
+ // by a cancel recorded while they sat in the queue are dropped:
+ // a cancel that arrived after a prompt was queued must not let
+ // it run as part of this step. Coverage is per-call by accept
+ // sequence so a follow-up queued after the cancel (higher seq)
+ // is not dropped. A dropped prompt carrying a RunID still gets
+ // its terminal cancelled RunComplete so a caller waiting on it
+ // does not hang. Uncanceled prompts without a RunID are folded
+ // into this turn; uncanceled prompts with a RunID are left
+ // queued so each runs as its own turn (with its own
+ // RunComplete) via the recursive run path below.
+ fold, canceledRunIDs := a.drainQueueForStep(call.SessionID)
+ a.publishCanceledQueueDrops(canceledRunIDs)
+ for _, queued := range fold {
userMessage, createErr := a.createUserMessage(callContext, queued)
if createErr != nil {
return callContext, prepared, createErr
@@ -1125,14 +1199,22 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
// mark, or untracked); keep any queued after the cancel (higher
// sequence) so they still run.
var kept []SessionAgentCall
+ var canceledRunIDDrops []SessionAgentCall
for _, q := range queuedMessages {
if q.acceptSeq == 0 || q.acceptSeq <= mark {
+ if q.RunID != "" {
+ canceledRunIDDrops = append(canceledRunIDDrops, q)
+ }
continue
}
kept = append(kept, q)
}
queuedMessages = kept
a.messageQueue.Set(call.SessionID, kept)
+ // A dropped prompt carrying a RunID must still publish its
+ // terminal cancelled RunComplete so a caller waiting on that
+ // RunID does not hang.
+ a.publishCanceledQueueDrops(canceledRunIDDrops)
}
if len(queuedMessages) == 0 {
// No queued work. Clear the cancel mark only when no accepted
@@ -1151,12 +1233,29 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
mu.Unlock()
return result, err
}
- // There are queued messages restart the loop. The recursive Run
- // publishes its own RunComplete for the queued prompt, so suppress
- // the outer defer's emit to avoid a duplicate event whose Error
- // field would belong to the recursive turn but whose MessageID/Text
- // would belong to the outer turn.
+ // There are queued messages, restart the loop. Suppress the outer
+ // defer's emit: it would otherwise observe the recursive Run's retErr
+ // (named-return clobbering through the return below) against this
+ // turn's MessageID/Text and publish a mixed, racing event.
skipRunComplete = true
+ // Decide whether this turn still owes its own terminal RunComplete.
+ // Each submitted prompt with a RunID has its own lifecycle, so a turn
+ // that is finished and handing off to a *different* queued prompt must
+ // publish its own RunComplete here — leaving it to the recursive turn
+ // (which carries a different RunID) would hang a caller waiting on
+ // this turn's RunID. The exception is the summarize-continuation path,
+ // which re-queues this same call (same RunID) to resume after a
+ // summary; in that case the eventual terminal turn for this RunID
+ // publishes, so publishing now would double-emit.
+ outerOwesRunComplete := call.RunID != ""
+ if outerOwesRunComplete {
+ for _, q := range queuedMessages {
+ if q.RunID == call.RunID {
+ outerOwesRunComplete = false
+ break
+ }
+ }
+ }
firstQueuedMessage := queuedMessages[0]
a.messageQueue.Set(call.SessionID, queuedMessages[1:])
// Reserve a fresh accept for the dequeued prompt before dropping the
@@ -1167,6 +1266,17 @@ func (a *sessionAgent) Run(ctx context.Context, call SessionAgentCall) (result *
// the recursive Run's accepted path observes as cancel-on-entry.
firstQueuedMessage.Accepted = a.BeginAccepted(call.SessionID)
mu.Unlock()
+ if outerOwesRunComplete {
+ complete := notify.RunComplete{SessionID: call.SessionID, RunID: call.RunID}
+ if currentAssistant != nil {
+ complete.MessageID = currentAssistant.ID
+ complete.Text = currentAssistant.Content().String()
+ }
+ if ctx.Err() != nil {
+ complete.Cancelled = true
+ }
+ a.publishRunComplete(ctx, call, complete)
+ }
return a.Run(ctx, firstQueuedMessage)
}
@@ -1770,14 +1880,14 @@ func (a *sessionAgent) Cancel(sessionID string) {
if a.QueuedPrompts(sessionID) > 0 {
slog.Debug("Clearing queued prompts", "session_id", sessionID)
- a.messageQueue.Del(sessionID)
+ a.clearQueueAndNotify(sessionID)
}
}
func (a *sessionAgent) ClearQueue(sessionID string) {
if a.QueuedPrompts(sessionID) > 0 {
slog.Debug("Clearing queued prompts", "session_id", sessionID)
- a.messageQueue.Del(sessionID)
+ a.clearQueueAndNotify(sessionID)
}
}
@@ -0,0 +1,181 @@
+package agent
+
+import (
+ "context"
+ "errors"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "charm.land/catwalk/pkg/catwalk"
+ "charm.land/fantasy"
+ "github.com/charmbracelet/crush/internal/agent/notify"
+ "github.com/charmbracelet/crush/internal/message"
+ "github.com/charmbracelet/crush/internal/pubsub"
+ "github.com/stretchr/testify/require"
+)
+
+// gatedStreamModel streams a single text part followed by a clean finish,
+// but blocks the very first Stream call until its gate is released. That
+// lets a test hold a run "active" (past PrepareStep, inside Stream) just
+// long enough to enqueue a follow-up prompt behind the busy session.
+// Subsequent Stream calls (e.g. the recursive run draining the queue)
+// proceed immediately.
+type gatedStreamModel struct {
+ text string
+ gate chan struct{}
+ entered chan struct{}
+ calls atomic.Int64
+}
+
+func (m *gatedStreamModel) Provider() string { return "fake" }
+func (m *gatedStreamModel) Model() string { return "fake-model" }
+
+func (m *gatedStreamModel) Generate(ctx context.Context, call fantasy.Call) (*fantasy.Response, error) {
+ return &fantasy.Response{
+ Content: fantasy.ResponseContent{fantasy.TextContent{Text: m.text}},
+ FinishReason: fantasy.FinishReasonStop,
+ }, nil
+}
+
+func (m *gatedStreamModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.StreamResponse, error) {
+ if m.calls.Add(1) == 1 {
+ close(m.entered)
+ select {
+ case <-m.gate:
+ case <-ctx.Done():
+ }
+ }
+ text := m.text
+ return func(yield func(fantasy.StreamPart) bool) {
+ if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextStart, ID: "1"}) {
+ return
+ }
+ if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextDelta, ID: "1", Delta: text}) {
+ return
+ }
+ if !yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeTextEnd, ID: "1"}) {
+ return
+ }
+ yield(fantasy.StreamPart{Type: fantasy.StreamPartTypeFinish, FinishReason: fantasy.FinishReasonStop})
+ }, nil
+}
+
+func (m *gatedStreamModel) GenerateObject(ctx context.Context, call fantasy.ObjectCall) (*fantasy.ObjectResponse, error) {
+ return nil, errors.New("not implemented")
+}
+
+func (m *gatedStreamModel) StreamObject(ctx context.Context, call fantasy.ObjectCall) (fantasy.ObjectStreamResponse, error) {
+ return nil, errors.New("not implemented")
+}
+
+// TestRun_QueuedRunIDPromptRunsRecursivelyAndPublishesRunComplete checks,
+// end to end, that a prompt carrying a RunID that is queued behind a busy
+// session is NOT silently folded into the active turn.
+// It runs as its own turn via the recursive run path and publishes its
+// own terminal RunComplete, so a `crush run` caller blocking on that
+// RunID does not hang. The active turn keeps its own RunComplete too.
+func TestRun_QueuedRunIDPromptRunsRecursivelyAndPublishesRunComplete(t *testing.T) {
+ t.Parallel()
+
+ env := testEnv(t)
+ broker := pubsub.NewBroker[notify.RunComplete]()
+ t.Cleanup(broker.Shutdown)
+
+ large := &gatedStreamModel{
+ text: "done",
+ gate: make(chan struct{}),
+ entered: make(chan struct{}),
+ }
+ small := &finishStreamModel{text: "title"}
+
+ sa := NewSessionAgent(SessionAgentOptions{
+ LargeModel: Model{Model: large, CatwalkCfg: catwalk.Model{ContextWindow: 200000, DefaultMaxTokens: 10000}},
+ SmallModel: Model{Model: small, CatwalkCfg: catwalk.Model{ContextWindow: 200000, DefaultMaxTokens: 10000}},
+ IsYolo: true,
+ Sessions: env.sessions,
+ Messages: env.messages,
+ RunComplete: broker,
+ }).(*sessionAgent)
+
+ sess, err := env.sessions.Create(t.Context(), "session")
+ require.NoError(t, err)
+
+ subCtx, subCancel := context.WithCancel(t.Context())
+ defer subCancel()
+ ch := broker.Subscribe(subCtx)
+
+ // Start the main turn; it blocks inside Stream once active.
+ mainDone := make(chan error, 1)
+ go func() {
+ _, runErr := sa.Run(t.Context(), SessionAgentCall{
+ SessionID: sess.ID,
+ RunID: "run-main",
+ Prompt: "main",
+ })
+ mainDone <- runErr
+ }()
+
+ // Wait until the main turn is active (inside Stream).
+ select {
+ case <-large.entered:
+ case <-time.After(5 * time.Second):
+ t.Fatal("main run never entered Stream")
+ }
+ require.True(t, sa.IsSessionBusy(sess.ID), "main run must be active before enqueueing the follow-up")
+
+ // Enqueue a RunID-bearing follow-up behind the busy session.
+ res, err := sa.Run(t.Context(), SessionAgentCall{
+ SessionID: sess.ID,
+ RunID: "run-follow",
+ Prompt: "follow",
+ })
+ require.NoError(t, err)
+ require.Nil(t, res, "a busy-session follow-up must enqueue and return (nil, nil)")
+ require.Equal(t, 1, sa.QueuedPrompts(sess.ID), "the follow-up must be queued, not folded")
+
+ // Release the main turn so it completes and hands off to the queue.
+ close(large.gate)
+ require.NoError(t, <-mainDone)
+
+ // Both turns must publish their own terminal RunComplete.
+ got := map[string]notify.RunComplete{}
+ deadline := time.After(5 * time.Second)
+ for len(got) < 2 {
+ select {
+ case ev := <-ch:
+ got[ev.Payload.RunID] = ev.Payload
+ case <-deadline:
+ t.Fatalf("timed out waiting for both RunCompletes; got %v", got)
+ }
+ }
+
+ main, ok := got["run-main"]
+ require.True(t, ok, "the active turn must publish its own RunComplete")
+ require.Empty(t, main.Error)
+ require.False(t, main.Cancelled)
+
+ follow, ok := got["run-follow"]
+ require.True(t, ok,
+ "the queued RunID prompt must publish its own RunComplete instead of being folded silently")
+ require.Empty(t, follow.Error)
+ require.False(t, follow.Cancelled)
+ require.Equal(t, "done", follow.Text, "the queued prompt ran as its own turn")
+
+ // Two distinct assistant turns prove the follow-up was not folded.
+ msgs, err := env.messages.List(t.Context(), sess.ID)
+ require.NoError(t, err)
+ var assistants, follows int
+ for _, m := range msgs {
+ switch m.Role {
+ case message.Assistant:
+ assistants++
+ case message.User:
+ if m.Content().String() == "follow" {
+ follows++
+ }
+ }
+ }
+ require.Equal(t, 2, assistants, "the active turn and the recursive turn each produce one assistant message")
+ require.Equal(t, 1, follows, "the follow-up prompt is its own user turn")
+}
@@ -58,13 +58,15 @@ func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) {
"RunComplete still correlates with the originating SendMessage")
}
-// TestDrainUncanceledQueue_FiltersUnderDispatchLock verifies that the
-// queue drain evaluates the per-session cancel mark while holding the
-// dispatch mutex (canceledBySeq's documented precondition). Queued calls
-// at or below the cancel high-water mark are dropped, calls queued after
-// the cancel (higher seq) survive, untracked enqueues (seq == 0) are
-// dropped whenever any mark is present, and the queue is cleared.
-func TestDrainUncanceledQueue_FiltersUnderDispatchLock(t *testing.T) {
+// TestDrainQueueForStep_FiltersUnderDispatchLock verifies that the queue
+// drain evaluates the per-session cancel mark while holding the dispatch
+// mutex (canceledBySeq's documented precondition). Queued calls at or
+// below the cancel high-water mark are dropped, calls queued after the
+// cancel (higher seq) are folded, untracked enqueues (seq == 0) are
+// dropped whenever any mark is present, and the queue is cleared. These
+// calls carry no RunID, so all foldable survivors are returned for
+// folding (the existing follow-up behavior).
+func TestDrainQueueForStep_FiltersUnderDispatchLock(t *testing.T) {
t.Parallel()
env := testEnv(t)
@@ -83,19 +85,21 @@ func TestDrainUncanceledQueue_FiltersUnderDispatchLock(t *testing.T) {
// Cancel high-water mark at seq 2: seq <= 2 and seq == 0 are covered.
a.cancelMark.Set(sessionID, 2)
- survivors := a.drainUncanceledQueue(sessionID)
+ fold, canceledWithRunID := a.drainQueueForStep(sessionID)
- require.Len(t, survivors, 1,
- "only the follow-up queued after the cancel (seq > mark) must survive")
- require.Equal(t, "after", survivors[0].Prompt)
+ require.Len(t, fold, 1,
+ "only the follow-up queued after the cancel (seq > mark) must be folded")
+ require.Equal(t, "after", fold[0].Prompt)
+ require.Empty(t, canceledWithRunID,
+ "no dropped call carried a RunID, so none need a terminal RunComplete")
_, ok := a.messageQueue.Get(sessionID)
- require.False(t, ok, "drain must clear the session message queue")
+ require.False(t, ok, "drain must clear the session message queue when nothing is kept")
}
-// TestDrainUncanceledQueue_NoMarkKeepsAll verifies that with no cancel
-// mark recorded, every queued call survives the drain.
-func TestDrainUncanceledQueue_NoMarkKeepsAll(t *testing.T) {
+// TestDrainQueueForStep_NoMarkFoldsAllNonRunID verifies that with no
+// cancel mark recorded, every queued call without a RunID is folded.
+func TestDrainQueueForStep_NoMarkFoldsAllNonRunID(t *testing.T) {
t.Parallel()
env := testEnv(t)
@@ -110,8 +114,80 @@ func TestDrainUncanceledQueue_NoMarkKeepsAll(t *testing.T) {
{SessionID: sessionID, Prompt: "b", acceptSeq: 5},
})
- survivors := a.drainUncanceledQueue(sessionID)
- require.Len(t, survivors, 2, "no cancel mark means all queued calls survive")
+ fold, canceledWithRunID := a.drainQueueForStep(sessionID)
+ require.Len(t, fold, 2, "no cancel mark means all non-RunID queued calls are folded")
+ require.Empty(t, canceledWithRunID)
+}
+
+// TestDrainQueueForStep_KeepsRunIDPromptsQueued verifies the key rule: a
+// queued prompt that carries a RunID must NOT be folded into the active
+// turn. Folding it would silently absorb it into another turn and never
+// publish a RunComplete for its RunID, hanging a `crush run` caller that
+// blocks on that event. Such prompts are left in the queue so the
+// recursive run path gives each its own turn and its own RunComplete.
+// Non-RunID prompts are still folded.
+func TestDrainQueueForStep_KeepsRunIDPromptsQueued(t *testing.T) {
+ t.Parallel()
+
+ env := testEnv(t)
+ a := NewSessionAgent(SessionAgentOptions{
+ Sessions: env.sessions,
+ Messages: env.messages,
+ }).(*sessionAgent)
+
+ const sessionID = "drain-runid"
+ a.messageQueue.Set(sessionID, []SessionAgentCall{
+ {SessionID: sessionID, Prompt: "fold-me", acceptSeq: 1},
+ {SessionID: sessionID, RunID: "run-a", Prompt: "keep-me", acceptSeq: 2},
+ {SessionID: sessionID, RunID: "run-b", Prompt: "keep-me-too", acceptSeq: 3},
+ })
+
+ fold, canceledWithRunID := a.drainQueueForStep(sessionID)
+
+ require.Len(t, fold, 1, "only the non-RunID prompt is folded into the active turn")
+ require.Equal(t, "fold-me", fold[0].Prompt)
+ require.Empty(t, canceledWithRunID)
+
+ kept, ok := a.messageQueue.Get(sessionID)
+ require.True(t, ok, "RunID-bearing prompts must remain queued for the recursive run path")
+ require.Len(t, kept, 2)
+ require.Equal(t, "run-a", kept[0].RunID)
+ require.Equal(t, "run-b", kept[1].RunID)
+}
+
+// TestDrainQueueForStep_ReportsCanceledRunIDDrops verifies that a queued
+// prompt carrying a RunID that is dropped because a cancel covers it is
+// reported in canceledWithRunID so the caller can publish its terminal
+// cancelled RunComplete. A canceled prompt without a RunID is dropped
+// silently as before.
+func TestDrainQueueForStep_ReportsCanceledRunIDDrops(t *testing.T) {
+ t.Parallel()
+
+ env := testEnv(t)
+ a := NewSessionAgent(SessionAgentOptions{
+ Sessions: env.sessions,
+ Messages: env.messages,
+ }).(*sessionAgent)
+
+ const sessionID = "drain-cancel-runid"
+ a.messageQueue.Set(sessionID, []SessionAgentCall{
+ {SessionID: sessionID, RunID: "run-canceled", Prompt: "canceled", acceptSeq: 1},
+ {SessionID: sessionID, Prompt: "canceled-no-runid", acceptSeq: 1},
+ {SessionID: sessionID, RunID: "run-survives", Prompt: "survives", acceptSeq: 5},
+ })
+ a.cancelMark.Set(sessionID, 2)
+
+ fold, canceledWithRunID := a.drainQueueForStep(sessionID)
+
+ require.Empty(t, fold, "no uncanceled non-RunID prompts to fold")
+ require.Len(t, canceledWithRunID, 1,
+ "only the dropped RunID-bearing prompt needs a terminal RunComplete")
+ require.Equal(t, "run-canceled", canceledWithRunID[0].RunID)
+
+ kept, ok := a.messageQueue.Get(sessionID)
+ require.True(t, ok)
+ require.Len(t, kept, 1, "the uncanceled RunID prompt stays queued")
+ require.Equal(t, "run-survives", kept[0].RunID)
}
// TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the
@@ -143,3 +219,104 @@ func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) {
t.Fatal("PublishMustDeliver did not deliver event")
}
}
+
+// requireSingleCancelledRunComplete reads exactly one RunComplete from ch,
+// asserts it is the cancelled terminal event for runID, and verifies no
+// second event arrives. This observes the published pubsub event rather
+// than internal bookkeeping, which is the contract a `crush run` caller
+// blocking on the broker actually relies on.
+func requireSingleCancelledRunComplete(t *testing.T, ch <-chan pubsub.Event[notify.RunComplete], sessionID, runID string) {
+ t.Helper()
+ select {
+ case ev := <-ch:
+ require.Equal(t, runID, ev.Payload.RunID,
+ "the published RunComplete must carry the dropped queued prompt's RunID")
+ require.Equal(t, sessionID, ev.Payload.SessionID)
+ require.True(t, ev.Payload.Cancelled,
+ "a dropped queued prompt must publish a cancelled RunComplete")
+ case <-time.After(5 * time.Second):
+ t.Fatal("timed out waiting for the cancelled RunComplete")
+ }
+ select {
+ case extra := <-ch:
+ t.Fatalf("expected exactly one RunComplete, got a second: %+v", extra.Payload)
+ case <-time.After(100 * time.Millisecond):
+ }
+}
+
+// TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete proves the
+// terminal-event behavior end-to-end: a RunID-bearing prompt sitting in
+// the queue that is canceled while queued (via the public Cancel path,
+// which routes through clearQueueAndNotify -> publishCanceledQueueDrops)
+// must emit exactly one cancelled RunComplete on the broker for its
+// RunID. A queued prompt without a RunID is dropped silently. Unlike
+// TestDrainQueueForStep_ReportsCanceledRunIDDrops, which checks the returned
+// slices, this observes the published event a `crush run` caller awaits.
+func TestCancel_QueuedRunIDPromptPublishesCancelledRunComplete(t *testing.T) {
+ t.Parallel()
+
+ env := testEnv(t)
+ broker := pubsub.NewBroker[notify.RunComplete]()
+ t.Cleanup(broker.Shutdown)
+
+ a := NewSessionAgent(SessionAgentOptions{
+ Sessions: env.sessions,
+ Messages: env.messages,
+ RunComplete: broker,
+ }).(*sessionAgent)
+
+ subCtx, subCancel := context.WithCancel(t.Context())
+ defer subCancel()
+ ch := broker.Subscribe(subCtx)
+
+ const sessionID = "cancel-queued-runid"
+ a.messageQueue.Set(sessionID, []SessionAgentCall{
+ {SessionID: sessionID, Prompt: "no-runid", acceptSeq: 1},
+ {SessionID: sessionID, RunID: "run-queued", Prompt: "queued", acceptSeq: 2},
+ })
+
+ a.Cancel(sessionID)
+
+ requireSingleCancelledRunComplete(t, ch, sessionID, "run-queued")
+
+ _, ok := a.messageQueue.Get(sessionID)
+ require.False(t, ok, "Cancel must clear the queue")
+}
+
+// TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete drives
+// the production drain sequence (drainQueueForStep then
+// publishCanceledQueueDrops, mirroring the PrepareStep handoff) and
+// asserts the dropped RunID-bearing prompt actually publishes exactly one
+// cancelled RunComplete on the broker. The companion bookkeeping test
+// covers the returned slice; this one covers the observable terminal
+// event.
+func TestDrainQueueForStep_DroppedRunIDPublishesCancelledRunComplete(t *testing.T) {
+ t.Parallel()
+
+ env := testEnv(t)
+ broker := pubsub.NewBroker[notify.RunComplete]()
+ t.Cleanup(broker.Shutdown)
+
+ a := NewSessionAgent(SessionAgentOptions{
+ Sessions: env.sessions,
+ Messages: env.messages,
+ RunComplete: broker,
+ }).(*sessionAgent)
+
+ subCtx, subCancel := context.WithCancel(t.Context())
+ defer subCancel()
+ ch := broker.Subscribe(subCtx)
+
+ const sessionID = "drain-drop-runid"
+ a.messageQueue.Set(sessionID, []SessionAgentCall{
+ {SessionID: sessionID, RunID: "run-dropped", Prompt: "dropped", acceptSeq: 1},
+ {SessionID: sessionID, Prompt: "dropped-no-runid", acceptSeq: 1},
+ })
+ a.cancelMark.Set(sessionID, 2)
+
+ _, canceledWithRunID := a.drainQueueForStep(sessionID)
+ require.Len(t, canceledWithRunID, 1)
+ a.publishCanceledQueueDrops(canceledWithRunID)
+
+ requireSingleCancelledRunComplete(t, ch, sessionID, "run-dropped")
+}