1use std::{ops::Deref, sync::Arc};
2
3use crate::{Channel, ChannelId};
4use collections::BTreeMap;
5use rpc::proto;
6
7use super::ChannelPath;
8
9#[derive(Default, Debug)]
10pub struct ChannelIndex {
11 paths: Vec<ChannelPath>,
12 channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
13}
14
15impl ChannelIndex {
16 pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
17 &self.channels_by_id
18 }
19
20 pub fn clear(&mut self) {
21 self.paths.clear();
22 self.channels_by_id.clear();
23 }
24
25 /// Delete the given channels from this index.
26 pub fn delete_channels(&mut self, channels: &[ChannelId]) {
27 self.channels_by_id
28 .retain(|channel_id, _| !channels.contains(channel_id));
29 self.paths.retain(|path| {
30 path.iter()
31 .all(|channel_id| self.channels_by_id.contains_key(channel_id))
32 });
33 }
34
35 pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
36 ChannelPathsInsertGuard {
37 paths: &mut self.paths,
38 channels_by_id: &mut self.channels_by_id,
39 }
40 }
41
42 pub fn acknowledge_note_version(
43 &mut self,
44 channel_id: ChannelId,
45 epoch: u64,
46 version: &clock::Global,
47 ) {
48 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
49 let channel = Arc::make_mut(channel);
50 if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
51 if epoch > *unseen_epoch
52 || epoch == *unseen_epoch && version.observed_all(unseen_version)
53 {
54 channel.unseen_note_version = None;
55 }
56 }
57 }
58 }
59
60 pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
61 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
62 let channel = Arc::make_mut(channel);
63 if let Some(unseen_message_id) = channel.unseen_message_id {
64 if message_id >= unseen_message_id {
65 channel.unseen_message_id = None;
66 }
67 }
68 }
69 }
70
71 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
72 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
73 }
74
75 pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
76 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
77 }
78}
79
80impl Deref for ChannelIndex {
81 type Target = [ChannelPath];
82
83 fn deref(&self) -> &Self::Target {
84 &self.paths
85 }
86}
87
88/// A guard for ensuring that the paths index maintains its sort and uniqueness
89/// invariants after a series of insertions
90#[derive(Debug)]
91pub struct ChannelPathsInsertGuard<'a> {
92 paths: &'a mut Vec<ChannelPath>,
93 channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
94}
95
96impl<'a> ChannelPathsInsertGuard<'a> {
97 /// Remove the given edge from this index. This will not remove the channel.
98 /// If this operation would result in a dangling edge, re-insert it.
99 pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
100 self.paths.retain(|path| {
101 !path
102 .windows(2)
103 .any(|window| window == [parent_id, channel_id])
104 });
105
106 // Ensure that there is at least one channel path in the index
107 if !self
108 .paths
109 .iter()
110 .any(|path| path.iter().any(|id| id == &channel_id))
111 {
112 self.insert_root(channel_id);
113 }
114 }
115
116 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
117 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
118 }
119
120 pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
121 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
122 }
123
124 pub fn insert(&mut self, channel_proto: proto::Channel) {
125 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
126 Arc::make_mut(existing_channel).name = channel_proto.name;
127 } else {
128 self.channels_by_id.insert(
129 channel_proto.id,
130 Arc::new(Channel {
131 id: channel_proto.id,
132 name: channel_proto.name,
133 unseen_note_version: None,
134 unseen_message_id: None,
135 }),
136 );
137 self.insert_root(channel_proto.id);
138 }
139 }
140
141 pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
142 let mut parents = Vec::new();
143 let mut descendants = Vec::new();
144 let mut ixs_to_remove = Vec::new();
145
146 for (ix, path) in self.paths.iter().enumerate() {
147 if path
148 .windows(2)
149 .any(|window| window[0] == parent_id && window[1] == channel_id)
150 {
151 // We already have this edge in the index
152 return;
153 }
154 if path.ends_with(&[parent_id]) {
155 parents.push(path);
156 } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
157 if position == 0 {
158 ixs_to_remove.push(ix);
159 }
160 descendants.push(path.split_at(position).1);
161 }
162 }
163
164 let mut new_paths = Vec::new();
165 for parent in parents.iter() {
166 if descendants.is_empty() {
167 let mut new_path = Vec::with_capacity(parent.len() + 1);
168 new_path.extend_from_slice(parent);
169 new_path.push(channel_id);
170 new_paths.push(ChannelPath::new(new_path.into()));
171 } else {
172 for descendant in descendants.iter() {
173 let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
174 new_path.extend_from_slice(parent);
175 new_path.extend_from_slice(descendant);
176 new_paths.push(ChannelPath::new(new_path.into()));
177 }
178 }
179 }
180
181 for ix in ixs_to_remove.into_iter().rev() {
182 self.paths.swap_remove(ix);
183 }
184 self.paths.extend(new_paths)
185 }
186
187 fn insert_root(&mut self, channel_id: ChannelId) {
188 self.paths.push(ChannelPath::new(Arc::from([channel_id])));
189 }
190}
191
192impl<'a> Drop for ChannelPathsInsertGuard<'a> {
193 fn drop(&mut self) {
194 self.paths.sort_by(|a, b| {
195 let a = channel_path_sorting_key(a, &self.channels_by_id);
196 let b = channel_path_sorting_key(b, &self.channels_by_id);
197 a.cmp(b)
198 });
199 self.paths.dedup();
200 }
201}
202
203fn channel_path_sorting_key<'a>(
204 path: &'a [ChannelId],
205 channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
206) -> impl 'a + Iterator<Item = Option<&'a str>> {
207 path.iter()
208 .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
209}
210
211fn insert_note_changed(
212 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
213 channel_id: u64,
214 epoch: u64,
215 version: &clock::Global,
216) {
217 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
218 let unseen_version = Arc::make_mut(channel)
219 .unseen_note_version
220 .get_or_insert((0, clock::Global::new()));
221 if epoch > unseen_version.0 {
222 *unseen_version = (epoch, version.clone());
223 } else {
224 unseen_version.1.join(&version);
225 }
226 }
227}
228
229fn insert_new_message(
230 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
231 channel_id: u64,
232 message_id: u64,
233) {
234 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
235 let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
236 *unseen_message_id = message_id.max(*unseen_message_id);
237 }
238}