lib.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt,
  5    ops::{Add, AddAssign},
  6    slice,
  7};
  8
/// Identifier of a replica; every `Local` and `Lamport` timestamp carries one.
pub type ReplicaId = u16;
/// Counter type stored in the `value` field of `Local` and `Lamport`.
pub type Seq = u32;
 11
 12#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
 13pub struct Local {
 14    pub replica_id: ReplicaId,
 15    pub value: Seq,
 16}
 17
 18#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 19pub struct Lamport {
 20    pub replica_id: ReplicaId,
 21    pub value: Seq,
 22}
 23
 24impl Local {
 25    pub fn new(replica_id: ReplicaId) -> Self {
 26        Self {
 27            replica_id,
 28            value: 1,
 29        }
 30    }
 31
 32    pub fn tick(&mut self) -> Self {
 33        let timestamp = *self;
 34        self.value += 1;
 35        timestamp
 36    }
 37
 38    pub fn observe(&mut self, timestamp: Self) {
 39        if timestamp.replica_id == self.replica_id {
 40            self.value = cmp::max(self.value, timestamp.value + 1);
 41        }
 42    }
 43}
 44
 45impl<'a> Add<&'a Self> for Local {
 46    type Output = Local;
 47
 48    fn add(self, other: &'a Self) -> Self::Output {
 49        cmp::max(&self, other).clone()
 50    }
 51}
 52
 53impl<'a> AddAssign<&'a Local> for Local {
 54    fn add_assign(&mut self, other: &Self) {
 55        if *self < *other {
 56            *self = other.clone();
 57        }
 58    }
 59}
 60
 61#[derive(Clone, Default, Hash, Eq, PartialEq)]
 62pub struct Global(SmallVec<[Local; 3]>);
 63
 64impl From<Vec<zrpc::proto::VectorClockEntry>> for Global {
 65    fn from(message: Vec<zrpc::proto::VectorClockEntry>) -> Self {
 66        let mut version = Self::new();
 67        for entry in message {
 68            version.observe(Local {
 69                replica_id: entry.replica_id as ReplicaId,
 70                value: entry.timestamp,
 71            });
 72        }
 73        version
 74    }
 75}
 76
 77impl<'a> From<&'a Global> for Vec<zrpc::proto::VectorClockEntry> {
 78    fn from(version: &'a Global) -> Self {
 79        version
 80            .iter()
 81            .map(|entry| zrpc::proto::VectorClockEntry {
 82                replica_id: entry.replica_id as u32,
 83                timestamp: entry.value,
 84            })
 85            .collect()
 86    }
 87}
 88
 89impl Global {
 90    pub fn new() -> Self {
 91        Self::default()
 92    }
 93
 94    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 95        self.0
 96            .iter()
 97            .find(|t| t.replica_id == replica_id)
 98            .map_or(0, |t| t.value)
 99    }
100
101    pub fn observe(&mut self, timestamp: Local) {
102        if let Some(entry) = self
103            .0
104            .iter_mut()
105            .find(|t| t.replica_id == timestamp.replica_id)
106        {
107            entry.value = cmp::max(entry.value, timestamp.value);
108        } else {
109            self.0.push(timestamp);
110        }
111    }
112
113    pub fn join(&mut self, other: &Self) {
114        for timestamp in other.0.iter() {
115            self.observe(*timestamp);
116        }
117    }
118
119    pub fn meet(&mut self, other: &Self) {
120        for timestamp in other.0.iter() {
121            if let Some(entry) = self
122                .0
123                .iter_mut()
124                .find(|t| t.replica_id == timestamp.replica_id)
125            {
126                entry.value = cmp::min(entry.value, timestamp.value);
127            } else {
128                self.0.push(*timestamp);
129            }
130        }
131    }
132
133    pub fn observed(&self, timestamp: Local) -> bool {
134        self.get(timestamp.replica_id) >= timestamp.value
135    }
136
137    pub fn changed_since(&self, other: &Self) -> bool {
138        self.0.iter().any(|t| t.value > other.get(t.replica_id))
139    }
140
141    pub fn iter(&self) -> slice::Iter<Local> {
142        self.0.iter()
143    }
144}
145
146impl PartialOrd for Global {
147    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
148        let mut global_ordering = Ordering::Equal;
149
150        for timestamp in self.0.iter().chain(other.0.iter()) {
151            let ordering = self
152                .get(timestamp.replica_id)
153                .cmp(&other.get(timestamp.replica_id));
154            if ordering != Ordering::Equal {
155                if global_ordering == Ordering::Equal {
156                    global_ordering = ordering;
157                } else if ordering != global_ordering {
158                    return None;
159                }
160            }
161        }
162
163        Some(global_ordering)
164    }
165}
166
167impl Ord for Lamport {
168    fn cmp(&self, other: &Self) -> Ordering {
169        // Use the replica id to break ties between concurrent events.
170        self.value
171            .cmp(&other.value)
172            .then_with(|| self.replica_id.cmp(&other.replica_id))
173    }
174}
175
176impl PartialOrd for Lamport {
177    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
178        Some(self.cmp(other))
179    }
180}
181
182impl Lamport {
183    pub fn new(replica_id: ReplicaId) -> Self {
184        Self {
185            value: 1,
186            replica_id,
187        }
188    }
189
190    pub fn tick(&mut self) -> Self {
191        let timestamp = *self;
192        self.value += 1;
193        timestamp
194    }
195
196    pub fn observe(&mut self, timestamp: Self) {
197        self.value = cmp::max(self.value, timestamp.value) + 1;
198    }
199}
200
201impl fmt::Debug for Local {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
204    }
205}
206
207impl fmt::Debug for Lamport {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
210    }
211}
212
213impl fmt::Debug for Global {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "Global {{")?;
216        for (i, element) in self.0.iter().enumerate() {
217            if i > 0 {
218                write!(f, ", ")?;
219            }
220            write!(f, "{}: {}", element.replica_id, element.value)?;
221        }
222        write!(f, "}}")
223    }
224}