1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5};
6
7pub type ReplicaId = u16;
8pub type Seq = u32;
9
10#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
11pub struct Lamport {
12 pub replica_id: ReplicaId,
13 pub value: Seq,
14}
15
16/// A vector clock
17#[derive(Clone, Default, Hash, Eq, PartialEq)]
18pub struct Global(SmallVec<[u32; 8]>);
19
20impl Global {
21 pub fn new() -> Self {
22 Self::default()
23 }
24
25 pub fn get(&self, replica_id: ReplicaId) -> Seq {
26 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
27 }
28
29 pub fn observe(&mut self, timestamp: Lamport) {
30 if timestamp.value > 0 {
31 let new_len = timestamp.replica_id as usize + 1;
32 if new_len > self.0.len() {
33 self.0.resize(new_len, 0);
34 }
35
36 let entry = &mut self.0[timestamp.replica_id as usize];
37 *entry = cmp::max(*entry, timestamp.value);
38 }
39 }
40
41 pub fn join(&mut self, other: &Self) {
42 if other.0.len() > self.0.len() {
43 self.0.resize(other.0.len(), 0);
44 }
45
46 for (left, right) in self.0.iter_mut().zip(&other.0) {
47 *left = cmp::max(*left, *right);
48 }
49 }
50
51 pub fn meet(&mut self, other: &Self) {
52 if other.0.len() > self.0.len() {
53 self.0.resize(other.0.len(), 0);
54 }
55
56 let mut new_len = 0;
57 for (ix, (left, right)) in self
58 .0
59 .iter_mut()
60 .zip(other.0.iter().chain(iter::repeat(&0)))
61 .enumerate()
62 {
63 if *left == 0 {
64 *left = *right;
65 } else if *right > 0 {
66 *left = cmp::min(*left, *right);
67 }
68
69 if *left != 0 {
70 new_len = ix + 1;
71 }
72 }
73 self.0.resize(new_len, 0);
74 }
75
76 pub fn observed(&self, timestamp: Lamport) -> bool {
77 self.get(timestamp.replica_id) >= timestamp.value
78 }
79
80 pub fn observed_any(&self, other: &Self) -> bool {
81 let mut lhs = self.0.iter();
82 let mut rhs = other.0.iter();
83 loop {
84 if let Some(left) = lhs.next() {
85 if let Some(right) = rhs.next() {
86 if *right > 0 && left >= right {
87 return true;
88 }
89 } else {
90 return false;
91 }
92 } else {
93 return false;
94 }
95 }
96 }
97
98 pub fn observed_all(&self, other: &Self) -> bool {
99 let mut lhs = self.0.iter();
100 let mut rhs = other.0.iter();
101 loop {
102 if let Some(left) = lhs.next() {
103 if let Some(right) = rhs.next() {
104 if left < right {
105 return false;
106 }
107 } else {
108 return true;
109 }
110 } else {
111 return rhs.next().is_none();
112 }
113 }
114 }
115
116 pub fn changed_since(&self, other: &Self) -> bool {
117 if self.0.len() > other.0.len() {
118 return true;
119 }
120 for (left, right) in self.0.iter().zip(other.0.iter()) {
121 if left > right {
122 return true;
123 }
124 }
125 false
126 }
127
128 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
129 self.0.iter().enumerate().map(|(replica_id, seq)| Lamport {
130 replica_id: replica_id as ReplicaId,
131 value: *seq,
132 })
133 }
134}
135
136impl FromIterator<Lamport> for Global {
137 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
138 let mut result = Self::new();
139 for local in locals {
140 result.observe(local);
141 }
142 result
143 }
144}
145
146impl Ord for Lamport {
147 fn cmp(&self, other: &Self) -> Ordering {
148 // Use the replica id to break ties between concurrent events.
149 self.value
150 .cmp(&other.value)
151 .then_with(|| self.replica_id.cmp(&other.replica_id))
152 }
153}
154
155impl PartialOrd for Lamport {
156 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
157 Some(self.cmp(other))
158 }
159}
160
161impl Lamport {
162 pub const MIN: Self = Self {
163 replica_id: ReplicaId::MIN,
164 value: Seq::MIN,
165 };
166
167 pub const MAX: Self = Self {
168 replica_id: ReplicaId::MAX,
169 value: Seq::MAX,
170 };
171
172 pub fn new(replica_id: ReplicaId) -> Self {
173 Self {
174 value: 1,
175 replica_id,
176 }
177 }
178
179 pub fn tick(&mut self) -> Self {
180 let timestamp = *self;
181 self.value += 1;
182 timestamp
183 }
184
185 pub fn observe(&mut self, timestamp: Self) {
186 self.value = cmp::max(self.value, timestamp.value) + 1;
187 }
188}
189
190impl fmt::Debug for Lamport {
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
193 }
194}
195
196impl fmt::Debug for Global {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 write!(f, "Global {{")?;
199 for timestamp in self.iter() {
200 if timestamp.replica_id > 0 {
201 write!(f, ", ")?;
202 }
203 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
204 }
205 write!(f, "}}")
206 }
207}