channel_index.rs

  1use std::{ops::Deref, sync::Arc};
  2
  3use crate::{Channel, ChannelId};
  4use collections::BTreeMap;
  5use rpc::proto;
  6
  7use super::ChannelPath;
  8
  9#[derive(Default, Debug)]
 10pub struct ChannelIndex {
 11    paths: Vec<ChannelPath>,
 12    channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
 13}
 14
 15impl ChannelIndex {
 16    pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
 17        &self.channels_by_id
 18    }
 19
 20    pub fn clear(&mut self) {
 21        self.paths.clear();
 22        self.channels_by_id.clear();
 23    }
 24
 25    /// Delete the given channels from this index.
 26    pub fn delete_channels(&mut self, channels: &[ChannelId]) {
 27        self.channels_by_id
 28            .retain(|channel_id, _| !channels.contains(channel_id));
 29        self.paths.retain(|path| {
 30            path.iter()
 31                .all(|channel_id| self.channels_by_id.contains_key(channel_id))
 32        });
 33    }
 34
 35    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
 36        ChannelPathsInsertGuard {
 37            paths: &mut self.paths,
 38            channels_by_id: &mut self.channels_by_id,
 39        }
 40    }
 41
 42    pub fn acknowledge_note_version(
 43        &mut self,
 44        channel_id: ChannelId,
 45        epoch: u64,
 46        version: &clock::Global,
 47    ) {
 48        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 49            let channel = Arc::make_mut(channel);
 50            if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
 51                if epoch > *unseen_epoch
 52                    || epoch == *unseen_epoch && version.observed_all(unseen_version)
 53                {
 54                    channel.unseen_note_version = None;
 55                }
 56            }
 57        }
 58    }
 59
 60    pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
 61        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 62            let channel = Arc::make_mut(channel);
 63            if let Some(unseen_message_id) = channel.unseen_message_id {
 64                if message_id >= unseen_message_id {
 65                    channel.unseen_message_id = None;
 66                }
 67            }
 68        }
 69    }
 70
 71    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
 72        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
 73    }
 74
 75    pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
 76        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
 77    }
 78}
 79
 80impl Deref for ChannelIndex {
 81    type Target = [ChannelPath];
 82
 83    fn deref(&self) -> &Self::Target {
 84        &self.paths
 85    }
 86}
 87
 88/// A guard for ensuring that the paths index maintains its sort and uniqueness
 89/// invariants after a series of insertions
 90#[derive(Debug)]
 91pub struct ChannelPathsInsertGuard<'a> {
 92    paths: &'a mut Vec<ChannelPath>,
 93    channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
 94}
 95
 96impl<'a> ChannelPathsInsertGuard<'a> {
 97    /// Remove the given edge from this index. This will not remove the channel.
 98    /// If this operation would result in a dangling edge, re-insert it.
 99    pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
100        self.paths.retain(|path| {
101            !path
102                .windows(2)
103                .any(|window| window == [parent_id, channel_id])
104        });
105
106        // Ensure that there is at least one channel path in the index
107        if !self
108            .paths
109            .iter()
110            .any(|path| path.iter().any(|id| id == &channel_id))
111        {
112            self.insert_root(channel_id);
113        }
114    }
115
116    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
117        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
118    }
119
120    pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
121        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
122    }
123
124    pub fn insert(&mut self, channel_proto: proto::Channel) {
125        if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
126            let existing_channel = Arc::make_mut(existing_channel);
127            existing_channel.visibility = channel_proto.visibility();
128            existing_channel.name = channel_proto.name;
129        } else {
130            self.channels_by_id.insert(
131                channel_proto.id,
132                Arc::new(Channel {
133                    id: channel_proto.id,
134                    visibility: channel_proto.visibility(),
135                    name: channel_proto.name,
136                    unseen_note_version: None,
137                    unseen_message_id: None,
138                }),
139            );
140            self.insert_root(channel_proto.id);
141        }
142    }
143
144    pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
145        let mut parents = Vec::new();
146        let mut descendants = Vec::new();
147        let mut ixs_to_remove = Vec::new();
148
149        for (ix, path) in self.paths.iter().enumerate() {
150            if path
151                .windows(2)
152                .any(|window| window[0] == parent_id && window[1] == channel_id)
153            {
154                // We already have this edge in the index
155                return;
156            }
157            if path.ends_with(&[parent_id]) {
158                parents.push(path);
159            } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
160                if position == 0 {
161                    ixs_to_remove.push(ix);
162                }
163                descendants.push(path.split_at(position).1);
164            }
165        }
166
167        let mut new_paths = Vec::new();
168        for parent in parents.iter() {
169            if descendants.is_empty() {
170                let mut new_path = Vec::with_capacity(parent.len() + 1);
171                new_path.extend_from_slice(parent);
172                new_path.push(channel_id);
173                new_paths.push(ChannelPath::new(new_path.into()));
174            } else {
175                for descendant in descendants.iter() {
176                    let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
177                    new_path.extend_from_slice(parent);
178                    new_path.extend_from_slice(descendant);
179                    new_paths.push(ChannelPath::new(new_path.into()));
180                }
181            }
182        }
183
184        for ix in ixs_to_remove.into_iter().rev() {
185            self.paths.swap_remove(ix);
186        }
187        self.paths.extend(new_paths)
188    }
189
190    fn insert_root(&mut self, channel_id: ChannelId) {
191        self.paths.push(ChannelPath::new(Arc::from([channel_id])));
192    }
193}
194
195impl<'a> Drop for ChannelPathsInsertGuard<'a> {
196    fn drop(&mut self) {
197        self.paths.sort_by(|a, b| {
198            let a = channel_path_sorting_key(a, &self.channels_by_id);
199            let b = channel_path_sorting_key(b, &self.channels_by_id);
200            a.cmp(b)
201        });
202        self.paths.dedup();
203    }
204}
205
206fn channel_path_sorting_key<'a>(
207    path: &'a [ChannelId],
208    channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
209) -> impl 'a + Iterator<Item = Option<&'a str>> {
210    path.iter()
211        .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
212}
213
214fn insert_note_changed(
215    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
216    channel_id: u64,
217    epoch: u64,
218    version: &clock::Global,
219) {
220    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
221        let unseen_version = Arc::make_mut(channel)
222            .unseen_note_version
223            .get_or_insert((0, clock::Global::new()));
224        if epoch > unseen_version.0 {
225            *unseen_version = (epoch, version.clone());
226        } else {
227            unseen_version.1.join(&version);
228        }
229    }
230}
231
232fn insert_new_message(
233    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
234    channel_id: u64,
235    message_id: u64,
236) {
237    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
238        let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
239        *unseen_message_id = message_id.max(*unseen_message_id);
240    }
241}