1package server
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "log/slog"
8
9 "github.com/charmbracelet/crush/internal/agent/notify"
10 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
11 "github.com/charmbracelet/crush/internal/app"
12 "github.com/charmbracelet/crush/internal/backend"
13 "github.com/charmbracelet/crush/internal/history"
14 "github.com/charmbracelet/crush/internal/message"
15 "github.com/charmbracelet/crush/internal/permission"
16 "github.com/charmbracelet/crush/internal/proto"
17 "github.com/charmbracelet/crush/internal/pubsub"
18 "github.com/charmbracelet/crush/internal/session"
19 "github.com/charmbracelet/crush/internal/skills"
20)
21
22// wrapEvent converts a raw tea.Msg (a pubsub.Event[T] from the app
23// event fan-in) into a pubsub.Payload envelope with the correct
24// PayloadType discriminator and a proto-typed inner payload that has
25// proper JSON tags. Returns nil if the event type is unrecognized.
26func wrapEvent(ev any) *pubsub.Payload {
27 switch e := ev.(type) {
28 case pubsub.Event[app.LSPEvent]:
29 return envelope(pubsub.PayloadTypeLSPEvent, pubsub.Event[proto.LSPEvent]{
30 Type: e.Type,
31 Payload: proto.LSPEvent{
32 Type: proto.LSPEventType(e.Payload.Type),
33 Name: e.Payload.Name,
34 State: e.Payload.State,
35 Error: e.Payload.Error,
36 DiagnosticCount: e.Payload.DiagnosticCount,
37 },
38 })
39 case pubsub.Event[mcp.Event]:
40 return envelope(pubsub.PayloadTypeMCPEvent, pubsub.Event[proto.MCPEvent]{
41 Type: e.Type,
42 Payload: proto.MCPEvent{
43 Type: mcpEventTypeToProto(e.Payload.Type),
44 Name: e.Payload.Name,
45 State: proto.MCPState(e.Payload.State),
46 Error: e.Payload.Error,
47 ToolCount: e.Payload.Counts.Tools,
48 },
49 })
50 case pubsub.Event[permission.PermissionRequest]:
51 return envelope(pubsub.PayloadTypePermissionRequest, pubsub.Event[proto.PermissionRequest]{
52 Type: e.Type,
53 Payload: proto.PermissionRequest{
54 ID: e.Payload.ID,
55 SessionID: e.Payload.SessionID,
56 ToolCallID: e.Payload.ToolCallID,
57 ToolName: e.Payload.ToolName,
58 Description: e.Payload.Description,
59 Action: e.Payload.Action,
60 Path: e.Payload.Path,
61 Params: e.Payload.Params,
62 },
63 })
64 case pubsub.Event[permission.PermissionNotification]:
65 return envelope(pubsub.PayloadTypePermissionNotification, pubsub.Event[proto.PermissionNotification]{
66 Type: e.Type,
67 Payload: proto.PermissionNotification{
68 ToolCallID: e.Payload.ToolCallID,
69 Granted: e.Payload.Granted,
70 Denied: e.Payload.Denied,
71 },
72 })
73 case pubsub.Event[message.Message]:
74 return envelope(pubsub.PayloadTypeMessage, pubsub.Event[proto.Message]{
75 Type: e.Type,
76 Payload: messageToProto(e.Payload),
77 })
78 case pubsub.Event[session.Session]:
79 return envelope(pubsub.PayloadTypeSession, pubsub.Event[proto.Session]{
80 Type: e.Type,
81 Payload: sessionToProto(e.Payload),
82 })
83 case pubsub.Event[history.File]:
84 return envelope(pubsub.PayloadTypeFile, pubsub.Event[proto.File]{
85 Type: e.Type,
86 Payload: fileToProto(e.Payload),
87 })
88 case pubsub.Event[notify.Notification]:
89 payload := proto.AgentEvent{
90 SessionID: e.Payload.SessionID,
91 SessionTitle: e.Payload.SessionTitle,
92 RunID: e.Payload.RunID,
93 Type: proto.AgentEventType(e.Payload.Type),
94 }
95 if e.Payload.Type == notify.TypeAgentError {
96 payload.Type = proto.AgentEventTypeError
97 payload.Error = errors.New(e.Payload.Message)
98 }
99 return envelope(pubsub.PayloadTypeAgentEvent, pubsub.Event[proto.AgentEvent]{
100 Type: e.Type,
101 Payload: payload,
102 })
103 case pubsub.Event[notify.RunComplete]:
104 return envelope(pubsub.PayloadTypeRunComplete, pubsub.Event[proto.RunComplete]{
105 Type: e.Type,
106 Payload: proto.RunComplete{
107 SessionID: e.Payload.SessionID,
108 RunID: e.Payload.RunID,
109 MessageID: e.Payload.MessageID,
110 Text: e.Payload.Text,
111 Error: e.Payload.Error,
112 Cancelled: e.Payload.Cancelled,
113 },
114 })
115 case pubsub.Event[proto.ConfigChanged]:
116 return envelope(pubsub.PayloadTypeConfigChanged, e)
117 case pubsub.Event[skills.Event]:
118 return envelope(pubsub.PayloadTypeSkillsEvent, pubsub.Event[proto.SkillsEvent]{
119 Type: e.Type,
120 Payload: skillsEventToProto(e.Payload),
121 })
122 default:
123 slog.Warn("Unrecognized event type for SSE wrapping", "type", fmt.Sprintf("%T", ev))
124 return nil
125 }
126}
127
128// envelope marshals the inner event and wraps it in a pubsub.Payload.
129func envelope(payloadType pubsub.PayloadType, inner any) *pubsub.Payload {
130 raw, err := json.Marshal(inner)
131 if err != nil {
132 slog.Error("Failed to marshal event payload", "error", err)
133 return nil
134 }
135 return &pubsub.Payload{
136 Type: payloadType,
137 Payload: raw,
138 }
139}
140
141func mcpEventTypeToProto(t mcp.EventType) proto.MCPEventType {
142 switch t {
143 case mcp.EventStateChanged:
144 return proto.MCPEventStateChanged
145 case mcp.EventToolsListChanged:
146 return proto.MCPEventToolsListChanged
147 case mcp.EventPromptsListChanged:
148 return proto.MCPEventPromptsListChanged
149 case mcp.EventResourcesListChanged:
150 return proto.MCPEventResourcesListChanged
151 default:
152 return proto.MCPEventStateChanged
153 }
154}
155
156func sessionToProto(s session.Session) proto.Session {
157 return proto.Session{
158 ID: s.ID,
159 ParentSessionID: s.ParentSessionID,
160 Title: s.Title,
161 SummaryMessageID: s.SummaryMessageID,
162 MessageCount: s.MessageCount,
163 PromptTokens: s.PromptTokens,
164 CompletionTokens: s.CompletionTokens,
165 Cost: s.Cost,
166 Todos: todosToProto(s.Todos),
167 CreatedAt: s.CreatedAt,
168 UpdatedAt: s.UpdatedAt,
169 }
170}
171
172// isSessionBusy reports whether the given workspace has an in-flight
173// agent run for sessionID. It tolerates a nil workspace (treating it as
174// "not busy") so REST handlers can pass GetWorkspace's result through
175// unconditionally — the workspace lookup error is already surfaced by
176// the prior ListSessions/GetSession call when relevant.
177func isSessionBusy(ws *backend.Workspace, sessionID string) bool {
178 if ws == nil || ws.App == nil || ws.AgentCoordinator == nil {
179 return false
180 }
181 return ws.AgentCoordinator.IsSessionBusy(sessionID)
182}
183
184// attachedClients returns the number of clients currently viewing
185// sessionID in ws. Hold-only clients (streams == 0) do not contribute.
186// A nil workspace is treated as zero so handlers can pass GetWorkspace's
187// result through without an extra guard.
188func attachedClients(ws *backend.Workspace, sessionID string) int {
189 if ws == nil {
190 return 0
191 }
192 return ws.AttachedClientsForSession(sessionID)
193}
194
195func todosToProto(todos []session.Todo) []proto.Todo {
196 if len(todos) == 0 {
197 return nil
198 }
199 out := make([]proto.Todo, len(todos))
200 for i, t := range todos {
201 out[i] = proto.Todo{
202 Content: t.Content,
203 Status: string(t.Status),
204 ActiveForm: t.ActiveForm,
205 }
206 }
207 return out
208}
209
210func fileToProto(f history.File) proto.File {
211 return proto.File{
212 ID: f.ID,
213 SessionID: f.SessionID,
214 Path: f.Path,
215 Content: f.Content,
216 Version: f.Version,
217 CreatedAt: f.CreatedAt,
218 UpdatedAt: f.UpdatedAt,
219 }
220}
221
222func messageToProto(m message.Message) proto.Message {
223 msg := proto.Message{
224 ID: m.ID,
225 SessionID: m.SessionID,
226 Role: proto.MessageRole(m.Role),
227 Model: m.Model,
228 Provider: m.Provider,
229 CreatedAt: m.CreatedAt,
230 UpdatedAt: m.UpdatedAt,
231 }
232
233 for _, p := range m.Parts {
234 switch v := p.(type) {
235 case message.TextContent:
236 msg.Parts = append(msg.Parts, proto.TextContent{Text: v.Text})
237 case message.ReasoningContent:
238 msg.Parts = append(msg.Parts, proto.ReasoningContent{
239 Thinking: v.Thinking,
240 Signature: v.Signature,
241 StartedAt: v.StartedAt,
242 FinishedAt: v.FinishedAt,
243 })
244 case message.ToolCall:
245 msg.Parts = append(msg.Parts, proto.ToolCall{
246 ID: v.ID,
247 Name: v.Name,
248 Input: v.Input,
249 Finished: v.Finished,
250 })
251 case message.ToolResult:
252 msg.Parts = append(msg.Parts, proto.ToolResult{
253 ToolCallID: v.ToolCallID,
254 Name: v.Name,
255 Content: v.Content,
256 Data: v.Data,
257 MIMEType: v.MIMEType,
258 Metadata: v.Metadata,
259 IsError: v.IsError,
260 })
261 case message.Finish:
262 msg.Parts = append(msg.Parts, proto.Finish{
263 Reason: proto.FinishReason(v.Reason),
264 Time: v.Time,
265 Message: v.Message,
266 Details: v.Details,
267 })
268 case message.ImageURLContent:
269 msg.Parts = append(msg.Parts, proto.ImageURLContent{URL: v.URL, Detail: v.Detail})
270 case message.BinaryContent:
271 msg.Parts = append(msg.Parts, proto.BinaryContent{Path: v.Path, MIMEType: v.MIMEType, Data: v.Data})
272 }
273 }
274
275 return msg
276}
277
278// skillsEventToProto converts a skills.Event into its wire form. Errors
279// are flattened to strings because error does not round-trip over JSON.
280func skillsEventToProto(e skills.Event) proto.SkillsEvent {
281 if len(e.States) == 0 {
282 return proto.SkillsEvent{}
283 }
284 out := proto.SkillsEvent{States: make([]proto.SkillState, len(e.States))}
285 for i, s := range e.States {
286 entry := proto.SkillState{
287 Name: s.Name,
288 Path: s.Path,
289 State: proto.SkillDiscoveryState(s.State),
290 }
291 if s.Err != nil {
292 entry.Error = s.Err.Error()
293 }
294 out.States[i] = entry
295 }
296 return out
297}
298
299func messagesToProto(msgs []message.Message) []proto.Message {
300 out := make([]proto.Message, len(msgs))
301 for i, m := range msgs {
302 out[i] = messageToProto(m)
303 }
304 return out
305}