clock.rs

  1mod system_clock;
  2
  3use serde::{Deserialize, Serialize};
  4use smallvec::SmallVec;
  5use std::{
  6    cmp::{self, Ordering},
  7    fmt,
  8};
  9
 10pub use system_clock::*;
 11
/// A unique identifier for each distributed node.
///
/// This is a thin wrapper around a raw `u16`. The derived `Default` yields the
/// id `0`, which is the same value as [`ReplicaId::LOCAL`].
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ReplicaId(u16);
 15
 16impl ReplicaId {
 17    /// The local replica
 18    pub const LOCAL: ReplicaId = ReplicaId(0);
 19    /// The remote replica of the connected remote server.
 20    pub const REMOTE_SERVER: ReplicaId = ReplicaId(1);
 21    /// The agent's unique identifier.
 22    pub const AGENT: ReplicaId = ReplicaId(2);
 23    /// A local branch.
 24    pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3);
 25    /// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica.
 26    pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(8);
 27
 28    pub fn new(id: u16) -> Self {
 29        ReplicaId(id)
 30    }
 31
 32    pub fn as_u16(&self) -> u16 {
 33        self.0
 34    }
 35
 36    pub fn is_remote(self) -> bool {
 37        self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID
 38    }
 39}
 40
 41impl fmt::Debug for ReplicaId {
 42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 43        if *self == ReplicaId::LOCAL {
 44            write!(f, "<local>")
 45        } else if *self == ReplicaId::REMOTE_SERVER {
 46            write!(f, "<remote>")
 47        } else if *self == ReplicaId::AGENT {
 48            write!(f, "<agent>")
 49        } else if *self == ReplicaId::LOCAL_BRANCH {
 50            write!(f, "<branch>")
 51        } else {
 52            write!(f, "{}", self.0)
 53        }
 54    }
 55}
 56
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
///
/// [`Global::get`] reports `0` for a replica it holds no entry for.
pub type Seq = u32;
 59
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// The `Ord` implementation for this type compares `value` first and only
/// falls back to `replica_id` when the values are equal.
#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
    /// The replica this timestamp is attributed to; also the tie-breaker when ordering.
    pub replica_id: ReplicaId,
    /// The sequence number of this timestamp.
    pub value: Seq,
}
 67
/// A [version vector](https://en.wikipedia.org/wiki/Version_vector).
///
/// Holds one sequence number per replica, stored at the index given by the
/// replica's numeric id. An entry of `0`, like a missing entry, is treated as
/// "nothing observed for that replica".
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global {
    // 4 is chosen as it is the biggest count that does not increase the size of the field itself.
    // Coincidentally, it also covers all the important non-collab replica ids.
    values: SmallVec<[u32; 4]>,
}
 75
 76impl Global {
 77    pub fn new() -> Self {
 78        Self::default()
 79    }
 80
 81    /// Fetches the sequence number for the given replica ID.
 82    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 83        self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq
 84    }
 85
 86    /// Observe the lamport timestampe.
 87    ///
 88    /// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp.
 89    pub fn observe(&mut self, timestamp: Lamport) {
 90        debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id);
 91        if timestamp.value > 0 {
 92            let new_len = timestamp.replica_id.0 as usize + 1;
 93            if new_len > self.values.len() {
 94                self.values.resize(new_len, 0);
 95            }
 96
 97            let entry = &mut self.values[timestamp.replica_id.0 as usize];
 98            *entry = cmp::max(*entry, timestamp.value);
 99        }
100    }
101
102    /// Join another global.
103    ///
104    /// This observes all timestamps from the other global.
105    #[doc(alias = "synchronize")]
106    pub fn join(&mut self, other: &Self) {
107        if other.values.len() > self.values.len() {
108            self.values.resize(other.values.len(), 0);
109        }
110
111        for (left, right) in self.values.iter_mut().zip(&other.values) {
112            *left = cmp::max(*left, *right);
113        }
114    }
115
116    /// Meet another global.
117    ///
118    /// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals.
119    pub fn meet(&mut self, other: &Self) {
120        if other.values.len() > self.values.len() {
121            self.values.resize(other.values.len(), 0);
122        }
123
124        let mut new_len = 0;
125        for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() {
126            match (*left, right) {
127                // left has not observed the replica
128                (0, _) => *left = right,
129                // right has not observed the replica
130                (_, 0) => (),
131                (_, _) => *left = cmp::min(*left, right),
132            }
133            if *left != 0 {
134                new_len = ix + 1;
135            }
136        }
137        if other.values.len() == self.values.len() {
138            // only truncate if other was equal or shorter (which at this point
139            // cant be due to the resize above) to `self` as otherwise we would
140            // truncate the unprocessed tail that is guaranteed to contain
141            // non-null timestamps
142            self.values.truncate(new_len);
143        }
144    }
145
146    pub fn observed(&self, timestamp: Lamport) -> bool {
147        self.get(timestamp.replica_id) >= timestamp.value
148    }
149
150    pub fn observed_any(&self, other: &Self) -> bool {
151        self.iter()
152            .zip(other.iter())
153            .any(|(left, right)| right.value > 0 && left.value >= right.value)
154    }
155
156    pub fn observed_all(&self, other: &Self) -> bool {
157        if self.values.len() < other.values.len() {
158            return false;
159        }
160        self.iter()
161            .zip(other.iter())
162            .all(|(left, right)| left.value >= right.value)
163    }
164
165    pub fn changed_since(&self, other: &Self) -> bool {
166        self.values.len() > other.values.len()
167            || self
168                .values
169                .iter()
170                .zip(other.values.iter())
171                .any(|(left, right)| left > right)
172    }
173
174    pub fn most_recent(&self) -> Option<Lamport> {
175        self.iter().max_by_key(|timestamp| timestamp.value)
176    }
177
178    /// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica.
179    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
180        self.values
181            .iter()
182            .enumerate()
183            .map(|(replica_id, seq)| Lamport {
184                replica_id: ReplicaId(replica_id as u16),
185                value: *seq,
186            })
187    }
188}
189
190impl FromIterator<Lamport> for Global {
191    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
192        let mut result = Self::new();
193        for local in locals {
194            result.observe(local);
195        }
196        result
197    }
198}
199
200impl Ord for Lamport {
201    fn cmp(&self, other: &Self) -> Ordering {
202        // Use the replica id to break ties between concurrent events.
203        self.value
204            .cmp(&other.value)
205            .then_with(|| self.replica_id.cmp(&other.replica_id))
206    }
207}
208
209impl PartialOrd for Lamport {
210    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
211        Some(self.cmp(other))
212    }
213}
214
215impl Lamport {
216    pub const MIN: Self = Self {
217        replica_id: ReplicaId(u16::MIN),
218        value: Seq::MIN,
219    };
220
221    pub const MAX: Self = Self {
222        replica_id: ReplicaId(u16::MAX),
223        value: Seq::MAX,
224    };
225
226    pub fn new(replica_id: ReplicaId) -> Self {
227        Self {
228            value: 1,
229            replica_id,
230        }
231    }
232
233    pub fn as_u64(self) -> u64 {
234        ((self.value as u64) << 32) | (self.replica_id.0 as u64)
235    }
236
237    pub fn tick(&mut self) -> Self {
238        let timestamp = *self;
239        self.value += 1;
240        timestamp
241    }
242
243    pub fn observe(&mut self, timestamp: Self) {
244        self.value = cmp::max(self.value, timestamp.value) + 1;
245    }
246}
247
248impl fmt::Debug for Lamport {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        if *self == Self::MAX {
251            write!(f, "Lamport {{MAX}}")
252        } else if *self == Self::MIN {
253            write!(f, "Lamport {{MIN}}")
254        } else {
255            write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value)
256        }
257    }
258}
259
260impl fmt::Debug for Global {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        write!(f, "Global {{")?;
263        for timestamp in self.iter() {
264            if timestamp.replica_id.0 > 0 {
265                write!(f, ", ")?;
266            }
267            write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?;
268        }
269        write!(f, "}}")
270    }
271}