daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"runtime/debug"
 10	"sort"
 11	"sync"
 12	"time"
 13
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19)
 20
 21const inboxFolder = "INBOX"
 22
 23// Daemon is the long-running background process that manages email
 24// connections, caching, sync, and notifications.
 25type Daemon struct {
 26	config    *config.Config
 27	providers map[string]backend.Provider
 28	listener  net.Listener
 29	startTime time.Time
 30
 31	// Connected TUI/CLI clients.
 32	clients map[*daemonrpc.Conn]struct{}
 33	mu      sync.RWMutex
 34
 35	// Per-client subscriptions: conn → set of "accountID:folder".
 36	subscriptions map[*daemonrpc.Conn]map[string]struct{}
 37	subMu         sync.RWMutex
 38
 39	// Mutex for disk cache updates.
 40	cacheMu sync.Mutex
 41
 42	// IMAP IDLE watcher for push notifications.
 43	idleWatcher *fetcher.IdleWatcher
 44	idleUpdates chan fetcher.IdleUpdate
 45
 46	// Background sync cancellation.
 47	syncCancel context.CancelFunc
 48
 49	shutdown chan struct{}
 50	done     chan struct{}
 51}
 52
 53// New creates a daemon with the given config.
 54func New(cfg *config.Config) *Daemon {
 55	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 56	return &Daemon{
 57		config:        cfg,
 58		providers:     make(map[string]backend.Provider),
 59		clients:       make(map[*daemonrpc.Conn]struct{}),
 60		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 61		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 62		idleUpdates:   idleUpdates,
 63		shutdown:      make(chan struct{}),
 64		done:          make(chan struct{}),
 65	}
 66}
 67
 68// Run starts the daemon: creates providers, starts the socket listener,
 69// starts background sync, and blocks until shutdown.
 70func (d *Daemon) Run() error {
 71	d.startTime = time.Now()
 72
 73	// Ensure runtime directory exists.
 74	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
 75		return fmt.Errorf("create runtime dir: %w", err)
 76	}
 77
 78	// Check for existing daemon.
 79	pidPath := daemonrpc.PIDPath()
 80	if pid, running := IsRunning(pidPath); running {
 81		return fmt.Errorf("daemon already running (PID %d)", pid)
 82	}
 83
 84	// Write PID file.
 85	if err := WritePID(pidPath); err != nil {
 86		return fmt.Errorf("write PID file: %w", err)
 87	}
 88	defer RemovePID(pidPath) //nolint:errcheck
 89
 90	// Remove stale socket file.
 91	sockPath := daemonrpc.SocketPath()
 92	os.Remove(sockPath) //nolint:errcheck,gosec
 93
 94	// Listen on Unix domain socket.
 95	var err error
 96	d.listener, err = net.Listen("unix", sockPath) //nolint:noctx
 97	if err != nil {
 98		return fmt.Errorf("listen: %w", err)
 99	}
100	defer d.listener.Close() //nolint:errcheck
101
102	// Set socket permissions (owner only).
103	os.Chmod(sockPath, 0700) //nolint:errcheck,gosec
104
105	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
106
107	// Initialize providers for all accounts.
108	d.initProviders()
109
110	// Start IMAP IDLE watchers for all accounts.
111	d.startIdleWatchers()
112	go d.idleEventLoop()
113
114	// Start signal handler.
115	go d.handleSignals()
116
117	// Start background sync.
118	ctx, cancel := context.WithCancel(context.Background())
119	d.syncCancel = cancel
120	go d.backgroundSync(ctx)
121
122	// Accept client connections.
123	go d.acceptLoop()
124
125	// Block until shutdown.
126	<-d.shutdown
127
128	// Cleanup.
129	log.Println("daemon: shutting down")
130	d.listener.Close() //nolint:errcheck,gosec
131	d.idleWatcher.StopAll()
132	cancel()
133	d.closeAllClients()
134	d.closeProviders()
135
136	close(d.done)
137	return nil
138}
139
140// Shutdown triggers a graceful shutdown.
141func (d *Daemon) Shutdown() {
142	select {
143	case <-d.shutdown:
144		// Already shutting down.
145	default:
146		close(d.shutdown)
147	}
148}
149
150// ReloadConfig reloads the configuration from disk.
151func (d *Daemon) ReloadConfig() error {
152	cfg, err := config.LoadConfig()
153	if err != nil {
154		return fmt.Errorf("load config: %w", err)
155	}
156	d.mu.Lock()
157	d.config = cfg
158	d.mu.Unlock()
159
160	// Reinitialize providers for new/changed accounts.
161	d.initProviders()
162
163	// Notify clients.
164	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
165
166	log.Println("daemon: config reloaded")
167	return nil
168}
169
170func (d *Daemon) initProviders() {
171	d.mu.Lock()
172	defer d.mu.Unlock()
173
174	for i := range d.config.Accounts {
175		acct := &d.config.Accounts[i]
176		if _, exists := d.providers[acct.ID]; exists {
177			continue
178		}
179		p, err := backend.New(acct)
180		if err != nil {
181			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
182			continue
183		}
184		d.providers[acct.ID] = p
185		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
186	}
187}
188
189func (d *Daemon) acceptLoop() {
190	for {
191		done := func() bool {
192			defer func() {
193				if r := recover(); r != nil {
194					log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack())
195				}
196			}()
197			conn, err := d.listener.Accept()
198			if err != nil {
199				select {
200				case <-d.shutdown:
201					return true
202				default:
203					log.Printf("daemon: accept error: %v", err)
204					return false
205				}
206			}
207			rpcConn := daemonrpc.NewConn(conn)
208			d.addClient(rpcConn)
209			go d.handleClient(rpcConn)
210			return false
211		}()
212		if done {
213			return
214		}
215	}
216}
217
218func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
219	defer d.removeClient(conn)
220	defer conn.Close() //nolint:errcheck
221
222	for {
223		msg, err := conn.ReceiveMessage()
224		if err != nil {
225			// Client disconnected or read error.
226			return
227		}
228		if msg.Request != nil {
229			d.handleRequest(conn, msg.Request)
230		}
231	}
232}
233
234func (d *Daemon) addClient(conn *daemonrpc.Conn) {
235	d.mu.Lock()
236	defer d.mu.Unlock()
237	d.clients[conn] = struct{}{}
238	log.Println("daemon: client connected")
239}
240
241func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
242	d.mu.Lock()
243	delete(d.clients, conn)
244	d.mu.Unlock()
245
246	d.subMu.Lock()
247	delete(d.subscriptions, conn)
248	d.subMu.Unlock()
249
250	log.Println("daemon: client disconnected")
251}
252
253func (d *Daemon) closeAllClients() {
254	d.mu.Lock()
255	defer d.mu.Unlock()
256	for conn := range d.clients {
257		conn.Close() //nolint:errcheck,gosec
258	}
259	d.clients = make(map[*daemonrpc.Conn]struct{})
260}
261
262func (d *Daemon) closeProviders() {
263	d.mu.Lock()
264	defer d.mu.Unlock()
265	for id, p := range d.providers {
266		if err := p.Close(); err != nil {
267			log.Printf("daemon: error closing provider %s: %v", id, err)
268		}
269	}
270}
271
272// broadcastEvent sends an event to all connected clients.
273func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
274	d.mu.RLock()
275	defer d.mu.RUnlock()
276	for conn := range d.clients {
277		if err := conn.SendEvent(eventType, data); err != nil {
278			log.Printf("daemon: broadcast error: %v", err)
279		}
280	}
281}
282
283// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
284func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
285	key := accountID + ":" + folder
286	d.subMu.RLock()
287	defer d.subMu.RUnlock()
288
289	for conn, subs := range d.subscriptions {
290		if _, ok := subs[key]; ok {
291			if err := conn.SendEvent(eventType, data); err != nil {
292				log.Printf("daemon: subscriber broadcast error: %v", err)
293			}
294		}
295	}
296}
297
298// getProvider returns the provider for the given account ID.
299func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
300	d.mu.RLock()
301	defer d.mu.RUnlock()
302	p, ok := d.providers[accountID]
303	if !ok {
304		return nil, fmt.Errorf("no provider for account %s", accountID)
305	}
306	return p, nil
307}
308
309// getAccount returns the account config for the given ID.
310func (d *Daemon) getAccount(accountID string) *config.Account {
311	d.mu.RLock()
312	defer d.mu.RUnlock()
313	return d.config.GetAccountByID(accountID)
314}
315
316// backgroundSync handles periodic sync and IDLE-like notifications.
317func (d *Daemon) backgroundSync(ctx context.Context) {
318	ticker := time.NewTicker(5 * time.Minute)
319	defer ticker.Stop()
320
321	for {
322		select {
323		case <-ctx.Done():
324			return
325		case <-ticker.C:
326			d.syncAllAccounts(ctx)
327		}
328	}
329}
330
331func (d *Daemon) syncAllAccounts(ctx context.Context) {
332	d.mu.RLock()
333	accounts := make([]config.Account, len(d.config.Accounts))
334	copy(accounts, d.config.Accounts)
335	d.mu.RUnlock()
336
337	for _, acct := range accounts {
338		select {
339		case <-ctx.Done():
340			return
341		default:
342		}
343
344		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
345			AccountID: acct.ID,
346			Folder:    inboxFolder,
347		})
348
349		p, err := d.getProvider(acct.ID)
350		if err != nil {
351			continue
352		}
353
354		emails, err := p.FetchEmails(ctx, inboxFolder, 50, 0)
355		if err != nil {
356			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
357			d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
358				AccountID: acct.ID,
359				Folder:    inboxFolder,
360				Error:     err.Error(),
361			})
362			continue
363		}
364
365		oldCached, _ := config.LoadFolderEmailCache(inboxFolder)
366		oldUIDs := make(map[uint32]struct{}, len(oldCached))
367		for _, e := range oldCached {
368			if e.AccountID == acct.ID {
369				oldUIDs[e.UID] = struct{}{}
370			}
371		}
372
373		// Cache the fetched emails to disk.
374		var cached []config.CachedEmail
375		for _, e := range emails {
376			cached = append(cached, config.CachedEmail{
377				UID:        e.UID,
378				From:       e.From,
379				To:         e.To,
380				Subject:    e.Subject,
381				Date:       e.Date,
382				MessageID:  e.MessageID,
383				InReplyTo:  e.InReplyTo,
384				References: e.References,
385				AccountID:  e.AccountID,
386				IsRead:     e.IsRead,
387			})
388		}
389		if err := d.updateFolderCache(inboxFolder, acct.ID, cached); err != nil {
390			log.Printf("daemon: cache update for INBOX failed: %v", err)
391		}
392
393		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
394			AccountID:  acct.ID,
395			Folder:     inboxFolder,
396			EmailCount: len(emails),
397		})
398
399		newCount := 0
400		for _, e := range emails {
401			if _, seen := oldUIDs[e.UID]; !seen {
402				newCount++
403			}
404		}
405
406		// Send desktop notification if TUI not connected.
407		d.mu.RLock()
408		noClients := len(d.clients) == 0
409		d.mu.RUnlock()
410
411		if noClients && newCount > 0 {
412			if !d.config.DisableNotifications {
413				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail)) //nolint:errcheck
414			}
415		}
416	}
417}
418
419// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
420func (d *Daemon) startIdleWatchers() {
421	d.mu.RLock()
422	defer d.mu.RUnlock()
423
424	for i := range d.config.Accounts {
425		acct := &d.config.Accounts[i]
426		// Only IMAP accounts support IDLE.
427		protocol := acct.Protocol
428		if protocol == "" {
429			protocol = "imap"
430		}
431		if protocol != "imap" {
432			continue
433		}
434		d.idleWatcher.Watch(acct, inboxFolder)
435		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
436	}
437}
438
439// idleEventLoop listens for IDLE updates and broadcasts them as events.
440func (d *Daemon) idleEventLoop() {
441	for {
442		select {
443		case <-d.shutdown:
444			return
445		case update, ok := <-d.idleUpdates:
446			if !ok {
447				return
448			}
449			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
450
451			// Desktop notification when no clients connected.
452			d.mu.RLock()
453			noClients := len(d.clients) == 0
454			d.mu.RUnlock()
455
456			if noClients && !d.config.DisableNotifications {
457				accountName := update.AccountID
458				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
459					accountName = acct.Email
460				}
461				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) //nolint:errcheck
462			}
463
464			// Broadcast to subscribed clients.
465			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
466				AccountID: update.AccountID,
467				Folder:    update.FolderName,
468			})
469
470			// Fetch and cache emails so they're fresh when TUI next connects.
471			go d.fetchAndCache(update.AccountID, update.FolderName)
472		}
473	}
474}
475
476// fetchAndCache fetches emails for an account/folder and saves to disk cache.
477func (d *Daemon) fetchAndCache(accountID, folder string) {
478	acct := d.getAccount(accountID)
479	if acct == nil {
480		return
481	}
482
483	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
484	if err != nil {
485		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
486		return
487	}
488
489	// Convert to cache format and save.
490	var cached []config.CachedEmail
491	for _, e := range emails {
492		cached = append(cached, config.CachedEmail{
493			UID:        e.UID,
494			From:       e.From,
495			To:         e.To,
496			Subject:    e.Subject,
497			Date:       e.Date,
498			MessageID:  e.MessageID,
499			InReplyTo:  e.InReplyTo,
500			References: e.References,
501			AccountID:  e.AccountID,
502			IsRead:     e.IsRead,
503		})
504	}
505
506	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
507		log.Printf("daemon: cache update for %s failed: %v", folder, err)
508		return
509	}
510
511	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
512
513	// Also notify subscribers that emails were updated.
514	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
515		AccountID:  accountID,
516		Folder:     folder,
517		EmailCount: len(emails),
518	})
519}
520
521// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
522func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
523	d.cacheMu.Lock()
524	defer d.cacheMu.Unlock()
525
526	// Load existing cache
527	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
528
529	// Filter out old emails for this account
530	var merged []config.CachedEmail
531	for _, e := range existing {
532		if e.AccountID != accountID {
533			merged = append(merged, e)
534		}
535	}
536
537	// Append new emails
538	merged = append(merged, newEmails...)
539
540	// Sort newest first
541	sort.Slice(merged, func(i, j int) bool {
542		return merged[i].Date.After(merged[j].Date)
543	})
544
545	// Save merged cache
546	return config.SaveFolderEmailCache(folderName, merged)
547}