1use super::*;
2use clock::ReplicaId;
3use rand::prelude::*;
4use std::{
5 cmp::Ordering,
6 env,
7 iter::Iterator,
8 time::{Duration, Instant},
9};
10
11#[cfg(test)]
12#[ctor::ctor]
13fn init_logger() {
14 // std::env::set_var("RUST_LOG", "info");
15 env_logger::init();
16}
17
18#[test]
19fn test_edit() {
20 let mut buffer = Buffer::new(0, 0, History::new("abc".into()));
21 assert_eq!(buffer.text(), "abc");
22 buffer.edit(vec![3..3], "def");
23 assert_eq!(buffer.text(), "abcdef");
24 buffer.edit(vec![0..0], "ghi");
25 assert_eq!(buffer.text(), "ghiabcdef");
26 buffer.edit(vec![5..5], "jkl");
27 assert_eq!(buffer.text(), "ghiabjklcdef");
28 buffer.edit(vec![6..7], "");
29 assert_eq!(buffer.text(), "ghiabjlcdef");
30 buffer.edit(vec![4..9], "mno");
31 assert_eq!(buffer.text(), "ghiamnoef");
32}
33
34#[gpui::test(iterations = 100)]
35fn test_random_edits(mut rng: StdRng) {
36 let operations = env::var("OPERATIONS")
37 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
38 .unwrap_or(10);
39
40 let reference_string_len = rng.gen_range(0..3);
41 let mut reference_string = RandomCharIter::new(&mut rng)
42 .take(reference_string_len)
43 .collect::<String>();
44 let mut buffer = Buffer::new(0, 0, History::new(reference_string.clone().into()));
45 buffer.history.group_interval = Duration::from_millis(rng.gen_range(0..=200));
46 let mut buffer_versions = Vec::new();
47 log::info!(
48 "buffer text {:?}, version: {:?}",
49 buffer.text(),
50 buffer.version()
51 );
52
53 for _i in 0..operations {
54 let (old_ranges, new_text, _) = buffer.randomly_edit(&mut rng, 5);
55 for old_range in old_ranges.iter().rev() {
56 reference_string.replace_range(old_range.clone(), &new_text);
57 }
58 assert_eq!(buffer.text(), reference_string);
59 log::info!(
60 "buffer text {:?}, version: {:?}",
61 buffer.text(),
62 buffer.version()
63 );
64
65 if rng.gen_bool(0.25) {
66 buffer.randomly_undo_redo(&mut rng);
67 reference_string = buffer.text();
68 log::info!(
69 "buffer text {:?}, version: {:?}",
70 buffer.text(),
71 buffer.version()
72 );
73 }
74
75 let range = buffer.random_byte_range(0, &mut rng);
76 assert_eq!(
77 buffer.text_summary_for_range::<TextSummary, _>(range.clone()),
78 TextSummary::from(&reference_string[range])
79 );
80
81 if rng.gen_bool(0.3) {
82 buffer_versions.push((buffer.clone(), buffer.subscribe()));
83 }
84 }
85
86 for (old_buffer, subscription) in buffer_versions {
87 let edits = buffer
88 .edits_since::<usize>(&old_buffer.version)
89 .collect::<Vec<_>>();
90
91 log::info!(
92 "applying edits since version {:?} to old text: {:?}: {:?}",
93 old_buffer.version(),
94 old_buffer.text(),
95 edits,
96 );
97
98 let mut text = old_buffer.visible_text.clone();
99 for edit in edits {
100 let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
101 text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
102 }
103 assert_eq!(text.to_string(), buffer.text());
104
105 let subscription_edits = subscription.consume();
106 log::info!(
107 "applying subscription edits since version {:?} to old text: {:?}: {:?}",
108 old_buffer.version(),
109 old_buffer.text(),
110 subscription_edits,
111 );
112
113 let mut text = old_buffer.visible_text.clone();
114 for edit in subscription_edits.into_inner() {
115 let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
116 text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
117 }
118 assert_eq!(text.to_string(), buffer.text());
119 }
120}
121
122#[test]
123fn test_line_len() {
124 let mut buffer = Buffer::new(0, 0, History::new("".into()));
125 buffer.edit(vec![0..0], "abcd\nefg\nhij");
126 buffer.edit(vec![12..12], "kl\nmno");
127 buffer.edit(vec![18..18], "\npqrs\n");
128 buffer.edit(vec![18..21], "\nPQ");
129
130 assert_eq!(buffer.line_len(0), 4);
131 assert_eq!(buffer.line_len(1), 3);
132 assert_eq!(buffer.line_len(2), 5);
133 assert_eq!(buffer.line_len(3), 3);
134 assert_eq!(buffer.line_len(4), 4);
135 assert_eq!(buffer.line_len(5), 0);
136}
137
138#[test]
139fn test_text_summary_for_range() {
140 let buffer = Buffer::new(0, 0, History::new("ab\nefg\nhklm\nnopqrs\ntuvwxyz".into()));
141 assert_eq!(
142 buffer.text_summary_for_range::<TextSummary, _>(1..3),
143 TextSummary {
144 bytes: 2,
145 lines: Point::new(1, 0),
146 lines_utf16: PointUtf16::new(1, 0),
147 first_line_chars: 1,
148 last_line_chars: 0,
149 longest_row: 0,
150 longest_row_chars: 1,
151 }
152 );
153 assert_eq!(
154 buffer.text_summary_for_range::<TextSummary, _>(1..12),
155 TextSummary {
156 bytes: 11,
157 lines: Point::new(3, 0),
158 lines_utf16: PointUtf16::new(3, 0),
159 first_line_chars: 1,
160 last_line_chars: 0,
161 longest_row: 2,
162 longest_row_chars: 4,
163 }
164 );
165 assert_eq!(
166 buffer.text_summary_for_range::<TextSummary, _>(0..20),
167 TextSummary {
168 bytes: 20,
169 lines: Point::new(4, 1),
170 lines_utf16: PointUtf16::new(4, 1),
171 first_line_chars: 2,
172 last_line_chars: 1,
173 longest_row: 3,
174 longest_row_chars: 6,
175 }
176 );
177 assert_eq!(
178 buffer.text_summary_for_range::<TextSummary, _>(0..22),
179 TextSummary {
180 bytes: 22,
181 lines: Point::new(4, 3),
182 lines_utf16: PointUtf16::new(4, 3),
183 first_line_chars: 2,
184 last_line_chars: 3,
185 longest_row: 3,
186 longest_row_chars: 6,
187 }
188 );
189 assert_eq!(
190 buffer.text_summary_for_range::<TextSummary, _>(7..22),
191 TextSummary {
192 bytes: 15,
193 lines: Point::new(2, 3),
194 lines_utf16: PointUtf16::new(2, 3),
195 first_line_chars: 4,
196 last_line_chars: 3,
197 longest_row: 1,
198 longest_row_chars: 6,
199 }
200 );
201}
202
203#[test]
204fn test_chars_at() {
205 let mut buffer = Buffer::new(0, 0, History::new("".into()));
206 buffer.edit(vec![0..0], "abcd\nefgh\nij");
207 buffer.edit(vec![12..12], "kl\nmno");
208 buffer.edit(vec![18..18], "\npqrs");
209 buffer.edit(vec![18..21], "\nPQ");
210
211 let chars = buffer.chars_at(Point::new(0, 0));
212 assert_eq!(chars.collect::<String>(), "abcd\nefgh\nijkl\nmno\nPQrs");
213
214 let chars = buffer.chars_at(Point::new(1, 0));
215 assert_eq!(chars.collect::<String>(), "efgh\nijkl\nmno\nPQrs");
216
217 let chars = buffer.chars_at(Point::new(2, 0));
218 assert_eq!(chars.collect::<String>(), "ijkl\nmno\nPQrs");
219
220 let chars = buffer.chars_at(Point::new(3, 0));
221 assert_eq!(chars.collect::<String>(), "mno\nPQrs");
222
223 let chars = buffer.chars_at(Point::new(4, 0));
224 assert_eq!(chars.collect::<String>(), "PQrs");
225
226 // Regression test:
227 let mut buffer = Buffer::new(0, 0, History::new("".into()));
228 buffer.edit(vec![0..0], "[workspace]\nmembers = [\n \"xray_core\",\n \"xray_server\",\n \"xray_cli\",\n \"xray_wasm\",\n]\n");
229 buffer.edit(vec![60..60], "\n");
230
231 let chars = buffer.chars_at(Point::new(6, 0));
232 assert_eq!(chars.collect::<String>(), " \"xray_wasm\",\n]\n");
233}
234
235#[test]
236fn test_anchors() {
237 let mut buffer = Buffer::new(0, 0, History::new("".into()));
238 buffer.edit(vec![0..0], "abc");
239 let left_anchor = buffer.anchor_before(2);
240 let right_anchor = buffer.anchor_after(2);
241
242 buffer.edit(vec![1..1], "def\n");
243 assert_eq!(buffer.text(), "adef\nbc");
244 assert_eq!(left_anchor.to_offset(&buffer), 6);
245 assert_eq!(right_anchor.to_offset(&buffer), 6);
246 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
247 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 1 });
248
249 buffer.edit(vec![2..3], "");
250 assert_eq!(buffer.text(), "adf\nbc");
251 assert_eq!(left_anchor.to_offset(&buffer), 5);
252 assert_eq!(right_anchor.to_offset(&buffer), 5);
253 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
254 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 1 });
255
256 buffer.edit(vec![5..5], "ghi\n");
257 assert_eq!(buffer.text(), "adf\nbghi\nc");
258 assert_eq!(left_anchor.to_offset(&buffer), 5);
259 assert_eq!(right_anchor.to_offset(&buffer), 9);
260 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 });
261 assert_eq!(right_anchor.to_point(&buffer), Point { row: 2, column: 0 });
262
263 buffer.edit(vec![7..9], "");
264 assert_eq!(buffer.text(), "adf\nbghc");
265 assert_eq!(left_anchor.to_offset(&buffer), 5);
266 assert_eq!(right_anchor.to_offset(&buffer), 7);
267 assert_eq!(left_anchor.to_point(&buffer), Point { row: 1, column: 1 },);
268 assert_eq!(right_anchor.to_point(&buffer), Point { row: 1, column: 3 });
269
270 // Ensure anchoring to a point is equivalent to anchoring to an offset.
271 assert_eq!(
272 buffer.anchor_before(Point { row: 0, column: 0 }),
273 buffer.anchor_before(0)
274 );
275 assert_eq!(
276 buffer.anchor_before(Point { row: 0, column: 1 }),
277 buffer.anchor_before(1)
278 );
279 assert_eq!(
280 buffer.anchor_before(Point { row: 0, column: 2 }),
281 buffer.anchor_before(2)
282 );
283 assert_eq!(
284 buffer.anchor_before(Point { row: 0, column: 3 }),
285 buffer.anchor_before(3)
286 );
287 assert_eq!(
288 buffer.anchor_before(Point { row: 1, column: 0 }),
289 buffer.anchor_before(4)
290 );
291 assert_eq!(
292 buffer.anchor_before(Point { row: 1, column: 1 }),
293 buffer.anchor_before(5)
294 );
295 assert_eq!(
296 buffer.anchor_before(Point { row: 1, column: 2 }),
297 buffer.anchor_before(6)
298 );
299 assert_eq!(
300 buffer.anchor_before(Point { row: 1, column: 3 }),
301 buffer.anchor_before(7)
302 );
303 assert_eq!(
304 buffer.anchor_before(Point { row: 1, column: 4 }),
305 buffer.anchor_before(8)
306 );
307
308 // Comparison between anchors.
309 let anchor_at_offset_0 = buffer.anchor_before(0);
310 let anchor_at_offset_1 = buffer.anchor_before(1);
311 let anchor_at_offset_2 = buffer.anchor_before(2);
312
313 assert_eq!(
314 anchor_at_offset_0
315 .cmp(&anchor_at_offset_0, &buffer)
316 .unwrap(),
317 Ordering::Equal
318 );
319 assert_eq!(
320 anchor_at_offset_1
321 .cmp(&anchor_at_offset_1, &buffer)
322 .unwrap(),
323 Ordering::Equal
324 );
325 assert_eq!(
326 anchor_at_offset_2
327 .cmp(&anchor_at_offset_2, &buffer)
328 .unwrap(),
329 Ordering::Equal
330 );
331
332 assert_eq!(
333 anchor_at_offset_0
334 .cmp(&anchor_at_offset_1, &buffer)
335 .unwrap(),
336 Ordering::Less
337 );
338 assert_eq!(
339 anchor_at_offset_1
340 .cmp(&anchor_at_offset_2, &buffer)
341 .unwrap(),
342 Ordering::Less
343 );
344 assert_eq!(
345 anchor_at_offset_0
346 .cmp(&anchor_at_offset_2, &buffer)
347 .unwrap(),
348 Ordering::Less
349 );
350
351 assert_eq!(
352 anchor_at_offset_1
353 .cmp(&anchor_at_offset_0, &buffer)
354 .unwrap(),
355 Ordering::Greater
356 );
357 assert_eq!(
358 anchor_at_offset_2
359 .cmp(&anchor_at_offset_1, &buffer)
360 .unwrap(),
361 Ordering::Greater
362 );
363 assert_eq!(
364 anchor_at_offset_2
365 .cmp(&anchor_at_offset_0, &buffer)
366 .unwrap(),
367 Ordering::Greater
368 );
369}
370
371#[test]
372fn test_anchors_at_start_and_end() {
373 let mut buffer = Buffer::new(0, 0, History::new("".into()));
374 let before_start_anchor = buffer.anchor_before(0);
375 let after_end_anchor = buffer.anchor_after(0);
376
377 buffer.edit(vec![0..0], "abc");
378 assert_eq!(buffer.text(), "abc");
379 assert_eq!(before_start_anchor.to_offset(&buffer), 0);
380 assert_eq!(after_end_anchor.to_offset(&buffer), 3);
381
382 let after_start_anchor = buffer.anchor_after(0);
383 let before_end_anchor = buffer.anchor_before(3);
384
385 buffer.edit(vec![3..3], "def");
386 buffer.edit(vec![0..0], "ghi");
387 assert_eq!(buffer.text(), "ghiabcdef");
388 assert_eq!(before_start_anchor.to_offset(&buffer), 0);
389 assert_eq!(after_start_anchor.to_offset(&buffer), 3);
390 assert_eq!(before_end_anchor.to_offset(&buffer), 6);
391 assert_eq!(after_end_anchor.to_offset(&buffer), 9);
392}
393
394#[test]
395fn test_undo_redo() {
396 let mut buffer = Buffer::new(0, 0, History::new("1234".into()));
397 // Set group interval to zero so as to not group edits in the undo stack.
398 buffer.history.group_interval = Duration::from_secs(0);
399
400 buffer.edit(vec![1..1], "abx");
401 buffer.edit(vec![3..4], "yzef");
402 buffer.edit(vec![3..5], "cd");
403 assert_eq!(buffer.text(), "1abcdef234");
404
405 let transactions = buffer.history.undo_stack.clone();
406 assert_eq!(transactions.len(), 3);
407
408 buffer.undo_or_redo(transactions[0].clone()).unwrap();
409 assert_eq!(buffer.text(), "1cdef234");
410 buffer.undo_or_redo(transactions[0].clone()).unwrap();
411 assert_eq!(buffer.text(), "1abcdef234");
412
413 buffer.undo_or_redo(transactions[1].clone()).unwrap();
414 assert_eq!(buffer.text(), "1abcdx234");
415 buffer.undo_or_redo(transactions[2].clone()).unwrap();
416 assert_eq!(buffer.text(), "1abx234");
417 buffer.undo_or_redo(transactions[1].clone()).unwrap();
418 assert_eq!(buffer.text(), "1abyzef234");
419 buffer.undo_or_redo(transactions[2].clone()).unwrap();
420 assert_eq!(buffer.text(), "1abcdef234");
421
422 buffer.undo_or_redo(transactions[2].clone()).unwrap();
423 assert_eq!(buffer.text(), "1abyzef234");
424 buffer.undo_or_redo(transactions[0].clone()).unwrap();
425 assert_eq!(buffer.text(), "1yzef234");
426 buffer.undo_or_redo(transactions[1].clone()).unwrap();
427 assert_eq!(buffer.text(), "1234");
428}
429
430#[test]
431fn test_history() {
432 let mut now = Instant::now();
433 let mut buffer = Buffer::new(0, 0, History::new("123456".into()));
434
435 let set_id = if let Operation::UpdateSelections { set_id, .. } =
436 buffer.add_selection_set(&buffer.selections_from_ranges(vec![4..4]).unwrap())
437 {
438 set_id
439 } else {
440 unreachable!()
441 };
442 buffer.start_transaction_at(Some(set_id), now).unwrap();
443 buffer.edit(vec![2..4], "cd");
444 buffer.end_transaction_at(Some(set_id), now).unwrap();
445 assert_eq!(buffer.text(), "12cd56");
446 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![4..4]);
447
448 buffer.start_transaction_at(Some(set_id), now).unwrap();
449 buffer
450 .update_selection_set(set_id, &buffer.selections_from_ranges(vec![1..3]).unwrap())
451 .unwrap();
452 buffer.edit(vec![4..5], "e");
453 buffer.end_transaction_at(Some(set_id), now).unwrap();
454 assert_eq!(buffer.text(), "12cde6");
455 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
456
457 now += buffer.history.group_interval + Duration::from_millis(1);
458 buffer.start_transaction_at(Some(set_id), now).unwrap();
459 buffer
460 .update_selection_set(set_id, &buffer.selections_from_ranges(vec![2..2]).unwrap())
461 .unwrap();
462 buffer.edit(vec![0..1], "a");
463 buffer.edit(vec![1..1], "b");
464 buffer.end_transaction_at(Some(set_id), now).unwrap();
465 assert_eq!(buffer.text(), "ab2cde6");
466 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![3..3]);
467
468 // Last transaction happened past the group interval, undo it on its
469 // own.
470 buffer.undo();
471 assert_eq!(buffer.text(), "12cde6");
472 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
473
474 // First two transactions happened within the group interval, undo them
475 // together.
476 buffer.undo();
477 assert_eq!(buffer.text(), "123456");
478 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![4..4]);
479
480 // Redo the first two transactions together.
481 buffer.redo();
482 assert_eq!(buffer.text(), "12cde6");
483 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![1..3]);
484
485 // Redo the last transaction on its own.
486 buffer.redo();
487 assert_eq!(buffer.text(), "ab2cde6");
488 assert_eq!(buffer.selection_ranges(set_id).unwrap(), vec![3..3]);
489
490 buffer.start_transaction_at(None, now).unwrap();
491 assert!(buffer.end_transaction_at(None, now).is_none());
492 buffer.undo();
493 assert_eq!(buffer.text(), "12cde6");
494}
495
496#[test]
497fn test_concurrent_edits() {
498 let text = "abcdef";
499
500 let mut buffer1 = Buffer::new(1, 0, History::new(text.into()));
501 let mut buffer2 = Buffer::new(2, 0, History::new(text.into()));
502 let mut buffer3 = Buffer::new(3, 0, History::new(text.into()));
503
504 let buf1_op = buffer1.edit(vec![1..2], "12");
505 assert_eq!(buffer1.text(), "a12cdef");
506 let buf2_op = buffer2.edit(vec![3..4], "34");
507 assert_eq!(buffer2.text(), "abc34ef");
508 let buf3_op = buffer3.edit(vec![5..6], "56");
509 assert_eq!(buffer3.text(), "abcde56");
510
511 buffer1.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
512 buffer1.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
513 buffer2.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
514 buffer2.apply_op(Operation::Edit(buf3_op.clone())).unwrap();
515 buffer3.apply_op(Operation::Edit(buf1_op.clone())).unwrap();
516 buffer3.apply_op(Operation::Edit(buf2_op.clone())).unwrap();
517
518 assert_eq!(buffer1.text(), "a12c34e56");
519 assert_eq!(buffer2.text(), "a12c34e56");
520 assert_eq!(buffer3.text(), "a12c34e56");
521}
522
523#[gpui::test(iterations = 100)]
524fn test_random_concurrent_edits(mut rng: StdRng) {
525 let peers = env::var("PEERS")
526 .map(|i| i.parse().expect("invalid `PEERS` variable"))
527 .unwrap_or(5);
528 let operations = env::var("OPERATIONS")
529 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
530 .unwrap_or(10);
531
532 let base_text_len = rng.gen_range(0..10);
533 let base_text = RandomCharIter::new(&mut rng)
534 .take(base_text_len)
535 .collect::<String>();
536 let mut replica_ids = Vec::new();
537 let mut buffers = Vec::new();
538 let mut network = Network::new(rng.clone());
539
540 for i in 0..peers {
541 let mut buffer = Buffer::new(i as ReplicaId, 0, History::new(base_text.clone().into()));
542 buffer.history.group_interval = Duration::from_millis(rng.gen_range(0..=200));
543 buffers.push(buffer);
544 replica_ids.push(i as u16);
545 network.add_peer(i as u16);
546 }
547
548 log::info!("initial text: {:?}", base_text);
549
550 let mut mutation_count = operations;
551 loop {
552 let replica_index = rng.gen_range(0..peers);
553 let replica_id = replica_ids[replica_index];
554 let buffer = &mut buffers[replica_index];
555 match rng.gen_range(0..=100) {
556 0..=50 if mutation_count != 0 => {
557 let ops = buffer.randomly_mutate(&mut rng);
558 network.broadcast(buffer.replica_id, ops);
559 log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text());
560 mutation_count -= 1;
561 }
562 51..=70 if mutation_count != 0 => {
563 let ops = buffer.randomly_undo_redo(&mut rng);
564 network.broadcast(buffer.replica_id, ops);
565 mutation_count -= 1;
566 }
567 71..=100 if network.has_unreceived(replica_id) => {
568 let ops = network.receive(replica_id);
569 if !ops.is_empty() {
570 log::info!(
571 "peer {} applying {} ops from the network.",
572 replica_id,
573 ops.len()
574 );
575 buffer.apply_ops(ops).unwrap();
576 }
577 }
578 _ => {}
579 }
580
581 if mutation_count == 0 && network.is_idle() {
582 break;
583 }
584 }
585
586 let first_buffer = &buffers[0];
587 for buffer in &buffers[1..] {
588 assert_eq!(
589 buffer.text(),
590 first_buffer.text(),
591 "Replica {} text != Replica 0 text",
592 buffer.replica_id
593 );
594 assert_eq!(
595 buffer.selection_sets().collect::<HashMap<_, _>>(),
596 first_buffer.selection_sets().collect::<HashMap<_, _>>()
597 );
598 assert_eq!(
599 buffer
600 .all_selection_ranges::<usize>()
601 .collect::<HashMap<_, _>>(),
602 first_buffer
603 .all_selection_ranges::<usize>()
604 .collect::<HashMap<_, _>>()
605 );
606 }
607}
608
609#[derive(Clone)]
610struct Envelope<T: Clone> {
611 message: T,
612 sender: ReplicaId,
613}
614
615struct Network<T: Clone, R: rand::Rng> {
616 inboxes: std::collections::BTreeMap<ReplicaId, Vec<Envelope<T>>>,
617 all_messages: Vec<T>,
618 rng: R,
619}
620
621impl<T: Clone, R: rand::Rng> Network<T, R> {
622 fn new(rng: R) -> Self {
623 Network {
624 inboxes: Default::default(),
625 all_messages: Vec::new(),
626 rng,
627 }
628 }
629
630 fn add_peer(&mut self, id: ReplicaId) {
631 self.inboxes.insert(id, Vec::new());
632 }
633
634 fn is_idle(&self) -> bool {
635 self.inboxes.values().all(|i| i.is_empty())
636 }
637
638 fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
639 for (replica, inbox) in self.inboxes.iter_mut() {
640 if *replica != sender {
641 for message in &messages {
642 let min_index = inbox
643 .iter()
644 .enumerate()
645 .rev()
646 .find_map(|(index, envelope)| {
647 if sender == envelope.sender {
648 Some(index + 1)
649 } else {
650 None
651 }
652 })
653 .unwrap_or(0);
654
655 // Insert one or more duplicates of this message *after* the previous
656 // message delivered by this replica.
657 for _ in 0..self.rng.gen_range(1..4) {
658 let insertion_index = self.rng.gen_range(min_index..inbox.len() + 1);
659 inbox.insert(
660 insertion_index,
661 Envelope {
662 message: message.clone(),
663 sender,
664 },
665 );
666 }
667 }
668 }
669 }
670 self.all_messages.extend(messages);
671 }
672
673 fn has_unreceived(&self, receiver: ReplicaId) -> bool {
674 !self.inboxes[&receiver].is_empty()
675 }
676
677 fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
678 let inbox = self.inboxes.get_mut(&receiver).unwrap();
679 let count = self.rng.gen_range(0..inbox.len() + 1);
680 inbox
681 .drain(0..count)
682 .map(|envelope| envelope.message)
683 .collect()
684 }
685}