1mod system_clock;
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5use std::{
6 cmp::{self, Ordering},
7 fmt, iter,
8};
9
10pub use system_clock::*;
11
12pub const LOCAL_BRANCH_REPLICA_ID: u16 = u16::MAX;
13pub const AGENT_REPLICA_ID: u16 = u16::MAX - 1;
14
15/// A unique identifier for each distributed node.
16pub type ReplicaId = u16;
17
18/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
19pub type Seq = u32;
20
21/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
22/// used to determine the ordering of events in the editor.
23#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
24pub struct Lamport {
25 pub replica_id: ReplicaId,
26 pub value: Seq,
27}
28
29/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
30#[derive(Clone, Default, Hash, Eq, PartialEq)]
31pub struct Global {
32 values: SmallVec<[u32; 8]>,
33 local_branch_value: u32,
34}
35
36impl Global {
37 pub fn new() -> Self {
38 Self::default()
39 }
40
41 pub fn get(&self, replica_id: ReplicaId) -> Seq {
42 if replica_id == LOCAL_BRANCH_REPLICA_ID {
43 self.local_branch_value
44 } else {
45 self.values.get(replica_id as usize).copied().unwrap_or(0) as Seq
46 }
47 }
48
49 pub fn observe(&mut self, timestamp: Lamport) {
50 if timestamp.value > 0 {
51 if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
52 self.local_branch_value = cmp::max(self.local_branch_value, timestamp.value);
53 } else {
54 let new_len = timestamp.replica_id as usize + 1;
55 if new_len > self.values.len() {
56 self.values.resize(new_len, 0);
57 }
58
59 let entry = &mut self.values[timestamp.replica_id as usize];
60 *entry = cmp::max(*entry, timestamp.value);
61 }
62 }
63 }
64
65 pub fn join(&mut self, other: &Self) {
66 if other.values.len() > self.values.len() {
67 self.values.resize(other.values.len(), 0);
68 }
69
70 for (left, right) in self.values.iter_mut().zip(&other.values) {
71 *left = cmp::max(*left, *right);
72 }
73
74 self.local_branch_value = cmp::max(self.local_branch_value, other.local_branch_value);
75 }
76
77 pub fn meet(&mut self, other: &Self) {
78 if other.values.len() > self.values.len() {
79 self.values.resize(other.values.len(), 0);
80 }
81
82 let mut new_len = 0;
83 for (ix, (left, right)) in self
84 .values
85 .iter_mut()
86 .zip(other.values.iter().chain(iter::repeat(&0)))
87 .enumerate()
88 {
89 if *left == 0 {
90 *left = *right;
91 } else if *right > 0 {
92 *left = cmp::min(*left, *right);
93 }
94
95 if *left != 0 {
96 new_len = ix + 1;
97 }
98 }
99 self.values.resize(new_len, 0);
100 self.local_branch_value = cmp::min(self.local_branch_value, other.local_branch_value);
101 }
102
103 pub fn observed(&self, timestamp: Lamport) -> bool {
104 self.get(timestamp.replica_id) >= timestamp.value
105 }
106
107 pub fn observed_any(&self, other: &Self) -> bool {
108 self.values
109 .iter()
110 .zip(other.values.iter())
111 .any(|(left, right)| *right > 0 && left >= right)
112 || (other.local_branch_value > 0 && self.local_branch_value >= other.local_branch_value)
113 }
114
115 pub fn observed_all(&self, other: &Self) -> bool {
116 let mut rhs = other.values.iter();
117 self.values.iter().all(|left| match rhs.next() {
118 Some(right) => left >= right,
119 None => true,
120 }) && rhs.next().is_none()
121 && self.local_branch_value >= other.local_branch_value
122 }
123
124 pub fn changed_since(&self, other: &Self) -> bool {
125 self.values.len() > other.values.len()
126 || self
127 .values
128 .iter()
129 .zip(other.values.iter())
130 .any(|(left, right)| left > right)
131 || self.local_branch_value > other.local_branch_value
132 }
133
134 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
135 self.values
136 .iter()
137 .enumerate()
138 .map(|(replica_id, seq)| Lamport {
139 replica_id: replica_id as ReplicaId,
140 value: *seq,
141 })
142 .chain((self.local_branch_value > 0).then_some(Lamport {
143 replica_id: LOCAL_BRANCH_REPLICA_ID,
144 value: self.local_branch_value,
145 }))
146 }
147}
148
149impl FromIterator<Lamport> for Global {
150 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
151 let mut result = Self::new();
152 for local in locals {
153 result.observe(local);
154 }
155 result
156 }
157}
158
159impl Ord for Lamport {
160 fn cmp(&self, other: &Self) -> Ordering {
161 // Use the replica id to break ties between concurrent events.
162 self.value
163 .cmp(&other.value)
164 .then_with(|| self.replica_id.cmp(&other.replica_id))
165 }
166}
167
168impl PartialOrd for Lamport {
169 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
170 Some(self.cmp(other))
171 }
172}
173
174impl Lamport {
175 pub const MIN: Self = Self {
176 replica_id: ReplicaId::MIN,
177 value: Seq::MIN,
178 };
179
180 pub const MAX: Self = Self {
181 replica_id: ReplicaId::MAX,
182 value: Seq::MAX,
183 };
184
185 pub fn new(replica_id: ReplicaId) -> Self {
186 Self {
187 value: 1,
188 replica_id,
189 }
190 }
191
192 pub fn as_u64(self) -> u64 {
193 ((self.value as u64) << 32) | (self.replica_id as u64)
194 }
195
196 pub fn tick(&mut self) -> Self {
197 let timestamp = *self;
198 self.value += 1;
199 timestamp
200 }
201
202 pub fn observe(&mut self, timestamp: Self) {
203 self.value = cmp::max(self.value, timestamp.value) + 1;
204 }
205}
206
207impl fmt::Debug for Lamport {
208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
210 }
211}
212
213impl fmt::Debug for Global {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "Global {{")?;
216 for timestamp in self.iter() {
217 if timestamp.replica_id > 0 {
218 write!(f, ", ")?;
219 }
220 if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
221 write!(f, "<branch>: {}", timestamp.value)?;
222 } else {
223 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
224 }
225 }
226 write!(f, "}}")
227 }
228}