jq_test.go

  1package shell
  2
  3import (
  4	"bytes"
  5	"context"
  6	"errors"
  7	"io"
  8	"strings"
  9	"testing"
 10	"time"
 11)
 12
 13// TestJQ_CtxCancel verifies that handleJQ polls ctx during iteration and
 14// returns ctx.Err() (not an interp.ExitStatus) when the context is
 15// cancelled. This is what lets hook timeouts interrupt long-running jq
 16// filters rather than waiting for the iterator to terminate naturally.
 17func TestJQ_CtxCancel(t *testing.T) {
 18	t.Parallel()
 19
 20	// `range(N)` generates a large stream of values. With a slurped input
 21	// the filter produces all N values in sequence; ctx cancellation
 22	// between values should short-circuit the loop.
 23	const filter = "range(10000000)"
 24	stdin := strings.NewReader("null\n")
 25
 26	ctx, cancel := context.WithCancel(t.Context())
 27	// Cancel almost immediately so we catch the next iteration check.
 28	cancel()
 29
 30	err := handleJQ(ctx, []string{"jq", filter}, stdin, io.Discard, io.Discard)
 31	if err == nil {
 32		t.Fatal("expected ctx cancel error, got nil")
 33	}
 34	if !errors.Is(err, context.Canceled) {
 35		t.Fatalf("expected context.Canceled, got %v", err)
 36	}
 37}
 38
 39// TestJQ_CtxCancel_DuringFilter verifies cancellation mid-stream: ctx is
 40// cancelled after jq has started producing output, and the loop must
 41// observe the cancel on the next iteration rather than running to
 42// completion.
 43func TestJQ_CtxCancel_DuringFilter(t *testing.T) {
 44	t.Parallel()
 45
 46	ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
 47	defer cancel()
 48
 49	// 100M values; without ctx polling this would take many seconds to
 50	// fully emit. With ctx polling the loop exits shortly after the
 51	// deadline.
 52	stdin := strings.NewReader("null\n")
 53	var stdout, stderr bytes.Buffer
 54
 55	start := time.Now()
 56	err := handleJQ(ctx, []string{"jq", "-c", "range(100000000)"}, stdin, &stdout, &stderr)
 57	elapsed := time.Since(start)
 58
 59	if err == nil {
 60		t.Fatal("expected ctx timeout error, got nil")
 61	}
 62	if !errors.Is(err, context.DeadlineExceeded) {
 63		t.Fatalf("expected context.DeadlineExceeded, got %v", err)
 64	}
 65	// Allow generous slack for slow CI; the important invariant is that we
 66	// don't run all 100M iterations (which would take orders of magnitude
 67	// longer than 1s).
 68	if elapsed > time.Second {
 69		t.Fatalf("handleJQ took %v after 50ms timeout; ctx polling is not tight enough", elapsed)
 70	}
 71}
 72
 73// slowReader serves bytes in small chunks with a fixed delay between
 74// Read calls. It never blocks indefinitely — each Read returns after
 75// chunkDelay — so cancellation must be observed via ctxReader's ctx
 76// check, not by the underlying reader itself. That isolates the
 77// behavior we want to test: the wrapper polling ctx between chunks.
 78type slowReader struct {
 79	remaining  []byte
 80	chunk      int
 81	chunkDelay time.Duration
 82}
 83
 84func (s *slowReader) Read(p []byte) (int, error) {
 85	if len(s.remaining) == 0 {
 86		return 0, io.EOF
 87	}
 88	time.Sleep(s.chunkDelay)
 89	n := min(len(p), min(s.chunk, len(s.remaining)))
 90	copy(p, s.remaining[:n])
 91	s.remaining = s.remaining[n:]
 92	return n, nil
 93}
 94
 95// TestJQ_CtxCancel_MidReadAll verifies that ctx cancellation observed
 96// *during* io.ReadAll — after several chunks have already been consumed
 97// — short-circuits the read via ctxReader, rather than draining the
 98// whole source. This is the guarantee the hook runner relies on when
 99// it feeds a large bytes.Reader payload.
100//
101// The reader serves bytes in 512-byte chunks with a 5ms gap between
102// reads. ctx is cancelled after ~50ms, so several chunks have already
103// been read when ctxReader first observes the cancellation. The test
104// asserts that (a) we got a context.Canceled error and (b) the call
105// returned well before the reader would have been fully drained.
106func TestJQ_CtxCancel_MidReadAll(t *testing.T) {
107	t.Parallel()
108
109	const (
110		size       = 64 * 1024 * 1024 // 64 MiB
111		chunk      = 512
112		chunkDelay = 5 * time.Millisecond
113	)
114	// At 512 bytes / 5ms, draining 64 MiB would take ~11 minutes. Any
115	// return within a second proves cancel was observed mid-stream, not
116	// after EOF.
117	reader := &slowReader{
118		remaining:  bytes.Repeat([]byte("a"), size),
119		chunk:      chunk,
120		chunkDelay: chunkDelay,
121	}
122
123	ctx, cancel := context.WithCancel(t.Context())
124	defer cancel()
125
126	// Cancel after enough time that several Read calls have completed
127	// and io.ReadAll is actively consuming the source.
128	go func() {
129		time.Sleep(50 * time.Millisecond)
130		cancel()
131	}()
132
133	start := time.Now()
134	err := handleJQ(ctx, []string{"jq", "-R", "."}, reader, io.Discard, io.Discard)
135	elapsed := time.Since(start)
136
137	if !errors.Is(err, context.Canceled) {
138		t.Fatalf("expected context.Canceled, got %v", err)
139	}
140	// Generous slack for slow CI; the invariant is orders-of-magnitude
141	// faster than draining the full source.
142	if elapsed > time.Second {
143		t.Fatalf("mid-ReadAll cancel took %v; ctxReader is not polling between chunks", elapsed)
144	}
145	// Sanity check: we should have been cancelled mid-stream, not
146	// before any reads happened. If remaining == size, cancel fired so
147	// early nothing was consumed — that's a fast-fail path, not the
148	// mid-read guarantee we want to verify.
149	consumed := size - len(reader.remaining)
150	if consumed == 0 {
151		t.Fatal("reader was never read from; test did not exercise mid-ReadAll cancel")
152	}
153	if consumed >= size {
154		t.Fatal("reader was fully drained; cancel was not observed mid-read")
155	}
156}
157
158// TestJQ_CtxCancel_PreCancel verifies the fast-fail path: a ctx already
159// cancelled before handleJQ is called returns context.Canceled
160// immediately via the outer-loop guard, never entering io.ReadAll.
161// Complements TestJQ_CtxCancel_MidReadAll.
162func TestJQ_CtxCancel_PreCancel(t *testing.T) {
163	t.Parallel()
164
165	ctx, cancel := context.WithCancel(t.Context())
166	cancel()
167
168	start := time.Now()
169	err := handleJQ(ctx, []string{"jq", "-R", "."},
170		bytes.NewReader(bytes.Repeat([]byte("a"), 1024)),
171		io.Discard, io.Discard)
172	elapsed := time.Since(start)
173
174	if !errors.Is(err, context.Canceled) {
175		t.Fatalf("expected context.Canceled, got %v", err)
176	}
177	if elapsed > 100*time.Millisecond {
178		t.Fatalf("pre-cancel fast-fail took %v; outer guard is not firing", elapsed)
179	}
180}
181
182// TestJQ_Success confirms the ctx-aware refactor did not regress the
183// success path.
184func TestJQ_Success(t *testing.T) {
185	t.Parallel()
186
187	var stdout bytes.Buffer
188	err := handleJQ(t.Context(),
189		[]string{"jq", "-c", ".a"},
190		strings.NewReader(`{"a":1}`),
191		&stdout, io.Discard,
192	)
193	if err != nil {
194		t.Fatalf("handleJQ returned error: %v", err)
195	}
196	if got := stdout.String(); got != "1\n" {
197		t.Fatalf("stdout = %q, want %q", got, "1\n")
198	}
199}