daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"sort"
 10	"sync"
 11	"time"
 12
 13	"github.com/floatpane/matcha/backend"
 14	"github.com/floatpane/matcha/config"
 15	"github.com/floatpane/matcha/daemonrpc"
 16	"github.com/floatpane/matcha/fetcher"
 17	"github.com/floatpane/matcha/notify"
 18)
 19
 20// Daemon is the long-running background process that manages email
 21// connections, caching, sync, and notifications.
 22type Daemon struct {
 23	config    *config.Config
 24	providers map[string]backend.Provider
 25	listener  net.Listener
 26	startTime time.Time
 27
 28	// Connected TUI/CLI clients.
 29	clients map[*daemonrpc.Conn]struct{}
 30	mu      sync.RWMutex
 31
 32	// Per-client subscriptions: conn → set of "accountID:folder".
 33	subscriptions map[*daemonrpc.Conn]map[string]struct{}
 34	subMu         sync.RWMutex
 35
 36	// Mutex for disk cache updates.
 37	cacheMu sync.Mutex
 38
 39	// IMAP IDLE watcher for push notifications.
 40	idleWatcher *fetcher.IdleWatcher
 41	idleUpdates chan fetcher.IdleUpdate
 42
 43	// Background sync cancellation.
 44	syncCancel context.CancelFunc
 45
 46	shutdown chan struct{}
 47	done     chan struct{}
 48}
 49
 50// New creates a daemon with the given config.
 51func New(cfg *config.Config) *Daemon {
 52	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 53	return &Daemon{
 54		config:        cfg,
 55		providers:     make(map[string]backend.Provider),
 56		clients:       make(map[*daemonrpc.Conn]struct{}),
 57		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 58		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 59		idleUpdates:   idleUpdates,
 60		shutdown:      make(chan struct{}),
 61		done:          make(chan struct{}),
 62	}
 63}
 64
 65// Run starts the daemon: creates providers, starts the socket listener,
 66// starts background sync, and blocks until shutdown.
 67func (d *Daemon) Run() error {
 68	d.startTime = time.Now()
 69
 70	// Ensure runtime directory exists.
 71	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
 72		return fmt.Errorf("create runtime dir: %w", err)
 73	}
 74
 75	// Check for existing daemon.
 76	pidPath := daemonrpc.PIDPath()
 77	if pid, running := IsRunning(pidPath); running {
 78		return fmt.Errorf("daemon already running (PID %d)", pid)
 79	}
 80
 81	// Write PID file.
 82	if err := WritePID(pidPath); err != nil {
 83		return fmt.Errorf("write PID file: %w", err)
 84	}
 85	defer RemovePID(pidPath)
 86
 87	// Remove stale socket file.
 88	sockPath := daemonrpc.SocketPath()
 89	os.Remove(sockPath)
 90
 91	// Listen on Unix domain socket.
 92	var err error
 93	d.listener, err = net.Listen("unix", sockPath)
 94	if err != nil {
 95		return fmt.Errorf("listen: %w", err)
 96	}
 97	defer d.listener.Close()
 98
 99	// Set socket permissions (owner only).
100	os.Chmod(sockPath, 0700)
101
102	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
103
104	// Initialize providers for all accounts.
105	d.initProviders()
106
107	// Start IMAP IDLE watchers for all accounts.
108	d.startIdleWatchers()
109	go d.idleEventLoop()
110
111	// Start signal handler.
112	go d.handleSignals()
113
114	// Start background sync.
115	ctx, cancel := context.WithCancel(context.Background())
116	d.syncCancel = cancel
117	go d.backgroundSync(ctx)
118
119	// Accept client connections.
120	go d.acceptLoop()
121
122	// Block until shutdown.
123	<-d.shutdown
124
125	// Cleanup.
126	log.Println("daemon: shutting down")
127	d.listener.Close()
128	d.idleWatcher.StopAll()
129	cancel()
130	d.closeAllClients()
131	d.closeProviders()
132
133	close(d.done)
134	return nil
135}
136
137// Shutdown triggers a graceful shutdown.
138func (d *Daemon) Shutdown() {
139	select {
140	case <-d.shutdown:
141		// Already shutting down.
142	default:
143		close(d.shutdown)
144	}
145}
146
147// ReloadConfig reloads the configuration from disk.
148func (d *Daemon) ReloadConfig() error {
149	cfg, err := config.LoadConfig()
150	if err != nil {
151		return fmt.Errorf("load config: %w", err)
152	}
153	d.mu.Lock()
154	d.config = cfg
155	d.mu.Unlock()
156
157	// Reinitialize providers for new/changed accounts.
158	d.initProviders()
159
160	// Notify clients.
161	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
162
163	log.Println("daemon: config reloaded")
164	return nil
165}
166
167func (d *Daemon) initProviders() {
168	d.mu.Lock()
169	defer d.mu.Unlock()
170
171	for i := range d.config.Accounts {
172		acct := &d.config.Accounts[i]
173		if _, exists := d.providers[acct.ID]; exists {
174			continue
175		}
176		p, err := backend.New(acct)
177		if err != nil {
178			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
179			continue
180		}
181		d.providers[acct.ID] = p
182		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
183	}
184}
185
186func (d *Daemon) acceptLoop() {
187	for {
188		conn, err := d.listener.Accept()
189		if err != nil {
190			select {
191			case <-d.shutdown:
192				return
193			default:
194				log.Printf("daemon: accept error: %v", err)
195				continue
196			}
197		}
198		rpcConn := daemonrpc.NewConn(conn)
199		d.addClient(rpcConn)
200		go d.handleClient(rpcConn)
201	}
202}
203
204func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
205	defer d.removeClient(conn)
206	defer conn.Close()
207
208	for {
209		msg, err := conn.ReceiveMessage()
210		if err != nil {
211			// Client disconnected or read error.
212			return
213		}
214		if msg.Request != nil {
215			d.handleRequest(conn, msg.Request)
216		}
217	}
218}
219
220func (d *Daemon) addClient(conn *daemonrpc.Conn) {
221	d.mu.Lock()
222	defer d.mu.Unlock()
223	d.clients[conn] = struct{}{}
224	log.Println("daemon: client connected")
225}
226
227func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
228	d.mu.Lock()
229	delete(d.clients, conn)
230	d.mu.Unlock()
231
232	d.subMu.Lock()
233	delete(d.subscriptions, conn)
234	d.subMu.Unlock()
235
236	log.Println("daemon: client disconnected")
237}
238
239func (d *Daemon) closeAllClients() {
240	d.mu.Lock()
241	defer d.mu.Unlock()
242	for conn := range d.clients {
243		conn.Close()
244	}
245	d.clients = make(map[*daemonrpc.Conn]struct{})
246}
247
248func (d *Daemon) closeProviders() {
249	d.mu.Lock()
250	defer d.mu.Unlock()
251	for id, p := range d.providers {
252		if err := p.Close(); err != nil {
253			log.Printf("daemon: error closing provider %s: %v", id, err)
254		}
255	}
256}
257
258// broadcastEvent sends an event to all connected clients.
259func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
260	d.mu.RLock()
261	defer d.mu.RUnlock()
262	for conn := range d.clients {
263		if err := conn.SendEvent(eventType, data); err != nil {
264			log.Printf("daemon: broadcast error: %v", err)
265		}
266	}
267}
268
269// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
270func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
271	key := accountID + ":" + folder
272	d.subMu.RLock()
273	defer d.subMu.RUnlock()
274
275	for conn, subs := range d.subscriptions {
276		if _, ok := subs[key]; ok {
277			if err := conn.SendEvent(eventType, data); err != nil {
278				log.Printf("daemon: subscriber broadcast error: %v", err)
279			}
280		}
281	}
282}
283
284// getProvider returns the provider for the given account ID.
285func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
286	d.mu.RLock()
287	defer d.mu.RUnlock()
288	p, ok := d.providers[accountID]
289	if !ok {
290		return nil, fmt.Errorf("no provider for account %s", accountID)
291	}
292	return p, nil
293}
294
295// getAccount returns the account config for the given ID.
296func (d *Daemon) getAccount(accountID string) *config.Account {
297	d.mu.RLock()
298	defer d.mu.RUnlock()
299	return d.config.GetAccountByID(accountID)
300}
301
302// backgroundSync handles periodic sync and IDLE-like notifications.
303func (d *Daemon) backgroundSync(ctx context.Context) {
304	ticker := time.NewTicker(5 * time.Minute)
305	defer ticker.Stop()
306
307	for {
308		select {
309		case <-ctx.Done():
310			return
311		case <-ticker.C:
312			d.syncAllAccounts(ctx)
313		}
314	}
315}
316
317func (d *Daemon) syncAllAccounts(ctx context.Context) {
318	d.mu.RLock()
319	accounts := make([]config.Account, len(d.config.Accounts))
320	copy(accounts, d.config.Accounts)
321	d.mu.RUnlock()
322
323	for _, acct := range accounts {
324		select {
325		case <-ctx.Done():
326			return
327		default:
328		}
329
330		d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
331			AccountID: acct.ID,
332			Folder:    "INBOX",
333		})
334
335		p, err := d.getProvider(acct.ID)
336		if err != nil {
337			continue
338		}
339
340		emails, err := p.FetchEmails(ctx, "INBOX", 50, 0)
341		if err != nil {
342			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
343			d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
344				AccountID: acct.ID,
345				Folder:    "INBOX",
346				Error:     err.Error(),
347			})
348			continue
349		}
350
351		oldCached, _ := config.LoadFolderEmailCache("INBOX")
352		oldUIDs := make(map[uint32]struct{}, len(oldCached))
353		for _, e := range oldCached {
354			if e.AccountID == acct.ID {
355				oldUIDs[e.UID] = struct{}{}
356			}
357		}
358
359		// Cache the fetched emails to disk.
360		var cached []config.CachedEmail
361		for _, e := range emails {
362			cached = append(cached, config.CachedEmail{
363				UID:       e.UID,
364				From:      e.From,
365				To:        e.To,
366				Subject:   e.Subject,
367				Date:      e.Date,
368				MessageID: e.MessageID,
369				AccountID: e.AccountID,
370				IsRead:    e.IsRead,
371			})
372		}
373		if err := d.updateFolderCache("INBOX", acct.ID, cached); err != nil {
374			log.Printf("daemon: cache update for INBOX failed: %v", err)
375		}
376
377		d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
378			AccountID:  acct.ID,
379			Folder:     "INBOX",
380			EmailCount: len(emails),
381		})
382
383		newCount := 0
384		for _, e := range emails {
385			if _, seen := oldUIDs[e.UID]; !seen {
386				newCount++
387			}
388		}
389
390		// Send desktop notification if TUI not connected.
391		d.mu.RLock()
392		noClients := len(d.clients) == 0
393		d.mu.RUnlock()
394
395		if noClients && newCount > 0 {
396			if !d.config.DisableNotifications {
397				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail))
398			}
399		}
400	}
401}
402
403// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
404func (d *Daemon) startIdleWatchers() {
405	d.mu.RLock()
406	defer d.mu.RUnlock()
407
408	for i := range d.config.Accounts {
409		acct := &d.config.Accounts[i]
410		// Only IMAP accounts support IDLE.
411		protocol := acct.Protocol
412		if protocol == "" {
413			protocol = "imap"
414		}
415		if protocol != "imap" {
416			continue
417		}
418		d.idleWatcher.Watch(acct, "INBOX")
419		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
420	}
421}
422
423// idleEventLoop listens for IDLE updates and broadcasts them as events.
424func (d *Daemon) idleEventLoop() {
425	for {
426		select {
427		case <-d.shutdown:
428			return
429		case update, ok := <-d.idleUpdates:
430			if !ok {
431				return
432			}
433			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
434
435			// Desktop notification when no clients connected.
436			d.mu.RLock()
437			noClients := len(d.clients) == 0
438			d.mu.RUnlock()
439
440			if noClients && !d.config.DisableNotifications {
441				accountName := update.AccountID
442				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
443					accountName = acct.Email
444				}
445				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName))
446			}
447
448			// Broadcast to subscribed clients.
449			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
450				AccountID: update.AccountID,
451				Folder:    update.FolderName,
452			})
453
454			// Fetch and cache emails so they're fresh when TUI next connects.
455			go d.fetchAndCache(update.AccountID, update.FolderName)
456		}
457	}
458}
459
460// fetchAndCache fetches emails for an account/folder and saves to disk cache.
461func (d *Daemon) fetchAndCache(accountID, folder string) {
462	acct := d.getAccount(accountID)
463	if acct == nil {
464		return
465	}
466
467	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
468	if err != nil {
469		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
470		return
471	}
472
473	// Convert to cache format and save.
474	var cached []config.CachedEmail
475	for _, e := range emails {
476		cached = append(cached, config.CachedEmail{
477			UID:       e.UID,
478			From:      e.From,
479			To:        e.To,
480			Subject:   e.Subject,
481			Date:      e.Date,
482			MessageID: e.MessageID,
483			AccountID: e.AccountID,
484			IsRead:    e.IsRead,
485		})
486	}
487
488	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
489		log.Printf("daemon: cache update for %s failed: %v", folder, err)
490		return
491	}
492
493	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
494
495	// Also notify subscribers that emails were updated.
496	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
497		AccountID:  accountID,
498		Folder:     folder,
499		EmailCount: len(emails),
500	})
501}
502
503// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
504func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
505	d.cacheMu.Lock()
506	defer d.cacheMu.Unlock()
507
508	// Load existing cache
509	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
510
511	// Filter out old emails for this account
512	var merged []config.CachedEmail
513	for _, e := range existing {
514		if e.AccountID != accountID {
515			merged = append(merged, e)
516		}
517	}
518
519	// Append new emails
520	merged = append(merged, newEmails...)
521
522	// Sort newest first
523	sort.Slice(merged, func(i, j int) bool {
524		return merged[i].Date.After(merged[j].Date)
525	})
526
527	// Save merged cache
528	return config.SaveFolderEmailCache(folderName, merged)
529}