clock.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5    ops::{Add, AddAssign},
  6};
  7
  8pub type ReplicaId = u16;
  9pub type Seq = u32;
 10
 11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 12pub struct Local {
 13    pub replica_id: ReplicaId,
 14    pub value: Seq,
 15}
 16
 17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 18pub struct Lamport {
 19    pub replica_id: ReplicaId,
 20    pub value: Seq,
 21}
 22
 23impl Local {
 24    pub const MIN: Self = Self {
 25        replica_id: ReplicaId::MIN,
 26        value: Seq::MIN,
 27    };
 28    pub const MAX: Self = Self {
 29        replica_id: ReplicaId::MAX,
 30        value: Seq::MAX,
 31    };
 32
 33    pub fn new(replica_id: ReplicaId) -> Self {
 34        Self {
 35            replica_id,
 36            value: 1,
 37        }
 38    }
 39
 40    pub fn tick(&mut self) -> Self {
 41        let timestamp = *self;
 42        self.value += 1;
 43        timestamp
 44    }
 45
 46    pub fn observe(&mut self, timestamp: Self) {
 47        if timestamp.replica_id == self.replica_id {
 48            self.value = cmp::max(self.value, timestamp.value + 1);
 49        }
 50    }
 51}
 52
 53impl<'a> Add<&'a Self> for Local {
 54    type Output = Local;
 55
 56    fn add(self, other: &'a Self) -> Self::Output {
 57        *cmp::max(&self, other)
 58    }
 59}
 60
 61impl<'a> AddAssign<&'a Local> for Local {
 62    fn add_assign(&mut self, other: &Self) {
 63        if *self < *other {
 64            *self = *other;
 65        }
 66    }
 67}
 68
 69#[derive(Clone, Default, Hash, Eq, PartialEq)]
 70pub struct Global(SmallVec<[u32; 8]>);
 71
 72impl Global {
 73    pub fn new() -> Self {
 74        Self::default()
 75    }
 76
 77    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 78        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
 79    }
 80
 81    pub fn observe(&mut self, timestamp: Local) {
 82        if timestamp.value > 0 {
 83            let new_len = timestamp.replica_id as usize + 1;
 84            if new_len > self.0.len() {
 85                self.0.resize(new_len, 0);
 86            }
 87
 88            let entry = &mut self.0[timestamp.replica_id as usize];
 89            *entry = cmp::max(*entry, timestamp.value);
 90        }
 91    }
 92
 93    pub fn join(&mut self, other: &Self) {
 94        if other.0.len() > self.0.len() {
 95            self.0.resize(other.0.len(), 0);
 96        }
 97
 98        for (left, right) in self.0.iter_mut().zip(&other.0) {
 99            *left = cmp::max(*left, *right);
100        }
101    }
102
103    pub fn meet(&mut self, other: &Self) {
104        if other.0.len() > self.0.len() {
105            self.0.resize(other.0.len(), 0);
106        }
107
108        let mut new_len = 0;
109        for (ix, (left, right)) in self
110            .0
111            .iter_mut()
112            .zip(other.0.iter().chain(iter::repeat(&0)))
113            .enumerate()
114        {
115            if *left == 0 {
116                *left = *right;
117            } else if *right > 0 {
118                *left = cmp::min(*left, *right);
119            }
120
121            if *left != 0 {
122                new_len = ix + 1;
123            }
124        }
125        self.0.resize(new_len, 0);
126    }
127
128    pub fn observed(&self, timestamp: Local) -> bool {
129        self.get(timestamp.replica_id) >= timestamp.value
130    }
131
132    pub fn observed_any(&self, other: &Self) -> bool {
133        let mut lhs = self.0.iter();
134        let mut rhs = other.0.iter();
135        loop {
136            if let Some(left) = lhs.next() {
137                if let Some(right) = rhs.next() {
138                    if *right > 0 && left >= right {
139                        return true;
140                    }
141                } else {
142                    return false;
143                }
144            } else {
145                return false;
146            }
147        }
148    }
149
150    pub fn observed_all(&self, other: &Self) -> bool {
151        let mut lhs = self.0.iter();
152        let mut rhs = other.0.iter();
153        loop {
154            if let Some(left) = lhs.next() {
155                if let Some(right) = rhs.next() {
156                    if left < right {
157                        return false;
158                    }
159                } else {
160                    return true;
161                }
162            } else {
163                return rhs.next().is_none();
164            }
165        }
166    }
167
168    pub fn changed_since(&self, other: &Self) -> bool {
169        if self.0.len() > other.0.len() {
170            return true;
171        }
172        for (left, right) in self.0.iter().zip(other.0.iter()) {
173            if left > right {
174                return true;
175            }
176        }
177        false
178    }
179
180    pub fn iter(&self) -> impl Iterator<Item = Local> + '_ {
181        self.0.iter().enumerate().map(|(replica_id, seq)| Local {
182            replica_id: replica_id as ReplicaId,
183            value: *seq,
184        })
185    }
186}
187
188impl FromIterator<Local> for Global {
189    fn from_iter<T: IntoIterator<Item = Local>>(locals: T) -> Self {
190        let mut result = Self::new();
191        for local in locals {
192            result.observe(local);
193        }
194        result
195    }
196}
197
198impl Ord for Lamport {
199    fn cmp(&self, other: &Self) -> Ordering {
200        // Use the replica id to break ties between concurrent events.
201        self.value
202            .cmp(&other.value)
203            .then_with(|| self.replica_id.cmp(&other.replica_id))
204    }
205}
206
207impl PartialOrd for Lamport {
208    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
209        Some(self.cmp(other))
210    }
211}
212
213impl Lamport {
214    pub fn new(replica_id: ReplicaId) -> Self {
215        Self {
216            value: 1,
217            replica_id,
218        }
219    }
220
221    pub fn tick(&mut self) -> Self {
222        let timestamp = *self;
223        self.value += 1;
224        timestamp
225    }
226
227    pub fn observe(&mut self, timestamp: Self) {
228        self.value = cmp::max(self.value, timestamp.value) + 1;
229    }
230}
231
232impl fmt::Debug for Local {
233    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
235    }
236}
237
238impl fmt::Debug for Lamport {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
241    }
242}
243
244impl fmt::Debug for Global {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        write!(f, "Global {{")?;
247        for timestamp in self.iter() {
248            if timestamp.replica_id > 0 {
249                write!(f, ", ")?;
250            }
251            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
252        }
253        write!(f, "}}")
254    }
255}