channel_index.rs

  1use std::{ops::Deref, sync::Arc};
  2
  3use crate::{Channel, ChannelId};
  4use collections::BTreeMap;
  5use rpc::proto;
  6
  7use super::ChannelPath;
  8
  9#[derive(Default, Debug)]
 10pub struct ChannelIndex {
 11    paths: Vec<ChannelPath>,
 12    channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
 13}
 14
 15impl ChannelIndex {
 16    pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
 17        &self.channels_by_id
 18    }
 19
 20    pub fn clear(&mut self) {
 21        self.paths.clear();
 22        self.channels_by_id.clear();
 23    }
 24
 25    /// Delete the given channels from this index.
 26    pub fn delete_channels(&mut self, channels: &[ChannelId]) {
 27        self.channels_by_id
 28            .retain(|channel_id, _| !channels.contains(channel_id));
 29        self.paths.retain(|path| {
 30            path.iter()
 31                .all(|channel_id| self.channels_by_id.contains_key(channel_id))
 32        });
 33    }
 34
 35    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
 36        ChannelPathsInsertGuard {
 37            paths: &mut self.paths,
 38            channels_by_id: &mut self.channels_by_id,
 39        }
 40    }
 41}
 42
 43impl Deref for ChannelIndex {
 44    type Target = [ChannelPath];
 45
 46    fn deref(&self) -> &Self::Target {
 47        &self.paths
 48    }
 49}
 50
 51/// A guard for ensuring that the paths index maintains its sort and uniqueness
 52/// invariants after a series of insertions
 53#[derive(Debug)]
 54pub struct ChannelPathsInsertGuard<'a> {
 55    paths: &'a mut Vec<ChannelPath>,
 56    channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
 57}
 58
 59impl<'a> ChannelPathsInsertGuard<'a> {
 60    /// Remove the given edge from this index. This will not remove the channel.
 61    /// If this operation would result in a dangling edge, re-insert it.
 62    pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
 63        self.paths.retain(|path| {
 64            !path
 65                .windows(2)
 66                .any(|window| window == [parent_id, channel_id])
 67        });
 68
 69        // Ensure that there is at least one channel path in the index
 70        if !self
 71            .paths
 72            .iter()
 73            .any(|path| path.iter().any(|id| id == &channel_id))
 74        {
 75            self.insert_root(channel_id);
 76        }
 77    }
 78
 79    pub fn insert(&mut self, channel_proto: proto::Channel) {
 80        if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
 81            Arc::make_mut(existing_channel).name = channel_proto.name;
 82        } else {
 83            self.channels_by_id.insert(
 84                channel_proto.id,
 85                Arc::new(Channel {
 86                    id: channel_proto.id,
 87                    name: channel_proto.name,
 88                }),
 89            );
 90            self.insert_root(channel_proto.id);
 91        }
 92    }
 93
 94    pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
 95        let mut parents = Vec::new();
 96        let mut descendants = Vec::new();
 97        let mut ixs_to_remove = Vec::new();
 98
 99        for (ix, path) in self.paths.iter().enumerate() {
100            if path
101                .windows(2)
102                .any(|window| window[0] == parent_id && window[1] == channel_id)
103            {
104                // We already have this edge in the index
105                return;
106            }
107            if path.ends_with(&[parent_id]) {
108                parents.push(path);
109            } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
110                if position == 0 {
111                    ixs_to_remove.push(ix);
112                }
113                descendants.push(path.split_at(position).1);
114            }
115        }
116
117        let mut new_paths = Vec::new();
118        for parent in parents.iter() {
119            if descendants.is_empty() {
120                let mut new_path = Vec::with_capacity(parent.len() + 1);
121                new_path.extend_from_slice(parent);
122                new_path.push(channel_id);
123                new_paths.push(ChannelPath::new(new_path.into()));
124            } else {
125                for descendant in descendants.iter() {
126                    let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
127                    new_path.extend_from_slice(parent);
128                    new_path.extend_from_slice(descendant);
129                    new_paths.push(ChannelPath::new(new_path.into()));
130                }
131            }
132        }
133
134        for ix in ixs_to_remove.into_iter().rev() {
135            self.paths.swap_remove(ix);
136        }
137        self.paths.extend(new_paths)
138    }
139
140    fn insert_root(&mut self, channel_id: ChannelId) {
141        self.paths.push(ChannelPath::new(Arc::from([channel_id])));
142    }
143}
144
145impl<'a> Drop for ChannelPathsInsertGuard<'a> {
146    fn drop(&mut self) {
147        self.paths.sort_by(|a, b| {
148            let a = channel_path_sorting_key(a, &self.channels_by_id);
149            let b = channel_path_sorting_key(b, &self.channels_by_id);
150            a.cmp(b)
151        });
152        self.paths.dedup();
153    }
154}
155
156fn channel_path_sorting_key<'a>(
157    path: &'a [ChannelId],
158    channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
159) -> impl 'a + Iterator<Item = Option<&'a str>> {
160    path.iter()
161        .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
162}