clock.rs

  1mod system_clock;
  2
  3use serde::{Deserialize, Serialize};
  4use smallvec::SmallVec;
  5use std::{
  6    cmp::{self, Ordering},
  7    fmt, iter,
  8};
  9
 10pub use system_clock::*;
 11
 12pub const LOCAL_BRANCH_REPLICA_ID: u16 = u16::MAX;
 13pub const AGENT_REPLICA_ID: u16 = u16::MAX - 1;
 14
 15/// A unique identifier for each distributed node.
 16pub type ReplicaId = u16;
 17
 18/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
 19pub type Seq = u32;
 20
 21/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
 22/// used to determine the ordering of events in the editor.
 23#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
 24pub struct Lamport {
 25    pub replica_id: ReplicaId,
 26    pub value: Seq,
 27}
 28
 29/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
 30#[derive(Clone, Default, Hash, Eq, PartialEq)]
 31pub struct Global {
 32    values: SmallVec<[u32; 8]>,
 33    local_branch_value: u32,
 34}
 35
 36impl Global {
 37    pub fn new() -> Self {
 38        Self::default()
 39    }
 40
 41    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 42        if replica_id == LOCAL_BRANCH_REPLICA_ID {
 43            self.local_branch_value
 44        } else {
 45            self.values.get(replica_id as usize).copied().unwrap_or(0) as Seq
 46        }
 47    }
 48
 49    pub fn observe(&mut self, timestamp: Lamport) {
 50        if timestamp.value > 0 {
 51            if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
 52                self.local_branch_value = cmp::max(self.local_branch_value, timestamp.value);
 53            } else {
 54                let new_len = timestamp.replica_id as usize + 1;
 55                if new_len > self.values.len() {
 56                    self.values.resize(new_len, 0);
 57                }
 58
 59                let entry = &mut self.values[timestamp.replica_id as usize];
 60                *entry = cmp::max(*entry, timestamp.value);
 61            }
 62        }
 63    }
 64
 65    pub fn join(&mut self, other: &Self) {
 66        if other.values.len() > self.values.len() {
 67            self.values.resize(other.values.len(), 0);
 68        }
 69
 70        for (left, right) in self.values.iter_mut().zip(&other.values) {
 71            *left = cmp::max(*left, *right);
 72        }
 73
 74        self.local_branch_value = cmp::max(self.local_branch_value, other.local_branch_value);
 75    }
 76
 77    pub fn meet(&mut self, other: &Self) {
 78        if other.values.len() > self.values.len() {
 79            self.values.resize(other.values.len(), 0);
 80        }
 81
 82        let mut new_len = 0;
 83        for (ix, (left, right)) in self
 84            .values
 85            .iter_mut()
 86            .zip(other.values.iter().chain(iter::repeat(&0)))
 87            .enumerate()
 88        {
 89            if *left == 0 {
 90                *left = *right;
 91            } else if *right > 0 {
 92                *left = cmp::min(*left, *right);
 93            }
 94
 95            if *left != 0 {
 96                new_len = ix + 1;
 97            }
 98        }
 99        self.values.resize(new_len, 0);
100        self.local_branch_value = cmp::min(self.local_branch_value, other.local_branch_value);
101    }
102
103    pub fn observed(&self, timestamp: Lamport) -> bool {
104        self.get(timestamp.replica_id) >= timestamp.value
105    }
106
107    pub fn observed_any(&self, other: &Self) -> bool {
108        self.values
109            .iter()
110            .zip(other.values.iter())
111            .any(|(left, right)| *right > 0 && left >= right)
112            || (other.local_branch_value > 0 && self.local_branch_value >= other.local_branch_value)
113    }
114
115    pub fn observed_all(&self, other: &Self) -> bool {
116        let mut rhs = other.values.iter();
117        self.values.iter().all(|left| match rhs.next() {
118            Some(right) => left >= right,
119            None => true,
120        }) && rhs.next().is_none()
121            && self.local_branch_value >= other.local_branch_value
122    }
123
124    pub fn changed_since(&self, other: &Self) -> bool {
125        self.values.len() > other.values.len()
126            || self
127                .values
128                .iter()
129                .zip(other.values.iter())
130                .any(|(left, right)| left > right)
131            || self.local_branch_value > other.local_branch_value
132    }
133
134    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
135        self.values
136            .iter()
137            .enumerate()
138            .map(|(replica_id, seq)| Lamport {
139                replica_id: replica_id as ReplicaId,
140                value: *seq,
141            })
142            .chain((self.local_branch_value > 0).then_some(Lamport {
143                replica_id: LOCAL_BRANCH_REPLICA_ID,
144                value: self.local_branch_value,
145            }))
146    }
147}
148
149impl FromIterator<Lamport> for Global {
150    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
151        let mut result = Self::new();
152        for local in locals {
153            result.observe(local);
154        }
155        result
156    }
157}
158
159impl Ord for Lamport {
160    fn cmp(&self, other: &Self) -> Ordering {
161        // Use the replica id to break ties between concurrent events.
162        self.value
163            .cmp(&other.value)
164            .then_with(|| self.replica_id.cmp(&other.replica_id))
165    }
166}
167
168impl PartialOrd for Lamport {
169    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
170        Some(self.cmp(other))
171    }
172}
173
174impl Lamport {
175    pub const MIN: Self = Self {
176        replica_id: ReplicaId::MIN,
177        value: Seq::MIN,
178    };
179
180    pub const MAX: Self = Self {
181        replica_id: ReplicaId::MAX,
182        value: Seq::MAX,
183    };
184
185    pub fn new(replica_id: ReplicaId) -> Self {
186        Self {
187            value: 1,
188            replica_id,
189        }
190    }
191
192    pub fn as_u64(self) -> u64 {
193        ((self.value as u64) << 32) | (self.replica_id as u64)
194    }
195
196    pub fn tick(&mut self) -> Self {
197        let timestamp = *self;
198        self.value += 1;
199        timestamp
200    }
201
202    pub fn observe(&mut self, timestamp: Self) {
203        self.value = cmp::max(self.value, timestamp.value) + 1;
204    }
205}
206
207impl fmt::Debug for Lamport {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
210    }
211}
212
213impl fmt::Debug for Global {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "Global {{")?;
216        for timestamp in self.iter() {
217            if timestamp.replica_id > 0 {
218                write!(f, ", ")?;
219            }
220            if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
221                write!(f, "<branch>: {}", timestamp.value)?;
222            } else {
223                write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
224            }
225        }
226        write!(f, "}}")
227    }
228}