1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5 ops::{Add, AddAssign},
6};
7
8pub type ReplicaId = u16;
9pub type Seq = u32;
10
11#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
12pub struct Local {
13 pub replica_id: ReplicaId,
14 pub value: Seq,
15}
16
17#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
18pub struct Lamport {
19 pub replica_id: ReplicaId,
20 pub value: Seq,
21}
22
23impl Local {
24 pub const MIN: Self = Self {
25 replica_id: ReplicaId::MIN,
26 value: Seq::MIN,
27 };
28 pub const MAX: Self = Self {
29 replica_id: ReplicaId::MAX,
30 value: Seq::MAX,
31 };
32
33 pub fn new(replica_id: ReplicaId) -> Self {
34 Self {
35 replica_id,
36 value: 1,
37 }
38 }
39
40 pub fn tick(&mut self) -> Self {
41 let timestamp = *self;
42 self.value += 1;
43 timestamp
44 }
45
46 pub fn observe(&mut self, timestamp: Self) {
47 if timestamp.replica_id == self.replica_id {
48 self.value = cmp::max(self.value, timestamp.value + 1);
49 }
50 }
51}
52
53impl<'a> Add<&'a Self> for Local {
54 type Output = Local;
55
56 fn add(self, other: &'a Self) -> Self::Output {
57 *cmp::max(&self, other)
58 }
59}
60
61impl<'a> AddAssign<&'a Local> for Local {
62 fn add_assign(&mut self, other: &Self) {
63 if *self < *other {
64 *self = *other;
65 }
66 }
67}
68
69#[derive(Clone, Default, Hash, Eq, PartialEq)]
70pub struct Global(SmallVec<[u32; 8]>);
71
72impl Global {
73 pub fn new() -> Self {
74 Self::default()
75 }
76
77 pub fn get(&self, replica_id: ReplicaId) -> Seq {
78 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
79 }
80
81 pub fn observe(&mut self, timestamp: Local) {
82 if timestamp.value > 0 {
83 let new_len = timestamp.replica_id as usize + 1;
84 if new_len > self.0.len() {
85 self.0.resize(new_len, 0);
86 }
87
88 let entry = &mut self.0[timestamp.replica_id as usize];
89 *entry = cmp::max(*entry, timestamp.value);
90 }
91 }
92
93 pub fn join(&mut self, other: &Self) {
94 if other.0.len() > self.0.len() {
95 self.0.resize(other.0.len(), 0);
96 }
97
98 for (left, right) in self.0.iter_mut().zip(&other.0) {
99 *left = cmp::max(*left, *right);
100 }
101 }
102
103 pub fn meet(&mut self, other: &Self) {
104 if other.0.len() > self.0.len() {
105 self.0.resize(other.0.len(), 0);
106 }
107
108 let mut new_len = 0;
109 for (ix, (left, right)) in self
110 .0
111 .iter_mut()
112 .zip(other.0.iter().chain(iter::repeat(&0)))
113 .enumerate()
114 {
115 if *left == 0 {
116 *left = *right;
117 } else if *right > 0 {
118 *left = cmp::min(*left, *right);
119 }
120
121 if *left != 0 {
122 new_len = ix + 1;
123 }
124 }
125 self.0.resize(new_len, 0);
126 }
127
128 pub fn observed(&self, timestamp: Local) -> bool {
129 self.get(timestamp.replica_id) >= timestamp.value
130 }
131
132 pub fn observed_any(&self, other: &Self) -> bool {
133 let mut lhs = self.0.iter();
134 let mut rhs = other.0.iter();
135 loop {
136 if let Some(left) = lhs.next() {
137 if let Some(right) = rhs.next() {
138 if *right > 0 && left >= right {
139 return true;
140 }
141 } else {
142 return false;
143 }
144 } else {
145 return false;
146 }
147 }
148 }
149
150 pub fn observed_all(&self, other: &Self) -> bool {
151 let mut lhs = self.0.iter();
152 let mut rhs = other.0.iter();
153 loop {
154 if let Some(left) = lhs.next() {
155 if let Some(right) = rhs.next() {
156 if left < right {
157 return false;
158 }
159 } else {
160 return true;
161 }
162 } else {
163 return rhs.next().is_none();
164 }
165 }
166 }
167
168 pub fn changed_since(&self, other: &Self) -> bool {
169 if self.0.len() > other.0.len() {
170 return true;
171 }
172 for (left, right) in self.0.iter().zip(other.0.iter()) {
173 if left > right {
174 return true;
175 }
176 }
177 false
178 }
179
180 pub fn iter(&self) -> impl Iterator<Item = Local> + '_ {
181 self.0.iter().enumerate().map(|(replica_id, seq)| Local {
182 replica_id: replica_id as ReplicaId,
183 value: *seq,
184 })
185 }
186}
187
188impl FromIterator<Local> for Global {
189 fn from_iter<T: IntoIterator<Item = Local>>(locals: T) -> Self {
190 let mut result = Self::new();
191 for local in locals {
192 result.observe(local);
193 }
194 result
195 }
196}
197
198impl Ord for Lamport {
199 fn cmp(&self, other: &Self) -> Ordering {
200 // Use the replica id to break ties between concurrent events.
201 self.value
202 .cmp(&other.value)
203 .then_with(|| self.replica_id.cmp(&other.replica_id))
204 }
205}
206
207impl PartialOrd for Lamport {
208 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
209 Some(self.cmp(other))
210 }
211}
212
213impl Lamport {
214 pub fn new(replica_id: ReplicaId) -> Self {
215 Self {
216 value: 1,
217 replica_id,
218 }
219 }
220
221 pub fn tick(&mut self) -> Self {
222 let timestamp = *self;
223 self.value += 1;
224 timestamp
225 }
226
227 pub fn observe(&mut self, timestamp: Self) {
228 self.value = cmp::max(self.value, timestamp.value) + 1;
229 }
230}
231
232impl fmt::Debug for Local {
233 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
234 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
235 }
236}
237
238impl fmt::Debug for Lamport {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
241 }
242}
243
244impl fmt::Debug for Global {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 write!(f, "Global {{")?;
247 for timestamp in self.iter() {
248 if timestamp.replica_id > 0 {
249 write!(f, ", ")?;
250 }
251 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
252 }
253 write!(f, "}}")
254 }
255}