time.rs

  1use smallvec::SmallVec;
  2use std::cmp::{self, Ordering};
  3use std::ops::{Add, AddAssign};
  4
  5pub type ReplicaId = u16;
  6pub type Seq = u32;
  7
  8#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
  9pub struct Local {
 10    pub replica_id: ReplicaId,
 11    pub value: Seq,
 12}
 13
 14#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
 15pub struct Lamport {
 16    pub value: Seq,
 17    pub replica_id: ReplicaId,
 18}
 19
 20impl Local {
 21    pub fn new(replica_id: ReplicaId) -> Self {
 22        Self {
 23            replica_id,
 24            value: 1,
 25        }
 26    }
 27
 28    pub fn tick(&mut self) -> Self {
 29        let timestamp = *self;
 30        self.value += 1;
 31        timestamp
 32    }
 33
 34    pub fn observe(&mut self, timestamp: Self) {
 35        if timestamp.replica_id == self.replica_id {
 36            self.value = cmp::max(self.value, timestamp.value + 1);
 37        }
 38    }
 39}
 40
 41impl<'a> Add<&'a Self> for Local {
 42    type Output = Local;
 43
 44    fn add(self, other: &'a Self) -> Self::Output {
 45        cmp::max(&self, other).clone()
 46    }
 47}
 48
 49impl<'a> AddAssign<&'a Local> for Local {
 50    fn add_assign(&mut self, other: &Self) {
 51        if *self < *other {
 52            *self = other.clone();
 53        }
 54    }
 55}
 56
 57#[derive(Clone, Debug, Default, Eq, PartialEq)]
 58pub struct Global(SmallVec<[Local; 3]>);
 59
 60impl Global {
 61    pub fn new() -> Self {
 62        Self::default()
 63    }
 64
 65    pub fn get(&self, replica_id: ReplicaId) -> Seq {
 66        self.0
 67            .iter()
 68            .find(|t| t.replica_id == replica_id)
 69            .map_or(0, |t| t.value)
 70    }
 71
 72    pub fn observe(&mut self, timestamp: Local) {
 73        if let Some(entry) = self
 74            .0
 75            .iter_mut()
 76            .find(|t| t.replica_id == timestamp.replica_id)
 77        {
 78            entry.value = cmp::max(entry.value, timestamp.value);
 79        } else {
 80            self.0.push(timestamp);
 81        }
 82    }
 83
 84    pub fn observe_all(&mut self, other: &Self) {
 85        for timestamp in other.0.iter() {
 86            self.observe(*timestamp);
 87        }
 88    }
 89
 90    pub fn observed(&self, timestamp: Local) -> bool {
 91        self.get(timestamp.replica_id) >= timestamp.value
 92    }
 93
 94    pub fn changed_since(&self, other: &Self) -> bool {
 95        self.0.iter().any(|t| t.value > other.get(t.replica_id))
 96    }
 97}
 98
 99impl PartialOrd for Global {
100    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
101        let mut global_ordering = Ordering::Equal;
102
103        for timestamp in self.0.iter().chain(other.0.iter()) {
104            let ordering = self
105                .get(timestamp.replica_id)
106                .cmp(&other.get(timestamp.replica_id));
107            if ordering != Ordering::Equal {
108                if global_ordering == Ordering::Equal {
109                    global_ordering = ordering;
110                } else if ordering != global_ordering {
111                    return None;
112                }
113            }
114        }
115
116        Some(global_ordering)
117    }
118}
119
120impl Lamport {
121    pub fn new(replica_id: ReplicaId) -> Self {
122        Self {
123            value: 1,
124            replica_id,
125        }
126    }
127
128    pub fn tick(&mut self) -> Self {
129        let timestamp = *self;
130        self.value += 1;
131        timestamp
132    }
133
134    pub fn observe(&mut self, timestamp: Self) {
135        self.value = cmp::max(self.value, timestamp.value) + 1;
136    }
137}