@@ -402,9 +402,10 @@ func ensureServer(cmd *cobra.Command, hostURL *url.URL) error {
}
if needsStart {
- if err := startDetachedServer(cmd); err != nil {
- return err
+ if err := spawnAndWaitReady(cmd, hostURL); err != nil {
+ return fmt.Errorf("failed to initialize crush server: %v", err)
}
+ return nil
}
if err := waitForServerReady(cmd.Context(), hostURL); err != nil {
@@ -415,6 +416,70 @@ func ensureServer(cmd *cobra.Command, hostURL *url.URL) error {
return nil
}
+// spawnAndWaitReady serializes the spawn-and-wait-for-readiness sequence
+// across concurrent clients via an exclusive file lock on start.lock
+// inside the directory returned by perHostServerDir.
+//
+// After acquiring the lock it re-probes readiness so that a client that
+// blocked while another client was spawning can skip its own spawn and
+// just use the now-running server. The lock is held for the whole
+// "probe + spawn + wait for readiness" sequence and released when this
+// function returns.
+//
+// If the lock cannot be acquired, a warning is logged and the function
+// falls back to the unsynchronized spawn-then-wait path. If the command
+// context is already done once the lock is held, its error is returned
+// without probing or spawning anything.
+func spawnAndWaitReady(cmd *cobra.Command, hostURL *url.URL) error {
+	chDir, err := perHostServerDir()
+	if err != nil {
+		return err
+	}
+	release, err := acquireSpawnLock(filepath.Join(chDir, "start.lock"))
+	if err != nil {
+		// If the lock itself is unavailable, fall back to the
+		// unsynchronized path rather than blocking the user.
+		slog.Warn("Failed to acquire spawn lock, proceeding without single-flight", "error", err)
+		if err := startDetachedServer(cmd); err != nil {
+			return err
+		}
+		return waitForServerReady(cmd.Context(), hostURL)
+	}
+	defer release()
+
+	ctx := cmd.Context()
+
+	// Acquiring the lock may have blocked for a while. If the caller
+	// gave up in the meantime, the probe below would fail on the dead
+	// context and be misread as "no server running", which would spawn
+	// a server nobody is waiting for (possibly alongside one that
+	// another client just started).
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	// Another client may have just finished spawning while we were
+	// waiting on the lock; if the server is already responsive, skip
+	// the spawn entirely.
+	probeCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+	probeErr := quickHealthProbe(probeCtx, hostURL)
+	cancel()
+	if probeErr == nil {
+		return nil
+	}
+
+	if err := startDetachedServer(cmd); err != nil {
+		return err
+	}
+	return waitForServerReady(ctx, hostURL)
+}
+
+// quickHealthProbe performs one readiness check against hostURL. It
+// obtains the HTTP client and request URL from readinessHTTPClient and
+// forwards them, together with ctx, to probeHealth. The first error
+// from either step is returned; nil means both steps succeeded.
+func quickHealthProbe(ctx context.Context, hostURL *url.URL) error {
+	client, target, buildErr := readinessHTTPClient(hostURL)
+	if buildErr != nil {
+		return buildErr
+	}
+	probeErr := probeHealth(ctx, client, target, hostURL)
+	return probeErr
+}
+
+// perHostServerDir returns the per-host server state directory,
+// "server-<sanitized clientHost>" under config.GlobalCacheDir(), creating
+// it (and any missing parents) with mode 0o700 if needed. It is used for
+// start.lock, and startDetachedServer obtains its directory from this
+// helper as well, so both code paths always agree on the location.
+func perHostServerDir() (string, error) {
+	dirName := "server-" + safeNameRegexp.ReplaceAllString(clientHost, "_")
+	serverDir := filepath.Join(config.GlobalCacheDir(), dirName)
+	if mkErr := os.MkdirAll(serverDir, 0o700); mkErr != nil {
+		return "", fmt.Errorf("failed to create server working directory: %v", mkErr)
+	}
+	return serverDir, nil
+}
+
// serverReadyTimeout returns the total budget for the readiness probe.
// Overridable via CRUSH_SERVER_READY_TIMEOUT (parsed as a Go duration).
func serverReadyTimeout() time.Duration {
@@ -572,10 +637,9 @@ func startDetachedServer(cmd *cobra.Command) error {
return fmt.Errorf("failed to get executable path: %v", err)
}
- safeClientHost := safeNameRegexp.ReplaceAllString(clientHost, "_")
- chDir := filepath.Join(config.GlobalCacheDir(), "server-"+safeClientHost)
- if err := os.MkdirAll(chDir, 0o700); err != nil {
- return fmt.Errorf("failed to create server working directory: %v", err)
+ chDir, err := perHostServerDir()
+ if err != nil {
+ return err
}
cmdArgs := []string{"server"}
@@ -0,0 +1,28 @@
+//go:build !windows
+
+package cmd
+
+import (
+ "fmt"
+ "os"
+
+ "golang.org/x/sys/unix"
+)
+
+// acquireSpawnLock takes an exclusive flock on the given file (creating
+// it with mode 0o600 if necessary) and returns a release function that
+// unlocks and closes the file. Blocks until the lock is acquired.
+//
+// On failure the file is closed and a nil release function is returned
+// together with an error naming the path.
+func acquireSpawnLock(path string) (func(), error) {
+	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600)
+	if err != nil {
+		return nil, fmt.Errorf("open spawn lock %q: %v", path, err)
+	}
+	// A blocking flock(2) can be interrupted by a signal and return
+	// EINTR without having taken the lock. That is not a real failure,
+	// so retry instead of pushing the caller onto its unsynchronized
+	// fallback path.
+	for {
+		err = unix.Flock(int(f.Fd()), unix.LOCK_EX)
+		if err != unix.EINTR {
+			break
+		}
+	}
+	if err != nil {
+		_ = f.Close()
+		return nil, fmt.Errorf("flock spawn lock %q: %v", path, err)
+	}
+	return func() {
+		// Best effort: closing the descriptor drops the lock even if
+		// the explicit unlock fails, so both errors are ignored.
+		_ = unix.Flock(int(f.Fd()), unix.LOCK_UN)
+		_ = f.Close()
+	}, nil
+}
@@ -0,0 +1,32 @@
+//go:build windows
+
+package cmd
+
+import (
+ "fmt"
+ "math"
+ "os"
+
+ "golang.org/x/sys/windows"
+)
+
+// acquireSpawnLock takes an exclusive lock on the given file (creating
+// it if necessary) using LockFileEx, and returns a release function
+// that unlocks and closes the file. Blocks until the lock is acquired.
+func acquireSpawnLock(path string) (func(), error) {
+ f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600)
+ if err != nil {
+ return nil, fmt.Errorf("open spawn lock %q: %v", path, err)
+ }
+ h := windows.Handle(f.Fd())
+ ol := new(windows.Overlapped)
+ if err := windows.LockFileEx(h, windows.LOCKFILE_EXCLUSIVE_LOCK, 0, math.MaxUint32, math.MaxUint32, ol); err != nil {
+ _ = f.Close()
+ return nil, fmt.Errorf("LockFileEx spawn lock %q: %v", path, err)
+ }
+ return func() {
+ ol := new(windows.Overlapped)
+ _ = windows.UnlockFileEx(windows.Handle(f.Fd()), 0, math.MaxUint32, math.MaxUint32, ol)
+ _ = f.Close()
+ }, nil
+}