1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt,
5 ops::{Add, AddAssign},
6 slice,
7};
8
/// Identifier of a replica; this is the type of the `replica_id` field on the
/// timestamp structs in this file.
pub type ReplicaId = u16;
/// Counter type; this is the type of the `value` field on the timestamp
/// structs in this file.
pub type Seq = u32;
11
12#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
13pub struct Local {
14 pub replica_id: ReplicaId,
15 pub value: Seq,
16}
17
18#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
19pub struct Lamport {
20 pub replica_id: ReplicaId,
21 pub value: Seq,
22}
23
24impl Local {
25 pub fn new(replica_id: ReplicaId) -> Self {
26 Self {
27 replica_id,
28 value: 1,
29 }
30 }
31
32 pub fn tick(&mut self) -> Self {
33 let timestamp = *self;
34 self.value += 1;
35 timestamp
36 }
37
38 pub fn observe(&mut self, timestamp: Self) {
39 if timestamp.replica_id == self.replica_id {
40 self.value = cmp::max(self.value, timestamp.value + 1);
41 }
42 }
43}
44
45impl<'a> Add<&'a Self> for Local {
46 type Output = Local;
47
48 fn add(self, other: &'a Self) -> Self::Output {
49 cmp::max(&self, other).clone()
50 }
51}
52
53impl<'a> AddAssign<&'a Local> for Local {
54 fn add_assign(&mut self, other: &Self) {
55 if *self < *other {
56 *self = other.clone();
57 }
58 }
59}
60
61#[derive(Clone, Default, Hash, Eq, PartialEq)]
62pub struct Global(SmallVec<[Local; 3]>);
63
64impl From<Vec<rpc::proto::VectorClockEntry>> for Global {
65 fn from(message: Vec<rpc::proto::VectorClockEntry>) -> Self {
66 let mut version = Self::new();
67 for entry in message {
68 version.observe(Local {
69 replica_id: entry.replica_id as ReplicaId,
70 value: entry.timestamp,
71 });
72 }
73 version
74 }
75}
76
77impl<'a> From<&'a Global> for Vec<rpc::proto::VectorClockEntry> {
78 fn from(version: &'a Global) -> Self {
79 version
80 .iter()
81 .map(|entry| rpc::proto::VectorClockEntry {
82 replica_id: entry.replica_id as u32,
83 timestamp: entry.value,
84 })
85 .collect()
86 }
87}
88
89impl From<Global> for Vec<rpc::proto::VectorClockEntry> {
90 fn from(version: Global) -> Self {
91 (&version).into()
92 }
93}
94
95impl Global {
96 pub fn new() -> Self {
97 Self::default()
98 }
99
100 pub fn get(&self, replica_id: ReplicaId) -> Seq {
101 self.0
102 .iter()
103 .find(|t| t.replica_id == replica_id)
104 .map_or(0, |t| t.value)
105 }
106
107 pub fn observe(&mut self, timestamp: Local) {
108 if let Some(entry) = self
109 .0
110 .iter_mut()
111 .find(|t| t.replica_id == timestamp.replica_id)
112 {
113 entry.value = cmp::max(entry.value, timestamp.value);
114 } else {
115 self.0.push(timestamp);
116 }
117 }
118
119 pub fn join(&mut self, other: &Self) {
120 for timestamp in other.0.iter() {
121 self.observe(*timestamp);
122 }
123 }
124
125 pub fn meet(&mut self, other: &Self) {
126 for timestamp in other.0.iter() {
127 if let Some(entry) = self
128 .0
129 .iter_mut()
130 .find(|t| t.replica_id == timestamp.replica_id)
131 {
132 entry.value = cmp::min(entry.value, timestamp.value);
133 } else {
134 self.0.push(*timestamp);
135 }
136 }
137 }
138
139 pub fn observed(&self, timestamp: Local) -> bool {
140 self.get(timestamp.replica_id) >= timestamp.value
141 }
142
143 pub fn changed_since(&self, other: &Self) -> bool {
144 self.0.iter().any(|t| t.value > other.get(t.replica_id))
145 }
146
147 pub fn iter(&self) -> slice::Iter<Local> {
148 self.0.iter()
149 }
150}
151
152impl PartialOrd for Global {
153 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
154 let mut global_ordering = Ordering::Equal;
155
156 for timestamp in self.0.iter().chain(other.0.iter()) {
157 let ordering = self
158 .get(timestamp.replica_id)
159 .cmp(&other.get(timestamp.replica_id));
160 if ordering != Ordering::Equal {
161 if global_ordering == Ordering::Equal {
162 global_ordering = ordering;
163 } else if ordering != global_ordering {
164 return None;
165 }
166 }
167 }
168
169 Some(global_ordering)
170 }
171}
172
173impl Ord for Lamport {
174 fn cmp(&self, other: &Self) -> Ordering {
175 // Use the replica id to break ties between concurrent events.
176 self.value
177 .cmp(&other.value)
178 .then_with(|| self.replica_id.cmp(&other.replica_id))
179 }
180}
181
182impl PartialOrd for Lamport {
183 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
184 Some(self.cmp(other))
185 }
186}
187
188impl Lamport {
189 pub fn new(replica_id: ReplicaId) -> Self {
190 Self {
191 value: 1,
192 replica_id,
193 }
194 }
195
196 pub fn tick(&mut self) -> Self {
197 let timestamp = *self;
198 self.value += 1;
199 timestamp
200 }
201
202 pub fn observe(&mut self, timestamp: Self) {
203 self.value = cmp::max(self.value, timestamp.value) + 1;
204 }
205}
206
207impl fmt::Debug for Local {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
210 }
211}
212
213impl fmt::Debug for Lamport {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
216 }
217}
218
219impl fmt::Debug for Global {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 write!(f, "Global {{")?;
222 for (i, element) in self.0.iter().enumerate() {
223 if i > 0 {
224 write!(f, ", ")?;
225 }
226 write!(f, "{}: {}", element.replica_id, element.value)?;
227 }
228 write!(f, "}}")
229 }
230}