operation_queue.rs

  1use crate::{
  2    sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree, Summary},
  3    time,
  4};
  5use std::{fmt::Debug, ops::Add};
  6
  7pub trait Operation: Clone + Debug + Eq {
  8    fn timestamp(&self) -> time::Lamport;
  9}
 10
 11#[derive(Clone, Debug)]
 12pub struct OperationQueue<T: Operation>(SumTree<T>);
 13
 14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
 15pub struct OperationKey(time::Lamport);
 16
 17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 18pub struct OperationSummary {
 19    key: OperationKey,
 20    len: usize,
 21}
 22
 23impl<T: Operation> OperationQueue<T> {
 24    pub fn new() -> Self {
 25        OperationQueue(SumTree::new())
 26    }
 27
 28    pub fn len(&self) -> usize {
 29        self.0.summary().len
 30    }
 31
 32    pub fn insert(&mut self, mut ops: Vec<T>) {
 33        ops.sort_by_key(|op| op.timestamp());
 34        ops.dedup_by_key(|op| op.timestamp());
 35        self.0
 36            .edit(ops.into_iter().map(Edit::Insert).collect(), &());
 37    }
 38
 39    pub fn drain(&mut self) -> Self {
 40        let clone = self.clone();
 41        self.0 = SumTree::new();
 42        clone
 43    }
 44
 45    pub fn cursor(&self) -> Cursor<T, (), ()> {
 46        self.0.cursor()
 47    }
 48}
 49
 50impl<T: Operation> Item for T {
 51    type Summary = OperationSummary;
 52
 53    fn summary(&self) -> Self::Summary {
 54        OperationSummary {
 55            key: OperationKey(self.timestamp()),
 56            len: 1,
 57        }
 58    }
 59}
 60
 61impl<T: Operation> KeyedItem for T {
 62    type Key = OperationKey;
 63
 64    fn key(&self) -> Self::Key {
 65        OperationKey(self.timestamp())
 66    }
 67}
 68
 69impl Summary for OperationSummary {
 70    type Context = ();
 71
 72    fn add_summary(&mut self, other: &Self, _: &()) {
 73        assert!(self.key < other.key);
 74        self.key = other.key;
 75        self.len += other.len;
 76    }
 77}
 78
 79impl<'a> Add<&'a Self> for OperationSummary {
 80    type Output = Self;
 81
 82    fn add(self, other: &Self) -> Self {
 83        assert!(self.key < other.key);
 84        OperationSummary {
 85            key: other.key,
 86            len: self.len + other.len,
 87        }
 88    }
 89}
 90
 91impl<'a> Dimension<'a, OperationSummary> for OperationKey {
 92    fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
 93        assert!(*self <= summary.key);
 94        *self = summary.key;
 95    }
 96}
 97
 98#[cfg(test)]
 99mod tests {
100    use super::*;
101
102    #[test]
103    fn test_len() {
104        let mut clock = time::Lamport::new(0);
105
106        let mut queue = OperationQueue::new();
107        assert_eq!(queue.len(), 0);
108
109        queue.insert(vec![
110            TestOperation(clock.tick()),
111            TestOperation(clock.tick()),
112        ]);
113        assert_eq!(queue.len(), 2);
114
115        queue.insert(vec![TestOperation(clock.tick())]);
116        assert_eq!(queue.len(), 3);
117
118        drop(queue.drain());
119        assert_eq!(queue.len(), 0);
120
121        queue.insert(vec![TestOperation(clock.tick())]);
122        assert_eq!(queue.len(), 1);
123    }
124
125    #[derive(Clone, Debug, Eq, PartialEq)]
126    struct TestOperation(time::Lamport);
127
128    impl Operation for TestOperation {
129        fn timestamp(&self) -> time::Lamport {
130            self.0
131        }
132    }
133}