race_test.go

  1// Package clientserverrace_test is a regression test for the
  2// CRUSH_CLIENT_SERVER=1 socket-init race documented in
  3// docs/notes/2026-05-11-client-server-socket-init-race.md (item F5).
  4//
  5// It lives in its own directory so it can build even if other test
  6// files in internal/cmd are temporarily broken — this test only needs
  7// the binary, not the cmd package.
  8package clientserverrace_test
  9
 10import (
 11	"context"
 12	"fmt"
 13	"net"
 14	"net/http"
 15	"os"
 16	"os/exec"
 17	"path/filepath"
 18	"runtime"
 19	"strings"
 20	"sync"
 21	"testing"
 22	"time"
 23)
 24
 25// readinessErrSubstr is the user-visible error string emitted by
 26// ensureServer when it gives up waiting for the server socket /
 27// readiness probe (internal/cmd/root.go). Seeing this in any client's
 28// output means the race fired.
 29const readinessErrSubstr = "failed to initialize crush server"
 30
 31// numClients is intentionally larger than the typical CPU count to
 32// ensure the spawn lock + readiness probe are exercised under
 33// contention.
 34const numClients = 8
 35
 36// clientTimeout bounds each child invocation. It only needs to be long
 37// enough for the spawn-and-readiness phase to complete on a cold cache;
 38// after that, the client may legitimately keep running (e.g.
 39// subscribing to server events) and we'll cancel it. The race we care
 40// about is observable strictly within ensureServer.
 41const clientTimeout = 15 * time.Second
 42
 43func TestClientServerSpawnRace(t *testing.T) {
 44	if testing.Short() {
 45		t.Skip("skipping client/server spawn race test in -short mode")
 46	}
 47	// The race and its fix are unix-socket specific. Windows uses
 48	// named pipes via a different code path; not covered here.
 49	if runtime.GOOS == "windows" {
 50		t.Skip("skipping unix-socket specific race test on windows")
 51	}
 52	if _, err := exec.LookPath("go"); err != nil {
 53		t.Skip("skipping: 'go' not available on PATH")
 54	}
 55
 56	repoRoot := repoRootFromTest(t)
 57	bin := buildCrushBinary(t, repoRoot)
 58
 59	// Use /tmp directly so the unix socket path stays under the
 60	// 104-char sockaddr_un limit on darwin. t.TempDir() can return a
 61	// path inside /var/folders/... that is too long.
 62	runDir, err := os.MkdirTemp("/tmp", "crush-race-")
 63	if err != nil {
 64		t.Fatalf("mkdtemp: %v", err)
 65	}
 66	t.Cleanup(func() { _ = os.RemoveAll(runDir) })
 67
 68	socketPath := filepath.Join(runDir, "crush.sock")
 69	host := "unix://" + socketPath
 70
 71	// Fresh, isolated XDG/HOME so we don't touch the user's real
 72	// state or any other test's cache. These all live under runDir
 73	// so cleanup is one RemoveAll.
 74	cacheHome := filepath.Join(runDir, "cache")
 75	dataHome := filepath.Join(runDir, "data")
 76	configHome := filepath.Join(runDir, "config")
 77	homeDir := filepath.Join(runDir, "home")
 78	for _, d := range []string{cacheHome, dataHome, configHome, homeDir} {
 79		if err := os.MkdirAll(d, 0o700); err != nil {
 80			t.Fatalf("mkdir %s: %v", d, err)
 81		}
 82	}
 83
 84	env := append(os.Environ(),
 85		"CRUSH_CLIENT_SERVER=1",
 86		"XDG_CACHE_HOME="+cacheHome,
 87		"XDG_DATA_HOME="+dataHome,
 88		"XDG_CONFIG_HOME="+configHome,
 89		"HOME="+homeDir,
 90		// Belt-and-suspenders: if anything tries to talk to a real
 91		// provider, fail loudly rather than make a network call.
 92		"CRUSH_DISABLE_PROVIDER_AUTO_UPDATE=1",
 93	)
 94
 95	// Make sure no server is up before we start.
 96	if _, err := os.Stat(socketPath); err == nil {
 97		t.Fatalf("socket %s exists before test started", socketPath)
 98	}
 99
100	// Always try to shut down any server we spawned, regardless of
101	// outcome.
102	t.Cleanup(func() { shutdownServer(t, socketPath) })
103
104	type result struct {
105		idx    int
106		stdout string
107		stderr string
108	}
109	results := make(chan result, numClients)
110
111	var wg sync.WaitGroup
112	start := make(chan struct{})
113	for i := range numClients {
114		wg.Add(1)
115		go func(i int) {
116			defer wg.Done()
117
118			// Each client gets its own working directory so the
119			// per-client workspace registration paths don't collide
120			// in confusing ways.
121			cwd := filepath.Join(runDir, fmt.Sprintf("ws-%d", i))
122			if err := os.MkdirAll(cwd, 0o700); err != nil {
123				results <- result{idx: i, stderr: fmt.Sprintf("mkdir cwd: %v", err)}
124				return
125			}
126
127			ctx, cancel := context.WithTimeout(context.Background(), clientTimeout)
128			defer cancel()
129
130			// `crush run` exercises connectToServer (which is where
131			// the readiness race lives). On a fresh sandbox the
132			// command may legitimately keep running past the race
133			// (e.g. waiting on event subscriptions); the context
134			// timeout above bounds that. We assert race outcomes
135			// purely from output, not exit code.
136			c := exec.CommandContext(ctx, bin,
137				"--host", host,
138				"--cwd", cwd,
139				"run", "hi",
140			)
141			c.Env = env
142			var outBuf, errBuf strings.Builder
143			c.Stdout = &outBuf
144			c.Stderr = &errBuf
145
146			<-start
147			_ = c.Run()
148			results <- result{
149				idx:    i,
150				stdout: outBuf.String(),
151				stderr: errBuf.String(),
152			}
153		}(i)
154	}
155
156	close(start) // release all clients as simultaneously as possible
157	wg.Wait()
158	close(results)
159
160	var raceFailures []string
161	for r := range results {
162		if strings.Contains(r.stderr, readinessErrSubstr) ||
163			strings.Contains(r.stdout, readinessErrSubstr) {
164			raceFailures = append(raceFailures, fmt.Sprintf(
165				"client %d: readiness error in output\nstderr:\n%s\nstdout:\n%s",
166				r.idx, r.stderr, r.stdout,
167			))
168		}
169	}
170
171	if len(raceFailures) > 0 {
172		t.Fatalf("client/server spawn race regressed: %d/%d clients failed\n\n%s",
173			len(raceFailures), numClients,
174			strings.Join(raceFailures, "\n---\n"),
175		)
176	}
177
178	// Positive sanity check: a single live server should be
179	// reachable on the socket. If the race had fired, we'd either
180	// have no socket or a stale one with no listener.
181	if _, err := os.Stat(socketPath); err != nil {
182		t.Fatalf("expected socket %s to exist after parallel clients ran, got: %v",
183			socketPath, err)
184	}
185	if err := pingHealth(socketPath); err != nil {
186		t.Fatalf("server is not responding on %s: %v", socketPath, err)
187	}
188}
189
// pingHealth performs one GET /v1/health against the HTTP server
// listening on the unix socket at socketPath. It returns nil for any
// 2xx status, the transport error if the request could not be
// completed, and a "health check returned <status>" error otherwise.
// The whole exchange is limited to two seconds.
func pingHealth(socketPath string) error {
	const probeTimeout = 2 * time.Second

	// The URL host is a placeholder: every connection is dialled to
	// the unix socket regardless of the address the client asks for.
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		dialer := new(net.Dialer)
		return dialer.DialContext(ctx, "unix", socketPath)
	}
	transport := &http.Transport{DialContext: dialSocket}
	defer transport.CloseIdleConnections()

	ctx, cancel := context.WithTimeout(context.Background(), probeTimeout)
	defer cancel()

	request, err := http.NewRequestWithContext(
		ctx, http.MethodGet, "http://crush.local/v1/health", nil,
	)
	if err != nil {
		return err
	}

	client := &http.Client{Transport: transport, Timeout: probeTimeout}
	response, err := client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if code := response.StatusCode; code >= 200 && code < 300 {
		return nil
	}
	return fmt.Errorf("health check returned %s", response.Status)
}
219
// repoRootFromTest returns the nearest directory, starting at the
// process working directory and moving towards the filesystem root,
// that contains a go.mod. Searching for the marker file rather than
// climbing a fixed number of levels keeps the helper working if the
// test is moved. The test is failed fatally when the working directory
// cannot be read or no go.mod is found.
func repoRootFromTest(t *testing.T) string {
	t.Helper()

	origin, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}

	for candidate := origin; ; {
		marker := filepath.Join(candidate, "go.mod")
		if _, statErr := os.Stat(marker); statErr == nil {
			return candidate
		}

		up := filepath.Dir(candidate)
		if up == candidate {
			// Dir returned its own argument: the filesystem root
			// has been reached without finding a go.mod.
			t.Fatalf("could not find go.mod walking up from %s", origin)
		}
		candidate = up
	}
}
241
// buildCrushBinary compiles the package at repoRoot with `go build`
// into a fresh temporary directory and returns the path of the
// resulting "crush" executable. The directory is removed by a
// t.Cleanup when the test finishes. Any failure, including the build
// exceeding its five-minute limit, aborts the test with the build
// output attached.
func buildCrushBinary(t *testing.T, repoRoot string) string {
	t.Helper()

	outDir, err := os.MkdirTemp("", "crush-race-bin-")
	if err != nil {
		t.Fatalf("mkdtemp bin: %v", err)
	}
	t.Cleanup(func() { _ = os.RemoveAll(outDir) })
	target := filepath.Join(outDir, "crush")

	buildCtx, stop := context.WithTimeout(context.Background(), 5*time.Minute)
	defer stop()

	build := exec.CommandContext(buildCtx, "go", "build", "-o", target, ".")
	build.Dir = repoRoot
	// cgo is switched off so the build does not depend on a C
	// toolchain being installed on the host.
	build.Env = append(os.Environ(), "CGO_ENABLED=0")

	if output, err := build.CombinedOutput(); err != nil {
		t.Fatalf("go build crush: %v\n%s", err, output)
	}
	return target
}
271
// shutdownServer makes a best-effort attempt to stop whatever crush
// server is listening on socketPath by POSTing a shutdown command to
// /v1/control, then waits up to five seconds for the socket file to
// disappear. It never fails the test; every problem is reported through
// t.Logf so that a server left running is at least visible in the test
// output. If socketPath does not exist it returns immediately.
//
// The project's own client package is deliberately not imported, to
// keep this test free of internal API churn.
func shutdownServer(t *testing.T, socketPath string) {
	t.Helper()

	const (
		requestTimeout    = 5 * time.Second
		socketGoneTimeout = 5 * time.Second
		pollInterval      = 50 * time.Millisecond
	)

	// No socket means there is nothing to shut down.
	if _, err := os.Stat(socketPath); err != nil {
		return
	}

	tr := &http.Transport{
		// The URL host is a placeholder; every connection goes to
		// the unix socket.
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", socketPath)
		},
	}
	hc := &http.Client{Transport: tr, Timeout: requestTimeout}
	defer tr.CloseIdleConnections()

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	body := strings.NewReader(`{"command":"shutdown"}`)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"http://crush.local/v1/control", body)
	if err != nil {
		t.Logf("shutdown: build request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := hc.Do(req)
	if err != nil {
		// Server may already be gone — not an error.
		t.Logf("shutdown: %v (probably already exited)", err)
		return
	}
	_ = resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// The server rejected the command. Still fall through to
		// the wait below in case it exits anyway, but leave a trace.
		t.Logf("shutdown: control endpoint returned %s", resp.Status)
	}

	// Wait briefly for the socket to disappear so the next test
	// using the same path doesn't race.
	deadline := time.Now().Add(socketGoneTimeout)
	for time.Now().Before(deadline) {
		if _, err := os.Stat(socketPath); err != nil {
			return
		}
		time.Sleep(pollInterval)
	}
	t.Logf("shutdown: socket %s still present after %s; server may still be running",
		socketPath, socketGoneTimeout)
}