1mod system_clock;
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5use std::{
6 cmp::{self, Ordering},
7 fmt,
8};
9
10pub use system_clock::*;
11
/// A unique identifier for each distributed node.
///
/// This is a thin wrapper around a `u16`. The wrapped number is not public;
/// build values with `ReplicaId::new` and read them back with
/// `ReplicaId::as_u16`. The derived ordering and equality compare that number
/// directly, and the derived `Default` is `ReplicaId(0)`, i.e. `ReplicaId::LOCAL`.
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ReplicaId(u16);
15
16impl ReplicaId {
17 /// The local replica
18 pub const LOCAL: ReplicaId = ReplicaId(0);
19 /// The remote replica of the connected remote server.
20 pub const REMOTE_SERVER: ReplicaId = ReplicaId(1);
21 /// The agent's unique identifier.
22 pub const AGENT: ReplicaId = ReplicaId(2);
23 /// A local branch.
24 pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3);
25 /// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica.
26 pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(8);
27
28 pub fn new(id: u16) -> Self {
29 ReplicaId(id)
30 }
31
32 pub fn as_u16(&self) -> u16 {
33 self.0
34 }
35
36 pub fn is_remote(self) -> bool {
37 self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID
38 }
39}
40
41impl fmt::Debug for ReplicaId {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 if *self == ReplicaId::LOCAL {
44 write!(f, "<local>")
45 } else if *self == ReplicaId::REMOTE_SERVER {
46 write!(f, "<remote>")
47 } else if *self == ReplicaId::AGENT {
48 write!(f, "<agent>")
49 } else if *self == ReplicaId::LOCAL_BRANCH {
50 write!(f, "<branch>")
51 } else {
52 write!(f, "{}", self.0)
53 }
54 }
55}
56
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
///
/// This is an alias for `u32`, so a sequence number ranges from `0` to `u32::MAX`.
pub type Seq = u32;
59
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// Timestamps are ordered by `value` first; `replica_id` is only consulted to
/// break ties between equal values (see the `Ord` implementation).
#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
    /// The replica this timestamp is attributed to.
    pub replica_id: ReplicaId,
    /// The sequence number of this timestamp.
    pub value: Seq,
}
67
/// A [version vector](https://en.wikipedia.org/wiki/Version_vector).
///
/// Stores one sequence number per replica. Slot `i` of `values` belongs to the
/// replica whose numeric ID is `i`, and a stored `0` is treated as "this
/// replica has not been observed".
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global {
    // 4 is chosen as it is the biggest inline capacity that does not increase the size of the
    // field itself.
    // Coincidentally, it also covers all the important non-collab replica ids.
    values: SmallVec<[u32; 4]>,
}
75
76impl Global {
77 pub fn new() -> Self {
78 Self::default()
79 }
80
81 /// Fetches the sequence number for the given replica ID.
82 pub fn get(&self, replica_id: ReplicaId) -> Seq {
83 self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq
84 }
85
86 /// Observe the lamport timestamp.
87 ///
88 /// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp.
89 pub fn observe(&mut self, timestamp: Lamport) {
90 debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id);
91 if timestamp.value > 0 {
92 let new_len = timestamp.replica_id.0 as usize + 1;
93 if new_len > self.values.len() {
94 self.values.resize(new_len, 0);
95 }
96
97 let entry = &mut self.values[timestamp.replica_id.0 as usize];
98 *entry = cmp::max(*entry, timestamp.value);
99 }
100 }
101
102 /// Join another global.
103 ///
104 /// This observes all timestamps from the other global.
105 #[doc(alias = "synchronize")]
106 pub fn join(&mut self, other: &Self) {
107 if other.values.len() > self.values.len() {
108 self.values.resize(other.values.len(), 0);
109 }
110
111 for (left, right) in self.values.iter_mut().zip(&other.values) {
112 *left = cmp::max(*left, *right);
113 }
114 }
115
116 /// Meet another global.
117 ///
118 /// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals.
119 pub fn meet(&mut self, other: &Self) {
120 if other.values.len() > self.values.len() {
121 self.values.resize(other.values.len(), 0);
122 }
123
124 let mut new_len = 0;
125 for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() {
126 match (*left, right) {
127 // left has not observed the replica
128 (0, _) => *left = right,
129 // right has not observed the replica
130 (_, 0) => (),
131 (_, _) => *left = cmp::min(*left, right),
132 }
133 if *left != 0 {
134 new_len = ix + 1;
135 }
136 }
137 if other.values.len() == self.values.len() {
138 // only truncate if other was equal or shorter (which at this point
139 // cant be due to the resize above) to `self` as otherwise we would
140 // truncate the unprocessed tail that is guaranteed to contain
141 // non-null timestamps
142 self.values.truncate(new_len);
143 }
144 }
145
146 pub fn observed(&self, timestamp: Lamport) -> bool {
147 self.get(timestamp.replica_id) >= timestamp.value
148 }
149
150 pub fn observed_any(&self, other: &Self) -> bool {
151 self.iter()
152 .zip(other.iter())
153 .any(|(left, right)| right.value > 0 && left.value >= right.value)
154 }
155
156 pub fn observed_all(&self, other: &Self) -> bool {
157 if self.values.len() < other.values.len() {
158 return false;
159 }
160 self.iter()
161 .zip(other.iter())
162 .all(|(left, right)| left.value >= right.value)
163 }
164
165 pub fn changed_since(&self, other: &Self) -> bool {
166 self.values.len() > other.values.len()
167 || self
168 .values
169 .iter()
170 .zip(other.values.iter())
171 .any(|(left, right)| left > right)
172 }
173
174 pub fn most_recent(&self) -> Option<Lamport> {
175 self.iter().max_by_key(|timestamp| timestamp.value)
176 }
177
178 /// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica.
179 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
180 self.values
181 .iter()
182 .enumerate()
183 .map(|(replica_id, seq)| Lamport {
184 replica_id: ReplicaId(replica_id as u16),
185 value: *seq,
186 })
187 }
188}
189
190impl FromIterator<Lamport> for Global {
191 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
192 let mut result = Self::new();
193 for local in locals {
194 result.observe(local);
195 }
196 result
197 }
198}
199
200impl Ord for Lamport {
201 fn cmp(&self, other: &Self) -> Ordering {
202 // Use the replica id to break ties between concurrent events.
203 self.value
204 .cmp(&other.value)
205 .then_with(|| self.replica_id.cmp(&other.replica_id))
206 }
207}
208
209impl PartialOrd for Lamport {
210 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
211 Some(self.cmp(other))
212 }
213}
214
215impl Lamport {
216 pub const MIN: Self = Self {
217 replica_id: ReplicaId(u16::MIN),
218 value: Seq::MIN,
219 };
220
221 pub const MAX: Self = Self {
222 replica_id: ReplicaId(u16::MAX),
223 value: Seq::MAX,
224 };
225
226 pub fn new(replica_id: ReplicaId) -> Self {
227 Self {
228 value: 1,
229 replica_id,
230 }
231 }
232
233 pub fn as_u64(self) -> u64 {
234 ((self.value as u64) << 32) | (self.replica_id.0 as u64)
235 }
236
237 pub fn tick(&mut self) -> Self {
238 let timestamp = *self;
239 self.value += 1;
240 timestamp
241 }
242
243 pub fn observe(&mut self, timestamp: Self) {
244 self.value = cmp::max(self.value, timestamp.value) + 1;
245 }
246}
247
248impl fmt::Debug for Lamport {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 if *self == Self::MAX {
251 write!(f, "Lamport {{MAX}}")
252 } else if *self == Self::MIN {
253 write!(f, "Lamport {{MIN}}")
254 } else {
255 write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value)
256 }
257 }
258}
259
260impl fmt::Debug for Global {
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 write!(f, "Global {{")?;
263 for timestamp in self.iter() {
264 if timestamp.replica_id.0 > 0 {
265 write!(f, ", ")?;
266 }
267 write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?;
268 }
269 write!(f, "}}")
270 }
271}