lib.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5    ops::{Add, AddAssign},
  6};
  7
  8pub type ReplicaId = u16;
  9pub type Seq = u32;
 10
 11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 12pub struct Local {
 13    pub replica_id: ReplicaId,
 14    pub value: Seq,
 15}
 16
 17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 18pub struct Lamport {
 19    pub replica_id: ReplicaId,
 20    pub value: Seq,
 21}
 22
 23impl Local {
 24    pub fn new(replica_id: ReplicaId) -> Self {
 25        Self {
 26            replica_id,
 27            value: 1,
 28        }
 29    }
 30
 31    pub fn tick(&mut self) -> Self {
 32        let timestamp = *self;
 33        self.value += 1;
 34        timestamp
 35    }
 36
 37    pub fn observe(&mut self, timestamp: Self) {
 38        if timestamp.replica_id == self.replica_id {
 39            self.value = cmp::max(self.value, timestamp.value + 1);
 40        }
 41    }
 42}
 43
 44impl<'a> Add<&'a Self> for Local {
 45    type Output = Local;
 46
 47    fn add(self, other: &'a Self) -> Self::Output {
 48        cmp::max(&self, other).clone()
 49    }
 50}
 51
 52impl<'a> AddAssign<&'a Local> for Local {
 53    fn add_assign(&mut self, other: &Self) {
 54        if *self < *other {
 55            *self = other.clone();
 56        }
 57    }
 58}
 59
 60#[derive(Clone, Default, Hash, Eq, PartialEq)]
 61pub struct Global(SmallVec<[u32; 8]>);
 62
 63impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
 64    fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
 65        let mut version = Self::new();
 66        for entry in message {
 67            version.observe(Local {
 68                replica_id: entry.replica_id as ReplicaId,
 69                value: entry.timestamp,
 70            });
 71        }
 72        version
 73    }
 74}
 75
 76impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
 77    fn from(version: &'a Global) -> Self {
 78        version
 79            .iter()
 80            .map(|entry| rpc::proto::VectorClockEntry {
 81                replica_id: entry.replica_id as u32,
 82                timestamp: entry.value,
 83            })
 84            .collect()
 85    }
 86}
 87
 88impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
 89    fn from(version: Global) -> Self {
 90        (&version).into()
 91    }
 92}
 93
 94impl Global {
 95    pub fn new() -> Self {
 96        Self::default()
 97    }
 98
 99    pub fn get(&self, replica_id: ReplicaId) -> Seq {
100        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
101    }
102
103    pub fn observe(&mut self, timestamp: Local) {
104        if timestamp.value > 0 {
105            let new_len = timestamp.replica_id as usize + 1;
106            if new_len > self.0.len() {
107                self.0.resize(new_len, 0);
108            }
109
110            let entry = &mut self.0[timestamp.replica_id as usize];
111            *entry = cmp::max(*entry, timestamp.value);
112        }
113    }
114
115    pub fn join(&mut self, other: &Self) {
116        if other.0.len() > self.0.len() {
117            self.0.resize(other.0.len(), 0);
118        }
119
120        for (left, right) in self.0.iter_mut().zip(&other.0) {
121            *left = cmp::max(*left, *right);
122        }
123    }
124
125    pub fn meet(&mut self, other: &Self) {
126        if other.0.len() > self.0.len() {
127            self.0.resize(other.0.len(), 0);
128        }
129
130        let mut new_len = 0;
131        for (ix, (left, right)) in self
132            .0
133            .iter_mut()
134            .zip(other.0.iter().chain(iter::repeat(&0)))
135            .enumerate()
136        {
137            if *left == 0 {
138                *left = *right;
139            } else if *right > 0 {
140                *left = cmp::min(*left, *right);
141            }
142
143            if *left != 0 {
144                new_len = ix + 1;
145            }
146        }
147        self.0.resize(new_len, 0);
148    }
149
150    pub fn observed(&self, timestamp: Local) -> bool {
151        self.get(timestamp.replica_id) >= timestamp.value
152    }
153
154    pub fn observed_any(&self, other: &Self) -> bool {
155        let mut lhs = self.0.iter();
156        let mut rhs = other.0.iter();
157        loop {
158            if let Some(left) = lhs.next() {
159                if let Some(right) = rhs.next() {
160                    if *right > 0 && left >= right {
161                        return true;
162                    }
163                } else {
164                    return false;
165                }
166            } else {
167                return false;
168            }
169        }
170    }
171
172    pub fn ge(&self, other: &Self) -> bool {
173        let mut lhs = self.0.iter();
174        let mut rhs = other.0.iter();
175        loop {
176            if let Some(left) = lhs.next() {
177                if let Some(right) = rhs.next() {
178                    if left < right {
179                        return false;
180                    }
181                } else {
182                    return true;
183                }
184            } else {
185                return rhs.next().is_none();
186            }
187        }
188    }
189
190    pub fn gt(&self, other: &Self) -> bool {
191        let mut lhs = self.0.iter();
192        let mut rhs = other.0.iter();
193        loop {
194            if let Some(left) = lhs.next() {
195                if let Some(right) = rhs.next() {
196                    if left <= right {
197                        return false;
198                    }
199                } else {
200                    return true;
201                }
202            } else {
203                return rhs.next().is_none();
204            }
205        }
206    }
207
208    pub fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = Local> {
209        self.0.iter().enumerate().map(|(replica_id, seq)| Local {
210            replica_id: replica_id as ReplicaId,
211            value: *seq,
212        })
213    }
214}
215
216impl Ord for Lamport {
217    fn cmp(&self, other: &Self) -> Ordering {
218        // Use the replica id to break ties between concurrent events.
219        self.value
220            .cmp(&other.value)
221            .then_with(|| self.replica_id.cmp(&other.replica_id))
222    }
223}
224
225impl PartialOrd for Lamport {
226    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
227        Some(self.cmp(other))
228    }
229}
230
231impl Lamport {
232    pub fn new(replica_id: ReplicaId) -> Self {
233        Self {
234            value: 1,
235            replica_id,
236        }
237    }
238
239    pub fn tick(&mut self) -> Self {
240        let timestamp = *self;
241        self.value += 1;
242        timestamp
243    }
244
245    pub fn observe(&mut self, timestamp: Self) {
246        self.value = cmp::max(self.value, timestamp.value) + 1;
247    }
248}
249
250impl fmt::Debug for Local {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
253    }
254}
255
256impl fmt::Debug for Lamport {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
259    }
260}
261
262impl fmt::Debug for Global {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        write!(f, "Global {{")?;
265        for timestamp in self.iter() {
266            if timestamp.replica_id > 0 {
267                write!(f, ", ")?;
268            }
269            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
270        }
271        write!(f, "}}")
272    }
273}