1use super::Operation;
2use std::{fmt::Debug, ops::Add};
3use sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree, Summary};
4
5#[derive(Clone, Debug)]
6pub struct OperationQueue(SumTree<Operation>);
7
8#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
9pub struct OperationKey(clock::Lamport);
10
11#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
12pub struct OperationSummary {
13 pub key: OperationKey,
14 pub len: usize,
15}
16
17impl OperationKey {
18 pub fn new(timestamp: clock::Lamport) -> Self {
19 Self(timestamp)
20 }
21}
22
23impl OperationQueue {
24 pub fn new() -> Self {
25 OperationQueue(SumTree::new())
26 }
27
28 pub fn len(&self) -> usize {
29 self.0.summary().len
30 }
31
32 pub fn insert(&mut self, mut ops: Vec<Operation>) {
33 ops.sort_by_key(|op| op.lamport_timestamp());
34 ops.dedup_by_key(|op| op.lamport_timestamp());
35 self.0
36 .edit(ops.into_iter().map(Edit::Insert).collect(), &());
37 }
38
39 pub fn drain(&mut self) -> Self {
40 let clone = self.clone();
41 self.0 = SumTree::new();
42 clone
43 }
44
45 pub fn cursor(&self) -> Cursor<Operation, ()> {
46 self.0.cursor()
47 }
48}
49
50impl Summary for OperationSummary {
51 type Context = ();
52
53 fn add_summary(&mut self, other: &Self, _: &()) {
54 assert!(self.key < other.key);
55 self.key = other.key;
56 self.len += other.len;
57 }
58}
59
60impl<'a> Add<&'a Self> for OperationSummary {
61 type Output = Self;
62
63 fn add(self, other: &Self) -> Self {
64 assert!(self.key < other.key);
65 OperationSummary {
66 key: other.key,
67 len: self.len + other.len,
68 }
69 }
70}
71
72impl<'a> Dimension<'a, OperationSummary> for OperationKey {
73 fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
74 assert!(*self <= summary.key);
75 *self = summary.key;
76 }
77}
78
79impl Item for Operation {
80 type Summary = OperationSummary;
81
82 fn summary(&self) -> Self::Summary {
83 OperationSummary {
84 key: OperationKey::new(self.lamport_timestamp()),
85 len: 1,
86 }
87 }
88}
89
90impl KeyedItem for Operation {
91 type Key = OperationKey;
92
93 fn key(&self) -> Self::Key {
94 OperationKey::new(self.lamport_timestamp())
95 }
96}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `len` tracks insertions and that `drain` hands back the
    /// queued operations while leaving the queue empty and reusable.
    #[test]
    fn test_len() {
        let mut clock = clock::Lamport::new(0);

        let mut queue = OperationQueue::new();
        assert_eq!(queue.len(), 0);

        queue.insert(vec![
            Operation::Test(clock.tick()),
            Operation::Test(clock.tick()),
        ]);
        assert_eq!(queue.len(), 2);

        queue.insert(vec![Operation::Test(clock.tick())]);
        assert_eq!(queue.len(), 3);

        // The drained queue carries everything that was queued so far.
        let drained = queue.drain();
        assert_eq!(drained.len(), 3);
        assert_eq!(queue.len(), 0);

        queue.insert(vec![Operation::Test(clock.tick())]);
        assert_eq!(queue.len(), 1);
    }
}