1package client
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "net/http"
13 "time"
14
15 "github.com/charmbracelet/crush/internal/app"
16 "github.com/charmbracelet/crush/internal/config"
17 "github.com/charmbracelet/crush/internal/history"
18 "github.com/charmbracelet/crush/internal/message"
19 "github.com/charmbracelet/crush/internal/proto"
20 "github.com/charmbracelet/crush/internal/pubsub"
21 "github.com/charmbracelet/crush/internal/session"
22 "github.com/charmbracelet/x/powernap/pkg/lsp/protocol"
23)
24
25func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) {
26 events := make(chan any, 100)
27 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/events", c.id), nil, http.Header{
28 "Accept": []string{"text/event-stream"},
29 "Cache-Control": []string{"no-cache"},
30 "Connection": []string{"keep-alive"},
31 })
32 if err != nil {
33 return nil, fmt.Errorf("failed to subscribe to events: %w", err)
34 }
35
36 if rsp.StatusCode != http.StatusOK {
37 return nil, fmt.Errorf("failed to subscribe to events: status code %d", rsp.StatusCode)
38 }
39
40 go func() {
41 defer rsp.Body.Close()
42
43 scr := bufio.NewReader(rsp.Body)
44 for {
45 line, err := scr.ReadBytes('\n')
46 if errors.Is(err, io.EOF) {
47 break
48 }
49 if err != nil {
50 slog.Error("reading from events stream", "error", err)
51 time.Sleep(time.Second * 2)
52 continue
53 }
54 line = bytes.TrimSpace(line)
55 if len(line) == 0 {
56 // End of an event
57 continue
58 }
59
60 data, ok := bytes.CutPrefix(line, []byte("data:"))
61 if !ok {
62 slog.Warn("invalid event format", "line", string(line))
63 continue
64 }
65
66 data = bytes.TrimSpace(data)
67
68 var event pubsub.Event[any]
69 if err := json.Unmarshal(data, &event); err != nil {
70 slog.Error("unmarshaling event", "error", err)
71 continue
72 }
73
74 type alias pubsub.Event[any]
75 aux := &struct {
76 Payload json.RawMessage `json:"payload"`
77 *alias
78 }{
79 alias: (*alias)(&event),
80 }
81
82 if err := json.Unmarshal(data, &aux); err != nil {
83 slog.Error("unmarshaling event payload", "error", err)
84 continue
85 }
86
87 var p pubsub.Payload
88 if err := json.Unmarshal(aux.Payload, &p); err != nil {
89 slog.Error("unmarshaling event payload", "error", err)
90 continue
91 }
92
93 switch p.Type {
94 case pubsub.PayloadTypeLSPEvent:
95 var e pubsub.Event[proto.LSPEvent]
96 _ = json.Unmarshal(data, &e)
97 sendEvent(ctx, events, e)
98 case pubsub.PayloadTypeMCPEvent:
99 var e pubsub.Event[proto.MCPEvent]
100 _ = json.Unmarshal(data, &e)
101 sendEvent(ctx, events, e)
102 case pubsub.PayloadTypePermissionRequest:
103 var e pubsub.Event[proto.PermissionRequest]
104 _ = json.Unmarshal(data, &e)
105 sendEvent(ctx, events, e)
106 case pubsub.PayloadTypePermissionNotification:
107 var e pubsub.Event[proto.PermissionNotification]
108 _ = json.Unmarshal(data, &e)
109 sendEvent(ctx, events, e)
110 case pubsub.PayloadTypeMessage:
111 var e pubsub.Event[proto.Message]
112 _ = json.Unmarshal(data, &e)
113 sendEvent(ctx, events, e)
114 case pubsub.PayloadTypeSession:
115 var e pubsub.Event[proto.Session]
116 _ = json.Unmarshal(data, &e)
117 sendEvent(ctx, events, e)
118 case pubsub.PayloadTypeFile:
119 var e pubsub.Event[proto.File]
120 _ = json.Unmarshal(data, &e)
121 sendEvent(ctx, events, e)
122 case pubsub.PayloadTypeAgentEvent:
123 var e pubsub.Event[proto.AgentEvent]
124 _ = json.Unmarshal(data, &e)
125 sendEvent(ctx, events, e)
126 default:
127 slog.Warn("unknown event type", "type", p.Type)
128 continue
129 }
130 }
131 }()
132
133 return events, nil
134}
135
136func sendEvent(ctx context.Context, evc chan any, ev any) {
137 slog.Info("event received", "event", fmt.Sprintf("%T %+v", ev, ev))
138 select {
139 case evc <- ev:
140 case <-ctx.Done():
141 close(evc)
142 return
143 }
144}
145
146func (c *Client) GetLSPDiagnostics(ctx context.Context, lsp string) (map[protocol.DocumentURI][]protocol.Diagnostic, error) {
147 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/lsps/%s/diagnostics", c.id, lsp), nil, nil)
148 if err != nil {
149 return nil, fmt.Errorf("failed to get LSP diagnostics: %w", err)
150 }
151 defer rsp.Body.Close()
152 if rsp.StatusCode != http.StatusOK {
153 return nil, fmt.Errorf("failed to get LSP diagnostics: status code %d", rsp.StatusCode)
154 }
155 var diagnostics map[protocol.DocumentURI][]protocol.Diagnostic
156 if err := json.NewDecoder(rsp.Body).Decode(&diagnostics); err != nil {
157 return nil, fmt.Errorf("failed to decode LSP diagnostics: %w", err)
158 }
159 return diagnostics, nil
160}
161
162func (c *Client) GetLSPs(ctx context.Context) (map[string]app.LSPClientInfo, error) {
163 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/lsps", c.id), nil, nil)
164 if err != nil {
165 return nil, fmt.Errorf("failed to get LSPs: %w", err)
166 }
167 defer rsp.Body.Close()
168 if rsp.StatusCode != http.StatusOK {
169 return nil, fmt.Errorf("failed to get LSPs: status code %d", rsp.StatusCode)
170 }
171 var lsps map[string]app.LSPClientInfo
172 if err := json.NewDecoder(rsp.Body).Decode(&lsps); err != nil {
173 return nil, fmt.Errorf("failed to decode LSPs: %w", err)
174 }
175 return lsps, nil
176}
177
178func (c *Client) GetAgentSessionQueuedPrompts(ctx context.Context, sessionID string) (int, error) {
179 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/prompts/queued", c.id, sessionID), nil, nil)
180 if err != nil {
181 return 0, fmt.Errorf("failed to get session agent queued prompts: %w", err)
182 }
183 defer rsp.Body.Close()
184 if rsp.StatusCode != http.StatusOK {
185 return 0, fmt.Errorf("failed to get session agent queued prompts: status code %d", rsp.StatusCode)
186 }
187 var count int
188 if err := json.NewDecoder(rsp.Body).Decode(&count); err != nil {
189 return 0, fmt.Errorf("failed to decode session agent queued prompts: %w", err)
190 }
191 return count, nil
192}
193
194func (c *Client) ClearAgentSessionQueuedPrompts(ctx context.Context, sessionID string) error {
195 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/prompts/clear", c.id, sessionID), nil, nil, nil)
196 if err != nil {
197 return fmt.Errorf("failed to clear session agent queued prompts: %w", err)
198 }
199 defer rsp.Body.Close()
200 if rsp.StatusCode != http.StatusOK {
201 return fmt.Errorf("failed to clear session agent queued prompts: status code %d", rsp.StatusCode)
202 }
203 return nil
204}
205
206func (c *Client) GetAgentInfo(ctx context.Context) (*proto.AgentInfo, error) {
207 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent", c.id), nil, nil)
208 if err != nil {
209 return nil, fmt.Errorf("failed to get agent status: %w", err)
210 }
211 defer rsp.Body.Close()
212 if rsp.StatusCode != http.StatusOK {
213 return nil, fmt.Errorf("failed to get agent status: status code %d", rsp.StatusCode)
214 }
215 var info proto.AgentInfo
216 if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil {
217 return nil, fmt.Errorf("failed to decode agent status: %w", err)
218 }
219 return &info, nil
220}
221
222func (c *Client) UpdateAgent(ctx context.Context) error {
223 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/update", c.id), nil, nil, nil)
224 if err != nil {
225 return fmt.Errorf("failed to update agent: %w", err)
226 }
227 defer rsp.Body.Close()
228 if rsp.StatusCode != http.StatusOK {
229 return fmt.Errorf("failed to update agent: status code %d", rsp.StatusCode)
230 }
231 return nil
232}
233
234func (c *Client) SendMessage(ctx context.Context, sessionID, message string, attchments ...message.Attachment) error {
235 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent", c.id), nil, jsonBody(proto.AgentMessage{
236 SessionID: sessionID,
237 Prompt: message,
238 Attachments: attchments,
239 }), http.Header{"Content-Type": []string{"application/json"}})
240 if err != nil {
241 return fmt.Errorf("failed to send message to agent: %w", err)
242 }
243 defer rsp.Body.Close()
244 if rsp.StatusCode != http.StatusOK {
245 return fmt.Errorf("failed to send message to agent: status code %d", rsp.StatusCode)
246 }
247 return nil
248}
249
250func (c *Client) GetAgentSessionInfo(ctx context.Context, sessionID string) (*proto.AgentSession, error) {
251 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s", c.id, sessionID), nil, nil)
252 if err != nil {
253 return nil, fmt.Errorf("failed to get session agent info: %w", err)
254 }
255 defer rsp.Body.Close()
256 if rsp.StatusCode != http.StatusOK {
257 return nil, fmt.Errorf("failed to get session agent info: status code %d", rsp.StatusCode)
258 }
259 var info proto.AgentSession
260 if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil {
261 return nil, fmt.Errorf("failed to decode session agent info: %w", err)
262 }
263 return &info, nil
264}
265
266func (c *Client) AgentSummarizeSession(ctx context.Context, sessionID string) error {
267 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/summarize", c.id, sessionID), nil, nil, nil)
268 if err != nil {
269 return fmt.Errorf("failed to summarize session: %w", err)
270 }
271 defer rsp.Body.Close()
272 if rsp.StatusCode != http.StatusOK {
273 return fmt.Errorf("failed to summarize session: status code %d", rsp.StatusCode)
274 }
275 return nil
276}
277
278func (c *Client) ListMessages(ctx context.Context, sessionID string) ([]message.Message, error) {
279 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s/messages", c.id, sessionID), nil, nil)
280 if err != nil {
281 return nil, fmt.Errorf("failed to get messages: %w", err)
282 }
283 defer rsp.Body.Close()
284 if rsp.StatusCode != http.StatusOK {
285 return nil, fmt.Errorf("failed to get messages: status code %d", rsp.StatusCode)
286 }
287 var messages []message.Message
288 if err := json.NewDecoder(rsp.Body).Decode(&messages); err != nil {
289 return nil, fmt.Errorf("failed to decode messages: %w", err)
290 }
291 return messages, nil
292}
293
294func (c *Client) GetSession(ctx context.Context, sessionID string) (*session.Session, error) {
295 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s", c.id, sessionID), nil, nil)
296 if err != nil {
297 return nil, fmt.Errorf("failed to get session: %w", err)
298 }
299 defer rsp.Body.Close()
300 if rsp.StatusCode != http.StatusOK {
301 return nil, fmt.Errorf("failed to get session: status code %d", rsp.StatusCode)
302 }
303 var sess session.Session
304 if err := json.NewDecoder(rsp.Body).Decode(&sess); err != nil {
305 return nil, fmt.Errorf("failed to decode session: %w", err)
306 }
307 return &sess, nil
308}
309
310func (c *Client) InitiateAgentProcessing(ctx context.Context) error {
311 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/init", c.id), nil, nil, nil)
312 if err != nil {
313 return fmt.Errorf("failed to initiate session agent processing: %w", err)
314 }
315 defer rsp.Body.Close()
316 if rsp.StatusCode != http.StatusOK {
317 return fmt.Errorf("failed to initiate session agent processing: status code %d", rsp.StatusCode)
318 }
319 return nil
320}
321
322func (c *Client) ListSessionHistoryFiles(ctx context.Context, sessionID string) ([]history.File, error) {
323 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s/history", c.id, sessionID), nil, nil)
324 if err != nil {
325 return nil, fmt.Errorf("failed to get session history files: %w", err)
326 }
327 defer rsp.Body.Close()
328 if rsp.StatusCode != http.StatusOK {
329 return nil, fmt.Errorf("failed to get session history files: status code %d", rsp.StatusCode)
330 }
331 var files []history.File
332 if err := json.NewDecoder(rsp.Body).Decode(&files); err != nil {
333 return nil, fmt.Errorf("failed to decode session history files: %w", err)
334 }
335 return files, nil
336}
337
338func (c *Client) CreateSession(ctx context.Context, title string) (*session.Session, error) {
339 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/sessions", c.id), nil, jsonBody(session.Session{Title: title}), http.Header{"Content-Type": []string{"application/json"}})
340 if err != nil {
341 return nil, fmt.Errorf("failed to create session: %w", err)
342 }
343 defer rsp.Body.Close()
344 if rsp.StatusCode != http.StatusOK {
345 return nil, fmt.Errorf("failed to create session: status code %d", rsp.StatusCode)
346 }
347 var sess session.Session
348 if err := json.NewDecoder(rsp.Body).Decode(&sess); err != nil {
349 return nil, fmt.Errorf("failed to decode session: %w", err)
350 }
351 return &sess, nil
352}
353
354func (c *Client) ListSessions(ctx context.Context) ([]session.Session, error) {
355 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions", c.id), nil, nil)
356 if err != nil {
357 return nil, fmt.Errorf("failed to get sessions: %w", err)
358 }
359 defer rsp.Body.Close()
360 if rsp.StatusCode != http.StatusOK {
361 return nil, fmt.Errorf("failed to get sessions: status code %d", rsp.StatusCode)
362 }
363 var sessions []session.Session
364 if err := json.NewDecoder(rsp.Body).Decode(&sessions); err != nil {
365 return nil, fmt.Errorf("failed to decode sessions: %w", err)
366 }
367 return sessions, nil
368}
369
370func (c *Client) GrantPermission(ctx context.Context, req proto.PermissionGrant) error {
371 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/permissions/grant", c.id), nil, jsonBody(req), http.Header{"Content-Type": []string{"application/json"}})
372 if err != nil {
373 return fmt.Errorf("failed to grant permission: %w", err)
374 }
375 defer rsp.Body.Close()
376 if rsp.StatusCode != http.StatusOK {
377 return fmt.Errorf("failed to grant permission: status code %d", rsp.StatusCode)
378 }
379 return nil
380}
381
382func (c *Client) SetPermissionsSkipRequests(ctx context.Context, skip bool) error {
383 rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/permissions/skip", c.id), nil, jsonBody(proto.PermissionSkipRequest{Skip: skip}), http.Header{"Content-Type": []string{"application/json"}})
384 if err != nil {
385 return fmt.Errorf("failed to set permissions skip requests: %w", err)
386 }
387 defer rsp.Body.Close()
388 if rsp.StatusCode != http.StatusOK {
389 return fmt.Errorf("failed to set permissions skip requests: status code %d", rsp.StatusCode)
390 }
391 return nil
392}
393
394func (c *Client) GetPermissionsSkipRequests(ctx context.Context) (bool, error) {
395 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/permissions/skip", c.id), nil, nil)
396 if err != nil {
397 return false, fmt.Errorf("failed to get permissions skip requests: %w", err)
398 }
399 defer rsp.Body.Close()
400 if rsp.StatusCode != http.StatusOK {
401 return false, fmt.Errorf("failed to get permissions skip requests: status code %d", rsp.StatusCode)
402 }
403 var skip proto.PermissionSkipRequest
404 if err := json.NewDecoder(rsp.Body).Decode(&skip); err != nil {
405 return false, fmt.Errorf("failed to decode permissions skip requests: %w", err)
406 }
407 return skip.Skip, nil
408}
409
410func (c *Client) GetConfig(ctx context.Context) (*config.Config, error) {
411 rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/config", c.id), nil, nil)
412 if err != nil {
413 return nil, fmt.Errorf("failed to get config: %w", err)
414 }
415 defer rsp.Body.Close()
416 if rsp.StatusCode != http.StatusOK {
417 return nil, fmt.Errorf("failed to get config: status code %d", rsp.StatusCode)
418 }
419 var cfg config.Config
420 if err := json.NewDecoder(rsp.Body).Decode(&cfg); err != nil {
421 return nil, fmt.Errorf("failed to decode config: %w", err)
422 }
423 return &cfg, nil
424}
425
426func (c *Client) CreateInstance(ctx context.Context, ins proto.Instance) (*proto.Instance, error) {
427 rsp, err := c.post(ctx, "instances", nil, jsonBody(ins), http.Header{"Content-Type": []string{"application/json"}})
428 if err != nil {
429 return nil, fmt.Errorf("failed to create instance: %w", err)
430 }
431 defer rsp.Body.Close()
432 if rsp.StatusCode != http.StatusOK {
433 return nil, fmt.Errorf("failed to create instance: status code %d", rsp.StatusCode)
434 }
435 var created proto.Instance
436 if err := json.NewDecoder(rsp.Body).Decode(&created); err != nil {
437 return nil, fmt.Errorf("failed to decode instance: %w", err)
438 }
439 return &created, nil
440}
441
442func (c *Client) DeleteInstance(ctx context.Context, id string) error {
443 rsp, err := c.delete(ctx, fmt.Sprintf("/instances/%s", id), nil, nil)
444 if err != nil {
445 return fmt.Errorf("failed to delete instance: %w", err)
446 }
447 defer rsp.Body.Close()
448 if rsp.StatusCode != http.StatusOK {
449 return fmt.Errorf("failed to delete instance: status code %d", rsp.StatusCode)
450 }
451 return nil
452}
453
454func jsonBody(v any) *bytes.Buffer {
455 b := new(bytes.Buffer)
456 m, _ := json.Marshal(v)
457 b.Write(m)
458 return b
459}