@@ -6,11 +6,11 @@ import (
"log"
"net"
"os"
- "runtime/debug"
"sort"
"sync"
"time"
+ udsrpc "github.com/floatpane/go-uds-jsonrpc"
"github.com/floatpane/matcha/backend"
"github.com/floatpane/matcha/config"
"github.com/floatpane/matcha/daemonrpc"
@@ -25,12 +25,10 @@ const inboxFolder = "INBOX"
type Daemon struct {
config *config.Config
providers map[string]backend.Provider
- listener net.Listener
+ server *udsrpc.Server
startTime time.Time
- // Connected TUI/CLI clients.
- clients map[*daemonrpc.Conn]struct{}
- mu sync.RWMutex
+ mu sync.RWMutex
// Per-client subscriptions: conn → set of "accountID:folder".
subscriptions map[*daemonrpc.Conn]map[string]struct{}
@@ -53,16 +51,47 @@ type Daemon struct {
// New creates a daemon with the given config.
func New(cfg *config.Config) *Daemon {
idleUpdates := make(chan fetcher.IdleUpdate, 16)
- return &Daemon{
+ d := &Daemon{
config: cfg,
providers: make(map[string]backend.Provider),
- clients: make(map[*daemonrpc.Conn]struct{}),
subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
idleWatcher: fetcher.NewIdleWatcher(idleUpdates),
idleUpdates: idleUpdates,
shutdown: make(chan struct{}),
done: make(chan struct{}),
}
+
+ d.server = udsrpc.NewServer()
+ d.registerHandlers()
+ d.server.OnConnect(func(_ *daemonrpc.Conn) {
+ log.Println("daemon: client connected")
+ })
+ d.server.OnDisconnect(func(conn *daemonrpc.Conn) {
+ d.subMu.Lock()
+ delete(d.subscriptions, conn)
+ d.subMu.Unlock()
+ log.Println("daemon: client disconnected")
+ })
+
+ return d
+}
+
+// registerHandlers wires each RPC method to its handler on the server.
+func (d *Daemon) registerHandlers() {
+ d.server.Handle(daemonrpc.MethodPing, d.handlePing)
+ d.server.Handle(daemonrpc.MethodGetStatus, d.handleGetStatus)
+ d.server.Handle(daemonrpc.MethodGetAccounts, d.handleGetAccounts)
+ d.server.Handle(daemonrpc.MethodReloadConfig, d.handleReloadConfig)
+ d.server.Handle(daemonrpc.MethodFetchEmails, d.handleFetchEmails)
+ d.server.Handle(daemonrpc.MethodFetchEmailBody, d.handleFetchEmailBody)
+ d.server.Handle(daemonrpc.MethodDeleteEmails, d.handleDeleteEmails)
+ d.server.Handle(daemonrpc.MethodArchiveEmails, d.handleArchiveEmails)
+ d.server.Handle(daemonrpc.MethodMoveEmails, d.handleMoveEmails)
+ d.server.Handle(daemonrpc.MethodMarkRead, d.handleMarkRead)
+ d.server.Handle(daemonrpc.MethodFetchFolders, d.handleFetchFolders)
+ d.server.Handle(daemonrpc.MethodRefreshFolder, d.handleRefreshFolder)
+ d.server.Handle(daemonrpc.MethodSubscribe, d.handleSubscribe)
+ d.server.Handle(daemonrpc.MethodUnsubscribe, d.handleUnsubscribe)
}
// Run starts the daemon: creates providers, starts the socket listener,
@@ -94,12 +123,11 @@ func (d *Daemon) Run() error {
}
// Listen on Unix domain socket.
- var err error
- d.listener, err = net.Listen("unix", sockPath) //nolint:noctx
+ listener, err := net.Listen("unix", sockPath) //nolint:noctx
if err != nil {
return fmt.Errorf("listen: %w", err)
}
- defer d.listener.Close() //nolint:errcheck
+ defer listener.Close() //nolint:errcheck
// Set socket permissions (owner only).
if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302
@@ -115,28 +143,42 @@ func (d *Daemon) Run() error {
d.startIdleWatchers()
go d.idleEventLoop()
- // Start signal handler.
- go d.handleSignals()
+ // Handle OS signals: SIGTERM/SIGINT → shutdown, SIGHUP → reload config.
+ stopSignals := udsrpc.HandleSignals(d.Shutdown, func() {
+ log.Println("daemon: received SIGHUP, reloading config")
+ if err := d.ReloadConfig(); err != nil {
+ log.Printf("daemon: config reload failed: %v", err)
+ }
+ })
+ defer stopSignals()
// Start background sync.
ctx, cancel := context.WithCancel(context.Background())
d.syncCancel = cancel
go d.backgroundSync(ctx)
- // Accept client connections.
- go d.acceptLoop()
+ // Serve client connections via the shared RPC server. Canceling serveCtx
+ // closes the listener and unblocks Serve.
+ serveCtx, serveCancel := context.WithCancel(context.Background())
+ go func() {
+ if err := d.server.Serve(serveCtx, listener); err != nil {
+ log.Printf("daemon: serve error: %v", err)
+ }
+ }()
// Block until shutdown.
<-d.shutdown
// Cleanup.
log.Println("daemon: shutting down")
- d.listener.Close() //nolint:errcheck,gosec
+ serveCancel()
+ for _, conn := range d.server.Clients() {
+ conn.Close() //nolint:errcheck,gosec
+ }
if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil {
log.Printf("daemon: %v", err)
}
cancel()
- d.closeAllClients()
d.closeProviders()
close(d.done)
@@ -192,79 +234,6 @@ func (d *Daemon) initProviders() {
}
}
-func (d *Daemon) acceptLoop() {
- for {
- done := func() bool {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack())
- }
- }()
- conn, err := d.listener.Accept()
- if err != nil {
- select {
- case <-d.shutdown:
- return true
- default:
- log.Printf("daemon: accept error: %v", err)
- return false
- }
- }
- rpcConn := daemonrpc.NewConn(conn)
- d.addClient(rpcConn)
- go d.handleClient(rpcConn)
- return false
- }()
- if done {
- return
- }
- }
-}
-
-func (d *Daemon) handleClient(conn *daemonrpc.Conn) {
- defer d.removeClient(conn)
- defer conn.Close() //nolint:errcheck
-
- for {
- msg, err := conn.ReceiveMessage()
- if err != nil {
- // Client disconnected or read error.
- return
- }
- if msg.Request != nil {
- d.handleRequest(conn, msg.Request)
- }
- }
-}
-
-func (d *Daemon) addClient(conn *daemonrpc.Conn) {
- d.mu.Lock()
- defer d.mu.Unlock()
- d.clients[conn] = struct{}{}
- log.Println("daemon: client connected")
-}
-
-func (d *Daemon) removeClient(conn *daemonrpc.Conn) {
- d.mu.Lock()
- delete(d.clients, conn)
- d.mu.Unlock()
-
- d.subMu.Lock()
- delete(d.subscriptions, conn)
- d.subMu.Unlock()
-
- log.Println("daemon: client disconnected")
-}
-
-func (d *Daemon) closeAllClients() {
- d.mu.Lock()
- defer d.mu.Unlock()
- for conn := range d.clients {
- conn.Close() //nolint:errcheck,gosec
- }
- d.clients = make(map[*daemonrpc.Conn]struct{})
-}
-
func (d *Daemon) closeProviders() {
d.mu.Lock()
defer d.mu.Unlock()
@@ -277,28 +246,18 @@ func (d *Daemon) closeProviders() {
// broadcastEvent sends an event to all connected clients.
func (d *Daemon) broadcastEvent(eventType string, data interface{}) {
- d.mu.RLock()
- defer d.mu.RUnlock()
- for conn := range d.clients {
- if err := conn.SendEvent(eventType, data); err != nil {
- log.Printf("daemon: broadcast error: %v", err)
- }
- }
+ d.server.Broadcast(eventType, data)
}
// broadcastToSubscribers sends an event only to clients subscribed to the given account+folder.
func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) {
key := accountID + ":" + folder
- d.subMu.RLock()
- defer d.subMu.RUnlock()
-
- for conn, subs := range d.subscriptions {
- if _, ok := subs[key]; ok {
- if err := conn.SendEvent(eventType, data); err != nil {
- log.Printf("daemon: subscriber broadcast error: %v", err)
- }
- }
- }
+ d.server.BroadcastFunc(eventType, data, func(conn *daemonrpc.Conn) bool {
+ d.subMu.RLock()
+ defer d.subMu.RUnlock()
+ _, ok := d.subscriptions[conn][key]
+ return ok
+ })
}
// getProvider returns the provider for the given account ID.
@@ -410,9 +369,7 @@ func (d *Daemon) syncAllAccounts(ctx context.Context) {
}
// Send desktop notification if TUI not connected.
- d.mu.RLock()
- noClients := len(d.clients) == 0
- d.mu.RUnlock()
+ noClients := len(d.server.Clients()) == 0
if noClients && newCount > 0 {
if !d.config.DisableNotifications {
@@ -455,9 +412,7 @@ func (d *Daemon) idleEventLoop() {
log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName)
// Desktop notification when no clients connected.
- d.mu.RLock()
- noClients := len(d.clients) == 0
- d.mu.RUnlock()
+ noClients := len(d.server.Clients()) == 0
if noClients && !d.config.DisableNotifications {
accountName := update.AccountID
@@ -1,6 +1,7 @@
package daemon
import (
+ "context"
"encoding/json"
"net"
"os"
@@ -68,23 +69,38 @@ func TestPIDFile_DeadProcess(t *testing.T) {
}
}
-// handlerTest sets up a client/server pipe and runs a single RPC exchange.
-// The handler runs in a goroutine so the pipe doesn't deadlock.
-func handlerTest(t *testing.T, d *Daemon, req *daemonrpc.Request) daemonrpc.Message {
+// serveDaemon starts d's RPC server on a temporary unix socket and returns a
+// connected client. The server and connection are torn down via t.Cleanup.
+func serveDaemon(t *testing.T, d *Daemon) *daemonrpc.Conn {
t.Helper()
- clientConn, serverConn := net.Pipe()
- defer clientConn.Close()
- defer serverConn.Close()
-
- server := daemonrpc.NewConn(serverConn)
- client := daemonrpc.NewConn(clientConn)
+ sock := filepath.Join(t.TempDir(), "d.sock")
+ l, err := net.Listen("unix", sock)
+ if err != nil {
+ t.Fatalf("listen: %v", err)
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() { _ = d.server.Serve(ctx, l) }()
+ t.Cleanup(func() {
+ cancel()
+ _ = l.Close()
+ })
- // Handle request in goroutine (SendResponse blocks until client reads).
- go func() {
- d.handleRequest(server, req)
- }()
+ c, err := net.Dial("unix", sock)
+ if err != nil {
+ t.Fatalf("dial: %v", err)
+ }
+ conn := daemonrpc.NewConn(c)
+ t.Cleanup(func() { _ = conn.Close() })
+ return conn
+}
- msg, err := client.ReceiveMessage()
+// roundTrip sends a request and returns the decoded response message.
+func roundTrip(t *testing.T, conn *daemonrpc.Conn, req *daemonrpc.Request) daemonrpc.Message {
+ t.Helper()
+ if err := conn.Send(req); err != nil {
+ t.Fatal(err)
+ }
+ msg, err := conn.ReceiveMessage()
if err != nil {
t.Fatal(err)
}
@@ -92,34 +108,25 @@ func handlerTest(t *testing.T, d *Daemon, req *daemonrpc.Request) daemonrpc.Mess
}
func TestDaemon_PingHandler(t *testing.T) {
- d := &Daemon{shutdown: make(chan struct{})}
- msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodPing})
-
- if msg.Response == nil {
- t.Fatal("expected Response")
- }
- var result daemonrpc.PingResult
- if err := json.Unmarshal(msg.Response.Result, &result); err != nil {
- t.Fatalf("failed to unmarshal ping result: %v", err)
+ d := New(&config.Config{})
+ res, err := d.handlePing(context.Background(), nil, nil)
+ if err != nil {
+ t.Fatalf("handlePing: %v", err)
}
- if !result.Pong {
+ if !res.(daemonrpc.PingResult).Pong {
t.Error("expected pong=true")
}
}
func TestDaemon_StatusHandler(t *testing.T) {
- d := &Daemon{
- startTime: time.Now().Add(-2 * time.Minute),
- shutdown: make(chan struct{}),
- config: &config.Config{},
- }
+ d := New(&config.Config{})
+ d.startTime = time.Now().Add(-2 * time.Minute)
- msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodGetStatus})
-
- var result daemonrpc.StatusResult
- if err := json.Unmarshal(msg.Response.Result, &result); err != nil {
- t.Fatalf("failed to unmarshal status result: %v", err)
+ res, err := d.handleGetStatus(context.Background(), nil, nil)
+ if err != nil {
+ t.Fatalf("handleGetStatus: %v", err)
}
+ result := res.(daemonrpc.StatusResult)
if !result.Running {
t.Error("expected running=true")
@@ -130,10 +137,11 @@ func TestDaemon_StatusHandler(t *testing.T) {
}
func TestDaemon_UnknownMethod(t *testing.T) {
- d := &Daemon{shutdown: make(chan struct{})}
- msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: "DoesNotExist"})
+ d := New(&config.Config{})
+ conn := serveDaemon(t, d)
- if msg.Response.Error == nil {
+ msg := roundTrip(t, conn, &daemonrpc.Request{ID: 1, Method: "DoesNotExist"})
+ if msg.Response == nil || msg.Response.Error == nil {
t.Fatal("expected error for unknown method")
}
if msg.Response.Error.Code != daemonrpc.ErrCodeNotFound {
@@ -142,78 +150,52 @@ func TestDaemon_UnknownMethod(t *testing.T) {
}
func TestDaemon_Subscribe(t *testing.T) {
- d := &Daemon{
- subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}),
- shutdown: make(chan struct{}),
- }
-
- clientConn, serverConn := net.Pipe()
- defer clientConn.Close()
- defer serverConn.Close()
-
- server := daemonrpc.NewConn(serverConn)
- client := daemonrpc.NewConn(clientConn)
+ d := New(&config.Config{})
+ conn := serveDaemon(t, d)
params, _ := json.Marshal(daemonrpc.SubscribeParams{
AccountID: "acc1",
Folder: "INBOX",
})
- go func() {
- d.handleRequest(server, &daemonrpc.Request{
- ID: 1,
- Method: daemonrpc.MethodSubscribe,
- Params: params,
- })
- }()
-
- // Read response.
- msg, err := client.ReceiveMessage()
- if err != nil {
- t.Fatal(err)
- }
+ msg := roundTrip(t, conn, &daemonrpc.Request{
+ ID: 1,
+ Method: daemonrpc.MethodSubscribe,
+ Params: params,
+ })
if msg.Response.Error != nil {
t.Errorf("unexpected error: %v", msg.Response.Error)
}
- // Verify subscription was recorded.
+ // The response is sent after the handler records the subscription, so it
+ // is visible by the time we read the reply.
d.subMu.RLock()
- subs, ok := d.subscriptions[server]
- d.subMu.RUnlock()
-
- if !ok {
- t.Fatal("expected subscription entry for connection")
- }
- if _, ok := subs["acc1:INBOX"]; !ok {
+ defer d.subMu.RUnlock()
+ found := false
+ for _, subs := range d.subscriptions {
+ if _, ok := subs["acc1:INBOX"]; ok {
+ found = true
+ }
+ }
+ if !found {
t.Error("expected subscription for acc1:INBOX")
}
}
func TestDaemon_BroadcastEvent(t *testing.T) {
- d := &Daemon{
- clients: make(map[*daemonrpc.Conn]struct{}),
- shutdown: make(chan struct{}),
- }
-
- clientConn, serverConn := net.Pipe()
- defer clientConn.Close()
- defer serverConn.Close()
+ d := New(&config.Config{})
+ conn := serveDaemon(t, d)
- server := daemonrpc.NewConn(serverConn)
- client := daemonrpc.NewConn(clientConn)
+ // Ping round-trip ensures the client is registered with the server before
+ // we broadcast.
+ roundTrip(t, conn, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodPing})
- d.mu.Lock()
- d.clients[server] = struct{}{}
- d.mu.Unlock()
-
- go func() {
- d.broadcastEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
- AccountID: "acc1",
- Folder: "INBOX",
- })
- }()
+ d.broadcastEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{
+ AccountID: "acc1",
+ Folder: "INBOX",
+ })
- msg, err := client.ReceiveMessage()
+ msg, err := conn.ReceiveMessage()
if err != nil {
t.Fatal(err)
}
@@ -20,56 +20,29 @@ const (
mutateTimeout = 30 * time.Second
)
-func (d *Daemon) handleRequest(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- switch req.Method {
- case daemonrpc.MethodPing:
- d.handlePing(conn, req)
- case daemonrpc.MethodGetStatus:
- d.handleGetStatus(conn, req)
- case daemonrpc.MethodGetAccounts:
- d.handleGetAccounts(conn, req)
- case daemonrpc.MethodReloadConfig:
- d.handleReloadConfig(conn, req)
- case daemonrpc.MethodFetchEmails:
- d.handleFetchEmails(conn, req)
- case daemonrpc.MethodFetchEmailBody:
- d.handleFetchEmailBody(conn, req)
- case daemonrpc.MethodDeleteEmails:
- d.handleDeleteEmails(conn, req)
- case daemonrpc.MethodArchiveEmails:
- d.handleArchiveEmails(conn, req)
- case daemonrpc.MethodMoveEmails:
- d.handleMoveEmails(conn, req)
- case daemonrpc.MethodMarkRead:
- d.handleMarkRead(conn, req)
- case daemonrpc.MethodFetchFolders:
- d.handleFetchFolders(conn, req)
- case daemonrpc.MethodRefreshFolder:
- d.handleRefreshFolder(conn, req)
- case daemonrpc.MethodSubscribe:
- d.handleSubscribe(conn, req)
- case daemonrpc.MethodUnsubscribe:
- d.handleUnsubscribe(conn, req)
- default:
- conn.SendError(req.ID, daemonrpc.ErrCodeNotFound, fmt.Sprintf("unknown method: %s", req.Method)) //nolint:errcheck,gosec
+// decodeParams unmarshals raw JSON params into T. A nil/empty payload yields
+// the zero value.
+func decodeParams[T any](params json.RawMessage) (T, error) {
+ var p T
+ if params != nil {
+ if err := json.Unmarshal(params, &p); err != nil {
+ return p, err
+ }
}
+ return p, nil
}
-func decodeParams[T any](req *daemonrpc.Request) (T, error) {
- var params T
- if req.Params != nil {
- if err := json.Unmarshal(req.Params, ¶ms); err != nil {
- return params, err
- }
- }
- return params, nil
+// parseError wraps a params-decoding failure with the parse error code so the
+// server forwards it verbatim instead of mapping to ErrCodeInternal.
+func parseError(err error) error {
+ return &daemonrpc.Error{Code: daemonrpc.ErrCodeParse, Message: err.Error()}
}
-func (d *Daemon) handlePing(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- conn.SendResponse(req.ID, daemonrpc.PingResult{Pong: true}) //nolint:errcheck,gosec
+func (d *Daemon) handlePing(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
+ return daemonrpc.PingResult{Pong: true}, nil
}
-func (d *Daemon) handleGetStatus(conn *daemonrpc.Conn, req *daemonrpc.Request) {
+func (d *Daemon) handleGetStatus(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
d.mu.RLock()
accounts := make([]string, 0, len(d.config.Accounts))
for _, acct := range d.config.Accounts {
@@ -77,15 +50,15 @@ func (d *Daemon) handleGetStatus(conn *daemonrpc.Conn, req *daemonrpc.Request) {
}
d.mu.RUnlock()
- conn.SendResponse(req.ID, daemonrpc.StatusResult{ //nolint:errcheck,gosec
+ return daemonrpc.StatusResult{
Running: true,
Uptime: int64(time.Since(d.startTime).Seconds()),
Accounts: accounts,
PID: os.Getpid(),
- })
+ }, nil
}
-func (d *Daemon) handleGetAccounts(conn *daemonrpc.Conn, req *daemonrpc.Request) {
+func (d *Daemon) handleGetAccounts(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
d.mu.RLock()
defer d.mu.RUnlock()
@@ -102,62 +75,54 @@ func (d *Daemon) handleGetAccounts(conn *daemonrpc.Conn, req *daemonrpc.Request)
Protocol: protocol,
})
}
- conn.SendResponse(req.ID, infos) //nolint:errcheck,gosec
+ return infos, nil
}
-func (d *Daemon) handleReloadConfig(conn *daemonrpc.Conn, req *daemonrpc.Request) {
+func (d *Daemon) handleReloadConfig(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) {
if err := d.ReloadConfig(); err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleFetchEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.FetchEmailsParams](req)
+func (d *Daemon) handleFetchEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.FetchEmailsParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout)
+ ctx, cancel := context.WithTimeout(ctx, fetchTimeout)
defer cancel()
- emails, err := p.FetchEmails(ctx, params.Folder, params.Limit, params.Offset)
+ emails, err := p.FetchEmails(ctx, args.Folder, args.Limit, args.Offset)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
-
- conn.SendResponse(req.ID, emails) //nolint:errcheck,gosec
+ return emails, nil
}
-func (d *Daemon) handleFetchEmailBody(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.FetchEmailBodyParams](req)
+func (d *Daemon) handleFetchEmailBody(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.FetchEmailBodyParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout)
+ ctx, cancel := context.WithTimeout(ctx, fetchTimeout)
defer cancel()
- body, mimeType, attachments, err := p.FetchEmailBody(ctx, params.Folder, params.UID)
+ body, mimeType, attachments, err := p.FetchEmailBody(ctx, args.Folder, args.UID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
// Convert backend.Attachment to daemonrpc.AttachmentInfo for wire transfer.
@@ -171,195 +136,180 @@ func (d *Daemon) handleFetchEmailBody(conn *daemonrpc.Conn, req *daemonrpc.Reque
})
}
- conn.SendResponse(req.ID, daemonrpc.FetchEmailBodyResult{ //nolint:errcheck,gosec
+ return daemonrpc.FetchEmailBodyResult{
Body: body,
BodyMIMEType: mimeType,
Attachments: attInfos,
- })
+ }, nil
}
-func (d *Daemon) handleDeleteEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.DeleteEmailsParams](req)
+func (d *Daemon) handleDeleteEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.DeleteEmailsParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout)
+ ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
defer cancel()
- if err := p.DeleteEmails(ctx, params.Folder, params.UIDs); err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ if err := p.DeleteEmails(ctx, args.Folder, args.UIDs); err != nil {
+ return nil, err
}
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleArchiveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.ArchiveEmailsParams](req)
+func (d *Daemon) handleArchiveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.ArchiveEmailsParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout)
+ ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
defer cancel()
- if err := p.ArchiveEmails(ctx, params.Folder, params.UIDs); err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ if err := p.ArchiveEmails(ctx, args.Folder, args.UIDs); err != nil {
+ return nil, err
}
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleMoveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.MoveEmailsParams](req)
+func (d *Daemon) handleMoveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.MoveEmailsParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout)
+ ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
defer cancel()
- if err := p.MoveEmails(ctx, params.UIDs, params.SourceFolder, params.DestFolder); err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ if err := p.MoveEmails(ctx, args.UIDs, args.SourceFolder, args.DestFolder); err != nil {
+ return nil, err
}
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleMarkRead(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.MarkReadParams](req)
+func (d *Daemon) handleMarkRead(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.MarkReadParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout)
+ ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
defer cancel()
- for _, uid := range params.UIDs {
+ for _, uid := range args.UIDs {
var err error
- if params.Read {
- err = p.MarkAsRead(ctx, params.Folder, uid)
+ if args.Read {
+ err = p.MarkAsRead(ctx, args.Folder, uid)
} else {
- err = p.MarkAsUnread(ctx, params.Folder, uid)
+ err = p.MarkAsUnread(ctx, args.Folder, uid)
}
if err != nil {
- log.Printf("daemon: mark read=%v %d failed: %v", params.Read, uid, err)
+ log.Printf("daemon: mark read=%v %d failed: %v", args.Read, uid, err)
}
}
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleFetchFolders(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.FetchFoldersParams](req)
+func (d *Daemon) handleFetchFolders(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.FetchFoldersParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout)
+ ctx, cancel := context.WithTimeout(ctx, mutateTimeout)
defer cancel()
folders, err := p.FetchFolders(ctx)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, err
}
- conn.SendResponse(req.ID, folders) //nolint:errcheck,gosec
+ return folders, nil
}
-func (d *Daemon) handleRefreshFolder(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.RefreshFolderParams](req)
+func (d *Daemon) handleRefreshFolder(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.RefreshFolderParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- // Async: fetch in background, push events when done.
+ // Async: fetch in background, push events when done. The server-scoped ctx
+ // outlives the request and is canceled on daemon shutdown.
go func() {
defer func() {
if r := recover(); r != nil {
- log.Printf("daemon: refresh panic for account = %s folder = %s: %v", params.AccountID, params.Folder, r)
- d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
- AccountID: params.AccountID,
- Folder: params.Folder,
+ log.Printf("daemon: refresh panic for account = %s folder = %s: %v", args.AccountID, args.Folder, r)
+ d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
+ AccountID: args.AccountID,
+ Folder: args.Folder,
Error: fmt.Sprintf("panic: %v", r),
})
}
}()
- p, err := d.getProvider(params.AccountID)
+ p, err := d.getProvider(args.AccountID)
if err != nil {
log.Printf("daemon: refresh provider error: %v", err)
return
}
- d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent(params))
+ d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent(args))
- ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout)
+ fetchCtx, cancel := context.WithTimeout(ctx, fetchTimeout)
defer cancel()
- emails, err := p.FetchEmails(ctx, params.Folder, 50, 0)
+ emails, err := p.FetchEmails(fetchCtx, args.Folder, 50, 0)
if err != nil {
- d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
- AccountID: params.AccountID,
- Folder: params.Folder,
+ d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{
+ AccountID: args.AccountID,
+ Folder: args.Folder,
Error: err.Error(),
})
return
}
- d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
- AccountID: params.AccountID,
- Folder: params.Folder,
+ d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{
+ AccountID: args.AccountID,
+ Folder: args.Folder,
EmailCount: len(emails),
})
}()
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleSubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.SubscribeParams](req)
+func (d *Daemon) handleSubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.SubscribeParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- key := params.AccountID + ":" + params.Folder
+ key := args.AccountID + ":" + args.Folder
d.subMu.Lock()
if d.subscriptions[conn] == nil {
@@ -369,17 +319,16 @@ func (d *Daemon) handleSubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) {
d.subMu.Unlock()
log.Printf("daemon: client subscribed to %s", key)
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}
-func (d *Daemon) handleUnsubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) {
- params, err := decodeParams[daemonrpc.UnsubscribeParams](req)
+func (d *Daemon) handleUnsubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) {
+ args, err := decodeParams[daemonrpc.UnsubscribeParams](params)
if err != nil {
- conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec
- return
+ return nil, parseError(err)
}
- key := params.AccountID + ":" + params.Folder
+ key := args.AccountID + ":" + args.Folder
d.subMu.Lock()
if subs, ok := d.subscriptions[conn]; ok {
@@ -387,5 +336,5 @@ func (d *Daemon) handleUnsubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request)
}
d.subMu.Unlock()
- conn.SendResponse(req.ID, true) //nolint:errcheck,gosec
+ return true, nil
}