1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5 ops::{Add, AddAssign},
6};
7
8pub type ReplicaId = u16;
9pub type Seq = u32;
10
11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
12pub struct Local {
13 pub replica_id: ReplicaId,
14 pub value: Seq,
15}
16
17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
18pub struct Lamport {
19 pub replica_id: ReplicaId,
20 pub value: Seq,
21}
22
23impl Local {
24 pub const MIN: Self = Self {
25 replica_id: ReplicaId::MIN,
26 value: Seq::MIN,
27 };
28 pub const MAX: Self = Self {
29 replica_id: ReplicaId::MAX,
30 value: Seq::MAX,
31 };
32
33 pub fn new(replica_id: ReplicaId) -> Self {
34 Self {
35 replica_id,
36 value: 1,
37 }
38 }
39
40 pub fn tick(&mut self) -> Self {
41 let timestamp = *self;
42 self.value += 1;
43 timestamp
44 }
45
46 pub fn observe(&mut self, timestamp: Self) {
47 if timestamp.replica_id == self.replica_id {
48 self.value = cmp::max(self.value, timestamp.value + 1);
49 }
50 }
51}
52
53impl<'a> Add<&'a Self> for Local {
54 type Output = Local;
55
56 fn add(self, other: &'a Self) -> Self::Output {
57 cmp::max(&self, other).clone()
58 }
59}
60
61impl<'a> AddAssign<&'a Local> for Local {
62 fn add_assign(&mut self, other: &Self) {
63 if *self < *other {
64 *self = other.clone();
65 }
66 }
67}
68
69#[derive(Clone, Default, Hash, Eq, PartialEq)]
70pub struct Global(SmallVec<[u32; 8]>);
71
72impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
73 fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
74 let mut version = Self::new();
75 for entry in message {
76 version.observe(Local {
77 replica_id: entry.replica_id as ReplicaId,
78 value: entry.timestamp,
79 });
80 }
81 version
82 }
83}
84
85impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
86 fn from(version: &'a Global) -> Self {
87 version
88 .iter()
89 .map(|entry| rpc::proto::VectorClockEntry {
90 replica_id: entry.replica_id as u32,
91 timestamp: entry.value,
92 })
93 .collect()
94 }
95}
96
97impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
98 fn from(version: Global) -> Self {
99 (&version).into()
100 }
101}
102
103impl Global {
104 pub fn new() -> Self {
105 Self::default()
106 }
107
108 pub fn get(&self, replica_id: ReplicaId) -> Seq {
109 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
110 }
111
112 pub fn observe(&mut self, timestamp: Local) {
113 if timestamp.value > 0 {
114 let new_len = timestamp.replica_id as usize + 1;
115 if new_len > self.0.len() {
116 self.0.resize(new_len, 0);
117 }
118
119 let entry = &mut self.0[timestamp.replica_id as usize];
120 *entry = cmp::max(*entry, timestamp.value);
121 }
122 }
123
124 pub fn join(&mut self, other: &Self) {
125 if other.0.len() > self.0.len() {
126 self.0.resize(other.0.len(), 0);
127 }
128
129 for (left, right) in self.0.iter_mut().zip(&other.0) {
130 *left = cmp::max(*left, *right);
131 }
132 }
133
134 pub fn meet(&mut self, other: &Self) {
135 if other.0.len() > self.0.len() {
136 self.0.resize(other.0.len(), 0);
137 }
138
139 let mut new_len = 0;
140 for (ix, (left, right)) in self
141 .0
142 .iter_mut()
143 .zip(other.0.iter().chain(iter::repeat(&0)))
144 .enumerate()
145 {
146 if *left == 0 {
147 *left = *right;
148 } else if *right > 0 {
149 *left = cmp::min(*left, *right);
150 }
151
152 if *left != 0 {
153 new_len = ix + 1;
154 }
155 }
156 self.0.resize(new_len, 0);
157 }
158
159 pub fn observed(&self, timestamp: Local) -> bool {
160 self.get(timestamp.replica_id) >= timestamp.value
161 }
162
163 pub fn observed_any(&self, other: &Self) -> bool {
164 let mut lhs = self.0.iter();
165 let mut rhs = other.0.iter();
166 loop {
167 if let Some(left) = lhs.next() {
168 if let Some(right) = rhs.next() {
169 if *right > 0 && left >= right {
170 return true;
171 }
172 } else {
173 return false;
174 }
175 } else {
176 return false;
177 }
178 }
179 }
180
181 pub fn observed_all(&self, other: &Self) -> bool {
182 let mut lhs = self.0.iter();
183 let mut rhs = other.0.iter();
184 loop {
185 if let Some(left) = lhs.next() {
186 if let Some(right) = rhs.next() {
187 if left < right {
188 return false;
189 }
190 } else {
191 return true;
192 }
193 } else {
194 return rhs.next().is_none();
195 }
196 }
197 }
198
199 pub fn changed_since(&self, other: &Self) -> bool {
200 if self.0.len() > other.0.len() {
201 return true;
202 }
203 for (left, right) in self.0.iter().zip(other.0.iter()) {
204 if left > right {
205 return true;
206 }
207 }
208 false
209 }
210
211 pub fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = Local> {
212 self.0.iter().enumerate().map(|(replica_id, seq)| Local {
213 replica_id: replica_id as ReplicaId,
214 value: *seq,
215 })
216 }
217}
218
219impl FromIterator<Local> for Global {
220 fn from_iter<T: IntoIterator<Item = Local>>(locals: T) -> Self {
221 let mut result = Self::new();
222 for local in locals {
223 result.observe(local);
224 }
225 result
226 }
227}
228
229impl Ord for Lamport {
230 fn cmp(&self, other: &Self) -> Ordering {
231 // Use the replica id to break ties between concurrent events.
232 self.value
233 .cmp(&other.value)
234 .then_with(|| self.replica_id.cmp(&other.replica_id))
235 }
236}
237
238impl PartialOrd for Lamport {
239 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
240 Some(self.cmp(other))
241 }
242}
243
244impl Lamport {
245 pub fn new(replica_id: ReplicaId) -> Self {
246 Self {
247 value: 1,
248 replica_id,
249 }
250 }
251
252 pub fn tick(&mut self) -> Self {
253 let timestamp = *self;
254 self.value += 1;
255 timestamp
256 }
257
258 pub fn observe(&mut self, timestamp: Self) {
259 self.value = cmp::max(self.value, timestamp.value) + 1;
260 }
261}
262
263impl fmt::Debug for Local {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
266 }
267}
268
269impl fmt::Debug for Lamport {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
272 }
273}
274
275impl fmt::Debug for Global {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 write!(f, "Global {{")?;
278 for timestamp in self.iter() {
279 if timestamp.replica_id > 0 {
280 write!(f, ", ")?;
281 }
282 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
283 }
284 write!(f, "}}")
285 }
286}