1package daemonclient
2
3import (
4 "encoding/json"
5 "fmt"
6 "net"
7 "sync"
8 "sync/atomic"
9
10 "github.com/floatpane/matcha/daemonrpc"
11)
12
13// Client connects to the matcha daemon over a Unix domain socket.
14type Client struct {
15 conn *daemonrpc.Conn
16 nextID atomic.Uint64
17 pending map[uint64]chan *daemonrpc.Response
18 mu sync.Mutex
19 events chan *daemonrpc.Event
20 done chan struct{}
21}
22
23// Dial connects to the daemon socket.
24func Dial() (*Client, error) {
25 sockPath := daemonrpc.SocketPath()
26 conn, err := net.Dial("unix", sockPath) //nolint:noctx
27 if err != nil {
28 return nil, fmt.Errorf("connect to daemon: %w", err)
29 }
30
31 c := &Client{
32 conn: daemonrpc.NewConn(conn),
33 pending: make(map[uint64]chan *daemonrpc.Response),
34 events: make(chan *daemonrpc.Event, 64),
35 done: make(chan struct{}),
36 }
37
38 go c.readLoop()
39 return c, nil
40}
41
42// Call makes a synchronous RPC call to the daemon.
43func (c *Client) Call(method string, params interface{}, result interface{}) error {
44 id := c.nextID.Add(1)
45
46 // Marshal params.
47 var rawParams json.RawMessage
48 if params != nil {
49 var err error
50 rawParams, err = json.Marshal(params)
51 if err != nil {
52 return fmt.Errorf("marshal params: %w", err)
53 }
54 }
55
56 // Register pending response channel.
57 ch := make(chan *daemonrpc.Response, 1)
58 c.mu.Lock()
59 c.pending[id] = ch
60 c.mu.Unlock()
61
62 defer func() {
63 c.mu.Lock()
64 delete(c.pending, id)
65 c.mu.Unlock()
66 }()
67
68 // Send request.
69 req := &daemonrpc.Request{
70 ID: id,
71 Method: method,
72 Params: rawParams,
73 }
74 if err := c.conn.Send(req); err != nil {
75 return fmt.Errorf("send request: %w", err)
76 }
77
78 // Wait for response.
79 select {
80 case resp := <-ch:
81 if resp.Error != nil {
82 return resp.Error
83 }
84 if result != nil && resp.Result != nil {
85 return json.Unmarshal(resp.Result, result)
86 }
87 return nil
88 case <-c.done:
89 return fmt.Errorf("connection closed")
90 }
91}
92
93// Events returns the channel that receives push events from the daemon.
94func (c *Client) Events() <-chan *daemonrpc.Event {
95 return c.events
96}
97
98// Close closes the connection to the daemon.
99func (c *Client) Close() error {
100 select {
101 case <-c.done:
102 return nil
103 default:
104 close(c.done)
105 }
106 return c.conn.Close()
107}
108
109// readLoop reads messages from the daemon and dispatches them.
110func (c *Client) readLoop() {
111 defer close(c.events)
112
113 for {
114 msg, err := c.conn.ReceiveMessage()
115 if err != nil {
116 select {
117 case <-c.done:
118 default:
119 close(c.done)
120 }
121 return
122 }
123
124 if msg.Response != nil {
125 c.mu.Lock()
126 ch, ok := c.pending[msg.Response.ID]
127 c.mu.Unlock()
128 if ok {
129 ch <- msg.Response
130 }
131 }
132
133 if msg.Event != nil {
134 select {
135 case c.events <- msg.Event:
136 default:
137 // Drop event if channel full.
138 }
139 }
140 }
141}
142
143// Ping checks if the daemon is responsive.
144func (c *Client) Ping() error {
145 var result daemonrpc.PingResult
146 return c.Call(daemonrpc.MethodPing, nil, &result)
147}
148
149// Status returns daemon status info.
150func (c *Client) Status() (*daemonrpc.StatusResult, error) {
151 var result daemonrpc.StatusResult
152 if err := c.Call(daemonrpc.MethodGetStatus, nil, &result); err != nil {
153 return nil, err
154 }
155 return &result, nil
156}