operation_queue.rs

  1use crate::{
  2    sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree},
  3    time,
  4};
  5use std::{
  6    fmt::Debug,
  7    ops::{Add, AddAssign},
  8};
  9
 10pub trait Operation: Clone + Debug + Eq {
 11    fn timestamp(&self) -> time::Lamport;
 12}
 13
 14#[derive(Clone, Debug)]
 15pub struct OperationQueue<T: Operation>(SumTree<T>);
 16
 17#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
 18pub struct OperationKey(time::Lamport);
 19
 20#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 21pub struct OperationSummary {
 22    key: OperationKey,
 23    len: usize,
 24}
 25
 26impl<T: Operation> OperationQueue<T> {
 27    pub fn new() -> Self {
 28        OperationQueue(SumTree::new())
 29    }
 30
 31    #[cfg(test)]
 32    pub fn is_empty(&self) -> bool {
 33        self.0.is_empty()
 34    }
 35
 36    pub fn len(&self) -> usize {
 37        self.0.summary().len
 38    }
 39
 40    pub fn insert(&mut self, mut ops: Vec<T>) {
 41        ops.sort_by_key(|op| op.timestamp());
 42        ops.dedup_by_key(|op| op.timestamp());
 43        let mut edits = ops
 44            .into_iter()
 45            .map(|op| Edit::Insert(op))
 46            .collect::<Vec<_>>();
 47        self.0.edit(&mut edits);
 48    }
 49
 50    pub fn drain(&mut self) -> Self {
 51        let clone = self.clone();
 52        self.0 = SumTree::new();
 53        clone
 54    }
 55
 56    pub fn cursor(&self) -> Cursor<T, (), ()> {
 57        self.0.cursor()
 58    }
 59}
 60
 61impl<T: Operation> Item for T {
 62    type Summary = OperationSummary;
 63
 64    fn summary(&self) -> Self::Summary {
 65        OperationSummary {
 66            key: OperationKey(self.timestamp()),
 67            len: 1,
 68        }
 69    }
 70}
 71
 72impl<T: Operation> KeyedItem for T {
 73    type Key = OperationKey;
 74
 75    fn key(&self) -> Self::Key {
 76        OperationKey(self.timestamp())
 77    }
 78}
 79
 80impl<'a> AddAssign<&'a Self> for OperationSummary {
 81    fn add_assign(&mut self, other: &Self) {
 82        assert!(self.key < other.key);
 83        self.key = other.key;
 84        self.len += other.len;
 85    }
 86}
 87
 88impl<'a> Add<&'a Self> for OperationSummary {
 89    type Output = Self;
 90
 91    fn add(self, other: &Self) -> Self {
 92        assert!(self.key < other.key);
 93        OperationSummary {
 94            key: other.key,
 95            len: self.len + other.len,
 96        }
 97    }
 98}
 99
100impl<'a> Dimension<'a, OperationSummary> for OperationKey {
101    fn add_summary(&mut self, summary: &OperationSummary) {
102        assert!(*self <= summary.key);
103        *self = summary.key;
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal operation used as a fixture: it is nothing but its timestamp.
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct TestOperation(time::Lamport);

    impl Operation for TestOperation {
        fn timestamp(&self) -> time::Lamport {
            self.0
        }
    }

    /// `len` tracks insertions and resets to zero after `drain`.
    #[test]
    fn test_len() {
        let mut clock = time::Lamport::new(0);
        let mut queue = OperationQueue::new();

        // A new queue holds nothing.
        assert_eq!(queue.len(), 0);

        // Two distinct timestamps in one batch.
        let first_batch = vec![
            TestOperation(clock.tick()),
            TestOperation(clock.tick()),
        ];
        queue.insert(first_batch);
        assert_eq!(queue.len(), 2);

        // A later single-operation batch adds one more.
        let second_batch = vec![TestOperation(clock.tick())];
        queue.insert(second_batch);
        assert_eq!(queue.len(), 3);

        // Draining leaves the queue empty.
        drop(queue.drain());
        assert_eq!(queue.len(), 0);

        // The queue stays usable after being drained.
        let third_batch = vec![TestOperation(clock.tick())];
        queue.insert(third_batch);
        assert_eq!(queue.len(), 1);
    }
}