1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt,
5 ops::{Add, AddAssign},
6 slice,
7};
8
/// Identifier of a replica; stored in the `replica_id` field of the clock types in this file.
pub type ReplicaId = u16;
/// Counter value; stored in the `value` field of the clock types in this file.
pub type Seq = u32;
11
12#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
13pub struct Local {
14 pub replica_id: ReplicaId,
15 pub value: Seq,
16}
17
18#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
19pub struct Lamport {
20 pub replica_id: ReplicaId,
21 pub value: Seq,
22}
23
24impl Local {
25 pub fn new(replica_id: ReplicaId) -> Self {
26 Self {
27 replica_id,
28 value: 1,
29 }
30 }
31
32 pub fn tick(&mut self) -> Self {
33 let timestamp = *self;
34 self.value += 1;
35 timestamp
36 }
37
38 pub fn observe(&mut self, timestamp: Self) {
39 if timestamp.replica_id == self.replica_id {
40 self.value = cmp::max(self.value, timestamp.value + 1);
41 }
42 }
43}
44
45impl<'a> Add<&'a Self> for Local {
46 type Output = Local;
47
48 fn add(self, other: &'a Self) -> Self::Output {
49 cmp::max(&self, other).clone()
50 }
51}
52
53impl<'a> AddAssign<&'a Local> for Local {
54 fn add_assign(&mut self, other: &Self) {
55 if *self < *other {
56 *self = other.clone();
57 }
58 }
59}
60
61#[derive(Clone, Default, Hash, Eq, PartialEq)]
62pub struct Global(SmallVec<[Local; 3]>);
63
64impl From<Vec<zrpc::proto::VectorClockEntry>> for Global {
65 fn from(message: Vec<zrpc::proto::VectorClockEntry>) -> Self {
66 let mut version = Self::new();
67 for entry in message {
68 version.observe(Local {
69 replica_id: entry.replica_id as ReplicaId,
70 value: entry.timestamp,
71 });
72 }
73 version
74 }
75}
76
77impl<'a> From<&'a Global> for Vec<zrpc::proto::VectorClockEntry> {
78 fn from(version: &'a Global) -> Self {
79 version
80 .iter()
81 .map(|entry| zrpc::proto::VectorClockEntry {
82 replica_id: entry.replica_id as u32,
83 timestamp: entry.value,
84 })
85 .collect()
86 }
87}
88
89impl Global {
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get(&self, replica_id: ReplicaId) -> Seq {
95 self.0
96 .iter()
97 .find(|t| t.replica_id == replica_id)
98 .map_or(0, |t| t.value)
99 }
100
101 pub fn observe(&mut self, timestamp: Local) {
102 if let Some(entry) = self
103 .0
104 .iter_mut()
105 .find(|t| t.replica_id == timestamp.replica_id)
106 {
107 entry.value = cmp::max(entry.value, timestamp.value);
108 } else {
109 self.0.push(timestamp);
110 }
111 }
112
113 pub fn join(&mut self, other: &Self) {
114 for timestamp in other.0.iter() {
115 self.observe(*timestamp);
116 }
117 }
118
119 pub fn meet(&mut self, other: &Self) {
120 for timestamp in other.0.iter() {
121 if let Some(entry) = self
122 .0
123 .iter_mut()
124 .find(|t| t.replica_id == timestamp.replica_id)
125 {
126 entry.value = cmp::min(entry.value, timestamp.value);
127 } else {
128 self.0.push(*timestamp);
129 }
130 }
131 }
132
133 pub fn observed(&self, timestamp: Local) -> bool {
134 self.get(timestamp.replica_id) >= timestamp.value
135 }
136
137 pub fn changed_since(&self, other: &Self) -> bool {
138 self.0.iter().any(|t| t.value > other.get(t.replica_id))
139 }
140
141 pub fn iter(&self) -> slice::Iter<Local> {
142 self.0.iter()
143 }
144}
145
146impl PartialOrd for Global {
147 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
148 let mut global_ordering = Ordering::Equal;
149
150 for timestamp in self.0.iter().chain(other.0.iter()) {
151 let ordering = self
152 .get(timestamp.replica_id)
153 .cmp(&other.get(timestamp.replica_id));
154 if ordering != Ordering::Equal {
155 if global_ordering == Ordering::Equal {
156 global_ordering = ordering;
157 } else if ordering != global_ordering {
158 return None;
159 }
160 }
161 }
162
163 Some(global_ordering)
164 }
165}
166
167impl Ord for Lamport {
168 fn cmp(&self, other: &Self) -> Ordering {
169 // Use the replica id to break ties between concurrent events.
170 self.value
171 .cmp(&other.value)
172 .then_with(|| self.replica_id.cmp(&other.replica_id))
173 }
174}
175
176impl PartialOrd for Lamport {
177 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
178 Some(self.cmp(other))
179 }
180}
181
182impl Lamport {
183 pub fn new(replica_id: ReplicaId) -> Self {
184 Self {
185 value: 1,
186 replica_id,
187 }
188 }
189
190 pub fn tick(&mut self) -> Self {
191 let timestamp = *self;
192 self.value += 1;
193 timestamp
194 }
195
196 pub fn observe(&mut self, timestamp: Self) {
197 self.value = cmp::max(self.value, timestamp.value) + 1;
198 }
199}
200
201impl fmt::Debug for Local {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 write!(f, "Local {{{}: {}}}", self.replica_id, self.value)
204 }
205}
206
207impl fmt::Debug for Lamport {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
210 }
211}
212
213impl fmt::Debug for Global {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "Global {{")?;
216 for (i, element) in self.0.iter().enumerate() {
217 if i > 0 {
218 write!(f, ", ")?;
219 }
220 write!(f, "{}: {}", element.replica_id, element.value)?;
221 }
222 write!(f, "}}")
223 }
224}