From 843498810325aa5d123aea442590aa11b559b5c9 Mon Sep 17 00:00:00 2001 From: Ayman Bagabas Date: Tue, 23 Sep 2025 16:29:14 -0400 Subject: [PATCH] fix(client): client event streaming types and config and permission handling --- internal/client/client.go | 5 +- internal/client/proto.go | 191 +++++++++++++++++++++++++++++++++++--- 2 files changed, 180 insertions(+), 16 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 3a29182cec2d7f3d1be7851809329eaf238d737f..54156344136b7ad18816285521e4f53ac58b5c27 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -43,6 +43,7 @@ func NewClient(path, network, address string) (*Client, error) { } h := &http.Client{ Transport: tr, + Timeout: 0, // we need this to be 0 for long-lived connections and SSE streams } hasher := sha256.New() hasher.Write([]byte(path)) @@ -64,8 +65,8 @@ func (c *Client) Path() string { return c.path } -// GetConfig retrieves the server's configuration via RPC. -func (c *Client) GetConfig() (*config.Config, error) { +// GetGlobalConfig retrieves the server's configuration via RPC. +func (c *Client) GetGlobalConfig() (*config.Config, error) { var cfg config.Config rsp, err := c.h.Get("http://localhost/v1/config") if err != nil { diff --git a/internal/client/proto.go b/internal/client/proto.go index d7c03eadb0a05382e6c45b0316027227720af2bf..64dc542b307ff454fc7cf7b037e3c2eedb22c491 100644 --- a/internal/client/proto.go +++ b/internal/client/proto.go @@ -13,9 +13,11 @@ import ( "time" "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/config" "github.com/charmbracelet/crush/internal/history" "github.com/charmbracelet/crush/internal/message" "github.com/charmbracelet/crush/internal/proto" + "github.com/charmbracelet/crush/internal/pubsub" "github.com/charmbracelet/crush/internal/session" "github.com/charmbracelet/x/powernap/pkg/lsp/protocol" ) @@ -26,18 +28,25 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) { if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } - rsp, err := c.h.Do(r) - if err != nil { - return nil, fmt.Errorf("failed to subscribe to events: %w", err) - } - if rsp.StatusCode != http.StatusOK { - rsp.Body.Close() - return nil, fmt.Errorf("failed to subscribe to events: status code %d", rsp.StatusCode) - } + + r.Header.Set("Accept", "text/event-stream") + r.Header.Set("Cache-Control", "no-cache") + r.Header.Set("Connection", "keep-alive") go func() { + rsp, err := c.h.Do(r) + if err != nil { + slog.Error("subscribing to events", "error", err) + return + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + slog.Error("subscribing to events", "status_code", rsp.StatusCode) + return + } + scr := bufio.NewReader(rsp.Body) for { line, err := scr.ReadBytes('\n') @@ -63,17 +72,67 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) { data = bytes.TrimSpace(data) - var event any + var event pubsub.Event[any] if err := json.Unmarshal(data, &event); err != nil { slog.Error("unmarshaling event", "error", err) continue } - select { - case events <- event: - case <-ctx.Done(): - close(events) - return + type alias pubsub.Event[any] + aux := &struct { + Payload json.RawMessage `json:"payload"` + *alias + }{ + alias: (*alias)(&event), + } + + if err := json.Unmarshal(data, &aux); err != nil { + slog.Error("unmarshaling event payload", "error", err) + continue + } + + var p pubsub.Payload + if err := json.Unmarshal(aux.Payload, &p); err != nil { + slog.Error("unmarshaling event payload", "error", err) + continue + } + + switch p.Type { + case pubsub.PayloadTypeLSPEvent: + var e pubsub.Event[proto.LSPEvent] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypeMCPEvent: + var e pubsub.Event[proto.MCPEvent] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypePermissionRequest: + var e pubsub.Event[proto.PermissionRequest] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypePermissionNotification: + var e pubsub.Event[proto.PermissionNotification] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypeMessage: + var e pubsub.Event[proto.Message] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypeSession: + var e pubsub.Event[proto.Session] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypeFile: + var e pubsub.Event[proto.File] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + case pubsub.PayloadTypeAgentEvent: + var e pubsub.Event[proto.AgentEvent] + _ = json.Unmarshal(data, &e) + sendEvent(ctx, events, e) + default: + slog.Warn("unknown event type", "type", p.Type) + continue } } }() @@ -81,6 +140,16 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) { return events, nil } +func sendEvent(ctx context.Context, evc chan any, ev any) { + slog.Info("event received", "event", fmt.Sprintf("%T %+v", ev, ev)) + select { + case evc <- ev: + case <-ctx.Done(): + close(evc) + return + } +} + func (c *Client) GetLSPDiagnostics(ctx context.Context, lsp string) (map[protocol.DocumentURI][]protocol.Diagnostic, error) { r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/lsps/%s/diagnostics", c.id, lsp), nil) if err != nil { @@ -213,6 +282,26 @@ func (c *Client) SendMessage(ctx context.Context, sessionID, message string, att return nil } +func (c *Client) GetAgentSessionInfo(ctx context.Context, sessionID string) (*proto.AgentSession, error) { + r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/agent/sessions/%s", c.id, sessionID), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + rsp, err := c.h.Do(r) + if err != nil { + return nil, fmt.Errorf("failed to get session agent info: %w", err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get session agent info: status code %d", rsp.StatusCode) + } + var info proto.AgentSession + if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil { + return nil, fmt.Errorf("failed to decode session agent info: %w", err) + } + return &info, nil +} + func (c *Client) AgentSummarizeSession(ctx context.Context, sessionID string) error { r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/agent/sessions/%s/summarize", c.id, sessionID), nil) if err != nil { @@ -346,6 +435,80 @@ func (c *Client) ListSessions(ctx context.Context) ([]session.Session, error) { return sessions, nil } +func (c *Client) GrantPermission(ctx context.Context, req proto.PermissionGrant) error { + r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/grant", c.id), jsonBody(req)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + r.Header.Set("Content-Type", "application/json") + rsp, err := c.h.Do(r) + if err != nil { + return fmt.Errorf("failed to grant permission: %w", err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to grant permission: status code %d", rsp.StatusCode) + } + return nil +} + +func (c *Client) SetPermissionsSkipRequests(ctx context.Context, skip bool) error { + r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/skip", c.id), jsonBody(proto.PermissionSkipRequest{Skip: skip})) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + r.Header.Set("Content-Type", "application/json") + rsp, err := c.h.Do(r) + if err != nil { + return fmt.Errorf("failed to set permissions skip requests: %w", err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to set permissions skip requests: status code %d", rsp.StatusCode) + } + return nil +} + +func (c *Client) GetPermissionsSkipRequests(ctx context.Context) (bool, error) { + r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/skip", c.id), nil) + if err != nil { + return false, fmt.Errorf("failed to create request: %w", err) + } + rsp, err := c.h.Do(r) + if err != nil { + return false, fmt.Errorf("failed to get permissions skip requests: %w", err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return false, fmt.Errorf("failed to get permissions skip requests: status code %d", rsp.StatusCode) + } + var skip proto.PermissionSkipRequest + if err := json.NewDecoder(rsp.Body).Decode(&skip); err != nil { + return false, fmt.Errorf("failed to decode permissions skip requests: %w", err) + } + return skip.Skip, nil +} + +func (c *Client) GetConfig(ctx context.Context) (*config.Config, error) { + r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/config", c.id), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + rsp, err := c.h.Do(r) + if err != nil { + return nil, fmt.Errorf("failed to get config: %w", err) + } + defer rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to get config: status code %d", rsp.StatusCode) + } + var cfg config.Config + if err := json.NewDecoder(rsp.Body).Decode(&cfg); err != nil { + return nil, fmt.Errorf("failed to decode config: %w", err) + } + return &cfg, nil +} + func (c *Client) CreateInstance(ctx context.Context, ins proto.Instance) (*proto.Instance, error) { r, err := http.NewRequestWithContext(ctx, "POST", "http://localhost/v1/instances", jsonBody(ins)) if err != nil {