channel_index.rs

  1use std::{ops::Deref, sync::Arc};
  2
  3use crate::{Channel, ChannelId};
  4use collections::BTreeMap;
  5use rpc::proto;
  6
  7use super::ChannelPath;
  8
  9#[derive(Default, Debug)]
 10pub struct ChannelIndex {
 11    paths: Vec<ChannelPath>,
 12    channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
 13}
 14
 15impl ChannelIndex {
 16    pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
 17        &self.channels_by_id
 18    }
 19
 20    pub fn clear(&mut self) {
 21        self.paths.clear();
 22        self.channels_by_id.clear();
 23    }
 24
 25    /// Delete the given channels from this index.
 26    pub fn delete_channels(&mut self, channels: &[ChannelId]) {
 27        self.channels_by_id
 28            .retain(|channel_id, _| !channels.contains(channel_id));
 29        self.paths.retain(|path| {
 30            path.iter()
 31                .all(|channel_id| self.channels_by_id.contains_key(channel_id))
 32        });
 33    }
 34
 35    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
 36        ChannelPathsInsertGuard {
 37            paths: &mut self.paths,
 38            channels_by_id: &mut self.channels_by_id,
 39        }
 40    }
 41
 42    pub fn acknowledge_note_version(
 43        &mut self,
 44        channel_id: ChannelId,
 45        epoch: u64,
 46        version: &clock::Global,
 47    ) {
 48        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 49            let channel = Arc::make_mut(channel);
 50            if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
 51                if epoch > *unseen_epoch
 52                    || epoch == *unseen_epoch && version.observed_all(unseen_version)
 53                {
 54                    channel.unseen_note_version = None;
 55                }
 56            }
 57        }
 58    }
 59
 60    pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
 61        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 62            let channel = Arc::make_mut(channel);
 63            if let Some(unseen_message_id) = channel.unseen_message_id {
 64                if message_id >= unseen_message_id {
 65                    channel.unseen_message_id = None;
 66                }
 67            }
 68        }
 69    }
 70
 71    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
 72        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
 73    }
 74
 75    pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
 76        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
 77    }
 78}
 79
 80impl Deref for ChannelIndex {
 81    type Target = [ChannelPath];
 82
 83    fn deref(&self) -> &Self::Target {
 84        &self.paths
 85    }
 86}
 87
 88/// A guard for ensuring that the paths index maintains its sort and uniqueness
 89/// invariants after a series of insertions
 90#[derive(Debug)]
 91pub struct ChannelPathsInsertGuard<'a> {
 92    paths: &'a mut Vec<ChannelPath>,
 93    channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
 94}
 95
 96impl<'a> ChannelPathsInsertGuard<'a> {
 97    /// Remove the given edge from this index. This will not remove the channel.
 98    /// If this operation would result in a dangling edge, re-insert it.
 99    pub fn delete_edge(&mut self, parent_id: ChannelId, channel_id: ChannelId) {
100        self.paths.retain(|path| {
101            !path
102                .windows(2)
103                .any(|window| window == [parent_id, channel_id])
104        });
105
106        // Ensure that there is at least one channel path in the index
107        if !self
108            .paths
109            .iter()
110            .any(|path| path.iter().any(|id| id == &channel_id))
111        {
112            self.insert_root(channel_id);
113        }
114    }
115
116    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
117        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
118    }
119
120    pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
121        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
122    }
123
124    pub fn insert(&mut self, channel_proto: proto::Channel) {
125        if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
126            Arc::make_mut(existing_channel).name = channel_proto.name;
127        } else {
128            self.channels_by_id.insert(
129                channel_proto.id,
130                Arc::new(Channel {
131                    id: channel_proto.id,
132                    name: channel_proto.name,
133                    unseen_note_version: None,
134                    unseen_message_id: None,
135                }),
136            );
137            self.insert_root(channel_proto.id);
138        }
139    }
140
141    pub fn insert_edge(&mut self, channel_id: ChannelId, parent_id: ChannelId) {
142        let mut parents = Vec::new();
143        let mut descendants = Vec::new();
144        let mut ixs_to_remove = Vec::new();
145
146        for (ix, path) in self.paths.iter().enumerate() {
147            if path
148                .windows(2)
149                .any(|window| window[0] == parent_id && window[1] == channel_id)
150            {
151                // We already have this edge in the index
152                return;
153            }
154            if path.ends_with(&[parent_id]) {
155                parents.push(path);
156            } else if let Some(position) = path.iter().position(|id| id == &channel_id) {
157                if position == 0 {
158                    ixs_to_remove.push(ix);
159                }
160                descendants.push(path.split_at(position).1);
161            }
162        }
163
164        let mut new_paths = Vec::new();
165        for parent in parents.iter() {
166            if descendants.is_empty() {
167                let mut new_path = Vec::with_capacity(parent.len() + 1);
168                new_path.extend_from_slice(parent);
169                new_path.push(channel_id);
170                new_paths.push(ChannelPath::new(new_path.into()));
171            } else {
172                for descendant in descendants.iter() {
173                    let mut new_path = Vec::with_capacity(parent.len() + descendant.len());
174                    new_path.extend_from_slice(parent);
175                    new_path.extend_from_slice(descendant);
176                    new_paths.push(ChannelPath::new(new_path.into()));
177                }
178            }
179        }
180
181        for ix in ixs_to_remove.into_iter().rev() {
182            self.paths.swap_remove(ix);
183        }
184        self.paths.extend(new_paths)
185    }
186
187    fn insert_root(&mut self, channel_id: ChannelId) {
188        self.paths.push(ChannelPath::new(Arc::from([channel_id])));
189    }
190}
191
192impl<'a> Drop for ChannelPathsInsertGuard<'a> {
193    fn drop(&mut self) {
194        self.paths.sort_by(|a, b| {
195            let a = channel_path_sorting_key(a, &self.channels_by_id);
196            let b = channel_path_sorting_key(b, &self.channels_by_id);
197            a.cmp(b)
198        });
199        self.paths.dedup();
200    }
201}
202
203fn channel_path_sorting_key<'a>(
204    path: &'a [ChannelId],
205    channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
206) -> impl 'a + Iterator<Item = Option<&'a str>> {
207    path.iter()
208        .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
209}
210
211fn insert_note_changed(
212    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
213    channel_id: u64,
214    epoch: u64,
215    version: &clock::Global,
216) {
217    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
218        let unseen_version = Arc::make_mut(channel)
219            .unseen_note_version
220            .get_or_insert((0, clock::Global::new()));
221        if epoch > unseen_version.0 {
222            *unseen_version = (epoch, version.clone());
223        } else {
224            unseen_version.1.join(&version);
225        }
226    }
227}
228
229fn insert_new_message(
230    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
231    channel_id: u64,
232    message_id: u64,
233) {
234    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
235        let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
236        *unseen_message_id = message_id.max(*unseen_message_id);
237    }
238}