@@ -0,0 +1,42 @@
+package server
+
+import (
+ "log/slog"
+ "net/rpc"
+
+ msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2"
+)
+
+// ServerCodec is a wrapper around msgpackrpc.ServerCodec that adds logging
+// functionality.
+type ServerCodec struct {
+ *msgpackrpc.MsgpackCodec
+ logger *slog.Logger
+}
+
+var _ rpc.ServerCodec = (*ServerCodec)(nil)
+
+// ReadRequestHeader reads the request header and logs it.
+func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error {
+ err := c.MsgpackCodec.ReadRequestHeader(r)
+ if c.logger != nil {
+ c.logger.Debug("rpc request",
+ slog.String("service_method", r.ServiceMethod),
+ slog.Int("seq", int(r.Seq)),
+ )
+ }
+ return err
+}
+
+// WriteResponse writes the response and logs it.
+func (c *ServerCodec) WriteResponse(r *rpc.Response, body any) error {
+ err := c.MsgpackCodec.WriteResponse(r, body)
+ if c.logger != nil {
+ c.logger.Debug("rpc response",
+ slog.String("service_method", r.ServiceMethod),
+ slog.String("error", r.Error),
+ slog.Int("seq", int(r.Seq)),
+ )
+ }
+ return err
+}
@@ -84,6 +84,11 @@ type Server struct {
shutdown atomic.Bool
}
+// SetLogger sets the logger for the server.
+func (s *Server) SetLogger(logger *slog.Logger) {
+ s.logger = logger
+}
+
// DefaultServer returns a new [Server] instance with the default address.
func DefaultServer(cfg *config.Config) *Server {
return NewServer(cfg, "unix", DefaultAddr())
@@ -171,14 +176,15 @@ func (s *Server) Shutdown(ctx context.Context) error {
}
func (s *Server) handleConn(conn net.Conn) {
- s.info("accepted connection from %s", conn.RemoteAddr())
- msgpackrpc.ServeConn(conn)
- // var req rpc.Request
- // codec := msgpackrpc.NewServerCodec(conn)
- // if err := codec.ReadRequestHeader(&req); err != nil {
- // s.error("failed to read request header: %v", err)
- // }
- // rpc.ServeCodec(codec)
+ s.info("accepted connection", "remote_addr", conn.LocalAddr())
+ codec := &ServerCodec{
+ MsgpackCodec: msgpackrpc.NewCodec(true, true, conn),
+ logger: s.logger.With(
+ slog.String("remote_addr", conn.RemoteAddr().String()),
+ slog.String("local_addr", conn.LocalAddr().String()),
+ ),
+ }
+ rpc.ServeCodec(codec)
}
func (s *Server) shuttingDown() bool {