Detailed changes
@@ -24,6 +24,7 @@ require (
github.com/charmbracelet/x/exp/golden v0.0.0-20250207160936-21c02780d27a
github.com/disintegration/imageorient v0.0.0-20180920195336-8147d86e83ec
github.com/google/uuid v1.6.0
+ github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1
github.com/invopop/jsonschema v0.13.0
github.com/joho/godotenv v1.5.1
github.com/mark3labs/mcp-go v0.40.0
@@ -43,9 +44,16 @@ require (
github.com/tidwall/sjson v1.2.5
github.com/zeebo/xxh3 v1.0.2
gopkg.in/natefinch/lumberjack.v2 v2.2.1
+ gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
mvdan.cc/sh/v3 v3.12.1-0.20250902163504-3cf4fd5717a5
)
+require (
+ github.com/hashicorp/errwrap v1.0.0 // indirect
+ github.com/hashicorp/go-msgpack/v2 v2.1.3 // indirect
+ github.com/hashicorp/go-multierror v1.1.1 // indirect
+)
+
require (
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.13.0 // indirect
@@ -162,6 +162,14 @@ github.com/gorilla/css v1.0.1/go.mod h1:BvnYkspnSzMmwRK+b8/xgNPLiIuNZr6vbZBTPQ2A
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-msgpack/v2 v2.1.3 h1:cB1w4Zrk0O3jQBTcFMKqYQWRFfsSQ/TYKNyUUVyCP2c=
+github.com/hashicorp/go-msgpack/v2 v2.1.3/go.mod h1:SjlwKKFnwBXvxD/I1bEcfJIBbEJ+MCUn39TxymNR5ZU=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
+github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1 h1:Y1sd8ZCCUUlUetCk+3MCpOwdWd+WicHdk2zk2yUM0qw=
+github.com/hashicorp/net-rpc-msgpackrpc/v2 v2.0.1/go.mod h1:wASEfI5dofjm9S9Jp3JM4pfoBZy8Z07JUE2wHNi0zuc=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -433,6 +441,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
+gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
+gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@@ -0,0 +1,41 @@
+package client
+
+import (
+ "net/rpc"
+
+ "github.com/charmbracelet/crush/internal/config"
+ "github.com/charmbracelet/crush/internal/proto"
+ "github.com/charmbracelet/crush/internal/server"
+ msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2"
+)
+
+// Client represents an RPC client connected to a Crush server.
+type Client struct {
+ rpc *rpc.Client
+}
+
+// DefaultClient creates a new [Client] connected to the default server address.
+func DefaultClient() (*Client, error) {
+ return NewClient("unix", server.DefaultAddr())
+}
+
+// NewClient creates a new [Client] connected to the server at the given
+// network and address.
+func NewClient(network, address string) (*Client, error) {
+ rpc, err := msgpackrpc.Dial(network, address)
+ if err != nil {
+ return nil, err
+ }
+ return &Client{rpc: rpc}, nil
+}
+
+// GetConfig retrieves the server's configuration via RPC.
+func (c *Client) GetConfig() (*config.Config, error) {
+ var cfg config.Config
+ var args proto.Args
+ err := c.rpc.Call("ServerProto.GetConfig", &args, &cfg)
+ if err != nil {
+ return nil, err
+ }
+ return &cfg, nil
+}
@@ -0,0 +1,84 @@
+package cmd
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "log/slog"
+ "os"
+ "os/signal"
+ "time"
+
+ "github.com/charmbracelet/crush/internal/config"
+ "github.com/charmbracelet/crush/internal/server"
+ "github.com/charmbracelet/log/v2"
+ "github.com/spf13/cobra"
+)
+
+var serverCmd = &cobra.Command{
+ Use: "server",
+ Short: "Start the Crush server",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ dataDir, err := cmd.Flags().GetString("data-dir")
+ if err != nil {
+ return fmt.Errorf("failed to get data directory: %v", err)
+ }
+ debug, err := cmd.Flags().GetBool("debug")
+ if err != nil {
+ return fmt.Errorf("failed to get debug flag: %v", err)
+ }
+
+ cfg, err := config.Load("", dataDir, debug)
+ if err != nil {
+ return fmt.Errorf("failed to load configuration: %v", err)
+ }
+
+ logger := log.New(os.Stderr)
+ logger.SetReportTimestamp(true)
+ slog.SetDefault(slog.New(logger))
+ if debug {
+ logger.SetLevel(log.DebugLevel)
+ slog.SetLogLoggerLevel(slog.LevelDebug)
+ }
+
+ srv := server.NewServer(cfg, "unix", server.DefaultAddr())
+ slog.Info("Starting Crush server...", "addr", srv.Addr)
+
+ errch := make(chan error, 1)
+ sigch := make(chan os.Signal, 1)
+ sigs := []os.Signal{os.Interrupt}
+ sigs = append(sigs, addSignals(sigs)...)
+ signal.Notify(sigch, sigs...)
+
+ go func() {
+ errch <- srv.ListenAndServe()
+ }()
+
+ select {
+ case <-sigch:
+ slog.Info("Received interrupt signal...")
+ case err := <-errch:
+ if err != nil && !errors.Is(err, server.ErrServerClosed) {
+ _ = srv.Close()
+ slog.Error("Server error", "error", err)
+ return fmt.Errorf("server error: %v", err)
+ }
+ }
+
+ ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second)
+ defer cancel()
+
+ slog.Info("Shutting down...")
+
+ if err := srv.Shutdown(ctx); err != nil {
+ slog.Error("Failed to shutdown server", "error", err)
+ return fmt.Errorf("failed to shutdown server: %v", err)
+ }
+
+ return nil
+ },
+}
+
+func init() {
+ rootCmd.AddCommand(serverCmd)
+}
@@ -0,0 +1,13 @@
+//go:build !windows
+// +build !windows
+
+package cmd
+
+import (
+ "os"
+ "syscall"
+)
+
+func addSignals(sigs []os.Signal) []os.Signal {
+ return append(sigs, syscall.SIGTERM)
+}
@@ -0,0 +1,10 @@
+//go:build windows
+// +build windows
+
+package cmd
+
+import "os"
+
+func addSignals(sigs []os.Signal) []os.Signal {
+ return sigs
+}
@@ -0,0 +1,4 @@
+package proto
+
+// Args represents generic arguments that apply to all RPC routines.
+type Args struct{}
@@ -0,0 +1,10 @@
+//go:build !windows
+// +build !windows
+
+package server
+
+import "net"
+
+func listen(network, address string) (net.Listener, error) {
+ return net.Listen(network, address)
+}
@@ -0,0 +1,18 @@
+//go:build windows
+// +build windows
+
+package server
+
+import (
+ "net"
+ "strings"
+
+ "gopkg.in/natefinch/npipe.v2"
+)
+
+func listen(network, address string) (net.Listener, error) {
+ if !strings.HasPrefix(address, "tcp") {
+ return npipe.Listen(address)
+ }
+ return net.Listen(network, address)
+}
@@ -0,0 +1,17 @@
+package server
+
+import (
+ "github.com/charmbracelet/crush/internal/config"
+ "github.com/charmbracelet/crush/internal/proto"
+)
+
+// ServerProto defines the RPC methods exposed by the Crush server.
+type ServerProto struct {
+ *Server
+}
+
+// GetConfig is an RPC routine that returns the server's configuration.
+func (s *ServerProto) GetConfig(args *proto.Args, reply *config.Config) error {
+ *reply = *s.cfg
+ return nil
+}
@@ -0,0 +1,210 @@
+package server
+
+import (
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "net/rpc"
+ "os"
+ "os/user"
+ "path/filepath"
+ "runtime"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/charmbracelet/crush/internal/app"
+ "github.com/charmbracelet/crush/internal/config"
+ "github.com/charmbracelet/crush/internal/csync"
+
+ msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc/v2"
+)
+
+// ErrServerClosed is returned when the server is closed.
+var ErrServerClosed = fmt.Errorf("server closed")
+
+// InstanceState represents the state of a running [app.App] instance.
+type InstanceState uint8
+
+const (
+ // InstanceStateCreated indicates that the instance has been created but not yet started.
+ InstanceStateCreated InstanceState = iota
+ // InstanceStateStarted indicates that the instance is currently running.
+ InstanceStateStarted
+ // InstanceStateStopped indicates that the instance has been stopped.
+ InstanceStateStopped
+)
+
+// Instance represents a running [app.App] instance with its associated
+// resources and state.
+type Instance struct {
+ *app.App
+ State InstanceState
+ id string
+ path string
+}
+
+// ID returns the unique identifier of the instance.
+func (i *Instance) ID() string {
+ return i.id
+}
+
+// Path returns the filesystem path associated with the instance.
+func (i *Instance) Path() string {
+ return i.path
+}
+
+// DefaultAddr returns the default address path for the Crush server based on
+// the operating system.
+func DefaultAddr() string {
+ sock := "crush.sock"
+ user, err := user.Current()
+ if err == nil && user.Uid != "" {
+ sock = fmt.Sprintf("crush-%s.sock", user.Uid)
+ }
+ if runtime.GOOS == "windows" {
+ return fmt.Sprintf(`\\.\pipe\%s`, sock)
+ }
+ return filepath.Join(os.TempDir(), sock)
+}
+
+// Server represents a Crush server instance bound to a specific address.
+type Server struct {
+ // Addr can be a TCP address, a Unix socket path, or a Windows named pipe.
+ Addr string
+
+ // instances is a map of running applications managed by the server.
+ instances *csync.Map[string, *Instance]
+ // listeners is the network listener for the server.
+ listeners *csync.Map[*net.Listener, struct{}]
+ cfg *config.Config
+ logger *slog.Logger
+
+ shutdown atomic.Bool
+}
+
+// DefaultServer returns a new [Server] instance with the default address.
+func DefaultServer(cfg *config.Config) *Server {
+ return NewServer(cfg, "unix", DefaultAddr())
+}
+
+// NewServer is a helper to create a new [Server] instance with the given
+// address. On Windows, if the address is not a "tcp" address, it will be
+// converted to a named pipe format.
+func NewServer(cfg *config.Config, network, address string) *Server {
+ if runtime.GOOS == "windows" && !strings.HasPrefix(address, "tcp") &&
+ !strings.HasPrefix(address, `\\.\pipe\`) {
+ // On Windows, convert to named pipe format if not TCP
+ // (e.g., "mypipe" -> "\\.\pipe\mypipe")
+ address = fmt.Sprintf(`\\.\pipe\%s`, address)
+ }
+
+ s := new(Server)
+ s.Addr = address
+ s.cfg = cfg
+ s.instances = csync.NewMap[string, *Instance]()
+ rpc.Register(&ServerProto{s})
+ return s
+}
+
+// Serve accepts incoming connections on the listener.
+func (s *Server) Serve(ln net.Listener) error {
+ if s.listeners == nil {
+ s.listeners = csync.NewMap[*net.Listener, struct{}]()
+ }
+ s.listeners.Set(&ln, struct{}{})
+
+ var tempDelay time.Duration // how long to sleep on accept failure
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ if s.shuttingDown() {
+ return ErrServerClosed
+ }
+ if ne, ok := err.(net.Error); ok && ne.Temporary() {
+ if tempDelay == 0 {
+ tempDelay = 5 * time.Millisecond
+ } else {
+ tempDelay *= 2
+ }
+ if max := 1 * time.Second; tempDelay > max {
+ tempDelay = max
+ }
+ time.Sleep(tempDelay)
+ continue
+ }
+ return fmt.Errorf("failed to accept connection: %w", err)
+ }
+ go s.handleConn(conn)
+ }
+}
+
+// ListenAndServe starts the server and begins accepting connections.
+func (s *Server) ListenAndServe() error {
+ ln, err := listen("unix", s.Addr)
+ if err != nil {
+ return fmt.Errorf("failed to listen on %s: %w", s.Addr, err)
+ }
+ return s.Serve(ln)
+}
+
+// Close force close all listeners and connections.
+func (s *Server) Close() error {
+ s.shutdown.Store(true)
+ var firstErr error
+ for k := range s.listeners.Seq2() {
+ if err := (*k).Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ s.listeners.Del(k)
+ }
+ return firstErr
+}
+
+// Shutdown gracefully shuts down the server without interrupting active
+// connections. It stops accepting new connections and waits for existing
+// connections to finish.
+func (s *Server) Shutdown(ctx context.Context) error {
+ // TODO: implement graceful shutdown
+ return s.Close()
+}
+
+func (s *Server) handleConn(conn net.Conn) {
+ s.info("accepted connection from %s", conn.RemoteAddr())
+ msgpackrpc.ServeConn(conn)
+ // var req rpc.Request
+ // codec := msgpackrpc.NewServerCodec(conn)
+ // if err := codec.ReadRequestHeader(&req); err != nil {
+ // s.error("failed to read request header: %v", err)
+ // }
+ // rpc.ServeCodec(codec)
+}
+
+func (s *Server) shuttingDown() bool {
+ return s.shutdown.Load()
+}
+
+func (s *Server) info(msg string, args ...any) {
+ if s.logger != nil {
+ s.logger.Info(msg, args...)
+ }
+}
+
+func (s *Server) debug(msg string, args ...any) {
+ if s.logger != nil {
+ s.logger.Debug(msg, args...)
+ }
+}
+
+func (s *Server) error(msg string, args ...any) {
+ if s.logger != nil {
+ s.logger.Error(msg, args...)
+ }
+}
+
+func (s *Server) warn(msg string, args ...any) {
+ if s.logger != nil {
+ s.logger.Warn(msg, args...)
+ }
+}