1use std::{fmt::Debug, ops::Add};
2use sum_tree::{Dimension, Edit, Item, KeyedItem, SumTree, Summary};
3
4pub trait Operation: Clone + Debug {
5 fn lamport_timestamp(&self) -> clock::Lamport;
6}
7
8#[derive(Clone, Debug)]
9struct OperationItem<T>(T);
10
11#[derive(Clone, Debug)]
12pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
13
14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
15pub struct OperationKey(clock::Lamport);
16
17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
18pub struct OperationSummary {
19 pub key: OperationKey,
20 pub len: usize,
21}
22
23impl OperationKey {
24 pub fn new(timestamp: clock::Lamport) -> Self {
25 Self(timestamp)
26 }
27}
28
29impl<T: Operation> Default for OperationQueue<T> {
30 fn default() -> Self {
31 OperationQueue::new()
32 }
33}
34
35impl<T: Operation> OperationQueue<T> {
36 pub fn new() -> Self {
37 OperationQueue(SumTree::default())
38 }
39
40 pub fn len(&self) -> usize {
41 self.0.summary().len
42 }
43
44 pub fn is_empty(&self) -> bool {
45 self.len() == 0
46 }
47
48 pub fn insert(&mut self, mut ops: Vec<T>) {
49 ops.sort_by_key(|op| op.lamport_timestamp());
50 ops.dedup_by_key(|op| op.lamport_timestamp());
51 self.0.edit(
52 ops.into_iter()
53 .map(|op| Edit::Insert(OperationItem(op)))
54 .collect(),
55 &(),
56 );
57 }
58
59 pub fn drain(&mut self) -> Self {
60 let clone = self.clone();
61 self.0 = SumTree::default();
62 clone
63 }
64
65 pub fn iter(&self) -> impl Iterator<Item = &T> {
66 self.0.iter().map(|i| &i.0)
67 }
68}
69
70impl Summary for OperationSummary {
71 type Context = ();
72
73 fn zero(_cx: &()) -> Self {
74 Default::default()
75 }
76
77 fn add_summary(&mut self, other: &Self, _: &()) {
78 assert!(self.key < other.key);
79 self.key = other.key;
80 self.len += other.len;
81 }
82}
83
84impl Add<&Self> for OperationSummary {
85 type Output = Self;
86
87 fn add(self, other: &Self) -> Self {
88 assert!(self.key < other.key);
89 OperationSummary {
90 key: other.key,
91 len: self.len + other.len,
92 }
93 }
94}
95
96impl Dimension<'_, OperationSummary> for OperationKey {
97 fn zero(_cx: &()) -> Self {
98 Default::default()
99 }
100
101 fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
102 assert!(*self <= summary.key);
103 *self = summary.key;
104 }
105}
106
107impl<T: Operation> Item for OperationItem<T> {
108 type Summary = OperationSummary;
109
110 fn summary(&self, _cx: &()) -> Self::Summary {
111 OperationSummary {
112 key: OperationKey::new(self.0.lamport_timestamp()),
113 len: 1,
114 }
115 }
116}
117
118impl<T: Operation> KeyedItem for OperationItem<T> {
119 type Key = OperationKey;
120
121 fn key(&self) -> Self::Key {
122 OperationKey::new(self.0.lamport_timestamp())
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129
130 #[test]
131 fn test_len() {
132 let mut clock = clock::Lamport::new(0);
133
134 let mut queue = OperationQueue::new();
135 assert_eq!(queue.len(), 0);
136
137 queue.insert(vec![
138 TestOperation(clock.tick()),
139 TestOperation(clock.tick()),
140 ]);
141 assert_eq!(queue.len(), 2);
142
143 queue.insert(vec![TestOperation(clock.tick())]);
144 assert_eq!(queue.len(), 3);
145
146 drop(queue.drain());
147 assert_eq!(queue.len(), 0);
148
149 queue.insert(vec![TestOperation(clock.tick())]);
150 assert_eq!(queue.len(), 1);
151 }
152
153 #[derive(Clone, Debug, Eq, PartialEq)]
154 struct TestOperation(clock::Lamport);
155
156 impl Operation for TestOperation {
157 fn lamport_timestamp(&self) -> clock::Lamport {
158 self.0
159 }
160 }
161}