clock.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5    ops::{Add, AddAssign},
  6};
  7
/// Identifier of a replica. Also used (cast to `usize`) as the index of that
/// replica's entry inside a [`Global`] clock.
pub type ReplicaId = u16;
/// Sequence number carried by a timestamp. Clocks created with `new` start
/// at 1, and [`Global`] treats 0 as "nothing observed".
pub type Seq = u32;
 10
 11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 12pub struct Local {
 13    pub replica_id: ReplicaId,
 14    pub value: Seq,
 15}
 16
 17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 18pub struct Lamport {
 19    pub replica_id: ReplicaId,
 20    pub value: Seq,
 21}
 22
 23impl Local {
 24    pub const MIN: Self = Self {
 25        replica_id: ReplicaId::MIN,
 26        value: Seq::MIN,
 27    };
 28    pub const MAX: Self = Self {
 29        replica_id: ReplicaId::MAX,
 30        value: Seq::MAX,
 31    };
 32
 33    pub fn new(replica_id: ReplicaId) -> Self {
 34        Self {
 35            replica_id,
 36            value: 1,
 37        }
 38    }
 39
 40    pub fn tick(&mut self) -> Self {
 41        let timestamp = *self;
 42        self.value += 1;
 43        timestamp
 44    }
 45
 46    pub fn observe(&mut self, timestamp: Self) {
 47        if timestamp.replica_id == self.replica_id {
 48            self.value = cmp::max(self.value, timestamp.value + 1);
 49        }
 50    }
 51}
 52
 53impl<'a> Add<&'a Self> for Local {
 54    type Output = Local;
 55
 56    fn add(self, other: &'a Self) -> Self::Output {
 57        *cmp::max(&self, other)
 58    }
 59}
 60
 61impl<'a> AddAssign<&'a Local> for Local {
 62    fn add_assign(&mut self, other: &Self) {
 63        if *self < *other {
 64            *self = *other;
 65        }
 66    }
 67}
 68
 69/// A vector clock
 70#[derive(Clone, Default, Hash, Eq, PartialEq)]
 71pub struct Global(SmallVec<[u32; 8]>);
 72
 73impl Global {
 74    pub fn new() -> Self {
 75        Self::default()
 76    }
 77
 78    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 79        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
 80    }
 81
 82    pub fn observe(&mut self, timestamp: Local) {
 83        if timestamp.value > 0 {
 84            let new_len = timestamp.replica_id as usize + 1;
 85            if new_len > self.0.len() {
 86                self.0.resize(new_len, 0);
 87            }
 88
 89            let entry = &mut self.0[timestamp.replica_id as usize];
 90            *entry = cmp::max(*entry, timestamp.value);
 91        }
 92    }
 93
 94    pub fn join(&mut self, other: &Self) {
 95        if other.0.len() > self.0.len() {
 96            self.0.resize(other.0.len(), 0);
 97        }
 98
 99        for (left, right) in self.0.iter_mut().zip(&other.0) {
100            *left = cmp::max(*left, *right);
101        }
102    }
103
104    pub fn meet(&mut self, other: &Self) {
105        if other.0.len() > self.0.len() {
106            self.0.resize(other.0.len(), 0);
107        }
108
109        let mut new_len = 0;
110        for (ix, (left, right)) in self
111            .0
112            .iter_mut()
113            .zip(other.0.iter().chain(iter::repeat(&0)))
114            .enumerate()
115        {
116            if *left == 0 {
117                *left = *right;
118            } else if *right > 0 {
119                *left = cmp::min(*left, *right);
120            }
121
122            if *left != 0 {
123                new_len = ix + 1;
124            }
125        }
126        self.0.resize(new_len, 0);
127    }
128
129    pub fn observed(&self, timestamp: Local) -> bool {
130        self.get(timestamp.replica_id) >= timestamp.value
131    }
132
133    pub fn observed_any(&self, other: &Self) -> bool {
134        let mut lhs = self.0.iter();
135        let mut rhs = other.0.iter();
136        loop {
137            if let Some(left) = lhs.next() {
138                if let Some(right) = rhs.next() {
139                    if *right > 0 && left >= right {
140                        return true;
141                    }
142                } else {
143                    return false;
144                }
145            } else {
146                return false;
147            }
148        }
149    }
150
151    pub fn observed_all(&self, other: &Self) -> bool {
152        let mut lhs = self.0.iter();
153        let mut rhs = other.0.iter();
154        loop {
155            if let Some(left) = lhs.next() {
156                if let Some(right) = rhs.next() {
157                    if left < right {
158                        return false;
159                    }
160                } else {
161                    return true;
162                }
163            } else {
164                return rhs.next().is_none();
165            }
166        }
167    }
168
169    pub fn changed_since(&self, other: &Self) -> bool {
170        if self.0.len() > other.0.len() {
171            return true;
172        }
173        for (left, right) in self.0.iter().zip(other.0.iter()) {
174            if left > right {
175                return true;
176            }
177        }
178        false
179    }
180
181    pub fn iter(&self) -> impl Iterator<Item = Local> + '_ {
182        self.0.iter().enumerate().map(|(replica_id, seq)| Local {
183            replica_id: replica_id as ReplicaId,
184            value: *seq,
185        })
186    }
187}
188
189impl FromIterator<Local> for Global {
190    fn from_iter<T: IntoIterator<Item = Local>>(locals: T) -> Self {
191        let mut result = Self::new();
192        for local in locals {
193            result.observe(local);
194        }
195        result
196    }
197}
198
199impl Ord for Lamport {
200    fn cmp(&self, other: &Self) -> Ordering {
201        // Use the replica id to break ties between concurrent events.
202        self.value
203            .cmp(&other.value)
204            .then_with(|| self.replica_id.cmp(&other.replica_id))
205    }
206}
207
208impl PartialOrd for Lamport {
209    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
210        Some(self.cmp(other))
211    }
212}
213
214impl Lamport {
215    pub fn new(replica_id: ReplicaId) -> Self {
216        Self {
217            value: 1,
218            replica_id,
219        }
220    }
221
222    pub fn tick(&mut self) -> Self {
223        let timestamp = *self;
224        self.value += 1;
225        timestamp
226    }
227
228    pub fn observe(&mut self, timestamp: Self) {
229        self.value = cmp::max(self.value, timestamp.value) + 1;
230    }
231}
232
233impl fmt::Debug for Local {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
236    }
237}
238
239impl fmt::Debug for Lamport {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
242    }
243}
244
245impl fmt::Debug for Global {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        write!(f, "Global {{")?;
248        for timestamp in self.iter() {
249            if timestamp.replica_id > 0 {
250                write!(f, ", ")?;
251            }
252            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
253        }
254        write!(f, "}}")
255    }
256}