daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"runtime/debug"
 10	"sort"
 11	"sync"
 12	"time"
 13
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19)
 20
 21// Daemon is the long-running background process that manages email
 22// connections, caching, sync, and notifications.
 23type Daemon struct {
 24	config    *config.Config
 25	providers map[string]backend.Provider
 26	listener  net.Listener
 27	startTime time.Time
 28
 29	// Connected TUI/CLI clients.
 30	clients map[*daemonrpc.Conn]struct{}
 31	mu      sync.RWMutex
 32
 33	// Per-client subscriptions: conn → set of "accountID:folder".
 34	subscriptions map[*daemonrpc.Conn]map[string]struct{}
 35	subMu         sync.RWMutex
 36
 37	// Mutex for disk cache updates.
 38	cacheMu sync.Mutex
 39
 40	// IMAP IDLE watcher for push notifications.
 41	idleWatcher *fetcher.IdleWatcher
 42	idleUpdates chan fetcher.IdleUpdate
 43
 44	// Background sync cancellation.
 45	syncCancel context.CancelFunc
 46
 47	shutdown chan struct{}
 48	done     chan struct{}
 49}
 50
 51// New creates a daemon with the given config.
 52func New(cfg *config.Config) *Daemon {
 53	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 54	return &Daemon{
 55		config:        cfg,
 56		providers:     make(map[string]backend.Provider),
 57		clients:       make(map[*daemonrpc.Conn]struct{}),
 58		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 59		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 60		idleUpdates:   idleUpdates,
 61		shutdown:      make(chan struct{}),
 62		done:          make(chan struct{}),
 63	}
 64}
 65
 66// Run starts the daemon: creates providers, starts the socket listener,
 67// starts background sync, and blocks until shutdown.
 68func (d *Daemon) Run() error {
 69	d.startTime = time.Now()
 70
 71	// Ensure runtime directory exists.
 72	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
 73		return fmt.Errorf("create runtime dir: %w", err)
 74	}
 75
 76	// Check for existing daemon.
 77	pidPath := daemonrpc.PIDPath()
 78	if pid, running := IsRunning(pidPath); running {
 79		return fmt.Errorf("daemon already running (PID %d)", pid)
 80	}
 81
 82	// Write PID file.
 83	if err := WritePID(pidPath); err != nil {
 84		return fmt.Errorf("write PID file: %w", err)
 85	}
 86	defer RemovePID(pidPath)
 87
 88	// Remove stale socket file.
 89	sockPath := daemonrpc.SocketPath()
 90	os.Remove(sockPath)
 91
 92	// Listen on Unix domain socket.
 93	var err error
 94	d.listener, err = net.Listen("unix", sockPath)
 95	if err != nil {
 96		return fmt.Errorf("listen: %w", err)
 97	}
 98	defer d.listener.Close()
 99
100	// Set socket permissions (owner only).
101	os.Chmod(sockPath, 0700)
102
103	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
104
105	// Initialize providers for all accounts.
106	d.initProviders()
107
108	// Start IMAP IDLE watchers for all accounts.
109	d.startIdleWatchers()
110	go d.idleEventLoop()
111
112	// Start signal handler.
113	go d.handleSignals()
114
115	// Start background sync.
116	ctx, cancel := context.WithCancel(context.Background())
117	d.syncCancel = cancel
118	go d.backgroundSync(ctx)
119
120	// Accept client connections.
121	go d.acceptLoop()
122
123	// Block until shutdown.
124	<-d.shutdown
125
126	// Cleanup.
127	log.Println("daemon: shutting down")
128	d.listener.Close()
129	d.idleWatcher.StopAll()
130	cancel()
131	d.closeAllClients()
132	d.closeProviders()
133
134	close(d.done)
135	return nil
136}
137
138// Shutdown triggers a graceful shutdown.
139func (d *Daemon) Shutdown() {
140	select {
141	case <-d.shutdown:
142		// Already shutting down.
143	default:
144		close(d.shutdown)
145	}
146}
147
148// ReloadConfig reloads the configuration from disk.
149func (d *Daemon) ReloadConfig() error {
150	cfg, err := config.LoadConfig()
151	if err != nil {
152		return fmt.Errorf("load config: %w", err)
153	}
154	d.mu.Lock()
155	d.config = cfg
156	d.mu.Unlock()
157
158	// Reinitialize providers for new/changed accounts.
159	d.initProviders()
160
161	// Notify clients.
162	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
163
164	log.Println("daemon: config reloaded")
165	return nil
166}
167
168func (d *Daemon) initProviders() {
169	d.mu.Lock()
170	defer d.mu.Unlock()
171
172	for i := range d.config.Accounts {
173		acct := &d.config.Accounts[i]
174		if _, exists := d.providers[acct.ID]; exists {
175			continue
176		}
177		p, err := backend.New(acct)
178		if err != nil {
179			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
180			continue
181		}
182		d.providers[acct.ID] = p
183		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
184	}
185}
186
187func (d *Daemon) acceptLoop() {
188	for {
189		done := func() bool {
190			defer func() {
191				if r := recover(); r != nil {
192					log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack())
193				}
194			}()
195			conn, err := d.listener.Accept()
196			if err != nil {
197				select {
198				case <-d.shutdown:
199					return true
200				default:
201					log.Printf("daemon: accept error: %v", err)
202					return false
203				}
204			}
205			rpcConn := daemonrpc.NewConn(conn)
206			d.addClient(rpcConn)
207			go d.handleClient(rpcConn)
208			return false
209		}()
210		if done {
211			return
212		}
213	}
214}
215
216func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
217	defer d.removeClient(conn)
218	defer conn.Close()
219
220	for {
221		msg, err := conn.ReceiveMessage()
222		if err != nil {
223			// Client disconnected or read error.
224			return
225		}
226		if msg.Request != nil {
227			d.handleRequest(conn, msg.Request)
228		}
229	}
230}
231
232func (d *Daemon) addClient(conn *daemonrpc.Conn) {
233	d.mu.Lock()
234	defer d.mu.Unlock()
235	d.clients[conn] = struct{}{}
236	log.Println("daemon: client connected")
237}
238
239func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
240	d.mu.Lock()
241	delete(d.clients, conn)
242	d.mu.Unlock()
243
244	d.subMu.Lock()
245	delete(d.subscriptions, conn)
246	d.subMu.Unlock()
247
248	log.Println("daemon: client disconnected")
249}
250
251func (d *Daemon) closeAllClients() {
252	d.mu.Lock()
253	defer d.mu.Unlock()
254	for conn := range d.clients {
255		conn.Close()
256	}
257	d.clients = make(map[*daemonrpc.Conn]struct{})
258}
259
260func (d *Daemon) closeProviders() {
261	d.mu.Lock()
262	defer d.mu.Unlock()
263	for id, p := range d.providers {
264		if err := p.Close(); err != nil {
265			log.Printf("daemon: error closing provider %s: %v", id, err)
266		}
267	}
268}
269
270// broadcastEvent sends an event to all connected clients.
271func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
272	d.mu.RLock()
273	defer d.mu.RUnlock()
274	for conn := range d.clients {
275		if err := conn.SendEvent(eventType, data); err != nil {
276			log.Printf("daemon: broadcast error: %v", err)
277		}
278	}
279}
280
281// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
282func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
283	key := accountID + ":" + folder
284	d.subMu.RLock()
285	defer d.subMu.RUnlock()
286
287	for conn, subs := range d.subscriptions {
288		if _, ok := subs[key]; ok {
289			if err := conn.SendEvent(eventType, data); err != nil {
290				log.Printf("daemon: subscriber broadcast error: %v", err)
291			}
292		}
293	}
294}
295
296// getProvider returns the provider for the given account ID.
297func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
298	d.mu.RLock()
299	defer d.mu.RUnlock()
300	p, ok := d.providers[accountID]
301	if !ok {
302		return nil, fmt.Errorf("no provider for account %s", accountID)
303	}
304	return p, nil
305}
306
307// getAccount returns the account config for the given ID.
308func (d *Daemon) getAccount(accountID string) *config.Account {
309	d.mu.RLock()
310	defer d.mu.RUnlock()
311	return d.config.GetAccountByID(accountID)
312}
313
314// backgroundSync handles periodic sync and IDLE-like notifications.
315func (d *Daemon) backgroundSync(ctx context.Context) {
316	ticker := time.NewTicker(5 * time.Minute)
317	defer ticker.Stop()
318
319	for {
320		select {
321		case <-ctx.Done():
322			return
323		case <-ticker.C:
324			d.syncAllAccounts(ctx)
325		}
326	}
327}
328
329func (d *Daemon) syncAllAccounts(ctx context.Context) {
330	d.mu.RLock()
331	accounts := make([]config.Account, len(d.config.Accounts))
332	copy(accounts, d.config.Accounts)
333	d.mu.RUnlock()
334
335	for _, acct := range accounts {
336		select {
337		case <-ctx.Done():
338			return
339		default:
340		}
341
342		d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
343			AccountID: acct.ID,
344			Folder:    "INBOX",
345		})
346
347		p, err := d.getProvider(acct.ID)
348		if err != nil {
349			continue
350		}
351
352		emails, err := p.FetchEmails(ctx, "INBOX", 50, 0)
353		if err != nil {
354			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
355			d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
356				AccountID: acct.ID,
357				Folder:    "INBOX",
358				Error:     err.Error(),
359			})
360			continue
361		}
362
363		oldCached, _ := config.LoadFolderEmailCache("INBOX")
364		oldUIDs := make(map[uint32]struct{}, len(oldCached))
365		for _, e := range oldCached {
366			if e.AccountID == acct.ID {
367				oldUIDs[e.UID] = struct{}{}
368			}
369		}
370
371		// Cache the fetched emails to disk.
372		var cached []config.CachedEmail
373		for _, e := range emails {
374			cached = append(cached, config.CachedEmail{
375				UID:        e.UID,
376				From:       e.From,
377				To:         e.To,
378				Subject:    e.Subject,
379				Date:       e.Date,
380				MessageID:  e.MessageID,
381				InReplyTo:  e.InReplyTo,
382				References: e.References,
383				AccountID:  e.AccountID,
384				IsRead:     e.IsRead,
385			})
386		}
387		if err := d.updateFolderCache("INBOX", acct.ID, cached); err != nil {
388			log.Printf("daemon: cache update for INBOX failed: %v", err)
389		}
390
391		d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
392			AccountID:  acct.ID,
393			Folder:     "INBOX",
394			EmailCount: len(emails),
395		})
396
397		newCount := 0
398		for _, e := range emails {
399			if _, seen := oldUIDs[e.UID]; !seen {
400				newCount++
401			}
402		}
403
404		// Send desktop notification if TUI not connected.
405		d.mu.RLock()
406		noClients := len(d.clients) == 0
407		d.mu.RUnlock()
408
409		if noClients && newCount > 0 {
410			if !d.config.DisableNotifications {
411				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail))
412			}
413		}
414	}
415}
416
417// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
418func (d *Daemon) startIdleWatchers() {
419	d.mu.RLock()
420	defer d.mu.RUnlock()
421
422	for i := range d.config.Accounts {
423		acct := &d.config.Accounts[i]
424		// Only IMAP accounts support IDLE.
425		protocol := acct.Protocol
426		if protocol == "" {
427			protocol = "imap"
428		}
429		if protocol != "imap" {
430			continue
431		}
432		d.idleWatcher.Watch(acct, "INBOX")
433		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
434	}
435}
436
437// idleEventLoop listens for IDLE updates and broadcasts them as events.
438func (d *Daemon) idleEventLoop() {
439	for {
440		select {
441		case <-d.shutdown:
442			return
443		case update, ok := <-d.idleUpdates:
444			if !ok {
445				return
446			}
447			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
448
449			// Desktop notification when no clients connected.
450			d.mu.RLock()
451			noClients := len(d.clients) == 0
452			d.mu.RUnlock()
453
454			if noClients && !d.config.DisableNotifications {
455				accountName := update.AccountID
456				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
457					accountName = acct.Email
458				}
459				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName))
460			}
461
462			// Broadcast to subscribed clients.
463			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
464				AccountID: update.AccountID,
465				Folder:    update.FolderName,
466			})
467
468			// Fetch and cache emails so they're fresh when TUI next connects.
469			go d.fetchAndCache(update.AccountID, update.FolderName)
470		}
471	}
472}
473
474// fetchAndCache fetches emails for an account/folder and saves to disk cache.
475func (d *Daemon) fetchAndCache(accountID, folder string) {
476	acct := d.getAccount(accountID)
477	if acct == nil {
478		return
479	}
480
481	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
482	if err != nil {
483		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
484		return
485	}
486
487	// Convert to cache format and save.
488	var cached []config.CachedEmail
489	for _, e := range emails {
490		cached = append(cached, config.CachedEmail{
491			UID:        e.UID,
492			From:       e.From,
493			To:         e.To,
494			Subject:    e.Subject,
495			Date:       e.Date,
496			MessageID:  e.MessageID,
497			InReplyTo:  e.InReplyTo,
498			References: e.References,
499			AccountID:  e.AccountID,
500			IsRead:     e.IsRead,
501		})
502	}
503
504	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
505		log.Printf("daemon: cache update for %s failed: %v", folder, err)
506		return
507	}
508
509	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
510
511	// Also notify subscribers that emails were updated.
512	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
513		AccountID:  accountID,
514		Folder:     folder,
515		EmailCount: len(emails),
516	})
517}
518
519// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
520func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
521	d.cacheMu.Lock()
522	defer d.cacheMu.Unlock()
523
524	// Load existing cache
525	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
526
527	// Filter out old emails for this account
528	var merged []config.CachedEmail
529	for _, e := range existing {
530		if e.AccountID != accountID {
531			merged = append(merged, e)
532		}
533	}
534
535	// Append new emails
536	merged = append(merged, newEmails...)
537
538	// Sort newest first
539	sort.Slice(merged, func(i, j int) bool {
540		return merged[i].Date.After(merged[j].Date)
541	})
542
543	// Save merged cache
544	return config.SaveFolderEmailCache(folderName, merged)
545}