accepted_run_integration_test.go

  1package backend
  2
  3import (
  4	"context"
  5	"testing"
  6	"time"
  7
  8	"charm.land/fantasy"
  9	"github.com/charmbracelet/crush/internal/agent"
 10	"github.com/charmbracelet/crush/internal/agent/agenttest"
 11	"github.com/charmbracelet/crush/internal/db"
 12	"github.com/charmbracelet/crush/internal/message"
 13	"github.com/charmbracelet/crush/internal/proto"
 14	"github.com/charmbracelet/crush/internal/session"
 15	"github.com/stretchr/testify/require"
 16)
 17
 18// gatedCoordinator wraps a real agent.Coordinator and parks RunAccepted
 19// before delegating to it. Every method other than RunAccepted is
 20// inherited from the embedded coordinator, so BeginAccepted (called by
 21// Backend.SendMessage) and RunAccepted (called by the dispatched run)
 22// are the production agent.Coordinator implementations under test, not
 23// stubs. The gate only delays entry into the real RunAccepted so a
 24// cancel can be made to land in the accepted-but-not-yet-active window
 25// deterministically: the accept handle is not consumed by
 26// sessionAgent.Run until the real RunAccepted runs after the gate opens.
 27type gatedCoordinator struct {
 28	agent.Coordinator
 29	entered chan struct{}
 30	gate    chan struct{}
 31}
 32
 33func (c *gatedCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
 34	close(c.entered)
 35	<-c.gate
 36	return c.Coordinator.RunAccepted(ctx, accept, sessionID, prompt, attachments...)
 37}
 38
 39// newRealCoordinator builds a production agent.Coordinator over a
 40// DB-backed session/message store, wrapped in a gate. It is constructed
 41// through the real agent.NewCoordinator path (via the test-only
 42// agenttest helper) with an offline-resolvable model: the
 43// cancel-on-entry path under test persists a canceled turn and returns
 44// before any model call, so no network I/O happens.
 45func newRealCoordinator(t *testing.T) (*gatedCoordinator, session.Service, message.Service) {
 46	t.Helper()
 47	conn, err := db.Connect(t.Context(), t.TempDir())
 48	require.NoError(t, err)
 49	t.Cleanup(func() { conn.Close() })
 50
 51	q := db.New(conn)
 52	sessions := session.NewService(q, conn)
 53	messages := message.NewService(q)
 54
 55	coord, err := agenttest.NewCoordinator(t.Context(), t.TempDir(), sessions, messages)
 56	require.NoError(t, err)
 57
 58	return &gatedCoordinator{
 59		Coordinator: coord,
 60		entered:     make(chan struct{}),
 61		gate:        make(chan struct{}),
 62	}, sessions, messages
 63}
 64
 65// TestSendMessage_AcceptedCancelRace_RealMachinery exercises the
 66// 202/cancel race end-to-end through Backend.SendMessage against the
 67// production agent.Coordinator (BeginAccepted + RunAccepted), not a
 68// stub. It asserts that a cancel arriving after the prompt is accepted
 69// but before the run becomes active is not lost: the accepted handle
 70// reaches sessionAgent.Run and drives cancel-on-entry, which persists a
 71// canceled turn instead of streaming.
 72//
 73// This test would fail if Coordinator.BeginAccepted returned nil (Cancel
 74// would find no accepted run and record no mark, and the run would
 75// receive a nil Accepted handle and skip cancel-on-entry) or if
 76// Coordinator.RunAccepted dropped the handle on its way into
 77// sessionAgent.Run (the run would likewise skip cancel-on-entry and try
 78// to stream the model). In either case no FinishReasonCanceled turn
 79// would be persisted.
 80func TestSendMessage_AcceptedCancelRace_RealMachinery(t *testing.T) {
 81	t.Parallel()
 82	b, _ := newTestBackend(t)
 83
 84	coord, sessions, messages := newRealCoordinator(t)
 85	sess, err := sessions.Create(t.Context(), "session")
 86	require.NoError(t, err)
 87
 88	ws := insertAgentWorkspace(t, b, coord)
 89
 90	require.NoError(t, b.SendMessage(ws.ID, proto.AgentMessage{SessionID: sess.ID, Prompt: "hi"}))
 91
 92	// Coordinator.BeginAccepted ran synchronously inside SendMessage
 93	// before dispatch; the dispatched run has now entered the gate but
 94	// has not yet called the real RunAccepted, so the accept handle is
 95	// not yet consumed: the prompt is accepted but not active.
 96	select {
 97	case <-coord.entered:
 98	case <-time.After(2 * time.Second):
 99		t.Fatal("dispatched run never entered RunAccepted")
100	}
101
102	// A cancel arriving now lands in the accepted-but-not-yet-active
103	// window and is only recorded because BeginAccepted incremented the
104	// accept counter.
105	require.NoError(t, b.CancelSession(ws.ID, sess.ID))
106
107	// Release the gate so the real RunAccepted threads the handle into
108	// sessionAgent.Run, which drives cancel-on-entry.
109	close(coord.gate)
110
111	// The dispatched run returns nil (cancel-on-entry), so runWG drains.
112	waited := make(chan struct{})
113	go func() {
114		ws.runWG.Wait()
115		close(waited)
116	}()
117	select {
118	case <-waited:
119	case <-time.After(5 * time.Second):
120		t.Fatal("runWG.Wait did not complete after the canceled run returned")
121	}
122
123	// The accepted-but-not-yet-active cancel persisted a canceled turn
124	// rather than streaming a real response.
125	msgs, err := messages.List(t.Context(), sess.ID)
126	require.NoError(t, err)
127	require.Len(t, msgs, 2)
128	require.Equal(t, message.User, msgs[0].Role)
129	require.Equal(t, message.Assistant, msgs[1].Role)
130	require.Equal(t, message.FinishReasonCanceled, msgs[1].FinishReason())
131}