1package backend
2
3import (
4 "context"
5 "errors"
6
7 "github.com/charmbracelet/crush/internal/agent"
8 "github.com/charmbracelet/crush/internal/agent/notify"
9 "github.com/charmbracelet/crush/internal/config"
10 "github.com/charmbracelet/crush/internal/proto"
11 "github.com/charmbracelet/crush/internal/pubsub"
12)
13
14// SendMessage validates and accepts a prompt for the workspace's agent,
15// then dispatches the run on a goroutine bound to the workspace context
16// and returns immediately. It does not wait for the LLM turn to
17// complete: the run's lifetime is owned by the workspace, not by the
18// caller. Errors from the dispatched run reach observers through the
19// agent event channels (a notify.TypeAgentError notification), not
20// through this return value.
21//
22// SendMessage returns synchronously when the request cannot be accepted:
23// ErrWorkspaceNotFound if the workspace is missing, ErrAgentNotInitialized
24// if its coordinator is nil, the structural validation errors from
25// agent.ValidateCall (ErrEmptyPrompt, ErrSessionMissing) when the prompt
26// or session is missing, and ErrWorkspaceClosing if the workspace is
27// being torn down.
28func (b *Backend) SendMessage(workspaceID string, msg proto.AgentMessage) error {
29 ws, err := b.GetWorkspace(workspaceID)
30 if err != nil {
31 return err
32 }
33
34 if ws.AgentCoordinator == nil {
35 return ErrAgentNotInitialized
36 }
37
38 if err := agent.ValidateCall(agent.SessionAgentCall{
39 SessionID: msg.SessionID,
40 Prompt: msg.Prompt,
41 Attachments: proto.AttachmentsToMessage(msg.Attachments),
42 }); err != nil {
43 return err
44 }
45
46 accept := ws.AgentCoordinator.BeginAccepted(msg.SessionID)
47
48 ws.runMu.Lock()
49 if ws.closing {
50 ws.runMu.Unlock()
51 accept.Close()
52 return ErrWorkspaceClosing
53 }
54 ws.runWG.Add(1)
55 ws.runMu.Unlock()
56
57 go b.runAgent(ws, msg, accept)
58 return nil
59}
60
61// runAgent executes an accepted agent run for the workspace. It owns the
62// accept reservation (releasing it on return) and the runWG ticket added
63// by SendMessage. The run is bound to the workspace context so its
64// lifetime is independent of any client's HTTP request.
65//
66// On a non-cancel error it surfaces the failure to observers via a
67// notify.TypeAgentError notification (lossy, best-effort). That alone is
68// not a reliable terminal signal: the agent-event fan-in uses lossy
69// subscribers, so a `crush run` caller blocking on its RunID could hang
70// if the event is dropped. To guarantee termination, when msg.RunID is
71// non-empty and the coordinator did not already publish the run's
72// authoritative terminal RunComplete (e.g. the error was returned before
73// sessionAgent.Run executed, such as a readyWg or UpdateModels failure),
74// runAgent emits an errored RunComplete on the must-deliver
75// runCompletions broker so the waiter observes a deterministic terminal
76// event. context.Canceled is expected (sessionAgent.Run already
77// publishes the cancelled terminal marker) and produces no error
78// terminal event.
79//
80// When msg.RunID is non-empty it is attached to the context via
81// agent.WithRunID so the coordinator can stamp the terminal
82// notify.RunComplete event with that correlator. A run-complete marker
83// is also attached so the coordinator can report whether it published
84// the terminal event, letting runAgent avoid a duplicate fallback.
85func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.AcceptedRun) {
86 defer ws.runWG.Done()
87 defer accept.Close()
88
89 ctx := ws.ctx
90 if msg.RunID != "" {
91 ctx = agent.WithRunID(ctx, msg.RunID)
92 }
93 ctx = agent.WithRunCompleteMarker(ctx)
94
95 _, err := ws.AgentCoordinator.RunAccepted(ctx, accept, msg.SessionID, msg.Prompt, proto.AttachmentsToMessage(msg.Attachments)...)
96 if err == nil || errors.Is(err, context.Canceled) {
97 return
98 }
99
100 ws.AgentNotifications().Publish(pubsub.CreatedEvent, notify.Notification{
101 SessionID: msg.SessionID,
102 RunID: msg.RunID,
103 Type: notify.TypeAgentError,
104 Message: err.Error(),
105 })
106
107 // Reliable terminal fallback. Only needed when a RunID waiter
108 // exists and the coordinator has not already emitted the run's
109 // terminal RunComplete; otherwise this would be a duplicate.
110 if msg.RunID == "" || agent.RunCompletePublished(ctx) {
111 return
112 }
113 if rc := ws.RunCompletions(); rc != nil {
114 rc.PublishMustDeliver(ctx, pubsub.UpdatedEvent, notify.RunComplete{
115 SessionID: msg.SessionID,
116 RunID: msg.RunID,
117 Error: err.Error(),
118 })
119 }
120}
121
122// GetAgentInfo returns the agent's model and busy status.
123func (b *Backend) GetAgentInfo(workspaceID string) (proto.AgentInfo, error) {
124 ws, err := b.GetWorkspace(workspaceID)
125 if err != nil {
126 return proto.AgentInfo{}, err
127 }
128
129 var agentInfo proto.AgentInfo
130 if ws.AgentCoordinator != nil {
131 m := ws.AgentCoordinator.Model()
132 agentInfo = proto.AgentInfo{
133 Model: m.CatwalkCfg,
134 ModelCfg: m.ModelCfg,
135 IsBusy: ws.AgentCoordinator.IsBusy(),
136 IsReady: true,
137 }
138 }
139 return agentInfo, nil
140}
141
142// InitAgent initializes the coder agent for the workspace.
143func (b *Backend) InitAgent(ctx context.Context, workspaceID string) error {
144 ws, err := b.GetWorkspace(workspaceID)
145 if err != nil {
146 return err
147 }
148
149 return ws.InitCoderAgent(ctx)
150}
151
152// UpdateAgent reloads the agent model configuration.
153func (b *Backend) UpdateAgent(ctx context.Context, workspaceID string) error {
154 ws, err := b.GetWorkspace(workspaceID)
155 if err != nil {
156 return err
157 }
158
159 return ws.UpdateAgentModel(ctx)
160}
161
162// CancelSession cancels an ongoing agent operation for the given
163// session.
164func (b *Backend) CancelSession(workspaceID, sessionID string) error {
165 ws, err := b.GetWorkspace(workspaceID)
166 if err != nil {
167 return err
168 }
169
170 if ws.AgentCoordinator != nil {
171 ws.AgentCoordinator.Cancel(sessionID)
172 }
173 return nil
174}
175
176// SummarizeSession triggers a session summarization.
177func (b *Backend) SummarizeSession(ctx context.Context, workspaceID, sessionID string) error {
178 ws, err := b.GetWorkspace(workspaceID)
179 if err != nil {
180 return err
181 }
182
183 if ws.AgentCoordinator == nil {
184 return ErrAgentNotInitialized
185 }
186
187 return ws.AgentCoordinator.Summarize(ctx, sessionID)
188}
189
190// QueuedPrompts returns the number of queued prompts for the session.
191func (b *Backend) QueuedPrompts(workspaceID, sessionID string) (int, error) {
192 ws, err := b.GetWorkspace(workspaceID)
193 if err != nil {
194 return 0, err
195 }
196
197 if ws.AgentCoordinator == nil {
198 return 0, nil
199 }
200
201 return ws.AgentCoordinator.QueuedPrompts(sessionID), nil
202}
203
204// ClearQueue clears the prompt queue for the session.
205func (b *Backend) ClearQueue(workspaceID, sessionID string) error {
206 ws, err := b.GetWorkspace(workspaceID)
207 if err != nil {
208 return err
209 }
210
211 if ws.AgentCoordinator != nil {
212 ws.AgentCoordinator.ClearQueue(sessionID)
213 }
214 return nil
215}
216
217// QueuedPromptsList returns the list of queued prompt strings for a
218// session.
219func (b *Backend) QueuedPromptsList(workspaceID, sessionID string) ([]string, error) {
220 ws, err := b.GetWorkspace(workspaceID)
221 if err != nil {
222 return nil, err
223 }
224
225 if ws.AgentCoordinator == nil {
226 return nil, nil
227 }
228
229 return ws.AgentCoordinator.QueuedPromptsList(sessionID), nil
230}
231
232// GetDefaultSmallModel returns the default small model for a provider.
233func (b *Backend) GetDefaultSmallModel(workspaceID, providerID string) (config.SelectedModel, error) {
234 ws, err := b.GetWorkspace(workspaceID)
235 if err != nil {
236 return config.SelectedModel{}, err
237 }
238
239 return ws.GetDefaultSmallModel(providerID), nil
240}