1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5 ops::{Add, AddAssign},
6};
7
/// Integer type of the `replica_id` field carried by [`Local`] and [`Lamport`].
pub type ReplicaId = u16;

/// Integer type of the `value` field carried by [`Local`] and [`Lamport`].
pub type Seq = u32;
10
11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
12pub struct Local {
13 pub replica_id: ReplicaId,
14 pub value: Seq,
15}
16
17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
18pub struct Lamport {
19 pub replica_id: ReplicaId,
20 pub value: Seq,
21}
22
23impl Local {
24 pub fn new(replica_id: ReplicaId) -> Self {
25 Self {
26 replica_id,
27 value: 1,
28 }
29 }
30
31 pub fn tick(&mut self) -> Self {
32 let timestamp = *self;
33 self.value += 1;
34 timestamp
35 }
36
37 pub fn observe(&mut self, timestamp: Self) {
38 if timestamp.replica_id == self.replica_id {
39 self.value = cmp::max(self.value, timestamp.value + 1);
40 }
41 }
42}
43
44impl<'a> Add<&'a Self> for Local {
45 type Output = Local;
46
47 fn add(self, other: &'a Self) -> Self::Output {
48 cmp::max(&self, other).clone()
49 }
50}
51
52impl<'a> AddAssign<&'a Local> for Local {
53 fn add_assign(&mut self, other: &Self) {
54 if *self < *other {
55 *self = other.clone();
56 }
57 }
58}
59
60#[derive(Clone, Default, Hash, Eq, PartialEq)]
61pub struct Global(SmallVec<[u32; 8]>);
62
63impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
64 fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
65 let mut version = Self::new();
66 for entry in message {
67 version.observe(Local {
68 replica_id: entry.replica_id as ReplicaId,
69 value: entry.timestamp,
70 });
71 }
72 version
73 }
74}
75
76impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
77 fn from(version: &'a Global) -> Self {
78 version
79 .iter()
80 .map(|entry| rpc::proto::VectorClockEntry {
81 replica_id: entry.replica_id as u32,
82 timestamp: entry.value,
83 })
84 .collect()
85 }
86}
87
88impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
89 fn from(version: Global) -> Self {
90 (&version).into()
91 }
92}
93
94impl Global {
95 pub fn new() -> Self {
96 Self::default()
97 }
98
99 pub fn get(&self, replica_id: ReplicaId) -> Seq {
100 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
101 }
102
103 pub fn observe(&mut self, timestamp: Local) {
104 if timestamp.value > 0 {
105 let new_len = timestamp.replica_id as usize + 1;
106 if new_len > self.0.len() {
107 self.0.resize(new_len, 0);
108 }
109
110 let entry = &mut self.0[timestamp.replica_id as usize];
111 *entry = cmp::max(*entry, timestamp.value);
112 }
113 }
114
115 pub fn join(&mut self, other: &Self) {
116 if other.0.len() > self.0.len() {
117 self.0.resize(other.0.len(), 0);
118 }
119
120 for (left, right) in self.0.iter_mut().zip(&other.0) {
121 *left = cmp::max(*left, *right);
122 }
123 }
124
125 pub fn meet(&mut self, other: &Self) {
126 if other.0.len() > self.0.len() {
127 self.0.resize(other.0.len(), 0);
128 }
129
130 let mut new_len = 0;
131 for (ix, (left, right)) in self
132 .0
133 .iter_mut()
134 .zip(other.0.iter().chain(iter::repeat(&0)))
135 .enumerate()
136 {
137 if *left == 0 {
138 *left = *right;
139 } else if *right > 0 {
140 *left = cmp::min(*left, *right);
141 }
142
143 if *left != 0 {
144 new_len = ix + 1;
145 }
146 }
147 self.0.resize(new_len, 0);
148 }
149
150 pub fn observed(&self, timestamp: Local) -> bool {
151 self.get(timestamp.replica_id) >= timestamp.value
152 }
153
154 pub fn observed_any(&self, other: &Self) -> bool {
155 let mut lhs = self.0.iter();
156 let mut rhs = other.0.iter();
157 loop {
158 if let Some(left) = lhs.next() {
159 if let Some(right) = rhs.next() {
160 if *right > 0 && left >= right {
161 return true;
162 }
163 } else {
164 return false;
165 }
166 } else {
167 return false;
168 }
169 }
170 }
171
172 pub fn ge(&self, other: &Self) -> bool {
173 let mut lhs = self.0.iter();
174 let mut rhs = other.0.iter();
175 loop {
176 if let Some(left) = lhs.next() {
177 if let Some(right) = rhs.next() {
178 if left < right {
179 return false;
180 }
181 } else {
182 return true;
183 }
184 } else {
185 return rhs.next().is_none();
186 }
187 }
188 }
189
190 pub fn gt(&self, other: &Self) -> bool {
191 let mut lhs = self.0.iter();
192 let mut rhs = other.0.iter();
193 loop {
194 if let Some(left) = lhs.next() {
195 if let Some(right) = rhs.next() {
196 if left <= right {
197 return false;
198 }
199 } else {
200 return true;
201 }
202 } else {
203 return rhs.next().is_none();
204 }
205 }
206 }
207
208 pub fn iter<'a>(&'a self) -> impl 'a + Iterator<Item = Local> {
209 self.0.iter().enumerate().map(|(replica_id, seq)| Local {
210 replica_id: replica_id as ReplicaId,
211 value: *seq,
212 })
213 }
214}
215
216impl Ord for Lamport {
217 fn cmp(&self, other: &Self) -> Ordering {
218 // Use the replica id to break ties between concurrent events.
219 self.value
220 .cmp(&other.value)
221 .then_with(|| self.replica_id.cmp(&other.replica_id))
222 }
223}
224
225impl PartialOrd for Lamport {
226 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
227 Some(self.cmp(other))
228 }
229}
230
231impl Lamport {
232 pub fn new(replica_id: ReplicaId) -> Self {
233 Self {
234 value: 1,
235 replica_id,
236 }
237 }
238
239 pub fn tick(&mut self) -> Self {
240 let timestamp = *self;
241 self.value += 1;
242 timestamp
243 }
244
245 pub fn observe(&mut self, timestamp: Self) {
246 self.value = cmp::max(self.value, timestamp.value) + 1;
247 }
248}
249
250impl fmt::Debug for Local {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
253 }
254}
255
256impl fmt::Debug for Lamport {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
259 }
260}
261
262impl fmt::Debug for Global {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 write!(f, "Global {{")?;
265 for timestamp in self.iter() {
266 if timestamp.replica_id > 0 {
267 write!(f, ", ")?;
268 }
269 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
270 }
271 write!(f, "}}")
272 }
273}