message_test.go

  1package message
  2
  3import (
  4	"context"
  5	"sync"
  6	"sync/atomic"
  7	"testing"
  8	"time"
  9
 10	"github.com/charmbracelet/crush/internal/db"
 11	"github.com/charmbracelet/crush/internal/pubsub"
 12	"github.com/charmbracelet/crush/internal/session"
 13	"github.com/stretchr/testify/require"
 14)
 15
 16// slowUpdateQuerier wraps a [db.Querier] and forces UpdateMessage to
 17// hang on a release channel. Used to simulate an in-flight SQL write.
 18type slowUpdateQuerier struct {
 19	db.Querier
 20	release   chan struct{}
 21	started   chan struct{}
 22	startOnce sync.Once
 23}
 24
 25func (s *slowUpdateQuerier) UpdateMessage(ctx context.Context, arg db.UpdateMessageParams) error {
 26	s.startOnce.Do(func() { close(s.started) })
 27	select {
 28	case <-s.release:
 29	case <-ctx.Done():
 30		return ctx.Err()
 31	}
 32	return s.Querier.UpdateMessage(ctx, arg)
 33}
 34
 35// newTestService spins up a fresh in-memory message.Service backed by a
 36// temporary on-disk SQLite database. Returns the service plus a session
 37// ID to attach messages to.
 38func newTestService(t *testing.T, opts ...ServiceOption) (Service, string) {
 39	t.Helper()
 40	conn, err := db.Connect(t.Context(), t.TempDir())
 41	require.NoError(t, err)
 42	t.Cleanup(func() { _ = conn.Close() })
 43
 44	q := db.New(conn)
 45	sessions := session.NewService(q, conn)
 46	sess, err := sessions.Create(t.Context(), "test")
 47	require.NoError(t, err)
 48
 49	svc := NewService(q, opts...)
 50	return svc, sess.ID
 51}
 52
 53// eventCollector consumes broker events into a slice in a goroutine
 54// and exposes thread-safe Snapshot / Reset helpers for assertions.
 55type eventCollector struct {
 56	mu     sync.Mutex
 57	events []pubsub.Event[Message]
 58}
 59
 60func collect(ctx context.Context, sub <-chan pubsub.Event[Message]) *eventCollector {
 61	c := &eventCollector{}
 62	go func() {
 63		for {
 64			select {
 65			case <-ctx.Done():
 66				return
 67			case ev, ok := <-sub:
 68				if !ok {
 69					return
 70				}
 71				c.mu.Lock()
 72				c.events = append(c.events, ev)
 73				c.mu.Unlock()
 74			}
 75		}
 76	}()
 77	return c
 78}
 79
 80func (c *eventCollector) snapshot() []pubsub.Event[Message] {
 81	c.mu.Lock()
 82	defer c.mu.Unlock()
 83	out := make([]pubsub.Event[Message], len(c.events))
 84	copy(out, c.events)
 85	return out
 86}
 87
 88func (c *eventCollector) reset() {
 89	c.mu.Lock()
 90	defer c.mu.Unlock()
 91	c.events = nil
 92}
 93
 94func TestUpdate_DebouncesTextDeltas(t *testing.T) {
 95	t.Parallel()
 96
 97	// Long-enough debounce that we can verify nothing flushes prematurely.
 98	svc, sessionID := newTestService(t, WithDebounce(50*time.Millisecond))
 99
100	subCtx, cancelSub := context.WithCancel(t.Context())
101	defer cancelSub()
102	sub := svc.Subscribe(subCtx)
103	collector := collect(subCtx, sub)
104
105	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{
106		Role: Assistant,
107	})
108	require.NoError(t, err)
109	// Drop the CreatedEvent emitted by Create.
110	time.Sleep(5 * time.Millisecond)
111	collector.reset()
112
113	// Push 5 deltas inside a single debounce window.
114	for i := 0; i < 5; i++ {
115		msg.AppendContent("a")
116		require.NoError(t, svc.Update(t.Context(), msg))
117	}
118
119	// Before the debounce expires no UpdatedEvent should have landed.
120	time.Sleep(10 * time.Millisecond)
121	require.Empty(t, collector.snapshot(), "no events should land before debounce window expires")
122
123	// Wait for the debounce timer to fire.
124	require.Eventually(t, func() bool {
125		return len(collector.snapshot()) >= 1
126	}, time.Second, 5*time.Millisecond)
127	events := collector.snapshot()
128	require.Len(t, events, 1, "5 deltas should coalesce into 1 UpdatedEvent")
129	require.Equal(t, pubsub.UpdatedEvent, events[0].Type)
130	require.Equal(t, "aaaaa", events[0].Payload.Content().Text)
131
132	// Final state must be persisted.
133	got, err := svc.Get(t.Context(), msg.ID)
134	require.NoError(t, err)
135	require.Equal(t, "aaaaa", got.Content().Text)
136}
137
138func TestUpdate_TerminalUpdatesFlushSynchronously(t *testing.T) {
139	t.Parallel()
140
141	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
142
143	subCtx, cancelSub := context.WithCancel(t.Context())
144	defer cancelSub()
145	sub := svc.Subscribe(subCtx)
146	collector := collect(subCtx, sub)
147
148	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
149	require.NoError(t, err)
150	time.Sleep(5 * time.Millisecond)
151	collector.reset()
152
153	// AddFinish makes the message terminal; Update must flush
154	// synchronously even with a 1-hour debounce.
155	msg.AppendContent("done")
156	msg.AddFinish(FinishReasonEndTurn, "", "")
157	require.NoError(t, svc.Update(t.Context(), msg))
158
159	require.Eventually(t, func() bool {
160		return len(collector.snapshot()) >= 1
161	}, time.Second, 5*time.Millisecond,
162		"terminal update must publish without waiting for debounce")
163	events := collector.snapshot()
164	require.Len(t, events, 1)
165	require.True(t, events[0].Payload.IsFinished())
166
167	got, err := svc.Get(t.Context(), msg.ID)
168	require.NoError(t, err)
169	require.True(t, got.IsFinished())
170}
171
172func TestUpdate_ToolCallStructuralChangeFlushes(t *testing.T) {
173	t.Parallel()
174
175	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
176
177	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
178	require.NoError(t, err)
179
180	// Adding a new tool call is a structural change → sync flush.
181	msg.AddToolCall(ToolCall{ID: "tc1", Name: "view", Finished: false})
182	require.NoError(t, svc.Update(t.Context(), msg))
183
184	got, err := svc.Get(t.Context(), msg.ID)
185	require.NoError(t, err)
186	require.Len(t, got.ToolCalls(), 1)
187	require.Equal(t, "tc1", got.ToolCalls()[0].ID)
188
189	// Marking the tool call finished is also a structural change.
190	msg.AddToolCall(ToolCall{ID: "tc1", Name: "view", Input: "{}", Finished: true})
191	require.NoError(t, svc.Update(t.Context(), msg))
192
193	got, err = svc.Get(t.Context(), msg.ID)
194	require.NoError(t, err)
195	require.True(t, got.ToolCalls()[0].Finished)
196}
197
198func TestUpdate_ReasoningEndFlushes(t *testing.T) {
199	t.Parallel()
200
201	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
202
203	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
204	require.NoError(t, err)
205
206	// Reasoning deltas alone debounce.
207	msg.AppendReasoningContent("hmm")
208	require.NoError(t, svc.Update(t.Context(), msg))
209	got, err := svc.Get(t.Context(), msg.ID)
210	require.NoError(t, err)
211	require.Empty(t, got.ReasoningContent().Thinking, "reasoning delta should still be in the debounce buffer")
212
213	// FinishThinking sets FinishedAt → terminal flush.
214	msg.FinishThinking()
215	require.NoError(t, svc.Update(t.Context(), msg))
216
217	got, err = svc.Get(t.Context(), msg.ID)
218	require.NoError(t, err)
219	require.Equal(t, "hmm", got.ReasoningContent().Thinking)
220	require.NotZero(t, got.ReasoningContent().FinishedAt)
221}
222
223func TestFlush_DrainsPendingDebouncedUpdates(t *testing.T) {
224	t.Parallel()
225
226	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
227
228	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
229	require.NoError(t, err)
230	msg.AppendContent("buffered")
231	require.NoError(t, svc.Update(t.Context(), msg))
232
233	// Without a flush the SQL row is unchanged from Create.
234	got, err := svc.Get(t.Context(), msg.ID)
235	require.NoError(t, err)
236	require.Empty(t, got.Content().Text)
237
238	require.NoError(t, svc.Flush(t.Context(), msg.ID))
239
240	got, err = svc.Get(t.Context(), msg.ID)
241	require.NoError(t, err)
242	require.Equal(t, "buffered", got.Content().Text)
243
244	// Subsequent Flush is a no-op.
245	require.NoError(t, svc.Flush(t.Context(), msg.ID))
246}
247
248func TestFlushAll_DrainsAllPending(t *testing.T) {
249	t.Parallel()
250
251	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
252
253	const n = 5
254	msgs := make([]Message, n)
255	for i := range msgs {
256		m, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
257		require.NoError(t, err)
258		m.AppendContent("hi")
259		require.NoError(t, svc.Update(t.Context(), m))
260		msgs[i] = m
261	}
262
263	require.NoError(t, svc.FlushAll(t.Context()))
264
265	for _, m := range msgs {
266		got, err := svc.Get(t.Context(), m.ID)
267		require.NoError(t, err)
268		require.Equal(t, "hi", got.Content().Text, "FlushAll should drain every pending message")
269	}
270}
271
272func TestUpdate_OrderingMatchesNonCoalesced(t *testing.T) {
273	t.Parallel()
274
275	// Compare the final state after coalesced vs zero-debounce updates.
276	// A sequence of interleaved text/reasoning/tool-call updates must
277	// converge to the same final DB row either way.
278	build := func(svc Service, sessionID string) Message {
279		msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
280		require.NoError(t, err)
281		msg.AppendReasoningContent("thinking 1 ")
282		require.NoError(t, svc.Update(t.Context(), msg))
283		msg.AppendReasoningContent("thinking 2")
284		require.NoError(t, svc.Update(t.Context(), msg))
285		msg.FinishThinking()
286		require.NoError(t, svc.Update(t.Context(), msg))
287		msg.AppendContent("hello ")
288		require.NoError(t, svc.Update(t.Context(), msg))
289		msg.AppendContent("world")
290		require.NoError(t, svc.Update(t.Context(), msg))
291		msg.AddToolCall(ToolCall{ID: "tc", Name: "x", Finished: false})
292		require.NoError(t, svc.Update(t.Context(), msg))
293		msg.AddToolCall(ToolCall{ID: "tc", Name: "x", Input: "{}", Finished: true})
294		require.NoError(t, svc.Update(t.Context(), msg))
295		msg.AddFinish(FinishReasonEndTurn, "", "")
296		require.NoError(t, svc.Update(t.Context(), msg))
297		return msg
298	}
299
300	coalesced, sid1 := newTestService(t, WithDebounce(20*time.Millisecond))
301	a := build(coalesced, sid1)
302	require.NoError(t, coalesced.FlushAll(t.Context()))
303	gotA, err := coalesced.Get(t.Context(), a.ID)
304	require.NoError(t, err)
305
306	immediate, sid2 := newTestService(t, WithDebounce(0))
307	b := build(immediate, sid2)
308	gotB, err := immediate.Get(t.Context(), b.ID)
309	require.NoError(t, err)
310
311	require.Equal(t, gotA.Content().Text, gotB.Content().Text)
312	require.Equal(t, gotA.ReasoningContent().Thinking, gotB.ReasoningContent().Thinking)
313	require.Equal(t, len(gotA.ToolCalls()), len(gotB.ToolCalls()))
314	require.Equal(t, gotA.IsFinished(), gotB.IsFinished())
315}
316
317func TestDelete_DropsPendingState(t *testing.T) {
318	t.Parallel()
319
320	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
321	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
322	require.NoError(t, err)
323	msg.AppendContent("dropped")
324	require.NoError(t, svc.Update(t.Context(), msg))
325
326	require.NoError(t, svc.Delete(t.Context(), msg.ID))
327
328	// FlushAll after Delete must not write to the deleted row.
329	require.NoError(t, svc.FlushAll(t.Context()))
330
331	_, err = svc.Get(t.Context(), msg.ID)
332	require.Error(t, err, "deleted message must remain deleted")
333}
334
335func TestBroker_PublishLossyDropCounter(t *testing.T) {
336	t.Parallel()
337
338	// Tiny channel buffer so we can saturate from a single sender.
339	b := pubsub.NewBrokerWithOptions[int](1)
340	defer b.Shutdown()
341
342	subCtx, cancel := context.WithCancel(t.Context())
343	defer cancel()
344	sub := b.Subscribe(subCtx)
345	require.NotNil(t, sub)
346
347	// Don't read from sub. Saturate the buffer.
348	for range 100 {
349		b.Publish(pubsub.UpdatedEvent, 1)
350	}
351
352	require.GreaterOrEqual(t, b.DropCount(), uint64(1),
353		"lossy Publish must increment the drop counter under contention")
354}
355
356func TestBroker_PublishMustDeliverHonorsTimeout(t *testing.T) {
357	t.Parallel()
358
359	b := pubsub.NewBrokerWithOptions[int](1)
360	b.SetMustDeliverTimeout(20 * time.Millisecond)
361	defer b.Shutdown()
362
363	subCtx, cancel := context.WithCancel(t.Context())
364	defer cancel()
365	sub := b.Subscribe(subCtx)
366	require.NotNil(t, sub)
367
368	// Saturate: one event sits in the buffer, the second must wait.
369	b.Publish(pubsub.UpdatedEvent, 1)
370
371	// PublishMustDeliver should block up to 20ms then drop.
372	start := time.Now()
373	b.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, 2)
374	elapsed := time.Since(start)
375
376	require.GreaterOrEqual(t, elapsed, 20*time.Millisecond,
377		"PublishMustDeliver should block at least the timeout under contention")
378	require.Less(t, elapsed, 200*time.Millisecond,
379		"PublishMustDeliver must not block indefinitely")
380	require.GreaterOrEqual(t, b.MustDeliverDropCount(), uint64(1),
381		"timeout must increment the must-deliver drop counter")
382}
383
384func TestBroker_PublishMustDeliverWithReader(t *testing.T) {
385	t.Parallel()
386
387	b := pubsub.NewBrokerWithOptions[int](1)
388	b.SetMustDeliverTimeout(50 * time.Millisecond)
389	defer b.Shutdown()
390
391	subCtx, cancel := context.WithCancel(t.Context())
392	defer cancel()
393	sub := b.Subscribe(subCtx)
394
395	var received atomic.Uint64
396	done := make(chan struct{})
397	go func() {
398		defer close(done)
399		for {
400			select {
401			case <-subCtx.Done():
402				return
403			case _, ok := <-sub:
404				if !ok {
405					return
406				}
407				received.Add(1)
408			}
409		}
410	}()
411
412	for i := range 10 {
413		b.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, i)
414	}
415
416	// All 10 should land within the must-deliver timeout window.
417	require.Eventually(t, func() bool { return received.Load() == 10 },
418		time.Second, 5*time.Millisecond,
419		"all must-deliver events should reach an active subscriber")
420	require.Zero(t, b.MustDeliverDropCount(),
421		"no drops expected when subscriber drains promptly")
422}
423
424func TestUpdate_TerminalEventUsesMustDeliver(t *testing.T) {
425	t.Parallel()
426
427	svc, sessionID := newTestService(t, WithDebounce(time.Hour))
428
429	subCtx, cancel := context.WithCancel(t.Context())
430	defer cancel()
431	sub := svc.Subscribe(subCtx)
432
433	var seenFinish atomic.Bool
434	done := make(chan struct{})
435	go func() {
436		defer close(done)
437		for {
438			select {
439			case <-subCtx.Done():
440				return
441			case ev, ok := <-sub:
442				if !ok {
443					return
444				}
445				if ev.Type == pubsub.UpdatedEvent && ev.Payload.IsFinished() {
446					seenFinish.Store(true)
447				}
448			}
449		}
450	}()
451
452	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
453	require.NoError(t, err)
454	msg.AppendContent("final")
455	msg.AddFinish(FinishReasonEndTurn, "", "")
456	require.NoError(t, svc.Update(t.Context(), msg))
457
458	require.Eventually(t, func() bool { return seenFinish.Load() },
459		time.Second, 10*time.Millisecond,
460		"terminal update must reach subscribers via the must-deliver path")
461}
462
463func TestUpdate_ZeroDebounceFlushesEveryUpdate(t *testing.T) {
464	t.Parallel()
465
466	svc, sessionID := newTestService(t, WithDebounce(0))
467
468	msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
469	require.NoError(t, err)
470
471	for i := 0; i < 3; i++ {
472		msg.AppendContent("x")
473		require.NoError(t, svc.Update(t.Context(), msg))
474		got, err := svc.Get(t.Context(), msg.ID)
475		require.NoError(t, err)
476		require.Len(t, got.Content().Text, i+1, "every update must land synchronously when debounce is 0")
477	}
478}
479
480// TestFlush_WaitsForInFlightWrite reproduces the failure where Flush
481// or FlushAll could return before a concurrent in-flight SQL write
482// completed. We block UpdateMessage on a release channel, fire the
483// debounce timer, then call Flush and assert it does not return until
484// the in-flight write actually lands.
485func TestFlush_WaitsForInFlightWrite(t *testing.T) {
486	t.Parallel()
487
488	conn, err := db.Connect(t.Context(), t.TempDir())
489	require.NoError(t, err)
490	t.Cleanup(func() { _ = conn.Close() })
491
492	q := db.New(conn)
493	sessions := session.NewService(q, conn)
494	sess, err := sessions.Create(t.Context(), "test")
495	require.NoError(t, err)
496
497	slow := &slowUpdateQuerier{
498		Querier: q,
499		release: make(chan struct{}),
500		started: make(chan struct{}),
501	}
502	// Short debounce so the timer fires quickly.
503	svc := NewService(slow, WithDebounce(10*time.Millisecond))
504
505	msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
506	require.NoError(t, err)
507	msg.AppendContent("payload")
508	require.NoError(t, svc.Update(t.Context(), msg))
509
510	// Wait for the timer-fired flush to enter UpdateMessage.
511	select {
512	case <-slow.started:
513	case <-time.After(time.Second):
514		t.Fatal("timer-fired flush never reached UpdateMessage")
515	}
516
517	// At this point the buffer is dirty=false but flushing=true. A
518	// naive Flush would early-return on !dirty. Spawn Flush in a
519	// goroutine and assert it has not returned while the write is
520	// still blocked.
521	flushDone := make(chan error, 1)
522	go func() { flushDone <- svc.Flush(t.Context(), msg.ID) }()
523
524	select {
525	case err := <-flushDone:
526		t.Fatalf("Flush returned %v while in-flight write was still blocked", err)
527	case <-time.After(50 * time.Millisecond):
528		// Expected: Flush is correctly waiting.
529	}
530
531	// Release the slow write; Flush must now return cleanly.
532	close(slow.release)
533	select {
534	case err := <-flushDone:
535		require.NoError(t, err)
536	case <-time.After(time.Second):
537		t.Fatal("Flush did not return after in-flight write completed")
538	}
539
540	// The SQL row should now reflect the buffered payload.
541	got, err := svc.Get(t.Context(), msg.ID)
542	require.NoError(t, err)
543	require.Equal(t, "payload", got.Content().Text)
544}
545
546// TestFlushAll_WaitsForInFlightWrite asserts FlushAll picks up IDs
547// whose buffer is currently flushing (dirty=false) so shutdown and
548// session-switch callers don't return while an SQL write is mid-flight.
549func TestFlushAll_WaitsForInFlightWrite(t *testing.T) {
550	t.Parallel()
551
552	conn, err := db.Connect(t.Context(), t.TempDir())
553	require.NoError(t, err)
554	t.Cleanup(func() { _ = conn.Close() })
555
556	q := db.New(conn)
557	sessions := session.NewService(q, conn)
558	sess, err := sessions.Create(t.Context(), "test")
559	require.NoError(t, err)
560
561	slow := &slowUpdateQuerier{
562		Querier: q,
563		release: make(chan struct{}),
564		started: make(chan struct{}),
565	}
566	svc := NewService(slow, WithDebounce(10*time.Millisecond))
567
568	msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
569	require.NoError(t, err)
570	msg.AppendContent("payload")
571	require.NoError(t, svc.Update(t.Context(), msg))
572
573	select {
574	case <-slow.started:
575	case <-time.After(time.Second):
576		t.Fatal("timer-fired flush never reached UpdateMessage")
577	}
578
579	flushDone := make(chan error, 1)
580	go func() { flushDone <- svc.FlushAll(t.Context()) }()
581
582	select {
583	case err := <-flushDone:
584		t.Fatalf("FlushAll returned %v while in-flight write was still blocked", err)
585	case <-time.After(50 * time.Millisecond):
586	}
587
588	close(slow.release)
589	select {
590	case err := <-flushDone:
591		require.NoError(t, err)
592	case <-time.After(time.Second):
593		t.Fatal("FlushAll did not return after in-flight write completed")
594	}
595
596	got, err := svc.Get(t.Context(), msg.ID)
597	require.NoError(t, err)
598	require.Equal(t, "payload", got.Content().Text)
599}
600
601// TestUpdate_StructuralFlushUsesMustDeliver covers the second review
602// finding: structural terminal events (tool-call add, tool-call
603// finish, reasoning end) must publish via the must-deliver path even
604// when the message itself is not yet IsFinished.
605//
606// We detect which path was taken by saturating a subscriber's channel
607// buffer with no reader. With a short must-deliver timeout, the
608// must-deliver path increments [pubsub.Broker.MustDeliverDropCount]
609// after the timeout expires; the lossy path increments
610// [pubsub.Broker.DropCount] immediately. The two counters are
611// disjoint, so they precisely identify which call site published the
612// event.
613func TestUpdate_StructuralFlushUsesMustDeliver(t *testing.T) {
614	t.Parallel()
615
616	cases := []struct {
617		name string
618		mut  func(*Message)
619	}{
620		{
621			name: "tool call add",
622			mut: func(m *Message) {
623				m.AddToolCall(ToolCall{ID: "tc1", Name: "view"})
624			},
625		},
626		{
627			name: "tool call finish",
628			mut: func(m *Message) {
629				m.AddToolCall(ToolCall{ID: "tc1", Name: "view", Input: "{}", Finished: true})
630			},
631		},
632		{
633			name: "reasoning end",
634			mut: func(m *Message) {
635				m.AppendReasoningContent("hmm")
636				m.FinishThinking()
637			},
638		},
639	}
640
641	for _, tc := range cases {
642		t.Run(tc.name, func(t *testing.T) {
643			t.Parallel()
644
645			conn, err := db.Connect(t.Context(), t.TempDir())
646			require.NoError(t, err)
647			t.Cleanup(func() { _ = conn.Close() })
648
649			q := db.New(conn)
650			sessions := session.NewService(q, conn)
651			sess, err := sessions.Create(t.Context(), "test")
652			require.NoError(t, err)
653
654			// Replace the default broker with a tiny buffer + short
655			// must-deliver timeout so we can fully saturate from a
656			// single sender and observe drops without long waits.
657			svc := NewService(q, WithDebounce(time.Hour))
658			impl := svc.(*service)
659			impl.Shutdown()
660			impl.Broker = pubsub.NewBrokerWithOptions[Message](1)
661			impl.SetMustDeliverTimeout(40 * time.Millisecond)
662
663			subCtx, cancel := context.WithCancel(t.Context())
664			defer cancel()
665			sub := svc.Subscribe(subCtx)
666
667			msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
668			require.NoError(t, err)
669
670			// Saturate the subscriber's buffer (capacity 1). The
671			// CreatedEvent from Create above already left one event
672			// in the buffer; we never read sub, so the next publish
673			// has nowhere to go.
674			_ = sub // intentionally not drained.
675
676			// Drive the structural change. With debounce=1h, Update
677			// flushes synchronously and routes through whichever
678			// publish path the service chose for structural events.
679			tc.mut(&msg)
680			require.NoError(t, svc.Update(t.Context(), msg))
681
682			// Must-deliver timeout (40ms) should have expired with
683			// no drain. If structural events are routed through
684			// must-deliver: MustDeliverDropCount > 0, DropCount
685			// unchanged. If routed through lossy Publish:
686			// DropCount > 0, MustDeliverDropCount == 0.
687			require.Eventually(t, func() bool {
688				return impl.MustDeliverDropCount() >= 1
689			}, time.Second, 5*time.Millisecond,
690				"structural terminal event should publish via must-deliver, not lossy Publish")
691			require.Zero(t, impl.DropCount(),
692				"structural terminal event must not be silently dropped via lossy Publish")
693		})
694	}
695}