clock.rs

  1use smallvec::SmallVec;
  2use std::{
  3    cmp::{self, Ordering},
  4    fmt, iter,
  5};
  6
  7/// A unique identifier for each distributed node
  8pub type ReplicaId = u16;
  9
 10/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp),
 11pub type Seq = u32;
 12
 13/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
 14/// used to determine the ordering of events in the editor.
 15#[derive(Clone, Copy, Default, Eq, Hash, PartialEq)]
 16pub struct Lamport {
 17    pub replica_id: ReplicaId,
 18    pub value: Seq,
 19}
 20
 21/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock)
 22#[derive(Clone, Default, Hash, Eq, PartialEq)]
 23pub struct Global(SmallVec<[u32; 8]>);
 24
 25impl Global {
 26    pub fn new() -> Self {
 27        Self::default()
 28    }
 29
 30    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 31        self.0.get(replica_id as usize).copied().unwrap_or(0) as Seq
 32    }
 33
 34    pub fn observe(&mut self, timestamp: Lamport) {
 35        if timestamp.value > 0 {
 36            let new_len = timestamp.replica_id as usize + 1;
 37            if new_len > self.0.len() {
 38                self.0.resize(new_len, 0);
 39            }
 40
 41            let entry = &mut self.0[timestamp.replica_id as usize];
 42            *entry = cmp::max(*entry, timestamp.value);
 43        }
 44    }
 45
 46    pub fn join(&mut self, other: &Self) {
 47        if other.0.len() > self.0.len() {
 48            self.0.resize(other.0.len(), 0);
 49        }
 50
 51        for (left, right) in self.0.iter_mut().zip(&other.0) {
 52            *left = cmp::max(*left, *right);
 53        }
 54    }
 55
 56    pub fn meet(&mut self, other: &Self) {
 57        if other.0.len() > self.0.len() {
 58            self.0.resize(other.0.len(), 0);
 59        }
 60
 61        let mut new_len = 0;
 62        for (ix, (left, right)) in self
 63            .0
 64            .iter_mut()
 65            .zip(other.0.iter().chain(iter::repeat(&0)))
 66            .enumerate()
 67        {
 68            if *left == 0 {
 69                *left = *right;
 70            } else if *right > 0 {
 71                *left = cmp::min(*left, *right);
 72            }
 73
 74            if *left != 0 {
 75                new_len = ix + 1;
 76            }
 77        }
 78        self.0.resize(new_len, 0);
 79    }
 80
 81    pub fn observed(&self, timestamp: Lamport) -> bool {
 82        self.get(timestamp.replica_id) >= timestamp.value
 83    }
 84
 85    pub fn observed_any(&self, other: &Self) -> bool {
 86        let mut lhs = self.0.iter();
 87        let mut rhs = other.0.iter();
 88        loop {
 89            if let Some(left) = lhs.next() {
 90                if let Some(right) = rhs.next() {
 91                    if *right > 0 && left >= right {
 92                        return true;
 93                    }
 94                } else {
 95                    return false;
 96                }
 97            } else {
 98                return false;
 99            }
100        }
101    }
102
103    pub fn observed_all(&self, other: &Self) -> bool {
104        let mut lhs = self.0.iter();
105        let mut rhs = other.0.iter();
106        loop {
107            if let Some(left) = lhs.next() {
108                if let Some(right) = rhs.next() {
109                    if left < right {
110                        return false;
111                    }
112                } else {
113                    return true;
114                }
115            } else {
116                return rhs.next().is_none();
117            }
118        }
119    }
120
121    pub fn changed_since(&self, other: &Self) -> bool {
122        if self.0.len() > other.0.len() {
123            return true;
124        }
125        for (left, right) in self.0.iter().zip(other.0.iter()) {
126            if left > right {
127                return true;
128            }
129        }
130        false
131    }
132
133    pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
134        self.0.iter().enumerate().map(|(replica_id, seq)| Lamport {
135            replica_id: replica_id as ReplicaId,
136            value: *seq,
137        })
138    }
139}
140
141impl FromIterator<Lamport> for Global {
142    fn from_iter<T: IntoIterator<Item = Lamport>>(locals: T) -> Self {
143        let mut result = Self::new();
144        for local in locals {
145            result.observe(local);
146        }
147        result
148    }
149}
150
151impl Ord for Lamport {
152    fn cmp(&self, other: &Self) -> Ordering {
153        // Use the replica id to break ties between concurrent events.
154        self.value
155            .cmp(&other.value)
156            .then_with(|| self.replica_id.cmp(&other.replica_id))
157    }
158}
159
160impl PartialOrd for Lamport {
161    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
162        Some(self.cmp(other))
163    }
164}
165
166impl Lamport {
167    pub const MIN: Self = Self {
168        replica_id: ReplicaId::MIN,
169        value: Seq::MIN,
170    };
171
172    pub const MAX: Self = Self {
173        replica_id: ReplicaId::MAX,
174        value: Seq::MAX,
175    };
176
177    pub fn new(replica_id: ReplicaId) -> Self {
178        Self {
179            value: 1,
180            replica_id,
181        }
182    }
183
184    pub fn tick(&mut self) -> Self {
185        let timestamp = *self;
186        self.value += 1;
187        timestamp
188    }
189
190    pub fn observe(&mut self, timestamp: Self) {
191        self.value = cmp::max(self.value, timestamp.value) + 1;
192    }
193}
194
195impl fmt::Debug for Lamport {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
198    }
199}
200
201impl fmt::Debug for Global {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        write!(f, "Global {{")?;
204        for timestamp in self.iter() {
205            if timestamp.replica_id > 0 {
206                write!(f, ", ")?;
207            }
208            write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
209        }
210        write!(f, "}}")
211    }
212}