diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 7ee034d4be674ad9938b370a93b284c36e7abd9e..a13b56fced053cf0a9f35619db19c1bc7f313b1b 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -402,9 +402,10 @@ func ensureServer(cmd *cobra.Command, hostURL *url.URL) error { } if needsStart { - if err := startDetachedServer(cmd); err != nil { - return err + if err := spawnAndWaitReady(cmd, hostURL); err != nil { + return fmt.Errorf("failed to initialize crush server: %v", err) } + return nil } if err := waitForServerReady(cmd.Context(), hostURL); err != nil { @@ -415,6 +416,70 @@ func ensureServer(cmd *cobra.Command, hostURL *url.URL) error { return nil } +// spawnAndWaitReady serializes the spawn-and-wait-for-readiness sequence +// across concurrent clients via an exclusive flock on +// $XDG_CACHE_HOME/crush/server-/start.lock. +// +// After acquiring the lock it re-probes readiness so that a client that +// blocked while another client was spawning can skip its own spawn and +// just use the now-running server. The lock is held only for the +// duration of "spawn + readiness probe" and released before the caller +// resumes its normal lifetime. +func spawnAndWaitReady(cmd *cobra.Command, hostURL *url.URL) error { + chDir, err := perHostServerDir() + if err != nil { + return err + } + release, err := acquireSpawnLock(filepath.Join(chDir, "start.lock")) + if err != nil { + // If the lock itself is unavailable, fall back to the + // unsynchronized path rather than blocking the user. + slog.Warn("Failed to acquire spawn lock, proceeding without single-flight", "error", err) + if err := startDetachedServer(cmd); err != nil { + return err + } + return waitForServerReady(cmd.Context(), hostURL) + } + defer release() + + // Another client may have just finished spawning while we were + // waiting on the lock; if the server is already responsive, skip + // the spawn entirely. + probeCtx, cancel := context.WithTimeout(cmd.Context(), 200*time.Millisecond) + probeErr := quickHealthProbe(probeCtx, hostURL) + cancel() + if probeErr == nil { + return nil + } + + if err := startDetachedServer(cmd); err != nil { + return err + } + return waitForServerReady(cmd.Context(), hostURL) +} + +// quickHealthProbe issues a single readiness request with the caller's +// context and returns nil iff the server is responsive right now. +func quickHealthProbe(ctx context.Context, hostURL *url.URL) error { + httpClient, reqURL, err := readinessHTTPClient(hostURL) + if err != nil { + return err + } + return probeHealth(ctx, httpClient, reqURL, hostURL) +} + +// perHostServerDir returns (and creates) the cache directory used for +// per-host server state (logs, start.lock, etc.). It mirrors the path +// computed in startDetachedServer so both code paths stay in sync. +func perHostServerDir() (string, error) { + safeClientHost := safeNameRegexp.ReplaceAllString(clientHost, "_") + chDir := filepath.Join(config.GlobalCacheDir(), "server-"+safeClientHost) + if err := os.MkdirAll(chDir, 0o700); err != nil { + return "", fmt.Errorf("failed to create server working directory: %v", err) + } + return chDir, nil +} + // serverReadyTimeout returns the total budget for the readiness probe. // Overridable via CRUSH_SERVER_READY_TIMEOUT (parsed as a Go duration). func serverReadyTimeout() time.Duration { @@ -572,10 +637,9 @@ func startDetachedServer(cmd *cobra.Command) error { return fmt.Errorf("failed to get executable path: %v", err) } - safeClientHost := safeNameRegexp.ReplaceAllString(clientHost, "_") - chDir := filepath.Join(config.GlobalCacheDir(), "server-"+safeClientHost) - if err := os.MkdirAll(chDir, 0o700); err != nil { - return fmt.Errorf("failed to create server working directory: %v", err) + chDir, err := perHostServerDir() + if err != nil { + return err } cmdArgs := []string{"server"} diff --git a/internal/cmd/spawnlock_other.go b/internal/cmd/spawnlock_other.go new file mode 100644 index 0000000000000000000000000000000000000000..1e07b7728a26e51e0ffaee16af1d685c13e5f424 --- /dev/null +++ b/internal/cmd/spawnlock_other.go @@ -0,0 +1,28 @@ +//go:build !windows + +package cmd + +import ( + "fmt" + "os" + + "golang.org/x/sys/unix" +) + +// acquireSpawnLock takes an exclusive flock on the given file (creating +// it if necessary) and returns a release function that unlocks and +// closes the file. Blocks until the lock is acquired. +func acquireSpawnLock(path string) (func(), error) { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) + if err != nil { + return nil, fmt.Errorf("open spawn lock %q: %v", path, err) + } + if err := unix.Flock(int(f.Fd()), unix.LOCK_EX); err != nil { + _ = f.Close() + return nil, fmt.Errorf("flock spawn lock %q: %v", path, err) + } + return func() { + _ = unix.Flock(int(f.Fd()), unix.LOCK_UN) + _ = f.Close() + }, nil +} diff --git a/internal/cmd/spawnlock_windows.go b/internal/cmd/spawnlock_windows.go new file mode 100644 index 0000000000000000000000000000000000000000..d3e7492b229ac4bb5b3eca815711d5bc14ddcf0c --- /dev/null +++ b/internal/cmd/spawnlock_windows.go @@ -0,0 +1,32 @@ +//go:build windows + +package cmd + +import ( + "fmt" + "math" + "os" + + "golang.org/x/sys/windows" +) + +// acquireSpawnLock takes an exclusive lock on the given file (creating +// it if necessary) using LockFileEx, and returns a release function +// that unlocks and closes the file. Blocks until the lock is acquired. +func acquireSpawnLock(path string) (func(), error) { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) + if err != nil { + return nil, fmt.Errorf("open spawn lock %q: %v", path, err) + } + h := windows.Handle(f.Fd()) + ol := new(windows.Overlapped) + if err := windows.LockFileEx(h, windows.LOCKFILE_EXCLUSIVE_LOCK, 0, math.MaxUint32, math.MaxUint32, ol); err != nil { + _ = f.Close() + return nil, fmt.Errorf("LockFileEx spawn lock %q: %v", path, err) + } + return func() { + ol := new(windows.Overlapped) + _ = windows.UnlockFileEx(windows.Handle(f.Fd()), 0, math.MaxUint32, math.MaxUint32, ol) + _ = f.Close() + }, nil +}