clock.rs

  1mod system_clock;
  2
  3use serde::{Deserialize, Serialize};
  4use smallvec::SmallVec;
  5use std::{
  6    cmp::{self, Ordering},
  7    fmt,
  8};
  9
 10pub use system_clock::*;
 11
 12/// A unique identifier for each distributed node.
 13#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
 14pub struct ReplicaId(u16);
 15
 16impl ReplicaId {
 17    /// The local replica
 18    pub const LOCAL: ReplicaId = ReplicaId(0);
 19    /// The remote replica of the connected remote server.
 20    pub const REMOTE_SERVER: ReplicaId = ReplicaId(1);
 21    /// The agent's unique identifier.
 22    pub const AGENT: ReplicaId = ReplicaId(2);
 23    /// A local branch.
 24    pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3);
 25    /// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica.
 26    pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(8);
 27
 28    pub fn new(id: u16) -> Self {
 29        ReplicaId(id)
 30    }
 31
 32    pub fn as_u16(&self) -> u16 {
 33        self.0
 34    }
 35
 36    pub fn is_remote(self) -> bool {
 37        self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID
 38    }
 39}
 40
 41impl fmt::Debug for ReplicaId {
 42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 43        if *self == ReplicaId::LOCAL {
 44            write!(f, "<local>")
 45        } else if *self == ReplicaId::REMOTE_SERVER {
 46            write!(f, "<remote>")
 47        } else if *self == ReplicaId::AGENT {
 48            write!(f, "<agent>")
 49        } else if *self == ReplicaId::LOCAL_BRANCH {
 50            write!(f, "<branch>")
 51        } else {
 52            write!(f, "{}", self.0)
 53        }
 54    }
 55}
 56
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// stored as an unsigned 32-bit integer.
pub type Seq = u32;
 59
 60/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
 61/// used to determine the ordering of events in the editor.
 62#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
 63pub struct Lamport {
 64    pub value: Seq,
 65    pub replica_id: ReplicaId,
 66}
 67
 68/// A [version vector](https://en.wikipedia.org/wiki/Version_vector).
 69#[derive(Default, Hash, Eq, PartialEq)]
 70pub struct Global {
 71    // 4 is chosen as it is the biggest count that does not increase the size of the field itself.
 72    // Coincidentally, it also covers all the important non-collab replica ids.
 73    values: SmallVec<[u32; 4]>,
 74}
 75
 76impl Clone for Global {
 77    fn clone(&self) -> Self {
 78        // We manually implement clone to avoid the overhead of SmallVec's clone implementation.
 79        // Using `from_slice` is faster than `clone` for SmallVec as we can use our `Copy` implementation of u32.
 80        Self {
 81            values: SmallVec::from_slice(&self.values),
 82        }
 83    }
 84
 85    fn clone_from(&mut self, source: &Self) {
 86        self.values.clone_from(&source.values);
 87    }
 88}
 89
 90impl Global {
 91    pub fn new() -> Self {
 92        Self::default()
 93    }
 94
 95    /// Fetches the sequence number for the given replica ID.
 96    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 97        self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq
 98    }
 99
100    /// Observe the lamport timestamp.
101    ///
102    /// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp.
103    pub fn observe(&mut self, timestamp: Lamport) {
104        debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id);
105        if timestamp.value > 0 {
106            let new_len = timestamp.replica_id.0 as usize + 1;
107            if new_len > self.values.len() {
108                self.values.resize(new_len, 0);
109            }
110
111            let entry = &mut self.values[timestamp.replica_id.0 as usize];
112            *entry = cmp::max(*entry, timestamp.value);
113        }
114    }
115
116    /// Join another global.
117    ///
118    /// This observes all timestamps from the other global.
119    #[doc(alias = "synchronize")]
120    pub fn join(&mut self, other: &Self) {
121        if other.values.len() > self.values.len() {
122            self.values.resize(other.values.len(), 0);
123        }
124
125        for (left, right) in self.values.iter_mut().zip(&other.values) {
126            *left = cmp::max(*left, *right);
127        }
128    }
129
130    /// Meet another global.
131    ///
132    /// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals.
133    pub fn meet(&mut self, other: &Self) {
134        if other.values.len() > self.values.len() {
135            self.values.resize(other.values.len(), 0);
136        }
137
138        let mut new_len = 0;
139        for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() {
140            match (*left, right) {
141                // left has not observed the replica
142                (0, _) => *left = right,
143                // right has not observed the replica
144                (_, 0) => (),
145                (_, _) => *left = cmp::min(*left, right),
146            }
147            if *left != 0 {
148                new_len = ix + 1;
149            }
150        }
151        if other.values.len() == self.values.len() {
152            // only truncate if other was equal or shorter (which at this point
153            // cant be due to the resize above) to `self` as otherwise we would
154            // truncate the unprocessed tail that is guaranteed to contain
155            // non-null timestamps
156            self.values.truncate(new_len);
157        }
158    }
159
160    pub fn observed(&self, timestamp: Lamport) -> bool {
161        self.get(timestamp.replica_id) >= timestamp.value
162    }
163
164    pub fn observed_any(&self, other: &Self) -> bool {
165        self.iter()
166            .zip(other.iter())
167            .any(|(left, right)| right.value > 0 && left.value >= right.value)
168    }
169
170    pub fn observed_all(&self, other: &Self) -> bool {
171        if self.values.len() < other.values.len() {
172            return false;
173        }
174        self.iter()
175            .zip(other.iter())
176            .all(|(left, right)| left.value >= right.value)
177    }
178
179    pub fn changed_since(&self, other: &Self) -> bool {
180        self.values.len() > other.values.len()
181            || self
182                .values
183                .iter()
184                .zip(other.values.iter())
185                .any(|(left, right)| left > right)
186    }
187
188    pub fn most_recent(&self) -> Option<Lamport> {
189        self.iter().max_by_key(|timestamp| timestamp.value)
190    }
191
192    /// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica.
193    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
194        self.values
195            .iter()
196            .enumerate()
197            .map(|(replica_id, seq)| Lamport {
198                replica_id: ReplicaId(replica_id as u16),
199                value: *seq,
200            })
201    }
202}
203
204impl FromIterator<Lamport> for Global {
205    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
206        let mut result = Self::new();
207        for local in locals {
208            result.observe(local);
209        }
210        result
211    }
212}
213
214impl Ord for Lamport {
215    fn cmp(&self, other: &Self) -> Ordering {
216        // Use the replica id to break ties between concurrent events.
217        self.value
218            .cmp(&other.value)
219            .then_with(|| self.replica_id.cmp(&other.replica_id))
220    }
221}
222
223impl PartialOrd for Lamport {
224    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
225        Some(self.cmp(other))
226    }
227}
228
229impl Lamport {
230    pub const MIN: Self = Self {
231        replica_id: ReplicaId(u16::MIN),
232        value: Seq::MIN,
233    };
234
235    pub const MAX: Self = Self {
236        replica_id: ReplicaId(u16::MAX),
237        value: Seq::MAX,
238    };
239
240    pub fn new(replica_id: ReplicaId) -> Self {
241        Self {
242            value: 1,
243            replica_id,
244        }
245    }
246
247    pub fn as_u64(self) -> u64 {
248        ((self.value as u64) << 32) | (self.replica_id.0 as u64)
249    }
250
251    pub fn tick(&mut self) -> Self {
252        let timestamp = *self;
253        self.value += 1;
254        timestamp
255    }
256
257    pub fn observe(&mut self, timestamp: Self) {
258        self.value = cmp::max(self.value, timestamp.value) + 1;
259    }
260}
261
262impl fmt::Debug for Lamport {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        if *self == Self::MAX {
265            write!(f, "Lamport {{MAX}}")
266        } else if *self == Self::MIN {
267            write!(f, "Lamport {{MIN}}")
268        } else {
269            write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value)
270        }
271    }
272}
273
274impl fmt::Debug for Global {
275    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276        write!(f, "Global {{")?;
277        for timestamp in self.iter().filter(|t| t.value > 0) {
278            if timestamp.replica_id.0 > 0 {
279                write!(f, ", ")?;
280            }
281            write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?;
282        }
283        write!(f, "}}")
284    }
285}