operation_queue.rs

  1use std::{fmt::Debug, ops::Add};
  2use sum_tree::{Dimension, Edit, Item, KeyedItem, SumTree, Summary};
  3
  4pub trait Operation: Clone + Debug {
  5    fn lamport_timestamp(&self) -> clock::Lamport;
  6}
  7
  8#[derive(Clone, Debug)]
  9struct OperationItem<T>(T);
 10
 11#[derive(Clone, Debug)]
 12pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
 13
 14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
 15pub struct OperationKey(clock::Lamport);
 16
 17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 18pub struct OperationSummary {
 19    pub key: OperationKey,
 20    pub len: usize,
 21}
 22
 23impl OperationKey {
 24    pub fn new(timestamp: clock::Lamport) -> Self {
 25        Self(timestamp)
 26    }
 27}
 28
 29impl<T: Operation> OperationQueue<T> {
 30    pub fn new() -> Self {
 31        OperationQueue(SumTree::new())
 32    }
 33
 34    pub fn len(&self) -> usize {
 35        self.0.summary().len
 36    }
 37
 38    pub fn insert(&mut self, mut ops: Vec<T>) {
 39        ops.sort_by_key(|op| op.lamport_timestamp());
 40        ops.dedup_by_key(|op| op.lamport_timestamp());
 41        self.0.edit(
 42            ops.into_iter()
 43                .map(|op| Edit::Insert(OperationItem(op)))
 44                .collect(),
 45            &(),
 46        );
 47    }
 48
 49    pub fn drain(&mut self) -> Self {
 50        let clone = self.clone();
 51        self.0 = SumTree::new();
 52        clone
 53    }
 54
 55    pub fn iter(&self) -> impl Iterator<Item = &T> {
 56        self.0.iter().map(|i| &i.0)
 57    }
 58}
 59
 60impl Summary for OperationSummary {
 61    type Context = ();
 62
 63    fn add_summary(&mut self, other: &Self, _: &()) {
 64        assert!(self.key < other.key);
 65        self.key = other.key;
 66        self.len += other.len;
 67    }
 68}
 69
 70impl<'a> Add<&'a Self> for OperationSummary {
 71    type Output = Self;
 72
 73    fn add(self, other: &Self) -> Self {
 74        assert!(self.key < other.key);
 75        OperationSummary {
 76            key: other.key,
 77            len: self.len + other.len,
 78        }
 79    }
 80}
 81
 82impl<'a> Dimension<'a, OperationSummary> for OperationKey {
 83    fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
 84        assert!(*self <= summary.key);
 85        *self = summary.key;
 86    }
 87}
 88
 89impl<T: Operation> Item for OperationItem<T> {
 90    type Summary = OperationSummary;
 91
 92    fn summary(&self) -> Self::Summary {
 93        OperationSummary {
 94            key: OperationKey::new(self.0.lamport_timestamp()),
 95            len: 1,
 96        }
 97    }
 98}
 99
100impl<T: Operation> KeyedItem for OperationItem<T> {
101    type Key = OperationKey;
102
103    fn key(&self) -> Self::Key {
104        OperationKey::new(self.0.lamport_timestamp())
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use super::*;
111
112    #[test]
113    fn test_len() {
114        let mut clock = clock::Lamport::new(0);
115
116        let mut queue = OperationQueue::new();
117        assert_eq!(queue.len(), 0);
118
119        queue.insert(vec![
120            TestOperation(clock.tick()),
121            TestOperation(clock.tick()),
122        ]);
123        assert_eq!(queue.len(), 2);
124
125        queue.insert(vec![TestOperation(clock.tick())]);
126        assert_eq!(queue.len(), 3);
127
128        drop(queue.drain());
129        assert_eq!(queue.len(), 0);
130
131        queue.insert(vec![TestOperation(clock.tick())]);
132        assert_eq!(queue.len(), 1);
133    }
134
135    #[derive(Clone, Debug, Eq, PartialEq)]
136    struct TestOperation(clock::Lamport);
137
138    impl Operation for TestOperation {
139        fn lamport_timestamp(&self) -> clock::Lamport {
140            self.0
141        }
142    }
143}