1use crate::{
2 sum_tree::{Cursor, Dimension, Edit, Item, KeyedItem, SumTree, Summary},
3 time,
4};
5use std::{fmt::Debug, ops::Add};
6
7pub trait Operation: Clone + Debug + Eq {
8 fn timestamp(&self) -> time::Lamport;
9}
10
11#[derive(Clone, Debug)]
12pub struct OperationQueue<T: Operation>(SumTree<T>);
13
14#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
15pub struct OperationKey(time::Lamport);
16
17#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
18pub struct OperationSummary {
19 key: OperationKey,
20 len: usize,
21}
22
23impl<T: Operation> OperationQueue<T> {
24 pub fn new() -> Self {
25 OperationQueue(SumTree::new())
26 }
27
28 pub fn len(&self) -> usize {
29 self.0.summary().len
30 }
31
32 pub fn insert(&mut self, mut ops: Vec<T>) {
33 ops.sort_by_key(|op| op.timestamp());
34 ops.dedup_by_key(|op| op.timestamp());
35 self.0
36 .edit(ops.into_iter().map(Edit::Insert).collect(), &());
37 }
38
39 pub fn drain(&mut self) -> Self {
40 let clone = self.clone();
41 self.0 = SumTree::new();
42 clone
43 }
44
45 pub fn cursor(&self) -> Cursor<T, (), ()> {
46 self.0.cursor()
47 }
48}
49
50impl<T: Operation> Item for T {
51 type Summary = OperationSummary;
52
53 fn summary(&self) -> Self::Summary {
54 OperationSummary {
55 key: OperationKey(self.timestamp()),
56 len: 1,
57 }
58 }
59}
60
61impl<T: Operation> KeyedItem for T {
62 type Key = OperationKey;
63
64 fn key(&self) -> Self::Key {
65 OperationKey(self.timestamp())
66 }
67}
68
69impl Summary for OperationSummary {
70 type Context = ();
71
72 fn add_summary(&mut self, other: &Self, _: &()) {
73 assert!(self.key < other.key);
74 self.key = other.key;
75 self.len += other.len;
76 }
77}
78
79impl<'a> Add<&'a Self> for OperationSummary {
80 type Output = Self;
81
82 fn add(self, other: &Self) -> Self {
83 assert!(self.key < other.key);
84 OperationSummary {
85 key: other.key,
86 len: self.len + other.len,
87 }
88 }
89}
90
91impl<'a> Dimension<'a, OperationSummary> for OperationKey {
92 fn add_summary(&mut self, summary: &OperationSummary, _: &()) {
93 assert!(*self <= summary.key);
94 *self = summary.key;
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[test]
103 fn test_len() {
104 let mut clock = time::Lamport::new(0);
105
106 let mut queue = OperationQueue::new();
107 assert_eq!(queue.len(), 0);
108
109 queue.insert(vec![
110 TestOperation(clock.tick()),
111 TestOperation(clock.tick()),
112 ]);
113 assert_eq!(queue.len(), 2);
114
115 queue.insert(vec![TestOperation(clock.tick())]);
116 assert_eq!(queue.len(), 3);
117
118 drop(queue.drain());
119 assert_eq!(queue.len(), 0);
120
121 queue.insert(vec![TestOperation(clock.tick())]);
122 assert_eq!(queue.len(), 1);
123 }
124
125 #[derive(Clone, Debug, Eq, PartialEq)]
126 struct TestOperation(time::Lamport);
127
128 impl Operation for TestOperation {
129 fn timestamp(&self) -> time::Lamport {
130 self.0
131 }
132 }
133}