1use smallvec::SmallVec;
2use std::{
3 cmp::{self, Ordering},
4 fmt, iter,
5};
6
/// A unique identifier for each distributed node.
///
/// It is also used as the index of that node's slot inside a [`Global`] vector clock.
pub type ReplicaId = u16;
9
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp):
/// the counter stored in the `value` field of a [`Lamport`] timestamp.
pub type Seq = u32;
12
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// Timestamps are ordered by `value` first; `replica_id` is only used to break
/// ties (see the `Ord` implementation for this type).
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
pub struct Lamport {
    /// Identifier of the replica this timestamp belongs to.
    pub replica_id: ReplicaId,
    /// Sequence number of this timestamp.
    pub value: Seq,
}
20
/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
///
/// The inner vector is indexed by [`ReplicaId`]. Each slot holds the highest
/// sequence number recorded for that replica; a value of `0` (or a missing
/// slot) means nothing has been recorded for it.
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global(SmallVec<[u32; 8]>);
24
25impl Global {
26 pub fn new() -> Self {
27 Self::default()
28 }
29
30 pub fn get(&self, replica_id: ReplicaId) -> Seq {
31 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
32 }
33
34 pub fn observe(&mut self, timestamp: Lamport) {
35 if timestamp.value > 0 {
36 let new_len = timestamp.replica_id as usize + 1;
37 if new_len > self.0.len() {
38 self.0.resize(new_len, 0);
39 }
40
41 let entry = &mut self.0[timestamp.replica_id as usize];
42 *entry = cmp::max(*entry, timestamp.value);
43 }
44 }
45
46 pub fn join(&mut self, other: &Self) {
47 if other.0.len() > self.0.len() {
48 self.0.resize(other.0.len(), 0);
49 }
50
51 for (left, right) in self.0.iter_mut().zip(&other.0) {
52 *left = cmp::max(*left, *right);
53 }
54 }
55
56 pub fn meet(&mut self, other: &Self) {
57 if other.0.len() > self.0.len() {
58 self.0.resize(other.0.len(), 0);
59 }
60
61 let mut new_len = 0;
62 for (ix, (left, right)) in self
63 .0
64 .iter_mut()
65 .zip(other.0.iter().chain(iter::repeat(&0)))
66 .enumerate()
67 {
68 if *left == 0 {
69 *left = *right;
70 } else if *right > 0 {
71 *left = cmp::min(*left, *right);
72 }
73
74 if *left != 0 {
75 new_len = ix + 1;
76 }
77 }
78 self.0.resize(new_len, 0);
79 }
80
81 pub fn observed(&self, timestamp: Lamport) -> bool {
82 self.get(timestamp.replica_id) >= timestamp.value
83 }
84
85 pub fn observed_any(&self, other: &Self) -> bool {
86 let mut lhs = self.0.iter();
87 let mut rhs = other.0.iter();
88 loop {
89 if let Some(left) = lhs.next() {
90 if let Some(right) = rhs.next() {
91 if *right > 0 && left >= right {
92 return true;
93 }
94 } else {
95 return false;
96 }
97 } else {
98 return false;
99 }
100 }
101 }
102
103 pub fn observed_all(&self, other: &Self) -> bool {
104 let mut lhs = self.0.iter();
105 let mut rhs = other.0.iter();
106 loop {
107 if let Some(left) = lhs.next() {
108 if let Some(right) = rhs.next() {
109 if left < right {
110 return false;
111 }
112 } else {
113 return true;
114 }
115 } else {
116 return rhs.next().is_none();
117 }
118 }
119 }
120
121 pub fn changed_since(&self, other: &Self) -> bool {
122 if self.0.len() > other.0.len() {
123 return true;
124 }
125 for (left, right) in self.0.iter().zip(other.0.iter()) {
126 if left > right {
127 return true;
128 }
129 }
130 false
131 }
132
133 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
134 self.0.iter().enumerate().map(|(replica_id, seq)| Lamport {
135 replica_id: replica_id as ReplicaId,
136 value: *seq,
137 })
138 }
139}
140
141impl FromIterator<Lamport> for Global {
142 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
143 let mut result = Self::new();
144 for local in locals {
145 result.observe(local);
146 }
147 result
148 }
149}
150
151impl Ord for Lamport {
152 fn cmp(&self, other: &Self) -> Ordering {
153 // Use the replica id to break ties between concurrent events.
154 self.value
155 .cmp(&other.value)
156 .then_with(|| self.replica_id.cmp(&other.replica_id))
157 }
158}
159
160impl PartialOrd for Lamport {
161 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
162 Some(self.cmp(other))
163 }
164}
165
166impl Lamport {
167 pub const MIN: Self = Self {
168 replica_id: ReplicaId::MIN,
169 value: Seq::MIN,
170 };
171
172 pub const MAX: Self = Self {
173 replica_id: ReplicaId::MAX,
174 value: Seq::MAX,
175 };
176
177 pub fn new(replica_id: ReplicaId) -> Self {
178 Self {
179 value: 1,
180 replica_id,
181 }
182 }
183
184 pub fn tick(&mut self) -> Self {
185 let timestamp = *self;
186 self.value += 1;
187 timestamp
188 }
189
190 pub fn observe(&mut self, timestamp: Self) {
191 self.value = cmp::max(self.value, timestamp.value) + 1;
192 }
193}
194
195impl fmt::Debug for Lamport {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
198 }
199}
200
201impl fmt::Debug for Global {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 write!(f, "Global {{")?;
204 for timestamp in self.iter() {
205 if timestamp.replica_id > 0 {
206 write!(f, ", ")?;
207 }
208 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
209 }
210 write!(f, "}}")
211 }
212}