diff --git a/daemon/daemon.go b/daemon/daemon.go index bd0b20c65ab92c3a9fff1eeb77c8272a31cd502a..e478638eb90d367fb937c036bdc9119dd98d6975 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -6,11 +6,11 @@ import ( "log" "net" "os" - "runtime/debug" "sort" "sync" "time" + udsrpc "github.com/floatpane/go-uds-jsonrpc" "github.com/floatpane/matcha/backend" "github.com/floatpane/matcha/config" "github.com/floatpane/matcha/daemonrpc" @@ -25,12 +25,10 @@ const inboxFolder = "INBOX" type Daemon struct { config *config.Config providers map[string]backend.Provider - listener net.Listener + server *udsrpc.Server startTime time.Time - // Connected TUI/CLI clients. - clients map[*daemonrpc.Conn]struct{} - mu sync.RWMutex + mu sync.RWMutex // Per-client subscriptions: conn → set of "accountID:folder". subscriptions map[*daemonrpc.Conn]map[string]struct{} @@ -53,16 +51,47 @@ type Daemon struct { // New creates a daemon with the given config. func New(cfg *config.Config) *Daemon { idleUpdates := make(chan fetcher.IdleUpdate, 16) - return &Daemon{ + d := &Daemon{ config: cfg, providers: make(map[string]backend.Provider), - clients: make(map[*daemonrpc.Conn]struct{}), subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}), idleWatcher: fetcher.NewIdleWatcher(idleUpdates), idleUpdates: idleUpdates, shutdown: make(chan struct{}), done: make(chan struct{}), } + + d.server = udsrpc.NewServer() + d.registerHandlers() + d.server.OnConnect(func(_ *daemonrpc.Conn) { + log.Println("daemon: client connected") + }) + d.server.OnDisconnect(func(conn *daemonrpc.Conn) { + d.subMu.Lock() + delete(d.subscriptions, conn) + d.subMu.Unlock() + log.Println("daemon: client disconnected") + }) + + return d +} + +// registerHandlers wires each RPC method to its handler on the server. +func (d *Daemon) registerHandlers() { + d.server.Handle(daemonrpc.MethodPing, d.handlePing) + d.server.Handle(daemonrpc.MethodGetStatus, d.handleGetStatus) + d.server.Handle(daemonrpc.MethodGetAccounts, d.handleGetAccounts) + d.server.Handle(daemonrpc.MethodReloadConfig, d.handleReloadConfig) + d.server.Handle(daemonrpc.MethodFetchEmails, d.handleFetchEmails) + d.server.Handle(daemonrpc.MethodFetchEmailBody, d.handleFetchEmailBody) + d.server.Handle(daemonrpc.MethodDeleteEmails, d.handleDeleteEmails) + d.server.Handle(daemonrpc.MethodArchiveEmails, d.handleArchiveEmails) + d.server.Handle(daemonrpc.MethodMoveEmails, d.handleMoveEmails) + d.server.Handle(daemonrpc.MethodMarkRead, d.handleMarkRead) + d.server.Handle(daemonrpc.MethodFetchFolders, d.handleFetchFolders) + d.server.Handle(daemonrpc.MethodRefreshFolder, d.handleRefreshFolder) + d.server.Handle(daemonrpc.MethodSubscribe, d.handleSubscribe) + d.server.Handle(daemonrpc.MethodUnsubscribe, d.handleUnsubscribe) } // Run starts the daemon: creates providers, starts the socket listener, @@ -94,12 +123,11 @@ func (d *Daemon) Run() error { } // Listen on Unix domain socket. - var err error - d.listener, err = net.Listen("unix", sockPath) //nolint:noctx + listener, err := net.Listen("unix", sockPath) //nolint:noctx if err != nil { return fmt.Errorf("listen: %w", err) } - defer d.listener.Close() //nolint:errcheck + defer listener.Close() //nolint:errcheck // Set socket permissions (owner only). if err := os.Chmod(sockPath, 0700); err != nil { // #nosec G302 @@ -115,28 +143,42 @@ func (d *Daemon) Run() error { d.startIdleWatchers() go d.idleEventLoop() - // Start signal handler. - go d.handleSignals() + // Handle OS signals: SIGTERM/SIGINT → shutdown, SIGHUP → reload config. + stopSignals := udsrpc.HandleSignals(d.Shutdown, func() { + log.Println("daemon: received SIGHUP, reloading config") + if err := d.ReloadConfig(); err != nil { + log.Printf("daemon: config reload failed: %v", err) + } + }) + defer stopSignals() // Start background sync. ctx, cancel := context.WithCancel(context.Background()) d.syncCancel = cancel go d.backgroundSync(ctx) - // Accept client connections. - go d.acceptLoop() + // Serve client connections via the shared RPC server. Canceling serveCtx + // closes the listener and unblocks Serve. + serveCtx, serveCancel := context.WithCancel(context.Background()) + go func() { + if err := d.server.Serve(serveCtx, listener); err != nil { + log.Printf("daemon: serve error: %v", err) + } + }() // Block until shutdown. <-d.shutdown // Cleanup. log.Println("daemon: shutting down") - d.listener.Close() //nolint:errcheck,gosec + serveCancel() + for _, conn := range d.server.Clients() { + conn.Close() //nolint:errcheck,gosec + } if err := d.idleWatcher.StopAllAndWaitTimeout(5 * time.Second); err != nil { log.Printf("daemon: %v", err) } cancel() - d.closeAllClients() d.closeProviders() close(d.done) @@ -192,79 +234,6 @@ func (d *Daemon) initProviders() { } } -func (d *Daemon) acceptLoop() { - for { - done := func() bool { - defer func() { - if r := recover(); r != nil { - log.Printf("daemon: acceptLoop panic recovered: %v\n%s", r, debug.Stack()) - } - }() - conn, err := d.listener.Accept() - if err != nil { - select { - case <-d.shutdown: - return true - default: - log.Printf("daemon: accept error: %v", err) - return false - } - } - rpcConn := daemonrpc.NewConn(conn) - d.addClient(rpcConn) - go d.handleClient(rpcConn) - return false - }() - if done { - return - } - } -} - -func (d *Daemon) handleClient(conn *daemonrpc.Conn) { - defer d.removeClient(conn) - defer conn.Close() //nolint:errcheck - - for { - msg, err := conn.ReceiveMessage() - if err != nil { - // Client disconnected or read error. - return - } - if msg.Request != nil { - d.handleRequest(conn, msg.Request) - } - } -} - -func (d *Daemon) addClient(conn *daemonrpc.Conn) { - d.mu.Lock() - defer d.mu.Unlock() - d.clients[conn] = struct{}{} - log.Println("daemon: client connected") -} - -func (d *Daemon) removeClient(conn *daemonrpc.Conn) { - d.mu.Lock() - delete(d.clients, conn) - d.mu.Unlock() - - d.subMu.Lock() - delete(d.subscriptions, conn) - d.subMu.Unlock() - - log.Println("daemon: client disconnected") -} - -func (d *Daemon) closeAllClients() { - d.mu.Lock() - defer d.mu.Unlock() - for conn := range d.clients { - conn.Close() //nolint:errcheck,gosec - } - d.clients = make(map[*daemonrpc.Conn]struct{}) -} - func (d *Daemon) closeProviders() { d.mu.Lock() defer d.mu.Unlock() @@ -277,28 +246,18 @@ func (d *Daemon) closeProviders() { // broadcastEvent sends an event to all connected clients. func (d *Daemon) broadcastEvent(eventType string, data interface{}) { - d.mu.RLock() - defer d.mu.RUnlock() - for conn := range d.clients { - if err := conn.SendEvent(eventType, data); err != nil { - log.Printf("daemon: broadcast error: %v", err) - } - } + d.server.Broadcast(eventType, data) } // broadcastToSubscribers sends an event only to clients subscribed to the given account+folder. func (d *Daemon) broadcastToSubscribers(accountID, folder, eventType string, data interface{}) { key := accountID + ":" + folder - d.subMu.RLock() - defer d.subMu.RUnlock() - - for conn, subs := range d.subscriptions { - if _, ok := subs[key]; ok { - if err := conn.SendEvent(eventType, data); err != nil { - log.Printf("daemon: subscriber broadcast error: %v", err) - } - } - } + d.server.BroadcastFunc(eventType, data, func(conn *daemonrpc.Conn) bool { + d.subMu.RLock() + defer d.subMu.RUnlock() + _, ok := d.subscriptions[conn][key] + return ok + }) } // getProvider returns the provider for the given account ID. @@ -410,9 +369,7 @@ func (d *Daemon) syncAllAccounts(ctx context.Context) { } // Send desktop notification if TUI not connected. - d.mu.RLock() - noClients := len(d.clients) == 0 - d.mu.RUnlock() + noClients := len(d.server.Clients()) == 0 if noClients && newCount > 0 { if !d.config.DisableNotifications { @@ -455,9 +412,7 @@ func (d *Daemon) idleEventLoop() { log.Printf("daemon: IDLE update for %s/%s", update.AccountID, update.FolderName) // Desktop notification when no clients connected. - d.mu.RLock() - noClients := len(d.clients) == 0 - d.mu.RUnlock() + noClients := len(d.server.Clients()) == 0 if noClients && !d.config.DisableNotifications { accountName := update.AccountID diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index e6fe250ab6617c9026b5f8851391ffb42cb55399..3d21ee78be5952fddfea619d93abbe2f507fa145 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -1,6 +1,7 @@ package daemon import ( + "context" "encoding/json" "net" "os" @@ -68,23 +69,38 @@ func TestPIDFile_DeadProcess(t *testing.T) { } } -// handlerTest sets up a client/server pipe and runs a single RPC exchange. -// The handler runs in a goroutine so the pipe doesn't deadlock. -func handlerTest(t *testing.T, d *Daemon, req *daemonrpc.Request) daemonrpc.Message { +// serveDaemon starts d's RPC server on a temporary unix socket and returns a +// connected client. The server and connection are torn down via t.Cleanup. +func serveDaemon(t *testing.T, d *Daemon) *daemonrpc.Conn { t.Helper() - clientConn, serverConn := net.Pipe() - defer clientConn.Close() - defer serverConn.Close() - - server := daemonrpc.NewConn(serverConn) - client := daemonrpc.NewConn(clientConn) + sock := filepath.Join(t.TempDir(), "d.sock") + l, err := net.Listen("unix", sock) + if err != nil { + t.Fatalf("listen: %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { _ = d.server.Serve(ctx, l) }() + t.Cleanup(func() { + cancel() + _ = l.Close() + }) - // Handle request in goroutine (SendResponse blocks until client reads). - go func() { - d.handleRequest(server, req) - }() + c, err := net.Dial("unix", sock) + if err != nil { + t.Fatalf("dial: %v", err) + } + conn := daemonrpc.NewConn(c) + t.Cleanup(func() { _ = conn.Close() }) + return conn +} - msg, err := client.ReceiveMessage() +// roundTrip sends a request and returns the decoded response message. +func roundTrip(t *testing.T, conn *daemonrpc.Conn, req *daemonrpc.Request) daemonrpc.Message { + t.Helper() + if err := conn.Send(req); err != nil { + t.Fatal(err) + } + msg, err := conn.ReceiveMessage() if err != nil { t.Fatal(err) } @@ -92,34 +108,25 @@ func handlerTest(t *testing.T, d *Daemon, req *daemonrpc.Request) daemonrpc.Mess } func TestDaemon_PingHandler(t *testing.T) { - d := &Daemon{shutdown: make(chan struct{})} - msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodPing}) - - if msg.Response == nil { - t.Fatal("expected Response") - } - var result daemonrpc.PingResult - if err := json.Unmarshal(msg.Response.Result, &result); err != nil { - t.Fatalf("failed to unmarshal ping result: %v", err) + d := New(&config.Config{}) + res, err := d.handlePing(context.Background(), nil, nil) + if err != nil { + t.Fatalf("handlePing: %v", err) } - if !result.Pong { + if !res.(daemonrpc.PingResult).Pong { t.Error("expected pong=true") } } func TestDaemon_StatusHandler(t *testing.T) { - d := &Daemon{ - startTime: time.Now().Add(-2 * time.Minute), - shutdown: make(chan struct{}), - config: &config.Config{}, - } + d := New(&config.Config{}) + d.startTime = time.Now().Add(-2 * time.Minute) - msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodGetStatus}) - - var result daemonrpc.StatusResult - if err := json.Unmarshal(msg.Response.Result, &result); err != nil { - t.Fatalf("failed to unmarshal status result: %v", err) + res, err := d.handleGetStatus(context.Background(), nil, nil) + if err != nil { + t.Fatalf("handleGetStatus: %v", err) } + result := res.(daemonrpc.StatusResult) if !result.Running { t.Error("expected running=true") @@ -130,10 +137,11 @@ func TestDaemon_StatusHandler(t *testing.T) { } func TestDaemon_UnknownMethod(t *testing.T) { - d := &Daemon{shutdown: make(chan struct{})} - msg := handlerTest(t, d, &daemonrpc.Request{ID: 1, Method: "DoesNotExist"}) + d := New(&config.Config{}) + conn := serveDaemon(t, d) - if msg.Response.Error == nil { + msg := roundTrip(t, conn, &daemonrpc.Request{ID: 1, Method: "DoesNotExist"}) + if msg.Response == nil || msg.Response.Error == nil { t.Fatal("expected error for unknown method") } if msg.Response.Error.Code != daemonrpc.ErrCodeNotFound { @@ -142,78 +150,52 @@ func TestDaemon_UnknownMethod(t *testing.T) { } func TestDaemon_Subscribe(t *testing.T) { - d := &Daemon{ - subscriptions: make(map[*daemonrpc.Conn]map[string]struct{}), - shutdown: make(chan struct{}), - } - - clientConn, serverConn := net.Pipe() - defer clientConn.Close() - defer serverConn.Close() - - server := daemonrpc.NewConn(serverConn) - client := daemonrpc.NewConn(clientConn) + d := New(&config.Config{}) + conn := serveDaemon(t, d) params, _ := json.Marshal(daemonrpc.SubscribeParams{ AccountID: "acc1", Folder: "INBOX", }) - go func() { - d.handleRequest(server, &daemonrpc.Request{ - ID: 1, - Method: daemonrpc.MethodSubscribe, - Params: params, - }) - }() - - // Read response. - msg, err := client.ReceiveMessage() - if err != nil { - t.Fatal(err) - } + msg := roundTrip(t, conn, &daemonrpc.Request{ + ID: 1, + Method: daemonrpc.MethodSubscribe, + Params: params, + }) if msg.Response.Error != nil { t.Errorf("unexpected error: %v", msg.Response.Error) } - // Verify subscription was recorded. + // The response is sent after the handler records the subscription, so it + // is visible by the time we read the reply. d.subMu.RLock() - subs, ok := d.subscriptions[server] - d.subMu.RUnlock() - - if !ok { - t.Fatal("expected subscription entry for connection") - } - if _, ok := subs["acc1:INBOX"]; !ok { + defer d.subMu.RUnlock() + found := false + for _, subs := range d.subscriptions { + if _, ok := subs["acc1:INBOX"]; ok { + found = true + } + } + if !found { t.Error("expected subscription for acc1:INBOX") } } func TestDaemon_BroadcastEvent(t *testing.T) { - d := &Daemon{ - clients: make(map[*daemonrpc.Conn]struct{}), - shutdown: make(chan struct{}), - } - - clientConn, serverConn := net.Pipe() - defer clientConn.Close() - defer serverConn.Close() + d := New(&config.Config{}) + conn := serveDaemon(t, d) - server := daemonrpc.NewConn(serverConn) - client := daemonrpc.NewConn(clientConn) + // Ping round-trip ensures the client is registered with the server before + // we broadcast. + roundTrip(t, conn, &daemonrpc.Request{ID: 1, Method: daemonrpc.MethodPing}) - d.mu.Lock() - d.clients[server] = struct{}{} - d.mu.Unlock() - - go func() { - d.broadcastEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{ - AccountID: "acc1", - Folder: "INBOX", - }) - }() + d.broadcastEvent(daemonrpc.EventNewMail, daemonrpc.NewMailEvent{ + AccountID: "acc1", + Folder: "INBOX", + }) - msg, err := client.ReceiveMessage() + msg, err := conn.ReceiveMessage() if err != nil { t.Fatal(err) } diff --git a/daemon/handler.go b/daemon/handler.go index dd8319a655f4e68a4804f0c649681f27c5ef2907..df43fd2b1d3a975e995c1ea08f72c0bf982a4095 100644 --- a/daemon/handler.go +++ b/daemon/handler.go @@ -20,56 +20,29 @@ const ( mutateTimeout = 30 * time.Second ) -func (d *Daemon) handleRequest(conn *daemonrpc.Conn, req *daemonrpc.Request) { - switch req.Method { - case daemonrpc.MethodPing: - d.handlePing(conn, req) - case daemonrpc.MethodGetStatus: - d.handleGetStatus(conn, req) - case daemonrpc.MethodGetAccounts: - d.handleGetAccounts(conn, req) - case daemonrpc.MethodReloadConfig: - d.handleReloadConfig(conn, req) - case daemonrpc.MethodFetchEmails: - d.handleFetchEmails(conn, req) - case daemonrpc.MethodFetchEmailBody: - d.handleFetchEmailBody(conn, req) - case daemonrpc.MethodDeleteEmails: - d.handleDeleteEmails(conn, req) - case daemonrpc.MethodArchiveEmails: - d.handleArchiveEmails(conn, req) - case daemonrpc.MethodMoveEmails: - d.handleMoveEmails(conn, req) - case daemonrpc.MethodMarkRead: - d.handleMarkRead(conn, req) - case daemonrpc.MethodFetchFolders: - d.handleFetchFolders(conn, req) - case daemonrpc.MethodRefreshFolder: - d.handleRefreshFolder(conn, req) - case daemonrpc.MethodSubscribe: - d.handleSubscribe(conn, req) - case daemonrpc.MethodUnsubscribe: - d.handleUnsubscribe(conn, req) - default: - conn.SendError(req.ID, daemonrpc.ErrCodeNotFound, fmt.Sprintf("unknown method: %s", req.Method)) //nolint:errcheck,gosec +// decodeParams unmarshals raw JSON params into T. A nil/empty payload yields +// the zero value. +func decodeParams[T any](params json.RawMessage) (T, error) { + var p T + if params != nil { + if err := json.Unmarshal(params, &p); err != nil { + return p, err + } } + return p, nil } -func decodeParams[T any](req *daemonrpc.Request) (T, error) { - var params T - if req.Params != nil { - if err := json.Unmarshal(req.Params, ¶ms); err != nil { - return params, err - } - } - return params, nil +// parseError wraps a params-decoding failure with the parse error code so the +// server forwards it verbatim instead of mapping to ErrCodeInternal. +func parseError(err error) error { + return &daemonrpc.Error{Code: daemonrpc.ErrCodeParse, Message: err.Error()} } -func (d *Daemon) handlePing(conn *daemonrpc.Conn, req *daemonrpc.Request) { - conn.SendResponse(req.ID, daemonrpc.PingResult{Pong: true}) //nolint:errcheck,gosec +func (d *Daemon) handlePing(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) { + return daemonrpc.PingResult{Pong: true}, nil } -func (d *Daemon) handleGetStatus(conn *daemonrpc.Conn, req *daemonrpc.Request) { +func (d *Daemon) handleGetStatus(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) { d.mu.RLock() accounts := make([]string, 0, len(d.config.Accounts)) for _, acct := range d.config.Accounts { @@ -77,15 +50,15 @@ func (d *Daemon) handleGetStatus(conn *daemonrpc.Conn, req *daemonrpc.Request) { } d.mu.RUnlock() - conn.SendResponse(req.ID, daemonrpc.StatusResult{ //nolint:errcheck,gosec + return daemonrpc.StatusResult{ Running: true, Uptime: int64(time.Since(d.startTime).Seconds()), Accounts: accounts, PID: os.Getpid(), - }) + }, nil } -func (d *Daemon) handleGetAccounts(conn *daemonrpc.Conn, req *daemonrpc.Request) { +func (d *Daemon) handleGetAccounts(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) { d.mu.RLock() defer d.mu.RUnlock() @@ -102,62 +75,54 @@ func (d *Daemon) handleGetAccounts(conn *daemonrpc.Conn, req *daemonrpc.Request) Protocol: protocol, }) } - conn.SendResponse(req.ID, infos) //nolint:errcheck,gosec + return infos, nil } -func (d *Daemon) handleReloadConfig(conn *daemonrpc.Conn, req *daemonrpc.Request) { +func (d *Daemon) handleReloadConfig(_ context.Context, _ *daemonrpc.Conn, _ json.RawMessage) (any, error) { if err := d.ReloadConfig(); err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleFetchEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.FetchEmailsParams](req) +func (d *Daemon) handleFetchEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.FetchEmailsParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout) + ctx, cancel := context.WithTimeout(ctx, fetchTimeout) defer cancel() - emails, err := p.FetchEmails(ctx, params.Folder, params.Limit, params.Offset) + emails, err := p.FetchEmails(ctx, args.Folder, args.Limit, args.Offset) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - - conn.SendResponse(req.ID, emails) //nolint:errcheck,gosec + return emails, nil } -func (d *Daemon) handleFetchEmailBody(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.FetchEmailBodyParams](req) +func (d *Daemon) handleFetchEmailBody(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.FetchEmailBodyParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout) + ctx, cancel := context.WithTimeout(ctx, fetchTimeout) defer cancel() - body, mimeType, attachments, err := p.FetchEmailBody(ctx, params.Folder, params.UID) + body, mimeType, attachments, err := p.FetchEmailBody(ctx, args.Folder, args.UID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } // Convert backend.Attachment to daemonrpc.AttachmentInfo for wire transfer. @@ -171,195 +136,180 @@ func (d *Daemon) handleFetchEmailBody(conn *daemonrpc.Conn, req *daemonrpc.Reque }) } - conn.SendResponse(req.ID, daemonrpc.FetchEmailBodyResult{ //nolint:errcheck,gosec + return daemonrpc.FetchEmailBodyResult{ Body: body, BodyMIMEType: mimeType, Attachments: attInfos, - }) + }, nil } -func (d *Daemon) handleDeleteEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.DeleteEmailsParams](req) +func (d *Daemon) handleDeleteEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.DeleteEmailsParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout) + ctx, cancel := context.WithTimeout(ctx, mutateTimeout) defer cancel() - if err := p.DeleteEmails(ctx, params.Folder, params.UIDs); err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + if err := p.DeleteEmails(ctx, args.Folder, args.UIDs); err != nil { + return nil, err } - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleArchiveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.ArchiveEmailsParams](req) +func (d *Daemon) handleArchiveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.ArchiveEmailsParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout) + ctx, cancel := context.WithTimeout(ctx, mutateTimeout) defer cancel() - if err := p.ArchiveEmails(ctx, params.Folder, params.UIDs); err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + if err := p.ArchiveEmails(ctx, args.Folder, args.UIDs); err != nil { + return nil, err } - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleMoveEmails(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.MoveEmailsParams](req) +func (d *Daemon) handleMoveEmails(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.MoveEmailsParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout) + ctx, cancel := context.WithTimeout(ctx, mutateTimeout) defer cancel() - if err := p.MoveEmails(ctx, params.UIDs, params.SourceFolder, params.DestFolder); err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + if err := p.MoveEmails(ctx, args.UIDs, args.SourceFolder, args.DestFolder); err != nil { + return nil, err } - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleMarkRead(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.MarkReadParams](req) +func (d *Daemon) handleMarkRead(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.MarkReadParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout) + ctx, cancel := context.WithTimeout(ctx, mutateTimeout) defer cancel() - for _, uid := range params.UIDs { + for _, uid := range args.UIDs { var err error - if params.Read { - err = p.MarkAsRead(ctx, params.Folder, uid) + if args.Read { + err = p.MarkAsRead(ctx, args.Folder, uid) } else { - err = p.MarkAsUnread(ctx, params.Folder, uid) + err = p.MarkAsUnread(ctx, args.Folder, uid) } if err != nil { - log.Printf("daemon: mark read=%v %d failed: %v", params.Read, uid, err) + log.Printf("daemon: mark read=%v %d failed: %v", args.Read, uid, err) } } - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleFetchFolders(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.FetchFoldersParams](req) +func (d *Daemon) handleFetchFolders(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.FetchFoldersParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), mutateTimeout) + ctx, cancel := context.WithTimeout(ctx, mutateTimeout) defer cancel() folders, err := p.FetchFolders(ctx) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeInternal, err.Error()) //nolint:errcheck,gosec - return + return nil, err } - conn.SendResponse(req.ID, folders) //nolint:errcheck,gosec + return folders, nil } -func (d *Daemon) handleRefreshFolder(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.RefreshFolderParams](req) +func (d *Daemon) handleRefreshFolder(ctx context.Context, _ *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.RefreshFolderParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - // Async: fetch in background, push events when done. + // Async: fetch in background, push events when done. The server-scoped ctx + // outlives the request and is canceled on daemon shutdown. go func() { defer func() { if r := recover(); r != nil { - log.Printf("daemon: refresh panic for account = %s folder = %s: %v", params.AccountID, params.Folder, r) - d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ - AccountID: params.AccountID, - Folder: params.Folder, + log.Printf("daemon: refresh panic for account = %s folder = %s: %v", args.AccountID, args.Folder, r) + d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ + AccountID: args.AccountID, + Folder: args.Folder, Error: fmt.Sprintf("panic: %v", r), }) } }() - p, err := d.getProvider(params.AccountID) + p, err := d.getProvider(args.AccountID) if err != nil { log.Printf("daemon: refresh provider error: %v", err) return } - d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent(params)) + d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncStarted, daemonrpc.SyncStartedEvent(args)) - ctx, cancel := context.WithTimeout(context.Background(), fetchTimeout) + fetchCtx, cancel := context.WithTimeout(ctx, fetchTimeout) defer cancel() - emails, err := p.FetchEmails(ctx, params.Folder, 50, 0) + emails, err := p.FetchEmails(fetchCtx, args.Folder, 50, 0) if err != nil { - d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ - AccountID: params.AccountID, - Folder: params.Folder, + d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncError, daemonrpc.SyncErrorEvent{ + AccountID: args.AccountID, + Folder: args.Folder, Error: err.Error(), }) return } - d.broadcastToSubscribers(params.AccountID, params.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{ - AccountID: params.AccountID, - Folder: params.Folder, + d.broadcastToSubscribers(args.AccountID, args.Folder, daemonrpc.EventSyncComplete, daemonrpc.SyncCompleteEvent{ + AccountID: args.AccountID, + Folder: args.Folder, EmailCount: len(emails), }) }() - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleSubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.SubscribeParams](req) +func (d *Daemon) handleSubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.SubscribeParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - key := params.AccountID + ":" + params.Folder + key := args.AccountID + ":" + args.Folder d.subMu.Lock() if d.subscriptions[conn] == nil { @@ -369,17 +319,16 @@ func (d *Daemon) handleSubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) { d.subMu.Unlock() log.Printf("daemon: client subscribed to %s", key) - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } -func (d *Daemon) handleUnsubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) { - params, err := decodeParams[daemonrpc.UnsubscribeParams](req) +func (d *Daemon) handleUnsubscribe(_ context.Context, conn *daemonrpc.Conn, params json.RawMessage) (any, error) { + args, err := decodeParams[daemonrpc.UnsubscribeParams](params) if err != nil { - conn.SendError(req.ID, daemonrpc.ErrCodeParse, err.Error()) //nolint:errcheck,gosec - return + return nil, parseError(err) } - key := params.AccountID + ":" + params.Folder + key := args.AccountID + ":" + args.Folder d.subMu.Lock() if subs, ok := d.subscriptions[conn]; ok { @@ -387,5 +336,5 @@ func (d *Daemon) handleUnsubscribe(conn *daemonrpc.Conn, req *daemonrpc.Request) } d.subMu.Unlock() - conn.SendResponse(req.ID, true) //nolint:errcheck,gosec + return true, nil } diff --git a/daemon/pidfile.go b/daemon/pidfile.go new file mode 100644 index 0000000000000000000000000000000000000000..413495f7bb0f4c233052ca6aa34f2c5957d1369c --- /dev/null +++ b/daemon/pidfile.go @@ -0,0 +1,14 @@ +package daemon + +import udsrpc "github.com/floatpane/go-uds-jsonrpc" + +// PID file helpers come from the shared go-uds-jsonrpc library, which handles +// the Unix (signal-0 probe) and Windows (OpenProcess) implementations behind +// build tags. Re-exported so daemon.WritePID/IsRunning/etc. keep working for +// callers in main.go. +var ( + WritePID = udsrpc.WritePID + ReadPID = udsrpc.ReadPID + IsRunning = udsrpc.IsRunning + RemovePID = udsrpc.RemovePID +) diff --git a/daemon/pidfile_unix.go b/daemon/pidfile_unix.go deleted file mode 100644 index 8182fe9e1cc2b522137ea7e868c695b1e00aef61..0000000000000000000000000000000000000000 --- a/daemon/pidfile_unix.go +++ /dev/null @@ -1,45 +0,0 @@ -//go:build !windows - -package daemon - -import ( - "fmt" - "os" - "strconv" - "strings" - "syscall" -) - -// WritePID writes the current process ID to the given path. -func WritePID(path string) error { - return os.WriteFile(path, []byte(strconv.Itoa(os.Getpid())), 0644) -} - -// ReadPID reads the process ID from the given path. -func ReadPID(path string) (int, error) { - data, err := os.ReadFile(path) - if err != nil { - return 0, err - } - pid, err := strconv.Atoi(strings.TrimSpace(string(data))) - if err != nil { - return 0, fmt.Errorf("invalid PID file: %w", err) - } - return pid, nil -} - -// IsRunning checks if a daemon process is alive using the PID file. -func IsRunning(path string) (int, bool) { - pid, err := ReadPID(path) - if err != nil { - return 0, false - } - // Signal 0 checks if process exists without sending a signal. - err = syscall.Kill(pid, 0) - return pid, err == nil -} - -// RemovePID removes the PID file. -func RemovePID(path string) error { - return os.Remove(path) -} diff --git a/daemon/pidfile_windows.go b/daemon/pidfile_windows.go deleted file mode 100644 index 98bf2f192ac948fc287b858024b44185949c0861..0000000000000000000000000000000000000000 --- a/daemon/pidfile_windows.go +++ /dev/null @@ -1,62 +0,0 @@ -//go:build windows - -package daemon - -import ( - "fmt" - "os" - "strconv" - "strings" - "syscall" -) - -// WritePID writes the current process ID to the given path. -func WritePID(path string) error { - return os.WriteFile(path, []byte(strconv.Itoa(os.Getpid())), 0644) -} - -// ReadPID reads the process ID from the given path. -func ReadPID(path string) (int, error) { - data, err := os.ReadFile(path) - if err != nil { - return 0, err - } - pid, err := strconv.Atoi(strings.TrimSpace(string(data))) - if err != nil { - return 0, fmt.Errorf("invalid PID file: %w", err) - } - return pid, nil -} - -// IsRunning checks if a daemon process is alive using the PID file. -func IsRunning(path string) (int, bool) { - pid, err := ReadPID(path) - if err != nil { - return 0, false - } - - // On Windows, syscall.Kill is not available. We use OpenProcess instead. - // We only need PROCESS_QUERY_LIMITED_INFORMATION (0x1000) - const PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 - h, err := syscall.OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, false, uint32(pid)) - if err != nil { - // Process could not be opened, which likely means it doesn't exist - return pid, false - } - defer syscall.CloseHandle(h) - - // Check if the process is still running or has exited - var exitCode uint32 - err = syscall.GetExitCodeProcess(h, &exitCode) - if err != nil { - return pid, false - } - - // STILL_ACTIVE is 259 - return pid, exitCode == 259 -} - -// RemovePID removes the PID file. -func RemovePID(path string) error { - return os.Remove(path) -} diff --git a/daemon/signals.go b/daemon/signals.go deleted file mode 100644 index 03b15aba5ddaccae6e5326b8865ddd30ef7b7765..0000000000000000000000000000000000000000 --- a/daemon/signals.go +++ /dev/null @@ -1,35 +0,0 @@ -package daemon - -import ( - "log" - "os" - "os/signal" - "syscall" -) - -// handleSignals listens for OS signals and triggers daemon actions. -// SIGTERM/SIGINT → graceful shutdown -// SIGHUP → config reload -func (d *Daemon) handleSignals() { - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) - - for { - select { - case sig := <-ch: - switch sig { - case syscall.SIGTERM, syscall.SIGINT: - log.Println("daemon: received shutdown signal") - d.Shutdown() - return - case syscall.SIGHUP: - log.Println("daemon: received SIGHUP, reloading config") - if err := d.ReloadConfig(); err != nil { - log.Printf("daemon: config reload failed: %v", err) - } - } - case <-d.shutdown: - return - } - } -} diff --git a/daemonrpc/protocol.go b/daemonrpc/protocol.go index 0aefcbe5b5a30ce672502ba7cb6a2c8d87746055..1713a61206c2ab908aac8d7982b57f7d4c8a7562 100644 --- a/daemonrpc/protocol.go +++ b/daemonrpc/protocol.go @@ -1,84 +1,30 @@ package daemonrpc -import "encoding/json" - -// Request from client to daemon. Has an ID for matching responses. -type Request struct { - ID uint64 `json:"id"` - Method string `json:"method"` - Params json.RawMessage `json:"params,omitempty"` -} - -// Response from daemon to client. Matched to request by ID. -type Response struct { - ID uint64 `json:"id"` - Result json.RawMessage `json:"result,omitempty"` - Error *Error `json:"error,omitempty"` -} - -// Event pushed from daemon to subscribed clients. No ID field. -type Event struct { - Type string `json:"type"` - Data json.RawMessage `json:"data,omitempty"` -} - -// Error returned in a Response. -type Error struct { - Code int `json:"code"` - Message string `json:"message"` -} - -func (e *Error) Error() string { return e.Message } - -// Message is a union type for wire decoding. Exactly one of the -// fields will be populated based on the presence of "id" and "type". -type Message struct { - Request *Request - Response *Response - Event *Event -} +import udsrpc "github.com/floatpane/go-uds-jsonrpc" + +// Wire-level message types and the discriminating decoder live in the shared +// go-uds-jsonrpc library. They are aliased here so matcha code keeps using the +// daemonrpc.* names while sharing a single implementation with the daemon's +// transport layer. +type ( + Request = udsrpc.Request + Response = udsrpc.Response + Event = udsrpc.Event + Error = udsrpc.Error + Message = udsrpc.Message +) -// Discriminate: if "type" present → Event, if "method" present → Request, else → Response. -func DecodeMessage(raw json.RawMessage) (Message, error) { - var probe struct { - Type string `json:"type"` - Method string `json:"method"` - ID *uint64 `json:"id"` - } - if err := json.Unmarshal(raw, &probe); err != nil { - return Message{}, err - } - - var m Message - switch { - case probe.Type != "": - var ev Event - if err := json.Unmarshal(raw, &ev); err != nil { - return m, err - } - m.Event = &ev - case probe.Method != "": - var req Request - if err := json.Unmarshal(raw, &req); err != nil { - return m, err - } - m.Request = &req - default: - var resp Response - if err := json.Unmarshal(raw, &resp); err != nil { - return m, err - } - m.Response = &resp - } - return m, nil -} +// DecodeMessage discriminates a raw JSON object into a Request, Response, or +// Event. +var DecodeMessage = udsrpc.DecodeMessage -// Standard error codes. +// Standard error codes, re-exported from the shared library. const ( - ErrCodeParse = -32700 - ErrCodeInvalidReq = -32600 - ErrCodeNotFound = -32601 - ErrCodeInternal = -32603 + ErrCodeParse = udsrpc.ErrCodeParse + ErrCodeInvalidReq = udsrpc.ErrCodeInvalidReq + ErrCodeInvalidParams = udsrpc.ErrCodeInvalidParams + ErrCodeNotFound = udsrpc.ErrCodeNotFound + ErrCodeInternal = udsrpc.ErrCodeInternal ) // RPC method names. diff --git a/daemonrpc/socket.go b/daemonrpc/socket.go index b8e9286ef015a7202e2ea1c976020461f12733e4..6f9f1c8bf95efcc61d59388d88174e20f66383ec 100644 --- a/daemonrpc/socket.go +++ b/daemonrpc/socket.go @@ -1,44 +1,27 @@ package daemonrpc import ( - "fmt" - "os" "path/filepath" - "runtime" -) -// runtimeDir returns the base directory for daemon runtime files. -// Linux: $XDG_RUNTIME_DIR/matcha/ -// macOS: ~/Library/Caches/matcha/ -func runtimeDir() string { - switch runtime.GOOS { - case "darwin": - home, _ := os.UserHomeDir() - return filepath.Join(home, "Library", "Caches", "matcha") - default: // linux and others - if dir := os.Getenv("XDG_RUNTIME_DIR"); dir != "" { - return filepath.Join(dir, "matcha") - } - // Fallback: /tmp/matcha- - return filepath.Join(os.TempDir(), "matcha-"+uidStr()) - } -} + udsrpc "github.com/floatpane/go-uds-jsonrpc" +) -func uidStr() string { - return fmt.Sprintf("%d", os.Getuid()) -} +// appName is the per-user runtime directory namespace for matcha's daemon +// files. It matches the historical layout: $XDG_RUNTIME_DIR/matcha/ on Linux, +// ~/Library/Caches/matcha/ on macOS. +const appName = "matcha" // SocketPath returns the path to the daemon's Unix domain socket. func SocketPath() string { - return filepath.Join(runtimeDir(), "daemon.sock") + return filepath.Join(udsrpc.RuntimeDir(appName), "daemon.sock") } // PIDPath returns the path to the daemon's PID file. func PIDPath() string { - return filepath.Join(runtimeDir(), "daemon.pid") + return filepath.Join(udsrpc.RuntimeDir(appName), "daemon.pid") } // EnsureRuntimeDir creates the runtime directory if it doesn't exist. func EnsureRuntimeDir() error { - return os.MkdirAll(runtimeDir(), 0700) + return udsrpc.EnsureRuntimeDir(appName) } diff --git a/daemonrpc/transport.go b/daemonrpc/transport.go index 2c85cbd487d1440cdab384ddd180ef58dc79bfcb..a33aecaf0c27dd60be4b4232bfe0fbc7df6ecae9 100644 --- a/daemonrpc/transport.go +++ b/daemonrpc/transport.go @@ -1,90 +1,11 @@ package daemonrpc -import ( - "encoding/json" - "fmt" - "net" - "sync" -) +import udsrpc "github.com/floatpane/go-uds-jsonrpc" -// Conn wraps a net.Conn with newline-delimited JSON encoding/decoding. -type Conn struct { - conn net.Conn - enc *json.Encoder - dec *json.Decoder - mu sync.Mutex // serializes writes -} +// Conn is the newline-delimited JSON-RPC connection provided by the shared +// go-uds-jsonrpc transport. Aliased so the rest of matcha keeps referring to +// daemonrpc.Conn. +type Conn = udsrpc.Conn -// NewConn wraps an existing network connection. -func NewConn(c net.Conn) *Conn { - return &Conn{ - conn: c, - enc: json.NewEncoder(c), - dec: json.NewDecoder(c), - } -} - -// Send writes a JSON-encoded message followed by a newline. -// Thread-safe. -func (c *Conn) Send(v interface{}) error { - c.mu.Lock() - defer c.mu.Unlock() - return c.enc.Encode(v) -} - -// SendResponse sends a successful response with the given result. -func (c *Conn) SendResponse(id uint64, result interface{}) error { - raw, err := json.Marshal(result) - if err != nil { - return fmt.Errorf("marshal result: %w", err) - } - return c.Send(&Response{ - ID: id, - Result: raw, - }) -} - -// SendError sends an error response. -func (c *Conn) SendError(id uint64, code int, message string) error { - return c.Send(&Response{ - ID: id, - Error: &Error{Code: code, Message: message}, - }) -} - -// SendEvent sends a push event to the client. -func (c *Conn) SendEvent(eventType string, data interface{}) error { - raw, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("marshal event data: %w", err) - } - return c.Send(&Event{ - Type: eventType, - Data: raw, - }) -} - -// ReceiveMessage reads and decodes the next JSON message, returning -// a discriminated Message (Request, Response, or Event). -func (c *Conn) ReceiveMessage() (Message, error) { - var raw json.RawMessage - if err := c.dec.Decode(&raw); err != nil { - return Message{}, err - } - return DecodeMessage(raw) -} - -// Close closes the underlying connection. -func (c *Conn) Close() error { - return c.conn.Close() -} - -// RemoteAddr returns the remote address of the connection. -func (c *Conn) RemoteAddr() net.Addr { - return c.conn.RemoteAddr() -} - -// LocalAddr returns the local address of the connection. -func (c *Conn) LocalAddr() net.Addr { - return c.conn.LocalAddr() -} +// NewConn wraps an existing net.Conn. +var NewConn = udsrpc.NewConn diff --git a/go.mod b/go.mod index 3c098c25324908e96ca116f016cd4ccb601bba70..6d21083b51bd5e693e75ce50f684357a525ac643 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/emersion/go-message v0.18.2 github.com/emersion/go-pgpmail v0.2.2 github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 + github.com/floatpane/go-uds-jsonrpc v0.0.1 github.com/floatpane/termimage v0.2.0 github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/go.sum b/go.sum index 74ca8cba48b81313c67478f1b04b77df0b7b0f3f..6ad684f82ac6b384d9a6cd2689a8e75000ad4eed 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/emersion/go-pgpmail v0.2.2/go.mod h1:mRB5P7QKiAuOvcT36tdRZvm7nSt7V+f6 github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 h1:oP4q0fw+fOSWn3DfFi4EXdT+B+gTtzx8GC9xsc26Znk= github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ= github.com/emersion/go-textwrapper v0.0.0-20200911093747-65d896831594/go.mod h1:aqO8z8wPrjkscevZJFVE1wXJrLpC5LtJG7fqLOsPb2U= +github.com/floatpane/go-uds-jsonrpc v0.0.1 h1:/sBlCXVAP9SyLWLj0wlFI07dX/SfXeUM67B4tRwK2QA= +github.com/floatpane/go-uds-jsonrpc v0.0.1/go.mod h1:G/YeDIocGkPIU+uyhJ/e8ynn9wIEMIkJ74d3VUuC4rM= github.com/floatpane/termimage v0.2.0 h1:NGjG7VUFAqpuYiPn/Vqcq2eHYqLZNmD3HkAdQTV0Lc0= github.com/floatpane/termimage v0.2.0/go.mod h1:5Mcw99w/AI4pmYVVyZKM4DkldHClH6uYO0eCQQGmaes= github.com/godbus/dbus/v5 v5.2.2 h1:TUR3TgtSVDmjiXOgAAyaZbYmIeP3DPkld3jgKGV8mXQ=