daemon.go

  1package daemon
  2
  3import (
  4	"context"
  5	"fmt"
  6	"log"
  7	"net"
  8	"os"
  9	"sort"
 10	"sync"
 11	"time"
 12
 13	udsrpc "github.com/floatpane/go-uds-jsonrpc"
 14	"github.com/floatpane/matcha/backend"
 15	"github.com/floatpane/matcha/config"
 16	"github.com/floatpane/matcha/daemonrpc"
 17	"github.com/floatpane/matcha/fetcher"
 18	"github.com/floatpane/matcha/notify"
 19)
 20
 21const inboxFolder = "INBOX"
 22
 23// Daemon is the long-running background process that manages email
 24// connections, caching, sync, and notifications.
 25type Daemon struct {
 26	config    *config.Config
 27	providers map[string]backend.Provider
 28	server    *udsrpc.Server
 29	startTime time.Time
 30
 31	mu sync.RWMutex
 32
 33	// Per-client subscriptions: conn → set of "accountID:folder".
 34	subscriptions map[*daemonrpc.Conn]map[string]struct{}
 35	subMu         sync.RWMutex
 36
 37	// Mutex for disk cache updates.
 38	cacheMu sync.Mutex
 39
 40	// IMAP IDLE watcher for push notifications.
 41	idleWatcher *fetcher.IdleWatcher
 42	idleUpdates chan fetcher.IdleUpdate
 43
 44	// Background sync cancellation.
 45	syncCancel context.CancelFunc
 46
 47	shutdown chan struct{}
 48	done     chan struct{}
 49}
 50
 51// New creates a daemon with the given config.
 52func New(cfg *config.Config) *Daemon {
 53	idleUpdates := make(chan fetcher.IdleUpdate, 16)
 54	d := &Daemon{
 55		config:        cfg,
 56		providers:     make(map[string]backend.Provider),
 57		subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
 58		idleWatcher:   fetcher.NewIdleWatcher(idleUpdates),
 59		idleUpdates:   idleUpdates,
 60		shutdown:      make(chan struct{}),
 61		done:          make(chan struct{}),
 62	}
 63
 64	d.server = udsrpc.NewServer()
 65	d.registerHandlers()
 66	d.server.OnConnect(func(_ *daemonrpc.Conn) {
 67		log.Println("daemon: client connected")
 68	})
 69	d.server.OnDisconnect(func(conn *daemonrpc.Conn) {
 70		d.subMu.Lock()
 71		delete(d.subscriptions, conn)
 72		d.subMu.Unlock()
 73		log.Println("daemon: client disconnected")
 74	})
 75
 76	return d
 77}
 78
 79// registerHandlers wires each RPC method to its handler on the server.
 80func (d *Daemon) registerHandlers() {
 81	d.server.Handle(daemonrpc.MethodPing, d.handlePing)
 82	d.server.Handle(daemonrpc.MethodGetStatus, d.handleGetStatus)
 83	d.server.Handle(daemonrpc.MethodGetAccounts, d.handleGetAccounts)
 84	d.server.Handle(daemonrpc.MethodReloadConfig, d.handleReloadConfig)
 85	d.server.Handle(daemonrpc.MethodFetchEmails, d.handleFetchEmails)
 86	d.server.Handle(daemonrpc.MethodFetchEmailBody, d.handleFetchEmailBody)
 87	d.server.Handle(daemonrpc.MethodDeleteEmails, d.handleDeleteEmails)
 88	d.server.Handle(daemonrpc.MethodArchiveEmails, d.handleArchiveEmails)
 89	d.server.Handle(daemonrpc.MethodMoveEmails, d.handleMoveEmails)
 90	d.server.Handle(daemonrpc.MethodMarkRead, d.handleMarkRead)
 91	d.server.Handle(daemonrpc.MethodFetchFolders, d.handleFetchFolders)
 92	d.server.Handle(daemonrpc.MethodRefreshFolder, d.handleRefreshFolder)
 93	d.server.Handle(daemonrpc.MethodSubscribe, d.handleSubscribe)
 94	d.server.Handle(daemonrpc.MethodUnsubscribe, d.handleUnsubscribe)
 95}
 96
 97// Run starts the daemon: creates providers, starts the socket listener,
 98// starts background sync, and blocks until shutdown.
 99func (d *Daemon) Run() error {
100	d.startTime = time.Now()
101
102	// Ensure runtime directory exists.
103	if err := daemonrpc.EnsureRuntimeDir(); err != nil {
104		return fmt.Errorf("create runtime dir: %w", err)
105	}
106
107	// Check for existing daemon.
108	pidPath := daemonrpc.PIDPath()
109	if pid, running := IsRunning(pidPath); running {
110		return fmt.Errorf("daemon already running (PID %d)", pid)
111	}
112
113	// Write PID file.
114	if err := WritePID(pidPath); err != nil {
115		return fmt.Errorf("write PID file: %w", err)
116	}
117	defer RemovePID(pidPath) //nolint:errcheck
118
119	// Remove stale socket file.
120	sockPath := daemonrpc.SocketPath()
121	if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
122		return fmt.Errorf("remove stale socket: %w", err)
123	}
124
125	// Listen on Unix domain socket.
126	listener, err := net.Listen("unix", sockPath) //nolint:noctx
127	if err != nil {
128		return fmt.Errorf("listen: %w", err)
129	}
130	defer listener.Close() //nolint:errcheck
131
132	// Set socket permissions (owner only).
133	if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302
134		return fmt.Errorf("set socket permissions: %w", err)
135	}
136
137	log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid())
138
139	// Initialize providers for all accounts.
140	d.initProviders()
141
142	// Start IMAP IDLE watchers for all accounts.
143	d.startIdleWatchers()
144	go d.idleEventLoop()
145
146	// Handle OS signals: SIGTERM/SIGINT → shutdown, SIGHUP → reload config.
147	stopSignals := udsrpc.HandleSignals(d.Shutdown, func() {
148		log.Println("daemon: received SIGHUP, reloading config")
149		if err := d.ReloadConfig(); err != nil {
150			log.Printf("daemon: config reload failed: %v", err)
151		}
152	})
153	defer stopSignals()
154
155	// Start background sync.
156	ctx, cancel := context.WithCancel(context.Background())
157	d.syncCancel = cancel
158	go d.backgroundSync(ctx)
159
160	// Serve client connections via the shared RPC server. Canceling serveCtx
161	// closes the listener and unblocks Serve.
162	serveCtx, serveCancel := context.WithCancel(context.Background())
163	go func() {
164		if err := d.server.Serve(serveCtx, listener); err != nil {
165			log.Printf("daemon: serve error: %v", err)
166		}
167	}()
168
169	// Block until shutdown.
170	<-d.shutdown
171
172	// Cleanup.
173	log.Println("daemon: shutting down")
174	serveCancel()
175	for _, conn := range d.server.Clients() {
176		conn.Close() //nolint:errcheck,gosec
177	}
178	if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil {
179		log.Printf("daemon: %v", err)
180	}
181	cancel()
182	d.closeProviders()
183
184	close(d.done)
185	return nil
186}
187
188// Shutdown triggers a graceful shutdown.
189func (d *Daemon) Shutdown() {
190	select {
191	case <-d.shutdown:
192		// Already shutting down.
193	default:
194		close(d.shutdown)
195	}
196}
197
198// ReloadConfig reloads the configuration from disk.
199func (d *Daemon) ReloadConfig() error {
200	cfg, err := config.LoadConfig()
201	if err != nil {
202		return fmt.Errorf("load config: %w", err)
203	}
204	d.mu.Lock()
205	d.config = cfg
206	d.mu.Unlock()
207
208	// Reinitialize providers for new/changed accounts.
209	d.initProviders()
210
211	// Notify clients.
212	d.broadcastEvent(daemonrpc.EventConfigReloaded, nil)
213
214	log.Println("daemon: config reloaded")
215	return nil
216}
217
218func (d *Daemon) initProviders() {
219	d.mu.Lock()
220	defer d.mu.Unlock()
221
222	for i := range d.config.Accounts {
223		acct := &d.config.Accounts[i]
224		if _, exists := d.providers[acct.ID]; exists {
225			continue
226		}
227		p, err := backend.New(acct)
228		if err != nil {
229			log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err)
230			continue
231		}
232		d.providers[acct.ID] = p
233		log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol)
234	}
235}
236
237func (d *Daemon) closeProviders() {
238	d.mu.Lock()
239	defer d.mu.Unlock()
240	for id, p := range d.providers {
241		if err := p.Close(); err != nil {
242			log.Printf("daemon: error closing provider %s: %v", id, err)
243		}
244	}
245}
246
247// broadcastEvent sends an event to all connected clients.
248func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
249	d.server.Broadcast(eventType, data)
250}
251
252// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
253func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
254	key := accountID + ":" + folder
255	d.server.BroadcastFunc(eventType, data, func(conn *daemonrpc.Conn) bool {
256		d.subMu.RLock()
257		defer d.subMu.RUnlock()
258		_, ok := d.subscriptions[conn][key]
259		return ok
260	})
261}
262
263// getProvider returns the provider for the given account ID.
264func (d *Daemon) getProvider(accountID string) (backend.Provider, error) {
265	d.mu.RLock()
266	defer d.mu.RUnlock()
267	p, ok := d.providers[accountID]
268	if !ok {
269		return nil, fmt.Errorf("no provider for account %s", accountID)
270	}
271	return p, nil
272}
273
274// getAccount returns the account config for the given ID.
275func (d *Daemon) getAccount(accountID string) *config.Account {
276	d.mu.RLock()
277	defer d.mu.RUnlock()
278	return d.config.GetAccountByID(accountID)
279}
280
281// backgroundSync handles periodic sync and IDLE-like notifications.
282func (d *Daemon) backgroundSync(ctx context.Context) {
283	ticker := time.NewTicker(5 * time.Minute)
284	defer ticker.Stop()
285
286	for {
287		select {
288		case <-ctx.Done():
289			return
290		case <-ticker.C:
291			d.syncAllAccounts(ctx)
292		}
293	}
294}
295
296func (d *Daemon) syncAllAccounts(ctx context.Context) {
297	d.mu.RLock()
298	accounts := make([]config.Account, len(d.config.Accounts))
299	copy(accounts, d.config.Accounts)
300	d.mu.RUnlock()
301
302	for _, acct := range accounts {
303		select {
304		case <-ctx.Done():
305			return
306		default:
307		}
308
309		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{
310			AccountID: acct.ID,
311			Folder:    inboxFolder,
312		})
313
314		p, err := d.getProvider(acct.ID)
315		if err != nil {
316			continue
317		}
318
319		emails, err := p.FetchEmails(ctx, inboxFolder, 50, 0)
320		if err != nil {
321			log.Printf("daemon: sync %s failed: %v", acct.Email, err)
322			d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
323				AccountID: acct.ID,
324				Folder:    inboxFolder,
325				Error:     err.Error(),
326			})
327			continue
328		}
329
330		oldCached, _ := config.LoadFolderEmailCache(inboxFolder)
331		oldUIDs := make(map[uint32]struct{}, len(oldCached))
332		for _, e := range oldCached {
333			if e.AccountID == acct.ID {
334				oldUIDs[e.UID] = struct{}{}
335			}
336		}
337
338		// Cache the fetched emails to disk.
339		var cached []config.CachedEmail
340		for _, e := range emails {
341			cached = append(cached, config.CachedEmail{
342				UID:        e.UID,
343				From:       e.From,
344				To:         e.To,
345				Subject:    e.Subject,
346				Date:       e.Date,
347				MessageID:  e.MessageID,
348				InReplyTo:  e.InReplyTo,
349				References: e.References,
350				AccountID:  e.AccountID,
351				IsRead:     e.IsRead,
352			})
353		}
354		if err := d.updateFolderCache(inboxFolder, acct.ID, cached); err != nil {
355			log.Printf("daemon: cache update for INBOX failed: %v", err)
356		}
357
358		d.broadcastToSubscribers(acct.ID, inboxFolder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
359			AccountID:  acct.ID,
360			Folder:     inboxFolder,
361			EmailCount: len(emails),
362		})
363
364		newCount := 0
365		for _, e := range emails {
366			if _, seen := oldUIDs[e.UID]; !seen {
367				newCount++
368			}
369		}
370
371		// Send desktop notification if TUI not connected.
372		noClients := len(d.server.Clients()) == 0
373
374		if noClients && newCount > 0 {
375			if !d.config.DisableNotifications {
376				go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.FetchEmail)) //nolint:errcheck
377			}
378		}
379	}
380}
381
382// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX.
383func (d *Daemon) startIdleWatchers() {
384	d.mu.RLock()
385	defer d.mu.RUnlock()
386
387	for i := range d.config.Accounts {
388		acct := &d.config.Accounts[i]
389		// Only IMAP accounts support IDLE.
390		protocol := acct.Protocol
391		if protocol == "" {
392			protocol = "imap"
393		}
394		if protocol != "imap" {
395			continue
396		}
397		d.idleWatcher.Watch(acct, inboxFolder)
398		log.Printf("daemon: IDLE watcher started for %s", acct.Email)
399	}
400}
401
402// idleEventLoop listens for IDLE updates and broadcasts them as events.
403func (d *Daemon) idleEventLoop() {
404	for {
405		select {
406		case <-d.shutdown:
407			return
408		case update, ok := <-d.idleUpdates:
409			if !ok {
410				return
411			}
412			log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
413
414			// Desktop notification when no clients connected.
415			noClients := len(d.server.Clients()) == 0
416
417			if noClients && !d.config.DisableNotifications {
418				accountName := update.AccountID
419				if acct := d.config.GetAccountByID(update.AccountID); acct != nil {
420					accountName = acct.Email
421				}
422				go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) //nolint:errcheck
423			}
424
425			// Broadcast to subscribed clients.
426			d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
427				AccountID: update.AccountID,
428				Folder:    update.FolderName,
429			})
430
431			// Fetch and cache emails so they're fresh when TUI next connects.
432			go d.fetchAndCache(update.AccountID, update.FolderName)
433		}
434	}
435}
436
437// fetchAndCache fetches emails for an account/folder and saves to disk cache.
438func (d *Daemon) fetchAndCache(accountID, folder string) {
439	acct := d.getAccount(accountID)
440	if acct == nil {
441		return
442	}
443
444	emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0)
445	if err != nil {
446		log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err)
447		return
448	}
449
450	// Convert to cache format and save.
451	var cached []config.CachedEmail
452	for _, e := range emails {
453		cached = append(cached, config.CachedEmail{
454			UID:        e.UID,
455			From:       e.From,
456			To:         e.To,
457			Subject:    e.Subject,
458			Date:       e.Date,
459			MessageID:  e.MessageID,
460			InReplyTo:  e.InReplyTo,
461			References: e.References,
462			AccountID:  e.AccountID,
463			IsRead:     e.IsRead,
464		})
465	}
466
467	if err := d.updateFolderCache(folder, accountID, cached); err != nil {
468		log.Printf("daemon: cache update for %s failed: %v", folder, err)
469		return
470	}
471
472	log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder)
473
474	// Also notify subscribers that emails were updated.
475	d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
476		AccountID:  accountID,
477		Folder:     folder,
478		EmailCount: len(emails),
479	})
480}
481
482// updateFolderCache safely merges new emails for a specific account into the existing folder cache.
483func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error {
484	d.cacheMu.Lock()
485	defer d.cacheMu.Unlock()
486
487	// Load existing cache
488	existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing
489
490	// Filter out old emails for this account
491	var merged []config.CachedEmail
492	for _, e := range existing {
493		if e.AccountID != accountID {
494			merged = append(merged, e)
495		}
496	}
497
498	// Append new emails
499	merged = append(merged, newEmails...)
500
501	// Sort newest first
502	sort.Slice(merged, func(i, j int) bool {
503		return merged[i].Date.After(merged[j].Date)
504	})
505
506	// Save merged cache
507	return config.SaveFolderEmailCache(folderName, merged)
508}