channel_index.rs

  1use crate::{Channel, ChannelId};
  2use collections::BTreeMap;
  3use rpc::proto;
  4use std::sync::Arc;
  5
  6#[derive(Default, Debug)]
  7pub struct ChannelIndex {
  8    channels_ordered: Vec<ChannelId>,
  9    channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
 10}
 11
 12impl ChannelIndex {
 13    pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
 14        &self.channels_by_id
 15    }
 16
 17    pub fn ordered_channels(&self) -> &[ChannelId] {
 18        &self.channels_ordered
 19    }
 20
 21    pub fn clear(&mut self) {
 22        self.channels_ordered.clear();
 23        self.channels_by_id.clear();
 24    }
 25
 26    /// Delete the given channels from this index.
 27    pub fn delete_channels(&mut self, channels: &[ChannelId]) {
 28        self.channels_by_id
 29            .retain(|channel_id, _| !channels.contains(channel_id));
 30        self.channels_ordered
 31            .retain(|channel_id| !channels.contains(channel_id));
 32    }
 33
 34    pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
 35        ChannelPathsInsertGuard {
 36            channels_ordered: &mut self.channels_ordered,
 37            channels_by_id: &mut self.channels_by_id,
 38        }
 39    }
 40
 41    pub fn acknowledge_note_version(
 42        &mut self,
 43        channel_id: ChannelId,
 44        epoch: u64,
 45        version: &clock::Global,
 46    ) {
 47        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 48            let channel = Arc::make_mut(channel);
 49            if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
 50                if epoch > *unseen_epoch
 51                    || epoch == *unseen_epoch && version.observed_all(unseen_version)
 52                {
 53                    channel.unseen_note_version = None;
 54                }
 55            }
 56        }
 57    }
 58
 59    pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
 60        if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
 61            let channel = Arc::make_mut(channel);
 62            if let Some(unseen_message_id) = channel.unseen_message_id {
 63                if message_id >= unseen_message_id {
 64                    channel.unseen_message_id = None;
 65                }
 66            }
 67        }
 68    }
 69
 70    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
 71        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
 72    }
 73
 74    pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
 75        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
 76    }
 77}
 78
 79/// A guard for ensuring that the paths index maintains its sort and uniqueness
 80/// invariants after a series of insertions
 81#[derive(Debug)]
 82pub struct ChannelPathsInsertGuard<'a> {
 83    channels_ordered: &'a mut Vec<ChannelId>,
 84    channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
 85}
 86
 87impl<'a> ChannelPathsInsertGuard<'a> {
 88    pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
 89        insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
 90    }
 91
 92    pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
 93        insert_new_message(&mut self.channels_by_id, channel_id, message_id)
 94    }
 95
 96    pub fn insert(&mut self, channel_proto: proto::Channel) -> bool {
 97        let mut ret = false;
 98        if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
 99            let existing_channel = Arc::make_mut(existing_channel);
100
101            ret = existing_channel.visibility != channel_proto.visibility()
102                || existing_channel.role != channel_proto.role()
103                || existing_channel.name != channel_proto.name;
104
105            existing_channel.visibility = channel_proto.visibility();
106            existing_channel.role = channel_proto.role();
107            existing_channel.name = channel_proto.name;
108        } else {
109            self.channels_by_id.insert(
110                channel_proto.id,
111                Arc::new(Channel {
112                    id: channel_proto.id,
113                    visibility: channel_proto.visibility(),
114                    role: channel_proto.role(),
115                    name: channel_proto.name,
116                    unseen_note_version: None,
117                    unseen_message_id: None,
118                    parent_path: channel_proto.parent_path,
119                }),
120            );
121            self.insert_root(channel_proto.id);
122        }
123        ret
124    }
125
126    fn insert_root(&mut self, channel_id: ChannelId) {
127        self.channels_ordered.push(channel_id);
128    }
129}
130
131impl<'a> Drop for ChannelPathsInsertGuard<'a> {
132    fn drop(&mut self) {
133        self.channels_ordered.sort_by(|a, b| {
134            let a = channel_path_sorting_key(*a, &self.channels_by_id);
135            let b = channel_path_sorting_key(*b, &self.channels_by_id);
136            a.cmp(b)
137        });
138        self.channels_ordered.dedup();
139    }
140}
141
142fn channel_path_sorting_key<'a>(
143    id: ChannelId,
144    channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
145) -> impl Iterator<Item = &str> {
146    let (parent_path, name) = channels_by_id
147        .get(&id)
148        .map_or((&[] as &[_], None), |channel| {
149            (channel.parent_path.as_slice(), Some(channel.name.as_str()))
150        });
151    parent_path
152        .iter()
153        .filter_map(|id| Some(channels_by_id.get(id)?.name.as_str()))
154        .chain(name)
155}
156
157fn insert_note_changed(
158    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
159    channel_id: u64,
160    epoch: u64,
161    version: &clock::Global,
162) {
163    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
164        let unseen_version = Arc::make_mut(channel)
165            .unseen_note_version
166            .get_or_insert((0, clock::Global::new()));
167        if epoch > unseen_version.0 {
168            *unseen_version = (epoch, version.clone());
169        } else {
170            unseen_version.1.join(&version);
171        }
172    }
173}
174
175fn insert_new_message(
176    channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
177    channel_id: u64,
178    message_id: u64,
179) {
180    if let Some(channel) = channels_by_id.get_mut(&channel_id) {
181        let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
182        *unseen_message_id = message_id.max(*unseen_message_id);
183    }
184}