client.go

  1package daemonclient
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"net"
  7	"sync"
  8	"sync/atomic"
  9
 10	"github.com/floatpane/matcha/daemonrpc"
 11)
 12
 13// Client connects to the matcha daemon over a Unix domain socket.
 14type Client struct {
 15	conn    *daemonrpc.Conn
 16	nextID  atomic.Uint64
 17	pending map[uint64]chan *daemonrpc.Response
 18	mu      sync.Mutex
 19	events  chan *daemonrpc.Event
 20	done    chan struct{}
 21}
 22
 23// Dial connects to the daemon socket.
 24func Dial() (*Client, error) {
 25	sockPath := daemonrpc.SocketPath()
 26	conn, err := net.Dial("unix", sockPath) //nolint:noctx
 27	if err != nil {
 28		return nil, fmt.Errorf("connect to daemon: %w", err)
 29	}
 30
 31	c := &Client{
 32		conn:    daemonrpc.NewConn(conn),
 33		pending: make(map[uint64]chan *daemonrpc.Response),
 34		events:  make(chan *daemonrpc.Event, 64),
 35		done:    make(chan struct{}),
 36	}
 37
 38	go c.readLoop()
 39	return c, nil
 40}
 41
 42// Call makes a synchronous RPC call to the daemon.
 43func (c *Client) Call(method string, params interface{}, result interface{}) error {
 44	id := c.nextID.Add(1)
 45
 46	// Marshal params.
 47	var rawParams json.RawMessage
 48	if params != nil {
 49		var err error
 50		rawParams, err = json.Marshal(params)
 51		if err != nil {
 52			return fmt.Errorf("marshal params: %w", err)
 53		}
 54	}
 55
 56	// Register pending response channel.
 57	ch := make(chan *daemonrpc.Response, 1)
 58	c.mu.Lock()
 59	c.pending[id] = ch
 60	c.mu.Unlock()
 61
 62	defer func() {
 63		c.mu.Lock()
 64		delete(c.pending, id)
 65		c.mu.Unlock()
 66	}()
 67
 68	// Send request.
 69	req := &daemonrpc.Request{
 70		ID:     id,
 71		Method: method,
 72		Params: rawParams,
 73	}
 74	if err := c.conn.Send(req); err != nil {
 75		return fmt.Errorf("send request: %w", err)
 76	}
 77
 78	// Wait for response.
 79	select {
 80	case resp := <-ch:
 81		if resp.Error != nil {
 82			return resp.Error
 83		}
 84		if result != nil && resp.Result != nil {
 85			return json.Unmarshal(resp.Result, result)
 86		}
 87		return nil
 88	case <-c.done:
 89		return fmt.Errorf("connection closed")
 90	}
 91}
 92
 93// Events returns the channel that receives push events from the daemon.
 94func (c *Client) Events() <-chan *daemonrpc.Event {
 95	return c.events
 96}
 97
 98// Close closes the connection to the daemon.
 99func (c *Client) Close() error {
100	select {
101	case <-c.done:
102		return nil
103	default:
104		close(c.done)
105	}
106	return c.conn.Close()
107}
108
109// readLoop reads messages from the daemon and dispatches them.
110func (c *Client) readLoop() {
111	defer close(c.events)
112
113	for {
114		msg, err := c.conn.ReceiveMessage()
115		if err != nil {
116			select {
117			case <-c.done:
118			default:
119				close(c.done)
120			}
121			return
122		}
123
124		if msg.Response != nil {
125			c.mu.Lock()
126			ch, ok := c.pending[msg.Response.ID]
127			c.mu.Unlock()
128			if ok {
129				ch <- msg.Response
130			}
131		}
132
133		if msg.Event != nil {
134			select {
135			case c.events <- msg.Event:
136			default:
137				// Drop event if channel full.
138			}
139		}
140	}
141}
142
143// Ping checks if the daemon is responsive.
144func (c *Client) Ping() error {
145	var result daemonrpc.PingResult
146	return c.Call(daemonrpc.MethodPing, nil, &result)
147}
148
149// Status returns daemon status info.
150func (c *Client) Status() (*daemonrpc.StatusResult, error) {
151	var result daemonrpc.StatusResult
152	if err := c.Call(daemonrpc.MethodGetStatus, nil, &result); err != nil {
153		return nil, err
154	}
155	return &result, nil
156}