1package message
2
3import (
4 "context"
5 "sync"
6 "sync/atomic"
7 "testing"
8 "time"
9
10 "github.com/charmbracelet/crush/internal/db"
11 "github.com/charmbracelet/crush/internal/pubsub"
12 "github.com/charmbracelet/crush/internal/session"
13 "github.com/stretchr/testify/require"
14)
15
16// slowUpdateQuerier wraps a [db.Querier] and forces UpdateMessage to
17// hang on a release channel. Used to simulate an in-flight SQL write.
18type slowUpdateQuerier struct {
19 db.Querier
20 release chan struct{}
21 started chan struct{}
22 startOnce sync.Once
23}
24
25func (s *slowUpdateQuerier) UpdateMessage(ctx context.Context, arg db.UpdateMessageParams) error {
26 s.startOnce.Do(func() { close(s.started) })
27 select {
28 case <-s.release:
29 case <-ctx.Done():
30 return ctx.Err()
31 }
32 return s.Querier.UpdateMessage(ctx, arg)
33}
34
35// newTestService spins up a fresh in-memory message.Service backed by a
36// temporary on-disk SQLite database. Returns the service plus a session
37// ID to attach messages to.
38func newTestService(t *testing.T, opts ...ServiceOption) (Service, string) {
39 t.Helper()
40 conn, err := db.Connect(t.Context(), t.TempDir())
41 require.NoError(t, err)
42 t.Cleanup(func() { _ = conn.Close() })
43
44 q := db.New(conn)
45 sessions := session.NewService(q, conn)
46 sess, err := sessions.Create(t.Context(), "test")
47 require.NoError(t, err)
48
49 svc := NewService(q, opts...)
50 return svc, sess.ID
51}
52
53// eventCollector consumes broker events into a slice in a goroutine
54// and exposes thread-safe Snapshot / Reset helpers for assertions.
55type eventCollector struct {
56 mu sync.Mutex
57 events []pubsub.Event[Message]
58}
59
60func collect(ctx context.Context, sub <-chan pubsub.Event[Message]) *eventCollector {
61 c := &eventCollector{}
62 go func() {
63 for {
64 select {
65 case <-ctx.Done():
66 return
67 case ev, ok := <-sub:
68 if !ok {
69 return
70 }
71 c.mu.Lock()
72 c.events = append(c.events, ev)
73 c.mu.Unlock()
74 }
75 }
76 }()
77 return c
78}
79
80func (c *eventCollector) snapshot() []pubsub.Event[Message] {
81 c.mu.Lock()
82 defer c.mu.Unlock()
83 out := make([]pubsub.Event[Message], len(c.events))
84 copy(out, c.events)
85 return out
86}
87
88func (c *eventCollector) reset() {
89 c.mu.Lock()
90 defer c.mu.Unlock()
91 c.events = nil
92}
93
94func TestUpdate_DebouncesTextDeltas(t *testing.T) {
95 t.Parallel()
96
97 // Long-enough debounce that we can verify nothing flushes prematurely.
98 svc, sessionID := newTestService(t, WithDebounce(50*time.Millisecond))
99
100 subCtx, cancelSub := context.WithCancel(t.Context())
101 defer cancelSub()
102 sub := svc.Subscribe(subCtx)
103 collector := collect(subCtx, sub)
104
105 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{
106 Role: Assistant,
107 })
108 require.NoError(t, err)
109 // Drop the CreatedEvent emitted by Create.
110 time.Sleep(5 * time.Millisecond)
111 collector.reset()
112
113 // Push 5 deltas inside a single debounce window.
114 for i := 0; i < 5; i++ {
115 msg.AppendContent("a")
116 require.NoError(t, svc.Update(t.Context(), msg))
117 }
118
119 // Before the debounce expires no UpdatedEvent should have landed.
120 time.Sleep(10 * time.Millisecond)
121 require.Empty(t, collector.snapshot(), "no events should land before debounce window expires")
122
123 // Wait for the debounce timer to fire.
124 require.Eventually(t, func() bool {
125 return len(collector.snapshot()) >= 1
126 }, time.Second, 5*time.Millisecond)
127 events := collector.snapshot()
128 require.Len(t, events, 1, "5 deltas should coalesce into 1 UpdatedEvent")
129 require.Equal(t, pubsub.UpdatedEvent, events[0].Type)
130 require.Equal(t, "aaaaa", events[0].Payload.Content().Text)
131
132 // Final state must be persisted.
133 got, err := svc.Get(t.Context(), msg.ID)
134 require.NoError(t, err)
135 require.Equal(t, "aaaaa", got.Content().Text)
136}
137
138func TestUpdate_TerminalUpdatesFlushSynchronously(t *testing.T) {
139 t.Parallel()
140
141 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
142
143 subCtx, cancelSub := context.WithCancel(t.Context())
144 defer cancelSub()
145 sub := svc.Subscribe(subCtx)
146 collector := collect(subCtx, sub)
147
148 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
149 require.NoError(t, err)
150 time.Sleep(5 * time.Millisecond)
151 collector.reset()
152
153 // AddFinish makes the message terminal; Update must flush
154 // synchronously even with a 1-hour debounce.
155 msg.AppendContent("done")
156 msg.AddFinish(FinishReasonEndTurn, "", "")
157 require.NoError(t, svc.Update(t.Context(), msg))
158
159 require.Eventually(t, func() bool {
160 return len(collector.snapshot()) >= 1
161 }, time.Second, 5*time.Millisecond,
162 "terminal update must publish without waiting for debounce")
163 events := collector.snapshot()
164 require.Len(t, events, 1)
165 require.True(t, events[0].Payload.IsFinished())
166
167 got, err := svc.Get(t.Context(), msg.ID)
168 require.NoError(t, err)
169 require.True(t, got.IsFinished())
170}
171
172func TestUpdate_ToolCallStructuralChangeFlushes(t *testing.T) {
173 t.Parallel()
174
175 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
176
177 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
178 require.NoError(t, err)
179
180 // Adding a new tool call is a structural change → sync flush.
181 msg.AddToolCall(ToolCall{ID: "tc1", Name: "view", Finished: false})
182 require.NoError(t, svc.Update(t.Context(), msg))
183
184 got, err := svc.Get(t.Context(), msg.ID)
185 require.NoError(t, err)
186 require.Len(t, got.ToolCalls(), 1)
187 require.Equal(t, "tc1", got.ToolCalls()[0].ID)
188
189 // Marking the tool call finished is also a structural change.
190 msg.AddToolCall(ToolCall{ID: "tc1", Name: "view", Input: "{}", Finished: true})
191 require.NoError(t, svc.Update(t.Context(), msg))
192
193 got, err = svc.Get(t.Context(), msg.ID)
194 require.NoError(t, err)
195 require.True(t, got.ToolCalls()[0].Finished)
196}
197
198func TestUpdate_ReasoningEndFlushes(t *testing.T) {
199 t.Parallel()
200
201 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
202
203 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
204 require.NoError(t, err)
205
206 // Reasoning deltas alone debounce.
207 msg.AppendReasoningContent("hmm")
208 require.NoError(t, svc.Update(t.Context(), msg))
209 got, err := svc.Get(t.Context(), msg.ID)
210 require.NoError(t, err)
211 require.Empty(t, got.ReasoningContent().Thinking, "reasoning delta should still be in the debounce buffer")
212
213 // FinishThinking sets FinishedAt → terminal flush.
214 msg.FinishThinking()
215 require.NoError(t, svc.Update(t.Context(), msg))
216
217 got, err = svc.Get(t.Context(), msg.ID)
218 require.NoError(t, err)
219 require.Equal(t, "hmm", got.ReasoningContent().Thinking)
220 require.NotZero(t, got.ReasoningContent().FinishedAt)
221}
222
223func TestFlush_DrainsPendingDebouncedUpdates(t *testing.T) {
224 t.Parallel()
225
226 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
227
228 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
229 require.NoError(t, err)
230 msg.AppendContent("buffered")
231 require.NoError(t, svc.Update(t.Context(), msg))
232
233 // Without a flush the SQL row is unchanged from Create.
234 got, err := svc.Get(t.Context(), msg.ID)
235 require.NoError(t, err)
236 require.Empty(t, got.Content().Text)
237
238 require.NoError(t, svc.Flush(t.Context(), msg.ID))
239
240 got, err = svc.Get(t.Context(), msg.ID)
241 require.NoError(t, err)
242 require.Equal(t, "buffered", got.Content().Text)
243
244 // Subsequent Flush is a no-op.
245 require.NoError(t, svc.Flush(t.Context(), msg.ID))
246}
247
248func TestFlushAll_DrainsAllPending(t *testing.T) {
249 t.Parallel()
250
251 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
252
253 const n = 5
254 msgs := make([]Message, n)
255 for i := range msgs {
256 m, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
257 require.NoError(t, err)
258 m.AppendContent("hi")
259 require.NoError(t, svc.Update(t.Context(), m))
260 msgs[i] = m
261 }
262
263 require.NoError(t, svc.FlushAll(t.Context()))
264
265 for _, m := range msgs {
266 got, err := svc.Get(t.Context(), m.ID)
267 require.NoError(t, err)
268 require.Equal(t, "hi", got.Content().Text, "FlushAll should drain every pending message")
269 }
270}
271
272func TestUpdate_OrderingMatchesNonCoalesced(t *testing.T) {
273 t.Parallel()
274
275 // Compare the final state after coalesced vs zero-debounce updates.
276 // A sequence of interleaved text/reasoning/tool-call updates must
277 // converge to the same final DB row either way.
278 build := func(svc Service, sessionID string) Message {
279 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
280 require.NoError(t, err)
281 msg.AppendReasoningContent("thinking 1 ")
282 require.NoError(t, svc.Update(t.Context(), msg))
283 msg.AppendReasoningContent("thinking 2")
284 require.NoError(t, svc.Update(t.Context(), msg))
285 msg.FinishThinking()
286 require.NoError(t, svc.Update(t.Context(), msg))
287 msg.AppendContent("hello ")
288 require.NoError(t, svc.Update(t.Context(), msg))
289 msg.AppendContent("world")
290 require.NoError(t, svc.Update(t.Context(), msg))
291 msg.AddToolCall(ToolCall{ID: "tc", Name: "x", Finished: false})
292 require.NoError(t, svc.Update(t.Context(), msg))
293 msg.AddToolCall(ToolCall{ID: "tc", Name: "x", Input: "{}", Finished: true})
294 require.NoError(t, svc.Update(t.Context(), msg))
295 msg.AddFinish(FinishReasonEndTurn, "", "")
296 require.NoError(t, svc.Update(t.Context(), msg))
297 return msg
298 }
299
300 coalesced, sid1 := newTestService(t, WithDebounce(20*time.Millisecond))
301 a := build(coalesced, sid1)
302 require.NoError(t, coalesced.FlushAll(t.Context()))
303 gotA, err := coalesced.Get(t.Context(), a.ID)
304 require.NoError(t, err)
305
306 immediate, sid2 := newTestService(t, WithDebounce(0))
307 b := build(immediate, sid2)
308 gotB, err := immediate.Get(t.Context(), b.ID)
309 require.NoError(t, err)
310
311 require.Equal(t, gotA.Content().Text, gotB.Content().Text)
312 require.Equal(t, gotA.ReasoningContent().Thinking, gotB.ReasoningContent().Thinking)
313 require.Equal(t, len(gotA.ToolCalls()), len(gotB.ToolCalls()))
314 require.Equal(t, gotA.IsFinished(), gotB.IsFinished())
315}
316
317func TestDelete_DropsPendingState(t *testing.T) {
318 t.Parallel()
319
320 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
321 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
322 require.NoError(t, err)
323 msg.AppendContent("dropped")
324 require.NoError(t, svc.Update(t.Context(), msg))
325
326 require.NoError(t, svc.Delete(t.Context(), msg.ID))
327
328 // FlushAll after Delete must not write to the deleted row.
329 require.NoError(t, svc.FlushAll(t.Context()))
330
331 _, err = svc.Get(t.Context(), msg.ID)
332 require.Error(t, err, "deleted message must remain deleted")
333}
334
335func TestBroker_PublishLossyDropCounter(t *testing.T) {
336 t.Parallel()
337
338 // Tiny channel buffer so we can saturate from a single sender.
339 b := pubsub.NewBrokerWithOptions[int](1)
340 defer b.Shutdown()
341
342 subCtx, cancel := context.WithCancel(t.Context())
343 defer cancel()
344 sub := b.Subscribe(subCtx)
345 require.NotNil(t, sub)
346
347 // Don't read from sub. Saturate the buffer.
348 for range 100 {
349 b.Publish(pubsub.UpdatedEvent, 1)
350 }
351
352 require.GreaterOrEqual(t, b.DropCount(), uint64(1),
353 "lossy Publish must increment the drop counter under contention")
354}
355
356func TestBroker_PublishMustDeliverHonorsTimeout(t *testing.T) {
357 t.Parallel()
358
359 b := pubsub.NewBrokerWithOptions[int](1)
360 b.SetMustDeliverTimeout(20 * time.Millisecond)
361 defer b.Shutdown()
362
363 subCtx, cancel := context.WithCancel(t.Context())
364 defer cancel()
365 sub := b.Subscribe(subCtx)
366 require.NotNil(t, sub)
367
368 // Saturate: one event sits in the buffer, the second must wait.
369 b.Publish(pubsub.UpdatedEvent, 1)
370
371 // PublishMustDeliver should block up to 20ms then drop.
372 start := time.Now()
373 b.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, 2)
374 elapsed := time.Since(start)
375
376 require.GreaterOrEqual(t, elapsed, 20*time.Millisecond,
377 "PublishMustDeliver should block at least the timeout under contention")
378 require.Less(t, elapsed, 200*time.Millisecond,
379 "PublishMustDeliver must not block indefinitely")
380 require.GreaterOrEqual(t, b.MustDeliverDropCount(), uint64(1),
381 "timeout must increment the must-deliver drop counter")
382}
383
384func TestBroker_PublishMustDeliverWithReader(t *testing.T) {
385 t.Parallel()
386
387 b := pubsub.NewBrokerWithOptions[int](1)
388 b.SetMustDeliverTimeout(50 * time.Millisecond)
389 defer b.Shutdown()
390
391 subCtx, cancel := context.WithCancel(t.Context())
392 defer cancel()
393 sub := b.Subscribe(subCtx)
394
395 var received atomic.Uint64
396 done := make(chan struct{})
397 go func() {
398 defer close(done)
399 for {
400 select {
401 case <-subCtx.Done():
402 return
403 case _, ok := <-sub:
404 if !ok {
405 return
406 }
407 received.Add(1)
408 }
409 }
410 }()
411
412 for i := range 10 {
413 b.PublishMustDeliver(t.Context(), pubsub.UpdatedEvent, i)
414 }
415
416 // All 10 should land within the must-deliver timeout window.
417 require.Eventually(t, func() bool { return received.Load() == 10 },
418 time.Second, 5*time.Millisecond,
419 "all must-deliver events should reach an active subscriber")
420 require.Zero(t, b.MustDeliverDropCount(),
421 "no drops expected when subscriber drains promptly")
422}
423
424func TestUpdate_TerminalEventUsesMustDeliver(t *testing.T) {
425 t.Parallel()
426
427 svc, sessionID := newTestService(t, WithDebounce(time.Hour))
428
429 subCtx, cancel := context.WithCancel(t.Context())
430 defer cancel()
431 sub := svc.Subscribe(subCtx)
432
433 var seenFinish atomic.Bool
434 done := make(chan struct{})
435 go func() {
436 defer close(done)
437 for {
438 select {
439 case <-subCtx.Done():
440 return
441 case ev, ok := <-sub:
442 if !ok {
443 return
444 }
445 if ev.Type == pubsub.UpdatedEvent && ev.Payload.IsFinished() {
446 seenFinish.Store(true)
447 }
448 }
449 }
450 }()
451
452 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
453 require.NoError(t, err)
454 msg.AppendContent("final")
455 msg.AddFinish(FinishReasonEndTurn, "", "")
456 require.NoError(t, svc.Update(t.Context(), msg))
457
458 require.Eventually(t, func() bool { return seenFinish.Load() },
459 time.Second, 10*time.Millisecond,
460 "terminal update must reach subscribers via the must-deliver path")
461}
462
463func TestUpdate_ZeroDebounceFlushesEveryUpdate(t *testing.T) {
464 t.Parallel()
465
466 svc, sessionID := newTestService(t, WithDebounce(0))
467
468 msg, err := svc.Create(t.Context(), sessionID, CreateMessageParams{Role: Assistant})
469 require.NoError(t, err)
470
471 for i := 0; i < 3; i++ {
472 msg.AppendContent("x")
473 require.NoError(t, svc.Update(t.Context(), msg))
474 got, err := svc.Get(t.Context(), msg.ID)
475 require.NoError(t, err)
476 require.Len(t, got.Content().Text, i+1, "every update must land synchronously when debounce is 0")
477 }
478}
479
480// TestFlush_WaitsForInFlightWrite reproduces the failure where Flush
481// or FlushAll could return before a concurrent in-flight SQL write
482// completed. We block UpdateMessage on a release channel, fire the
483// debounce timer, then call Flush and assert it does not return until
484// the in-flight write actually lands.
485func TestFlush_WaitsForInFlightWrite(t *testing.T) {
486 t.Parallel()
487
488 conn, err := db.Connect(t.Context(), t.TempDir())
489 require.NoError(t, err)
490 t.Cleanup(func() { _ = conn.Close() })
491
492 q := db.New(conn)
493 sessions := session.NewService(q, conn)
494 sess, err := sessions.Create(t.Context(), "test")
495 require.NoError(t, err)
496
497 slow := &slowUpdateQuerier{
498 Querier: q,
499 release: make(chan struct{}),
500 started: make(chan struct{}),
501 }
502 // Short debounce so the timer fires quickly.
503 svc := NewService(slow, WithDebounce(10*time.Millisecond))
504
505 msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
506 require.NoError(t, err)
507 msg.AppendContent("payload")
508 require.NoError(t, svc.Update(t.Context(), msg))
509
510 // Wait for the timer-fired flush to enter UpdateMessage.
511 select {
512 case <-slow.started:
513 case <-time.After(time.Second):
514 t.Fatal("timer-fired flush never reached UpdateMessage")
515 }
516
517 // At this point the buffer is dirty=false but flushing=true. A
518 // naive Flush would early-return on !dirty. Spawn Flush in a
519 // goroutine and assert it has not returned while the write is
520 // still blocked.
521 flushDone := make(chan error, 1)
522 go func() { flushDone <- svc.Flush(t.Context(), msg.ID) }()
523
524 select {
525 case err := <-flushDone:
526 t.Fatalf("Flush returned %v while in-flight write was still blocked", err)
527 case <-time.After(50 * time.Millisecond):
528 // Expected: Flush is correctly waiting.
529 }
530
531 // Release the slow write; Flush must now return cleanly.
532 close(slow.release)
533 select {
534 case err := <-flushDone:
535 require.NoError(t, err)
536 case <-time.After(time.Second):
537 t.Fatal("Flush did not return after in-flight write completed")
538 }
539
540 // The SQL row should now reflect the buffered payload.
541 got, err := svc.Get(t.Context(), msg.ID)
542 require.NoError(t, err)
543 require.Equal(t, "payload", got.Content().Text)
544}
545
546// TestFlushAll_WaitsForInFlightWrite asserts FlushAll picks up IDs
547// whose buffer is currently flushing (dirty=false) so shutdown and
548// session-switch callers don't return while an SQL write is mid-flight.
549func TestFlushAll_WaitsForInFlightWrite(t *testing.T) {
550 t.Parallel()
551
552 conn, err := db.Connect(t.Context(), t.TempDir())
553 require.NoError(t, err)
554 t.Cleanup(func() { _ = conn.Close() })
555
556 q := db.New(conn)
557 sessions := session.NewService(q, conn)
558 sess, err := sessions.Create(t.Context(), "test")
559 require.NoError(t, err)
560
561 slow := &slowUpdateQuerier{
562 Querier: q,
563 release: make(chan struct{}),
564 started: make(chan struct{}),
565 }
566 svc := NewService(slow, WithDebounce(10*time.Millisecond))
567
568 msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
569 require.NoError(t, err)
570 msg.AppendContent("payload")
571 require.NoError(t, svc.Update(t.Context(), msg))
572
573 select {
574 case <-slow.started:
575 case <-time.After(time.Second):
576 t.Fatal("timer-fired flush never reached UpdateMessage")
577 }
578
579 flushDone := make(chan error, 1)
580 go func() { flushDone <- svc.FlushAll(t.Context()) }()
581
582 select {
583 case err := <-flushDone:
584 t.Fatalf("FlushAll returned %v while in-flight write was still blocked", err)
585 case <-time.After(50 * time.Millisecond):
586 }
587
588 close(slow.release)
589 select {
590 case err := <-flushDone:
591 require.NoError(t, err)
592 case <-time.After(time.Second):
593 t.Fatal("FlushAll did not return after in-flight write completed")
594 }
595
596 got, err := svc.Get(t.Context(), msg.ID)
597 require.NoError(t, err)
598 require.Equal(t, "payload", got.Content().Text)
599}
600
601// TestUpdate_StructuralFlushUsesMustDeliver covers the second review
602// finding: structural terminal events (tool-call add, tool-call
603// finish, reasoning end) must publish via the must-deliver path even
604// when the message itself is not yet IsFinished.
605//
606// We detect which path was taken by saturating a subscriber's channel
607// buffer with no reader. With a short must-deliver timeout, the
608// must-deliver path increments [pubsub.Broker.MustDeliverDropCount]
609// after the timeout expires; the lossy path increments
610// [pubsub.Broker.DropCount] immediately. The two counters are
611// disjoint, so they precisely identify which call site published the
612// event.
613func TestUpdate_StructuralFlushUsesMustDeliver(t *testing.T) {
614 t.Parallel()
615
616 cases := []struct {
617 name string
618 mut func(*Message)
619 }{
620 {
621 name: "tool call add",
622 mut: func(m *Message) {
623 m.AddToolCall(ToolCall{ID: "tc1", Name: "view"})
624 },
625 },
626 {
627 name: "tool call finish",
628 mut: func(m *Message) {
629 m.AddToolCall(ToolCall{ID: "tc1", Name: "view", Input: "{}", Finished: true})
630 },
631 },
632 {
633 name: "reasoning end",
634 mut: func(m *Message) {
635 m.AppendReasoningContent("hmm")
636 m.FinishThinking()
637 },
638 },
639 }
640
641 for _, tc := range cases {
642 t.Run(tc.name, func(t *testing.T) {
643 t.Parallel()
644
645 conn, err := db.Connect(t.Context(), t.TempDir())
646 require.NoError(t, err)
647 t.Cleanup(func() { _ = conn.Close() })
648
649 q := db.New(conn)
650 sessions := session.NewService(q, conn)
651 sess, err := sessions.Create(t.Context(), "test")
652 require.NoError(t, err)
653
654 // Replace the default broker with a tiny buffer + short
655 // must-deliver timeout so we can fully saturate from a
656 // single sender and observe drops without long waits.
657 svc := NewService(q, WithDebounce(time.Hour))
658 impl := svc.(*service)
659 impl.Shutdown()
660 impl.Broker = pubsub.NewBrokerWithOptions[Message](1)
661 impl.SetMustDeliverTimeout(40 * time.Millisecond)
662
663 subCtx, cancel := context.WithCancel(t.Context())
664 defer cancel()
665 sub := svc.Subscribe(subCtx)
666
667 msg, err := svc.Create(t.Context(), sess.ID, CreateMessageParams{Role: Assistant})
668 require.NoError(t, err)
669
670 // Saturate the subscriber's buffer (capacity 1). The
671 // CreatedEvent from Create above already left one event
672 // in the buffer; we never read sub, so the next publish
673 // has nowhere to go.
674 _ = sub // intentionally not drained.
675
676 // Drive the structural change. With debounce=1h, Update
677 // flushes synchronously and routes through whichever
678 // publish path the service chose for structural events.
679 tc.mut(&msg)
680 require.NoError(t, svc.Update(t.Context(), msg))
681
682 // Must-deliver timeout (40ms) should have expired with
683 // no drain. If structural events are routed through
684 // must-deliver: MustDeliverDropCount > 0, DropCount
685 // unchanged. If routed through lossy Publish:
686 // DropCount > 0, MustDeliverDropCount == 0.
687 require.Eventually(t, func() bool {
688 return impl.MustDeliverDropCount() >= 1
689 }, time.Second, 5*time.Millisecond,
690 "structural terminal event should publish via must-deliver, not lossy Publish")
691 require.Zero(t, impl.DropCount(),
692 "structural terminal event must not be silently dropped via lossy Publish")
693 })
694 }
695}