1use std::fmt::Debug;
2
3use clock::ReplicaId;
4use collections::{BTreeMap, HashSet};
5
6pub struct Network<T: Clone, R: rand::Rng> {
7 inboxes: BTreeMap<ReplicaId, Vec<Envelope<T>>>,
8 disconnected_peers: HashSet<ReplicaId>,
9 rng: R,
10}
11
/// A single queued message sitting in a peer's inbox.
#[derive(Clone, Debug)]
struct Envelope<T>
where
    T: Clone,
{
    /// The wrapped payload.
    message: T,
}
16
17impl<T: Clone, R: rand::Rng> Network<T, R> {
18 pub fn new(rng: R) -> Self {
19 Network {
20 inboxes: BTreeMap::default(),
21 disconnected_peers: HashSet::default(),
22 rng,
23 }
24 }
25
26 pub fn add_peer(&mut self, id: ReplicaId) {
27 self.inboxes.insert(id, Vec::new());
28 }
29
30 pub fn disconnect_peer(&mut self, id: ReplicaId) {
31 self.disconnected_peers.insert(id);
32 self.inboxes.get_mut(&id).unwrap().clear();
33 }
34
35 pub fn reconnect_peer(&mut self, id: ReplicaId, replicate_from: ReplicaId) {
36 assert!(self.disconnected_peers.remove(&id));
37 self.replicate(replicate_from, id);
38 }
39
40 pub fn is_disconnected(&self, id: ReplicaId) -> bool {
41 self.disconnected_peers.contains(&id)
42 }
43
44 pub fn contains_disconnected_peers(&self) -> bool {
45 !self.disconnected_peers.is_empty()
46 }
47
48 pub fn replicate(&mut self, old_replica_id: ReplicaId, new_replica_id: ReplicaId) {
49 self.inboxes
50 .insert(new_replica_id, self.inboxes[&old_replica_id].clone());
51 }
52
53 pub fn is_idle(&self) -> bool {
54 self.inboxes.values().all(|i| i.is_empty())
55 }
56
57 pub fn broadcast(&mut self, sender: ReplicaId, messages: Vec<T>) {
58 // Drop messages from disconnected peers.
59 if self.disconnected_peers.contains(&sender) {
60 return;
61 }
62
63 for (replica, inbox) in self.inboxes.iter_mut() {
64 if *replica != sender && !self.disconnected_peers.contains(replica) {
65 for message in &messages {
66 // Insert one or more duplicates of this message, potentially *before* the previous
67 // message sent by this peer to simulate out-of-order delivery.
68 for _ in 0..self.rng.random_range(1..4) {
69 let insertion_index = self.rng.random_range(0..inbox.len() + 1);
70 inbox.insert(
71 insertion_index,
72 Envelope {
73 message: message.clone(),
74 },
75 );
76 }
77 }
78 }
79 }
80 }
81
82 pub fn has_unreceived(&self, receiver: ReplicaId) -> bool {
83 !self.inboxes[&receiver].is_empty()
84 }
85
86 pub fn receive(&mut self, receiver: ReplicaId) -> Vec<T> {
87 let inbox = self.inboxes.get_mut(&receiver).unwrap();
88 let count = self.rng.random_range(0..inbox.len() + 1);
89 inbox
90 .drain(0..count)
91 .map(|envelope| envelope.message)
92 .collect()
93 }
94}