race_test.go

  1// Package clientserverrace_test is a regression test for the
  2// CRUSH_CLIENT_SERVER=1 socket-init race documented in
  3// docs/notes/2026-05-11-client-server-socket-init-race.md (item F5).
  4//
  5// It lives in its own directory so it can build even if other test
  6// files in internal/cmd are temporarily broken — this test only needs
  7// the binary, not the cmd package.
  8package clientserverrace_test
  9
 10import (
 11	"context"
 12	"fmt"
 13	"net"
 14	"net/http"
 15	"os"
 16	"os/exec"
 17	"path/filepath"
 18	"runtime"
 19	"strings"
 20	"sync"
 21	"sync/atomic"
 22	"testing"
 23	"time"
 24)
 25
 26// readinessErrSubstr is the user-visible error string emitted by
 27// ensureServer when it gives up waiting for the server socket /
 28// readiness probe (internal/cmd/root.go). Seeing this in any client's
 29// output means the race fired.
 30const readinessErrSubstr = "failed to initialize crush server"
 31
 32// numClients is intentionally larger than the typical CPU count to
 33// ensure the spawn lock + readiness probe are exercised under
 34// contention.
 35const numClients = 8
 36
 37// clientTimeout bounds each child invocation. It only needs to be long
 38// enough for the spawn-and-readiness phase to complete on a cold cache;
 39// after that, the client may legitimately keep running (e.g.
 40// subscribing to server events) and we'll cancel it. The race we care
 41// about is observable strictly within ensureServer.
 42const clientTimeout = 15 * time.Second
 43
 44func TestClientServerSpawnRace(t *testing.T) {
 45	if testing.Short() {
 46		t.Skip("skipping client/server spawn race test in -short mode")
 47	}
 48	// The race and its fix are unix-socket specific. Windows uses
 49	// named pipes via a different code path; not covered here.
 50	if runtime.GOOS == "windows" {
 51		t.Skip("skipping unix-socket specific race test on windows")
 52	}
 53	if _, err := exec.LookPath("go"); err != nil {
 54		t.Skip("skipping: 'go' not available on PATH")
 55	}
 56
 57	repoRoot := repoRootFromTest(t)
 58	bin := buildCrushBinary(t, repoRoot)
 59
 60	// Use /tmp directly so the unix socket path stays under the
 61	// 104-char sockaddr_un limit on darwin. t.TempDir() can return a
 62	// path inside /var/folders/... that is too long.
 63	runDir, err := os.MkdirTemp("/tmp", "crush-race-")
 64	if err != nil {
 65		t.Fatalf("mkdtemp: %v", err)
 66	}
 67	t.Cleanup(func() { _ = os.RemoveAll(runDir) })
 68
 69	socketPath := filepath.Join(runDir, "crush.sock")
 70	host := "unix://" + socketPath
 71
 72	// Fresh, isolated XDG/HOME so we don't touch the user's real
 73	// state or any other test's cache. These all live under runDir
 74	// so cleanup is one RemoveAll.
 75	cacheHome := filepath.Join(runDir, "cache")
 76	dataHome := filepath.Join(runDir, "data")
 77	configHome := filepath.Join(runDir, "config")
 78	homeDir := filepath.Join(runDir, "home")
 79	for _, d := range []string{cacheHome, dataHome, configHome, homeDir} {
 80		if err := os.MkdirAll(d, 0o700); err != nil {
 81			t.Fatalf("mkdir %s: %v", d, err)
 82		}
 83	}
 84
 85	env := append(
 86		os.Environ(),
 87		"CRUSH_CLIENT_SERVER=1",
 88		"XDG_CACHE_HOME="+cacheHome,
 89		"XDG_DATA_HOME="+dataHome,
 90		"XDG_CONFIG_HOME="+configHome,
 91		"HOME="+homeDir,
 92		// Belt-and-suspenders: if anything tries to talk to a real
 93		// provider, fail loudly rather than make a network call.
 94		"CRUSH_DISABLE_PROVIDER_AUTO_UPDATE=1",
 95	)
 96
 97	// Make sure no server is up before we start.
 98	if _, err := os.Stat(socketPath); err == nil {
 99		t.Fatalf("socket %s exists before test started", socketPath)
100	}
101
102	// Always try to shut down any server we spawned, regardless of
103	// outcome.
104	t.Cleanup(func() { shutdownServer(t, socketPath) })
105
106	type result struct {
107		idx    int
108		stdout string
109		stderr string
110	}
111	results := make(chan result, numClients)
112
113	// Probe /v1/health concurrently while the clients are still
114	// running. The server self-shuts-down when the last workspace is
115	// released (internal/backend/backend.go:DeleteWorkspace), so once
116	// all clients exit cleanly the socket may legitimately be gone —
117	// asserting the socket post-hoc would race with that documented
118	// self-shutdown. Instead we require that during the parallel run
119	// at least one /v1/health probe got a 2xx, which proves the
120	// spawn-and-readiness path actually produced a live server.
121	var sawHealthy atomic.Bool
122	probeDone := make(chan struct{})
123	stopProbe := make(chan struct{})
124
125	var wg sync.WaitGroup
126	start := make(chan struct{})
127
128	go func() {
129		defer close(probeDone)
130		<-start
131		deadline := time.Now().Add(clientTimeout)
132		for time.Now().Before(deadline) {
133			select {
134			case <-stopProbe:
135				return
136			default:
137			}
138			if err := pingHealth(socketPath); err == nil {
139				sawHealthy.Store(true)
140				return
141			}
142			select {
143			case <-stopProbe:
144				return
145			case <-time.After(50 * time.Millisecond):
146			}
147		}
148	}()
149
150	for i := range numClients {
151		wg.Add(1)
152		go func(i int) {
153			defer wg.Done()
154
155			// Each client gets its own working directory so the
156			// per-client workspace registration paths don't collide
157			// in confusing ways.
158			cwd := filepath.Join(runDir, fmt.Sprintf("ws-%d", i))
159			if err := os.MkdirAll(cwd, 0o700); err != nil {
160				results <- result{idx: i, stderr: fmt.Sprintf("mkdir cwd: %v", err)}
161				return
162			}
163
164			ctx, cancel := context.WithTimeout(context.Background(), clientTimeout)
165			defer cancel()
166
167			// `crush run` exercises connectToServer (which is where
168			// the readiness race lives). On a fresh sandbox the
169			// command may legitimately keep running past the race
170			// (e.g. waiting on event subscriptions); the context
171			// timeout above bounds that. We assert race outcomes
172			// purely from output, not exit code.
173			c := exec.CommandContext(
174				ctx, bin,
175				"--host", host,
176				"--cwd", cwd,
177				"run", "hi",
178			)
179			c.Env = env
180			var outBuf, errBuf strings.Builder
181			c.Stdout = &outBuf
182			c.Stderr = &errBuf
183
184			<-start
185			_ = c.Run()
186			results <- result{
187				idx:    i,
188				stdout: outBuf.String(),
189				stderr: errBuf.String(),
190			}
191		}(i)
192	}
193
194	close(start) // release all clients as simultaneously as possible
195	wg.Wait()
196	close(results)
197	close(stopProbe)
198	<-probeDone
199
200	var raceFailures []string
201	for r := range results {
202		if strings.Contains(r.stderr, readinessErrSubstr) ||
203			strings.Contains(r.stdout, readinessErrSubstr) {
204			raceFailures = append(raceFailures, fmt.Sprintf(
205				"client %d: readiness error in output\nstderr:\n%s\nstdout:\n%s",
206				r.idx, r.stderr, r.stdout,
207			))
208		}
209	}
210
211	if len(raceFailures) > 0 {
212		t.Fatalf(
213			"client/server spawn race regressed: %d/%d clients failed\n\n%s",
214			len(raceFailures), numClients,
215			strings.Join(raceFailures, "\n---\n"),
216		)
217	}
218
219	// Positive sanity check: at some point during the parallel run a
220	// /v1/health probe must have succeeded. We deliberately do *not*
221	// stat the socket post-hoc: when every client returns cleanly
222	// (e.g. exits early because no providers are configured), the
223	// last DeleteWorkspace triggers the server's self-shutdown and
224	// the socket disappears. That is correct behaviour, not a race
225	// regression.
226	if !sawHealthy.Load() {
227		t.Fatalf("no /v1/health probe succeeded on %s while %d clients were running",
228			socketPath, numClients)
229	}
230}
231
// pingHealth sends exactly one GET /v1/health to the HTTP server listening
// on the unix socket at socketPath. It returns nil only when the response
// status is 2xx. Dial/transport failures, the 2s timeout, and non-2xx
// statuses all produce an error. Each call uses its own Transport, whose
// idle connections are closed before returning.
func pingHealth(socketPath string) error {
	dialUnix := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialUnix}
	defer transport.CloseIdleConnections()

	const probeTimeout = 2 * time.Second
	client := &http.Client{Transport: transport, Timeout: probeTimeout}

	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	// The URL host is never dialed (dialUnix ignores the address); it only
	// has to make the request URL well-formed.
	const healthURL = "http://crush.local/v1/health"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
	if err != nil {
		return err
	}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode < 200, resp.StatusCode >= 300:
		return fmt.Errorf("health check returned %s", resp.Status)
	default:
		return nil
	}
}
261
// repoRootFromTest returns the nearest ancestor directory (inclusive) of
// the process's current working directory that contains a go.mod file.
// Under `go test` the working directory is the package's source
// directory, so this finds the module root without hard-coding how many
// levels up it is (which would break when packages move). If no go.mod is
// found before reaching the filesystem root, the test fails.
func repoRootFromTest(t *testing.T) string {
	t.Helper()
	start, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}
	for dir := start; ; dir = filepath.Dir(dir) {
		if _, statErr := os.Stat(filepath.Join(dir, "go.mod")); statErr == nil {
			return dir
		}
		// filepath.Dir is a fixed point only at the filesystem root.
		if filepath.Dir(dir) == dir {
			t.Fatalf("could not find go.mod walking up from %s", start)
		}
	}
}
283
// buildCrushBinary runs `go build .` in repoRoot with CGO disabled and
// writes the result as "crush" inside a fresh temporary directory. It
// returns the path to that executable. The temporary directory is removed
// by t.Cleanup. The build is limited to 5 minutes; any build error or
// timeout fails the test and includes the combined compiler output.
func buildCrushBinary(t *testing.T, repoRoot string) string {
	t.Helper()

	outDir, err := os.MkdirTemp("", "crush-race-bin-")
	if err != nil {
		t.Fatalf("mkdtemp bin: %v", err)
	}
	t.Cleanup(func() { _ = os.RemoveAll(outDir) })

	buildCtx, stop := context.WithTimeout(context.Background(), 5*time.Minute)
	defer stop()

	exe := filepath.Join(outDir, "crush")
	build := exec.CommandContext(buildCtx, "go", "build", "-o", exe, ".")
	build.Dir = repoRoot
	// CGO_ENABLED=0 means hosts without a C toolchain can still build the
	// binary (and on most platforms yields a statically linked one).
	// NOTE(review): confirm this matches the project's release build flags.
	build.Env = append(os.Environ(), "CGO_ENABLED=0")

	buildOutput, err := build.CombinedOutput()
	if err != nil {
		t.Fatalf("go build crush: %v\n%s", err, buildOutput)
	}
	return exe
}
313
// shutdownServer makes a best-effort attempt to stop any crush server
// bound to socketPath. It POSTs {"command":"shutdown"} to /v1/control and
// then waits up to 5s for the socket file to disappear. It never fails
// the test; every problem is reported with t.Logf. It uses raw HTTP over
// the unix socket rather than the project's own client package, to keep
// this test free of internal API churn.
func shutdownServer(t *testing.T, socketPath string) {
	t.Helper()
	if _, err := os.Stat(socketPath); err != nil {
		// No socket file: nothing is bound here, nothing to shut down.
		return
	}

	tr := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", socketPath)
		},
	}
	hc := &http.Client{Transport: tr, Timeout: 5 * time.Second}
	defer tr.CloseIdleConnections()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	body := strings.NewReader(`{"command":"shutdown"}`)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"http://crush.local/v1/control", body)
	if err != nil {
		t.Logf("shutdown: build request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := hc.Do(req)
	if err != nil {
		// Server may already be gone — not an error.
		t.Logf("shutdown: %v (probably already exited)", err)
		return
	}
	_ = resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Previously ignored silently. Still only best-effort: keep
		// waiting below in case the server exits anyway, but make a
		// rejected shutdown visible in the test log.
		t.Logf("shutdown: %s returned %s", req.URL, resp.Status)
	}

	// Wait briefly for the socket to disappear, so that when this returns
	// the server has either exited or a log line says it did not.
	const waitFor = 5 * time.Second
	deadline := time.Now().Add(waitFor)
	for time.Now().Before(deadline) {
		if _, err := os.Stat(socketPath); err != nil {
			return
		}
		time.Sleep(50 * time.Millisecond)
	}
	t.Logf("shutdown: socket %s still present after %v", socketPath, waitFor)
}