agent.go

  1package backend
  2
  3import (
  4	"context"
  5	"errors"
  6
  7	"github.com/charmbracelet/crush/internal/agent"
  8	"github.com/charmbracelet/crush/internal/agent/notify"
  9	"github.com/charmbracelet/crush/internal/config"
 10	"github.com/charmbracelet/crush/internal/proto"
 11	"github.com/charmbracelet/crush/internal/pubsub"
 12)
 13
 14// SendMessage validates and accepts a prompt for the workspace's agent,
 15// then dispatches the run on a goroutine bound to the workspace context
 16// and returns immediately. It does not wait for the LLM turn to
 17// complete: the run's lifetime is owned by the workspace, not by the
 18// caller. Errors from the dispatched run reach observers through the
 19// agent event channels (a notify.TypeAgentError notification), not
 20// through this return value.
 21//
 22// SendMessage returns synchronously when the request cannot be accepted:
 23// ErrWorkspaceNotFound if the workspace is missing, ErrAgentNotInitialized
 24// if its coordinator is nil, the structural validation errors from
 25// agent.ValidateCall (ErrEmptyPrompt, ErrSessionMissing) when the prompt
 26// or session is missing, and ErrWorkspaceClosing if the workspace is
 27// being torn down.
 28func (b *Backend) SendMessage(workspaceID string, msg proto.AgentMessage) error {
 29	ws, err := b.GetWorkspace(workspaceID)
 30	if err != nil {
 31		return err
 32	}
 33
 34	if ws.AgentCoordinator == nil {
 35		return ErrAgentNotInitialized
 36	}
 37
 38	if err := agent.ValidateCall(agent.SessionAgentCall{
 39		SessionID:   msg.SessionID,
 40		Prompt:      msg.Prompt,
 41		Attachments: proto.AttachmentsToMessage(msg.Attachments),
 42	}); err != nil {
 43		return err
 44	}
 45
 46	accept := ws.AgentCoordinator.BeginAccepted(msg.SessionID)
 47
 48	ws.runMu.Lock()
 49	if ws.closing {
 50		ws.runMu.Unlock()
 51		accept.Close()
 52		return ErrWorkspaceClosing
 53	}
 54	ws.runWG.Add(1)
 55	ws.runMu.Unlock()
 56
 57	go b.runAgent(ws, msg, accept)
 58	return nil
 59}
 60
 61// runAgent executes an accepted agent run for the workspace. It owns the
 62// accept reservation (releasing it on return) and the runWG ticket added
 63// by SendMessage. The run is bound to the workspace context so its
 64// lifetime is independent of any client's HTTP request.
 65//
 66// On a non-cancel error it surfaces the failure to observers via a
 67// notify.TypeAgentError notification (lossy, best-effort). That alone is
 68// not a reliable terminal signal: the agent-event fan-in uses lossy
 69// subscribers, so a `crush run` caller blocking on its RunID could hang
 70// if the event is dropped. To guarantee termination, when msg.RunID is
 71// non-empty and the coordinator did not already publish the run's
 72// authoritative terminal RunComplete (e.g. the error was returned before
 73// sessionAgent.Run executed, such as a readyWg or UpdateModels failure),
 74// runAgent emits an errored RunComplete on the must-deliver
 75// runCompletions broker so the waiter observes a deterministic terminal
 76// event. context.Canceled is expected (sessionAgent.Run already
 77// publishes the cancelled terminal marker) and produces no error
 78// terminal event.
 79//
 80// When msg.RunID is non-empty it is attached to the context via
 81// agent.WithRunID so the coordinator can stamp the terminal
 82// notify.RunComplete event with that correlator. A run-complete marker
 83// is also attached so the coordinator can report whether it published
 84// the terminal event, letting runAgent avoid a duplicate fallback.
 85func (b *Backend) runAgent(ws *Workspace, msg proto.AgentMessage, accept *agent.AcceptedRun) {
 86	defer ws.runWG.Done()
 87	defer accept.Close()
 88
 89	ctx := ws.ctx
 90	if msg.RunID != "" {
 91		ctx = agent.WithRunID(ctx, msg.RunID)
 92	}
 93	ctx = agent.WithRunCompleteMarker(ctx)
 94
 95	_, err := ws.AgentCoordinator.RunAccepted(ctx, accept, msg.SessionID, msg.Prompt, proto.AttachmentsToMessage(msg.Attachments)...)
 96	if err == nil || errors.Is(err, context.Canceled) {
 97		return
 98	}
 99
100	ws.AgentNotifications().Publish(pubsub.CreatedEvent, notify.Notification{
101		SessionID: msg.SessionID,
102		RunID:     msg.RunID,
103		Type:      notify.TypeAgentError,
104		Message:   err.Error(),
105	})
106
107	// Reliable terminal fallback. Only needed when a RunID waiter
108	// exists and the coordinator has not already emitted the run's
109	// terminal RunComplete; otherwise this would be a duplicate.
110	if msg.RunID == "" || agent.RunCompletePublished(ctx) {
111		return
112	}
113	if rc := ws.RunCompletions(); rc != nil {
114		rc.PublishMustDeliver(ctx, pubsub.UpdatedEvent, notify.RunComplete{
115			SessionID: msg.SessionID,
116			RunID:     msg.RunID,
117			Error:     err.Error(),
118		})
119	}
120}
121
122// GetAgentInfo returns the agent's model and busy status.
123func (b *Backend) GetAgentInfo(workspaceID string) (proto.AgentInfo, error) {
124	ws, err := b.GetWorkspace(workspaceID)
125	if err != nil {
126		return proto.AgentInfo{}, err
127	}
128
129	var agentInfo proto.AgentInfo
130	if ws.AgentCoordinator != nil {
131		m := ws.AgentCoordinator.Model()
132		agentInfo = proto.AgentInfo{
133			Model:    m.CatwalkCfg,
134			ModelCfg: m.ModelCfg,
135			IsBusy:   ws.AgentCoordinator.IsBusy(),
136			IsReady:  true,
137		}
138	}
139	return agentInfo, nil
140}
141
142// InitAgent initializes the coder agent for the workspace.
143func (b *Backend) InitAgent(ctx context.Context, workspaceID string) error {
144	ws, err := b.GetWorkspace(workspaceID)
145	if err != nil {
146		return err
147	}
148
149	return ws.InitCoderAgent(ctx)
150}
151
152// UpdateAgent reloads the agent model configuration.
153func (b *Backend) UpdateAgent(ctx context.Context, workspaceID string) error {
154	ws, err := b.GetWorkspace(workspaceID)
155	if err != nil {
156		return err
157	}
158
159	return ws.UpdateAgentModel(ctx)
160}
161
162// CancelSession cancels an ongoing agent operation for the given
163// session.
164func (b *Backend) CancelSession(workspaceID, sessionID string) error {
165	ws, err := b.GetWorkspace(workspaceID)
166	if err != nil {
167		return err
168	}
169
170	if ws.AgentCoordinator != nil {
171		ws.AgentCoordinator.Cancel(sessionID)
172	}
173	return nil
174}
175
176// SummarizeSession triggers a session summarization.
177func (b *Backend) SummarizeSession(ctx context.Context, workspaceID, sessionID string) error {
178	ws, err := b.GetWorkspace(workspaceID)
179	if err != nil {
180		return err
181	}
182
183	if ws.AgentCoordinator == nil {
184		return ErrAgentNotInitialized
185	}
186
187	return ws.AgentCoordinator.Summarize(ctx, sessionID)
188}
189
190// QueuedPrompts returns the number of queued prompts for the session.
191func (b *Backend) QueuedPrompts(workspaceID, sessionID string) (int, error) {
192	ws, err := b.GetWorkspace(workspaceID)
193	if err != nil {
194		return 0, err
195	}
196
197	if ws.AgentCoordinator == nil {
198		return 0, nil
199	}
200
201	return ws.AgentCoordinator.QueuedPrompts(sessionID), nil
202}
203
204// ClearQueue clears the prompt queue for the session.
205func (b *Backend) ClearQueue(workspaceID, sessionID string) error {
206	ws, err := b.GetWorkspace(workspaceID)
207	if err != nil {
208		return err
209	}
210
211	if ws.AgentCoordinator != nil {
212		ws.AgentCoordinator.ClearQueue(sessionID)
213	}
214	return nil
215}
216
217// QueuedPromptsList returns the list of queued prompt strings for a
218// session.
219func (b *Backend) QueuedPromptsList(workspaceID, sessionID string) ([]string, error) {
220	ws, err := b.GetWorkspace(workspaceID)
221	if err != nil {
222		return nil, err
223	}
224
225	if ws.AgentCoordinator == nil {
226		return nil, nil
227	}
228
229	return ws.AgentCoordinator.QueuedPromptsList(sessionID), nil
230}
231
232// GetDefaultSmallModel returns the default small model for a provider.
233func (b *Backend) GetDefaultSmallModel(workspaceID, providerID string) (config.SelectedModel, error) {
234	ws, err := b.GetWorkspace(workspaceID)
235	if err != nil {
236		return config.SelectedModel{}, err
237	}
238
239	return ws.GetDefaultSmallModel(providerID), nil
240}