From 5f9a4b9757b9751c1d46c2138a299488bfff4c7f Mon Sep 17 00:00:00 2001 From: Ayman Bagabas Date: Thu, 18 Sep 2025 12:57:40 -0400 Subject: [PATCH] feat: server: log RPC requests and responses --- internal/server/codec.go | 42 +++++++++++++++++++++++++++++++++++++++ internal/server/server.go | 22 ++++++++++++-------- 2 files changed, 56 insertions(+), 8 deletions(-) create mode 100644 internal/server/codec.go diff --git a/internal/server/codec.go b/internal/server/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..9311a5767848ba25aa0e09d9b4de2768b05a1024 --- /dev/null +++ b/internal/server/codec.go @@ -0,0 +1,42 @@ +package server + +import ( + "log/slog" + "net/rpc" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2" +) + +// ServerCodec is a wrapper around msgpackrpc.ServerCodec that adds logging +// functionality. +type ServerCodec struct { + *msgpackrpc.MsgpackCodec + logger *slog.Logger +} + +var _ rpc.ServerCodec = (*ServerCodec)(nil) + +// ReadRequestHeader reads the request header and logs it. +func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error { + err := c.MsgpackCodec.ReadRequestHeader(r) + if c.logger != nil { + c.logger.Debug("rpc request", + slog.String("service_method", r.ServiceMethod), + slog.Int("seq", int(r.Seq)), + ) + } + return err +} + +// WriteResponse writes the response and logs it. +func (c *ServerCodec) WriteResponse(r *rpc.Response, body any) error { + err := c.MsgpackCodec.WriteResponse(r, body) + if c.logger != nil { + c.logger.Debug("rpc response", + slog.String("service_method", r.ServiceMethod), + slog.String("error", r.Error), + slog.Int("seq", int(r.Seq)), + ) + } + return err +} diff --git a/internal/server/server.go b/internal/server/server.go index 3b7e16643bac914b9b634ce70a2afd8106872e37..f4f75eda1a9dffd99a366aeaca02e01ae80df3a7 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -84,6 +84,11 @@ type Server struct { shutdown atomic.Bool } +// SetLogger sets the logger for the server. +func (s *Server) SetLogger(logger *slog.Logger) { + s.logger = logger +} + // DefaultServer returns a new [Server] instance with the default address. func DefaultServer(cfg *config.Config) *Server { return NewServer(cfg, "unix", DefaultAddr()) @@ -171,14 +176,15 @@ func (s *Server) Shutdown(ctx context.Context) error { } func (s *Server) handleConn(conn net.Conn) { - s.info("accepted connection from %s", conn.RemoteAddr()) - msgpackrpc.ServeConn(conn) - // var req rpc.Request - // codec := msgpackrpc.NewServerCodec(conn) - // if err := codec.ReadRequestHeader(&req); err != nil { - // s.error("failed to read request header: %v", err) - // } - // rpc.ServeCodec(codec) + s.info("accepted connection", "remote_addr", conn.LocalAddr()) + codec := &ServerCodec{ + MsgpackCodec: msgpackrpc.NewCodec(true, true, conn), + logger: s.logger.With( + slog.String("remote_addr", conn.RemoteAddr().String()), + slog.String("local_addr", conn.LocalAddr().String()), + ), + } + rpc.ServeCodec(codec) } func (s *Server) shuttingDown() bool {