1package backend
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "charm.land/fantasy"
9 "github.com/charmbracelet/crush/internal/agent"
10 "github.com/charmbracelet/crush/internal/agent/agenttest"
11 "github.com/charmbracelet/crush/internal/db"
12 "github.com/charmbracelet/crush/internal/message"
13 "github.com/charmbracelet/crush/internal/proto"
14 "github.com/charmbracelet/crush/internal/session"
15 "github.com/stretchr/testify/require"
16)
17
18// gatedCoordinator wraps a real agent.Coordinator and parks RunAccepted
19// before delegating to it. Every method other than RunAccepted is
20// inherited from the embedded coordinator, so BeginAccepted (called by
21// Backend.SendMessage) and RunAccepted (called by the dispatched run)
22// are the production agent.Coordinator implementations under test, not
23// stubs. The gate only delays entry into the real RunAccepted so a
24// cancel can be made to land in the accepted-but-not-yet-active window
25// deterministically: the accept handle is not consumed by
26// sessionAgent.Run until the real RunAccepted runs after the gate opens.
27type gatedCoordinator struct {
28 agent.Coordinator
29 entered chan struct{}
30 gate chan struct{}
31}
32
// RunAccepted announces its arrival by closing c.entered, blocks until the
// test closes c.gate, and then hands the unchanged arguments, including
// the accept handle, to the embedded coordinator's RunAccepted.
//
// It may be called at most once per gatedCoordinator: a second call would
// close c.entered again, which panics.
func (c *gatedCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
	entered, gate := c.entered, c.gate
	close(entered)
	<-gate

	inner := c.Coordinator
	return inner.RunAccepted(ctx, accept, sessionID, prompt, attachments...)
}
38
// newRealCoordinator returns a gatedCoordinator that wraps a production
// agent.Coordinator, together with the DB-backed session and message
// services that coordinator uses. The coordinator is built through the
// real agent.NewCoordinator path (via the test-only agenttest helper)
// with an offline-resolvable model. The cancel-on-entry path under test
// persists a canceled turn and returns before any model call, so no
// network I/O happens.
//
// The database lives in a per-test temporary directory, and its
// connection is closed when the test cleans up.
func newRealCoordinator(t *testing.T) (*gatedCoordinator, session.Service, message.Service) {
	t.Helper()

	ctx := t.Context()

	conn, err := db.Connect(ctx, t.TempDir())
	require.NoError(t, err)
	t.Cleanup(func() {
		conn.Close()
	})

	queries := db.New(conn)
	sessions := session.NewService(queries, conn)
	messages := message.NewService(queries)

	// A separate temporary directory is handed to the coordinator itself.
	inner, err := agenttest.NewCoordinator(ctx, t.TempDir(), sessions, messages)
	require.NoError(t, err)

	gated := &gatedCoordinator{
		Coordinator: inner,
		entered:     make(chan struct{}),
		gate:        make(chan struct{}),
	}
	return gated, sessions, messages
}
64
// TestSendMessage_AcceptedCancelRace_RealMachinery exercises the
// 202/cancel race end-to-end through Backend.SendMessage against the
// production agent.Coordinator (BeginAccepted + RunAccepted), not a
// stub. It asserts that a cancel arriving after the prompt is accepted
// but before the run becomes active is not lost: the accepted handle
// reaches sessionAgent.Run and drives cancel-on-entry, which persists a
// canceled turn instead of streaming.
//
// This test would fail if Coordinator.BeginAccepted returned nil (Cancel
// would find no accepted run and record no mark, and the run would
// receive a nil Accepted handle and skip cancel-on-entry) or if
// Coordinator.RunAccepted dropped the handle on its way into
// sessionAgent.Run (the run would likewise skip cancel-on-entry and try
// to stream the model). In either case no FinishReasonCanceled turn
// would be persisted.
func TestSendMessage_AcceptedCancelRace_RealMachinery(t *testing.T) {
	t.Parallel()
	b, _ := newTestBackend(t)

	coord, sessions, messages := newRealCoordinator(t)
	sess, err := sessions.Create(t.Context(), "session")
	require.NoError(t, err)

	ws := insertAgentWorkspace(t, b, coord)

	// openGate releases the dispatched run parked in RunAccepted. It is
	// idempotent and is also registered as a cleanup. That way an early
	// failure below (a timeout or a failed require) cannot leave the run
	// blocked on the gate with ws.runWG never draining, which would hang
	// anything waiting on it. It is registered after insertAgentWorkspace
	// because cleanups run LIFO, so it fires before any cleanup that helper
	// or newTestBackend may have registered. Cleanup functions run only
	// after the test function has returned, so gateOpen is never accessed
	// concurrently.
	gateOpen := false
	openGate := func() {
		if gateOpen {
			return
		}
		gateOpen = true
		close(coord.gate)
	}
	t.Cleanup(openGate)

	require.NoError(t, b.SendMessage(ws.ID, proto.AgentMessage{SessionID: sess.ID, Prompt: "hi"}))

	// Coordinator.BeginAccepted ran synchronously inside SendMessage
	// before dispatch. The dispatched run has now entered the gate but
	// has not yet called the real RunAccepted, so the accept handle is
	// not yet consumed: the prompt is accepted but not active.
	select {
	case <-coord.entered:
	case <-time.After(2 * time.Second):
		t.Fatal("dispatched run never entered RunAccepted")
	}

	// A cancel arriving now lands in the accepted-but-not-yet-active
	// window. It is recorded only because BeginAccepted incremented the
	// accept counter.
	require.NoError(t, b.CancelSession(ws.ID, sess.ID))

	// Release the gate so the real RunAccepted threads the handle into
	// sessionAgent.Run, which drives cancel-on-entry.
	openGate()

	// The dispatched run returns nil (cancel-on-entry), so runWG drains.
	waited := make(chan struct{})
	go func() {
		ws.runWG.Wait()
		close(waited)
	}()
	select {
	case <-waited:
	case <-time.After(5 * time.Second):
		t.Fatal("runWG.Wait did not complete after the canceled run returned")
	}

	// The accepted-but-not-yet-active cancel persisted a canceled turn
	// rather than streaming a real response.
	msgs, err := messages.List(t.Context(), sess.ID)
	require.NoError(t, err)
	require.Len(t, msgs, 2)
	require.Equal(t, message.User, msgs[0].Role)
	require.Equal(t, message.Assistant, msgs[1].Role)
	require.Equal(t, message.FinishReasonCanceled, msgs[1].FinishReason())
}