operation_queue.rs

  1use std::{fmt::Debug, ops::Add};
  2use sum_tree::{Dimension, Edit, Item, KeyedItem, SumTree, Summary};
  3
  4pub trait Operation: Clone + Debug {
  5    fn lamport_timestamp(&self) -> clock::Lamport;
  6}
  7
  8#[derive(Clone, Debug)]
  9struct OperationItem<T>(T);
 10
 11#[derive(Clone, Debug)]
 12pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
 13
 14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
 15pub struct OperationKey(clock::Lamport);
 16
 17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 18pub struct OperationSummary {
 19    pub key: OperationKey,
 20    pub len: usize,
 21}
 22
 23impl OperationKey {
 24    pub fn new(timestamp: clock::Lamport) -> Self {
 25        Self(timestamp)
 26    }
 27}
 28
 29impl<T: Operation> Default for OperationQueue<T> {
 30    fn default() -> Self {
 31        OperationQueue::new()
 32    }
 33}
 34
 35impl<T: Operation> OperationQueue<T> {
 36    pub fn new() -> Self {
 37        OperationQueue(SumTree::default())
 38    }
 39
 40    pub fn len(&self) -> usize {
 41        self.0.summary().len
 42    }
 43
 44    pub fn is_empty(&self) -> bool {
 45        self.len() == 0
 46    }
 47
 48    pub fn insert(&mut self, mut ops: Vec<T>) {
 49        ops.sort_by_key(|op| op.lamport_timestamp());
 50        ops.dedup_by_key(|op| op.lamport_timestamp());
 51        self.0.edit(
 52            ops.into_iter()
 53                .map(|op| Edit::Insert(OperationItem(op)))
 54                .collect(),
 55            &(),
 56        );
 57    }
 58
 59    pub fn drain(&mut self) -> Self {
 60        let clone = self.clone();
 61        self.0 = SumTree::default();
 62        clone
 63    }
 64
 65    pub fn iter(&self) -> impl Iterator<Item = &T> {
 66        self.0.iter().map(|i| &i.0)
 67    }
 68}
 69
 70impl Summary for OperationSummary {
 71    type Context = ();
 72
 73    fn zero(_cx: &()) -> Self {
 74        Default::default()
 75    }
 76
 77    fn add_summary(&mut self, other: &Self, _: &()) {
 78        assert!(self.key < other.key);
 79        self.key = other.key;
 80        self.len += other.len;
 81    }
 82}
 83
 84impl Add<&Self> for OperationSummary {
 85    type Output = Self;
 86
 87    fn add(self, other: &Self) -> Self {
 88        assert!(self.key < other.key);
 89        OperationSummary {
 90            key: other.key,
 91            len: self.len + other.len,
 92        }
 93    }
 94}
 95
 96impl Dimension<'_, OperationSummary> for OperationKey {
 97    fn zero(_cx: &()) -> Self {
 98        Default::default()
 99    }
100
101    fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
102        assert!(*self <= summary.key);
103        *self = summary.key;
104    }
105}
106
107impl<T: Operation> Item for OperationItem<T> {
108    type Summary = OperationSummary;
109
110    fn summary(&self, _cx: &()) -> Self::Summary {
111        OperationSummary {
112            key: OperationKey::new(self.0.lamport_timestamp()),
113            len: 1,
114        }
115    }
116}
117
118impl<T: Operation> KeyedItem for OperationItem<T> {
119    type Key = OperationKey;
120
121    fn key(&self) -> Self::Key {
122        OperationKey::new(self.0.lamport_timestamp())
123    }
124}
125
126#[cfg(test)]
127mod tests {
128    use super::*;
129
130    #[test]
131    fn test_len() {
132        let mut clock = clock::Lamport::new(0);
133
134        let mut queue = OperationQueue::new();
135        assert_eq!(queue.len(), 0);
136
137        queue.insert(vec![
138            TestOperation(clock.tick()),
139            TestOperation(clock.tick()),
140        ]);
141        assert_eq!(queue.len(), 2);
142
143        queue.insert(vec![TestOperation(clock.tick())]);
144        assert_eq!(queue.len(), 3);
145
146        drop(queue.drain());
147        assert_eq!(queue.len(), 0);
148
149        queue.insert(vec![TestOperation(clock.tick())]);
150        assert_eq!(queue.len(), 1);
151    }
152
153    #[derive(Clone, Debug, Eq, PartialEq)]
154    struct TestOperation(clock::Lamport);
155
156    impl Operation for TestOperation {
157        fn lamport_timestamp(&self) -> clock::Lamport {
158            self.0
159        }
160    }
161}