diff --git a/go.mod b/go.mod index b59f4d649ad9c635d5dc8d583e993208b06547a6..31320cd8d28607ca8af704ae803b721389709939 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/charmbracelet/x/exp/golden v0.0.0-20250207160936-21c02780d27a github.com/disintegration/imageorient v0.0.0-20180920195336-8147d86e83ec github.com/google/uuid v1.6.0 + github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1 github.com/invopop/jsonschema v0.13.0 github.com/joho/godotenv v1.5.1 github.com/mark3labs/mcp-go v0.40.0 @@ -43,9 +44,16 @@ require ( github.com/tidwall/sjson v1.2.5 github.com/zeebo/xxh3 v1.0.2 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce mvdan.cc/sh/v3 v3.12.1-0.20250902163504-3cf4fd5717a5 ) +require ( + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-msgpack/v2 v2.1.3 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect +) + require ( cloud.google.com/go v0.116.0 // indirect cloud.google.com/go/auth v0.13.0 // indirect diff --git a/go.sum b/go.sum index a921201c472e338f3c068503d9404d68a7bcba12..4ce162c241099724f3534b21d741f0744807ad24 100644 --- a/go.sum +++ b/go.sum @@ -162,6 +162,14 @@ github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-msgpack/v2 v2.1.3 h1:cB1w4Zrk0O3jQBTcFMKqYQWRFfsSQ/TYKNyUUVyCP2c= +github.com/hashicorp/go-msgpack/v2 v2.1.3/go.mod h1:SjlwKKFnwBXvxD/I1bEcfJIBbEJ+MCUn39TxymNR5ZU= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1 h1:Y1sd8ZCCUUlUetCk+3MCpOwdWd+WicHdk2zk2yUM0qw= +github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1/go.mod h1:wASEfI5dofjm9S9Jp3JM4pfoBZy8Z07JUE2wHNi0zuc= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -433,6 +441,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= +gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000000000000000000000000000000000000..a7ed20a973a6204a056d611edcc21be3a8a10353 --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,41 @@ +package client + +import ( + "net/rpc" + + "github.com/charmbracelet/crush/internal/config" + "github.com/charmbracelet/crush/internal/proto" + "github.com/charmbracelet/crush/internal/server" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2" +) + +// Client represents an RPC client connected to a Crush server. +type Client struct { + rpc *rpc.Client +} + +// DefaultClient creates a new [Client] connected to the default server address. +func DefaultClient() (*Client, error) { + return NewClient("unix", server.DefaultAddr()) +} + +// NewClient creates a new [Client] connected to the server at the given +// network and address. +func NewClient(network, address string) (*Client, error) { + rpc, err := msgpackrpc.Dial(network, address) + if err != nil { + return nil, err + } + return &Client{rpc: rpc}, nil +} + +// GetConfig retrieves the server's configuration via RPC. +func (c *Client) GetConfig() (*config.Config, error) { + var cfg config.Config + var args proto.Args + err := c.rpc.Call("ServerProto.GetConfig", &args, &cfg) + if err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/internal/cmd/server.go b/internal/cmd/server.go new file mode 100644 index 0000000000000000000000000000000000000000..b0582ad1d20b8a78908b13be322c1f9c7ea50583 --- /dev/null +++ b/internal/cmd/server.go @@ -0,0 +1,84 @@ +package cmd + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "os/signal" + "time" + + "github.com/charmbracelet/crush/internal/config" + "github.com/charmbracelet/crush/internal/server" + "github.com/charmbracelet/log/v2" + "github.com/spf13/cobra" +) + +var serverCmd = &cobra.Command{ + Use: "server", + Short: "Start the Crush server", + RunE: func(cmd *cobra.Command, args []string) error { + dataDir, err := cmd.Flags().GetString("data-dir") + if err != nil { + return fmt.Errorf("failed to get data directory: %v", err) + } + debug, err := cmd.Flags().GetBool("debug") + if err != nil { + return fmt.Errorf("failed to get debug flag: %v", err) + } + + cfg, err := config.Load("", dataDir, debug) + if err != nil { + return fmt.Errorf("failed to load configuration: %v", err) + } + + logger := log.New(os.Stderr) + logger.SetReportTimestamp(true) + slog.SetDefault(slog.New(logger)) + if debug { + logger.SetLevel(log.DebugLevel) + slog.SetLogLoggerLevel(slog.LevelDebug) + } + + srv := server.NewServer(cfg, "unix", server.DefaultAddr()) + slog.Info("Starting Crush server...", "addr", srv.Addr) + + errch := make(chan error, 1) + sigch := make(chan os.Signal, 1) + sigs := []os.Signal{os.Interrupt} + sigs = append(sigs, addSignals(sigs)...) + signal.Notify(sigch, sigs...) + + go func() { + errch <- srv.ListenAndServe() + }() + + select { + case <-sigch: + slog.Info("Received interrupt signal...") + case err := <-errch: + if err != nil && !errors.Is(err, server.ErrServerClosed) { + _ = srv.Close() + slog.Error("Server error", "error", err) + return fmt.Errorf("server error: %v", err) + } + } + + ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second) + defer cancel() + + slog.Info("Shutting down...") + + if err := srv.Shutdown(ctx); err != nil { + slog.Error("Failed to shutdown server", "error", err) + return fmt.Errorf("failed to shutdown server: %v", err) + } + + return nil + }, +} + +func init() { + rootCmd.AddCommand(serverCmd) +} diff --git a/internal/cmd/server_other.go b/internal/cmd/server_other.go new file mode 100644 index 0000000000000000000000000000000000000000..58b05629bf5b85a579ded6379dec53f555fb68e7 --- /dev/null +++ b/internal/cmd/server_other.go @@ -0,0 +1,13 @@ +//go:build !windows +// +build !windows + +package cmd + +import ( + "os" + "syscall" +) + +func addSignals(sigs []os.Signal) []os.Signal { + return append(sigs, syscall.SIGTERM) +} diff --git a/internal/cmd/server_windows.go b/internal/cmd/server_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..eff60b8b635128e08a6e27f4273520607f05a4c3 --- /dev/null +++ b/internal/cmd/server_windows.go @@ -0,0 +1,10 @@ +//go:build windows +// +build windows + +package cmd + +import "os" + +func addSignals(sigs []os.Signal) []os.Signal { + return sigs +} diff --git a/internal/proto/proto.go b/internal/proto/proto.go new file mode 100644 index 0000000000000000000000000000000000000000..9a9139334f20db9727911e221bd95ab8f68394b0 --- /dev/null +++ b/internal/proto/proto.go @@ -0,0 +1,4 @@ +package proto + +// Args represents generic arguments that apply to all RPC routines. +type Args struct{} diff --git a/internal/server/net_other.go b/internal/server/net_other.go new file mode 100644 index 0000000000000000000000000000000000000000..b1fba90cf306b45c5764eb3702d2da642122ca69 --- /dev/null +++ b/internal/server/net_other.go @@ -0,0 +1,10 @@ +//go:build !windows +// +build !windows + +package server + +import "net" + +func listen(network, address string) (net.Listener, error) { + return net.Listen(network, address) +} diff --git a/internal/server/net_windows.go b/internal/server/net_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..5c3551f5acd159a801cd8145bc18c3fc68db331a --- /dev/null +++ b/internal/server/net_windows.go @@ -0,0 +1,18 @@ +//go:build windows +// +build windows + +package server + +import ( + "net" + "strings" + + "gopkg.in/natefinch/npipe.v2" +) + +func listen(network, address string) (net.Listener, error) { + if !strings.HasPrefix(address, "tcp") { + return npipe.Listen(address) + } + return net.Listen(network, address) +} diff --git a/internal/server/proto.go b/internal/server/proto.go new file mode 100644 index 0000000000000000000000000000000000000000..4d95366d3af5c59722bd53bddea3583e331a305c --- /dev/null +++ b/internal/server/proto.go @@ -0,0 +1,17 @@ +package server + +import ( + "github.com/charmbracelet/crush/internal/config" + "github.com/charmbracelet/crush/internal/proto" +) + +// ServerProto defines the RPC methods exposed by the Crush server. +type ServerProto struct { + *Server +} + +// GetConfig is an RPC routine that returns the server's configuration. +func (s *ServerProto) GetConfig(args *proto.Args, reply *config.Config) error { + *reply = *s.cfg + return nil +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000000000000000000000000000000000000..3b7e16643bac914b9b634ce70a2afd8106872e37 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,210 @@ +package server + +import ( + "context" + "fmt" + "log/slog" + "net" + "net/rpc" + "os" + "os/user" + "path/filepath" + "runtime" + "strings" + "sync/atomic" + "time" + + "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/config" + "github.com/charmbracelet/crush/internal/csync" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2" +) + +// ErrServerClosed is returned when the server is closed. +var ErrServerClosed = fmt.Errorf("server closed") + +// InstanceState represents the state of a running [app.App] instance. +type InstanceState uint8 + +const ( + // InstanceStateCreated indicates that the instance has been created but not yet started. + InstanceStateCreated InstanceState = iota + // InstanceStateStarted indicates that the instance is currently running. + InstanceStateStarted + // InstanceStateStopped indicates that the instance has been stopped. + InstanceStateStopped +) + +// Instance represents a running [app.App] instance with its associated +// resources and state. +type Instance struct { + *app.App + State InstanceState + id string + path string +} + +// ID returns the unique identifier of the instance. +func (i *Instance) ID() string { + return i.id +} + +// Path returns the filesystem path associated with the instance. +func (i *Instance) Path() string { + return i.path +} + +// DefaultAddr returns the default address path for the Crush server based on +// the operating system. +func DefaultAddr() string { + sock := "crush.sock" + user, err := user.Current() + if err == nil && user.Uid != "" { + sock = fmt.Sprintf("crush-%s.sock", user.Uid) + } + if runtime.GOOS == "windows" { + return fmt.Sprintf(`\\.\pipe\%s`, sock) + } + return filepath.Join(os.TempDir(), sock) +} + +// Server represents a Crush server instance bound to a specific address. +type Server struct { + // Addr can be a TCP address, a Unix socket path, or a Windows named pipe. + Addr string + + // instances is a map of running applications managed by the server. + instances *csync.Map[string, *Instance] + // listeners is the network listener for the server. + listeners *csync.Map[*net.Listener, struct{}] + cfg *config.Config + logger *slog.Logger + + shutdown atomic.Bool +} + +// DefaultServer returns a new [Server] instance with the default address. +func DefaultServer(cfg *config.Config) *Server { + return NewServer(cfg, "unix", DefaultAddr()) +} + +// NewServer is a helper to create a new [Server] instance with the given +// address. On Windows, if the address is not a "tcp" address, it will be +// converted to a named pipe format. +func NewServer(cfg *config.Config, network, address string) *Server { + if runtime.GOOS == "windows" && !strings.HasPrefix(address, "tcp") && + !strings.HasPrefix(address, `\\.\pipe\`) { + // On Windows, convert to named pipe format if not TCP + // (e.g., "mypipe" -> "\\.\pipe\mypipe") + address = fmt.Sprintf(`\\.\pipe\%s`, address) + } + + s := new(Server) + s.Addr = address + s.cfg = cfg + s.instances = csync.NewMap[string, *Instance]() + rpc.Register(&ServerProto{s}) + return s +} + +// Serve accepts incoming connections on the listener. +func (s *Server) Serve(ln net.Listener) error { + if s.listeners == nil { + s.listeners = csync.NewMap[*net.Listener, struct{}]() + } + s.listeners.Set(&ln, struct{}{}) + + var tempDelay time.Duration // how long to sleep on accept failure + for { + conn, err := ln.Accept() + if err != nil { + if s.shuttingDown() { + return ErrServerClosed + } + if ne, ok := err.(net.Error); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + time.Sleep(tempDelay) + continue + } + return fmt.Errorf("failed to accept connection: %w", err) + } + go s.handleConn(conn) + } +} + +// ListenAndServe starts the server and begins accepting connections. +func (s *Server) ListenAndServe() error { + ln, err := listen("unix", s.Addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", s.Addr, err) + } + return s.Serve(ln) +} + +// Close force close all listeners and connections. +func (s *Server) Close() error { + s.shutdown.Store(true) + var firstErr error + for k := range s.listeners.Seq2() { + if err := (*k).Close(); err != nil && firstErr == nil { + firstErr = err + } + s.listeners.Del(k) + } + return firstErr +} + +// Shutdown gracefully shuts down the server without interrupting active +// connections. It stops accepting new connections and waits for existing +// connections to finish. +func (s *Server) Shutdown(ctx context.Context) error { + // TODO: implement graceful shutdown + return s.Close() +} + +func (s *Server) handleConn(conn net.Conn) { + s.info("accepted connection from %s", conn.RemoteAddr()) + msgpackrpc.ServeConn(conn) + // var req rpc.Request + // codec := msgpackrpc.NewServerCodec(conn) + // if err := codec.ReadRequestHeader(&req); err != nil { + // s.error("failed to read request header: %v", err) + // } + // rpc.ServeCodec(codec) +} + +func (s *Server) shuttingDown() bool { + return s.shutdown.Load() +} + +func (s *Server) info(msg string, args ...any) { + if s.logger != nil { + s.logger.Info(msg, args...) + } +} + +func (s *Server) debug(msg string, args ...any) { + if s.logger != nil { + s.logger.Debug(msg, args...) + } +} + +func (s *Server) error(msg string, args ...any) { + if s.logger != nil { + s.logger.Error(msg, args...) + } +} + +func (s *Server) warn(msg string, args ...any) { + if s.logger != nil { + s.logger.Warn(msg, args...) + } +}