clock.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5    ops::{Add, AddAssign},
  6};
  7
  8pub type ReplicaId = u16;
  9pub type Seq = u32;
 10
 11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 12pub struct Local {
 13    pub replica_id: ReplicaId,
 14    pub value: Seq,
 15}
 16
 17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 18pub struct Lamport {
 19    pub replica_id: ReplicaId,
 20    pub value: Seq,
 21}
 22
 23impl Local {
 24    pub const MIN: Self = Self {
 25        replica_id: ReplicaId::MIN,
 26        value: Seq::MIN,
 27    };
 28    pub const MAX: Self = Self {
 29        replica_id: ReplicaId::MAX,
 30        value: Seq::MAX,
 31    };
 32
 33    pub fn new(replica_id: ReplicaId) -> Self {
 34        Self {
 35            replica_id,
 36            value: 1,
 37        }
 38    }
 39
 40    pub fn tick(&mut self) -> Self {
 41        let timestamp = *self;
 42        self.value += 1;
 43        timestamp
 44    }
 45
 46    pub fn observe(&mut self, timestamp: Self) {
 47        if timestamp.replica_id == self.replica_id {
 48            self.value = cmp::max(self.value, timestamp.value + 1);
 49        }
 50    }
 51}
 52
 53impl<'a> Add<&'a Self> for Local {
 54    type Output = Local;
 55
 56    fn add(self, other: &'a Self) -> Self::Output {
 57        cmp::max(&self, other).clone()
 58    }
 59}
 60
 61impl<'a> AddAssign<&'a Local> for Local {
 62    fn add_assign(&mut self, other: &Self) {
 63        if *self < *other {
 64            *self = other.clone();
 65        }
 66    }
 67}
 68
 69#[derive(Clone, Default, Hash, Eq, PartialEq)]
 70pub struct Global(SmallVec<[u32; 8]>);
 71
 72impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
 73    fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
 74        let mut version = Self::new();
 75        for entry in message {
 76            version.observe(Local {
 77                replica_id: entry.replica_id as ReplicaId,
 78                value: entry.timestamp,
 79            });
 80        }
 81        version
 82    }
 83}
 84
 85impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
 86    fn from(version: &'a Global) -> Self {
 87        version
 88            .iter()
 89            .map(|entry| rpc::proto::VectorClockEntry {
 90                replica_id: entry.replica_id as u32,
 91                timestamp: entry.value,
 92            })
 93            .collect()
 94    }
 95}
 96
 97impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
 98    fn from(version: Global) -> Self {
 99        (&version).into()
100    }
101}
102
103impl Global {
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    pub fn get(&self, replica_id: ReplicaId) -> Seq {
109        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
110    }
111
112    pub fn observe(&mut self, timestamp: Local) {
113        if timestamp.value > 0 {
114            let new_len = timestamp.replica_id as usize + 1;
115            if new_len > self.0.len() {
116                self.0.resize(new_len, 0);
117            }
118
119            let entry = &mut self.0[timestamp.replica_id as usize];
120            *entry = cmp::max(*entry, timestamp.value);
121        }
122    }
123
124    pub fn join(&mut self, other: &Self) {
125        if other.0.len() > self.0.len() {
126            self.0.resize(other.0.len(), 0);
127        }
128
129        for (left, right) in self.0.iter_mut().zip(&other.0) {
130            *left = cmp::max(*left, *right);
131        }
132    }
133
134    pub fn meet(&mut self, other: &Self) {
135        if other.0.len() > self.0.len() {
136            self.0.resize(other.0.len(), 0);
137        }
138
139        let mut new_len = 0;
140        for (ix, (left, right)) in self
141            .0
142            .iter_mut()
143            .zip(other.0.iter().chain(iter::repeat(&0)))
144            .enumerate()
145        {
146            if *left == 0 {
147                *left = *right;
148            } else if *right > 0 {
149                *left = cmp::min(*left, *right);
150            }
151
152            if *left != 0 {
153                new_len = ix + 1;
154            }
155        }
156        self.0.resize(new_len, 0);
157    }
158
159    pub fn observed(&self, timestamp: Local) -> bool {
160        self.get(timestamp.replica_id) >= timestamp.value
161    }
162
163    pub fn observed_any(&self, other: &Self) -> bool {
164        let mut lhs = self.0.iter();
165        let mut rhs = other.0.iter();
166        loop {
167            if let Some(left) = lhs.next() {
168                if let Some(right) = rhs.next() {
169                    if *right > 0 && left >= right {
170                        return true;
171                    }
172                } else {
173                    return false;
174                }
175            } else {
176                return false;
177            }
178        }
179    }
180
181    pub fn observed_all(&self, other: &Self) -> bool {
182        let mut lhs = self.0.iter();
183        let mut rhs = other.0.iter();
184        loop {
185            if let Some(left) = lhs.next() {
186                if let Some(right) = rhs.next() {
187                    if left < right {
188                        return false;
189                    }
190                } else {
191                    return true;
192                }
193            } else {
194                return rhs.next().is_none();
195            }
196        }
197    }
198
199    pub fn changed_since(&self, other: &Self) -> bool {
200        if self.0.len() > other.0.len() {
201            return true;
202        }
203        for (left, right) in self.0.iter().zip(other.0.iter()) {
204            if left > right {
205                return true;
206            }
207        }
208        false
209    }
210
211    pub fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = Local> {
212        self.0.iter().enumerate().map(|(replica_id, seq)| Local {
213            replica_id: replica_id as ReplicaId,
214            value: *seq,
215        })
216    }
217}
218
219impl Ord for Lamport {
220    fn cmp(&self, other: &Self) -> Ordering {
221        // Use the replica id to break ties between concurrent events.
222        self.value
223            .cmp(&other.value)
224            .then_with(|| self.replica_id.cmp(&other.replica_id))
225    }
226}
227
228impl PartialOrd for Lamport {
229    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
230        Some(self.cmp(other))
231    }
232}
233
234impl Lamport {
235    pub fn new(replica_id: ReplicaId) -> Self {
236        Self {
237            value: 1,
238            replica_id,
239        }
240    }
241
242    pub fn tick(&mut self) -> Self {
243        let timestamp = *self;
244        self.value += 1;
245        timestamp
246    }
247
248    pub fn observe(&mut self, timestamp: Self) {
249        self.value = cmp::max(self.value, timestamp.value) + 1;
250    }
251}
252
253impl fmt::Debug for Local {
254    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
255        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
256    }
257}
258
259impl fmt::Debug for Lamport {
260    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
262    }
263}
264
265impl fmt::Debug for Global {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        write!(f, "Global {{")?;
268        for timestamp in self.iter() {
269            if timestamp.replica_id > 0 {
270                write!(f, ", ")?;
271            }
272            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
273        }
274        write!(f, "}}")
275    }
276}