1mod system_clock;
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5use std::{
6 cmp::{self, Ordering},
7 fmt, iter,
8};
9
10pub use system_clock::*;
11
12/// A unique identifier for each distributed node.
13pub type ReplicaId = u16;
14
15/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
16pub type Seq = u32;
17
18/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
19/// used to determine the ordering of events in the editor.
20#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
21pub struct Lamport {
22 pub replica_id: ReplicaId,
23 pub value: Seq,
24}
25
26/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
27#[derive(Clone, Default, Hash, Eq, PartialEq)]
28pub struct Global(SmallVec<[u32; 8]>);
29
30impl Global {
31 pub fn new() -> Self {
32 Self::default()
33 }
34
35 pub fn get(&self, replica_id: ReplicaId) -> Seq {
36 self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
37 }
38
39 pub fn observe(&mut self, timestamp: Lamport) {
40 if timestamp.value > 0 {
41 let new_len = timestamp.replica_id as usize + 1;
42 if new_len > self.0.len() {
43 self.0.resize(new_len, 0);
44 }
45
46 let entry = &mut self.0[timestamp.replica_id as usize];
47 *entry = cmp::max(*entry, timestamp.value);
48 }
49 }
50
51 pub fn join(&mut self, other: &Self) {
52 if other.0.len() > self.0.len() {
53 self.0.resize(other.0.len(), 0);
54 }
55
56 for (left, right) in self.0.iter_mut().zip(&other.0) {
57 *left = cmp::max(*left, *right);
58 }
59 }
60
61 pub fn meet(&mut self, other: &Self) {
62 if other.0.len() > self.0.len() {
63 self.0.resize(other.0.len(), 0);
64 }
65
66 let mut new_len = 0;
67 for (ix, (left, right)) in self
68 .0
69 .iter_mut()
70 .zip(other.0.iter().chain(iter::repeat(&0)))
71 .enumerate()
72 {
73 if *left == 0 {
74 *left = *right;
75 } else if *right > 0 {
76 *left = cmp::min(*left, *right);
77 }
78
79 if *left != 0 {
80 new_len = ix + 1;
81 }
82 }
83 self.0.resize(new_len, 0);
84 }
85
86 pub fn observed(&self, timestamp: Lamport) -> bool {
87 self.get(timestamp.replica_id) >= timestamp.value
88 }
89
90 pub fn observed_any(&self, other: &Self) -> bool {
91 self.0
92 .iter()
93 .zip(other.0.iter())
94 .any(|(left, right)| *right > 0 && left >= right)
95 }
96
97 pub fn observed_all(&self, other: &Self) -> bool {
98 let mut rhs = other.0.iter();
99 self.0.iter().all(|left| match rhs.next() {
100 Some(right) => left >= right,
101 None => true,
102 }) && rhs.next().is_none()
103 }
104
105 pub fn changed_since(&self, other: &Self) -> bool {
106 self.0.len() > other.0.len()
107 || self
108 .0
109 .iter()
110 .zip(other.0.iter())
111 .any(|(left, right)| left > right)
112 }
113
114 pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
115 self.0.iter().enumerate().map(|(replica_id, seq)| Lamport {
116 replica_id: replica_id as ReplicaId,
117 value: *seq,
118 })
119 }
120}
121
122impl FromIterator<Lamport> for Global {
123 fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
124 let mut result = Self::new();
125 for local in locals {
126 result.observe(local);
127 }
128 result
129 }
130}
131
132impl Ord for Lamport {
133 fn cmp(&self, other: &Self) -> Ordering {
134 // Use the replica id to break ties between concurrent events.
135 self.value
136 .cmp(&other.value)
137 .then_with(|| self.replica_id.cmp(&other.replica_id))
138 }
139}
140
141impl PartialOrd for Lamport {
142 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143 Some(self.cmp(other))
144 }
145}
146
147impl Lamport {
148 pub const MIN: Self = Self {
149 replica_id: ReplicaId::MIN,
150 value: Seq::MIN,
151 };
152
153 pub const MAX: Self = Self {
154 replica_id: ReplicaId::MAX,
155 value: Seq::MAX,
156 };
157
158 pub fn new(replica_id: ReplicaId) -> Self {
159 Self {
160 value: 1,
161 replica_id,
162 }
163 }
164
165 pub fn as_u64(self) -> u64 {
166 ((self.value as u64) << 32) | (self.replica_id as u64)
167 }
168
169 pub fn tick(&mut self) -> Self {
170 let timestamp = *self;
171 self.value += 1;
172 timestamp
173 }
174
175 pub fn observe(&mut self, timestamp: Self) {
176 self.value = cmp::max(self.value, timestamp.value) + 1;
177 }
178}
179
180impl fmt::Debug for Lamport {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
183 }
184}
185
186impl fmt::Debug for Global {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 write!(f, "Global {{")?;
189 for timestamp in self.iter() {
190 if timestamp.replica_id > 0 {
191 write!(f, ", ")?;
192 }
193 write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
194 }
195 write!(f, "}}")
196 }
197}