daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"runtime/debug"
 10	"sort"
 11	"sync"
 12	"time"
 13
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19)
 20
// inboxFolder is the folder name used by the periodic background sync and by
// the IDLE watchers started at daemon startup.
const inboxFolder = "INBOX"
 22
// Daemon is the long-running background process that manages email
// connections, caching, sync, and notifications.
//
// Create one with New and start it with Run; Run blocks until Shutdown is
// called.
type Daemon struct {
	// config is the active configuration. ReloadConfig replaces it while
	// holding mu.
	config *config.Config
	// providers maps an account ID to its backend provider. It is filled by
	// initProviders and read through getProvider, both under mu.
	providers map[string]backend.Provider
	// listener is the Unix domain socket listener opened by Run.
	listener net.Listener
	// startTime is recorded at the top of Run.
	startTime time.Time

	// Connected TUI/CLI clients.
	clients map[*daemonrpc.Conn]struct{}
	// mu guards clients, and is also held when reading or replacing config
	// and providers.
	mu sync.RWMutex

	// Per-client subscriptions: conn → set of "accountID:folder".
	subscriptions map[*daemonrpc.Conn]map[string]struct{}
	// subMu guards subscriptions.
	subMu sync.RWMutex

	// Mutex for disk cache updates (held for the whole load-merge-save cycle
	// in updateFolderCache).
	cacheMu sync.Mutex

	// IMAP IDLE watcher for push notifications.
	idleWatcher *fetcher.IdleWatcher
	// idleUpdates is the channel handed to the IDLE watcher in New and
	// drained by idleEventLoop.
	idleUpdates chan fetcher.IdleUpdate

	// Background sync cancellation.
	syncCancel context.CancelFunc

	// shutdown is closed by Shutdown to request termination.
	shutdown chan struct{}
	// done is closed by Run once cleanup has finished.
	done chan struct{}
}
 52
 53// New creates a daemon with the given config.
 54func New(cfg *config.Config) *Daemon {
 55	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 56	return &Daemon{
 57		config:        cfg,
 58		providers:     make(map[string]backend.Provider),
 59		clients:       make(map[*daemonrpc.Conn]struct{}),
 60		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 61		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 62		idleUpdates:   idleUpdates,
 63		shutdown:      make(chan struct{}),
 64		done:          make(chan struct{}),
 65	}
 66}
 67
// Run starts the daemon: creates providers, starts the socket listener,
// starts background sync, and blocks until shutdown.
//
// It returns an error if the runtime directory, PID file, or socket cannot be
// set up, or if another daemon is already running. After Shutdown is called it
// tears everything down, closes d.done, and returns nil. Note that d.done is
// only closed on that path, not on the early error returns.
func (d *Daemon) Run() error {
	d.startTime = time.Now()

	// Ensure runtime directory exists.
	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
		return fmt.Errorf("create runtime dir: %w", err)
	}

	// Check for existing daemon.
	pidPath := daemonrpc.PIDPath()
	if pid, running := IsRunning(pidPath); running {
		return fmt.Errorf("daemon already running (PID %d)", pid)
	}

	// Write PID file; it is removed again when Run returns.
	if err := WritePID(pidPath); err != nil {
		return fmt.Errorf("write PID file: %w", err)
	}
	defer RemovePID(pidPath) //nolint:errcheck

	// Remove stale socket file. A missing file is not an error.
	sockPath := daemonrpc.SocketPath()
	if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("remove stale socket: %w", err)
	}

	// Listen on Unix domain socket.
	var err error
	d.listener, err = net.Listen("unix", sockPath) //nolint:noctx
	if err != nil {
		return fmt.Errorf("listen: %w", err)
	}
	// Covers the early-return paths below; the shutdown path also closes the
	// listener explicitly to unblock acceptLoop.
	defer d.listener.Close() //nolint:errcheck

	// Set socket permissions (owner only).
	if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302
		return fmt.Errorf("set socket permissions: %w", err)
	}

	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())

	// Initialize providers for all accounts.
	d.initProviders()

	// Start IMAP IDLE watchers for all accounts.
	d.startIdleWatchers()
	go d.idleEventLoop()

	// Start signal handler.
	go d.handleSignals()

	// Start background sync.
	ctx, cancel := context.WithCancel(context.Background())
	d.syncCancel = cancel
	go d.backgroundSync(ctx)

	// Accept client connections.
	go d.acceptLoop()

	// Block until shutdown.
	<-d.shutdown

	// Cleanup: stop accepting, stop IDLE watchers, cancel background sync,
	// then drop clients and providers.
	log.Println("daemon: shutting down")
	d.listener.Close() //nolint:errcheck,gosec
	d.idleWatcher.StopAll()
	cancel()
	d.closeAllClients()
	d.closeProviders()

	// Signal that teardown is complete.
	close(d.done)
	return nil
}
143
144// Shutdown triggers a graceful shutdown.
145func (d *Daemon) Shutdown() {
146	select {
147	case <-d.shutdown:
148		// Already shutting down.
149	default:
150		close(d.shutdown)
151	}
152}
153
154// ReloadConfig reloads the configuration from disk.
155func (d *Daemon) ReloadConfig() error {
156	cfg, err := config.LoadConfig()
157	if err != nil {
158		return fmt.Errorf("load config: %w", err)
159	}
160	d.mu.Lock()
161	d.config = cfg
162	d.mu.Unlock()
163
164	// Reinitialize providers for new/changed accounts.
165	d.initProviders()
166
167	// Notify clients.
168	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
169
170	log.Println("daemon: config reloaded")
171	return nil
172}
173
174func (d *Daemon) initProviders() {
175	d.mu.Lock()
176	defer d.mu.Unlock()
177
178	for i := range d.config.Accounts {
179		acct := &d.config.Accounts[i]
180		if _, exists := d.providers[acct.ID]; exists {
181			continue
182		}
183		p, err := backend.New(acct)
184		if err != nil {
185			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
186			continue
187		}
188		d.providers[acct.ID] = p
189		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
190	}
191}
192
193func (d *Daemon) acceptLoop() {
194	for {
195		done := func() bool {
196			defer func() {
197				if r := recover(); r != nil {
198					log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack())
199				}
200			}()
201			conn, err := d.listener.Accept()
202			if err != nil {
203				select {
204				case <-d.shutdown:
205					return true
206				default:
207					log.Printf("daemon: accept error: %v", err)
208					return false
209				}
210			}
211			rpcConn := daemonrpc.NewConn(conn)
212			d.addClient(rpcConn)
213			go d.handleClient(rpcConn)
214			return false
215		}()
216		if done {
217			return
218		}
219	}
220}
221
222func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
223	defer d.removeClient(conn)
224	defer conn.Close() //nolint:errcheck
225
226	for {
227		msg, err := conn.ReceiveMessage()
228		if err != nil {
229			// Client disconnected or read error.
230			return
231		}
232		if msg.Request != nil {
233			d.handleRequest(conn, msg.Request)
234		}
235	}
236}
237
238func (d *Daemon) addClient(conn *daemonrpc.Conn) {
239	d.mu.Lock()
240	defer d.mu.Unlock()
241	d.clients[conn] = struct{}{}
242	log.Println("daemon: client connected")
243}
244
245func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
246	d.mu.Lock()
247	delete(d.clients, conn)
248	d.mu.Unlock()
249
250	d.subMu.Lock()
251	delete(d.subscriptions, conn)
252	d.subMu.Unlock()
253
254	log.Println("daemon: client disconnected")
255}
256
257func (d *Daemon) closeAllClients() {
258	d.mu.Lock()
259	defer d.mu.Unlock()
260	for conn := range d.clients {
261		conn.Close() //nolint:errcheck,gosec
262	}
263	d.clients = make(map[*daemonrpc.Conn]struct{})
264}
265
266func (d *Daemon) closeProviders() {
267	d.mu.Lock()
268	defer d.mu.Unlock()
269	for id, p := range d.providers {
270		if err := p.Close(); err != nil {
271			log.Printf("daemon: error closing provider %s: %v", id, err)
272		}
273	}
274}
275
276// broadcastEvent sends an event to all connected clients.
277func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
278	d.mu.RLock()
279	defer d.mu.RUnlock()
280	for conn := range d.clients {
281		if err := conn.SendEvent(eventType, data); err != nil {
282			log.Printf("daemon: broadcast error: %v", err)
283		}
284	}
285}
286
287// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
288func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
289	key := accountID + ":" + folder
290	d.subMu.RLock()
291	defer d.subMu.RUnlock()
292
293	for conn, subs := range d.subscriptions {
294		if _, ok := subs[key]; ok {
295			if err := conn.SendEvent(eventType, data); err != nil {
296				log.Printf("daemon: subscriber broadcast error: %v", err)
297			}
298		}
299	}
300}
301
302// getProvider returns the provider for the given account ID.
303func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
304	d.mu.RLock()
305	defer d.mu.RUnlock()
306	p, ok := d.providers[accountID]
307	if !ok {
308		return nil, fmt.Errorf("no provider for account %s", accountID)
309	}
310	return p, nil
311}
312
313// getAccount returns the account config for the given ID.
314func (d *Daemon) getAccount(accountID string) *config.Account {
315	d.mu.RLock()
316	defer d.mu.RUnlock()
317	return d.config.GetAccountByID(accountID)
318}
319
320// backgroundSync handles periodic sync and IDLE-like notifications.
321func (d *Daemon) backgroundSync(ctx context.Context) {
322	ticker := time.NewTicker(5 * time.Minute)
323	defer ticker.Stop()
324
325	for {
326		select {
327		case <-ctx.Done():
328			return
329		case <-ticker.C:
330			d.syncAllAccounts(ctx)
331		}
332	}
333}
334
335func (d *Daemon) syncAllAccounts(ctx context.Context) {
336	d.mu.RLock()
337	accounts := make([]config.Account, len(d.config.Accounts))
338	copy(accounts, d.config.Accounts)
339	d.mu.RUnlock()
340
341	for _, acct := range accounts {
342		select {
343		case <-ctx.Done():
344			return
345		default:
346		}
347
348		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
349			AccountID: acct.ID,
350			Folder:    inboxFolder,
351		})
352
353		p, err := d.getProvider(acct.ID)
354		if err != nil {
355			continue
356		}
357
358		emails, err := p.FetchEmails(ctx, inboxFolder, 50, 0)
359		if err != nil {
360			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
361			d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
362				AccountID: acct.ID,
363				Folder:    inboxFolder,
364				Error:     err.Error(),
365			})
366			continue
367		}
368
369		oldCached, _ := config.LoadFolderEmailCache(inboxFolder)
370		oldUIDs := make(map[uint32]struct{}, len(oldCached))
371		for _, e := range oldCached {
372			if e.AccountID == acct.ID {
373				oldUIDs[e.UID] = struct{}{}
374			}
375		}
376
377		// Cache the fetched emails to disk.
378		var cached []config.CachedEmail
379		for _, e := range emails {
380			cached = append(cached, config.CachedEmail{
381				UID:        e.UID,
382				From:       e.From,
383				To:         e.To,
384				Subject:    e.Subject,
385				Date:       e.Date,
386				MessageID:  e.MessageID,
387				InReplyTo:  e.InReplyTo,
388				References: e.References,
389				AccountID:  e.AccountID,
390				IsRead:     e.IsRead,
391			})
392		}
393		if err := d.updateFolderCache(inboxFolder, acct.ID, cached); err != nil {
394			log.Printf("daemon: cache update for INBOX failed: %v", err)
395		}
396
397		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
398			AccountID:  acct.ID,
399			Folder:     inboxFolder,
400			EmailCount: len(emails),
401		})
402
403		newCount := 0
404		for _, e := range emails {
405			if _, seen := oldUIDs[e.UID]; !seen {
406				newCount++
407			}
408		}
409
410		// Send desktop notification if TUI not connected.
411		d.mu.RLock()
412		noClients := len(d.clients) == 0
413		d.mu.RUnlock()
414
415		if noClients && newCount > 0 {
416			if !d.config.DisableNotifications {
417				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail)) //nolint:errcheck
418			}
419		}
420	}
421}
422
423// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
424func (d *Daemon) startIdleWatchers() {
425	d.mu.RLock()
426	defer d.mu.RUnlock()
427
428	for i := range d.config.Accounts {
429		acct := &d.config.Accounts[i]
430		// Only IMAP accounts support IDLE.
431		protocol := acct.Protocol
432		if protocol == "" {
433			protocol = "imap"
434		}
435		if protocol != "imap" {
436			continue
437		}
438		d.idleWatcher.Watch(acct, inboxFolder)
439		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
440	}
441}
442
443// idleEventLoop listens for IDLE updates and broadcasts them as events.
444func (d *Daemon) idleEventLoop() {
445	for {
446		select {
447		case <-d.shutdown:
448			return
449		case update, ok := <-d.idleUpdates:
450			if !ok {
451				return
452			}
453			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
454
455			// Desktop notification when no clients connected.
456			d.mu.RLock()
457			noClients := len(d.clients) == 0
458			d.mu.RUnlock()
459
460			if noClients && !d.config.DisableNotifications {
461				accountName := update.AccountID
462				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
463					accountName = acct.Email
464				}
465				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) //nolint:errcheck
466			}
467
468			// Broadcast to subscribed clients.
469			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
470				AccountID: update.AccountID,
471				Folder:    update.FolderName,
472			})
473
474			// Fetch and cache emails so they're fresh when TUI next connects.
475			go d.fetchAndCache(update.AccountID, update.FolderName)
476		}
477	}
478}
479
480// fetchAndCache fetches emails for an account/folder and saves to disk cache.
481func (d *Daemon) fetchAndCache(accountID, folder string) {
482	acct := d.getAccount(accountID)
483	if acct == nil {
484		return
485	}
486
487	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
488	if err != nil {
489		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
490		return
491	}
492
493	// Convert to cache format and save.
494	var cached []config.CachedEmail
495	for _, e := range emails {
496		cached = append(cached, config.CachedEmail{
497			UID:        e.UID,
498			From:       e.From,
499			To:         e.To,
500			Subject:    e.Subject,
501			Date:       e.Date,
502			MessageID:  e.MessageID,
503			InReplyTo:  e.InReplyTo,
504			References: e.References,
505			AccountID:  e.AccountID,
506			IsRead:     e.IsRead,
507		})
508	}
509
510	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
511		log.Printf("daemon: cache update for %s failed: %v", folder, err)
512		return
513	}
514
515	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
516
517	// Also notify subscribers that emails were updated.
518	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
519		AccountID:  accountID,
520		Folder:     folder,
521		EmailCount: len(emails),
522	})
523}
524
525// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
526func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
527	d.cacheMu.Lock()
528	defer d.cacheMu.Unlock()
529
530	// Load existing cache
531	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
532
533	// Filter out old emails for this account
534	var merged []config.CachedEmail
535	for _, e := range existing {
536		if e.AccountID != accountID {
537			merged = append(merged, e)
538		}
539	}
540
541	// Append new emails
542	merged = append(merged, newEmails...)
543
544	// Sort newest first
545	sort.Slice(merged, func(i, j int) bool {
546		return merged[i].Date.After(merged[j].Date)
547	})
548
549	// Save merged cache
550	return config.SaveFolderEmailCache(folderName, merged)
551}