fix(client): client event streaming types and config and permission handling

Ayman Bagabas created

Change summary

internal/client/client.go |   5 
internal/client/proto.go  | 191 +++++++++++++++++++++++++++++++++++++---
2 files changed, 180 insertions(+), 16 deletions(-)

Detailed changes

internal/client/client.go 🔗

@@ -43,6 +43,7 @@ func NewClient(path, network, address string) (*Client, error) {
 	}
 	h := &http.Client{
 		Transport: tr,
+		Timeout:   0, // we need this to be 0 for long-lived connections and SSE streams
 	}
 	hasher := sha256.New()
 	hasher.Write([]byte(path))
@@ -64,8 +65,8 @@ func (c *Client) Path() string {
 	return c.path
 }
 
-// GetConfig retrieves the server's configuration via RPC.
-func (c *Client) GetConfig() (*config.Config, error) {
+// GetGlobalConfig retrieves the server's configuration via RPC.
+func (c *Client) GetGlobalConfig() (*config.Config, error) {
 	var cfg config.Config
 	rsp, err := c.h.Get("http://localhost/v1/config")
 	if err != nil {

internal/client/proto.go 🔗

@@ -13,9 +13,11 @@ import (
 	"time"
 
 	"github.com/charmbracelet/crush/internal/app"
+	"github.com/charmbracelet/crush/internal/config"
 	"github.com/charmbracelet/crush/internal/history"
 	"github.com/charmbracelet/crush/internal/message"
 	"github.com/charmbracelet/crush/internal/proto"
+	"github.com/charmbracelet/crush/internal/pubsub"
 	"github.com/charmbracelet/crush/internal/session"
 	"github.com/charmbracelet/x/powernap/pkg/lsp/protocol"
 )
@@ -26,18 +28,25 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) {
 	if err != nil {
 		return nil, fmt.Errorf("failed to create request: %w", err)
 	}
-	rsp, err := c.h.Do(r)
-	if err != nil {
-		return nil, fmt.Errorf("failed to subscribe to events: %w", err)
-	}
-	if rsp.StatusCode != http.StatusOK {
-		rsp.Body.Close()
-		return nil, fmt.Errorf("failed to subscribe to events: status code %d", rsp.StatusCode)
-	}
+
+	r.Header.Set("Accept", "text/event-stream")
+	r.Header.Set("Cache-Control", "no-cache")
+	r.Header.Set("Connection", "keep-alive")
 
 	go func() {
+		rsp, err := c.h.Do(r)
+		if err != nil {
+			slog.Error("subscribing to events", "error", err)
+			return
+		}
+
 		defer rsp.Body.Close()
 
+		if rsp.StatusCode != http.StatusOK {
+			slog.Error("subscribing to events", "status_code", rsp.StatusCode)
+			return
+		}
+
 		scr := bufio.NewReader(rsp.Body)
 		for {
 			line, err := scr.ReadBytes('\n')
@@ -63,17 +72,67 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) {
 
 			data = bytes.TrimSpace(data)
 
-			var event any
+			var event pubsub.Event[any]
 			if err := json.Unmarshal(data, &event); err != nil {
 				slog.Error("unmarshaling event", "error", err)
 				continue
 			}
 
-			select {
-			case events <- event:
-			case <-ctx.Done():
-				close(events)
-				return
+			type alias pubsub.Event[any]
+			aux := &struct {
+				Payload json.RawMessage `json:"payload"`
+				*alias
+			}{
+				alias: (*alias)(&event),
+			}
+
+			if err := json.Unmarshal(data, &aux); err != nil {
+				slog.Error("unmarshaling event payload", "error", err)
+				continue
+			}
+
+			var p pubsub.Payload
+			if err := json.Unmarshal(aux.Payload, &p); err != nil {
+				slog.Error("unmarshaling event payload", "error", err)
+				continue
+			}
+
+			switch p.Type {
+			case pubsub.PayloadTypeLSPEvent:
+				var e pubsub.Event[proto.LSPEvent]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypeMCPEvent:
+				var e pubsub.Event[proto.MCPEvent]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypePermissionRequest:
+				var e pubsub.Event[proto.PermissionRequest]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypePermissionNotification:
+				var e pubsub.Event[proto.PermissionNotification]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypeMessage:
+				var e pubsub.Event[proto.Message]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypeSession:
+				var e pubsub.Event[proto.Session]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypeFile:
+				var e pubsub.Event[proto.File]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			case pubsub.PayloadTypeAgentEvent:
+				var e pubsub.Event[proto.AgentEvent]
+				_ = json.Unmarshal(data, &e)
+				sendEvent(ctx, events, e)
+			default:
+				slog.Warn("unknown event type", "type", p.Type)
+				continue
 			}
 		}
 	}()
@@ -81,6 +140,16 @@ func (c *Client) SubscribeEvents(ctx context.Context) (<-chan any, error) {
 	return events, nil
 }
 
+func sendEvent(ctx context.Context, evc chan any, ev any) {
+	slog.Info("event received", "event", fmt.Sprintf("%T %+v", ev, ev))
+	select {
+	case evc <- ev:
+	case <-ctx.Done():
+		close(evc)
+		return
+	}
+}
+
 func (c *Client) GetLSPDiagnostics(ctx context.Context, lsp string) (map[protocol.DocumentURI][]protocol.Diagnostic, error) {
 	r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/lsps/%s/diagnostics", c.id, lsp), nil)
 	if err != nil {
@@ -213,6 +282,26 @@ func (c *Client) SendMessage(ctx context.Context, sessionID, message string, att
 	return nil
 }
 
+func (c *Client) GetAgentSessionInfo(ctx context.Context, sessionID string) (*proto.AgentSession, error) {
+	r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/agent/sessions/%s", c.id, sessionID), nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+	rsp, err := c.h.Do(r)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get session agent info: %w", err)
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("failed to get session agent info: status code %d", rsp.StatusCode)
+	}
+	var info proto.AgentSession
+	if err := json.NewDecoder(rsp.Body).Decode(&info); err != nil {
+		return nil, fmt.Errorf("failed to decode session agent info: %w", err)
+	}
+	return &info, nil
+}
+
 func (c *Client) AgentSummarizeSession(ctx context.Context, sessionID string) error {
 	r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/agent/sessions/%s/summarize", c.id, sessionID), nil)
 	if err != nil {
@@ -346,6 +435,80 @@ func (c *Client) ListSessions(ctx context.Context) ([]session.Session, error) {
 	return sessions, nil
 }
 
+func (c *Client) GrantPermission(ctx context.Context, req proto.PermissionGrant) error {
+	r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/grant", c.id), jsonBody(req))
+	if err != nil {
+		return fmt.Errorf("failed to create request: %w", err)
+	}
+	r.Header.Set("Content-Type", "application/json")
+	rsp, err := c.h.Do(r)
+	if err != nil {
+		return fmt.Errorf("failed to grant permission: %w", err)
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		return fmt.Errorf("failed to grant permission: status code %d", rsp.StatusCode)
+	}
+	return nil
+}
+
+func (c *Client) SetPermissionsSkipRequests(ctx context.Context, skip bool) error {
+	r, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/skip", c.id), jsonBody(proto.PermissionSkipRequest{Skip: skip}))
+	if err != nil {
+		return fmt.Errorf("failed to create request: %w", err)
+	}
+	r.Header.Set("Content-Type", "application/json")
+	rsp, err := c.h.Do(r)
+	if err != nil {
+		return fmt.Errorf("failed to set permissions skip requests: %w", err)
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		return fmt.Errorf("failed to set permissions skip requests: status code %d", rsp.StatusCode)
+	}
+	return nil
+}
+
+func (c *Client) GetPermissionsSkipRequests(ctx context.Context) (bool, error) {
+	r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/permissions/skip", c.id), nil)
+	if err != nil {
+		return false, fmt.Errorf("failed to create request: %w", err)
+	}
+	rsp, err := c.h.Do(r)
+	if err != nil {
+		return false, fmt.Errorf("failed to get permissions skip requests: %w", err)
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		return false, fmt.Errorf("failed to get permissions skip requests: status code %d", rsp.StatusCode)
+	}
+	var skip proto.PermissionSkipRequest
+	if err := json.NewDecoder(rsp.Body).Decode(&skip); err != nil {
+		return false, fmt.Errorf("failed to decode permissions skip requests: %w", err)
+	}
+	return skip.Skip, nil
+}
+
+func (c *Client) GetConfig(ctx context.Context) (*config.Config, error) {
+	r, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost/v1/instances/%s/config", c.id), nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+	rsp, err := c.h.Do(r)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get config: %w", err)
+	}
+	defer rsp.Body.Close()
+	if rsp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("failed to get config: status code %d", rsp.StatusCode)
+	}
+	var cfg config.Config
+	if err := json.NewDecoder(rsp.Body).Decode(&cfg); err != nil {
+		return nil, fmt.Errorf("failed to decode config: %w", err)
+	}
+	return &cfg, nil
+}
+
 func (c *Client) CreateInstance(ctx context.Context, ins proto.Instance) (*proto.Instance, error) {
 	r, err := http.NewRequestWithContext(ctx, "POST", "http://localhost/v1/instances", jsonBody(ins))
 	if err != nil {