test.rs

  1use rand::Rng;
  2use std::{
  3    collections::BTreeMap,
  4    path::{Path, PathBuf},
  5    time::{Duration, Instant},
  6};
  7use tempdir::TempDir;
  8
  9use crate::time::ReplicaId;
 10
 11#[derive(Clone)]
 12struct Envelope<T: Clone> {
 13    message: T,
 14    sender: ReplicaId,
 15}
 16
 17pub(crate) struct Network<T: Clone> {
 18    inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
 19    all_messages: Vec<T>,
 20}
 21
 22impl<T: Clone> Network<T> {
 23    pub fn new() -> Self {
 24        Network {
 25            inboxes: BTreeMap::new(),
 26            all_messages: Vec::new(),
 27        }
 28    }
 29
 30    pub fn add_peer(&mut self, id: ReplicaId) {
 31        self.inboxes.insert(id, Vec::new());
 32    }
 33
 34    pub fn is_idle(&self) -> bool {
 35        self.inboxes.values().all(|i| i.is_empty())
 36    }
 37
 38    pub fn broadcast<R>(&mut self, sender: ReplicaId, messages: Vec<T>, rng: &mut R)
 39    where
 40        R: Rng,
 41    {
 42        for (replica, inbox) in self.inboxes.iter_mut() {
 43            if *replica != sender {
 44                for message in &messages {
 45                    let min_index = inbox
 46                        .iter()
 47                        .enumerate()
 48                        .rev()
 49                        .find_map(|(index, envelope)| {
 50                            if sender == envelope.sender {
 51                                Some(index + 1)
 52                            } else {
 53                                None
 54                            }
 55                        })
 56                        .unwrap_or(0);
 57
 58                    // Insert one or more duplicates of this message *after* the previous
 59                    // message delivered by this replica.
 60                    for _ in 0..rng.gen_range(1..4) {
 61                        let insertion_index = rng.gen_range(min_index..inbox.len() + 1);
 62                        inbox.insert(
 63                            insertion_index,
 64                            Envelope {
 65                                message: message.clone(),
 66                                sender,
 67                            },
 68                        );
 69                    }
 70                }
 71            }
 72        }
 73        self.all_messages.extend(messages);
 74    }
 75
 76    pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
 77        !self.inboxes[&receiver].is_empty()
 78    }
 79
 80    pub fn receive<R>(&mut self, receiver: ReplicaId, rng: &mut R) -> Vec<T>
 81    where
 82        R: Rng,
 83    {
 84        let inbox = self.inboxes.get_mut(&receiver).unwrap();
 85        let count = rng.gen_range(0..inbox.len() + 1);
 86        inbox
 87            .drain(0..count)
 88            .map(|envelope| envelope.message)
 89            .collect()
 90    }
 91}
 92
 93pub fn sample_text(rows: usize, cols: usize) -> String {
 94    let mut text = String::new();
 95    for row in 0..rows {
 96        let c: char = ('a' as u32 + row as u32) as u8 as char;
 97        let mut line = c.to_string().repeat(cols);
 98        if row < rows - 1 {
 99            line.push('\n');
100        }
101        text += &line;
102    }
103    text
104}
105
106pub fn temp_tree(tree: serde_json::Value) -> TempDir {
107    let dir = TempDir::new("").unwrap();
108    write_tree(dir.path(), tree);
109    dir
110}
111
112fn write_tree(path: &Path, tree: serde_json::Value) {
113    use serde_json::Value;
114    use std::fs;
115
116    if let Value::Object(map) = tree {
117        for (name, contents) in map {
118            let mut path = PathBuf::from(path);
119            path.push(name);
120            match contents {
121                Value::Object(_) => {
122                    fs::create_dir(&path).unwrap();
123                    write_tree(&path, contents);
124                }
125                Value::Null => {
126                    fs::create_dir(&path).unwrap();
127                }
128                Value::String(contents) => {
129                    fs::write(&path, contents).unwrap();
130                }
131                _ => {
132                    panic!("JSON object must contain only objects, strings, or null");
133                }
134            }
135        }
136    } else {
137        panic!("You must pass a JSON object to this helper")
138    }
139}
140
141pub async fn assert_condition(poll_interval: u64, timeout: u64, mut f: impl FnMut() -> bool) {
142    let poll_interval = Duration::from_millis(poll_interval);
143    let timeout = Duration::from_millis(timeout);
144    let start = Instant::now();
145    loop {
146        if f() {
147            return;
148        } else if Instant::now().duration_since(start) < timeout {
149            smol::Timer::after(poll_interval).await;
150        } else {
151            panic!("timed out waiting on condition");
152        }
153    }
154}