1package agent
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/charmbracelet/crush/internal/agent/notify"
9 "github.com/charmbracelet/crush/internal/pubsub"
10 "github.com/stretchr/testify/require"
11)
12
13// TestSessionAgentRun_QueueStripsOnComplete verifies that when a Run
14// call is enqueued (because the session is already busy), the
15// OnComplete hook is NOT propagated onto the queued copy. The hook
16// belongs to the caller's retry/coalesce scope (typically
17// coordinator.Run) which has already returned by the time the queue
18// drains; carrying it forward would silently funnel the terminal
19// event into a closure nobody reads, and subscribers (`crush run`)
20// would hang waiting for a RunComplete that never publishes.
21func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) {
22 t.Parallel()
23
24 env := testEnv(t)
25 a := NewSessionAgent(SessionAgentOptions{
26 Sessions: env.sessions,
27 Messages: env.messages,
28 }).(*sessionAgent)
29
30 const sessionID = "queued-session"
31 // Mark the session as busy so Run takes the queue branch
32 // without needing a real model.
33 a.activeRequests.Set(sessionID, func() {})
34
35 var called bool
36 hook := func(notify.RunComplete) { called = true }
37
38 res, err := a.Run(t.Context(), SessionAgentCall{
39 SessionID: sessionID,
40 RunID: "run-xyz",
41 Prompt: "queued prompt",
42 OnComplete: hook,
43 })
44 require.NoError(t, err)
45 require.Nil(t, res, "queued Run must return (nil, nil)")
46 require.False(t, called,
47 "OnComplete must not fire on the enqueue path; the caller's scope is still live")
48
49 queued, ok := a.messageQueue.Get(sessionID)
50 require.True(t, ok)
51 require.Len(t, queued, 1)
52 require.Nil(t, queued[0].OnComplete,
53 "queued SessionAgentCall must have OnComplete stripped so the drain falls back to the default broker publish")
54 require.Equal(t, "queued prompt", queued[0].Prompt,
55 "all other fields must be preserved on the queued copy")
56 require.Equal(t, "run-xyz", queued[0].RunID,
57 "RunID must be preserved on the queued copy so the drained turn's "+
58 "RunComplete still correlates with the originating SendMessage")
59}
60
61// TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the
62// pubsub.Publisher interface change end-to-end: a Broker is the only
63// concrete Publisher implementation and must satisfy both Publish and
64// PublishMustDeliver. The coordinator's final RunComplete emit relies
65// on PublishMustDeliver to apply bounded-blocking semantics so a
66// momentarily-full subscriber buffer can't silently drop the
67// authoritative end-of-run event.
68func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) {
69 t.Parallel()
70
71 broker := pubsub.NewBroker[notify.RunComplete]()
72 t.Cleanup(broker.Shutdown)
73
74 // Subscribe before publishing so the event is delivered.
75 ctx, cancel := context.WithCancel(t.Context())
76 defer cancel()
77 ch := broker.Subscribe(ctx)
78
79 rc := notify.RunComplete{SessionID: "S", MessageID: "m", Text: "ok"}
80 var pub pubsub.Publisher[notify.RunComplete] = broker
81 pub.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, rc)
82
83 select {
84 case got := <-ch:
85 require.Equal(t, rc, got.Payload)
86 case <-time.After(time.Second):
87 t.Fatal("PublishMustDeliver did not deliver event")
88 }
89}