1mod system_clock;
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5use std::{
6 cmp::{self, Ordering},
7 fmt,
8};
9
10pub use system_clock::*;
11
/// A unique identifier for each distributed node.
///
/// This is a newtype over a `u16`. Equality, hashing and ordering are derived
/// from the wrapped integer, and the derived `Default` is `ReplicaId(0)`,
/// which is the same value as [`ReplicaId::LOCAL`].
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ReplicaId(u16);
15
16impl ReplicaId {
17 /// The local replica
18 pub const LOCAL: ReplicaId = ReplicaId(0);
19 /// The remote replica of the connected remote server.
20 pub const REMOTE_SERVER: ReplicaId = ReplicaId(1);
21 /// The agent's unique identifier.
22 pub const AGENT: ReplicaId = ReplicaId(2);
23 /// A local branch.
24 pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3);
25 /// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica.
26 pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(8);
27
28 pub fn new(id: u16) -> Self {
29 ReplicaId(id)
30 }
31
32 pub fn as_u16(&self) -> u16 {
33 self.0
34 }
35
36 pub fn is_remote(self) -> bool {
37 self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID
38 }
39}
40
41impl fmt::Debug for ReplicaId {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 if *self == ReplicaId::LOCAL {
44 write!(f, "<local>")
45 } else if *self == ReplicaId::REMOTE_SERVER {
46 write!(f, "<remote>")
47 } else if *self == ReplicaId::AGENT {
48 write!(f, "<agent>")
49 } else if *self == ReplicaId::LOCAL_BRANCH {
50 write!(f, "<branch>")
51 } else {
52 write!(f, "{}", self.0)
53 }
54 }
55}
56
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
///
/// Currently an alias for `u32`.
pub type Seq = u32;
59
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// Timestamps are ordered by `value` first; `replica_id` only breaks ties
/// between timestamps with equal values (see the `Ord` implementation).
#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
    /// The sequence number (logical clock value) of this timestamp.
    pub value: Seq,
    /// The replica this timestamp is associated with.
    pub replica_id: ReplicaId,
}
67
/// A [version vector](https://en.wikipedia.org/wiki/Version_vector).
///
/// Holds one sequence number per replica, indexed by the replica's numeric
/// ID. A stored `0`, or an index past the end of the vector, is read back as
/// "nothing observed" for that replica.
#[derive(Default, Hash, Eq, PartialEq)]
pub struct Global {
    // 4 is chosen as it is the biggest count that does not increase the size of the field itself.
    // Coincidentally, it also covers all the important non-collab replica ids.
    values: SmallVec<[u32; 4]>,
}
75
76impl Clone for Global {
77 fn clone(&self) -> Self {
78 // We manually implement clone to avoid the overhead of SmallVec's clone implementation.
79 // Using `from_slice` is faster than `clone` for SmallVec as we can use our `Copy` implementation of u32.
80 Self {
81 values: SmallVec::from_slice(&self.values),
82 }
83 }
84
85 fn clone_from(&mut self, source: &Self) {
86 self.values.clone_from(&source.values);
87 }
88}
89
90impl Global {
91 pub fn new() -> Self {
92 Self::default()
93 }
94
95 /// Fetches the sequence number for the given replica ID.
96 pub fn get(&self, replica_id: ReplicaId) -> Seq {
97 self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq
98 }
99
100 /// Observe the lamport timestamp.
101 ///
102 /// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp.
103 pub fn observe(&mut self, timestamp: Lamport) {
104 debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id);
105 if timestamp.value > 0 {
106 let new_len = timestamp.replica_id.0 as usize + 1;
107 if new_len > self.values.len() {
108 self.values.resize(new_len, 0);
109 }
110
111 let entry = &mut self.values[timestamp.replica_id.0 as usize];
112 *entry = cmp::max(*entry, timestamp.value);
113 }
114 }
115
116 /// Join another global.
117 ///
118 /// This observes all timestamps from the other global.
119 #[doc(alias = "synchronize")]
120 pub fn join(&mut self, other: &Self) {
121 if other.values.len() > self.values.len() {
122 self.values.resize(other.values.len(), 0);
123 }
124
125 for (left, right) in self.values.iter_mut().zip(&other.values) {
126 *left = cmp::max(*left, *right);
127 }
128 }
129
130 /// Meet another global.
131 ///
132 /// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals.
133 pub fn meet(&mut self, other: &Self) {
134 if other.values.len() > self.values.len() {
135 self.values.resize(other.values.len(), 0);
136 }
137
138 let mut new_len = 0;
139 for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() {
140 match (*left, right) {
141 // left has not observed the replica
142 (0, _) => *left = right,
143 // right has not observed the replica
144 (_, 0) => (),
145 (_, _) => *left = cmp::min(*left, right),
146 }
147 if *left != 0 {
148 new_len = ix + 1;
149 }
150 }
151 if other.values.len() == self.values.len() {
152 // only truncate if other was equal or shorter (which at this point
153 // cant be due to the resize above) to `self` as otherwise we would
154 // truncate the unprocessed tail that is guaranteed to contain
155 // non-null timestamps
156 self.values.truncate(new_len);
157 }
158 }
159
160 pub fn observed(&self, timestamp: Lamport) -> bool {
161 self.get(timestamp.replica_id) >= timestamp.value
162 }
163
164 pub fn observed_any(&self, other: &Self) -> bool {
165 self.iter()
166 .zip(other.iter())
167 .any(|(left, right)| right.value > 0 && left.value >= right.value)
168 }
169
170 pub fn observed_all(&self, other: &Self) -> bool {
171 if self.values.len() < other.values.len() {
172 return false;
173 }
174 self.iter()
175 .zip(other.iter())
176 .all(|(left, right)| left.value >= right.value)
177 }
178
179 pub fn changed_since(&self, other: &Self) -> bool {
180 self.values.len() > other.values.len()
181 || self
182 .values
183 .iter()
184 .zip(other.values.iter())
185 .any(|(left, right)| left > right)
186 }
187
188 pub fn most_recent(&self) -> Option<Lamport> {
189 self.iter().max_by_key(|timestamp| timestamp.value)
190 }
191
192 /// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica.
193 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
194 self.values
195 .iter()
196 .enumerate()
197 .map(|(replica_id, seq)| Lamport {
198 replica_id: ReplicaId(replica_id as u16),
199 value: *seq,
200 })
201 }
202}
203
204impl FromIterator<Lamport> for Global {
205 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
206 let mut result = Self::new();
207 for local in locals {
208 result.observe(local);
209 }
210 result
211 }
212}
213
214impl Ord for Lamport {
215 fn cmp(&self, other: &Self) -> Ordering {
216 // Use the replica id to break ties between concurrent events.
217 self.value
218 .cmp(&other.value)
219 .then_with(|| self.replica_id.cmp(&other.replica_id))
220 }
221}
222
223impl PartialOrd for Lamport {
224 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
225 Some(self.cmp(other))
226 }
227}
228
229impl Lamport {
230 pub const MIN: Self = Self {
231 replica_id: ReplicaId(u16::MIN),
232 value: Seq::MIN,
233 };
234
235 pub const MAX: Self = Self {
236 replica_id: ReplicaId(u16::MAX),
237 value: Seq::MAX,
238 };
239
240 pub fn new(replica_id: ReplicaId) -> Self {
241 Self {
242 value: 1,
243 replica_id,
244 }
245 }
246
247 pub fn as_u64(self) -> u64 {
248 ((self.value as u64) << 32) | (self.replica_id.0 as u64)
249 }
250
251 pub fn tick(&mut self) -> Self {
252 let timestamp = *self;
253 self.value += 1;
254 timestamp
255 }
256
257 pub fn observe(&mut self, timestamp: Self) {
258 self.value = cmp::max(self.value, timestamp.value) + 1;
259 }
260}
261
262impl fmt::Debug for Lamport {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 if *self == Self::MAX {
265 write!(f, "Lamport {{MAX}}")
266 } else if *self == Self::MIN {
267 write!(f, "Lamport {{MIN}}")
268 } else {
269 write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value)
270 }
271 }
272}
273
274impl fmt::Debug for Global {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 write!(f, "Global {{")?;
277 for timestamp in self.iter().filter(|t| t.value > 0) {
278 if timestamp.replica_id.0 > 0 {
279 write!(f, ", ")?;
280 }
281 write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?;
282 }
283 write!(f, "}}")
284 }
285}