1use clock::Lamport;
  2use std::{fmt::Debug, ops::Add};
  3use sum_tree::{ContextLessSummary, Dimension, Edit, Item, KeyedItem, SumTree};
  4
  5pub trait Operation: Clone + Debug {
  6    fn lamport_timestamp(&self) -> clock::Lamport;
  7}
  8
  9#[derive(Clone, Debug)]
 10struct OperationItem<T>(T);
 11
 12#[derive(Clone, Debug)]
 13pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
 14
 15#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
 16pub struct OperationKey(clock::Lamport);
 17
 18#[derive(Clone, Copy, Debug, Eq, PartialEq)]
 19pub struct OperationSummary {
 20    pub key: OperationKey,
 21    pub len: usize,
 22}
 23
 24impl OperationKey {
 25    pub fn new(timestamp: clock::Lamport) -> Self {
 26        Self(timestamp)
 27    }
 28}
 29
 30impl<T: Operation> Default for OperationQueue<T> {
 31    fn default() -> Self {
 32        OperationQueue::new()
 33    }
 34}
 35
 36impl<T: Operation> OperationQueue<T> {
 37    pub fn new() -> Self {
 38        OperationQueue(SumTree::default())
 39    }
 40
 41    pub fn len(&self) -> usize {
 42        self.0.summary().len
 43    }
 44
 45    pub fn is_empty(&self) -> bool {
 46        self.len() == 0
 47    }
 48
 49    pub fn insert(&mut self, mut ops: Vec<T>) {
 50        ops.sort_by_key(|op| op.lamport_timestamp());
 51        ops.dedup_by_key(|op| op.lamport_timestamp());
 52        self.0.edit(
 53            ops.into_iter()
 54                .map(|op| Edit::Insert(OperationItem(op)))
 55                .collect(),
 56            (),
 57        );
 58    }
 59
 60    pub fn drain(&mut self) -> Self {
 61        let clone = self.clone();
 62        self.0 = SumTree::default();
 63        clone
 64    }
 65
 66    pub fn iter(&self) -> impl Iterator<Item = &T> {
 67        self.0.iter().map(|i| &i.0)
 68    }
 69}
 70
 71impl ContextLessSummary for OperationSummary {
 72    fn zero() -> Self {
 73        OperationSummary {
 74            key: OperationKey::new(Lamport::MIN),
 75            len: 0,
 76        }
 77    }
 78
 79    fn add_summary(&mut self, other: &Self) {
 80        assert!(self.key < other.key);
 81        self.key = other.key;
 82        self.len += other.len;
 83    }
 84}
 85
 86impl Add<&Self> for OperationSummary {
 87    type Output = Self;
 88
 89    fn add(self, other: &Self) -> Self {
 90        assert!(self.key < other.key);
 91        OperationSummary {
 92            key: other.key,
 93            len: self.len + other.len,
 94        }
 95    }
 96}
 97
 98impl Dimension<'_, OperationSummary> for OperationKey {
 99    fn zero(_cx: ()) -> Self {
100        OperationKey::new(Lamport::MIN)
101    }
102
103    fn add_summary(&mut self, summary: &OperationSummary, _: ()) {
104        assert!(*self <= summary.key);
105        *self = summary.key;
106    }
107}
108
109impl<T: Operation> Item for OperationItem<T> {
110    type Summary = OperationSummary;
111
112    fn summary(&self, _cx: ()) -> Self::Summary {
113        OperationSummary {
114            key: OperationKey::new(self.0.lamport_timestamp()),
115            len: 1,
116        }
117    }
118}
119
120impl<T: Operation> KeyedItem for OperationItem<T> {
121    type Key = OperationKey;
122
123    fn key(&self) -> Self::Key {
124        OperationKey::new(self.0.lamport_timestamp())
125    }
126}
127
#[cfg(test)]
mod tests {
    use clock::ReplicaId;

    use super::*;

    /// Minimal `Operation` for the tests: nothing but a Lamport timestamp.
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct TestOperation(clock::Lamport);

    impl Operation for TestOperation {
        fn lamport_timestamp(&self) -> clock::Lamport {
            let Self(timestamp) = self;
            *timestamp
        }
    }

    /// `len` must follow inserts and fall back to zero after `drain`.
    #[test]
    fn test_len() {
        let mut clock = clock::Lamport::new(ReplicaId::LOCAL);

        let mut queue = OperationQueue::new();
        assert_eq!(queue.len(), 0);

        // Two operations with distinct timestamps inserted as one batch.
        let first = TestOperation(clock.tick());
        let second = TestOperation(clock.tick());
        queue.insert(vec![first, second]);
        assert_eq!(queue.len(), 2);

        let third = TestOperation(clock.tick());
        queue.insert(vec![third]);
        assert_eq!(queue.len(), 3);

        // Draining hands back the contents and leaves the queue empty.
        let drained = queue.drain();
        drop(drained);
        assert_eq!(queue.len(), 0);

        // The emptied queue remains usable.
        let fourth = TestOperation(clock.tick());
        queue.insert(vec![fourth]);
        assert_eq!(queue.len(), 1);
    }
}