clock.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5    ops::{Add, AddAssign},
  6};
  7
  /// Identifier of a replica. A replica's id doubles as the index of its slot
  /// inside a [`Global`] clock.
  pub type ReplicaId = u16;
  /// Sequence number carried by [`Local`] and [`Lamport`] timestamps.
  pub type Seq = u32;
 10
 11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 12pub struct Local {
 13    pub replica_id: ReplicaId,
 14    pub value: Seq,
 15}
 16
 17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 18pub struct Lamport {
 19    pub replica_id: ReplicaId,
 20    pub value: Seq,
 21}
 22
 23impl Local {
 24    pub const MIN: Self = Self {
 25        replica_id: ReplicaId::MIN,
 26        value: Seq::MIN,
 27    };
 28    pub const MAX: Self = Self {
 29        replica_id: ReplicaId::MAX,
 30        value: Seq::MAX,
 31    };
 32
 33    pub fn new(replica_id: ReplicaId) -> Self {
 34        Self {
 35            replica_id,
 36            value: 1,
 37        }
 38    }
 39
 40    pub fn tick(&mut self) -> Self {
 41        let timestamp = *self;
 42        self.value += 1;
 43        timestamp
 44    }
 45
 46    pub fn observe(&mut self, timestamp: Self) {
 47        if timestamp.replica_id == self.replica_id {
 48            self.value = cmp::max(self.value, timestamp.value + 1);
 49        }
 50    }
 51}
 52
 53impl<'a> Add<&'a Self> for Local {
 54    type Output = Local;
 55
 56    fn add(self, other: &'a Self) -> Self::Output {
 57        cmp::max(&self, other).clone()
 58    }
 59}
 60
 61impl<'a> AddAssign<&'a Local> for Local {
 62    fn add_assign(&mut self, other: &Self) {
 63        if *self < *other {
 64            *self = other.clone();
 65        }
 66    }
 67}
 68
 69#[derive(Clone, Default, Hash, Eq, PartialEq)]
 70pub struct Global(SmallVec<[u32; 8]>);
 71
 72impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
 73    fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
 74        let mut version = Self::new();
 75        for entry in message {
 76            version.observe(Local {
 77                replica_id: entry.replica_id as ReplicaId,
 78                value: entry.timestamp,
 79            });
 80        }
 81        version
 82    }
 83}
 84
 85impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
 86    fn from(version: &'a Global) -> Self {
 87        version
 88            .iter()
 89            .map(|entry| rpc::proto::VectorClockEntry {
 90                replica_id: entry.replica_id as u32,
 91                timestamp: entry.value,
 92            })
 93            .collect()
 94    }
 95}
 96
 97impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
 98    fn from(version: Global) -> Self {
 99        (&version).into()
100    }
101}
102
103impl Global {
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    pub fn get(&self, replica_id: ReplicaId) -> Seq {
109        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
110    }
111
112    pub fn observe(&mut self, timestamp: Local) {
113        if timestamp.value > 0 {
114            let new_len = timestamp.replica_id as usize + 1;
115            if new_len > self.0.len() {
116                self.0.resize(new_len, 0);
117            }
118
119            let entry = &mut self.0[timestamp.replica_id as usize];
120            *entry = cmp::max(*entry, timestamp.value);
121        }
122    }
123
124    pub fn join(&mut self, other: &Self) {
125        if other.0.len() > self.0.len() {
126            self.0.resize(other.0.len(), 0);
127        }
128
129        for (left, right) in self.0.iter_mut().zip(&other.0) {
130            *left = cmp::max(*left, *right);
131        }
132    }
133
134    pub fn meet(&mut self, other: &Self) {
135        if other.0.len() > self.0.len() {
136            self.0.resize(other.0.len(), 0);
137        }
138
139        let mut new_len = 0;
140        for (ix, (left, right)) in self
141            .0
142            .iter_mut()
143            .zip(other.0.iter().chain(iter::repeat(&0)))
144            .enumerate()
145        {
146            if *left == 0 {
147                *left = *right;
148            } else if *right > 0 {
149                *left = cmp::min(*left, *right);
150            }
151
152            if *left != 0 {
153                new_len = ix + 1;
154            }
155        }
156        self.0.resize(new_len, 0);
157    }
158
159    pub fn observed(&self, timestamp: Local) -> bool {
160        self.get(timestamp.replica_id) >= timestamp.value
161    }
162
163    pub fn observed_any(&self, other: &Self) -> bool {
164        let mut lhs = self.0.iter();
165        let mut rhs = other.0.iter();
166        loop {
167            if let Some(left) = lhs.next() {
168                if let Some(right) = rhs.next() {
169                    if *right > 0 && left >= right {
170                        return true;
171                    }
172                } else {
173                    return false;
174                }
175            } else {
176                return false;
177            }
178        }
179    }
180
181    pub fn ge(&self, other: &Self) -> bool {
182        let mut lhs = self.0.iter();
183        let mut rhs = other.0.iter();
184        loop {
185            if let Some(left) = lhs.next() {
186                if let Some(right) = rhs.next() {
187                    if left < right {
188                        return false;
189                    }
190                } else {
191                    return true;
192                }
193            } else {
194                return rhs.next().is_none();
195            }
196        }
197    }
198
199    pub fn gt(&self, other: &Self) -> bool {
200        let mut lhs = self.0.iter();
201        let mut rhs = other.0.iter();
202        loop {
203            if let Some(left) = lhs.next() {
204                if let Some(right) = rhs.next() {
205                    if left <= right {
206                        return false;
207                    }
208                } else {
209                    return true;
210                }
211            } else {
212                return rhs.next().is_none();
213            }
214        }
215    }
216
217    pub fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = Local> {
218        self.0.iter().enumerate().map(|(replica_id, seq)| Local {
219            replica_id: replica_id as ReplicaId,
220            value: *seq,
221        })
222    }
223}
224
225impl Ord for Lamport {
226    fn cmp(&self, other: &Self) -> Ordering {
227        // Use the replica id to break ties between concurrent events.
228        self.value
229            .cmp(&other.value)
230            .then_with(|| self.replica_id.cmp(&other.replica_id))
231    }
232}
233
234impl PartialOrd for Lamport {
235    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
236        Some(self.cmp(other))
237    }
238}
239
240impl Lamport {
241    pub fn new(replica_id: ReplicaId) -> Self {
242        Self {
243            value: 1,
244            replica_id,
245        }
246    }
247
248    pub fn tick(&mut self) -> Self {
249        let timestamp = *self;
250        self.value += 1;
251        timestamp
252    }
253
254    pub fn observe(&mut self, timestamp: Self) {
255        self.value = cmp::max(self.value, timestamp.value) + 1;
256    }
257}
258
259impl fmt::Debug for Local {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
262    }
263}
264
265impl fmt::Debug for Lamport {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
268    }
269}
270
271impl fmt::Debug for Global {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        write!(f, "Global {{")?;
274        for timestamp in self.iter() {
275            if timestamp.replica_id > 0 {
276                write!(f, ", ")?;
277            }
278            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
279        }
280        write!(f, "}}")
281    }
282}