operation_queue.rs

  1use crate::{
  2    sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree},
  3    time,
  4};
  5use std::{
  6    fmt::Debug,
  7    ops::{Add, AddAssign},
  8};
  9
 10pub trait Operation: Clone + Debug + Eq {
 11    fn timestamp(&self) -> time::Lamport;
 12}
 13
 14#[derive(Clone, Debug)]
 15pub struct OperationQueue<T: Operation>(SumTree<T>);
 16
 17#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
 18pub struct OperationKey(time::Lamport);
 19
 20#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 21pub struct OperationSummary {
 22    key: OperationKey,
 23    len: usize,
 24}
 25
 26impl<T: Operation> OperationQueue<T> {
 27    pub fn new() -> Self {
 28        OperationQueue(SumTree::new())
 29    }
 30
 31    pub fn len(&self) -> usize {
 32        self.0.summary().len
 33    }
 34
 35    pub fn insert(&mut self, mut ops: Vec<T>) {
 36        ops.sort_by_key(|op| op.timestamp());
 37        ops.dedup_by_key(|op| op.timestamp());
 38        let mut edits = ops
 39            .into_iter()
 40            .map(|op| Edit::Insert(op))
 41            .collect::<Vec<_>>();
 42        self.0.edit(&mut edits);
 43    }
 44
 45    pub fn drain(&mut self) -> Self {
 46        let clone = self.clone();
 47        self.0 = SumTree::new();
 48        clone
 49    }
 50
 51    pub fn cursor(&self) -> Cursor<T, (), ()> {
 52        self.0.cursor()
 53    }
 54}
 55
 56impl<T: Operation> Item for T {
 57    type Summary = OperationSummary;
 58
 59    fn summary(&self) -> Self::Summary {
 60        OperationSummary {
 61            key: OperationKey(self.timestamp()),
 62            len: 1,
 63        }
 64    }
 65}
 66
 67impl<T: Operation> KeyedItem for T {
 68    type Key = OperationKey;
 69
 70    fn key(&self) -> Self::Key {
 71        OperationKey(self.timestamp())
 72    }
 73}
 74
 75impl<'a> AddAssign<&'a Self> for OperationSummary {
 76    fn add_assign(&mut self, other: &Self) {
 77        assert!(self.key < other.key);
 78        self.key = other.key;
 79        self.len += other.len;
 80    }
 81}
 82
 83impl<'a> Add<&'a Self> for OperationSummary {
 84    type Output = Self;
 85
 86    fn add(self, other: &Self) -> Self {
 87        assert!(self.key < other.key);
 88        OperationSummary {
 89            key: other.key,
 90            len: self.len + other.len,
 91        }
 92    }
 93}
 94
 95impl<'a> Dimension<'a, OperationSummary> for OperationKey {
 96    fn add_summary(&mut self, summary: &OperationSummary) {
 97        assert!(*self <= summary.key);
 98        *self = summary.key;
 99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105
106    #[test]
107    fn test_len() {
108        let mut clock = time::Lamport::new(0);
109
110        let mut queue = OperationQueue::new();
111        assert_eq!(queue.len(), 0);
112
113        queue.insert(vec![
114            TestOperation(clock.tick()),
115            TestOperation(clock.tick()),
116        ]);
117        assert_eq!(queue.len(), 2);
118
119        queue.insert(vec![TestOperation(clock.tick())]);
120        assert_eq!(queue.len(), 3);
121
122        drop(queue.drain());
123        assert_eq!(queue.len(), 0);
124
125        queue.insert(vec![TestOperation(clock.tick())]);
126        assert_eq!(queue.len(), 1);
127    }
128
129    #[derive(Clone, Debug, Eq, PartialEq)]
130    struct TestOperation(time::Lamport);
131
132    impl Operation for TestOperation {
133        fn timestamp(&self) -> time::Lamport {
134            self.0
135        }
136    }
137}