1package client
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"encoding/json"
  8	"errors"
  9	"fmt"
 10	"io"
 11	"log/slog"
 12	"net/http"
 13	"time"
 14
 15	"github.com/charmbracelet/crush/internal/app"
 16	"github.com/charmbracelet/crush/internal/config"
 17	"github.com/charmbracelet/crush/internal/history"
 18	"github.com/charmbracelet/crush/internal/message"
 19	"github.com/charmbracelet/crush/internal/proto"
 20	"github.com/charmbracelet/crush/internal/pubsub"
 21	"github.com/charmbracelet/crush/internal/session"
 22	"github.com/charmbracelet/x/powernap/pkg/lsp/protocol"
 23)
 24
 25func (c *Client) SubscribeEvents(ctx context.Context, id string) (<-chan any, error) {
 26	events := make(chan any, 100)
 27	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/events", id), nil, http.Header{
 28		"Accept":        []string{"text/event-stream"},
 29		"Cache-Control": []string{"no-cache"},
 30		"Connection":    []string{"keep-alive"},
 31	})
 32	if err != nil {
 33		return nil, fmt.Errorf("failed to subscribe to events: %w", err)
 34	}
 35
 36	if rsp.StatusCode != http.StatusOK {
 37		return nil, fmt.Errorf("failed to subscribe to events: status code %d", rsp.StatusCode)
 38	}
 39
 40	go func() {
 41		defer rsp.Body.Close()
 42
 43		scr := bufio.NewReader(rsp.Body)
 44		for {
 45			line, err := scr.ReadBytes('\n')
 46			if errors.Is(err, io.EOF) {
 47				break
 48			}
 49			if err != nil {
 50				slog.Error("reading from events stream", "error", err)
 51				time.Sleep(time.Second * 2)
 52				continue
 53			}
 54			line = bytes.TrimSpace(line)
 55			if len(line) == 0 {
 56				// End of an event
 57				continue
 58			}
 59
 60			data, ok := bytes.CutPrefix(line, []byte("data:"))
 61			if !ok {
 62				slog.Warn("invalid event format", "line", string(line))
 63				continue
 64			}
 65
 66			data = bytes.TrimSpace(data)
 67
 68			var event pubsub.Event[any]
 69			if err := json.Unmarshal(data, &event); err != nil {
 70				slog.Error("unmarshaling event", "error", err)
 71				continue
 72			}
 73
 74			type alias pubsub.Event[any]
 75			aux := &struct {
 76				Payload json.RawMessage `json:"payload"`
 77				*alias
 78			}{
 79				alias: (*alias)(&event),
 80			}
 81
 82			if err := json.Unmarshal(data, &aux); err != nil {
 83				slog.Error("unmarshaling event payload", "error", err)
 84				continue
 85			}
 86
 87			var p pubsub.Payload
 88			if err := json.Unmarshal(aux.Payload, &p); err != nil {
 89				slog.Error("unmarshaling event payload", "error", err)
 90				continue
 91			}
 92
 93			switch p.Type {
 94			case pubsub.PayloadTypeLSPEvent:
 95				var e pubsub.Event[proto.LSPEvent]
 96				_ = json.Unmarshal(data, &e)
 97				sendEvent(ctx, events, e)
 98			case pubsub.PayloadTypeMCPEvent:
 99				var e pubsub.Event[proto.MCPEvent]
100				_ = json.Unmarshal(data, &e)
101				sendEvent(ctx, events, e)
102			case pubsub.PayloadTypePermissionRequest:
103				var e pubsub.Event[proto.PermissionRequest]
104				_ = json.Unmarshal(data, &e)
105				sendEvent(ctx, events, e)
106			case pubsub.PayloadTypePermissionNotification:
107				var e pubsub.Event[proto.PermissionNotification]
108				_ = json.Unmarshal(data, &e)
109				sendEvent(ctx, events, e)
110			case pubsub.PayloadTypeMessage:
111				var e pubsub.Event[proto.Message]
112				_ = json.Unmarshal(data, &e)
113				sendEvent(ctx, events, e)
114			case pubsub.PayloadTypeSession:
115				var e pubsub.Event[proto.Session]
116				_ = json.Unmarshal(data, &e)
117				sendEvent(ctx, events, e)
118			case pubsub.PayloadTypeFile:
119				var e pubsub.Event[proto.File]
120				_ = json.Unmarshal(data, &e)
121				sendEvent(ctx, events, e)
122			case pubsub.PayloadTypeAgentEvent:
123				var e pubsub.Event[proto.AgentEvent]
124				_ = json.Unmarshal(data, &e)
125				sendEvent(ctx, events, e)
126			default:
127				slog.Warn("unknown event type", "type", p.Type)
128				continue
129			}
130		}
131	}()
132
133	return events, nil
134}
135
136func sendEvent(ctx context.Context, evc chan any, ev any) {
137	slog.Info("event received", "event", fmt.Sprintf("%T %+v", ev, ev))
138	select {
139	case evc <- ev:
140	case <-ctx.Done():
141		close(evc)
142		return
143	}
144}
145
146func (c *Client) GetLSPDiagnostics(ctx context.Context, id string, lsp string) (map[protocol.DocumentURI][]protocol.Diagnostic, error) {
147	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/lsps/%s/diagnostics", id, lsp), nil, nil)
148	if err != nil {
149		return nil, fmt.Errorf("failed to get LSP diagnostics: %w", err)
150	}
151	defer rsp.Body.Close()
152	if rsp.StatusCode != http.StatusOK {
153		return nil, fmt.Errorf("failed to get LSP diagnostics: status code %d", rsp.StatusCode)
154	}
155	var diagnostics map[protocol.DocumentURI][]protocol.Diagnostic
156	if err := json.NewDecoder(rsp.Body).Decode(&diagnostics); err != nil {
157		return nil, fmt.Errorf("failed to decode LSP diagnostics: %w", err)
158	}
159	return diagnostics, nil
160}
161
162func (c *Client) GetLSPs(ctx context.Context, id string) (map[string]app.LSPClientInfo, error) {
163	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/lsps", id), nil, nil)
164	if err != nil {
165		return nil, fmt.Errorf("failed to get LSPs: %w", err)
166	}
167	defer rsp.Body.Close()
168	if rsp.StatusCode != http.StatusOK {
169		return nil, fmt.Errorf("failed to get LSPs: status code %d", rsp.StatusCode)
170	}
171	var lsps map[string]app.LSPClientInfo
172	if err := json.NewDecoder(rsp.Body).Decode(&lsps); err != nil {
173		return nil, fmt.Errorf("failed to decode LSPs: %w", err)
174	}
175	return lsps, nil
176}
177
178func (c *Client) GetAgentSessionQueuedPrompts(ctx context.Context, id string, sessionID string) (int, error) {
179	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/prompts/queued", id, sessionID), nil, nil)
180	if err != nil {
181		return 0, fmt.Errorf("failed to get session agent queued prompts: %w", err)
182	}
183	defer rsp.Body.Close()
184	if rsp.StatusCode != http.StatusOK {
185		return 0, fmt.Errorf("failed to get session agent queued prompts: status code %d", rsp.StatusCode)
186	}
187	var count int
188	if err := json.NewDecoder(rsp.Body).Decode(&count); err != nil {
189		return 0, fmt.Errorf("failed to decode session agent queued prompts: %w", err)
190	}
191	return count, nil
192}
193
194func (c *Client) ClearAgentSessionQueuedPrompts(ctx context.Context, id string, sessionID string) error {
195	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/prompts/clear", id, sessionID), nil, nil, nil)
196	if err != nil {
197		return fmt.Errorf("failed to clear session agent queued prompts: %w", err)
198	}
199	defer rsp.Body.Close()
200	if rsp.StatusCode != http.StatusOK {
201		return fmt.Errorf("failed to clear session agent queued prompts: status code %d", rsp.StatusCode)
202	}
203	return nil
204}
205
206func (c *Client) GetAgentInfo(ctx context.Context, id string) (*proto.AgentInfo, error) {
207	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent", id), nil, nil)
208	if err != nil {
209		return nil, fmt.Errorf("failed to get agent status: %w", err)
210	}
211	defer rsp.Body.Close()
212	if rsp.StatusCode != http.StatusOK {
213		return nil, fmt.Errorf("failed to get agent status: status code %d", rsp.StatusCode)
214	}
215	var info proto.AgentInfo
216	if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil {
217		return nil, fmt.Errorf("failed to decode agent status: %w", err)
218	}
219	return &info, nil
220}
221
222func (c *Client) UpdateAgent(ctx context.Context, id string) error {
223	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/update", id), nil, nil, nil)
224	if err != nil {
225		return fmt.Errorf("failed to update agent: %w", err)
226	}
227	defer rsp.Body.Close()
228	if rsp.StatusCode != http.StatusOK {
229		return fmt.Errorf("failed to update agent: status code %d", rsp.StatusCode)
230	}
231	return nil
232}
233
234func (c *Client) SendMessage(ctx context.Context, id string, sessionID, message string, attchments ...message.Attachment) error {
235	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent", id), nil, jsonBody(proto.AgentMessage{
236		SessionID:   sessionID,
237		Prompt:      message,
238		Attachments: attchments,
239	}), http.Header{"Content-Type": []string{"application/json"}})
240	if err != nil {
241		return fmt.Errorf("failed to send message to agent: %w", err)
242	}
243	defer rsp.Body.Close()
244	if rsp.StatusCode != http.StatusOK {
245		return fmt.Errorf("failed to send message to agent: status code %d", rsp.StatusCode)
246	}
247	return nil
248}
249
250func (c *Client) GetAgentSessionInfo(ctx context.Context, id string, sessionID string) (*proto.AgentSession, error) {
251	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s", id, sessionID), nil, nil)
252	if err != nil {
253		return nil, fmt.Errorf("failed to get session agent info: %w", err)
254	}
255	defer rsp.Body.Close()
256	if rsp.StatusCode != http.StatusOK {
257		return nil, fmt.Errorf("failed to get session agent info: status code %d", rsp.StatusCode)
258	}
259	var info proto.AgentSession
260	if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil {
261		return nil, fmt.Errorf("failed to decode session agent info: %w", err)
262	}
263	return &info, nil
264}
265
266func (c *Client) AgentSummarizeSession(ctx context.Context, id string, sessionID string) error {
267	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/sessions/%s/summarize", id, sessionID), nil, nil, nil)
268	if err != nil {
269		return fmt.Errorf("failed to summarize session: %w", err)
270	}
271	defer rsp.Body.Close()
272	if rsp.StatusCode != http.StatusOK {
273		return fmt.Errorf("failed to summarize session: status code %d", rsp.StatusCode)
274	}
275	return nil
276}
277
278func (c *Client) ListMessages(ctx context.Context, id string, sessionID string) ([]message.Message, error) {
279	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s/messages", id, sessionID), nil, nil)
280	if err != nil {
281		return nil, fmt.Errorf("failed to get messages: %w", err)
282	}
283	defer rsp.Body.Close()
284	if rsp.StatusCode != http.StatusOK {
285		return nil, fmt.Errorf("failed to get messages: status code %d", rsp.StatusCode)
286	}
287	var messages []message.Message
288	if err := json.NewDecoder(rsp.Body).Decode(&messages); err != nil {
289		return nil, fmt.Errorf("failed to decode messages: %w", err)
290	}
291	return messages, nil
292}
293
294func (c *Client) GetSession(ctx context.Context, id string, sessionID string) (*session.Session, error) {
295	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s", id, sessionID), nil, nil)
296	if err != nil {
297		return nil, fmt.Errorf("failed to get session: %w", err)
298	}
299	defer rsp.Body.Close()
300	if rsp.StatusCode != http.StatusOK {
301		return nil, fmt.Errorf("failed to get session: status code %d", rsp.StatusCode)
302	}
303	var sess session.Session
304	if err := json.NewDecoder(rsp.Body).Decode(&sess); err != nil {
305		return nil, fmt.Errorf("failed to decode session: %w", err)
306	}
307	return &sess, nil
308}
309
310func (c *Client) InitiateAgentProcessing(ctx context.Context, id string) error {
311	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/agent/init", id), nil, nil, nil)
312	if err != nil {
313		return fmt.Errorf("failed to initiate session agent processing: %w", err)
314	}
315	defer rsp.Body.Close()
316	if rsp.StatusCode != http.StatusOK {
317		return fmt.Errorf("failed to initiate session agent processing: status code %d", rsp.StatusCode)
318	}
319	return nil
320}
321
322func (c *Client) ListSessionHistoryFiles(ctx context.Context, id string, sessionID string) ([]history.File, error) {
323	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions/%s/history", id, sessionID), nil, nil)
324	if err != nil {
325		return nil, fmt.Errorf("failed to get session history files: %w", err)
326	}
327	defer rsp.Body.Close()
328	if rsp.StatusCode != http.StatusOK {
329		return nil, fmt.Errorf("failed to get session history files: status code %d", rsp.StatusCode)
330	}
331	var files []history.File
332	if err := json.NewDecoder(rsp.Body).Decode(&files); err != nil {
333		return nil, fmt.Errorf("failed to decode session history files: %w", err)
334	}
335	return files, nil
336}
337
338func (c *Client) CreateSession(ctx context.Context, id string, title string) (*session.Session, error) {
339	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/sessions", id), nil, jsonBody(session.Session{Title: title}), http.Header{"Content-Type": []string{"application/json"}})
340	if err != nil {
341		return nil, fmt.Errorf("failed to create session: %w", err)
342	}
343	defer rsp.Body.Close()
344	if rsp.StatusCode != http.StatusOK {
345		return nil, fmt.Errorf("failed to create session: status code %d", rsp.StatusCode)
346	}
347	var sess session.Session
348	if err := json.NewDecoder(rsp.Body).Decode(&sess); err != nil {
349		return nil, fmt.Errorf("failed to decode session: %w", err)
350	}
351	return &sess, nil
352}
353
354func (c *Client) ListSessions(ctx context.Context, id string) ([]session.Session, error) {
355	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/sessions", id), nil, nil)
356	if err != nil {
357		return nil, fmt.Errorf("failed to get sessions: %w", err)
358	}
359	defer rsp.Body.Close()
360	if rsp.StatusCode != http.StatusOK {
361		return nil, fmt.Errorf("failed to get sessions: status code %d", rsp.StatusCode)
362	}
363	var sessions []session.Session
364	if err := json.NewDecoder(rsp.Body).Decode(&sessions); err != nil {
365		return nil, fmt.Errorf("failed to decode sessions: %w", err)
366	}
367	return sessions, nil
368}
369
370func (c *Client) GrantPermission(ctx context.Context, id string, req proto.PermissionGrant) error {
371	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/permissions/grant", id), nil, jsonBody(req), http.Header{"Content-Type": []string{"application/json"}})
372	if err != nil {
373		return fmt.Errorf("failed to grant permission: %w", err)
374	}
375	defer rsp.Body.Close()
376	if rsp.StatusCode != http.StatusOK {
377		return fmt.Errorf("failed to grant permission: status code %d", rsp.StatusCode)
378	}
379	return nil
380}
381
382func (c *Client) SetPermissionsSkipRequests(ctx context.Context, id string, skip bool) error {
383	rsp, err := c.post(ctx, fmt.Sprintf("/instances/%s/permissions/skip", id), nil, jsonBody(proto.PermissionSkipRequest{Skip: skip}), http.Header{"Content-Type": []string{"application/json"}})
384	if err != nil {
385		return fmt.Errorf("failed to set permissions skip requests: %w", err)
386	}
387	defer rsp.Body.Close()
388	if rsp.StatusCode != http.StatusOK {
389		return fmt.Errorf("failed to set permissions skip requests: status code %d", rsp.StatusCode)
390	}
391	return nil
392}
393
394func (c *Client) GetPermissionsSkipRequests(ctx context.Context, id string) (bool, error) {
395	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/permissions/skip", id), nil, nil)
396	if err != nil {
397		return false, fmt.Errorf("failed to get permissions skip requests: %w", err)
398	}
399	defer rsp.Body.Close()
400	if rsp.StatusCode != http.StatusOK {
401		return false, fmt.Errorf("failed to get permissions skip requests: status code %d", rsp.StatusCode)
402	}
403	var skip proto.PermissionSkipRequest
404	if err := json.NewDecoder(rsp.Body).Decode(&skip); err != nil {
405		return false, fmt.Errorf("failed to decode permissions skip requests: %w", err)
406	}
407	return skip.Skip, nil
408}
409
410func (c *Client) GetConfig(ctx context.Context, id string) (*config.Config, error) {
411	rsp, err := c.get(ctx, fmt.Sprintf("/instances/%s/config", id), nil, nil)
412	if err != nil {
413		return nil, fmt.Errorf("failed to get config: %w", err)
414	}
415	defer rsp.Body.Close()
416	if rsp.StatusCode != http.StatusOK {
417		return nil, fmt.Errorf("failed to get config: status code %d", rsp.StatusCode)
418	}
419	var cfg config.Config
420	if err := json.NewDecoder(rsp.Body).Decode(&cfg); err != nil {
421		return nil, fmt.Errorf("failed to decode config: %w", err)
422	}
423	return &cfg, nil
424}
425
426func (c *Client) CreateInstance(ctx context.Context, ins proto.Instance) (*proto.Instance, error) {
427	rsp, err := c.post(ctx, "instances", nil, jsonBody(ins), http.Header{"Content-Type": []string{"application/json"}})
428	if err != nil {
429		return nil, fmt.Errorf("failed to create instance: %w", err)
430	}
431	defer rsp.Body.Close()
432	if rsp.StatusCode != http.StatusOK {
433		return nil, fmt.Errorf("failed to create instance: status code %d", rsp.StatusCode)
434	}
435	var created proto.Instance
436	if err := json.NewDecoder(rsp.Body).Decode(&created); err != nil {
437		return nil, fmt.Errorf("failed to decode instance: %w", err)
438	}
439	return &created, nil
440}
441
442func (c *Client) DeleteInstance(ctx context.Context, id string) error {
443	rsp, err := c.delete(ctx, fmt.Sprintf("/instances/%s", id), nil, nil)
444	if err != nil {
445		return fmt.Errorf("failed to delete instance: %w", err)
446	}
447	defer rsp.Body.Close()
448	if rsp.StatusCode != http.StatusOK {
449		return fmt.Errorf("failed to delete instance: status code %d", rsp.StatusCode)
450	}
451	return nil
452}
453
// jsonBody marshals v to JSON and returns the encoded bytes in a new buffer.
//
// The signature has no way to report a failure, so a marshaling error is
// logged and an empty, non-nil buffer is returned.
func jsonBody(v any) *bytes.Buffer {
	buf := new(bytes.Buffer)
	encoded, err := json.Marshal(v)
	if err != nil {
		// Surface the failure instead of silently sending an empty body.
		slog.Error("marshaling JSON request body", "error", err)
		return buf
	}
	buf.Write(encoded)
	return buf
}