run_complete_test.go

 1package agent
 2
 3import (
 4	"context"
 5	"testing"
 6	"time"
 7
 8	"github.com/charmbracelet/crush/internal/agent/notify"
 9	"github.com/charmbracelet/crush/internal/pubsub"
10	"github.com/stretchr/testify/require"
11)
12
13// TestSessionAgentRun_QueueStripsOnComplete verifies that when a Run
14// call is enqueued (because the session is already busy), the
15// OnComplete hook is NOT propagated onto the queued copy. The hook
16// belongs to the caller's retry/coalesce scope (typically
17// coordinator.Run) which has already returned by the time the queue
18// drains; carrying it forward would silently funnel the terminal
19// event into a closure nobody reads, and subscribers (`crush run`)
20// would hang waiting for a RunComplete that never publishes.
21func TestSessionAgentRun_QueueStripsOnComplete(t *testing.T) {
22	t.Parallel()
23
24	env := testEnv(t)
25	a := NewSessionAgent(SessionAgentOptions{
26		Sessions: env.sessions,
27		Messages: env.messages,
28	}).(*sessionAgent)
29
30	const sessionID = "queued-session"
31	// Mark the session as busy so Run takes the queue branch
32	// without needing a real model.
33	a.activeRequests.Set(sessionID, func() {})
34
35	var called bool
36	hook := func(notify.RunComplete) { called = true }
37
38	res, err := a.Run(t.Context(), SessionAgentCall{
39		SessionID:  sessionID,
40		RunID:      "run-xyz",
41		Prompt:     "queued prompt",
42		OnComplete: hook,
43	})
44	require.NoError(t, err)
45	require.Nil(t, res, "queued Run must return (nil, nil)")
46	require.False(t, called,
47		"OnComplete must not fire on the enqueue path; the caller's scope is still live")
48
49	queued, ok := a.messageQueue.Get(sessionID)
50	require.True(t, ok)
51	require.Len(t, queued, 1)
52	require.Nil(t, queued[0].OnComplete,
53		"queued SessionAgentCall must have OnComplete stripped so the drain falls back to the default broker publish")
54	require.Equal(t, "queued prompt", queued[0].Prompt,
55		"all other fields must be preserved on the queued copy")
56	require.Equal(t, "run-xyz", queued[0].RunID,
57		"RunID must be preserved on the queued copy so the drained turn's "+
58			"RunComplete still correlates with the originating SendMessage")
59}
60
61// TestRunCompletePublisher_MustDeliverOverTakesPublish exercises the
62// pubsub.Publisher interface change end-to-end: a Broker is the only
63// concrete Publisher implementation and must satisfy both Publish and
64// PublishMustDeliver. The coordinator's final RunComplete emit relies
65// on PublishMustDeliver to apply bounded-blocking semantics so a
66// momentarily-full subscriber buffer can't silently drop the
67// authoritative end-of-run event.
68func TestRunCompletePublisher_MustDeliverOverTakesPublish(t *testing.T) {
69	t.Parallel()
70
71	broker := pubsub.NewBroker[notify.RunComplete]()
72	t.Cleanup(broker.Shutdown)
73
74	// Subscribe before publishing so the event is delivered.
75	ctx, cancel := context.WithCancel(t.Context())
76	defer cancel()
77	ch := broker.Subscribe(ctx)
78
79	rc := notify.RunComplete{SessionID: "S", MessageID: "m", Text: "ok"}
80	var pub pubsub.Publisher[notify.RunComplete] = broker
81	pub.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, rc)
82
83	select {
84	case got := <-ch:
85		require.Equal(t, rc, got.Payload)
86	case <-time.After(time.Second):
87		t.Fatal("PublishMustDeliver did not deliver event")
88	}
89}