1use crate::{
2 sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree},
3 time,
4};
5use std::{
6 fmt::Debug,
7 ops::{Add, AddAssign},
8};
9
10pub trait Operation: Clone + Debug + Eq {
11 fn timestamp(&self) -> time::Lamport;
12}
13
14#[derive(Clone, Debug)]
15pub struct OperationQueue<T: Operation>(SumTree<T>);
16
17#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
18pub struct OperationKey(time::Lamport);
19
20#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
21pub struct OperationSummary {
22 key: OperationKey,
23 len: usize,
24}
25
26impl<T: Operation> OperationQueue<T> {
27 pub fn new() -> Self {
28 OperationQueue(SumTree::new())
29 }
30
31 pub fn len(&self) -> usize {
32 self.0.summary().len
33 }
34
35 pub fn insert(&mut self, mut ops: Vec<T>) {
36 ops.sort_by_key(|op| op.timestamp());
37 ops.dedup_by_key(|op| op.timestamp());
38 let mut edits = ops
39 .into_iter()
40 .map(|op| Edit::Insert(op))
41 .collect::<Vec<_>>();
42 self.0.edit(&mut edits);
43 }
44
45 pub fn drain(&mut self) -> Self {
46 let clone = self.clone();
47 self.0 = SumTree::new();
48 clone
49 }
50
51 pub fn cursor(&self) -> Cursor<T, (), ()> {
52 self.0.cursor()
53 }
54}
55
56impl<T: Operation> Item for T {
57 type Summary = OperationSummary;
58
59 fn summary(&self) -> Self::Summary {
60 OperationSummary {
61 key: OperationKey(self.timestamp()),
62 len: 1,
63 }
64 }
65}
66
67impl<T: Operation> KeyedItem for T {
68 type Key = OperationKey;
69
70 fn key(&self) -> Self::Key {
71 OperationKey(self.timestamp())
72 }
73}
74
75impl<'a> AddAssign<&'a Self> for OperationSummary {
76 fn add_assign(&mut self, other: &Self) {
77 assert!(self.key < other.key);
78 self.key = other.key;
79 self.len += other.len;
80 }
81}
82
83impl<'a> Add<&'a Self> for OperationSummary {
84 type Output = Self;
85
86 fn add(self, other: &Self) -> Self {
87 assert!(self.key < other.key);
88 OperationSummary {
89 key: other.key,
90 len: self.len + other.len,
91 }
92 }
93}
94
95impl<'a> Dimension<'a, OperationSummary> for OperationKey {
96 fn add_summary(&mut self, summary: &OperationSummary) {
97 assert!(*self <= summary.key);
98 *self = summary.key;
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal operation used as a fixture: it carries nothing but its
    /// timestamp.
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct TestOperation(time::Lamport);

    impl Operation for TestOperation {
        fn timestamp(&self) -> time::Lamport {
            self.0
        }
    }

    /// `len` tracks inserts and resets to zero after `drain`.
    #[test]
    fn test_len() {
        let mut clock = time::Lamport::new(0);
        let mut queue = OperationQueue::new();

        // A fresh queue is empty.
        assert_eq!(queue.len(), 0);

        // Inserting a batch of two distinct operations.
        let first = TestOperation(clock.tick());
        let second = TestOperation(clock.tick());
        queue.insert(vec![first, second]);
        assert_eq!(queue.len(), 2);

        // Inserting a single further operation.
        let third = TestOperation(clock.tick());
        queue.insert(vec![third]);
        assert_eq!(queue.len(), 3);

        // Draining empties the queue; the drained contents are discarded.
        let drained = queue.drain();
        drop(drained);
        assert_eq!(queue.len(), 0);

        // The queue remains usable after being drained.
        let fourth = TestOperation(clock.tick());
        queue.insert(vec![fourth]);
        assert_eq!(queue.len(), 1);
    }
}