clock.rs

  1mod system_clock;
  2
  3use serde::{Deserialize, Serialize};
  4use smallvec::SmallVec;
  5use std::{
  6    cmp::{self, Ordering},
  7    fmt, iter,
  8};
  9
 10pub use system_clock::*;
 11
/// Replica id reserved for the "local branch" entry of a [`Global`] clock.
///
/// [`Global`] does not keep this id in its per-replica vector: its sequence
/// number lives in a dedicated field, and `Debug` output labels it `<branch>`.
pub const LOCAL_BRANCH_REPLICA_ID: u16 = u16::MAX;
 13
/// A unique identifier for each distributed node.
///
/// The value [`LOCAL_BRANCH_REPLICA_ID`] (`u16::MAX`) is special-cased by
/// [`Global`], which also uses regular ids as indices into its value vector.
pub type ReplicaId = u16;
 16
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
///
/// [`Global`] treats `0` as "nothing observed": `Global::get` returns `0` for
/// replicas it has no entry for, and `Global::observe` ignores timestamps
/// whose value is `0`.
pub type Seq = u32;
 19
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// The `Ord` implementation compares `value` first and only falls back to
/// `replica_id` to break ties.
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
    /// Identifier of the replica this timestamp belongs to.
    pub replica_id: ReplicaId,
    /// Sequence number of this timestamp.
    pub value: Seq,
}
 27
/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global {
    /// Sequence number recorded for each replica, indexed by replica id.
    /// An entry of `0` (or a missing entry) is reported by `get` as `0`.
    values: SmallVec<[u32; 8]>,
    /// Sequence number recorded for `LOCAL_BRANCH_REPLICA_ID`, which is kept
    /// separately instead of being stored in `values`.
    local_branch_value: u32,
}
 34
 35impl Global {
 36    pub fn new() -> Self {
 37        Self::default()
 38    }
 39
 40    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 41        if replica_id == LOCAL_BRANCH_REPLICA_ID {
 42            self.local_branch_value
 43        } else {
 44            self.values.get(replica_id as usize).copied().unwrap_or(0) as Seq
 45        }
 46    }
 47
 48    pub fn observe(&mut self, timestamp: Lamport) {
 49        if timestamp.value > 0 {
 50            if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
 51                self.local_branch_value = cmp::max(self.local_branch_value, timestamp.value);
 52            } else {
 53                let new_len = timestamp.replica_id as usize + 1;
 54                if new_len > self.values.len() {
 55                    self.values.resize(new_len, 0);
 56                }
 57
 58                let entry = &mut self.values[timestamp.replica_id as usize];
 59                *entry = cmp::max(*entry, timestamp.value);
 60            }
 61        }
 62    }
 63
 64    pub fn join(&mut self, other: &Self) {
 65        if other.values.len() > self.values.len() {
 66            self.values.resize(other.values.len(), 0);
 67        }
 68
 69        for (left, right) in self.values.iter_mut().zip(&other.values) {
 70            *left = cmp::max(*left, *right);
 71        }
 72
 73        self.local_branch_value = cmp::max(self.local_branch_value, other.local_branch_value);
 74    }
 75
 76    pub fn meet(&mut self, other: &Self) {
 77        if other.values.len() > self.values.len() {
 78            self.values.resize(other.values.len(), 0);
 79        }
 80
 81        let mut new_len = 0;
 82        for (ix, (left, right)) in self
 83            .values
 84            .iter_mut()
 85            .zip(other.values.iter().chain(iter::repeat(&0)))
 86            .enumerate()
 87        {
 88            if *left == 0 {
 89                *left = *right;
 90            } else if *right > 0 {
 91                *left = cmp::min(*left, *right);
 92            }
 93
 94            if *left != 0 {
 95                new_len = ix + 1;
 96            }
 97        }
 98        self.values.resize(new_len, 0);
 99        self.local_branch_value = cmp::min(self.local_branch_value, other.local_branch_value);
100    }
101
102    pub fn observed(&self, timestamp: Lamport) -> bool {
103        self.get(timestamp.replica_id) >= timestamp.value
104    }
105
106    pub fn observed_any(&self, other: &Self) -> bool {
107        self.values
108            .iter()
109            .zip(other.values.iter())
110            .any(|(left, right)| *right > 0 && left >= right)
111            || (other.local_branch_value > 0 && self.local_branch_value >= other.local_branch_value)
112    }
113
114    pub fn observed_all(&self, other: &Self) -> bool {
115        let mut rhs = other.values.iter();
116        self.values.iter().all(|left| match rhs.next() {
117            Some(right) => left >= right,
118            None => true,
119        }) && rhs.next().is_none()
120            && self.local_branch_value >= other.local_branch_value
121    }
122
123    pub fn changed_since(&self, other: &Self) -> bool {
124        self.values.len() > other.values.len()
125            || self
126                .values
127                .iter()
128                .zip(other.values.iter())
129                .any(|(left, right)| left > right)
130            || self.local_branch_value > other.local_branch_value
131    }
132
133    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
134        self.values
135            .iter()
136            .enumerate()
137            .map(|(replica_id, seq)| Lamport {
138                replica_id: replica_id as ReplicaId,
139                value: *seq,
140            })
141            .chain((self.local_branch_value > 0).then_some(Lamport {
142                replica_id: LOCAL_BRANCH_REPLICA_ID,
143                value: self.local_branch_value,
144            }))
145    }
146}
147
148impl FromIterator<Lamport> for Global {
149    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
150        let mut result = Self::new();
151        for local in locals {
152            result.observe(local);
153        }
154        result
155    }
156}
157
158impl Ord for Lamport {
159    fn cmp(&self, other: &Self) -> Ordering {
160        // Use the replica id to break ties between concurrent events.
161        self.value
162            .cmp(&other.value)
163            .then_with(|| self.replica_id.cmp(&other.replica_id))
164    }
165}
166
167impl PartialOrd for Lamport {
168    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
169        Some(self.cmp(other))
170    }
171}
172
173impl Lamport {
174    pub const MIN: Self = Self {
175        replica_id: ReplicaId::MIN,
176        value: Seq::MIN,
177    };
178
179    pub const MAX: Self = Self {
180        replica_id: ReplicaId::MAX,
181        value: Seq::MAX,
182    };
183
184    pub fn new(replica_id: ReplicaId) -> Self {
185        Self {
186            value: 1,
187            replica_id,
188        }
189    }
190
191    pub fn as_u64(self) -> u64 {
192        ((self.value as u64) << 32) | (self.replica_id as u64)
193    }
194
195    pub fn tick(&mut self) -> Self {
196        let timestamp = *self;
197        self.value += 1;
198        timestamp
199    }
200
201    pub fn observe(&mut self, timestamp: Self) {
202        self.value = cmp::max(self.value, timestamp.value) + 1;
203    }
204}
205
206impl fmt::Debug for Lamport {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
209    }
210}
211
212impl fmt::Debug for Global {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        write!(f, "Global {{")?;
215        for timestamp in self.iter() {
216            if timestamp.replica_id > 0 {
217                write!(f, ", ")?;
218            }
219            if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
220                write!(f, "<branch>: {}", timestamp.value)?;
221            } else {
222                write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
223            }
224        }
225        write!(f, "}}")
226    }
227}