daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"sort"
 10	"sync"
 11	"time"
 12
 13	udsrpc "github.com/floatpane/go-uds-jsonrpc"
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19	"github.com/floatpane/matcha/sender"
 20)
 21
 22const inboxFolder = "INBOX"
 23
 24// Daemon is the long-running background process that manages email
 25// connections, caching, sync, and notifications.
 26type Daemon struct {
 27	config    *config.Config
 28	providers map[string]backend.Provider
 29	server    *udsrpc.Server
 30	startTime time.Time
 31
 32	mu sync.RWMutex
 33
 34	// Per-client subscriptions: conn → set of "accountID:folder".
 35	subscriptions map[*daemonrpc.Conn]map[string]struct{}
 36	subMu         sync.RWMutex
 37
 38	// Mutex for disk cache updates.
 39	cacheMu sync.Mutex
 40
 41	// IMAP IDLE watcher for push notifications.
 42	idleWatcher *fetcher.IdleWatcher
 43	idleUpdates chan fetcher.IdleUpdate
 44
 45	// Background sync cancellation.
 46	syncCancel context.CancelFunc
 47
 48	shutdown chan struct{}
 49	done     chan struct{}
 50
 51	outbox   map[string]*OutboxEntry
 52	outboxMu sync.Mutex
 53}
 54
 55type OutboxEntry struct {
 56	ID     string
 57	Params daemonrpc.SendEmailParams
 58	SendAt time.Time
 59}
 60
 61// New creates a daemon with the given config.
 62func New(cfg *config.Config) *Daemon {
 63	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 64	d := &Daemon{
 65		config:        cfg,
 66		providers:     make(map[string]backend.Provider),
 67		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 68		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 69		idleUpdates:   idleUpdates,
 70		shutdown:      make(chan struct{}),
 71		done:          make(chan struct{}),
 72		outbox:        make(map[string]*OutboxEntry),
 73	}
 74
 75	d.server = udsrpc.NewServer()
 76	d.registerHandlers()
 77	d.server.OnConnect(func(_ *daemonrpc.Conn) {
 78		log.Println("daemon: client connected")
 79	})
 80	d.server.OnDisconnect(func(conn *daemonrpc.Conn) {
 81		d.subMu.Lock()
 82		delete(d.subscriptions, conn)
 83		d.subMu.Unlock()
 84		log.Println("daemon: client disconnected")
 85	})
 86
 87	return d
 88}
 89
 90// registerHandlers wires each RPC method to its handler on the server.
 91func (d *Daemon) registerHandlers() {
 92	d.server.Handle(daemonrpc.MethodPing, d.handlePing)
 93	d.server.Handle(daemonrpc.MethodGetStatus, d.handleGetStatus)
 94	d.server.Handle(daemonrpc.MethodGetAccounts, d.handleGetAccounts)
 95	d.server.Handle(daemonrpc.MethodReloadConfig, d.handleReloadConfig)
 96	d.server.Handle(daemonrpc.MethodFetchEmails, d.handleFetchEmails)
 97	d.server.Handle(daemonrpc.MethodFetchEmailBody, d.handleFetchEmailBody)
 98	d.server.Handle(daemonrpc.MethodDeleteEmails, d.handleDeleteEmails)
 99	d.server.Handle(daemonrpc.MethodArchiveEmails, d.handleArchiveEmails)
100	d.server.Handle(daemonrpc.MethodMoveEmails, d.handleMoveEmails)
101	d.server.Handle(daemonrpc.MethodMarkRead, d.handleMarkRead)
102	d.server.Handle(daemonrpc.MethodFetchFolders, d.handleFetchFolders)
103	d.server.Handle(daemonrpc.MethodRefreshFolder, d.handleRefreshFolder)
104	d.server.Handle(daemonrpc.MethodSubscribe, d.handleSubscribe)
105	d.server.Handle(daemonrpc.MethodUnsubscribe, d.handleUnsubscribe)
106	d.server.Handle(daemonrpc.MethodQueueEmail, d.handleQueueEmail)
107	d.server.Handle(daemonrpc.MethodCancelEmail, d.handleCancelEmail)
108}
109
110// Run starts the daemon: creates providers, starts the socket listener,
111// starts background sync, and blocks until shutdown.
112func (d *Daemon) Run() error {
113	d.startTime = time.Now()
114
115	// Ensure runtime directory exists.
116	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
117		return fmt.Errorf("create runtime dir: %w", err)
118	}
119
120	// Check for existing daemon.
121	pidPath := daemonrpc.PIDPath()
122	if pid, running := IsRunning(pidPath); running {
123		return fmt.Errorf("daemon already running (PID %d)", pid)
124	}
125
126	// Write PID file.
127	if err := WritePID(pidPath); err != nil {
128		return fmt.Errorf("write PID file: %w", err)
129	}
130	defer RemovePID(pidPath) //nolint:errcheck
131
132	// Remove stale socket file.
133	sockPath := daemonrpc.SocketPath()
134	if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
135		return fmt.Errorf("remove stale socket: %w", err)
136	}
137
138	// Listen on Unix domain socket.
139	listener, err := net.Listen("unix", sockPath) //nolint:noctx
140	if err != nil {
141		return fmt.Errorf("listen: %w", err)
142	}
143	defer listener.Close() //nolint:errcheck
144
145	// Set socket permissions (owner only).
146	if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302
147		return fmt.Errorf("set socket permissions: %w", err)
148	}
149
150	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
151
152	// Initialize providers for all accounts.
153	d.initProviders()
154
155	// Start IMAP IDLE watchers for all accounts.
156	d.startIdleWatchers()
157	go d.idleEventLoop()
158
159	// Handle OS signals: SIGTERM/SIGINT → shutdown, SIGHUP → reload config.
160	stopSignals := udsrpc.HandleSignals(d.Shutdown, func() {
161		log.Println("daemon: received SIGHUP, reloading config")
162		if err := d.ReloadConfig(); err != nil {
163			log.Printf("daemon: config reload failed: %v", err)
164		}
165	})
166	defer stopSignals()
167
168	// Start background sync.
169	ctx, cancel := context.WithCancel(context.Background())
170	d.syncCancel = cancel
171	go d.backgroundSync(ctx)
172
173	go d.processOutbox(ctx)
174
175	// Serve client connections via the shared RPC server. Canceling serveCtx
176	// closes the listener and unblocks Serve.
177	serveCtx, serveCancel := context.WithCancel(context.Background())
178	go func() {
179		if err := d.server.Serve(serveCtx, listener); err != nil {
180			log.Printf("daemon: serve error: %v", err)
181		}
182	}()
183
184	// Block until shutdown.
185	<-d.shutdown
186
187	// Cleanup.
188	log.Println("daemon: shutting down")
189	serveCancel()
190	for _, conn := range d.server.Clients() {
191		conn.Close() //nolint:errcheck,gosec
192	}
193	if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil {
194		log.Printf("daemon: %v", err)
195	}
196	cancel()
197	d.closeProviders()
198
199	close(d.done)
200	return nil
201}
202
203// Shutdown triggers a graceful shutdown.
204func (d *Daemon) Shutdown() {
205	select {
206	case <-d.shutdown:
207		// Already shutting down.
208	default:
209		close(d.shutdown)
210	}
211}
212
213// ReloadConfig reloads the configuration from disk.
214func (d *Daemon) ReloadConfig() error {
215	cfg, err := config.LoadConfig()
216	if err != nil {
217		return fmt.Errorf("load config: %w", err)
218	}
219	d.mu.Lock()
220	d.config = cfg
221	d.mu.Unlock()
222
223	// Reinitialize providers for new/changed accounts.
224	d.initProviders()
225
226	// Notify clients.
227	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
228
229	log.Println("daemon: config reloaded")
230	return nil
231}
232
233func (d *Daemon) initProviders() {
234	d.mu.Lock()
235	defer d.mu.Unlock()
236
237	for i := range d.config.Accounts {
238		acct := &d.config.Accounts[i]
239		if _, exists := d.providers[acct.ID]; exists {
240			continue
241		}
242		p, err := backend.New(acct)
243		if err != nil {
244			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
245			continue
246		}
247		d.providers[acct.ID] = p
248		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
249	}
250}
251
252func (d *Daemon) closeProviders() {
253	d.mu.Lock()
254	defer d.mu.Unlock()
255	for id, p := range d.providers {
256		if err := p.Close(); err != nil {
257			log.Printf("daemon: error closing provider %s: %v", id, err)
258		}
259	}
260}
261
262// broadcastEvent sends an event to all connected clients.
263func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
264	d.server.Broadcast(eventType, data)
265}
266
267// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
268func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
269	key := accountID + ":" + folder
270	d.server.BroadcastFunc(eventType, data, func(conn *daemonrpc.Conn) bool {
271		d.subMu.RLock()
272		defer d.subMu.RUnlock()
273		_, ok := d.subscriptions[conn][key]
274		return ok
275	})
276}
277
278// getProvider returns the provider for the given account ID.
279func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
280	d.mu.RLock()
281	defer d.mu.RUnlock()
282	p, ok := d.providers[accountID]
283	if !ok {
284		return nil, fmt.Errorf("no provider for account %s", accountID)
285	}
286	return p, nil
287}
288
289// getAccount returns the account config for the given ID.
290func (d *Daemon) getAccount(accountID string) *config.Account {
291	d.mu.RLock()
292	defer d.mu.RUnlock()
293	return d.config.GetAccountByID(accountID)
294}
295
296// backgroundSync handles periodic sync and IDLE-like notifications.
297func (d *Daemon) backgroundSync(ctx context.Context) {
298	ticker := time.NewTicker(5 * time.Minute)
299	defer ticker.Stop()
300
301	for {
302		select {
303		case <-ctx.Done():
304			return
305		case <-ticker.C:
306			d.syncAllAccounts(ctx)
307		}
308	}
309}
310
311func (d *Daemon) syncAllAccounts(ctx context.Context) {
312	d.mu.RLock()
313	accounts := make([]config.Account, len(d.config.Accounts))
314	copy(accounts, d.config.Accounts)
315	d.mu.RUnlock()
316
317	for _, acct := range accounts {
318		select {
319		case <-ctx.Done():
320			return
321		default:
322		}
323
324		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
325			AccountID: acct.ID,
326			Folder:    inboxFolder,
327		})
328
329		p, err := d.getProvider(acct.ID)
330		if err != nil {
331			continue
332		}
333
334		emails, err := p.FetchEmails(ctx, inboxFolder, 50, 0)
335		if err != nil {
336			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
337			d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
338				AccountID: acct.ID,
339				Folder:    inboxFolder,
340				Error:     err.Error(),
341			})
342			continue
343		}
344
345		oldCached, _ := config.LoadFolderEmailCache(inboxFolder)
346		oldUIDs := make(map[uint32]struct{}, len(oldCached))
347		for _, e := range oldCached {
348			if e.AccountID == acct.ID {
349				oldUIDs[e.UID] = struct{}{}
350			}
351		}
352
353		// Cache the fetched emails to disk.
354		var cached []config.CachedEmail
355		for _, e := range emails {
356			cached = append(cached, config.CachedEmail{
357				UID:        e.UID,
358				From:       e.From,
359				To:         e.To,
360				Subject:    e.Subject,
361				Date:       e.Date,
362				MessageID:  e.MessageID,
363				InReplyTo:  e.InReplyTo,
364				References: e.References,
365				AccountID:  e.AccountID,
366				IsRead:     e.IsRead,
367			})
368		}
369		if err := d.updateFolderCache(inboxFolder, acct.ID, cached); err != nil {
370			log.Printf("daemon: cache update for INBOX failed: %v", err)
371		}
372
373		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
374			AccountID:  acct.ID,
375			Folder:     inboxFolder,
376			EmailCount: len(emails),
377		})
378
379		newCount := 0
380		for _, e := range emails {
381			if _, seen := oldUIDs[e.UID]; !seen {
382				newCount++
383			}
384		}
385
386		// Send desktop notification if TUI not connected.
387		noClients := len(d.server.Clients()) == 0
388
389		if noClients && newCount > 0 {
390			if !d.config.DisableNotifications {
391				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail)) //nolint:errcheck
392			}
393		}
394	}
395}
396
397// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
398func (d *Daemon) startIdleWatchers() {
399	d.mu.RLock()
400	defer d.mu.RUnlock()
401
402	for i := range d.config.Accounts {
403		acct := &d.config.Accounts[i]
404		// Only IMAP accounts support IDLE.
405		protocol := acct.Protocol
406		if protocol == "" {
407			protocol = "imap"
408		}
409		if protocol != "imap" {
410			continue
411		}
412		d.idleWatcher.Watch(acct, inboxFolder)
413		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
414	}
415}
416
417// idleEventLoop listens for IDLE updates and broadcasts them as events.
418func (d *Daemon) idleEventLoop() {
419	for {
420		select {
421		case <-d.shutdown:
422			return
423		case update, ok := <-d.idleUpdates:
424			if !ok {
425				return
426			}
427			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
428
429			// Desktop notification when no clients connected.
430			noClients := len(d.server.Clients()) == 0
431
432			if noClients && !d.config.DisableNotifications {
433				accountName := update.AccountID
434				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
435					accountName = acct.Email
436				}
437				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) //nolint:errcheck
438			}
439
440			// Broadcast to subscribed clients.
441			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
442				AccountID: update.AccountID,
443				Folder:    update.FolderName,
444			})
445
446			// Fetch and cache emails so they're fresh when TUI next connects.
447			go d.fetchAndCache(update.AccountID, update.FolderName)
448		}
449	}
450}
451
452// fetchAndCache fetches emails for an account/folder and saves to disk cache.
453func (d *Daemon) fetchAndCache(accountID, folder string) {
454	acct := d.getAccount(accountID)
455	if acct == nil {
456		return
457	}
458
459	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
460	if err != nil {
461		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
462		return
463	}
464
465	// Convert to cache format and save.
466	var cached []config.CachedEmail
467	for _, e := range emails {
468		cached = append(cached, config.CachedEmail{
469			UID:        e.UID,
470			From:       e.From,
471			To:         e.To,
472			Subject:    e.Subject,
473			Date:       e.Date,
474			MessageID:  e.MessageID,
475			InReplyTo:  e.InReplyTo,
476			References: e.References,
477			AccountID:  e.AccountID,
478			IsRead:     e.IsRead,
479		})
480	}
481
482	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
483		log.Printf("daemon: cache update for %s failed: %v", folder, err)
484		return
485	}
486
487	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
488
489	// Also notify subscribers that emails were updated.
490	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
491		AccountID:  accountID,
492		Folder:     folder,
493		EmailCount: len(emails),
494	})
495}
496
497// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
498func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
499	d.cacheMu.Lock()
500	defer d.cacheMu.Unlock()
501
502	// Load existing cache
503	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
504
505	// Filter out old emails for this account
506	var merged []config.CachedEmail
507	for _, e := range existing {
508		if e.AccountID != accountID {
509			merged = append(merged, e)
510		}
511	}
512
513	// Append new emails
514	merged = append(merged, newEmails...)
515
516	// Sort newest first
517	sort.Slice(merged, func(i, j int) bool {
518		return merged[i].Date.After(merged[j].Date)
519	})
520
521	// Save merged cache
522	return config.SaveFolderEmailCache(folderName, merged)
523}
524
525func (d *Daemon) processOutbox(ctx context.Context) {
526	ticker := time.NewTicker(time.Second)
527	defer ticker.Stop()
528
529	for {
530		select {
531		case <-ctx.Done():
532			return
533		case <-ticker.C:
534			d.outboxMu.Lock()
535			for id, entry := range d.outbox {
536				if time.Now().After(entry.SendAt) {
537					delete(d.outbox, id)
538					go d.sendOutboxEntry(entry)
539				}
540			}
541			d.outboxMu.Unlock()
542		}
543	}
544}
545
546func (d *Daemon) sendOutboxEntry(entry *OutboxEntry) {
547	acct := d.getAccount(entry.Params.AccountID)
548	if acct == nil {
549		log.Printf("daemon: outbox send failed, no account for %s", entry.Params.AccountID)
550		return
551	}
552
553	rawMsg, err := sender.SendEmail(
554		acct,
555		entry.Params.To,
556		entry.Params.Cc,
557		entry.Params.Bcc,
558		entry.Params.Subject,
559		entry.Params.Body,
560		entry.Params.HTMLBody,
561		entry.Params.Images,
562		entry.Params.Attachments,
563		entry.Params.InReplyTo,
564		entry.Params.References,
565		entry.Params.SignSMIME,
566		entry.Params.EncryptSMIME,
567		entry.Params.SignPGP,
568		entry.Params.EncryptPGP,
569	)
570	if err != nil {
571		log.Printf("daemon: outbox send failed for %s: %v", entry.ID, err)
572		return
573	}
574
575	if acct.ServiceProvider != "gmail" {
576		if err := fetcher.AppendToSentMailbox(acct, rawMsg); err != nil {
577			log.Printf("daemon: append to sent failed for %s: %v", entry.ID, err)
578		}
579	}
580
581	log.Printf("daemon: outbox sent email %s", entry.ID)
582}