1use clock::Lamport;
2use std::{fmt::Debug, ops::Add};
3use sum_tree::{ContextLessSummary, Dimension, Edit, Item, KeyedItem, SumTree};
4
5pub trait Operation: Clone + Debug {
6 fn lamport_timestamp(&self) -> clock::Lamport;
7}
8
9#[derive(Clone, Debug)]
10struct OperationItem<T>(T);
11
12#[derive(Clone, Debug)]
13pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
14
15#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
16pub struct OperationKey(clock::Lamport);
17
18#[derive(Clone, Copy, Debug, Eq, PartialEq)]
19pub struct OperationSummary {
20 pub key: OperationKey,
21 pub len: usize,
22}
23
24impl OperationKey {
25 pub fn new(timestamp: clock::Lamport) -> Self {
26 Self(timestamp)
27 }
28}
29
30impl<T: Operation> Default for OperationQueue<T> {
31 fn default() -> Self {
32 OperationQueue::new()
33 }
34}
35
36impl<T: Operation> OperationQueue<T> {
37 pub fn new() -> Self {
38 OperationQueue(SumTree::default())
39 }
40
41 pub fn len(&self) -> usize {
42 self.0.summary().len
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.len() == 0
47 }
48
49 pub fn insert(&mut self, mut ops: Vec<T>) {
50 ops.sort_by_key(|op| op.lamport_timestamp());
51 ops.dedup_by_key(|op| op.lamport_timestamp());
52 self.0.edit(
53 ops.into_iter()
54 .map(|op| Edit::Insert(OperationItem(op)))
55 .collect(),
56 (),
57 );
58 }
59
60 pub fn drain(&mut self) -> Self {
61 let clone = self.clone();
62 self.0 = SumTree::default();
63 clone
64 }
65
66 pub fn iter(&self) -> impl Iterator<Item = &T> {
67 self.0.iter().map(|i| &i.0)
68 }
69}
70
71impl ContextLessSummary for OperationSummary {
72 fn zero() -> Self {
73 OperationSummary {
74 key: OperationKey::new(Lamport::MIN),
75 len: 0,
76 }
77 }
78
79 fn add_summary(&mut self, other: &Self) {
80 assert!(self.key < other.key);
81 self.key = other.key;
82 self.len += other.len;
83 }
84}
85
86impl Add<&Self> for OperationSummary {
87 type Output = Self;
88
89 fn add(self, other: &Self) -> Self {
90 assert!(self.key < other.key);
91 OperationSummary {
92 key: other.key,
93 len: self.len + other.len,
94 }
95 }
96}
97
98impl Dimension<'_, OperationSummary> for OperationKey {
99 fn zero(_cx: ()) -> Self {
100 OperationKey::new(Lamport::MIN)
101 }
102
103 fn add_summary(&mut self, summary: &OperationSummary, _: ()) {
104 assert!(*self <= summary.key);
105 *self = summary.key;
106 }
107}
108
109impl<T: Operation> Item for OperationItem<T> {
110 type Summary = OperationSummary;
111
112 fn summary(&self, _cx: ()) -> Self::Summary {
113 OperationSummary {
114 key: OperationKey::new(self.0.lamport_timestamp()),
115 len: 1,
116 }
117 }
118}
119
120impl<T: Operation> KeyedItem for OperationItem<T> {
121 type Key = OperationKey;
122
123 fn key(&self) -> Self::Key {
124 OperationKey::new(self.0.lamport_timestamp())
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use clock::ReplicaId;
131
132 use super::*;
133
134 #[test]
135 fn test_len() {
136 let mut clock = clock::Lamport::new(ReplicaId::LOCAL);
137
138 let mut queue = OperationQueue::new();
139 assert_eq!(queue.len(), 0);
140
141 queue.insert(vec![
142 TestOperation(clock.tick()),
143 TestOperation(clock.tick()),
144 ]);
145 assert_eq!(queue.len(), 2);
146
147 queue.insert(vec![TestOperation(clock.tick())]);
148 assert_eq!(queue.len(), 3);
149
150 drop(queue.drain());
151 assert_eq!(queue.len(), 0);
152
153 queue.insert(vec![TestOperation(clock.tick())]);
154 assert_eq!(queue.len(), 1);
155 }
156
157 #[derive(Clone, Debug, Eq, PartialEq)]
158 struct TestOperation(clock::Lamport);
159
160 impl Operation for TestOperation {
161 fn lamport_timestamp(&self) -> clock::Lamport {
162 self.0
163 }
164 }
165}