1use std::fmt::Debug;
 2
 3use clock::ReplicaId;
 4use collections::{BTreeMap, HashSet};
 5
 6pub struct Network<T: Clone, R: rand::Rng> {
 7    inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
 8    disconnected_peers: HashSet<ReplicaId>,
 9    rng: R,
10}
11
/// A single queued message sitting in a peer's inbox.
#[derive(Clone, Debug)]
struct Envelope<T>
where
    T: Clone,
{
    /// The payload that will be handed to the receiving peer.
    message: T,
}
16
17impl<T: Clone, R: rand::Rng> Network<T, R> {
18    pub fn new(rng: R) -> Self {
19        Network {
20            inboxes: BTreeMap::default(),
21            disconnected_peers: HashSet::default(),
22            rng,
23        }
24    }
25
26    pub fn add_peer(&mut self, id: ReplicaId) {
27        self.inboxes.insert(id, Vec::new());
28    }
29
30    pub fn disconnect_peer(&mut self, id: ReplicaId) {
31        self.disconnected_peers.insert(id);
32        self.inboxes.get_mut(&id).unwrap().clear();
33    }
34
35    pub fn reconnect_peer(&mut self, id: ReplicaId, replicate_from: ReplicaId) {
36        assert!(self.disconnected_peers.remove(&id));
37        self.replicate(replicate_from, id);
38    }
39
40    pub fn is_disconnected(&self, id: ReplicaId) -> bool {
41        self.disconnected_peers.contains(&id)
42    }
43
44    pub fn contains_disconnected_peers(&self) -> bool {
45        !self.disconnected_peers.is_empty()
46    }
47
48    pub fn replicate(&mut self, old_replica_id: ReplicaId, new_replica_id: ReplicaId) {
49        self.inboxes
50            .insert(new_replica_id, self.inboxes[&old_replica_id].clone());
51    }
52
53    pub fn is_idle(&self) -> bool {
54        self.inboxes.values().all(|i| i.is_empty())
55    }
56
57    pub fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
58        // Drop messages from disconnected peers.
59        if self.disconnected_peers.contains(&sender) {
60            return;
61        }
62
63        for (replica, inbox) in self.inboxes.iter_mut() {
64            if *replica != sender && !self.disconnected_peers.contains(replica) {
65                for message in &messages {
66                    // Insert one or more duplicates of this message, potentially *before* the previous
67                    // message sent by this peer to simulate out-of-order delivery.
68                    for _ in 0..self.rng.random_range(1..4) {
69                        let insertion_index = self.rng.random_range(0..inbox.len() + 1);
70                        inbox.insert(
71                            insertion_index,
72                            Envelope {
73                                message: message.clone(),
74                            },
75                        );
76                    }
77                }
78            }
79        }
80    }
81
82    pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
83        !self.inboxes[&receiver].is_empty()
84    }
85
86    pub fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
87        let inbox = self.inboxes.get_mut(&receiver).unwrap();
88        let count = self.rng.random_range(0..inbox.len() + 1);
89        inbox
90            .drain(0..count)
91            .map(|envelope| envelope.message)
92            .collect()
93    }
94}