1use super::*;
2use clock::ReplicaId;
3use rand::prelude::*;
4use std::{
5 cmp::Ordering,
6 env,
7 iter::Iterator,
8 time::{Duration, Instant},
9};
10
11#[cfg(test)]
12#[ctor::ctor]
13fn init_logger() {
14 // std::env::set_var("RUST_LOG", "info");
15 env_logger::init();
16}
17
18#[test]
19fn test_edit() {
20 let mut buffer = Buffer::new(0, 0, History::new("abc".into()));
21 assert_eq!(buffer.text(), "abc");
22 buffer.edit(vec![3..3], "def");
23 assert_eq!(buffer.text(), "abcdef");
24 buffer.edit(vec![0..0], "ghi");
25 assert_eq!(buffer.text(), "ghiabcdef");
26 buffer.edit(vec![5..5], "jkl");
27 assert_eq!(buffer.text(), "ghiabjklcdef");
28 buffer.edit(vec![6..7], "");
29 assert_eq!(buffer.text(), "ghiabjlcdef");
30 buffer.edit(vec![4..9], "mno");
31 assert_eq!(buffer.text(), "ghiamnoef");
32}
33
34#[gpui::test(iterations = 100)]
35fn test_random_edits(mut rng: StdRng) {
36 let operations = env::var("OPERATIONS")
37 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
38 .unwrap_or(10);
39
40 let reference_string_len = rng.gen_range(0..3);
41 let mut reference_string = RandomCharIter::new(&mut rng)
42 .take(reference_string_len)
43 .collect::<String>();
44 let mut buffer = Buffer::new(0, 0, History::new(reference_string.clone().into()));
45 buffer.history.group_interval = Duration::from_millis(rng.gen_range(0..=200));
46 let mut buffer_versions = Vec::new();
47 log::info!(
48 "buffer text {:?}, version: {:?}",
49 buffer.text(),
50 buffer.version()
51 );
52
53 for _i in 0..operations {
54 let (old_ranges, new_text, _) = buffer.randomly_edit(&mut rng, 5);
55 for old_range in old_ranges.iter().rev() {
56 reference_string.replace_range(old_range.clone(), &new_text);
57 }
58 assert_eq!(buffer.text(), reference_string);
59 log::info!(
60 "buffer text {:?}, version: {:?}",
61 buffer.text(),
62 buffer.version()
63 );
64
65 if rng.gen_bool(0.25) {
66 buffer.randomly_undo_redo(&mut rng);
67 reference_string = buffer.text();
68 log::info!(
69 "buffer text {:?}, version: {:?}",
70 buffer.text(),
71 buffer.version()
72 );
73 }
74
75 let range = buffer.random_byte_range(0, &mut rng);
76 assert_eq!(
77 buffer.text_summary_for_range::<TextSummary, _>(range.clone()),
78 TextSummary::from(&reference_string[range])
79 );
80
81 buffer.check_invariants();
82
83 if rng.gen_bool(0.3) {
84 buffer_versions.push((buffer.clone(), buffer.subscribe()));
85 }
86 }
87
88 for (old_buffer, subscription) in buffer_versions {
89 let edits = buffer
90 .edits_since::<usize>(&old_buffer.version)
91 .collect::<Vec<_>>();
92
93 log::info!(
94 "applying edits since version {:?} to old text: {:?}: {:?}",
95 old_buffer.version(),
96 old_buffer.text(),
97 edits,
98 );
99
100 let mut text = old_buffer.visible_text.clone();
101 for edit in edits {
102 let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
103 text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
104 }
105 assert_eq!(text.to_string(), buffer.text());
106
107 let subscription_edits = subscription.consume();
108 log::info!(
109 "applying subscription edits since version {:?} to old text: {:?}: {:?}",
110 old_buffer.version(),
111 old_buffer.text(),
112 subscription_edits,
113 );
114
115 let mut text = old_buffer.visible_text.clone();
116 for edit in subscription_edits.into_inner() {
117 let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
118 text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
119 }
120 assert_eq!(text.to_string(), buffer.text());
121 }
122}
123
124#[test]
125fn test_line_len() {
126 let mut buffer = Buffer::new(0, 0, History::new("".into()));
127 buffer.edit(vec![0..0], "abcd\nefg\nhij");
128 buffer.edit(vec![12..12], "kl\nmno");
129 buffer.edit(vec![18..18], "\npqrs\n");
130 buffer.edit(vec![18..21], "\nPQ");
131
132 assert_eq!(buffer.line_len(0), 4);
133 assert_eq!(buffer.line_len(1), 3);
134 assert_eq!(buffer.line_len(2), 5);
135 assert_eq!(buffer.line_len(3), 3);
136 assert_eq!(buffer.line_len(4), 4);
137 assert_eq!(buffer.line_len(5), 0);
138}
139
140#[test]
141fn test_text_summary_for_range() {
142 let buffer = Buffer::new(0, 0, History::new("ab\nefg\nhklm\nnopqrs\ntuvwxyz".into()));
143 assert_eq!(
144 buffer.text_summary_for_range::<TextSummary, _>(1..3),
145 TextSummary {
146 bytes: 2,
147 lines: Point::new(1, 0),
148 lines_utf16: PointUtf16::new(1, 0),
149 first_line_chars: 1,
150 last_line_chars: 0,
151 longest_row: 0,
152 longest_row_chars: 1,
153 }
154 );
155 assert_eq!(
156 buffer.text_summary_for_range::<TextSummary, _>(1..12),
157 TextSummary {
158 bytes: 11,
159 lines: Point::new(3, 0),
160 lines_utf16: PointUtf16::new(3, 0),
161 first_line_chars: 1,
162 last_line_chars: 0,
163 longest_row: 2,
164 longest_row_chars: 4,
165 }
166 );
167 assert_eq!(
168 buffer.text_summary_for_range::<TextSummary, _>(0..20),
169 TextSummary {
170 bytes: 20,
171 lines: Point::new(4, 1),
172 lines_utf16: PointUtf16::new(4, 1),
173 first_line_chars: 2,
174 last_line_chars: 1,
175 longest_row: 3,
176 longest_row_chars: 6,
177 }
178 );
179 assert_eq!(
180 buffer.text_summary_for_range::<TextSummary, _>(0..22),
181 TextSummary {
182 bytes: 22,
183 lines: Point::new(4, 3),
184 lines_utf16: PointUtf16::new(4, 3),
185 first_line_chars: 2,
186 last_line_chars: 3,
187 longest_row: 3,
188 longest_row_chars: 6,
189 }
190 );
191 assert_eq!(
192 buffer.text_summary_for_range::<TextSummary, _>(7..22),
193 TextSummary {
194 bytes: 15,
195 lines: Point::new(2, 3),
196 lines_utf16: PointUtf16::new(2, 3),
197 first_line_chars: 4,
198 last_line_chars: 3,
199 longest_row: 1,
200 longest_row_chars: 6,
201 }
202 );
203}
204
205#[test]
206fn test_chars_at() {
207 let mut buffer = Buffer::new(0, 0, History::new("".into()));
208 buffer.edit(vec![0..0], "abcd\nefgh\nij");
209 buffer.edit(vec![12..12], "kl\nmno");
210 buffer.edit(vec![18..18], "\npqrs");
211 buffer.edit(vec![18..21], "\nPQ");
212
213 let chars = buffer.chars_at(Point::new(0, 0));
214 assert_eq!(chars.collect::<String>(), "abcd\nefgh\nijkl\nmno\nPQrs");
215
216 let chars = buffer.chars_at(Point::new(1, 0));
217 assert_eq!(chars.collect::<String>(), "efgh\nijkl\nmno\nPQrs");
218
219 let chars = buffer.chars_at(Point::new(2, 0));
220 assert_eq!(chars.collect::<String>(), "ijkl\nmno\nPQrs");
221
222 let chars = buffer.chars_at(Point::new(3, 0));
223 assert_eq!(chars.collect::<String>(), "mno\nPQrs");
224
225 let chars = buffer.chars_at(Point::new(4, 0));
226 assert_eq!(chars.collect::<String>(), "PQrs");
227
228 // Regression test:
229 let mut buffer = Buffer::new(0, 0, History::new("".into()));
230 buffer.edit(vec![0..0], "[workspace]\nmembers = [\n \"xray_core\",\n \"xray_server\",\n \"xray_cli\",\n \"xray_wasm\",\n]\n");
231 buffer.edit(vec![60..60], "\n");
232
233 let chars = buffer.chars_at(Point::new(6, 0));
234 assert_eq!(chars.collect::<String>(), " \"xray_wasm\",\n]\n");
235}
236
237#[test]
238fn test_anchors() {
239 let mut buffer = Buffer::new(0, 0, History::new("".into()));
240 buffer.edit(vec![0..0], "abc");
241 let left_anchor = buffer.anchor_before(2);
242 let right_anchor = buffer.anchor_after(2);
243
244 buffer.edit(vec![1..1], "def\n");
245 assert_eq!(buffer.text(), "adef\nbc");
246 assert_eq!(left_anchor.to_offset(&buffer), 6);
247 assert_eq!(right_anchor.to_offset(&buffer), 6);
248 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
249 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 1 });
250
251 buffer.edit(vec![2..3], "");
252 assert_eq!(buffer.text(), "adf\nbc");
253 assert_eq!(left_anchor.to_offset(&buffer), 5);
254 assert_eq!(right_anchor.to_offset(&buffer), 5);
255 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
256 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 1 });
257
258 buffer.edit(vec![5..5], "ghi\n");
259 assert_eq!(buffer.text(), "adf\nbghi\nc");
260 assert_eq!(left_anchor.to_offset(&buffer), 5);
261 assert_eq!(right_anchor.to_offset(&buffer), 9);
262 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
263 assert_eq!(right_anchor.to_point(&buffer), Point { row: 2, column: 0 });
264
265 buffer.edit(vec![7..9], "");
266 assert_eq!(buffer.text(), "adf\nbghc");
267 assert_eq!(left_anchor.to_offset(&buffer), 5);
268 assert_eq!(right_anchor.to_offset(&buffer), 7);
269 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 },);
270 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 3 });
271
272 // Ensure anchoring to a point is equivalent to anchoring to an offset.
273 assert_eq!(
274 buffer.anchor_before(Point { row: 0, column: 0 }),
275 buffer.anchor_before(0)
276 );
277 assert_eq!(
278 buffer.anchor_before(Point { row: 0, column: 1 }),
279 buffer.anchor_before(1)
280 );
281 assert_eq!(
282 buffer.anchor_before(Point { row: 0, column: 2 }),
283 buffer.anchor_before(2)
284 );
285 assert_eq!(
286 buffer.anchor_before(Point { row: 0, column: 3 }),
287 buffer.anchor_before(3)
288 );
289 assert_eq!(
290 buffer.anchor_before(Point { row: 1, column: 0 }),
291 buffer.anchor_before(4)
292 );
293 assert_eq!(
294 buffer.anchor_before(Point { row: 1, column: 1 }),
295 buffer.anchor_before(5)
296 );
297 assert_eq!(
298 buffer.anchor_before(Point { row: 1, column: 2 }),
299 buffer.anchor_before(6)
300 );
301 assert_eq!(
302 buffer.anchor_before(Point { row: 1, column: 3 }),
303 buffer.anchor_before(7)
304 );
305 assert_eq!(
306 buffer.anchor_before(Point { row: 1, column: 4 }),
307 buffer.anchor_before(8)
308 );
309
310 // Comparison between anchors.
311 let anchor_at_offset_0 = buffer.anchor_before(0);
312 let anchor_at_offset_1 = buffer.anchor_before(1);
313 let anchor_at_offset_2 = buffer.anchor_before(2);
314
315 assert_eq!(
316 anchor_at_offset_0
317 .cmp(&anchor_at_offset_0, &buffer)
318 .unwrap(),
319 Ordering::Equal
320 );
321 assert_eq!(
322 anchor_at_offset_1
323 .cmp(&anchor_at_offset_1, &buffer)
324 .unwrap(),
325 Ordering::Equal
326 );
327 assert_eq!(
328 anchor_at_offset_2
329 .cmp(&anchor_at_offset_2, &buffer)
330 .unwrap(),
331 Ordering::Equal
332 );
333
334 assert_eq!(
335 anchor_at_offset_0
336 .cmp(&anchor_at_offset_1, &buffer)
337 .unwrap(),
338 Ordering::Less
339 );
340 assert_eq!(
341 anchor_at_offset_1
342 .cmp(&anchor_at_offset_2, &buffer)
343 .unwrap(),
344 Ordering::Less
345 );
346 assert_eq!(
347 anchor_at_offset_0
348 .cmp(&anchor_at_offset_2, &buffer)
349 .unwrap(),
350 Ordering::Less
351 );
352
353 assert_eq!(
354 anchor_at_offset_1
355 .cmp(&anchor_at_offset_0, &buffer)
356 .unwrap(),
357 Ordering::Greater
358 );
359 assert_eq!(
360 anchor_at_offset_2
361 .cmp(&anchor_at_offset_1, &buffer)
362 .unwrap(),
363 Ordering::Greater
364 );
365 assert_eq!(
366 anchor_at_offset_2
367 .cmp(&anchor_at_offset_0, &buffer)
368 .unwrap(),
369 Ordering::Greater
370 );
371}
372
373#[test]
374fn test_anchors_at_start_and_end() {
375 let mut buffer = Buffer::new(0, 0, History::new("".into()));
376 let before_start_anchor = buffer.anchor_before(0);
377 let after_end_anchor = buffer.anchor_after(0);
378
379 buffer.edit(vec![0..0], "abc");
380 assert_eq!(buffer.text(), "abc");
381 assert_eq!(before_start_anchor.to_offset(&buffer), 0);
382 assert_eq!(after_end_anchor.to_offset(&buffer), 3);
383
384 let after_start_anchor = buffer.anchor_after(0);
385 let before_end_anchor = buffer.anchor_before(3);
386
387 buffer.edit(vec![3..3], "def");
388 buffer.edit(vec![0..0], "ghi");
389 assert_eq!(buffer.text(), "ghiabcdef");
390 assert_eq!(before_start_anchor.to_offset(&buffer), 0);
391 assert_eq!(after_start_anchor.to_offset(&buffer), 3);
392 assert_eq!(before_end_anchor.to_offset(&buffer), 6);
393 assert_eq!(after_end_anchor.to_offset(&buffer), 9);
394}
395
396#[test]
397fn test_undo_redo() {
398 let mut buffer = Buffer::new(0, 0, History::new("1234".into()));
399 // Set group interval to zero so as to not group edits in the undo stack.
400 buffer.history.group_interval = Duration::from_secs(0);
401
402 buffer.edit(vec![1..1], "abx");
403 buffer.edit(vec![3..4], "yzef");
404 buffer.edit(vec![3..5], "cd");
405 assert_eq!(buffer.text(), "1abcdef234");
406
407 let transactions = buffer.history.undo_stack.clone();
408 assert_eq!(transactions.len(), 3);
409
410 buffer.undo_or_redo(transactions[0].clone()).unwrap();
411 assert_eq!(buffer.text(), "1cdef234");
412 buffer.undo_or_redo(transactions[0].clone()).unwrap();
413 assert_eq!(buffer.text(), "1abcdef234");
414
415 buffer.undo_or_redo(transactions[1].clone()).unwrap();
416 assert_eq!(buffer.text(), "1abcdx234");
417 buffer.undo_or_redo(transactions[2].clone()).unwrap();
418 assert_eq!(buffer.text(), "1abx234");
419 buffer.undo_or_redo(transactions[1].clone()).unwrap();
420 assert_eq!(buffer.text(), "1abyzef234");
421 buffer.undo_or_redo(transactions[2].clone()).unwrap();
422 assert_eq!(buffer.text(), "1abcdef234");
423
424 buffer.undo_or_redo(transactions[2].clone()).unwrap();
425 assert_eq!(buffer.text(), "1abyzef234");
426 buffer.undo_or_redo(transactions[0].clone()).unwrap();
427 assert_eq!(buffer.text(), "1yzef234");
428 buffer.undo_or_redo(transactions[1].clone()).unwrap();
429 assert_eq!(buffer.text(), "1234");
430}
431
432#[test]
433fn test_history() {
434 let mut now = Instant::now();
435 let mut buffer = Buffer::new(0, 0, History::new("123456".into()));
436
437 let set_id = if let Operation::UpdateSelections { set_id, .. } =
438 buffer.add_selection_set(&buffer.selections_from_ranges(vec![4..4]).unwrap())
439 {
440 set_id
441 } else {
442 unreachable!()
443 };
444 buffer.start_transaction_at(Some(set_id), now).unwrap();
445 buffer.edit(vec![2..4], "cd");
446 buffer.end_transaction_at(Some(set_id), now).unwrap();
447 assert_eq!(buffer.text(), "12cd56");
448 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![4..4]);
449
450 buffer.start_transaction_at(Some(set_id), now).unwrap();
451 buffer
452 .update_selection_set(set_id, &buffer.selections_from_ranges(vec![1..3]).unwrap())
453 .unwrap();
454 buffer.edit(vec![4..5], "e");
455 buffer.end_transaction_at(Some(set_id), now).unwrap();
456 assert_eq!(buffer.text(), "12cde6");
457 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
458
459 now += buffer.history.group_interval + Duration::from_millis(1);
460 buffer.start_transaction_at(Some(set_id), now).unwrap();
461 buffer
462 .update_selection_set(set_id, &buffer.selections_from_ranges(vec![2..2]).unwrap())
463 .unwrap();
464 buffer.edit(vec![0..1], "a");
465 buffer.edit(vec![1..1], "b");
466 buffer.end_transaction_at(Some(set_id), now).unwrap();
467 assert_eq!(buffer.text(), "ab2cde6");
468 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![3..3]);
469
470 // Last transaction happened past the group interval, undo it on its
471 // own.
472 buffer.undo();
473 assert_eq!(buffer.text(), "12cde6");
474 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
475
476 // First two transactions happened within the group interval, undo them
477 // together.
478 buffer.undo();
479 assert_eq!(buffer.text(), "123456");
480 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![4..4]);
481
482 // Redo the first two transactions together.
483 buffer.redo();
484 assert_eq!(buffer.text(), "12cde6");
485 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
486
487 // Redo the last transaction on its own.
488 buffer.redo();
489 assert_eq!(buffer.text(), "ab2cde6");
490 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![3..3]);
491
492 buffer.start_transaction_at(None, now).unwrap();
493 assert!(buffer.end_transaction_at(None, now).is_none());
494 buffer.undo();
495 assert_eq!(buffer.text(), "12cde6");
496}
497
498#[test]
499fn test_concurrent_edits() {
500 let text = "abcdef";
501
502 let mut buffer1 = Buffer::new(1, 0, History::new(text.into()));
503 let mut buffer2 = Buffer::new(2, 0, History::new(text.into()));
504 let mut buffer3 = Buffer::new(3, 0, History::new(text.into()));
505
506 let buf1_op = buffer1.edit(vec![1..2], "12");
507 assert_eq!(buffer1.text(), "a12cdef");
508 let buf2_op = buffer2.edit(vec![3..4], "34");
509 assert_eq!(buffer2.text(), "abc34ef");
510 let buf3_op = buffer3.edit(vec![5..6], "56");
511 assert_eq!(buffer3.text(), "abcde56");
512
513 buffer1.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
514 buffer1.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
515 buffer2.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
516 buffer2.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
517 buffer3.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
518 buffer3.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
519
520 assert_eq!(buffer1.text(), "a12c34e56");
521 assert_eq!(buffer2.text(), "a12c34e56");
522 assert_eq!(buffer3.text(), "a12c34e56");
523}
524
525#[gpui::test(iterations = 100)]
526fn test_random_concurrent_edits(mut rng: StdRng) {
527 let peers = env::var("PEERS")
528 .map(|i| i.parse().expect("invalid `PEERS` variable"))
529 .unwrap_or(5);
530 let operations = env::var("OPERATIONS")
531 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
532 .unwrap_or(10);
533
534 let base_text_len = rng.gen_range(0..10);
535 let base_text = RandomCharIter::new(&mut rng)
536 .take(base_text_len)
537 .collect::<String>();
538 let mut replica_ids = Vec::new();
539 let mut buffers = Vec::new();
540 let mut network = Network::new(rng.clone());
541
542 for i in 0..peers {
543 let mut buffer = Buffer::new(i as ReplicaId, 0, History::new(base_text.clone().into()));
544 buffer.history.group_interval = Duration::from_millis(rng.gen_range(0..=200));
545 buffers.push(buffer);
546 replica_ids.push(i as u16);
547 network.add_peer(i as u16);
548 }
549
550 log::info!("initial text: {:?}", base_text);
551
552 let mut mutation_count = operations;
553 loop {
554 let replica_index = rng.gen_range(0..peers);
555 let replica_id = replica_ids[replica_index];
556 let buffer = &mut buffers[replica_index];
557 match rng.gen_range(0..=100) {
558 0..=50 if mutation_count != 0 => {
559 let ops = buffer.randomly_mutate(&mut rng);
560 network.broadcast(buffer.replica_id, ops);
561 log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text());
562 mutation_count -= 1;
563 }
564 51..=70 if mutation_count != 0 => {
565 let ops = buffer.randomly_undo_redo(&mut rng);
566 network.broadcast(buffer.replica_id, ops);
567 mutation_count -= 1;
568 }
569 71..=100 if network.has_unreceived(replica_id) => {
570 let ops = network.receive(replica_id);
571 if !ops.is_empty() {
572 log::info!(
573 "peer {} applying {} ops from the network.",
574 replica_id,
575 ops.len()
576 );
577 buffer.apply_ops(ops).unwrap();
578 }
579 }
580 _ => {}
581 }
582 buffer.check_invariants();
583
584 if mutation_count == 0 && network.is_idle() {
585 break;
586 }
587 }
588
589 let first_buffer = &buffers[0];
590 for buffer in &buffers[1..] {
591 assert_eq!(
592 buffer.text(),
593 first_buffer.text(),
594 "Replica {} text != Replica 0 text",
595 buffer.replica_id
596 );
597 assert_eq!(
598 buffer.selection_sets().collect::<HashMap<_, _>>(),
599 first_buffer.selection_sets().collect::<HashMap<_, _>>()
600 );
601 assert_eq!(
602 buffer
603 .all_selection_ranges::<usize>()
604 .collect::<HashMap<_, _>>(),
605 first_buffer
606 .all_selection_ranges::<usize>()
607 .collect::<HashMap<_, _>>()
608 );
609 buffer.check_invariants();
610 }
611}
612
613#[derive(Clone)]
614struct Envelope<T: Clone> {
615 message: T,
616 sender: ReplicaId,
617}
618
619struct Network<T: Clone, R: rand::Rng> {
620 inboxes: std::collections::BTreeMap<ReplicaId, Vec<Envelope<T>>>,
621 all_messages: Vec<T>,
622 rng: R,
623}
624
625impl Buffer {
626 fn check_invariants(&self) {
627 // Ensure every fragment is ordered by locator in the fragment tree and corresponds
628 // to an insertion fragment in the insertions tree.
629 let mut prev_fragment_id = Locator::min();
630 for fragment in self.snapshot.fragments.items(&None) {
631 assert!(fragment.id > prev_fragment_id);
632 prev_fragment_id = fragment.id.clone();
633
634 let insertion_fragment = self
635 .snapshot
636 .insertions
637 .get(
638 &InsertionFragmentKey {
639 timestamp: fragment.insertion_timestamp.local(),
640 split_offset: fragment.insertion_offset,
641 },
642 &(),
643 )
644 .unwrap();
645 assert_eq!(insertion_fragment.fragment_id, fragment.id);
646 }
647
648 let mut cursor = self.snapshot.fragments.cursor::<Locator>();
649 for insertion_fragment in self.snapshot.insertions.cursor::<()>() {
650 cursor.seek(&insertion_fragment.fragment_id, Bias::Left, &None);
651 let fragment = cursor.item().unwrap();
652 assert_eq!(insertion_fragment.fragment_id, fragment.id);
653 assert_eq!(insertion_fragment.split_offset, fragment.insertion_offset);
654 }
655 }
656}
657
658impl<T: Clone, R: rand::Rng> Network<T, R> {
659 fn new(rng: R) -> Self {
660 Network {
661 inboxes: Default::default(),
662 all_messages: Vec::new(),
663 rng,
664 }
665 }
666
667 fn add_peer(&mut self, id: ReplicaId) {
668 self.inboxes.insert(id, Vec::new());
669 }
670
671 fn is_idle(&self) -> bool {
672 self.inboxes.values().all(|i| i.is_empty())
673 }
674
675 fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
676 for (replica, inbox) in self.inboxes.iter_mut() {
677 if *replica != sender {
678 for message in &messages {
679 let min_index = inbox
680 .iter()
681 .enumerate()
682 .rev()
683 .find_map(|(index, envelope)| {
684 if sender == envelope.sender {
685 Some(index + 1)
686 } else {
687 None
688 }
689 })
690 .unwrap_or(0);
691
692 // Insert one or more duplicates of this message *after* the previous
693 // message delivered by this replica.
694 for _ in 0..self.rng.gen_range(1..4) {
695 let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
696 inbox.insert(
697 insertion_index,
698 Envelope {
699 message: message.clone(),
700 sender,
701 },
702 );
703 }
704 }
705 }
706 }
707 self.all_messages.extend(messages);
708 }
709
710 fn has_unreceived(&self, receiver: ReplicaId) -> bool {
711 !self.inboxes[&receiver].is_empty()
712 }
713
714 fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
715 let inbox = self.inboxes.get_mut(&receiver).unwrap();
716 let count = self.rng.gen_range(0..inbox.len() + 1);
717 inbox
718 .drain(0..count)
719 .map(|envelope| envelope.message)
720 .collect()
721 }
722}