1use std::{ops::Deref, sync::Arc};
2
3use crate::{Channel, ChannelId};
4use collections::BTreeMap;
5use rpc::proto;
6
7use super::ChannelPath;
8
9#[derive(Default, Debug)]
10pub struct ChannelIndex {
11 paths: Vec<ChannelPath>,
12 channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
13}
14
15impl ChannelIndex {
16 pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
17 &self.channels_by_id
18 }
19
20 pub fn clear(&mut self) {
21 self.paths.clear();
22 self.channels_by_id.clear();
23 }
24
25 /// Delete the given channels from this index.
26 pub fn delete_channels(&mut self, channels: &[ChannelId]) {
27 self.channels_by_id
28 .retain(|channel_id, _| !channels.contains(channel_id));
29 self.paths.retain(|path| {
30 path.iter()
31 .all(|channel_id| self.channels_by_id.contains_key(channel_id))
32 });
33 }
34
35 pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
36 ChannelPathsInsertGuard {
37 paths: &mut self.paths,
38 channels_by_id: &mut self.channels_by_id,
39 }
40 }
41
42 pub fn acknowledge_note_version(
43 &mut self,
44 channel_id: ChannelId,
45 epoch: u64,
46 version: &clock::Global,
47 ) {
48 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
49 let channel = Arc::make_mut(channel);
50 if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
51 if epoch > *unseen_epoch
52 || epoch == *unseen_epoch && version.observed_all(unseen_version)
53 {
54 channel.unseen_note_version = None;
55 }
56 }
57 }
58 }
59
60 pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
61 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
62 let channel = Arc::make_mut(channel);
63 if let Some(unseen_message_id) = channel.unseen_message_id {
64 if message_id >= unseen_message_id {
65 channel.unseen_message_id = None;
66 }
67 }
68 }
69 }
70
71 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
72 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
73 }
74
75 pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
76 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
77 }
78}
79
80impl Deref for ChannelIndex {
81 type Target = [ChannelPath];
82
83 fn deref(&self) -> &Self::Target {
84 &self.paths
85 }
86}
87
88/// A guard for ensuring that the paths index maintains its sort and uniqueness
89/// invariants after a series of insertions
90#[derive(Debug)]
91pub struct ChannelPathsInsertGuard<'a> {
92 paths: &'a mut Vec<ChannelPath>,
93 channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
94}
95
96impl<'a> ChannelPathsInsertGuard<'a> {
97 /// Remove the given edge from this index. This will not remove the channel.
98 /// If this operation would result in a dangling edge, re-insert it.
99 pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
100 self.paths.retain(|path| {
101 !path
102 .windows(2)
103 .any(|window| window == [parent_id, channel_id])
104 });
105
106 // Ensure that there is at least one channel path in the index
107 if !self
108 .paths
109 .iter()
110 .any(|path| path.iter().any(|id| id == &channel_id))
111 {
112 self.insert_root(channel_id);
113 }
114 }
115
116 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
117 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
118 }
119
120 pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
121 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
122 }
123
124 pub fn insert(&mut self, channel_proto: proto::Channel) {
125 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
126 let existing_channel = Arc::make_mut(existing_channel);
127 existing_channel.visibility = channel_proto.visibility();
128 existing_channel.name = channel_proto.name;
129 } else {
130 self.channels_by_id.insert(
131 channel_proto.id,
132 Arc::new(Channel {
133 id: channel_proto.id,
134 visibility: channel_proto.visibility(),
135 name: channel_proto.name,
136 unseen_note_version: None,
137 unseen_message_id: None,
138 }),
139 );
140 self.insert_root(channel_proto.id);
141 }
142 }
143
144 pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
145 let mut parents = Vec::new();
146 let mut descendants = Vec::new();
147 let mut ixs_to_remove = Vec::new();
148
149 for (ix, path) in self.paths.iter().enumerate() {
150 if path
151 .windows(2)
152 .any(|window| window[0] == parent_id && window[1] == channel_id)
153 {
154 // We already have this edge in the index
155 return;
156 }
157 if path.ends_with(&[parent_id]) {
158 parents.push(path);
159 } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
160 if position == 0 {
161 ixs_to_remove.push(ix);
162 }
163 descendants.push(path.split_at(position).1);
164 }
165 }
166
167 let mut new_paths = Vec::new();
168 for parent in parents.iter() {
169 if descendants.is_empty() {
170 let mut new_path = Vec::with_capacity(parent.len() + 1);
171 new_path.extend_from_slice(parent);
172 new_path.push(channel_id);
173 new_paths.push(ChannelPath::new(new_path.into()));
174 } else {
175 for descendant in descendants.iter() {
176 let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
177 new_path.extend_from_slice(parent);
178 new_path.extend_from_slice(descendant);
179 new_paths.push(ChannelPath::new(new_path.into()));
180 }
181 }
182 }
183
184 for ix in ixs_to_remove.into_iter().rev() {
185 self.paths.swap_remove(ix);
186 }
187 self.paths.extend(new_paths)
188 }
189
190 fn insert_root(&mut self, channel_id: ChannelId) {
191 self.paths.push(ChannelPath::new(Arc::from([channel_id])));
192 }
193}
194
195impl<'a> Drop for ChannelPathsInsertGuard<'a> {
196 fn drop(&mut self) {
197 self.paths.sort_by(|a, b| {
198 let a = channel_path_sorting_key(a, &self.channels_by_id);
199 let b = channel_path_sorting_key(b, &self.channels_by_id);
200 a.cmp(b)
201 });
202 self.paths.dedup();
203 }
204}
205
206fn channel_path_sorting_key<'a>(
207 path: &'a [ChannelId],
208 channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
209) -> impl 'a + Iterator<Item = Option<&'a str>> {
210 path.iter()
211 .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
212}
213
214fn insert_note_changed(
215 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
216 channel_id: u64,
217 epoch: u64,
218 version: &clock::Global,
219) {
220 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
221 let unseen_version = Arc::make_mut(channel)
222 .unseen_note_version
223 .get_or_insert((0, clock::Global::new()));
224 if epoch > unseen_version.0 {
225 *unseen_version = (epoch, version.clone());
226 } else {
227 unseen_version.1.join(&version);
228 }
229 }
230}
231
232fn insert_new_message(
233 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
234 channel_id: u64,
235 message_id: u64,
236) {
237 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
238 let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
239 *unseen_message_id = message_id.max(*unseen_message_id);
240 }
241}