1use crate::{
2 sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree},
3 time,
4};
5use std::{
6 fmt::Debug,
7 ops::{Add, AddAssign},
8};
9
10pub trait Operation: Clone + Debug + Eq {
11 fn timestamp(&self) -> time::Lamport;
12}
13
14#[derive(Clone, Debug)]
15pub struct OperationQueue<T: Operation>(SumTree<T>);
16
17#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
18pub struct OperationKey(time::Lamport);
19
20#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
21pub struct OperationSummary {
22 key: OperationKey,
23 len: usize,
24}
25
26impl<T: Operation> OperationQueue<T> {
27 pub fn new() -> Self {
28 OperationQueue(SumTree::new())
29 }
30
31 #[cfg(test)]
32 pub fn is_empty(&self) -> bool {
33 self.0.is_empty()
34 }
35
36 pub fn len(&self) -> usize {
37 self.0.summary().len
38 }
39
40 pub fn insert(&mut self, mut ops: Vec<T>) {
41 ops.sort_by_key(|op| op.timestamp());
42 ops.dedup_by_key(|op| op.timestamp());
43 let mut edits = ops
44 .into_iter()
45 .map(|op| Edit::Insert(op))
46 .collect::<Vec<_>>();
47 self.0.edit(&mut edits);
48 }
49
50 pub fn drain(&mut self) -> Self {
51 let clone = self.clone();
52 self.0 = SumTree::new();
53 clone
54 }
55
56 pub fn cursor(&self) -> Cursor<T, (), ()> {
57 self.0.cursor()
58 }
59}
60
61impl<T: Operation> Item for T {
62 type Summary = OperationSummary;
63
64 fn summary(&self) -> Self::Summary {
65 OperationSummary {
66 key: OperationKey(self.timestamp()),
67 len: 1,
68 }
69 }
70}
71
72impl<T: Operation> KeyedItem for T {
73 type Key = OperationKey;
74
75 fn key(&self) -> Self::Key {
76 OperationKey(self.timestamp())
77 }
78}
79
80impl<'a> AddAssign<&'a Self> for OperationSummary {
81 fn add_assign(&mut self, other: &Self) {
82 assert!(self.key < other.key);
83 self.key = other.key;
84 self.len += other.len;
85 }
86}
87
88impl<'a> Add<&'a Self> for OperationSummary {
89 type Output = Self;
90
91 fn add(self, other: &Self) -> Self {
92 assert!(self.key < other.key);
93 OperationSummary {
94 key: other.key,
95 len: self.len + other.len,
96 }
97 }
98}
99
100impl<'a> Dimension<'a, OperationSummary> for OperationKey {
101 fn add_summary(&mut self, summary: &OperationSummary) {
102 assert!(*self <= summary.key);
103 *self = summary.key;
104 }
105}
106
107#[cfg(test)]
108mod tests {
109 use super::*;
110
111 #[test]
112 fn test_len() {
113 let mut clock = time::Lamport::new(0);
114
115 let mut queue = OperationQueue::new();
116 assert_eq!(queue.len(), 0);
117
118 queue.insert(vec![
119 TestOperation(clock.tick()),
120 TestOperation(clock.tick()),
121 ]);
122 assert_eq!(queue.len(), 2);
123
124 queue.insert(vec![TestOperation(clock.tick())]);
125 assert_eq!(queue.len(), 3);
126
127 drop(queue.drain());
128 assert_eq!(queue.len(), 0);
129
130 queue.insert(vec![TestOperation(clock.tick())]);
131 assert_eq!(queue.len(), 1);
132 }
133
134 #[derive(Clone, Debug, Eq, PartialEq)]
135 struct TestOperation(time::Lamport);
136
137 impl Operation for TestOperation {
138 fn timestamp(&self) -> time::Lamport {
139 self.0
140 }
141 }
142}