clock.rs

  1mod system_clock;
  2
  3use serde::{Deserialize, Serialize};
  4use smallvec::SmallVec;
  5use std::{
  6    cmp::{self, Ordering},
  7    fmt, iter,
  8};
  9
 10pub use system_clock::*;
 11
/// A unique identifier for each distributed node.
///
/// [`Global`] converts this value to `usize` and uses it as an index into its
/// backing vector.
pub type ReplicaId = u16;
 14
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
///
/// [`Global`] treats a sequence number of `0` as "nothing recorded"; see
/// [`Global::get`] and [`Global::observe`].
pub type Seq = u32;
 17
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
///
/// Timestamps are ordered by `value` first and by `replica_id` second (see the
/// `Ord` implementation).
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
    /// The replica this timestamp is associated with; breaks ties between
    /// timestamps that carry the same `value`.
    pub replica_id: ReplicaId,
    /// The sequence number. [`Global::observe`] ignores timestamps whose
    /// value is `0`.
    pub value: Seq,
}
 25
/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
///
/// Stored as a dense list indexed by replica id: the entry at index `i` is the
/// sequence number recorded for replica `i`. Replicas without an entry read
/// as `0` (see [`Global::get`]).
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global(SmallVec<[u32; 8]>);
 29
 30impl Global {
 31    pub fn new() -> Self {
 32        Self::default()
 33    }
 34
 35    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 36        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
 37    }
 38
 39    pub fn observe(&mut self, timestamp: Lamport) {
 40        if timestamp.value > 0 {
 41            let new_len = timestamp.replica_id as usize + 1;
 42            if new_len > self.0.len() {
 43                self.0.resize(new_len, 0);
 44            }
 45
 46            let entry = &mut self.0[timestamp.replica_id as usize];
 47            *entry = cmp::max(*entry, timestamp.value);
 48        }
 49    }
 50
 51    pub fn join(&mut self, other: &Self) {
 52        if other.0.len() > self.0.len() {
 53            self.0.resize(other.0.len(), 0);
 54        }
 55
 56        for (left, right) in self.0.iter_mut().zip(&other.0) {
 57            *left = cmp::max(*left, *right);
 58        }
 59    }
 60
 61    pub fn meet(&mut self, other: &Self) {
 62        if other.0.len() > self.0.len() {
 63            self.0.resize(other.0.len(), 0);
 64        }
 65
 66        let mut new_len = 0;
 67        for (ix, (left, right)) in self
 68            .0
 69            .iter_mut()
 70            .zip(other.0.iter().chain(iter::repeat(&0)))
 71            .enumerate()
 72        {
 73            if *left == 0 {
 74                *left = *right;
 75            } else if *right > 0 {
 76                *left = cmp::min(*left, *right);
 77            }
 78
 79            if *left != 0 {
 80                new_len = ix + 1;
 81            }
 82        }
 83        self.0.resize(new_len, 0);
 84    }
 85
 86    pub fn observed(&self, timestamp: Lamport) -> bool {
 87        self.get(timestamp.replica_id) >= timestamp.value
 88    }
 89
 90    pub fn observed_any(&self, other: &Self) -> bool {
 91        self.0
 92            .iter()
 93            .zip(other.0.iter())
 94            .any(|(left, right)| *right > 0 && left >= right)
 95    }
 96
 97    pub fn observed_all(&self, other: &Self) -> bool {
 98        let mut rhs = other.0.iter();
 99        self.0.iter().all(|left| match rhs.next() {
100            Some(right) => left >= right,
101            None => true,
102        }) && rhs.next().is_none()
103    }
104
105    pub fn changed_since(&self, other: &Self) -> bool {
106        self.0.len() > other.0.len()
107            || self
108                .0
109                .iter()
110                .zip(other.0.iter())
111                .any(|(left, right)| left > right)
112    }
113
114    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
115        self.0.iter().enumerate().map(|(replica_id, seq)| Lamport {
116            replica_id: replica_id as ReplicaId,
117            value: *seq,
118        })
119    }
120}
121
122impl FromIterator<Lamport> for Global {
123    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
124        let mut result = Self::new();
125        for local in locals {
126            result.observe(local);
127        }
128        result
129    }
130}
131
132impl Ord for Lamport {
133    fn cmp(&self, other: &Self) -> Ordering {
134        // Use the replica id to break ties between concurrent events.
135        self.value
136            .cmp(&other.value)
137            .then_with(|| self.replica_id.cmp(&other.replica_id))
138    }
139}
140
141impl PartialOrd for Lamport {
142    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143        Some(self.cmp(other))
144    }
145}
146
147impl Lamport {
148    pub const MIN: Self = Self {
149        replica_id: ReplicaId::MIN,
150        value: Seq::MIN,
151    };
152
153    pub const MAX: Self = Self {
154        replica_id: ReplicaId::MAX,
155        value: Seq::MAX,
156    };
157
158    pub fn new(replica_id: ReplicaId) -> Self {
159        Self {
160            value: 1,
161            replica_id,
162        }
163    }
164
165    pub fn as_u64(self) -> u64 {
166        ((self.value as u64) << 32) | (self.replica_id as u64)
167    }
168
169    pub fn tick(&mut self) -> Self {
170        let timestamp = *self;
171        self.value += 1;
172        timestamp
173    }
174
175    pub fn observe(&mut self, timestamp: Self) {
176        self.value = cmp::max(self.value, timestamp.value) + 1;
177    }
178}
179
180impl fmt::Debug for Lamport {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
183    }
184}
185
186impl fmt::Debug for Global {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        write!(f, "Global {{")?;
189        for timestamp in self.iter() {
190            if timestamp.replica_id > 0 {
191                write!(f, ", ")?;
192            }
193            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
194        }
195        write!(f, "}}")
196    }
197}