1use std::{fmt::Debug, ops::Add};
2use sum_tree::{Dimension, Edit, Item, KeyedItem, SumTree, Summary};
3
4pub trait Operation: Clone + Debug {
5 fn lamport_timestamp(&self) -> clock::Lamport;
6}
7
8#[derive(Clone, Debug)]
9struct OperationItem<T>(T);
10
11#[derive(Clone, Debug)]
12pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
13
14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
15pub struct OperationKey(clock::Lamport);
16
17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
18pub struct OperationSummary {
19 pub key: OperationKey,
20 pub len: usize,
21}
22
23impl OperationKey {
24 pub fn new(timestamp: clock::Lamport) -> Self {
25 Self(timestamp)
26 }
27}
28
29impl<T: Operation> OperationQueue<T> {
30 pub fn new() -> Self {
31 OperationQueue(SumTree::new())
32 }
33
34 pub fn len(&self) -> usize {
35 self.0.summary().len
36 }
37
38 pub fn insert(&mut self, mut ops: Vec<T>) {
39 ops.sort_by_key(|op| op.lamport_timestamp());
40 ops.dedup_by_key(|op| op.lamport_timestamp());
41 self.0.edit(
42 ops.into_iter()
43 .map(|op| Edit::Insert(OperationItem(op)))
44 .collect(),
45 &(),
46 );
47 }
48
49 pub fn drain(&mut self) -> Self {
50 let clone = self.clone();
51 self.0 = SumTree::new();
52 clone
53 }
54
55 pub fn iter(&self) -> impl Iterator<Item = &T> {
56 self.0.iter().map(|i| &i.0)
57 }
58}
59
60impl Summary for OperationSummary {
61 type Context = ();
62
63 fn add_summary(&mut self, other: &Self, _: &()) {
64 assert!(self.key < other.key);
65 self.key = other.key;
66 self.len += other.len;
67 }
68}
69
70impl<'a> Add<&'a Self> for OperationSummary {
71 type Output = Self;
72
73 fn add(self, other: &Self) -> Self {
74 assert!(self.key < other.key);
75 OperationSummary {
76 key: other.key,
77 len: self.len + other.len,
78 }
79 }
80}
81
82impl<'a> Dimension<'a, OperationSummary> for OperationKey {
83 fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
84 assert!(*self <= summary.key);
85 *self = summary.key;
86 }
87}
88
89impl<T: Operation> Item for OperationItem<T> {
90 type Summary = OperationSummary;
91
92 fn summary(&self) -> Self::Summary {
93 OperationSummary {
94 key: OperationKey::new(self.0.lamport_timestamp()),
95 len: 1,
96 }
97 }
98}
99
100impl<T: Operation> KeyedItem for OperationItem<T> {
101 type Key = OperationKey;
102
103 fn key(&self) -> Self::Key {
104 OperationKey::new(self.0.lamport_timestamp())
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::*;
111
112 #[test]
113 fn test_len() {
114 let mut clock = clock::Lamport::new(0);
115
116 let mut queue = OperationQueue::new();
117 assert_eq!(queue.len(), 0);
118
119 queue.insert(vec![
120 TestOperation(clock.tick()),
121 TestOperation(clock.tick()),
122 ]);
123 assert_eq!(queue.len(), 2);
124
125 queue.insert(vec![TestOperation(clock.tick())]);
126 assert_eq!(queue.len(), 3);
127
128 drop(queue.drain());
129 assert_eq!(queue.len(), 0);
130
131 queue.insert(vec![TestOperation(clock.tick())]);
132 assert_eq!(queue.len(), 1);
133 }
134
135 #[derive(Clone, Debug, Eq, PartialEq)]
136 struct TestOperation(clock::Lamport);
137
138 impl Operation for TestOperation {
139 fn lamport_timestamp(&self) -> clock::Lamport {
140 self.0
141 }
142 }
143}