From 13f8554e7f79df8aa0a46e389b868beb0c729df9 Mon Sep 17 00:00:00 2001 From: Drew Smirnoff Date: Sun, 19 Apr 2026 19:05:54 +0400 Subject: [PATCH] feat: daemon (#682) Co-authored-by: Steve Evans Co-authored-by: Lea Co-authored-by: Andriy --- daemon/daemon.go | 514 +++++++++++++++++++++++++++++++ daemon/daemon_test.go | 222 +++++++++++++ daemon/handler.go | 368 ++++++++++++++++++++++ daemon/pidfile_unix.go | 45 +++ daemon/pidfile_windows.go | 62 ++++ daemon/signals.go | 35 +++ daemonclient/client.go | 156 ++++++++++ daemonclient/client_test.go | 155 ++++++++++ daemonclient/procattr_unix.go | 12 + daemonclient/procattr_windows.go | 12 + daemonclient/service.go | 328 ++++++++++++++++++++ daemonrpc/protocol.go | 269 ++++++++++++++++ daemonrpc/protocol_test.go | 138 +++++++++ daemonrpc/socket.go | 44 +++ daemonrpc/transport.go | 90 ++++++ daemonrpc/transport_test.go | 170 ++++++++++ docs/docs/Features/DAEMON.md | 119 +++++++ main.go | 237 +++++++++++++- tui/folder_inbox.go | 3 +- tui/messages.go | 13 +- 20 files changed, 2982 insertions(+), 10 deletions(-) create mode 100644 daemon/daemon.go create mode 100644 daemon/daemon_test.go create mode 100644 daemon/handler.go create mode 100644 daemon/pidfile_unix.go create mode 100644 daemon/pidfile_windows.go create mode 100644 daemon/signals.go create mode 100644 daemonclient/client.go create mode 100644 daemonclient/client_test.go create mode 100644 daemonclient/procattr_unix.go create mode 100644 daemonclient/procattr_windows.go create mode 100644 daemonclient/service.go create mode 100644 daemonrpc/protocol.go create mode 100644 daemonrpc/protocol_test.go create mode 100644 daemonrpc/socket.go create mode 100644 daemonrpc/transport.go create mode 100644 daemonrpc/transport_test.go create mode 100644 docs/docs/Features/DAEMON.md diff --git a/daemon/daemon.go b/daemon/daemon.go new file mode 100644 index 0000000000000000000000000000000000000000..1963df48f0e32dc814070dd2bfa067fe5f0aae20 --- /dev/null +++ b/daemon/daemon.go @@ -0,0 +1,514 @@ +package daemon + +import ( + "context" + "fmt" + "log" + "net" + "os" + "sort" + "sync" + "time" + + "github.com/floatpane/matcha/backend" + "github.com/floatpane/matcha/config" + "github.com/floatpane/matcha/daemonrpc" + "github.com/floatpane/matcha/fetcher" + "github.com/floatpane/matcha/notify" +) + +// Daemon is the long-running background process that manages email +// connections, caching, sync, and notifications. +type Daemon struct { + config *config.Config + providers map[string]backend.Provider + listener net.Listener + startTime time.Time + + // Connected TUI/CLI clients. + clients map[*daemonrpc.Conn]struct{} + mu sync.RWMutex + + // Per-client subscriptions: conn → set of "accountID:folder". + subscriptions map[*daemonrpc.Conn]map[string]struct{} + subMu sync.RWMutex + + // Mutex for disk cache updates. + cacheMu sync.Mutex + + // IMAP IDLE watcher for push notifications. + idleWatcher *fetcher.IdleWatcher + idleUpdates chan fetcher.IdleUpdate + + // Background sync cancellation. + syncCancel context.CancelFunc + + shutdown chan struct{} + done chan struct{} +} + +// New creates a daemon with the given config. +func New(cfg *config.Config) *Daemon { + idleUpdates := make(chan fetcher.IdleUpdate, 16) + return &Daemon{ + config: cfg, + providers: make(map[string]backend.Provider), + clients: make(map[*daemonrpc.Conn]struct{}), + subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}), + idleWatcher: fetcher.NewIdleWatcher(idleUpdates), + idleUpdates: idleUpdates, + shutdown: make(chan struct{}), + done: make(chan struct{}), + } +} + +// Run starts the daemon: creates providers, starts the socket listener, +// starts background sync, and blocks until shutdown. +func (d *Daemon) Run() error { + d.startTime = time.Now() + + // Ensure runtime directory exists. + if err := daemonrpc.EnsureRuntimeDir(); err != nil { + return fmt.Errorf("create runtime dir: %w", err) + } + + // Check for existing daemon. + pidPath := daemonrpc.PIDPath() + if pid, running := IsRunning(pidPath); running { + return fmt.Errorf("daemon already running (PID %d)", pid) + } + + // Write PID file. + if err := WritePID(pidPath); err != nil { + return fmt.Errorf("write PID file: %w", err) + } + defer RemovePID(pidPath) + + // Remove stale socket file. + sockPath := daemonrpc.SocketPath() + os.Remove(sockPath) + + // Listen on Unix domain socket. + var err error + d.listener, err = net.Listen("unix", sockPath) + if err != nil { + return fmt.Errorf("listen: %w", err) + } + defer d.listener.Close() + + // Set socket permissions (owner only). + os.Chmod(sockPath, 0700) + + log.Printf("daemon: listening on %s (PID %d)", sockPath, os.Getpid()) + + // Initialize providers for all accounts. + d.initProviders() + + // Start IMAP IDLE watchers for all accounts. + d.startIdleWatchers() + go d.idleEventLoop() + + // Start signal handler. + go d.handleSignals() + + // Start background sync. + ctx, cancel := context.WithCancel(context.Background()) + d.syncCancel = cancel + go d.backgroundSync(ctx) + + // Accept client connections. + go d.acceptLoop() + + // Block until shutdown. + <-d.shutdown + + // Cleanup. + log.Println("daemon: shutting down") + d.listener.Close() + d.idleWatcher.StopAll() + cancel() + d.closeAllClients() + d.closeProviders() + + close(d.done) + return nil +} + +// Shutdown triggers a graceful shutdown. +func (d *Daemon) Shutdown() { + select { + case <-d.shutdown: + // Already shutting down. + default: + close(d.shutdown) + } +} + +// ReloadConfig reloads the configuration from disk. +func (d *Daemon) ReloadConfig() error { + cfg, err := config.LoadConfig() + if err != nil { + return fmt.Errorf("load config: %w", err) + } + d.mu.Lock() + d.config = cfg + d.mu.Unlock() + + // Reinitialize providers for new/changed accounts. + d.initProviders() + + // Notify clients. + d.broadcastEvent(daemonrpc.EventConfigReloaded, nil) + + log.Println("daemon: config reloaded") + return nil +} + +func (d *Daemon) initProviders() { + d.mu.Lock() + defer d.mu.Unlock() + + for i := range d.config.Accounts { + acct := &d.config.Accounts[i] + if _, exists := d.providers[acct.ID]; exists { + continue + } + p, err := backend.New(acct) + if err != nil { + log.Printf("daemon: failed to create provider for %s: %v", acct.Email, err) + continue + } + d.providers[acct.ID] = p + log.Printf("daemon: provider ready for %s (%s)", acct.Email, acct.Protocol) + } +} + +func (d *Daemon) acceptLoop() { + for { + conn, err := d.listener.Accept() + if err != nil { + select { + case <-d.shutdown: + return + default: + log.Printf("daemon: accept error: %v", err) + continue + } + } + rpcConn := daemonrpc.NewConn(conn) + d.addClient(rpcConn) + go d.handleClient(rpcConn) + } +} + +func (d *Daemon) handleClient(conn *daemonrpc.Conn) { + defer d.removeClient(conn) + defer conn.Close() + + for { + msg, err := conn.ReceiveMessage() + if err != nil { + // Client disconnected or read error. + return + } + if msg.Request != nil { + d.handleRequest(conn, msg.Request) + } + } +} + +func (d *Daemon) addClient(conn *daemonrpc.Conn) { + d.mu.Lock() + defer d.mu.Unlock() + d.clients[conn] = struct{}{} + log.Println("daemon: client connected") +} + +func (d *Daemon) removeClient(conn *daemonrpc.Conn) { + d.mu.Lock() + delete(d.clients, conn) + d.mu.Unlock() + + d.subMu.Lock() + delete(d.subscriptions, conn) + d.subMu.Unlock() + + log.Println("daemon: client disconnected") +} + +func (d *Daemon) closeAllClients() { + d.mu.Lock() + defer d.mu.Unlock() + for conn := range d.clients { + conn.Close() + } + d.clients = make(map[*daemonrpc.Conn]struct{}) +} + +func (d *Daemon) closeProviders() { + d.mu.Lock() + defer d.mu.Unlock() + for id, p := range d.providers { + if err := p.Close(); err != nil { + log.Printf("daemon: error closing provider %s: %v", id, err) + } + } +} + +// broadcastEvent sends an event to all connected clients. +func (d *Daemon) broadcastEvent(eventType string, data interface{}) { + d.mu.RLock() + defer d.mu.RUnlock() + for conn := range d.clients { + if err := conn.SendEvent(eventType, data); err != nil { + log.Printf("daemon: broadcast error: %v", err) + } + } +} + +// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder. +func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) { + key := accountID + ":" + folder + d.subMu.RLock() + defer d.subMu.RUnlock() + + for conn, subs := range d.subscriptions { + if _, ok := subs[key]; ok { + if err := conn.SendEvent(eventType, data); err != nil { + log.Printf("daemon: subscriber broadcast error: %v", err) + } + } + } +} + +// getProvider returns the provider for the given account ID. +func (d *Daemon) getProvider(accountID string) (backend.Provider, error) { + d.mu.RLock() + defer d.mu.RUnlock() + p, ok := d.providers[accountID] + if !ok { + return nil, fmt.Errorf("no provider for account %s", accountID) + } + return p, nil +} + +// getAccount returns the account config for the given ID. +func (d *Daemon) getAccount(accountID string) *config.Account { + d.mu.RLock() + defer d.mu.RUnlock() + return d.config.GetAccountByID(accountID) +} + +// backgroundSync handles periodic sync and IDLE-like notifications. +func (d *Daemon) backgroundSync(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + d.syncAllAccounts(ctx) + } + } +} + +func (d *Daemon) syncAllAccounts(ctx context.Context) { + d.mu.RLock() + accounts := make([]config.Account, len(d.config.Accounts)) + copy(accounts, d.config.Accounts) + d.mu.RUnlock() + + for _, acct := range accounts { + select { + case <-ctx.Done(): + return + default: + } + + d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{ + AccountID: acct.ID, + Folder: "INBOX", + }) + + p, err := d.getProvider(acct.ID) + if err != nil { + continue + } + + emails, err := p.FetchEmails(ctx, "INBOX", 50, 0) + if err != nil { + log.Printf("daemon: sync %s failed: %v", acct.Email, err) + d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ + AccountID: acct.ID, + Folder: "INBOX", + Error: err.Error(), + }) + continue + } + + // Cache the fetched emails to disk. + var cached []config.CachedEmail + for _, e := range emails { + cached = append(cached, config.CachedEmail{ + UID: e.UID, + From: e.From, + To: e.To, + Subject: e.Subject, + Date: e.Date, + MessageID: e.MessageID, + AccountID: e.AccountID, + IsRead: e.IsRead, + }) + } + if err := d.updateFolderCache("INBOX", acct.ID, cached); err != nil { + log.Printf("daemon: cache update for INBOX failed: %v", err) + } + + d.broadcastToSubscribers(acct.ID, "INBOX", daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{ + AccountID: acct.ID, + Folder: "INBOX", + EmailCount: len(emails), + }) + + // Send desktop notification if TUI not connected. + d.mu.RLock() + noClients := len(d.clients) == 0 + d.mu.RUnlock() + + if noClients && len(emails) > 0 { + if !d.config.DisableNotifications { + go notify.Send("Matcha", fmt.Sprintf("New mail for %s", acct.Email)) + } + } + } +} + +// startIdleWatchers starts IMAP IDLE watchers for all accounts on INBOX. +func (d *Daemon) startIdleWatchers() { + d.mu.RLock() + defer d.mu.RUnlock() + + for i := range d.config.Accounts { + acct := &d.config.Accounts[i] + // Only IMAP accounts support IDLE. + protocol := acct.Protocol + if protocol == "" { + protocol = "imap" + } + if protocol != "imap" { + continue + } + d.idleWatcher.Watch(acct, "INBOX") + log.Printf("daemon: IDLE watcher started for %s", acct.Email) + } +} + +// idleEventLoop listens for IDLE updates and broadcasts them as events. +func (d *Daemon) idleEventLoop() { + for { + select { + case <-d.shutdown: + return + case update, ok := <-d.idleUpdates: + if !ok { + return + } + log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName) + + // Desktop notification when no clients connected. + d.mu.RLock() + noClients := len(d.clients) == 0 + d.mu.RUnlock() + + if noClients && !d.config.DisableNotifications { + accountName := update.AccountID + if acct := d.config.GetAccountByID(update.AccountID); acct != nil { + accountName = acct.Email + } + go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", update.FolderName, accountName)) + } + + // Broadcast to subscribed clients. + d.broadcastToSubscribers(update.AccountID, update.FolderName, daemonrpc.EventNewMail, daemonrpc.NewMailEvent{ + AccountID: update.AccountID, + Folder: update.FolderName, + }) + + // Fetch and cache emails so they're fresh when TUI next connects. + go d.fetchAndCache(update.AccountID, update.FolderName) + } + } +} + +// fetchAndCache fetches emails for an account/folder and saves to disk cache. +func (d *Daemon) fetchAndCache(accountID, folder string) { + acct := d.getAccount(accountID) + if acct == nil { + return + } + + emails, err := fetcher.FetchFolderEmails(acct, folder, 50, 0) + if err != nil { + log.Printf("daemon: cache fetch for %s/%s failed: %v", accountID, folder, err) + return + } + + // Convert to cache format and save. + var cached []config.CachedEmail + for _, e := range emails { + cached = append(cached, config.CachedEmail{ + UID: e.UID, + From: e.From, + To: e.To, + Subject: e.Subject, + Date: e.Date, + MessageID: e.MessageID, + AccountID: e.AccountID, + IsRead: e.IsRead, + }) + } + + if err := d.updateFolderCache(folder, accountID, cached); err != nil { + log.Printf("daemon: cache update for %s failed: %v", folder, err) + return + } + + log.Printf("daemon: cached %d emails for %s/%s", len(cached), accountID, folder) + + // Also notify subscribers that emails were updated. + d.broadcastToSubscribers(accountID, folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{ + AccountID: accountID, + Folder: folder, + EmailCount: len(emails), + }) +} + +// updateFolderCache safely merges new emails for a specific account into the existing folder cache. +func (d *Daemon) updateFolderCache(folderName, accountID string, newEmails []config.CachedEmail) error { + d.cacheMu.Lock() + defer d.cacheMu.Unlock() + + // Load existing cache + existing, _ := config.LoadFolderEmailCache(folderName) // Ignore error, assume empty if missing + + // Filter out old emails for this account + var merged []config.CachedEmail + for _, e := range existing { + if e.AccountID != accountID { + merged = append(merged, e) + } + } + + // Append new emails + merged = append(merged, newEmails...) + + // Sort newest first + sort.Slice(merged, func(i, j int) bool { + return merged[i].Date.After(merged[j].Date) + }) + + // Save merged cache + return config.SaveFolderEmailCache(folderName, merged) +} diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2fe87a490476690018c50ce21c7ab7e72b308710 --- /dev/null +++ b/daemon/daemon_test.go @@ -0,0 +1,222 @@ +package daemon + +import ( + "encoding/json" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/floatpane/matcha/config" + "github.com/floatpane/matcha/daemonrpc" +) + +func TestPIDFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "test.pid") + + if err := WritePID(path); err != nil { + t.Fatal(err) + } + + pid, err := ReadPID(path) + if err != nil { + t.Fatal(err) + } + if pid != os.Getpid() { + t.Errorf("pid = %d, want %d", pid, os.Getpid()) + } + + gotPID, running := IsRunning(path) + if !running { + t.Error("expected running=true for current process") + } + if gotPID != os.Getpid() { + t.Errorf("pid = %d, want %d", gotPID, os.Getpid()) + } + + if err := RemovePID(path); err != nil { + t.Fatal(err) + } + + _, running = IsRunning(path) + if running { + t.Error("expected running=false after remove") + } +} + +func TestPIDFile_InvalidContent(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "bad.pid") + + os.WriteFile(path, []byte("notanumber"), 0644) + _, err := ReadPID(path) + if err == nil { + t.Error("expected error for invalid PID content") + } +} + +func TestPIDFile_DeadProcess(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "dead.pid") + + os.WriteFile(path, []byte("99999999"), 0644) + _, running := IsRunning(path) + if running { + t.Error("expected running=false for dead PID") + } +} + +// handlerTest sets up a client/server pipe and runs a single RPC exchange. +// The handler runs in a goroutine so the pipe doesn't deadlock. +func handlerTest(t *testing.T, d *Daemon, req *daemonrpc.Request) daemonrpc.Message { + t.Helper() + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + server := daemonrpc.NewConn(serverConn) + client := daemonrpc.NewConn(clientConn) + + // Handle request in goroutine (SendResponse blocks until client reads). + go func() { + d.handleRequest(server, req) + }() + + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + return msg +} + +func TestDaemon_PingHandler(t *testing.T) { + d := &Daemon{shutdown: make(chan struct{})} + msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodPing}) + + if msg.Response == nil { + t.Fatal("expected Response") + } + var result daemonrpc.PingResult + json.Unmarshal(msg.Response.Result, &result) + if !result.Pong { + t.Error("expected pong=true") + } +} + +func TestDaemon_StatusHandler(t *testing.T) { + d := &Daemon{ + startTime: time.Now().Add(-2 * time.Minute), + shutdown: make(chan struct{}), + config: &config.Config{}, + } + + msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodGetStatus}) + + var result daemonrpc.StatusResult + json.Unmarshal(msg.Response.Result, &result) + + if !result.Running { + t.Error("expected running=true") + } + if result.Uptime < 120 { + t.Errorf("uptime = %d, want >= 120", result.Uptime) + } +} + +func TestDaemon_UnknownMethod(t *testing.T) { + d := &Daemon{shutdown: make(chan struct{})} + msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: "DoesNotExist"}) + + if msg.Response.Error == nil { + t.Fatal("expected error for unknown method") + } + if msg.Response.Error.Code != daemonrpc.ErrCodeNotFound { + t.Errorf("code = %d, want %d", msg.Response.Error.Code, daemonrpc.ErrCodeNotFound) + } +} + +func TestDaemon_Subscribe(t *testing.T) { + d := &Daemon{ + subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}), + shutdown: make(chan struct{}), + } + + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + server := daemonrpc.NewConn(serverConn) + client := daemonrpc.NewConn(clientConn) + + params, _ := json.Marshal(daemonrpc.SubscribeParams{ + AccountID: "acc1", + Folder: "INBOX", + }) + + go func() { + d.handleRequest(server, &daemonrpc.Request{ + ID: 1, + Method: daemonrpc.MethodSubscribe, + Params: params, + }) + }() + + // Read response. + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Response.Error != nil { + t.Errorf("unexpected error: %v", msg.Response.Error) + } + + // Verify subscription was recorded. + d.subMu.RLock() + subs, ok := d.subscriptions[server] + d.subMu.RUnlock() + + if !ok { + t.Fatal("expected subscription entry for connection") + } + if _, ok := subs["acc1:INBOX"]; !ok { + t.Error("expected subscription for acc1:INBOX") + } +} + +func TestDaemon_BroadcastEvent(t *testing.T) { + d := &Daemon{ + clients: make(map[*daemonrpc.Conn]struct{}), + shutdown: make(chan struct{}), + } + + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + server := daemonrpc.NewConn(serverConn) + client := daemonrpc.NewConn(clientConn) + + d.mu.Lock() + d.clients[server] = struct{}{} + d.mu.Unlock() + + go func() { + d.broadcastEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{ + AccountID: "acc1", + Folder: "INBOX", + }) + }() + + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Event == nil { + t.Fatal("expected Event") + } + if msg.Event.Type != daemonrpc.EventNewMail { + t.Errorf("type = %q, want NewMail", msg.Event.Type) + } +} diff --git a/daemon/handler.go b/daemon/handler.go new file mode 100644 index 0000000000000000000000000000000000000000..de2e73c73016132354fb23bd734aa55b6fef23da --- /dev/null +++ b/daemon/handler.go @@ -0,0 +1,368 @@ +package daemon + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "time" + + "github.com/floatpane/matcha/daemonrpc" +) + +func (d *Daemon) handleRequest(conn *daemonrpc.Conn, req *daemonrpc.Request) { + switch req.Method { + case daemonrpc.MethodPing: + d.handlePing(conn, req) + case daemonrpc.MethodGetStatus: + d.handleGetStatus(conn, req) + case daemonrpc.MethodGetAccounts: + d.handleGetAccounts(conn, req) + case daemonrpc.MethodReloadConfig: + d.handleReloadConfig(conn, req) + case daemonrpc.MethodFetchEmails: + d.handleFetchEmails(conn, req) + case daemonrpc.MethodFetchEmailBody: + d.handleFetchEmailBody(conn, req) + case daemonrpc.MethodDeleteEmails: + d.handleDeleteEmails(conn, req) + case daemonrpc.MethodArchiveEmails: + d.handleArchiveEmails(conn, req) + case daemonrpc.MethodMoveEmails: + d.handleMoveEmails(conn, req) + case daemonrpc.MethodMarkRead: + d.handleMarkRead(conn, req) + case daemonrpc.MethodFetchFolders: + d.handleFetchFolders(conn, req) + case daemonrpc.MethodRefreshFolder: + d.handleRefreshFolder(conn, req) + case daemonrpc.MethodSubscribe: + d.handleSubscribe(conn, req) + case daemonrpc.MethodUnsubscribe: + d.handleUnsubscribe(conn, req) + default: + conn.SendError(req.ID, daemonrpc.ErrCodeNotFound, fmt.Sprintf("unknown method: %s", req.Method)) + } +} + +func decodeParams[T any](req *daemonrpc.Request) (T, error) { + var params T + if req.Params != nil { + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + return params, err + } + } + return params, nil +} + +func (d *Daemon) handlePing(conn *daemonrpc.Conn, req *daemonrpc.Request) { + conn.SendResponse(req.ID, daemonrpc.PingResult{Pong: true}) +} + +func (d *Daemon) handleGetStatus(conn *daemonrpc.Conn, req *daemonrpc.Request) { + d.mu.RLock() + var accounts []string + for _, acct := range d.config.Accounts { + accounts = append(accounts, acct.Email) + } + d.mu.RUnlock() + + conn.SendResponse(req.ID, daemonrpc.StatusResult{ + Running: true, + Uptime: int64(time.Since(d.startTime).Seconds()), + Accounts: accounts, + PID: os.Getpid(), + }) +} + +func (d *Daemon) handleGetAccounts(conn *daemonrpc.Conn, req *daemonrpc.Request) { + d.mu.RLock() + defer d.mu.RUnlock() + + var infos []daemonrpc.AccountInfo + for _, acct := range d.config.Accounts { + protocol := acct.Protocol + if protocol == "" { + protocol = "imap" + } + infos = append(infos, daemonrpc.AccountInfo{ + ID: acct.ID, + Name: acct.Name, + Email: acct.Email, + Protocol: protocol, + }) + } + conn.SendResponse(req.ID, infos) +} + +func (d *Daemon) handleReloadConfig(conn *daemonrpc.Conn, req *daemonrpc.Request) { + if err := d.ReloadConfig(); err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleFetchEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.FetchEmailsParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + emails, err := p.FetchEmails(ctx, params.Folder, params.Limit, params.Offset) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + conn.SendResponse(req.ID, emails) +} + +func (d *Daemon) handleFetchEmailBody(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.FetchEmailBodyParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + body, attachments, err := p.FetchEmailBody(ctx, params.Folder, params.UID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + // Convert backend.Attachment to daemonrpc.AttachmentInfo for wire transfer. + var attInfos []daemonrpc.AttachmentInfo + for _, att := range attachments { + attInfos = append(attInfos, daemonrpc.AttachmentInfo{ + Filename: att.Filename, + PartID: att.PartID, + Encoding: att.Encoding, + MIMEType: att.MIMEType, + }) + } + + conn.SendResponse(req.ID, daemonrpc.FetchEmailBodyResult{ + Body: body, + Attachments: attInfos, + }) +} + +func (d *Daemon) handleDeleteEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.DeleteEmailsParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := p.DeleteEmails(ctx, params.Folder, params.UIDs); err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleArchiveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.ArchiveEmailsParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := p.ArchiveEmails(ctx, params.Folder, params.UIDs); err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleMoveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.MoveEmailsParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := p.MoveEmails(ctx, params.UIDs, params.SourceFolder, params.DestFolder); err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleMarkRead(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.MarkReadParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // MarkAsRead only supports one UID at a time in the Provider interface. + for _, uid := range params.UIDs { + if err := p.MarkAsRead(ctx, params.Folder, uid); err != nil { + log.Printf("daemon: mark read %d failed: %v", uid, err) + } + } + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleFetchFolders(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.FetchFoldersParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + p, err := d.getProvider(params.AccountID) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + folders, err := p.FetchFolders(ctx) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) + return + } + conn.SendResponse(req.ID, folders) +} + +func (d *Daemon) handleRefreshFolder(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.RefreshFolderParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + // Async: fetch in background, push events when done. + go func() { + p, err := d.getProvider(params.AccountID) + if err != nil { + log.Printf("daemon: refresh provider error: %v", err) + return + } + + d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent{ + AccountID: params.AccountID, + Folder: params.Folder, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + emails, err := p.FetchEmails(ctx, params.Folder, 50, 0) + if err != nil { + d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ + AccountID: params.AccountID, + Folder: params.Folder, + Error: err.Error(), + }) + return + } + + d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{ + AccountID: params.AccountID, + Folder: params.Folder, + EmailCount: len(emails), + }) + }() + + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleSubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.SubscribeParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + key := params.AccountID + ":" + params.Folder + + d.subMu.Lock() + if d.subscriptions[conn] == nil { + d.subscriptions[conn] = make(map[string]struct{}) + } + d.subscriptions[conn][key] = struct{}{} + d.subMu.Unlock() + + log.Printf("daemon: client subscribed to %s", key) + conn.SendResponse(req.ID, true) +} + +func (d *Daemon) handleUnsubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) { + params, err := decodeParams[daemonrpc.UnsubscribeParams](req) + if err != nil { + conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) + return + } + + key := params.AccountID + ":" + params.Folder + + d.subMu.Lock() + if subs, ok := d.subscriptions[conn]; ok { + delete(subs, key) + } + d.subMu.Unlock() + + conn.SendResponse(req.ID, true) +} diff --git a/daemon/pidfile_unix.go b/daemon/pidfile_unix.go new file mode 100644 index 0000000000000000000000000000000000000000..8182fe9e1cc2b522137ea7e868c695b1e00aef61 --- /dev/null +++ b/daemon/pidfile_unix.go @@ -0,0 +1,45 @@ +//go:build !windows + +package daemon + +import ( + "fmt" + "os" + "strconv" + "strings" + "syscall" +) + +// WritePID writes the current process ID to the given path. +func WritePID(path string) error { + return os.WriteFile(path, []byte(strconv.Itoa(os.Getpid())), 0644) +} + +// ReadPID reads the process ID from the given path. +func ReadPID(path string) (int, error) { + data, err := os.ReadFile(path) + if err != nil { + return 0, err + } + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return 0, fmt.Errorf("invalid PID file: %w", err) + } + return pid, nil +} + +// IsRunning checks if a daemon process is alive using the PID file. +func IsRunning(path string) (int, bool) { + pid, err := ReadPID(path) + if err != nil { + return 0, false + } + // Signal 0 checks if process exists without sending a signal. + err = syscall.Kill(pid, 0) + return pid, err == nil +} + +// RemovePID removes the PID file. +func RemovePID(path string) error { + return os.Remove(path) +} diff --git a/daemon/pidfile_windows.go b/daemon/pidfile_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..98bf2f192ac948fc287b858024b44185949c0861 --- /dev/null +++ b/daemon/pidfile_windows.go @@ -0,0 +1,62 @@ +//go:build windows + +package daemon + +import ( + "fmt" + "os" + "strconv" + "strings" + "syscall" +) + +// WritePID writes the current process ID to the given path. +func WritePID(path string) error { + return os.WriteFile(path, []byte(strconv.Itoa(os.Getpid())), 0644) +} + +// ReadPID reads the process ID from the given path. +func ReadPID(path string) (int, error) { + data, err := os.ReadFile(path) + if err != nil { + return 0, err + } + pid, err := strconv.Atoi(strings.TrimSpace(string(data))) + if err != nil { + return 0, fmt.Errorf("invalid PID file: %w", err) + } + return pid, nil +} + +// IsRunning checks if a daemon process is alive using the PID file. +func IsRunning(path string) (int, bool) { + pid, err := ReadPID(path) + if err != nil { + return 0, false + } + + // On Windows, syscall.Kill is not available. We use OpenProcess instead. + // We only need PROCESS_QUERY_LIMITED_INFORMATION (0x1000) + const PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 + h, err := syscall.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) + if err != nil { + // Process could not be opened, which likely means it doesn't exist + return pid, false + } + defer syscall.CloseHandle(h) + + // Check if the process is still running or has exited + var exitCode uint32 + err = syscall.GetExitCodeProcess(h, &exitCode) + if err != nil { + return pid, false + } + + // STILL_ACTIVE is 259 + return pid, exitCode == 259 +} + +// RemovePID removes the PID file. +func RemovePID(path string) error { + return os.Remove(path) +} diff --git a/daemon/signals.go b/daemon/signals.go new file mode 100644 index 0000000000000000000000000000000000000000..03b15aba5ddaccae6e5326b8865ddd30ef7b7765 --- /dev/null +++ b/daemon/signals.go @@ -0,0 +1,35 @@ +package daemon + +import ( + "log" + "os" + "os/signal" + "syscall" +) + +// handleSignals listens for OS signals and triggers daemon actions. +// SIGTERM/SIGINT → graceful shutdown +// SIGHUP → config reload +func (d *Daemon) handleSignals() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) + + for { + select { + case sig := <-ch: + switch sig { + case syscall.SIGTERM, syscall.SIGINT: + log.Println("daemon: received shutdown signal") + d.Shutdown() + return + case syscall.SIGHUP: + log.Println("daemon: received SIGHUP, reloading config") + if err := d.ReloadConfig(); err != nil { + log.Printf("daemon: config reload failed: %v", err) + } + } + case <-d.shutdown: + return + } + } +} diff --git a/daemonclient/client.go b/daemonclient/client.go new file mode 100644 index 0000000000000000000000000000000000000000..47a98ce52c157f0b5a772bdc4663cea8cec61960 --- /dev/null +++ b/daemonclient/client.go @@ -0,0 +1,156 @@ +package daemonclient + +import ( + "encoding/json" + "fmt" + "net" + "sync" + "sync/atomic" + + "github.com/floatpane/matcha/daemonrpc" +) + +// Client connects to the matcha daemon over a Unix domain socket. +type Client struct { + conn *daemonrpc.Conn + nextID atomic.Uint64 + pending map[uint64]chan *daemonrpc.Response + mu sync.Mutex + events chan *daemonrpc.Event + done chan struct{} +} + +// Dial connects to the daemon socket. +func Dial() (*Client, error) { + sockPath := daemonrpc.SocketPath() + conn, err := net.Dial("unix", sockPath) + if err != nil { + return nil, fmt.Errorf("connect to daemon: %w", err) + } + + c := &Client{ + conn: daemonrpc.NewConn(conn), + pending: make(map[uint64]chan *daemonrpc.Response), + events: make(chan *daemonrpc.Event, 64), + done: make(chan struct{}), + } + + go c.readLoop() + return c, nil +} + +// Call makes a synchronous RPC call to the daemon. +func (c *Client) Call(method string, params interface{}, result interface{}) error { + id := c.nextID.Add(1) + + // Marshal params. + var rawParams json.RawMessage + if params != nil { + var err error + rawParams, err = json.Marshal(params) + if err != nil { + return fmt.Errorf("marshal params: %w", err) + } + } + + // Register pending response channel. + ch := make(chan *daemonrpc.Response, 1) + c.mu.Lock() + c.pending[id] = ch + c.mu.Unlock() + + defer func() { + c.mu.Lock() + delete(c.pending, id) + c.mu.Unlock() + }() + + // Send request. + req := &daemonrpc.Request{ + ID: id, + Method: method, + Params: rawParams, + } + if err := c.conn.Send(req); err != nil { + return fmt.Errorf("send request: %w", err) + } + + // Wait for response. + select { + case resp := <-ch: + if resp.Error != nil { + return resp.Error + } + if result != nil && resp.Result != nil { + return json.Unmarshal(resp.Result, result) + } + return nil + case <-c.done: + return fmt.Errorf("connection closed") + } +} + +// Events returns the channel that receives push events from the daemon. +func (c *Client) Events() <-chan *daemonrpc.Event { + return c.events +} + +// Close closes the connection to the daemon. +func (c *Client) Close() error { + select { + case <-c.done: + return nil + default: + close(c.done) + } + return c.conn.Close() +} + +// readLoop reads messages from the daemon and dispatches them. +func (c *Client) readLoop() { + defer close(c.events) + + for { + msg, err := c.conn.ReceiveMessage() + if err != nil { + select { + case <-c.done: + default: + close(c.done) + } + return + } + + if msg.Response != nil { + c.mu.Lock() + ch, ok := c.pending[msg.Response.ID] + c.mu.Unlock() + if ok { + ch <- msg.Response + } + } + + if msg.Event != nil { + select { + case c.events <- msg.Event: + default: + // Drop event if channel full. + } + } + } +} + +// Ping checks if the daemon is responsive. +func (c *Client) Ping() error { + var result daemonrpc.PingResult + return c.Call(daemonrpc.MethodPing, nil, &result) +} + +// Status returns daemon status info. +func (c *Client) Status() (*daemonrpc.StatusResult, error) { + var result daemonrpc.StatusResult + if err := c.Call(daemonrpc.MethodGetStatus, nil, &result); err != nil { + return nil, err + } + return &result, nil +} diff --git a/daemonclient/client_test.go b/daemonclient/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8a7e977f66bbe8a50a049e117ba7f03282f02440 --- /dev/null +++ b/daemonclient/client_test.go @@ -0,0 +1,155 @@ +package daemonclient + +import ( + "encoding/json" + "net" + "testing" + + "github.com/floatpane/matcha/daemonrpc" +) + +func mockDaemon(t *testing.T) (*Client, *daemonrpc.Conn) { + t.Helper() + serverConn, clientConn := net.Pipe() + + server := daemonrpc.NewConn(serverConn) + client := &Client{ + conn: daemonrpc.NewConn(clientConn), + pending: make(map[uint64]chan *daemonrpc.Response), + events: make(chan *daemonrpc.Event, 64), + done: make(chan struct{}), + } + go client.readLoop() + + return client, server +} + +func TestClient_Ping(t *testing.T) { + client, server := mockDaemon(t) + defer client.Close() + defer server.Close() + + // Mock server: respond to Ping. + go func() { + msg, err := server.ReceiveMessage() + if err != nil { + t.Error(err) + return + } + if msg.Request.Method != daemonrpc.MethodPing { + t.Errorf("method = %q, want Ping", msg.Request.Method) + } + server.SendResponse(msg.Request.ID, daemonrpc.PingResult{Pong: true}) + }() + + if err := client.Ping(); err != nil { + t.Fatal(err) + } +} + +func TestClient_Status(t *testing.T) { + client, server := mockDaemon(t) + defer client.Close() + defer server.Close() + + go func() { + msg, _ := server.ReceiveMessage() + server.SendResponse(msg.Request.ID, daemonrpc.StatusResult{ + Running: true, + Uptime: 120, + Accounts: []string{"alice@example.com"}, + PID: 12345, + }) + }() + + status, err := client.Status() + if err != nil { + t.Fatal(err) + } + if !status.Running { + t.Error("expected running=true") + } + if status.PID != 12345 { + t.Errorf("PID = %d, want 12345", status.PID) + } + if len(status.Accounts) != 1 || status.Accounts[0] != "alice@example.com" { + t.Errorf("accounts = %v, want [alice@example.com]", status.Accounts) + } +} + +func TestClient_CallError(t *testing.T) { + client, server := mockDaemon(t) + defer client.Close() + defer server.Close() + + go func() { + msg, _ := server.ReceiveMessage() + server.SendError(msg.Request.ID, daemonrpc.ErrCodeNotFound, "method not found") + }() + + var result daemonrpc.PingResult + err := client.Call("NonExistent", nil, &result) + if err == nil { + t.Fatal("expected error") + } + if err.Error() != "method not found" { + t.Errorf("error = %q, want 'method not found'", err.Error()) + } +} + +func TestClient_Events(t *testing.T) { + client, server := mockDaemon(t) + defer client.Close() + defer server.Close() + + // Server pushes an event. + go func() { + server.SendEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{ + AccountID: "acc1", + Folder: "INBOX", + }) + }() + + ev := <-client.Events() + if ev.Type != daemonrpc.EventNewMail { + t.Errorf("type = %q, want NewMail", ev.Type) + } + + var data daemonrpc.NewMailEvent + if err := json.Unmarshal(ev.Data, &data); err != nil { + t.Fatal(err) + } + if data.AccountID != "acc1" { + t.Errorf("account_id = %q, want acc1", data.AccountID) + } +} + +func TestClient_ConcurrentCalls(t *testing.T) { + client, server := mockDaemon(t) + defer client.Close() + defer server.Close() + + // Server handles two requests. + go func() { + for i := 0; i < 2; i++ { + msg, err := server.ReceiveMessage() + if err != nil { + return + } + server.SendResponse(msg.Request.ID, daemonrpc.PingResult{Pong: true}) + } + }() + + errs := make(chan error, 2) + for i := 0; i < 2; i++ { + go func() { + errs <- client.Ping() + }() + } + + for i := 0; i < 2; i++ { + if err := <-errs; err != nil { + t.Errorf("call %d failed: %v", i, err) + } + } +} diff --git a/daemonclient/procattr_unix.go b/daemonclient/procattr_unix.go new file mode 100644 index 0000000000000000000000000000000000000000..e38b456e04ce468b1c5b5eca139af52e0c1daf5b --- /dev/null +++ b/daemonclient/procattr_unix.go @@ -0,0 +1,12 @@ +//go:build !windows + +package daemonclient + +import "syscall" + +// DaemonProcAttr returns SysProcAttr for detaching the daemon process. +func DaemonProcAttr() *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + Setsid: true, + } +} diff --git a/daemonclient/procattr_windows.go b/daemonclient/procattr_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..678012fc1d1b47fa4b41cbfc2c8c7e4b70c82c5f --- /dev/null +++ b/daemonclient/procattr_windows.go @@ -0,0 +1,12 @@ +//go:build windows + +package daemonclient + +import "syscall" + +// DaemonProcAttr returns SysProcAttr for detaching the daemon process. +func DaemonProcAttr() *syscall.SysProcAttr { + return &syscall.SysProcAttr{ + CreationFlags: 0x00000008, // DETACHED_PROCESS + } +} diff --git a/daemonclient/service.go b/daemonclient/service.go new file mode 100644 index 0000000000000000000000000000000000000000..bcee087ab3fcc54a4a9764dbe2eaaf1624b94edc --- /dev/null +++ b/daemonclient/service.go @@ -0,0 +1,328 @@ +package daemonclient + +import ( + "context" + "log" + "os" + "os/exec" + "time" + + "github.com/floatpane/matcha/backend" + "github.com/floatpane/matcha/config" + "github.com/floatpane/matcha/daemonrpc" +) + +// Service abstracts daemon-backed vs direct email operations. +// TUI and CLI use this interface — they don't care which mode is active. +type Service interface { + FetchEmails(accountID, folder string, limit, offset uint32) ([]backend.Email, error) + FetchEmailBody(accountID, folder string, uid uint32) (string, []backend.Attachment, error) + DeleteEmails(accountID, folder string, uids []uint32) error + ArchiveEmails(accountID, folder string, uids []uint32) error + MoveEmails(accountID string, uids []uint32, src, dst string) error + MarkRead(accountID, folder string, uids []uint32) error + FetchFolders(accountID string) ([]backend.Folder, error) + RefreshFolder(accountID, folder string) error + Subscribe(accountID, folder string) error + Unsubscribe(accountID, folder string) error + Events() <-chan *daemonrpc.Event + IsDaemon() bool + Close() error +} + +// NewService connects to the daemon, auto-starting it if needed. +// Falls back to direct mode only if daemon cannot be started. +func NewService(cfg *config.Config) Service { + // Try connecting to existing daemon. + if svc := tryConnect(); svc != nil { + return svc + } + + // Daemon not running — auto-start it. + log.Println("service: daemon not running, auto-starting") + if err := autoStartDaemon(); err != nil { + log.Printf("service: auto-start failed: %v, using direct mode", err) + return newDirectService(cfg) + } + + // Wait briefly for daemon to become ready, then connect. + for i := 0; i < 20; i++ { + time.Sleep(100 * time.Millisecond) + if svc := tryConnect(); svc != nil { + log.Println("service: connected to auto-started daemon") + return svc + } + } + + log.Println("service: daemon started but not responding, using direct mode") + return newDirectService(cfg) +} + +func tryConnect() *daemonService { + client, err := Dial() + if err != nil { + return nil + } + if err := client.Ping(); err != nil { + client.Close() + return nil + } + return &daemonService{client: client} +} + +func autoStartDaemon() error { + exe, err := os.Executable() + if err != nil { + return err + } + + cmd := exec.Command(exe, "daemon", "run") + cmd.Stdout = nil + cmd.Stderr = nil + cmd.Stdin = nil + cmd.SysProcAttr = DaemonProcAttr() + + return cmd.Start() +} + +// daemonService routes all operations through the daemon socket. +type daemonService struct { + client *Client +} + +func (s *daemonService) FetchEmails(accountID, folder string, limit, offset uint32) ([]backend.Email, error) { + var emails []backend.Email + err := s.client.Call(daemonrpc.MethodFetchEmails, daemonrpc.FetchEmailsParams{ + AccountID: accountID, + Folder: folder, + Limit: limit, + Offset: offset, + }, &emails) + return emails, err +} + +func (s *daemonService) FetchEmailBody(accountID, folder string, uid uint32) (string, []backend.Attachment, error) { + var result daemonrpc.FetchEmailBodyResult + err := s.client.Call(daemonrpc.MethodFetchEmailBody, daemonrpc.FetchEmailBodyParams{ + AccountID: accountID, + Folder: folder, + UID: uid, + }, &result) + if err != nil { + return "", nil, err + } + + var attachments []backend.Attachment + for _, a := range result.Attachments { + attachments = append(attachments, backend.Attachment{ + Filename: a.Filename, + PartID: a.PartID, + Encoding: a.Encoding, + MIMEType: a.MIMEType, + }) + } + return result.Body, attachments, nil +} + +func (s *daemonService) DeleteEmails(accountID, folder string, uids []uint32) error { + return s.client.Call(daemonrpc.MethodDeleteEmails, daemonrpc.DeleteEmailsParams{ + AccountID: accountID, + Folder: folder, + UIDs: uids, + }, nil) +} + +func (s *daemonService) ArchiveEmails(accountID, folder string, uids []uint32) error { + return s.client.Call(daemonrpc.MethodArchiveEmails, daemonrpc.ArchiveEmailsParams{ + AccountID: accountID, + Folder: folder, + UIDs: uids, + }, nil) +} + +func (s *daemonService) MoveEmails(accountID string, uids []uint32, src, dst string) error { + return s.client.Call(daemonrpc.MethodMoveEmails, daemonrpc.MoveEmailsParams{ + AccountID: accountID, + UIDs: uids, + SourceFolder: src, + DestFolder: dst, + }, nil) +} + +func (s *daemonService) MarkRead(accountID, folder string, uids []uint32) error { + return s.client.Call(daemonrpc.MethodMarkRead, daemonrpc.MarkReadParams{ + AccountID: accountID, + Folder: folder, + UIDs: uids, + Read: true, + }, nil) +} + +func (s *daemonService) FetchFolders(accountID string) ([]backend.Folder, error) { + var folders []backend.Folder + err := s.client.Call(daemonrpc.MethodFetchFolders, daemonrpc.FetchFoldersParams{ + AccountID: accountID, + }, &folders) + return folders, err +} + +func (s *daemonService) RefreshFolder(accountID, folder string) error { + return s.client.Call(daemonrpc.MethodRefreshFolder, daemonrpc.RefreshFolderParams{ + AccountID: accountID, + Folder: folder, + }, nil) +} + +func (s *daemonService) Subscribe(accountID, folder string) error { + return s.client.Call(daemonrpc.MethodSubscribe, daemonrpc.SubscribeParams{ + AccountID: accountID, + Folder: folder, + }, nil) +} + +func (s *daemonService) Unsubscribe(accountID, folder string) error { + return s.client.Call(daemonrpc.MethodUnsubscribe, daemonrpc.UnsubscribeParams{ + AccountID: accountID, + Folder: folder, + }, nil) +} + +func (s *daemonService) Events() <-chan *daemonrpc.Event { + return s.client.Events() +} + +func (s *daemonService) IsDaemon() bool { return true } + +func (s *daemonService) Close() error { + return s.client.Close() +} + +// directService runs operations in-process (no daemon). +// This is the fallback when daemon is not running. +type directService struct { + cfg *config.Config + providers map[string]backend.Provider + events chan *daemonrpc.Event +} + +func newDirectService(cfg *config.Config) *directService { + s := &directService{ + cfg: cfg, + providers: make(map[string]backend.Provider), + events: make(chan *daemonrpc.Event, 64), + } + s.initProviders() + return s +} + +func (s *directService) initProviders() { + for i := range s.cfg.Accounts { + acct := &s.cfg.Accounts[i] + if _, ok := s.providers[acct.ID]; ok { + continue + } + p, err := backend.New(acct) + if err != nil { + log.Printf("direct service: provider for %s failed: %v", acct.Email, err) + continue + } + s.providers[acct.ID] = p + } +} + +func (s *directService) getProvider(accountID string) (backend.Provider, error) { + p, ok := s.providers[accountID] + if !ok { + return nil, &daemonrpc.Error{Code: daemonrpc.ErrCodeInternal, Message: "no provider for account " + accountID} + } + return p, nil +} + +func (s *directService) FetchEmails(accountID, folder string, limit, offset uint32) ([]backend.Email, error) { + p, err := s.getProvider(accountID) + if err != nil { + return nil, err + } + return p.FetchEmails(context.Background(), folder, limit, offset) +} + +func (s *directService) FetchEmailBody(accountID, folder string, uid uint32) (string, []backend.Attachment, error) { + p, err := s.getProvider(accountID) + if err != nil { + return "", nil, err + } + return p.FetchEmailBody(context.Background(), folder, uid) +} + +func (s *directService) DeleteEmails(accountID, folder string, uids []uint32) error { + p, err := s.getProvider(accountID) + if err != nil { + return err + } + return p.DeleteEmails(context.Background(), folder, uids) +} + +func (s *directService) ArchiveEmails(accountID, folder string, uids []uint32) error { + p, err := s.getProvider(accountID) + if err != nil { + return err + } + return p.ArchiveEmails(context.Background(), folder, uids) +} + +func (s *directService) MoveEmails(accountID string, uids []uint32, src, dst string) error { + p, err := s.getProvider(accountID) + if err != nil { + return err + } + return p.MoveEmails(context.Background(), uids, src, dst) +} + +func (s *directService) MarkRead(accountID, folder string, uids []uint32) error { + p, err := s.getProvider(accountID) + if err != nil { + return err + } + for _, uid := range uids { + if err := p.MarkAsRead(context.Background(), folder, uid); err != nil { + return err + } + } + return nil +} + +func (s *directService) FetchFolders(accountID string) ([]backend.Folder, error) { + p, err := s.getProvider(accountID) + if err != nil { + return nil, err + } + return p.FetchFolders(context.Background()) +} + +func (s *directService) RefreshFolder(_, _ string) error { + // In direct mode, caller handles refresh via their own fetcher calls. + return nil +} + +func (s *directService) Subscribe(_, _ string) error { + // No-op in direct mode — TUI manages its own IDLE. + return nil +} + +func (s *directService) Unsubscribe(_, _ string) error { + return nil +} + +func (s *directService) Events() <-chan *daemonrpc.Event { + return s.events +} + +func (s *directService) IsDaemon() bool { return false } + +func (s *directService) Close() error { + for _, p := range s.providers { + p.Close() + } + close(s.events) + return nil +} diff --git a/daemonrpc/protocol.go b/daemonrpc/protocol.go new file mode 100644 index 0000000000000000000000000000000000000000..6fc8ad3a4b1d6b378a19b0ce854d3d3a875b9de3 --- /dev/null +++ b/daemonrpc/protocol.go @@ -0,0 +1,269 @@ +package daemonrpc + +import "encoding/json" + +// Request from client to daemon. Has an ID for matching responses. +type Request struct { + ID uint64 `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` +} + +// Response from daemon to client. Matched to request by ID. +type Response struct { + ID uint64 `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *Error `json:"error,omitempty"` +} + +// Event pushed from daemon to subscribed clients. No ID field. +type Event struct { + Type string `json:"type"` + Data json.RawMessage `json:"data,omitempty"` +} + +// Error returned in a Response. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (e *Error) Error() string { return e.Message } + +// Message is a union type for wire decoding. Exactly one of the +// fields will be populated based on the presence of "id" and "type". +type Message struct { + Request *Request + Response *Response + Event *Event +} + +// Discriminate: if "type" present → Event, if "method" present → Request, else → Response. +func DecodeMessage(raw json.RawMessage) (Message, error) { + var probe struct { + Type string `json:"type"` + Method string `json:"method"` + ID *uint64 `json:"id"` + } + if err := json.Unmarshal(raw, &probe); err != nil { + return Message{}, err + } + + var m Message + if probe.Type != "" { + var ev Event + if err := json.Unmarshal(raw, &ev); err != nil { + return m, err + } + m.Event = &ev + } else if probe.Method != "" { + var req Request + if err := json.Unmarshal(raw, &req); err != nil { + return m, err + } + m.Request = &req + } else { + var resp Response + if err := json.Unmarshal(raw, &resp); err != nil { + return m, err + } + m.Response = &resp + } + return m, nil +} + +// Standard error codes. +const ( + ErrCodeParse = -32700 + ErrCodeInvalidReq = -32600 + ErrCodeNotFound = -32601 + ErrCodeInternal = -32603 +) + +// RPC method names. +const ( + MethodPing = "Ping" + MethodGetStatus = "GetStatus" + MethodGetAccounts = "GetAccounts" + MethodReloadConfig = "ReloadConfig" + MethodFetchEmails = "FetchEmails" + MethodFetchEmailBody = "FetchEmailBody" + MethodSendEmail = "SendEmail" + MethodDeleteEmails = "DeleteEmails" + MethodArchiveEmails = "ArchiveEmails" + MethodMoveEmails = "MoveEmails" + MethodMarkRead = "MarkRead" + MethodFetchFolders = "FetchFolders" + MethodRefreshFolder = "RefreshFolder" + MethodSubscribe = "Subscribe" + MethodUnsubscribe = "Unsubscribe" + MethodSendRSVP = "SendRSVP" + MethodGetCachedEmails = "GetCachedEmails" + MethodGetCachedBody = "GetCachedBody" + MethodExportContacts = "ExportContacts" +) + +// Event type names. +const ( + EventNewMail = "NewMail" + EventSyncStarted = "SyncStarted" + EventSyncComplete = "SyncComplete" + EventSyncError = "SyncError" + EventEmailsUpdated = "EmailsUpdated" + EventConfigReloaded = "ConfigReloaded" +) + +// Param/result types for RPC methods. + +type PingResult struct { + Pong bool `json:"pong"` +} + +type StatusResult struct { + Running bool `json:"running"` + Uptime int64 `json:"uptime_seconds"` + Accounts []string `json:"accounts"` + PID int `json:"pid"` +} + +type AccountInfo struct { + ID string `json:"id"` + Name string `json:"name"` + Email string `json:"email"` + Protocol string `json:"protocol"` +} + +type FetchEmailsParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + Limit uint32 `json:"limit"` + Offset uint32 `json:"offset"` +} + +type FetchEmailBodyParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + UID uint32 `json:"uid"` +} + +type FetchEmailBodyResult struct { + Body string `json:"body"` + Attachments []AttachmentInfo `json:"attachments"` +} + +type AttachmentInfo struct { + Filename string `json:"filename"` + PartID string `json:"part_id"` + Encoding string `json:"encoding"` + MIMEType string `json:"mime_type"` + IsCalendarInvite bool `json:"is_calendar_invite,omitempty"` + CalendarData []byte `json:"calendar_data,omitempty"` +} + +type SendEmailParams struct { + AccountID string `json:"account_id"` + To []string `json:"to"` + Cc []string `json:"cc,omitempty"` + Bcc []string `json:"bcc,omitempty"` + Subject string `json:"subject"` + Body string `json:"body"` + HTMLBody string `json:"html_body,omitempty"` + Attachments map[string][]byte `json:"attachments,omitempty"` + InReplyTo string `json:"in_reply_to,omitempty"` + References []string `json:"references,omitempty"` + SignSMIME bool `json:"sign_smime,omitempty"` + EncryptSMIME bool `json:"encrypt_smime,omitempty"` + SignPGP bool `json:"sign_pgp,omitempty"` + EncryptPGP bool `json:"encrypt_pgp,omitempty"` +} + +type DeleteEmailsParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + UIDs []uint32 `json:"uids"` +} + +type ArchiveEmailsParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + UIDs []uint32 `json:"uids"` +} + +type MoveEmailsParams struct { + AccountID string `json:"account_id"` + UIDs []uint32 `json:"uids"` + SourceFolder string `json:"source_folder"` + DestFolder string `json:"dest_folder"` +} + +type MarkReadParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + UIDs []uint32 `json:"uids"` + Read bool `json:"read"` +} + +type FetchFoldersParams struct { + AccountID string `json:"account_id"` +} + +type RefreshFolderParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` +} + +type SubscribeParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` +} + +type UnsubscribeParams struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` +} + +type SendRSVPParams struct { + AccountID string `json:"account_id"` + OriginalICS []byte `json:"original_ics"` + Response string `json:"response"` + InReplyTo string `json:"in_reply_to,omitempty"` + References []string `json:"references,omitempty"` +} + +type GetCachedEmailsParams struct { + Folder string `json:"folder"` +} + +type GetCachedBodyParams struct { + Folder string `json:"folder"` + UID uint32 `json:"uid"` + AccountID string `json:"account_id"` +} + +type ExportContactsParams struct { + Format string `json:"format"` // "json" or "csv" +} + +// Event data types. + +type NewMailEvent struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` +} + +type SyncStartedEvent struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` +} + +type SyncCompleteEvent struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + EmailCount int `json:"email_count"` +} + +type SyncErrorEvent struct { + AccountID string `json:"account_id"` + Folder string `json:"folder"` + Error string `json:"error"` +} diff --git a/daemonrpc/protocol_test.go b/daemonrpc/protocol_test.go new file mode 100644 index 0000000000000000000000000000000000000000..d78dbc6d96a8a0480ff1afa56d8cb996b0c07fe9 --- /dev/null +++ b/daemonrpc/protocol_test.go @@ -0,0 +1,138 @@ +package daemonrpc + +import ( + "encoding/json" + "testing" +) + +func TestDecodeMessage_Request(t *testing.T) { + raw := json.RawMessage(`{"id":1,"method":"Ping"}`) + msg, err := DecodeMessage(raw) + if err != nil { + t.Fatal(err) + } + if msg.Request == nil { + t.Fatal("expected Request, got nil") + } + if msg.Request.Method != "Ping" { + t.Errorf("method = %q, want Ping", msg.Request.Method) + } + if msg.Request.ID != 1 { + t.Errorf("id = %d, want 1", msg.Request.ID) + } + if msg.Response != nil || msg.Event != nil { + t.Error("expected only Request to be set") + } +} + +func TestDecodeMessage_Response(t *testing.T) { + raw := json.RawMessage(`{"id":1,"result":{"pong":true}}`) + msg, err := DecodeMessage(raw) + if err != nil { + t.Fatal(err) + } + if msg.Response == nil { + t.Fatal("expected Response, got nil") + } + if msg.Response.ID != 1 { + t.Errorf("id = %d, want 1", msg.Response.ID) + } + if msg.Response.Error != nil { + t.Error("expected no error") + } +} + +func TestDecodeMessage_ResponseError(t *testing.T) { + raw := json.RawMessage(`{"id":2,"error":{"code":-32601,"message":"not found"}}`) + msg, err := DecodeMessage(raw) + if err != nil { + t.Fatal(err) + } + if msg.Response == nil { + t.Fatal("expected Response") + } + if msg.Response.Error == nil { + t.Fatal("expected error in response") + } + if msg.Response.Error.Code != ErrCodeNotFound { + t.Errorf("code = %d, want %d", msg.Response.Error.Code, ErrCodeNotFound) + } + if msg.Response.Error.Message != "not found" { + t.Errorf("message = %q, want 'not found'", msg.Response.Error.Message) + } +} + +func TestDecodeMessage_Event(t *testing.T) { + raw := json.RawMessage(`{"type":"NewMail","data":{"account_id":"abc","folder":"INBOX"}}`) + msg, err := DecodeMessage(raw) + if err != nil { + t.Fatal(err) + } + if msg.Event == nil { + t.Fatal("expected Event, got nil") + } + if msg.Event.Type != EventNewMail { + t.Errorf("type = %q, want NewMail", msg.Event.Type) + } + + var ev NewMailEvent + if err := json.Unmarshal(msg.Event.Data, &ev); err != nil { + t.Fatal(err) + } + if ev.AccountID != "abc" { + t.Errorf("account_id = %q, want abc", ev.AccountID) + } +} + +func TestDecodeMessage_Invalid(t *testing.T) { + raw := json.RawMessage(`{invalid}`) + _, err := DecodeMessage(raw) + if err == nil { + t.Error("expected error for invalid JSON") + } +} + +func TestError_ErrorInterface(t *testing.T) { + e := &Error{Code: ErrCodeInternal, Message: "something broke"} + if e.Error() != "something broke" { + t.Errorf("Error() = %q, want 'something broke'", e.Error()) + } +} + +func TestRequestRoundTrip(t *testing.T) { + params, _ := json.Marshal(FetchEmailsParams{ + AccountID: "acc1", + Folder: "INBOX", + Limit: 50, + Offset: 0, + }) + req := Request{ + ID: 42, + Method: MethodFetchEmails, + Params: params, + } + + data, err := json.Marshal(req) + if err != nil { + t.Fatal(err) + } + + msg, err := DecodeMessage(data) + if err != nil { + t.Fatal(err) + } + if msg.Request == nil { + t.Fatal("expected Request") + } + if msg.Request.ID != 42 { + t.Errorf("id = %d, want 42", msg.Request.ID) + } + + var p FetchEmailsParams + if err := json.Unmarshal(msg.Request.Params, &p); err != nil { + t.Fatal(err) + } + if p.AccountID != "acc1" || p.Folder != "INBOX" || p.Limit != 50 { + t.Errorf("params mismatch: %+v", p) + } +} diff --git a/daemonrpc/socket.go b/daemonrpc/socket.go new file mode 100644 index 0000000000000000000000000000000000000000..b8e9286ef015a7202e2ea1c976020461f12733e4 --- /dev/null +++ b/daemonrpc/socket.go @@ -0,0 +1,44 @@ +package daemonrpc + +import ( + "fmt" + "os" + "path/filepath" + "runtime" +) + +// runtimeDir returns the base directory for daemon runtime files. +// Linux: $XDG_RUNTIME_DIR/matcha/ +// macOS: ~/Library/Caches/matcha/ +func runtimeDir() string { + switch runtime.GOOS { + case "darwin": + home, _ := os.UserHomeDir() + return filepath.Join(home, "Library", "Caches", "matcha") + default: // linux and others + if dir := os.Getenv("XDG_RUNTIME_DIR"); dir != "" { + return filepath.Join(dir, "matcha") + } + // Fallback: /tmp/matcha- + return filepath.Join(os.TempDir(), "matcha-"+uidStr()) + } +} + +func uidStr() string { + return fmt.Sprintf("%d", os.Getuid()) +} + +// SocketPath returns the path to the daemon's Unix domain socket. +func SocketPath() string { + return filepath.Join(runtimeDir(), "daemon.sock") +} + +// PIDPath returns the path to the daemon's PID file. +func PIDPath() string { + return filepath.Join(runtimeDir(), "daemon.pid") +} + +// EnsureRuntimeDir creates the runtime directory if it doesn't exist. +func EnsureRuntimeDir() error { + return os.MkdirAll(runtimeDir(), 0700) +} diff --git a/daemonrpc/transport.go b/daemonrpc/transport.go new file mode 100644 index 0000000000000000000000000000000000000000..2c85cbd487d1440cdab384ddd180ef58dc79bfcb --- /dev/null +++ b/daemonrpc/transport.go @@ -0,0 +1,90 @@ +package daemonrpc + +import ( + "encoding/json" + "fmt" + "net" + "sync" +) + +// Conn wraps a net.Conn with newline-delimited JSON encoding/decoding. +type Conn struct { + conn net.Conn + enc *json.Encoder + dec *json.Decoder + mu sync.Mutex // serializes writes +} + +// NewConn wraps an existing network connection. +func NewConn(c net.Conn) *Conn { + return &Conn{ + conn: c, + enc: json.NewEncoder(c), + dec: json.NewDecoder(c), + } +} + +// Send writes a JSON-encoded message followed by a newline. +// Thread-safe. +func (c *Conn) Send(v interface{}) error { + c.mu.Lock() + defer c.mu.Unlock() + return c.enc.Encode(v) +} + +// SendResponse sends a successful response with the given result. +func (c *Conn) SendResponse(id uint64, result interface{}) error { + raw, err := json.Marshal(result) + if err != nil { + return fmt.Errorf("marshal result: %w", err) + } + return c.Send(&Response{ + ID: id, + Result: raw, + }) +} + +// SendError sends an error response. +func (c *Conn) SendError(id uint64, code int, message string) error { + return c.Send(&Response{ + ID: id, + Error: &Error{Code: code, Message: message}, + }) +} + +// SendEvent sends a push event to the client. +func (c *Conn) SendEvent(eventType string, data interface{}) error { + raw, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("marshal event data: %w", err) + } + return c.Send(&Event{ + Type: eventType, + Data: raw, + }) +} + +// ReceiveMessage reads and decodes the next JSON message, returning +// a discriminated Message (Request, Response, or Event). +func (c *Conn) ReceiveMessage() (Message, error) { + var raw json.RawMessage + if err := c.dec.Decode(&raw); err != nil { + return Message{}, err + } + return DecodeMessage(raw) +} + +// Close closes the underlying connection. +func (c *Conn) Close() error { + return c.conn.Close() +} + +// RemoteAddr returns the remote address of the connection. +func (c *Conn) RemoteAddr() net.Addr { + return c.conn.RemoteAddr() +} + +// LocalAddr returns the local address of the connection. +func (c *Conn) LocalAddr() net.Addr { + return c.conn.LocalAddr() +} diff --git a/daemonrpc/transport_test.go b/daemonrpc/transport_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a1ff2658fc66b68c194c930436779919803256f6 --- /dev/null +++ b/daemonrpc/transport_test.go @@ -0,0 +1,170 @@ +package daemonrpc + +import ( + "encoding/json" + "net" + "testing" +) + +func testPipe() (*Conn, *Conn) { + a, b := net.Pipe() + return NewConn(a), NewConn(b) +} + +func TestConn_SendReceiveRequest(t *testing.T) { + client, server := testPipe() + defer client.Close() + defer server.Close() + + done := make(chan error, 1) + go func() { + params, _ := json.Marshal(PingResult{Pong: true}) + err := client.Send(&Request{ID: 1, Method: MethodPing, Params: params}) + done <- err + }() + + msg, err := server.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Request == nil { + t.Fatal("expected Request") + } + if msg.Request.Method != MethodPing { + t.Errorf("method = %q, want Ping", msg.Request.Method) + } + + if err := <-done; err != nil { + t.Fatal(err) + } +} + +func TestConn_SendResponse(t *testing.T) { + client, server := testPipe() + defer client.Close() + defer server.Close() + + done := make(chan error, 1) + go func() { + err := server.SendResponse(1, PingResult{Pong: true}) + done <- err + }() + + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Response == nil { + t.Fatal("expected Response") + } + if msg.Response.ID != 1 { + t.Errorf("id = %d, want 1", msg.Response.ID) + } + + var result PingResult + if err := json.Unmarshal(msg.Response.Result, &result); err != nil { + t.Fatal(err) + } + if !result.Pong { + t.Error("expected pong=true") + } + + if err := <-done; err != nil { + t.Fatal(err) + } +} + +func TestConn_SendError(t *testing.T) { + client, server := testPipe() + defer client.Close() + defer server.Close() + + done := make(chan error, 1) + go func() { + err := server.SendError(5, ErrCodeNotFound, "method not found") + done <- err + }() + + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Response == nil { + t.Fatal("expected Response") + } + if msg.Response.Error == nil { + t.Fatal("expected error") + } + if msg.Response.Error.Code != ErrCodeNotFound { + t.Errorf("code = %d, want %d", msg.Response.Error.Code, ErrCodeNotFound) + } + + if err := <-done; err != nil { + t.Fatal(err) + } +} + +func TestConn_SendEvent(t *testing.T) { + client, server := testPipe() + defer client.Close() + defer server.Close() + + done := make(chan error, 1) + go func() { + err := server.SendEvent(EventNewMail, NewMailEvent{ + AccountID: "acc1", + Folder: "INBOX", + }) + done <- err + }() + + msg, err := client.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg.Event == nil { + t.Fatal("expected Event") + } + if msg.Event.Type != EventNewMail { + t.Errorf("type = %q, want NewMail", msg.Event.Type) + } + + var ev NewMailEvent + if err := json.Unmarshal(msg.Event.Data, &ev); err != nil { + t.Fatal(err) + } + if ev.AccountID != "acc1" { + t.Errorf("account_id = %q, want acc1", ev.AccountID) + } + + if err := <-done; err != nil { + t.Fatal(err) + } +} + +func TestConn_MultipleMessages(t *testing.T) { + client, server := testPipe() + defer client.Close() + defer server.Close() + + go func() { + client.Send(&Request{ID: 1, Method: MethodPing}) + client.Send(&Request{ID: 2, Method: MethodGetStatus}) + }() + + msg1, err := server.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg1.Request.ID != 1 { + t.Errorf("first id = %d, want 1", msg1.Request.ID) + } + + msg2, err := server.ReceiveMessage() + if err != nil { + t.Fatal(err) + } + if msg2.Request.ID != 2 { + t.Errorf("second id = %d, want 2", msg2.Request.ID) + } +} diff --git a/docs/docs/Features/DAEMON.md b/docs/docs/Features/DAEMON.md new file mode 100644 index 0000000000000000000000000000000000000000..fe44960e2384d2d46bacf29fbe47d927c068f3cf --- /dev/null +++ b/docs/docs/Features/DAEMON.md @@ -0,0 +1,119 @@ +# Background Daemon + +Matcha includes an optional background daemon that keeps email connections alive, syncs mail, and sends desktop notifications — even when the TUI is closed. + +## Features + +- **Always-On IMAP IDLE**: Maintains persistent connections to detect new mail instantly. +- **Periodic Sync**: Fetches new emails every 5 minutes for all accounts. +- **Desktop Notifications**: Sends notifications when new mail arrives and the TUI is not running. +- **Instant TUI Startup**: When the TUI connects to a running daemon, email data is immediately available. +- **Automatic Fallback**: If the daemon is not running, the TUI works exactly as before (direct mode). + +## Commands + +```bash +matcha daemon start # Start the daemon in the background +matcha daemon stop # Stop the running daemon +matcha daemon status # Show daemon status (PID, uptime, accounts) +matcha daemon run # Run the daemon in the foreground (for systemd/launchd) +``` + +## How It Works + +The daemon runs as a separate background process. It communicates with the TUI over a Unix domain socket using a JSON-based protocol. + +``` +┌─────────────┐ Unix Socket ┌──────────────────┐ +│ TUI Client │◄──── JSON-RPC ─────►│ Daemon │ +│ (matcha) │ bidirectional │ (matcha daemon) │ +└─────────────┘ └──────────────────┘ +``` + +When you open the TUI: +1. It tries to connect to the daemon socket. +2. If the daemon is running, the TUI subscribes to folder updates and receives real-time push events. +3. If the daemon is not running, the TUI falls back to direct mode — identical to previous behavior. + +## Status + +```bash +$ matcha daemon status +Daemon running (PID 12345) +Uptime: 2h 15m +Accounts: 2 + - alice@gmail.com + - bob@outlook.com +``` + +## Running as a System Service + +### systemd (Linux) + +Create `~/.config/systemd/user/matcha-daemon.service`: + +```ini +[Unit] +Description=Matcha Email Daemon +After=network-online.target + +[Service] +ExecStart=/usr/bin/matcha daemon run +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target +``` + +```bash +systemctl --user enable matcha-daemon +systemctl --user start matcha-daemon +``` + +### launchd (macOS) + +Create `~/Library/LaunchAgents/com.matcha.daemon.plist`: + +```xml + + + + + Label + com.matcha.daemon + ProgramArguments + + /usr/local/bin/matcha + daemon + run + + RunAtLoad + + KeepAlive + + + +``` + +```bash +launchctl load ~/Library/LaunchAgents/com.matcha.daemon.plist +``` + +## File Paths + +| File | Platform | Purpose | +|------|----------|---------| +| `$XDG_RUNTIME_DIR/matcha/daemon.sock` | Linux | Unix domain socket | +| `$XDG_RUNTIME_DIR/matcha/daemon.pid` | Linux | PID file | +| `~/Library/Caches/matcha/daemon.sock` | macOS | Unix domain socket | +| `~/Library/Caches/matcha/daemon.pid` | macOS | PID file | + +## Architecture + +The daemon is split across three packages: + +- **`daemonrpc/`** — Shared protocol definitions (request/response types, event types, transport layer). Used by both daemon and client. +- **`daemon/`** — The daemon process itself: lifecycle management, RPC handlers, IDLE watchers, periodic sync, PID file management, signal handling. +- **`daemonclient/`** — Client library with a `Service` interface that abstracts daemon mode vs direct mode. The TUI uses this interface transparently. diff --git a/main.go b/main.go index b2ae5ae2dec51c20180234568c37cab2bff0914a..27062e239905645aae671301456259c88735037a 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,9 @@ import ( matchaCli "github.com/floatpane/matcha/cli" "github.com/floatpane/matcha/clib" "github.com/floatpane/matcha/config" + matchaDaemon "github.com/floatpane/matcha/daemon" + "github.com/floatpane/matcha/daemonclient" + "github.com/floatpane/matcha/daemonrpc" "github.com/floatpane/matcha/fetcher" "github.com/floatpane/matcha/notify" "github.com/floatpane/matcha/plugin" @@ -89,6 +92,8 @@ type mainModel struct { idleUpdates chan fetcher.IdleUpdate // Multi-protocol backend providers (keyed by account ID) providers map[string]backend.Provider + // Daemon client service (daemon or direct fallback) + service daemonclient.Service // Plugin prompt waiting for user input pendingPrompt *plugin.PendingPrompt } @@ -176,6 +181,9 @@ func (m *mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case tea.KeyPressMsg: if msg.String() == "ctrl+c" { m.idleWatcher.StopAll() + if m.service != nil { + m.service.Close() + } return m, tea.Quit } if msg.String() == "esc" { @@ -397,17 +405,32 @@ func (m *mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } m.current = m.folderInbox m.current, _ = m.current.Update(tea.WindowSizeMsg{Width: m.width, Height: m.height}) - // Start IDLE watchers for all accounts on INBOX - for i := range m.config.Accounts { - m.idleWatcher.Watch(&m.config.Accounts[i], "INBOX") + // Initialize daemon service if not already set. + if m.service == nil { + m.service = daemonclient.NewService(m.config) + } + if m.service.IsDaemon() { + // Subscribe to INBOX updates if using daemon. + for _, acct := range m.config.Accounts { + m.service.Subscribe(acct.ID, "INBOX") + } + } else { + // Start IDLE watchers for all accounts on INBOX + for i := range m.config.Accounts { + m.idleWatcher.Watch(&m.config.Accounts[i], "INBOX") + } } // Fetch folders and INBOX emails in parallel (background refresh) - return m, tea.Batch( + batchCmds := []tea.Cmd{ m.current.Init(), fetchFoldersCmd(m.config), fetchFolderEmailsCmd(m.config, "INBOX"), listenForIdleUpdates(m.idleUpdates), - ) + } + if m.service.IsDaemon() { + batchCmds = append(batchCmds, listenForDaemonEvents(m.service.Events())) + } + return m, tea.Batch(batchCmds...) case tui.FoldersFetchedMsg: if m.folderInbox == nil { @@ -437,10 +460,22 @@ func (m *mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // Only start IDLE for accounts that actually have this folder folders := config.GetCachedFolders(m.config.Accounts[i].ID) if !slices.Contains(folders, msg.FolderName) { - m.idleWatcher.Stop(m.config.Accounts[i].ID) + if m.service != nil && m.service.IsDaemon() { + m.service.Unsubscribe(m.config.Accounts[i].ID, msg.PreviousFolder) + } else { + m.idleWatcher.Stop(m.config.Accounts[i].ID) + } continue } - m.idleWatcher.Watch(&m.config.Accounts[i], msg.FolderName) + if m.service != nil && m.service.IsDaemon() { + // Unsubscribe from old, subscribe to new. + if msg.PreviousFolder != "" { + m.service.Unsubscribe(m.config.Accounts[i].ID, msg.PreviousFolder) + } + m.service.Subscribe(m.config.Accounts[i].ID, msg.FolderName) + } else { + m.idleWatcher.Watch(&m.config.Accounts[i], msg.FolderName) + } } if m.plugins != nil { m.plugins.CallFolderHook(plugin.HookFolderChanged, msg.FolderName) @@ -642,6 +677,43 @@ func (m *mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // Re-subscribe even if not viewing the affected folder return m, listenForIdleUpdates(m.idleUpdates) + case tui.DaemonEventMsg: + if msg.Event == nil { + return m, nil + } + var cmds []tea.Cmd + // Re-subscribe for next event. + if m.service != nil && m.service.IsDaemon() { + cmds = append(cmds, listenForDaemonEvents(m.service.Events())) + } + switch msg.Event.Type { + case daemonrpc.EventNewMail: + var ev daemonrpc.NewMailEvent + if err := json.Unmarshal(msg.Event.Data, &ev); err == nil { + if m.config == nil || !m.config.DisableNotifications { + accountName := ev.AccountID + if m.config != nil { + if acc := m.config.GetAccountByID(ev.AccountID); acc != nil { + accountName = acc.Email + } + } + go notify.Send("Matcha", fmt.Sprintf("New mail in %s (%s)", ev.Folder, accountName)) + } + + if m.folderInbox != nil && m.folderInbox.GetCurrentFolder() == ev.Folder { + cmds = append(cmds, fetchFolderEmailsCmd(m.config, ev.Folder)) + } + } + case daemonrpc.EventSyncComplete: + var ev daemonrpc.SyncCompleteEvent + if err := json.Unmarshal(msg.Event.Data, &ev); err == nil { + if m.folderInbox != nil && m.folderInbox.GetCurrentFolder() == ev.Folder { + cmds = append(cmds, fetchFolderEmailsCmd(m.config, ev.Folder)) + } + } + } + return m, tea.Batch(cmds...) + case tui.RequestRefreshMsg: // Folder-based refresh: clear folder cache and refetch if msg.FolderName != "" && m.config != nil { @@ -2210,6 +2282,19 @@ func listenForIdleUpdates(ch <-chan fetcher.IdleUpdate) tea.Cmd { } } +// --- Daemon event listener --- + +// listenForDaemonEvents blocks until a daemon event arrives, then returns it as a tea.Msg. +func listenForDaemonEvents(ch <-chan *daemonrpc.Event) tea.Cmd { + return func() tea.Msg { + ev, ok := <-ch + if !ok { + return nil + } + return tui.DaemonEventMsg{Event: ev} + } +} + // --- Folder-based command functions --- func fetchFoldersCmd(cfg *config.Config) tea.Cmd { @@ -3244,6 +3329,12 @@ func main() { os.Exit(0) } + // Daemon CLI subcommand: matcha daemon + if len(os.Args) > 1 && os.Args[1] == "daemon" { + runDaemonCLI(os.Args[2:]) + os.Exit(0) + } + // OAuth2 CLI subcommand: matcha oauth [flags] // "gmail" is kept as an alias for backwards compatibility. if len(os.Args) > 1 && (os.Args[1] == "oauth" || os.Args[1] == "gmail") { @@ -3339,3 +3430,135 @@ func main() { plugins.CallHook(plugin.HookShutdown) plugins.Close() } + +func runDaemonCLI(args []string) { + if len(args) == 0 { + fmt.Println("Usage: matcha daemon ") + fmt.Println() + fmt.Println("Commands:") + fmt.Println(" start Start the daemon in the background") + fmt.Println(" stop Stop the running daemon") + fmt.Println(" status Show daemon status") + fmt.Println(" run Run the daemon in the foreground") + os.Exit(1) + } + + switch args[0] { + case "start": + runDaemonStart() + case "stop": + runDaemonStop() + case "status": + runDaemonStatus() + case "run": + runDaemonRun() + default: + fmt.Fprintf(os.Stderr, "unknown daemon command: %s\n", args[0]) + os.Exit(1) + } +} + +func runDaemonStart() { + pidPath := daemonrpc.PIDPath() + if pid, running := matchaDaemon.IsRunning(pidPath); running { + fmt.Printf("Daemon already running (PID %d)\n", pid) + return + } + + // Fork ourselves with "daemon run". + exe, err := os.Executable() + if err != nil { + fmt.Fprintf(os.Stderr, "cannot find executable: %v\n", err) + os.Exit(1) + } + + cmd := exec.Command(exe, "daemon", "run") + cmd.Stdout = nil + cmd.Stderr = nil + cmd.Stdin = nil + + // Detach from parent process. + cmd.SysProcAttr = daemonclient.DaemonProcAttr() + + if err := cmd.Start(); err != nil { + fmt.Fprintf(os.Stderr, "failed to start daemon: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Daemon started (PID %d)\n", cmd.Process.Pid) +} + +func runDaemonStop() { + pidPath := daemonrpc.PIDPath() + pid, running := matchaDaemon.IsRunning(pidPath) + if !running { + fmt.Println("Daemon is not running") + return + } + + process, err := os.FindProcess(pid) + if err != nil { + fmt.Fprintf(os.Stderr, "cannot find process %d: %v\n", pid, err) + os.Exit(1) + } + + if err := process.Signal(os.Interrupt); err != nil { + fmt.Fprintf(os.Stderr, "failed to stop daemon: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Daemon stopped (PID %d)\n", pid) +} + +func runDaemonStatus() { + // Try connecting to daemon for live status. + client, err := daemonclient.Dial() + if err != nil { + pidPath := daemonrpc.PIDPath() + if pid, running := matchaDaemon.IsRunning(pidPath); running { + fmt.Printf("Daemon running (PID %d) but not responding\n", pid) + } else { + fmt.Println("Daemon is not running") + } + return + } + defer client.Close() + + status, err := client.Status() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to get status: %v\n", err) + os.Exit(1) + } + + fmt.Printf("Daemon running (PID %d)\n", status.PID) + fmt.Printf("Uptime: %s\n", formatUptime(status.Uptime)) + fmt.Printf("Accounts: %d\n", len(status.Accounts)) + for _, acct := range status.Accounts { + fmt.Printf(" - %s\n", acct) + } +} + +func runDaemonRun() { + cfg, err := config.LoadConfig() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to load config: %v\n", err) + os.Exit(1) + } + + d := matchaDaemon.New(cfg) + if err := d.Run(); err != nil { + fmt.Fprintf(os.Stderr, "daemon error: %v\n", err) + os.Exit(1) + } +} + +func formatUptime(seconds int64) string { + d := time.Duration(seconds) * time.Second + if d < time.Minute { + return fmt.Sprintf("%ds", int(d.Seconds())) + } + if d < time.Hour { + return fmt.Sprintf("%dm %ds", int(d.Minutes()), int(d.Seconds())%60) + } + return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60) +} diff --git a/tui/folder_inbox.go b/tui/folder_inbox.go index e12f8979930bde833ded59373167a9f10eee2107..750714ebb6eca71e1dcd12464a12632c5f126d11 100644 --- a/tui/folder_inbox.go +++ b/tui/folder_inbox.go @@ -347,6 +347,7 @@ func (m *FolderInbox) moveFolderChoices() []string { func (m *FolderInbox) switchFolder() tea.Cmd { if m.activeFolderIdx >= 0 && m.activeFolderIdx < len(m.folders) { + prevFolder := m.currentFolder m.currentFolder = m.folders[m.activeFolderIdx] m.isLoadingEmails = true m.inbox.SetFolderName(m.currentFolder) @@ -354,7 +355,7 @@ func (m *FolderInbox) switchFolder() tea.Cmd { m.inbox.SetEmails(nil, m.accounts) folder := m.currentFolder return func() tea.Msg { - return SwitchFolderMsg{FolderName: folder} + return SwitchFolderMsg{FolderName: folder, PreviousFolder: prevFolder} } } return nil diff --git a/tui/messages.go b/tui/messages.go index 08698deede2969859c93bf90944ba9eac3f2cee0..f2b705cc80624f2916e61dbbb8618dfe41097819 100644 --- a/tui/messages.go +++ b/tui/messages.go @@ -3,6 +3,7 @@ package tui import ( "github.com/floatpane/matcha/calendar" "github.com/floatpane/matcha/config" + "github.com/floatpane/matcha/daemonrpc" "github.com/floatpane/matcha/fetcher" ) @@ -380,8 +381,9 @@ type FoldersFetchedMsg struct { // SwitchFolderMsg signals switching to a different IMAP folder. type SwitchFolderMsg struct { - FolderName string - AccountID string + FolderName string + PreviousFolder string + AccountID string } // FolderEmailsFetchedMsg signals that emails from a folder have been fetched. @@ -463,6 +465,13 @@ type IdleNewMailMsg struct { FolderName string } +// --- Daemon Messages --- + +// DaemonEventMsg wraps an event pushed from the daemon process. +type DaemonEventMsg struct { + Event *daemonrpc.Event +} + // --- Plugin Messages --- // PluginNotifyMsg signals that a plugin wants to show a notification.