1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5 ops::{Add, AddAssign},
6};
7
8pub type ReplicaId = u16;
9pub type Seq = u32;
10
11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
12pub struct Local {
13 pub replica_id: ReplicaId,
14 pub value: Seq,
15}
16
17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
18pub struct Lamport {
19 pub replica_id: ReplicaId,
20 pub value: Seq,
21}
22
23impl Local {
24 pub const MIN: Self = Self {
25 replica_id: ReplicaId::MIN,
26 value: Seq::MIN,
27 };
28 pub const MAX: Self = Self {
29 replica_id: ReplicaId::MAX,
30 value: Seq::MAX,
31 };
32
33 pub fn new(replica_id: ReplicaId) -> Self {
34 Self {
35 replica_id,
36 value: 1,
37 }
38 }
39
40 pub fn tick(&mut self) -> Self {
41 let timestamp = *self;
42 self.value += 1;
43 timestamp
44 }
45
46 pub fn observe(&mut self, timestamp: Self) {
47 if timestamp.replica_id == self.replica_id {
48 self.value = cmp::max(self.value, timestamp.value + 1);
49 }
50 }
51}
52
53impl<'a> Add<&'a Self> for Local {
54 type Output = Local;
55
56 fn add(self, other: &'a Self) -> Self::Output {
57 *cmp::max(&self, other)
58 }
59}
60
61impl<'a> AddAssign<&'a Local> for Local {
62 fn add_assign(&mut self, other: &Self) {
63 if *self < *other {
64 *self = *other;
65 }
66 }
67}
68
69/// A vector clock
70#[derive(Clone, Default, Hash, Eq, PartialEq)]
71pub struct Global(SmallVec<[u32; 8]>);
72
73impl Global {
74 pub fn new() -> Self {
75 Self::default()
76 }
77
78 pub fn get(&self, replica_id: ReplicaId) -> Seq {
79 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
80 }
81
82 pub fn observe(&mut self, timestamp: Local) {
83 if timestamp.value > 0 {
84 let new_len = timestamp.replica_id as usize + 1;
85 if new_len > self.0.len() {
86 self.0.resize(new_len, 0);
87 }
88
89 let entry = &mut self.0[timestamp.replica_id as usize];
90 *entry = cmp::max(*entry, timestamp.value);
91 }
92 }
93
94 pub fn join(&mut self, other: &Self) {
95 if other.0.len() > self.0.len() {
96 self.0.resize(other.0.len(), 0);
97 }
98
99 for (left, right) in self.0.iter_mut().zip(&other.0) {
100 *left = cmp::max(*left, *right);
101 }
102 }
103
104 pub fn meet(&mut self, other: &Self) {
105 if other.0.len() > self.0.len() {
106 self.0.resize(other.0.len(), 0);
107 }
108
109 let mut new_len = 0;
110 for (ix, (left, right)) in self
111 .0
112 .iter_mut()
113 .zip(other.0.iter().chain(iter::repeat(&0)))
114 .enumerate()
115 {
116 if *left == 0 {
117 *left = *right;
118 } else if *right > 0 {
119 *left = cmp::min(*left, *right);
120 }
121
122 if *left != 0 {
123 new_len = ix + 1;
124 }
125 }
126 self.0.resize(new_len, 0);
127 }
128
129 pub fn observed(&self, timestamp: Local) -> bool {
130 self.get(timestamp.replica_id) >= timestamp.value
131 }
132
133 pub fn observed_any(&self, other: &Self) -> bool {
134 let mut lhs = self.0.iter();
135 let mut rhs = other.0.iter();
136 loop {
137 if let Some(left) = lhs.next() {
138 if let Some(right) = rhs.next() {
139 if *right > 0 && left >= right {
140 return true;
141 }
142 } else {
143 return false;
144 }
145 } else {
146 return false;
147 }
148 }
149 }
150
151 pub fn observed_all(&self, other: &Self) -> bool {
152 let mut lhs = self.0.iter();
153 let mut rhs = other.0.iter();
154 loop {
155 if let Some(left) = lhs.next() {
156 if let Some(right) = rhs.next() {
157 if left < right {
158 return false;
159 }
160 } else {
161 return true;
162 }
163 } else {
164 return rhs.next().is_none();
165 }
166 }
167 }
168
169 pub fn changed_since(&self, other: &Self) -> bool {
170 if self.0.len() > other.0.len() {
171 return true;
172 }
173 for (left, right) in self.0.iter().zip(other.0.iter()) {
174 if left > right {
175 return true;
176 }
177 }
178 false
179 }
180
181 pub fn iter(&self) -> impl Iterator<Item = Local> + '_ {
182 self.0.iter().enumerate().map(|(replica_id, seq)| Local {
183 replica_id: replica_id as ReplicaId,
184 value: *seq,
185 })
186 }
187}
188
189impl FromIterator<Local> for Global {
190 fn from_iter<T: IntoIterator<Item = Local>>(locals: T) -> Self {
191 let mut result = Self::new();
192 for local in locals {
193 result.observe(local);
194 }
195 result
196 }
197}
198
199impl Ord for Lamport {
200 fn cmp(&self, other: &Self) -> Ordering {
201 // Use the replica id to break ties between concurrent events.
202 self.value
203 .cmp(&other.value)
204 .then_with(|| self.replica_id.cmp(&other.replica_id))
205 }
206}
207
208impl PartialOrd for Lamport {
209 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
210 Some(self.cmp(other))
211 }
212}
213
214impl Lamport {
215 pub fn new(replica_id: ReplicaId) -> Self {
216 Self {
217 value: 1,
218 replica_id,
219 }
220 }
221
222 pub fn tick(&mut self) -> Self {
223 let timestamp = *self;
224 self.value += 1;
225 timestamp
226 }
227
228 pub fn observe(&mut self, timestamp: Self) {
229 self.value = cmp::max(self.value, timestamp.value) + 1;
230 }
231}
232
233impl fmt::Debug for Local {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
236 }
237}
238
239impl fmt::Debug for Lamport {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
242 }
243}
244
245impl fmt::Debug for Global {
246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247 write!(f, "Global {{")?;
248 for timestamp in self.iter() {
249 if timestamp.replica_id > 0 {
250 write!(f, ", ")?;
251 }
252 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
253 }
254 write!(f, "}}")
255 }
256}