handler.go

  1package daemon
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"log"
  8	"os"
  9	"time"
 10
 11	"github.com/floatpane/matcha/daemonrpc"
 12	"github.com/google/uuid"
 13)
 14
 15// Per-handler timeouts. fetchTimeout covers reads against the upstream IMAP
 16// provider, which can return large bodies and so are given more headroom.
 17// mutateTimeout covers state-changing operations and folder listings, which
 18// are bounded by IMAP command latency rather than payload size.
 19const (
 20	fetchTimeout  = 60 * time.Second
 21	mutateTimeout = 30 * time.Second
 22)
 23
 24// decodeParams unmarshals raw JSON params into T. A nil/empty payload yields
 25// the zero value.
 26func decodeParams[T any](params json.RawMessage) (T, error) {
 27	var p T
 28	if params != nil {
 29		if err := json.Unmarshal(params, &p); err != nil {
 30			return p, err
 31		}
 32	}
 33	return p, nil
 34}
 35
 36// parseError wraps a params-decoding failure with the parse error code so the
 37// server forwards it verbatim instead of mapping to ErrCodeInternal.
 38func parseError(err error) error {
 39	return &daemonrpc.Error{Code: daemonrpc.ErrCodeParse, Message: err.Error()}
 40}
 41
 42func (d *Daemon) handlePing(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
 43	return daemonrpc.PingResult{Pong: true}, nil
 44}
 45
 46func (d *Daemon) handleGetStatus(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
 47	d.mu.RLock()
 48	accounts := make([]string, 0, len(d.config.Accounts))
 49	for _, acct := range d.config.Accounts {
 50		accounts = append(accounts, acct.Email)
 51	}
 52	d.mu.RUnlock()
 53
 54	return daemonrpc.StatusResult{
 55		Running:  true,
 56		Uptime:   int64(time.Since(d.startTime).Seconds()),
 57		Accounts: accounts,
 58		PID:      os.Getpid(),
 59	}, nil
 60}
 61
 62func (d *Daemon) handleGetAccounts(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
 63	d.mu.RLock()
 64	defer d.mu.RUnlock()
 65
 66	infos := make([]daemonrpc.AccountInfo, 0, len(d.config.Accounts))
 67	for _, acct := range d.config.Accounts {
 68		protocol := acct.Protocol
 69		if protocol == "" {
 70			protocol = "imap"
 71		}
 72		infos = append(infos, daemonrpc.AccountInfo{
 73			ID:       acct.ID,
 74			Name:     acct.Name,
 75			Email:    acct.Email,
 76			Protocol: protocol,
 77		})
 78	}
 79	return infos, nil
 80}
 81
 82func (d *Daemon) handleReloadConfig(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
 83	if err := d.ReloadConfig(); err != nil {
 84		return nil, err
 85	}
 86	return true, nil
 87}
 88
 89func (d *Daemon) handleFetchEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
 90	args, err := decodeParams[daemonrpc.FetchEmailsParams](params)
 91	if err != nil {
 92		return nil, parseError(err)
 93	}
 94
 95	p, err := d.getProvider(args.AccountID)
 96	if err != nil {
 97		return nil, err
 98	}
 99
100	ctx, cancel := context.WithTimeout(ctx, fetchTimeout)
101	defer cancel()
102
103	emails, err := p.FetchEmails(ctx, args.Folder, args.Limit, args.Offset)
104	if err != nil {
105		return nil, err
106	}
107	return emails, nil
108}
109
110func (d *Daemon) handleFetchEmailBody(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
111	args, err := decodeParams[daemonrpc.FetchEmailBodyParams](params)
112	if err != nil {
113		return nil, parseError(err)
114	}
115
116	p, err := d.getProvider(args.AccountID)
117	if err != nil {
118		return nil, err
119	}
120
121	ctx, cancel := context.WithTimeout(ctx, fetchTimeout)
122	defer cancel()
123
124	body, mimeType, attachments, err := p.FetchEmailBody(ctx, args.Folder, args.UID)
125	if err != nil {
126		return nil, err
127	}
128
129	// Convert backend.Attachment to daemonrpc.AttachmentInfo for wire transfer.
130	var attInfos []daemonrpc.AttachmentInfo
131	for _, att := range attachments {
132		attInfos = append(attInfos, daemonrpc.AttachmentInfo{
133			Filename: att.Filename,
134			PartID:   att.PartID,
135			Encoding: att.Encoding,
136			MIMEType: att.MIMEType,
137		})
138	}
139
140	return daemonrpc.FetchEmailBodyResult{
141		Body:         body,
142		BodyMIMEType: mimeType,
143		Attachments:  attInfos,
144	}, nil
145}
146
147func (d *Daemon) handleDeleteEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
148	args, err := decodeParams[daemonrpc.DeleteEmailsParams](params)
149	if err != nil {
150		return nil, parseError(err)
151	}
152
153	p, err := d.getProvider(args.AccountID)
154	if err != nil {
155		return nil, err
156	}
157
158	ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
159	defer cancel()
160
161	if err := p.DeleteEmails(ctx, args.Folder, args.UIDs); err != nil {
162		return nil, err
163	}
164	return true, nil
165}
166
167func (d *Daemon) handleArchiveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
168	args, err := decodeParams[daemonrpc.ArchiveEmailsParams](params)
169	if err != nil {
170		return nil, parseError(err)
171	}
172
173	p, err := d.getProvider(args.AccountID)
174	if err != nil {
175		return nil, err
176	}
177
178	ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
179	defer cancel()
180
181	if err := p.ArchiveEmails(ctx, args.Folder, args.UIDs); err != nil {
182		return nil, err
183	}
184	return true, nil
185}
186
187func (d *Daemon) handleMoveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
188	args, err := decodeParams[daemonrpc.MoveEmailsParams](params)
189	if err != nil {
190		return nil, parseError(err)
191	}
192
193	p, err := d.getProvider(args.AccountID)
194	if err != nil {
195		return nil, err
196	}
197
198	ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
199	defer cancel()
200
201	if err := p.MoveEmails(ctx, args.UIDs, args.SourceFolder, args.DestFolder); err != nil {
202		return nil, err
203	}
204	return true, nil
205}
206
207func (d *Daemon) handleMarkRead(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
208	args, err := decodeParams[daemonrpc.MarkReadParams](params)
209	if err != nil {
210		return nil, parseError(err)
211	}
212
213	p, err := d.getProvider(args.AccountID)
214	if err != nil {
215		return nil, err
216	}
217
218	ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
219	defer cancel()
220
221	for _, uid := range args.UIDs {
222		var err error
223		if args.Read {
224			err = p.MarkAsRead(ctx, args.Folder, uid)
225		} else {
226			err = p.MarkAsUnread(ctx, args.Folder, uid)
227		}
228		if err != nil {
229			log.Printf("daemon: mark read=%v %d failed: %v", args.Read, uid, err)
230		}
231	}
232	return true, nil
233}
234
235func (d *Daemon) handleFetchFolders(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
236	args, err := decodeParams[daemonrpc.FetchFoldersParams](params)
237	if err != nil {
238		return nil, parseError(err)
239	}
240
241	p, err := d.getProvider(args.AccountID)
242	if err != nil {
243		return nil, err
244	}
245
246	ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
247	defer cancel()
248
249	folders, err := p.FetchFolders(ctx)
250	if err != nil {
251		return nil, err
252	}
253	return folders, nil
254}
255
256func (d *Daemon) handleRefreshFolder(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
257	args, err := decodeParams[daemonrpc.RefreshFolderParams](params)
258	if err != nil {
259		return nil, parseError(err)
260	}
261
262	// Async: fetch in background, push events when done. The server-scoped ctx
263	// outlives the request and is canceled on daemon shutdown.
264	go func() {
265		defer func() {
266			if r := recover(); r != nil {
267				log.Printf("daemon: refresh panic for account = %s folder = %s: %v", args.AccountID, args.Folder, r)
268				d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
269					AccountID: args.AccountID,
270					Folder:    args.Folder,
271					Error:     fmt.Sprintf("panic: %v", r),
272				})
273			}
274		}()
275
276		p, err := d.getProvider(args.AccountID)
277		if err != nil {
278			log.Printf("daemon: refresh provider error: %v", err)
279			return
280		}
281
282		d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent(args))
283
284		fetchCtx, cancel := context.WithTimeout(ctx, fetchTimeout)
285		defer cancel()
286
287		emails, err := p.FetchEmails(fetchCtx, args.Folder, 50, 0)
288		if err != nil {
289			d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
290				AccountID: args.AccountID,
291				Folder:    args.Folder,
292				Error:     err.Error(),
293			})
294			return
295		}
296
297		d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
298			AccountID:  args.AccountID,
299			Folder:     args.Folder,
300			EmailCount: len(emails),
301		})
302	}()
303
304	return true, nil
305}
306
307func (d *Daemon) handleSubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) {
308	args, err := decodeParams[daemonrpc.SubscribeParams](params)
309	if err != nil {
310		return nil, parseError(err)
311	}
312
313	key := args.AccountID + ":" + args.Folder
314
315	d.subMu.Lock()
316	if d.subscriptions[conn] == nil {
317		d.subscriptions[conn] = make(map[string]struct{})
318	}
319	d.subscriptions[conn][key] = struct{}{}
320	d.subMu.Unlock()
321
322	log.Printf("daemon: client subscribed to %s", key)
323	return true, nil
324}
325
326func (d *Daemon) handleUnsubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) {
327	args, err := decodeParams[daemonrpc.UnsubscribeParams](params)
328	if err != nil {
329		return nil, parseError(err)
330	}
331
332	key := args.AccountID + ":" + args.Folder
333
334	d.subMu.Lock()
335	if subs, ok := d.subscriptions[conn]; ok {
336		delete(subs, key)
337	}
338	d.subMu.Unlock()
339
340	return true, nil
341}
342
343func (d *Daemon) handleQueueEmail(_ context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
344	args, err := decodeParams[daemonrpc.QueueEmailParams](params)
345	if err != nil {
346		return nil, parseError(err)
347	}
348
349	id := uuid.New().String()
350	entry := &OutboxEntry{
351		ID:     id,
352		Params: args.Email,
353		SendAt: time.Now().Add(time.Duration(args.DelaySeconds) * time.Second),
354	}
355
356	d.outboxMu.Lock()
357	d.outbox[id] = entry
358	d.outboxMu.Unlock()
359
360	log.Printf("daemon: queued email %s, sending in %ds", id, args.DelaySeconds)
361
362	return daemonrpc.QueueEmailResult{JobID: id}, nil
363}
364
365func (d *Daemon) handleCancelEmail(_ context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
366	args, err := decodeParams[daemonrpc.CancelEmailParams](params)
367	if err != nil {
368		return nil, parseError(err)
369	}
370
371	d.outboxMu.Lock()
372	_, exists := d.outbox[args.JobID]
373	if exists {
374		delete(d.outbox, args.JobID)
375	}
376	d.outboxMu.Unlock()
377
378	if !exists {
379		return nil, fmt.Errorf("job %s not found", args.JobID)
380	}
381
382	log.Printf("daemon: cancelled email %s", args.JobID)
383	return true, nil
384}