1// Package clientserverrace_test is a regression test for the
2// CRUSH_CLIENT_SERVER=1 socket-init race documented in
3// docs/notes/2026-05-11-client-server-socket-init-race.md (item F5).
4//
5// It lives in its own directory so it can build even if other test
6// files in internal/cmd are temporarily broken — this test only needs
7// the binary, not the cmd package.
8package clientserverrace_test
9
10import (
11 "context"
12 "fmt"
13 "net"
14 "net/http"
15 "os"
16 "os/exec"
17 "path/filepath"
18 "runtime"
19 "strings"
20 "sync"
21 "testing"
22 "time"
23)
24
const (
	// readinessErrSubstr is the user-visible error string emitted by
	// ensureServer when it gives up waiting for the server socket /
	// readiness probe (internal/cmd/root.go). Seeing this in any
	// client's output means the race fired.
	readinessErrSubstr = "failed to initialize crush server"

	// numClients is how many clients are released at the same instant.
	// It is chosen to put several processes in contention for the spawn
	// lock and the readiness probe at once; it is not derived from, and
	// need not exceed, the host's CPU count.
	numClients = 8

	// clientTimeout bounds each child invocation. It only needs to be
	// long enough for the spawn-and-readiness phase to complete on a
	// cold cache; after that, the client may legitimately keep running
	// (e.g. subscribing to server events) and we'll cancel it. The race
	// we care about is observable strictly within ensureServer.
	clientTimeout = 15 * time.Second
)
42
43func TestClientServerSpawnRace(t *testing.T) {
44 if testing.Short() {
45 t.Skip("skipping client/server spawn race test in -short mode")
46 }
47 // The race and its fix are unix-socket specific. Windows uses
48 // named pipes via a different code path; not covered here.
49 if runtime.GOOS == "windows" {
50 t.Skip("skipping unix-socket specific race test on windows")
51 }
52 if _, err := exec.LookPath("go"); err != nil {
53 t.Skip("skipping: 'go' not available on PATH")
54 }
55
56 repoRoot := repoRootFromTest(t)
57 bin := buildCrushBinary(t, repoRoot)
58
59 // Use /tmp directly so the unix socket path stays under the
60 // 104-char sockaddr_un limit on darwin. t.TempDir() can return a
61 // path inside /var/folders/... that is too long.
62 runDir, err := os.MkdirTemp("/tmp", "crush-race-")
63 if err != nil {
64 t.Fatalf("mkdtemp: %v", err)
65 }
66 t.Cleanup(func() { _ = os.RemoveAll(runDir) })
67
68 socketPath := filepath.Join(runDir, "crush.sock")
69 host := "unix://" + socketPath
70
71 // Fresh, isolated XDG/HOME so we don't touch the user's real
72 // state or any other test's cache. These all live under runDir
73 // so cleanup is one RemoveAll.
74 cacheHome := filepath.Join(runDir, "cache")
75 dataHome := filepath.Join(runDir, "data")
76 configHome := filepath.Join(runDir, "config")
77 homeDir := filepath.Join(runDir, "home")
78 for _, d := range []string{cacheHome, dataHome, configHome, homeDir} {
79 if err := os.MkdirAll(d, 0o700); err != nil {
80 t.Fatalf("mkdir %s: %v", d, err)
81 }
82 }
83
84 env := append(os.Environ(),
85 "CRUSH_CLIENT_SERVER=1",
86 "XDG_CACHE_HOME="+cacheHome,
87 "XDG_DATA_HOME="+dataHome,
88 "XDG_CONFIG_HOME="+configHome,
89 "HOME="+homeDir,
90 // Belt-and-suspenders: if anything tries to talk to a real
91 // provider, fail loudly rather than make a network call.
92 "CRUSH_DISABLE_PROVIDER_AUTO_UPDATE=1",
93 )
94
95 // Make sure no server is up before we start.
96 if _, err := os.Stat(socketPath); err == nil {
97 t.Fatalf("socket %s exists before test started", socketPath)
98 }
99
100 // Always try to shut down any server we spawned, regardless of
101 // outcome.
102 t.Cleanup(func() { shutdownServer(t, socketPath) })
103
104 type result struct {
105 idx int
106 stdout string
107 stderr string
108 }
109 results := make(chan result, numClients)
110
111 var wg sync.WaitGroup
112 start := make(chan struct{})
113 for i := range numClients {
114 wg.Add(1)
115 go func(i int) {
116 defer wg.Done()
117
118 // Each client gets its own working directory so the
119 // per-client workspace registration paths don't collide
120 // in confusing ways.
121 cwd := filepath.Join(runDir, fmt.Sprintf("ws-%d", i))
122 if err := os.MkdirAll(cwd, 0o700); err != nil {
123 results <- result{idx: i, stderr: fmt.Sprintf("mkdir cwd: %v", err)}
124 return
125 }
126
127 ctx, cancel := context.WithTimeout(context.Background(), clientTimeout)
128 defer cancel()
129
130 // `crush run` exercises connectToServer (which is where
131 // the readiness race lives). On a fresh sandbox the
132 // command may legitimately keep running past the race
133 // (e.g. waiting on event subscriptions); the context
134 // timeout above bounds that. We assert race outcomes
135 // purely from output, not exit code.
136 c := exec.CommandContext(ctx, bin,
137 "--host", host,
138 "--cwd", cwd,
139 "run", "hi",
140 )
141 c.Env = env
142 var outBuf, errBuf strings.Builder
143 c.Stdout = &outBuf
144 c.Stderr = &errBuf
145
146 <-start
147 _ = c.Run()
148 results <- result{
149 idx: i,
150 stdout: outBuf.String(),
151 stderr: errBuf.String(),
152 }
153 }(i)
154 }
155
156 close(start) // release all clients as simultaneously as possible
157 wg.Wait()
158 close(results)
159
160 var raceFailures []string
161 for r := range results {
162 if strings.Contains(r.stderr, readinessErrSubstr) ||
163 strings.Contains(r.stdout, readinessErrSubstr) {
164 raceFailures = append(raceFailures, fmt.Sprintf(
165 "client %d: readiness error in output\nstderr:\n%s\nstdout:\n%s",
166 r.idx, r.stderr, r.stdout,
167 ))
168 }
169 }
170
171 if len(raceFailures) > 0 {
172 t.Fatalf("client/server spawn race regressed: %d/%d clients failed\n\n%s",
173 len(raceFailures), numClients,
174 strings.Join(raceFailures, "\n---\n"),
175 )
176 }
177
178 // Positive sanity check: a single live server should be
179 // reachable on the socket. If the race had fired, we'd either
180 // have no socket or a stale one with no listener.
181 if _, err := os.Stat(socketPath); err != nil {
182 t.Fatalf("expected socket %s to exist after parallel clients ran, got: %v",
183 socketPath, err)
184 }
185 if err := pingHealth(socketPath); err != nil {
186 t.Fatalf("server is not responding on %s: %v", socketPath, err)
187 }
188}
189
// pingHealth sends one GET /v1/health request to the server listening on
// the unix socket at socketPath.
//
// It returns nil only when the server answers with a 2xx status. Dial
// errors, transport errors and non-2xx responses are all returned as
// errors. Both the client and the request context are capped at two
// seconds.
func pingHealth(socketPath string) error {
	// Every request, whatever its URL host, is routed to the unix socket.
	dialUnix := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var dialer net.Dialer
		return dialer.DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialUnix}
	defer transport.CloseIdleConnections()
	client := &http.Client{Transport: transport, Timeout: 2 * time.Second}

	reqCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	const healthURL = "http://crush.local/v1/health"
	request, err := http.NewRequestWithContext(reqCtx, http.MethodGet, healthURL, nil)
	if err != nil {
		return err
	}
	response, err := client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if success := response.StatusCode >= 200 && response.StatusCode < 300; !success {
		return fmt.Errorf("health check returned %s", response.Status)
	}
	return nil
}
219
// repoRootFromTest returns the nearest ancestor of the test's working
// directory (the working directory itself included) that contains a
// go.mod file. It fails the test if the filesystem root is reached
// without finding one.
//
// Searching for go.mod, rather than climbing a fixed number of levels,
// keeps the lookup working if the test directory is moved.
func repoRootFromTest(t *testing.T) string {
	t.Helper()
	origin, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}
	for candidate := origin; ; {
		if _, statErr := os.Stat(filepath.Join(candidate, "go.mod")); statErr == nil {
			return candidate
		}
		up := filepath.Dir(candidate)
		// filepath.Dir of the root returns the root itself.
		if up == candidate {
			t.Fatalf("could not find go.mod walking up from %s", origin)
		}
		candidate = up
	}
}
241
// buildCrushBinary runs `go build` on the main package at repoRoot.
// The output is written into a fresh temporary directory, and the
// function returns the path to the resulting "crush" executable.
//
// Details:
//   - The temporary directory, and with it the binary, is removed by a
//     t.Cleanup hook.
//   - The build is cancelled after five minutes.
//   - Any build failure fails the test and includes the compiler's
//     combined output.
func buildCrushBinary(t *testing.T, repoRoot string) string {
	t.Helper()

	outDir, err := os.MkdirTemp("", "crush-race-bin-")
	if err != nil {
		t.Fatalf("mkdtemp bin: %v", err)
	}
	t.Cleanup(func() { _ = os.RemoveAll(outDir) })
	exePath := filepath.Join(outDir, "crush")

	buildCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	build := exec.CommandContext(buildCtx, "go", "build", "-o", exePath, ".")
	build.Dir = repoRoot
	// CGO_ENABLED=0 produces a binary without a C toolchain dependency,
	// so the test also works on hosts that lack one.
	build.Env = append(os.Environ(), "CGO_ENABLED=0")
	if output, buildErr := build.CombinedOutput(); buildErr != nil {
		t.Fatalf("go build crush: %v\n%s", buildErr, output)
	}
	return exePath
}
271
// shutdownServer makes a best-effort attempt to stop the crush server
// bound to the unix socket at socketPath.
//
// It POSTs {"command":"shutdown"} to /v1/control, then polls for up to
// five seconds for the socket file to disappear. It does nothing if the
// socket file does not exist. It never fails the test: problems such as
// a transport error, a non-2xx reply or a socket that lingers are
// reported with t.Logf, so the remaining cleanup can still run.
//
// The project's own client package is deliberately not imported, to
// keep this test free of internal API churn.
func shutdownServer(t *testing.T, socketPath string) {
	t.Helper()
	if _, err := os.Stat(socketPath); err != nil {
		return
	}

	tr := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", socketPath)
		},
	}
	hc := &http.Client{Transport: tr, Timeout: 5 * time.Second}
	defer tr.CloseIdleConnections()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := strings.NewReader(`{"command":"shutdown"}`)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"http://crush.local/v1/control", body)
	if err != nil {
		t.Logf("shutdown: build request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := hc.Do(req)
	if err != nil {
		// Server may already be gone — not an error.
		t.Logf("shutdown: %v (probably already exited)", err)
		return
	}
	status, accepted := resp.Status, resp.StatusCode >= 200 && resp.StatusCode < 300
	_ = resp.Body.Close()
	if !accepted {
		// Still wait below in case the server exits anyway, but leave a
		// trace so a lingering server is explainable.
		t.Logf("shutdown: control endpoint returned %s", status)
	}

	// Give the server a moment to exit before returning, so later
	// cleanup (or another user of the same path) doesn't race with it.
	// Assumes the server removes its socket file on exit — TODO confirm.
	const wait = 5 * time.Second
	deadline := time.Now().Add(wait)
	for time.Now().Before(deadline) {
		if _, err := os.Stat(socketPath); err != nil {
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	t.Logf("shutdown: socket %s still present after %s", socketPath, wait)
}