lib.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt,
  5    ops::{Add, AddAssign},
  6    slice,
  7};
  8
  9pub type ReplicaId = u16;
 10pub type Seq = u32;
 11
 12#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 13pub struct Local {
 14    pub replica_id: ReplicaId,
 15    pub value: Seq,
 16}
 17
 18#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 19pub struct Lamport {
 20    pub replica_id: ReplicaId,
 21    pub value: Seq,
 22}
 23
 24impl Local {
 25    pub fn new(replica_id: ReplicaId) -> Self {
 26        Self {
 27            replica_id,
 28            value: 1,
 29        }
 30    }
 31
 32    pub fn tick(&mut self) -> Self {
 33        let timestamp = *self;
 34        self.value += 1;
 35        timestamp
 36    }
 37
 38    pub fn observe(&mut self, timestamp: Self) {
 39        if timestamp.replica_id == self.replica_id {
 40            self.value = cmp::max(self.value, timestamp.value + 1);
 41        }
 42    }
 43}
 44
 45impl<'a> Add<&'a Self> for Local {
 46    type Output = Local;
 47
 48    fn add(self, other: &'a Self) -> Self::Output {
 49        cmp::max(&self, other).clone()
 50    }
 51}
 52
 53impl<'a> AddAssign<&'a Local> for Local {
 54    fn add_assign(&mut self, other: &Self) {
 55        if *self < *other {
 56            *self = other.clone();
 57        }
 58    }
 59}
 60
 61#[derive(Clone, Default, Hash, Eq, PartialEq)]
 62pub struct Global(SmallVec<[Local; 3]>);
 63
 64impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
 65    fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
 66        let mut version = Self::new();
 67        for entry in message {
 68            version.observe(Local {
 69                replica_id: entry.replica_id as ReplicaId,
 70                value: entry.timestamp,
 71            });
 72        }
 73        version
 74    }
 75}
 76
 77impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
 78    fn from(version: &'a Global) -> Self {
 79        version
 80            .iter()
 81            .map(|entry| rpc::proto::VectorClockEntry {
 82                replica_id: entry.replica_id as u32,
 83                timestamp: entry.value,
 84            })
 85            .collect()
 86    }
 87}
 88
 89impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
 90    fn from(version: Global) -> Self {
 91        (&version).into()
 92    }
 93}
 94
 95impl Global {
 96    pub fn new() -> Self {
 97        Self::default()
 98    }
 99
100    pub fn get(&self, replica_id: ReplicaId) -> Seq {
101        self.0
102            .iter()
103            .find(|t| t.replica_id == replica_id)
104            .map_or(0, |t| t.value)
105    }
106
107    pub fn observe(&mut self, timestamp: Local) {
108        if let Some(entry) = self
109            .0
110            .iter_mut()
111            .find(|t| t.replica_id == timestamp.replica_id)
112        {
113            entry.value = cmp::max(entry.value, timestamp.value);
114        } else {
115            self.0.push(timestamp);
116        }
117    }
118
119    pub fn join(&mut self, other: &Self) {
120        for timestamp in other.0.iter() {
121            self.observe(*timestamp);
122        }
123    }
124
125    pub fn meet(&mut self, other: &Self) {
126        for timestamp in other.0.iter() {
127            if let Some(entry) = self
128                .0
129                .iter_mut()
130                .find(|t| t.replica_id == timestamp.replica_id)
131            {
132                entry.value = cmp::min(entry.value, timestamp.value);
133            } else {
134                self.0.push(*timestamp);
135            }
136        }
137    }
138
139    pub fn observed(&self, timestamp: Local) -> bool {
140        self.get(timestamp.replica_id) >= timestamp.value
141    }
142
143    pub fn changed_since(&self, other: &Self) -> bool {
144        self.0.iter().any(|t| t.value > other.get(t.replica_id))
145    }
146
147    pub fn iter(&self) -> slice::Iter<Local> {
148        self.0.iter()
149    }
150}
151
152impl PartialOrd for Global {
153    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
154        let mut global_ordering = Ordering::Equal;
155
156        for timestamp in self.0.iter().chain(other.0.iter()) {
157            let ordering = self
158                .get(timestamp.replica_id)
159                .cmp(&other.get(timestamp.replica_id));
160            if ordering != Ordering::Equal {
161                if global_ordering == Ordering::Equal {
162                    global_ordering = ordering;
163                } else if ordering != global_ordering {
164                    return None;
165                }
166            }
167        }
168
169        Some(global_ordering)
170    }
171}
172
173impl Ord for Lamport {
174    fn cmp(&self, other: &Self) -> Ordering {
175        // Use the replica id to break ties between concurrent events.
176        self.value
177            .cmp(&other.value)
178            .then_with(|| self.replica_id.cmp(&other.replica_id))
179    }
180}
181
182impl PartialOrd for Lamport {
183    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
184        Some(self.cmp(other))
185    }
186}
187
188impl Lamport {
189    pub fn new(replica_id: ReplicaId) -> Self {
190        Self {
191            value: 1,
192            replica_id,
193        }
194    }
195
196    pub fn tick(&mut self) -> Self {
197        let timestamp = *self;
198        self.value += 1;
199        timestamp
200    }
201
202    pub fn observe(&mut self, timestamp: Self) {
203        self.value = cmp::max(self.value, timestamp.value) + 1;
204    }
205}
206
207impl fmt::Debug for Local {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
210    }
211}
212
213impl fmt::Debug for Lamport {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
216    }
217}
218
219impl fmt::Debug for Global {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        write!(f, "Global {{")?;
222        for (i, element) in self.0.iter().enumerate() {
223            if i > 0 {
224                write!(f, ", ")?;
225            }
226            write!(f, "{}: {}", element.replica_id, element.value)?;
227        }
228        write!(f, "}}")
229    }
230}