1// Package clientserverrace_test is a regression test for the
2// CRUSH_CLIENT_SERVER=1 socket-init race documented in
3// docs/notes/2026-05-11-client-server-socket-init-race.md (item F5).
4//
5// It lives in its own directory so it can build even if other test
6// files in internal/cmd are temporarily broken — this test only needs
7// the binary, not the cmd package.
8package clientserverrace_test
9
10import (
11 "context"
12 "fmt"
13 "net"
14 "net/http"
15 "os"
16 "os/exec"
17 "path/filepath"
18 "runtime"
19 "strings"
20 "sync"
21 "sync/atomic"
22 "testing"
23 "time"
24)
25
// Fixed strings and tunables shared by the spawn-race test.
const (
	// readinessErrSubstr is the user-visible message ensureServer
	// (internal/cmd/root.go) prints when it stops waiting for the
	// server socket or the readiness probe. If it shows up in any
	// client's output, the race fired.
	readinessErrSubstr = "failed to initialize crush server"

	// numClients is how many client processes are released at the
	// same instant. It is meant to exceed the typical CPU count so
	// that the spawn lock and the readiness probe are exercised under
	// contention.
	numClients = 8

	// clientTimeout bounds each child invocation. It only has to
	// cover the spawn-and-readiness phase on a cold cache; a client
	// may legitimately keep running afterwards (e.g. subscribed to
	// server events), in which case it is cancelled. The race under
	// test is observable strictly within ensureServer.
	clientTimeout = 15 * time.Second
)
43
44func TestClientServerSpawnRace(t *testing.T) {
45 if testing.Short() {
46 t.Skip("skipping client/server spawn race test in -short mode")
47 }
48 // The race and its fix are unix-socket specific. Windows uses
49 // named pipes via a different code path; not covered here.
50 if runtime.GOOS == "windows" {
51 t.Skip("skipping unix-socket specific race test on windows")
52 }
53 if _, err := exec.LookPath("go"); err != nil {
54 t.Skip("skipping: 'go' not available on PATH")
55 }
56
57 repoRoot := repoRootFromTest(t)
58 bin := buildCrushBinary(t, repoRoot)
59
60 // Use /tmp directly so the unix socket path stays under the
61 // 104-char sockaddr_un limit on darwin. t.TempDir() can return a
62 // path inside /var/folders/... that is too long.
63 runDir, err := os.MkdirTemp("/tmp", "crush-race-")
64 if err != nil {
65 t.Fatalf("mkdtemp: %v", err)
66 }
67 t.Cleanup(func() { _ = os.RemoveAll(runDir) })
68
69 socketPath := filepath.Join(runDir, "crush.sock")
70 host := "unix://" + socketPath
71
72 // Fresh, isolated XDG/HOME so we don't touch the user's real
73 // state or any other test's cache. These all live under runDir
74 // so cleanup is one RemoveAll.
75 cacheHome := filepath.Join(runDir, "cache")
76 dataHome := filepath.Join(runDir, "data")
77 configHome := filepath.Join(runDir, "config")
78 homeDir := filepath.Join(runDir, "home")
79 for _, d := range []string{cacheHome, dataHome, configHome, homeDir} {
80 if err := os.MkdirAll(d, 0o700); err != nil {
81 t.Fatalf("mkdir %s: %v", d, err)
82 }
83 }
84
85 env := append(
86 os.Environ(),
87 "CRUSH_CLIENT_SERVER=1",
88 "XDG_CACHE_HOME="+cacheHome,
89 "XDG_DATA_HOME="+dataHome,
90 "XDG_CONFIG_HOME="+configHome,
91 "HOME="+homeDir,
92 // Belt-and-suspenders: if anything tries to talk to a real
93 // provider, fail loudly rather than make a network call.
94 "CRUSH_DISABLE_PROVIDER_AUTO_UPDATE=1",
95 )
96
97 // Make sure no server is up before we start.
98 if _, err := os.Stat(socketPath); err == nil {
99 t.Fatalf("socket %s exists before test started", socketPath)
100 }
101
102 // Always try to shut down any server we spawned, regardless of
103 // outcome.
104 t.Cleanup(func() { shutdownServer(t, socketPath) })
105
106 type result struct {
107 idx int
108 stdout string
109 stderr string
110 }
111 results := make(chan result, numClients)
112
113 // Probe /v1/health concurrently while the clients are still
114 // running. The server self-shuts-down when the last workspace is
115 // released (internal/backend/backend.go:DeleteWorkspace), so once
116 // all clients exit cleanly the socket may legitimately be gone —
117 // asserting the socket post-hoc would race with that documented
118 // self-shutdown. Instead we require that during the parallel run
119 // at least one /v1/health probe got a 2xx, which proves the
120 // spawn-and-readiness path actually produced a live server.
121 var sawHealthy atomic.Bool
122 probeDone := make(chan struct{})
123 stopProbe := make(chan struct{})
124
125 var wg sync.WaitGroup
126 start := make(chan struct{})
127
128 go func() {
129 defer close(probeDone)
130 <-start
131 deadline := time.Now().Add(clientTimeout)
132 for time.Now().Before(deadline) {
133 select {
134 case <-stopProbe:
135 return
136 default:
137 }
138 if err := pingHealth(socketPath); err == nil {
139 sawHealthy.Store(true)
140 return
141 }
142 select {
143 case <-stopProbe:
144 return
145 case <-time.After(50 * time.Millisecond):
146 }
147 }
148 }()
149
150 for i := range numClients {
151 wg.Add(1)
152 go func(i int) {
153 defer wg.Done()
154
155 // Each client gets its own working directory so the
156 // per-client workspace registration paths don't collide
157 // in confusing ways.
158 cwd := filepath.Join(runDir, fmt.Sprintf("ws-%d", i))
159 if err := os.MkdirAll(cwd, 0o700); err != nil {
160 results <- result{idx: i, stderr: fmt.Sprintf("mkdir cwd: %v", err)}
161 return
162 }
163
164 ctx, cancel := context.WithTimeout(context.Background(), clientTimeout)
165 defer cancel()
166
167 // `crush run` exercises connectToServer (which is where
168 // the readiness race lives). On a fresh sandbox the
169 // command may legitimately keep running past the race
170 // (e.g. waiting on event subscriptions); the context
171 // timeout above bounds that. We assert race outcomes
172 // purely from output, not exit code.
173 c := exec.CommandContext(
174 ctx, bin,
175 "--host", host,
176 "--cwd", cwd,
177 "run", "hi",
178 )
179 c.Env = env
180 var outBuf, errBuf strings.Builder
181 c.Stdout = &outBuf
182 c.Stderr = &errBuf
183
184 <-start
185 _ = c.Run()
186 results <- result{
187 idx: i,
188 stdout: outBuf.String(),
189 stderr: errBuf.String(),
190 }
191 }(i)
192 }
193
194 close(start) // release all clients as simultaneously as possible
195 wg.Wait()
196 close(results)
197 close(stopProbe)
198 <-probeDone
199
200 var raceFailures []string
201 for r := range results {
202 if strings.Contains(r.stderr, readinessErrSubstr) ||
203 strings.Contains(r.stdout, readinessErrSubstr) {
204 raceFailures = append(raceFailures, fmt.Sprintf(
205 "client %d: readiness error in output\nstderr:\n%s\nstdout:\n%s",
206 r.idx, r.stderr, r.stdout,
207 ))
208 }
209 }
210
211 if len(raceFailures) > 0 {
212 t.Fatalf(
213 "client/server spawn race regressed: %d/%d clients failed\n\n%s",
214 len(raceFailures), numClients,
215 strings.Join(raceFailures, "\n---\n"),
216 )
217 }
218
219 // Positive sanity check: at some point during the parallel run a
220 // /v1/health probe must have succeeded. We deliberately do *not*
221 // stat the socket post-hoc: when every client returns cleanly
222 // (e.g. exits early because no providers are configured), the
223 // last DeleteWorkspace triggers the server's self-shutdown and
224 // the socket disappears. That is correct behaviour, not a race
225 // regression.
226 if !sawHealthy.Load() {
227 t.Fatalf("no /v1/health probe succeeded on %s while %d clients were running",
228 socketPath, numClients)
229 }
230}
231
// pingHealth performs one GET /v1/health against the HTTP server
// listening on the unix socket at socketPath. It returns nil only for
// a 2xx response; dial errors, request errors, timeouts (2 seconds)
// and any other status are returned as an error.
func pingHealth(socketPath string) error {
	const probeTimeout = 2 * time.Second

	// The URL host is a placeholder: every connection is dialled to
	// the unix socket regardless of the network/address requested.
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return new(net.Dialer).DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialSocket}
	defer transport.CloseIdleConnections()

	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(
		ctx, http.MethodGet, "http://crush.local/v1/health", nil,
	)
	if err != nil {
		return err
	}

	client := &http.Client{Transport: transport, Timeout: probeTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if success := resp.StatusCode >= 200 && resp.StatusCode < 300; !success {
		return fmt.Errorf("health check returned %s", resp.Status)
	}
	return nil
}
261
// repoRootFromTest returns the nearest directory, starting at the
// current working directory and moving towards the filesystem root,
// that contains a go.mod file. Searching for go.mod rather than
// climbing a fixed number of levels keeps the helper working if the
// test is moved. The test is failed via t.Fatalf when the working
// directory cannot be read or no go.mod is found.
func repoRootFromTest(t *testing.T) string {
	t.Helper()

	startDir, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}

	for cur := startDir; ; {
		marker := filepath.Join(cur, "go.mod")
		if _, statErr := os.Stat(marker); statErr == nil {
			return cur
		}
		up := filepath.Dir(cur)
		if up == cur {
			// Reached the top of the tree without a match.
			break
		}
		cur = up
	}

	t.Fatalf("could not find go.mod walking up from %s", startDir)
	return "" // not reached: Fatalf stops the test goroutine
}
283
// buildCrushBinary compiles the package at repoRoot with `go build`
// into a fresh temporary directory and returns the path of the
// resulting "crush" executable. The directory is removed by a
// t.Cleanup. The build is limited to five minutes; any failure aborts
// the test with the combined compiler output.
func buildCrushBinary(t *testing.T, repoRoot string) string {
	t.Helper()

	outDir, err := os.MkdirTemp("", "crush-race-bin-")
	if err != nil {
		t.Fatalf("mkdtemp bin: %v", err)
	}
	t.Cleanup(func() { _ = os.RemoveAll(outDir) })
	target := filepath.Join(outDir, "crush")

	buildCtx, stop := context.WithTimeout(context.Background(), 5*time.Minute)
	defer stop()

	build := exec.CommandContext(buildCtx, "go", "build", "-o", target, ".")
	build.Dir = repoRoot
	// CGO_ENABLED=0 mirrors the project's standard build and means the
	// host does not need a C toolchain.
	build.Env = append(os.Environ(), "CGO_ENABLED=0")

	if buildLog, buildErr := build.CombinedOutput(); buildErr != nil {
		t.Fatalf("go build crush: %v\n%s", buildErr, buildLog)
	}
	return target
}
313
// shutdownServer makes a best-effort attempt to stop whatever crush
// server is listening on socketPath. It does nothing if the socket
// file does not exist. Otherwise it POSTs {"command":"shutdown"} to
// /v1/control over the socket and then waits up to five seconds for
// the socket file to disappear. Failures are logged, never fatal. The
// request is hand-built with net/http instead of the project's own
// client package so that the test is insulated from internal API
// churn.
func shutdownServer(t *testing.T, socketPath string) {
	t.Helper()

	const (
		requestTimeout = 5 * time.Second
		exitWait       = 5 * time.Second
		pollInterval   = 50 * time.Millisecond
	)

	socketGone := func() bool {
		_, statErr := os.Stat(socketPath)
		return statErr != nil
	}
	if socketGone() {
		return
	}

	transport := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return new(net.Dialer).DialContext(ctx, "unix", socketPath)
		},
	}
	defer transport.CloseIdleConnections()
	client := &http.Client{Transport: transport, Timeout: requestTimeout}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(
		ctx, http.MethodPost,
		"http://crush.local/v1/control",
		strings.NewReader(`{"command":"shutdown"}`),
	)
	if err != nil {
		t.Logf("shutdown: build request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		// A failed request most likely means the server has already
		// exited, which is the outcome we wanted anyway.
		t.Logf("shutdown: %v (probably already exited)", err)
		return
	}
	_ = resp.Body.Close()

	// Give the server a moment to remove its socket so that a later
	// test reusing the path does not race with the teardown.
	for giveUp := time.Now().Add(exitWait); time.Now().Before(giveUp); time.Sleep(pollInterval) {
		if socketGone() {
			return
		}
	}
}