1package backend
2
3import (
4 "context"
5 "errors"
6 "testing"
7 "time"
8
9 "charm.land/fantasy"
10 "github.com/charmbracelet/crush/internal/agent"
11 "github.com/charmbracelet/crush/internal/app"
12 "github.com/charmbracelet/crush/internal/message"
13 "github.com/charmbracelet/crush/internal/proto"
14 "github.com/google/uuid"
15 "github.com/stretchr/testify/require"
16)
17
18// errorCoordinator is a minimal agent.Coordinator whose RunAccepted
19// returns a configurable error. When markPublished is true it stamps
20// the run-complete marker on the context before returning, simulating a
21// real coordinator that already published the run's authoritative
22// terminal RunComplete (so runAgent must not emit a duplicate fallback).
23type errorCoordinator struct {
24 err error
25 markPublished bool
26}
27
28func (c *errorCoordinator) Run(ctx context.Context, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
29 return nil, c.err
30}
31
32func (c *errorCoordinator) RunAccepted(ctx context.Context, accept *agent.AcceptedRun, sessionID, prompt string, attachments ...message.Attachment) (*fantasy.AgentResult, error) {
33 if c.markPublished {
34 agent.MarkRunCompletePublished(ctx)
35 }
36 return nil, c.err
37}
38
39func (c *errorCoordinator) BeginAccepted(sessionID string) *agent.AcceptedRun { return nil }
40func (c *errorCoordinator) Cancel(string) {}
41func (c *errorCoordinator) CancelAll() {}
42func (c *errorCoordinator) IsBusy() bool { return false }
43func (c *errorCoordinator) IsSessionBusy(string) bool { return false }
44func (c *errorCoordinator) QueuedPrompts(string) int { return 0 }
45func (c *errorCoordinator) QueuedPromptsList(string) []string { return nil }
46func (c *errorCoordinator) ClearQueue(string) {}
47func (c *errorCoordinator) Summarize(context.Context, string) error { return nil }
48func (c *errorCoordinator) Model() agent.Model { return agent.Model{} }
49func (c *errorCoordinator) UpdateModels(context.Context) error { return nil }
50
51// insertRunCompleteWorkspace installs a workspace backed by a real
52// app.App (so the runCompletions broker exists) with the given
53// coordinator and a workspace run context derived from base.
54func insertRunCompleteWorkspace(t *testing.T, b *Backend, base context.Context, coord agent.Coordinator) *Workspace {
55 t.Helper()
56 a := app.NewForTest(base)
57 a.AgentCoordinator = coord
58 t.Cleanup(a.ShutdownForTest)
59 ws := &Workspace{
60 ID: uuid.New().String(),
61 Path: t.TempDir(),
62 resolvedPath: t.TempDir(),
63 clients: make(map[string]*clientState),
64 shutdownFn: func() {},
65 }
66 ws.App = a
67 ws.ctx, ws.cancel = context.WithCancel(base)
68 b.mu.Lock()
69 b.workspaces.Set(ws.ID, ws)
70 b.pathIndex[ws.resolvedPath] = ws.ID
71 b.mu.Unlock()
72 return ws
73}
74
75// TestRunAgent_PreRunErrorPublishesTerminalRunComplete proves that an
76// error returned from RunAccepted before the coordinator could publish
77// its own terminal event (e.g. a readyWg or UpdateModels failure,
78// modeled here by a stub coordinator) still yields a reliable terminal
79// RunComplete for the run's RunID. Without it, a `crush run` caller
80// blocking on that RunID would hang because the lossy TypeAgentError
81// event is not a guaranteed terminal signal.
82func TestRunAgent_PreRunErrorPublishesTerminalRunComplete(t *testing.T) {
83 t.Parallel()
84 b, _ := newTestBackend(t)
85 runErr := errors.New("update models failed")
86 ws := insertRunCompleteWorkspace(t, b, context.Background(), &errorCoordinator{err: runErr})
87
88 subCtx, cancel := context.WithCancel(context.Background())
89 defer cancel()
90 ch := ws.RunCompletions().Subscribe(subCtx)
91
92 err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
93 require.NoError(t, err)
94
95 select {
96 case ev := <-ch:
97 require.Equal(t, "run-1", ev.Payload.RunID,
98 "the terminal RunComplete must carry the dispatched RunID")
99 require.Equal(t, "S1", ev.Payload.SessionID)
100 require.Equal(t, runErr.Error(), ev.Payload.Error,
101 "the fallback terminal event must be marked errored")
102 require.False(t, ev.Payload.Cancelled)
103 case <-time.After(2 * time.Second):
104 t.Fatal("no terminal RunComplete published for a pre-run error; a run waiter would hang")
105 }
106}
107
108// TestRunAgent_NoFallbackWhenCoordinatorPublished ensures the fallback
109// is suppressed when the coordinator already emitted the run's
110// authoritative terminal RunComplete, so callers never observe a
111// duplicate terminal event for the same RunID.
112func TestRunAgent_NoFallbackWhenCoordinatorPublished(t *testing.T) {
113 t.Parallel()
114 b, _ := newTestBackend(t)
115 runErr := errors.New("stream failed after publishing terminal event")
116 ws := insertRunCompleteWorkspace(t, b, context.Background(),
117 &errorCoordinator{err: runErr, markPublished: true})
118
119 subCtx, cancel := context.WithCancel(context.Background())
120 defer cancel()
121 ch := ws.RunCompletions().Subscribe(subCtx)
122
123 err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
124 require.NoError(t, err)
125
126 // Wait for the dispatched run goroutine to return so any publish
127 // has already happened.
128 ws.runWG.Wait()
129
130 select {
131 case ev := <-ch:
132 t.Fatalf("runAgent published a duplicate terminal RunComplete: %+v", ev.Payload)
133 case <-time.After(200 * time.Millisecond):
134 }
135}
136
137// TestRunAgent_CancellationPublishesNoErrorTerminal verifies that a
138// context.Canceled result from RunAccepted produces no errored terminal
139// RunComplete from runAgent: cancellation is sessionAgent.Run's
140// responsibility (it publishes the cancelled marker) and the dispatcher
141// must not synthesize an error terminal for it.
142func TestRunAgent_CancellationPublishesNoErrorTerminal(t *testing.T) {
143 t.Parallel()
144 b, _ := newTestBackend(t)
145 ws := insertRunCompleteWorkspace(t, b, context.Background(),
146 &errorCoordinator{err: context.Canceled})
147
148 subCtx, cancel := context.WithCancel(context.Background())
149 defer cancel()
150 ch := ws.RunCompletions().Subscribe(subCtx)
151
152 err := b.SendMessage(ws.ID, proto.AgentMessage{SessionID: "S1", RunID: "run-1", Prompt: "hi"})
153 require.NoError(t, err)
154
155 ws.runWG.Wait()
156
157 select {
158 case ev := <-ch:
159 t.Fatalf("cancellation must not publish a terminal RunComplete: %+v", ev.Payload)
160 case <-time.After(200 * time.Millisecond):
161 }
162}