daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"runtime/debug"
 10	"sort"
 11	"sync"
 12	"time"
 13
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19)
 20
// inboxFolder is the mailbox name this file uses for periodic sync and for
// the IMAP IDLE watchers.
const inboxFolder = "INBOX"
 22
// Daemon is the long-running background process that manages email
// connections, caching, sync, and notifications.
//
// Create one with New and drive it with Run; Shutdown asks a running daemon
// to stop.
type Daemon struct {
	// Core state: the loaded configuration, one provider per account ID, the
	// Unix socket listener, and the time Run was entered. listener and
	// startTime are assigned by Run.
	config    *config.Config
	providers map[string]backend.Provider
	listener  net.Listener
	startTime time.Time

	// Connected TUI/CLI clients. mu is the lock taken around clients, and
	// also around providers and config above.
	clients map[*daemonrpc.Conn]struct{}
	mu      sync.RWMutex

	// Per-client subscriptions: conn → set of "accountID:folder".
	// subMu is the lock taken around subscriptions.
	subscriptions map[*daemonrpc.Conn]map[string]struct{}
	subMu         sync.RWMutex

	// Mutex for disk cache updates; updateFolderCache holds it for the whole
	// load-merge-save cycle.
	cacheMu sync.Mutex

	// IMAP IDLE watcher for push notifications. The watcher is constructed
	// with idleUpdates, and idleEventLoop receives from that channel.
	idleWatcher *fetcher.IdleWatcher
	idleUpdates chan fetcher.IdleUpdate

	// Background sync cancellation; set by Run to the cancel function of the
	// context handed to backgroundSync.
	syncCancel context.CancelFunc

	// shutdown is closed by Shutdown to request a stop; done is closed by Run
	// once its cleanup has finished.
	shutdown chan struct{}
	done     chan struct{}
}
 52
 53// New creates a daemon with the given config.
 54func New(cfg *config.Config) *Daemon {
 55	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 56	return &Daemon{
 57		config:        cfg,
 58		providers:     make(map[string]backend.Provider),
 59		clients:       make(map[*daemonrpc.Conn]struct{}),
 60		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 61		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 62		idleUpdates:   idleUpdates,
 63		shutdown:      make(chan struct{}),
 64		done:          make(chan struct{}),
 65	}
 66}
 67
 68// Run starts the daemon: creates providers, starts the socket listener,
 69// starts background sync, and blocks until shutdown.
 70func (d *Daemon) Run() error {
 71	d.startTime = time.Now()
 72
 73	// Ensure runtime directory exists.
 74	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
 75		return fmt.Errorf("create runtime dir: %w", err)
 76	}
 77
 78	// Check for existing daemon.
 79	pidPath := daemonrpc.PIDPath()
 80	if pid, running := IsRunning(pidPath); running {
 81		return fmt.Errorf("daemon already running (PID %d)", pid)
 82	}
 83
 84	// Write PID file.
 85	if err := WritePID(pidPath); err != nil {
 86		return fmt.Errorf("write PID file: %w", err)
 87	}
 88	defer RemovePID(pidPath) //nolint:errcheck
 89
 90	// Remove stale socket file.
 91	sockPath := daemonrpc.SocketPath()
 92	if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
 93		return fmt.Errorf("remove stale socket: %w", err)
 94	}
 95
 96	// Listen on Unix domain socket.
 97	var err error
 98	d.listener, err = net.Listen("unix", sockPath) //nolint:noctx
 99	if err != nil {
100		return fmt.Errorf("listen: %w", err)
101	}
102	defer d.listener.Close() //nolint:errcheck
103
104	// Set socket permissions (owner only).
105	if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302
106		return fmt.Errorf("set socket permissions: %w", err)
107	}
108
109	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
110
111	// Initialize providers for all accounts.
112	d.initProviders()
113
114	// Start IMAP IDLE watchers for all accounts.
115	d.startIdleWatchers()
116	go d.idleEventLoop()
117
118	// Start signal handler.
119	go d.handleSignals()
120
121	// Start background sync.
122	ctx, cancel := context.WithCancel(context.Background())
123	d.syncCancel = cancel
124	go d.backgroundSync(ctx)
125
126	// Accept client connections.
127	go d.acceptLoop()
128
129	// Block until shutdown.
130	<-d.shutdown
131
132	// Cleanup.
133	log.Println("daemon: shutting down")
134	d.listener.Close() //nolint:errcheck,gosec
135	if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil {
136		log.Printf("daemon: %v", err)
137	}
138	cancel()
139	d.closeAllClients()
140	d.closeProviders()
141
142	close(d.done)
143	return nil
144}
145
146// Shutdown triggers a graceful shutdown.
147func (d *Daemon) Shutdown() {
148	select {
149	case <-d.shutdown:
150		// Already shutting down.
151	default:
152		close(d.shutdown)
153	}
154}
155
156// ReloadConfig reloads the configuration from disk.
157func (d *Daemon) ReloadConfig() error {
158	cfg, err := config.LoadConfig()
159	if err != nil {
160		return fmt.Errorf("load config: %w", err)
161	}
162	d.mu.Lock()
163	d.config = cfg
164	d.mu.Unlock()
165
166	// Reinitialize providers for new/changed accounts.
167	d.initProviders()
168
169	// Notify clients.
170	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
171
172	log.Println("daemon: config reloaded")
173	return nil
174}
175
176func (d *Daemon) initProviders() {
177	d.mu.Lock()
178	defer d.mu.Unlock()
179
180	for i := range d.config.Accounts {
181		acct := &d.config.Accounts[i]
182		if _, exists := d.providers[acct.ID]; exists {
183			continue
184		}
185		p, err := backend.New(acct)
186		if err != nil {
187			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
188			continue
189		}
190		d.providers[acct.ID] = p
191		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
192	}
193}
194
195func (d *Daemon) acceptLoop() {
196	for {
197		done := func() bool {
198			defer func() {
199				if r := recover(); r != nil {
200					log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack())
201				}
202			}()
203			conn, err := d.listener.Accept()
204			if err != nil {
205				select {
206				case <-d.shutdown:
207					return true
208				default:
209					log.Printf("daemon: accept error: %v", err)
210					return false
211				}
212			}
213			rpcConn := daemonrpc.NewConn(conn)
214			d.addClient(rpcConn)
215			go d.handleClient(rpcConn)
216			return false
217		}()
218		if done {
219			return
220		}
221	}
222}
223
224func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
225	defer d.removeClient(conn)
226	defer conn.Close() //nolint:errcheck
227
228	for {
229		msg, err := conn.ReceiveMessage()
230		if err != nil {
231			// Client disconnected or read error.
232			return
233		}
234		if msg.Request != nil {
235			d.handleRequest(conn, msg.Request)
236		}
237	}
238}
239
240func (d *Daemon) addClient(conn *daemonrpc.Conn) {
241	d.mu.Lock()
242	defer d.mu.Unlock()
243	d.clients[conn] = struct{}{}
244	log.Println("daemon: client connected")
245}
246
247func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
248	d.mu.Lock()
249	delete(d.clients, conn)
250	d.mu.Unlock()
251
252	d.subMu.Lock()
253	delete(d.subscriptions, conn)
254	d.subMu.Unlock()
255
256	log.Println("daemon: client disconnected")
257}
258
259func (d *Daemon) closeAllClients() {
260	d.mu.Lock()
261	defer d.mu.Unlock()
262	for conn := range d.clients {
263		conn.Close() //nolint:errcheck,gosec
264	}
265	d.clients = make(map[*daemonrpc.Conn]struct{})
266}
267
268func (d *Daemon) closeProviders() {
269	d.mu.Lock()
270	defer d.mu.Unlock()
271	for id, p := range d.providers {
272		if err := p.Close(); err != nil {
273			log.Printf("daemon: error closing provider %s: %v", id, err)
274		}
275	}
276}
277
278// broadcastEvent sends an event to all connected clients.
279func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
280	d.mu.RLock()
281	defer d.mu.RUnlock()
282	for conn := range d.clients {
283		if err := conn.SendEvent(eventType, data); err != nil {
284			log.Printf("daemon: broadcast error: %v", err)
285		}
286	}
287}
288
289// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
290func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
291	key := accountID + ":" + folder
292	d.subMu.RLock()
293	defer d.subMu.RUnlock()
294
295	for conn, subs := range d.subscriptions {
296		if _, ok := subs[key]; ok {
297			if err := conn.SendEvent(eventType, data); err != nil {
298				log.Printf("daemon: subscriber broadcast error: %v", err)
299			}
300		}
301	}
302}
303
304// getProvider returns the provider for the given account ID.
305func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
306	d.mu.RLock()
307	defer d.mu.RUnlock()
308	p, ok := d.providers[accountID]
309	if !ok {
310		return nil, fmt.Errorf("no provider for account %s", accountID)
311	}
312	return p, nil
313}
314
315// getAccount returns the account config for the given ID.
316func (d *Daemon) getAccount(accountID string) *config.Account {
317	d.mu.RLock()
318	defer d.mu.RUnlock()
319	return d.config.GetAccountByID(accountID)
320}
321
322// backgroundSync handles periodic sync and IDLE-like notifications.
323func (d *Daemon) backgroundSync(ctx context.Context) {
324	ticker := time.NewTicker(5 * time.Minute)
325	defer ticker.Stop()
326
327	for {
328		select {
329		case <-ctx.Done():
330			return
331		case <-ticker.C:
332			d.syncAllAccounts(ctx)
333		}
334	}
335}
336
337func (d *Daemon) syncAllAccounts(ctx context.Context) {
338	d.mu.RLock()
339	accounts := make([]config.Account, len(d.config.Accounts))
340	copy(accounts, d.config.Accounts)
341	d.mu.RUnlock()
342
343	for _, acct := range accounts {
344		select {
345		case <-ctx.Done():
346			return
347		default:
348		}
349
350		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
351			AccountID: acct.ID,
352			Folder:    inboxFolder,
353		})
354
355		p, err := d.getProvider(acct.ID)
356		if err != nil {
357			continue
358		}
359
360		emails, err := p.FetchEmails(ctx, inboxFolder, 50, 0)
361		if err != nil {
362			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
363			d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
364				AccountID: acct.ID,
365				Folder:    inboxFolder,
366				Error:     err.Error(),
367			})
368			continue
369		}
370
371		oldCached, _ := config.LoadFolderEmailCache(inboxFolder)
372		oldUIDs := make(map[uint32]struct{}, len(oldCached))
373		for _, e := range oldCached {
374			if e.AccountID == acct.ID {
375				oldUIDs[e.UID] = struct{}{}
376			}
377		}
378
379		// Cache the fetched emails to disk.
380		var cached []config.CachedEmail
381		for _, e := range emails {
382			cached = append(cached, config.CachedEmail{
383				UID:        e.UID,
384				From:       e.From,
385				To:         e.To,
386				Subject:    e.Subject,
387				Date:       e.Date,
388				MessageID:  e.MessageID,
389				InReplyTo:  e.InReplyTo,
390				References: e.References,
391				AccountID:  e.AccountID,
392				IsRead:     e.IsRead,
393			})
394		}
395		if err := d.updateFolderCache(inboxFolder, acct.ID, cached); err != nil {
396			log.Printf("daemon: cache update for INBOX failed: %v", err)
397		}
398
399		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
400			AccountID:  acct.ID,
401			Folder:     inboxFolder,
402			EmailCount: len(emails),
403		})
404
405		newCount := 0
406		for _, e := range emails {
407			if _, seen := oldUIDs[e.UID]; !seen {
408				newCount++
409			}
410		}
411
412		// Send desktop notification if TUI not connected.
413		d.mu.RLock()
414		noClients := len(d.clients) == 0
415		d.mu.RUnlock()
416
417		if noClients && newCount > 0 {
418			if !d.config.DisableNotifications {
419				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail)) //nolint:errcheck
420			}
421		}
422	}
423}
424
425// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
426func (d *Daemon) startIdleWatchers() {
427	d.mu.RLock()
428	defer d.mu.RUnlock()
429
430	for i := range d.config.Accounts {
431		acct := &d.config.Accounts[i]
432		// Only IMAP accounts support IDLE.
433		protocol := acct.Protocol
434		if protocol == "" {
435			protocol = "imap"
436		}
437		if protocol != "imap" {
438			continue
439		}
440		d.idleWatcher.Watch(acct, inboxFolder)
441		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
442	}
443}
444
445// idleEventLoop listens for IDLE updates and broadcasts them as events.
446func (d *Daemon) idleEventLoop() {
447	for {
448		select {
449		case <-d.shutdown:
450			return
451		case update, ok := <-d.idleUpdates:
452			if !ok {
453				return
454			}
455			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
456
457			// Desktop notification when no clients connected.
458			d.mu.RLock()
459			noClients := len(d.clients) == 0
460			d.mu.RUnlock()
461
462			if noClients && !d.config.DisableNotifications {
463				accountName := update.AccountID
464				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
465					accountName = acct.Email
466				}
467				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) //nolint:errcheck
468			}
469
470			// Broadcast to subscribed clients.
471			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
472				AccountID: update.AccountID,
473				Folder:    update.FolderName,
474			})
475
476			// Fetch and cache emails so they're fresh when TUI next connects.
477			go d.fetchAndCache(update.AccountID, update.FolderName)
478		}
479	}
480}
481
482// fetchAndCache fetches emails for an account/folder and saves to disk cache.
483func (d *Daemon) fetchAndCache(accountID, folder string) {
484	acct := d.getAccount(accountID)
485	if acct == nil {
486		return
487	}
488
489	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
490	if err != nil {
491		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
492		return
493	}
494
495	// Convert to cache format and save.
496	var cached []config.CachedEmail
497	for _, e := range emails {
498		cached = append(cached, config.CachedEmail{
499			UID:        e.UID,
500			From:       e.From,
501			To:         e.To,
502			Subject:    e.Subject,
503			Date:       e.Date,
504			MessageID:  e.MessageID,
505			InReplyTo:  e.InReplyTo,
506			References: e.References,
507			AccountID:  e.AccountID,
508			IsRead:     e.IsRead,
509		})
510	}
511
512	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
513		log.Printf("daemon: cache update for %s failed: %v", folder, err)
514		return
515	}
516
517	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
518
519	// Also notify subscribers that emails were updated.
520	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
521		AccountID:  accountID,
522		Folder:     folder,
523		EmailCount: len(emails),
524	})
525}
526
527// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
528func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
529	d.cacheMu.Lock()
530	defer d.cacheMu.Unlock()
531
532	// Load existing cache
533	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
534
535	// Filter out old emails for this account
536	var merged []config.CachedEmail
537	for _, e := range existing {
538		if e.AccountID != accountID {
539			merged = append(merged, e)
540		}
541	}
542
543	// Append new emails
544	merged = append(merged, newEmails...)
545
546	// Sort newest first
547	sort.Slice(merged, func(i, j int) bool {
548		return merged[i].Date.After(merged[j].Date)
549	})
550
551	// Save merged cache
552	return config.SaveFolderEmailCache(folderName, merged)
553}