1use rand::Rng;
2use std::{
3 collections::BTreeMap,
4 path::{Path, PathBuf},
5 time::{Duration, Instant},
6};
7use tempdir::TempDir;
8
9use crate::time::ReplicaId;
10
11#[derive(Clone)]
12struct Envelope<T: Clone> {
13 message: T,
14 sender: ReplicaId,
15}
16
17pub(crate) struct Network<T: Clone> {
18 inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
19 all_messages: Vec<T>,
20}
21
22impl<T: Clone> Network<T> {
23 pub fn new() -> Self {
24 Network {
25 inboxes: BTreeMap::new(),
26 all_messages: Vec::new(),
27 }
28 }
29
30 pub fn add_peer(&mut self, id: ReplicaId) {
31 self.inboxes.insert(id, Vec::new());
32 }
33
34 pub fn is_idle(&self) -> bool {
35 self.inboxes.values().all(|i| i.is_empty())
36 }
37
38 pub fn broadcast<R>(&mut self, sender: ReplicaId, messages: Vec<T>, rng: &mut R)
39 where
40 R: Rng,
41 {
42 for (replica, inbox) in self.inboxes.iter_mut() {
43 if *replica != sender {
44 for message in &messages {
45 let min_index = inbox
46 .iter()
47 .enumerate()
48 .rev()
49 .find_map(|(index, envelope)| {
50 if sender == envelope.sender {
51 Some(index + 1)
52 } else {
53 None
54 }
55 })
56 .unwrap_or(0);
57
58 // Insert one or more duplicates of this message *after* the previous
59 // message delivered by this replica.
60 for _ in 0..rng.gen_range(1..4) {
61 let insertion_index = rng.gen_range(min_index..inbox.len() + 1);
62 inbox.insert(
63 insertion_index,
64 Envelope {
65 message: message.clone(),
66 sender,
67 },
68 );
69 }
70 }
71 }
72 }
73 self.all_messages.extend(messages);
74 }
75
76 pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
77 !self.inboxes[&receiver].is_empty()
78 }
79
80 pub fn receive<R>(&mut self, receiver: ReplicaId, rng: &mut R) -> Vec<T>
81 where
82 R: Rng,
83 {
84 let inbox = self.inboxes.get_mut(&receiver).unwrap();
85 let count = rng.gen_range(0..inbox.len() + 1);
86 inbox
87 .drain(0..count)
88 .map(|envelope| envelope.message)
89 .collect()
90 }
91}
92
/// Builds a block of text with `rows` lines of `cols` characters each.
///
/// Line `r` (zero-based) repeats the character whose code is `'a' + r`,
/// truncated to a single byte, so the first lines are `a…`, `b…`, `c…`.
/// Lines are separated by `'\n'`; there is no trailing newline.
pub fn sample_text(rows: usize, cols: usize) -> String {
    (0..rows)
        .map(|row| {
            let letter = ('a' as u32 + row as u32) as u8 as char;
            letter.to_string().repeat(cols)
        })
        .collect::<Vec<String>>()
        .join("\n")
}
105
106pub fn temp_tree(tree: serde_json::Value) -> TempDir {
107 let dir = TempDir::new("").unwrap();
108 write_tree(dir.path(), tree);
109 dir
110}
111
112fn write_tree(path: &Path, tree: serde_json::Value) {
113 use serde_json::Value;
114 use std::fs;
115
116 if let Value::Object(map) = tree {
117 for (name, contents) in map {
118 let mut path = PathBuf::from(path);
119 path.push(name);
120 match contents {
121 Value::Object(_) => {
122 fs::create_dir(&path).unwrap();
123 write_tree(&path, contents);
124 }
125 Value::Null => {
126 fs::create_dir(&path).unwrap();
127 }
128 Value::String(contents) => {
129 fs::write(&path, contents).unwrap();
130 }
131 _ => {
132 panic!("JSON object must contain only objects, strings, or null");
133 }
134 }
135 }
136 } else {
137 panic!("You must pass a JSON object to this helper")
138 }
139}
140
141pub async fn assert_condition(poll_interval: u64, timeout: u64, mut f: impl FnMut() -> bool) {
142 let poll_interval = Duration::from_millis(poll_interval);
143 let timeout = Duration::from_millis(timeout);
144 let start = Instant::now();
145 loop {
146 if f() {
147 return;
148 } else if Instant::now().duration_since(start) < timeout {
149 smol::Timer::after(poll_interval).await;
150 } else {
151 panic!("timed out waiting on condition");
152 }
153 }
154}