1use crate::{Channel, ChannelId};
2use collections::BTreeMap;
3use rpc::proto;
4use std::sync::Arc;
5
6#[derive(Default, Debug)]
7pub struct ChannelIndex {
8 channels_ordered: Vec<ChannelId>,
9 channels_by_id: BTreeMap<ChannelId, Arc<Channel>>,
10}
11
12impl ChannelIndex {
13 pub fn by_id(&self) -> &BTreeMap<ChannelId, Arc<Channel>> {
14 &self.channels_by_id
15 }
16
17 pub fn ordered_channels(&self) -> &[ChannelId] {
18 &self.channels_ordered
19 }
20
21 pub fn clear(&mut self) {
22 self.channels_ordered.clear();
23 self.channels_by_id.clear();
24 }
25
26 /// Delete the given channels from this index.
27 pub fn delete_channels(&mut self, channels: &[ChannelId]) {
28 self.channels_by_id
29 .retain(|channel_id, _| !channels.contains(channel_id));
30 self.channels_ordered
31 .retain(|channel_id| !channels.contains(channel_id));
32 }
33
34 pub fn bulk_insert(&mut self) -> ChannelPathsInsertGuard {
35 ChannelPathsInsertGuard {
36 channels_ordered: &mut self.channels_ordered,
37 channels_by_id: &mut self.channels_by_id,
38 }
39 }
40
41 pub fn acknowledge_note_version(
42 &mut self,
43 channel_id: ChannelId,
44 epoch: u64,
45 version: &clock::Global,
46 ) {
47 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
48 let channel = Arc::make_mut(channel);
49 if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version {
50 if epoch > *unseen_epoch
51 || epoch == *unseen_epoch && version.observed_all(unseen_version)
52 {
53 channel.unseen_note_version = None;
54 }
55 }
56 }
57 }
58
59 pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) {
60 if let Some(channel) = self.channels_by_id.get_mut(&channel_id) {
61 let channel = Arc::make_mut(channel);
62 if let Some(unseen_message_id) = channel.unseen_message_id {
63 if message_id >= unseen_message_id {
64 channel.unseen_message_id = None;
65 }
66 }
67 }
68 }
69
70 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
71 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version);
72 }
73
74 pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) {
75 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
76 }
77}
78
79/// A guard for ensuring that the paths index maintains its sort and uniqueness
80/// invariants after a series of insertions
81#[derive(Debug)]
82pub struct ChannelPathsInsertGuard<'a> {
83 channels_ordered: &'a mut Vec<ChannelId>,
84 channels_by_id: &'a mut BTreeMap<ChannelId, Arc<Channel>>,
85}
86
87impl<'a> ChannelPathsInsertGuard<'a> {
88 pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) {
89 insert_note_changed(&mut self.channels_by_id, channel_id, epoch, &version);
90 }
91
92 pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) {
93 insert_new_message(&mut self.channels_by_id, channel_id, message_id)
94 }
95
96 pub fn insert(&mut self, channel_proto: proto::Channel) -> bool {
97 let mut ret = false;
98 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
99 let existing_channel = Arc::make_mut(existing_channel);
100
101 ret = existing_channel.visibility != channel_proto.visibility()
102 || existing_channel.role != channel_proto.role()
103 || existing_channel.name != channel_proto.name;
104
105 existing_channel.visibility = channel_proto.visibility();
106 existing_channel.role = channel_proto.role();
107 existing_channel.name = channel_proto.name;
108 } else {
109 self.channels_by_id.insert(
110 channel_proto.id,
111 Arc::new(Channel {
112 id: channel_proto.id,
113 visibility: channel_proto.visibility(),
114 role: channel_proto.role(),
115 name: channel_proto.name,
116 unseen_note_version: None,
117 unseen_message_id: None,
118 parent_path: channel_proto.parent_path,
119 }),
120 );
121 self.insert_root(channel_proto.id);
122 }
123 ret
124 }
125
126 fn insert_root(&mut self, channel_id: ChannelId) {
127 self.channels_ordered.push(channel_id);
128 }
129}
130
131impl<'a> Drop for ChannelPathsInsertGuard<'a> {
132 fn drop(&mut self) {
133 self.channels_ordered.sort_by(|a, b| {
134 let a = channel_path_sorting_key(*a, &self.channels_by_id);
135 let b = channel_path_sorting_key(*b, &self.channels_by_id);
136 a.cmp(b)
137 });
138 self.channels_ordered.dedup();
139 }
140}
141
142fn channel_path_sorting_key<'a>(
143 id: ChannelId,
144 channels_by_id: &'a BTreeMap<ChannelId, Arc<Channel>>,
145) -> impl Iterator<Item = &str> {
146 let (parent_path, name) = channels_by_id
147 .get(&id)
148 .map_or((&[] as &[_], None), |channel| {
149 (channel.parent_path.as_slice(), Some(channel.name.as_str()))
150 });
151 parent_path
152 .iter()
153 .filter_map(|id| Some(channels_by_id.get(id)?.name.as_str()))
154 .chain(name)
155}
156
157fn insert_note_changed(
158 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
159 channel_id: u64,
160 epoch: u64,
161 version: &clock::Global,
162) {
163 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
164 let unseen_version = Arc::make_mut(channel)
165 .unseen_note_version
166 .get_or_insert((0, clock::Global::new()));
167 if epoch > unseen_version.0 {
168 *unseen_version = (epoch, version.clone());
169 } else {
170 unseen_version.1.join(&version);
171 }
172 }
173}
174
175fn insert_new_message(
176 channels_by_id: &mut BTreeMap<ChannelId, Arc<Channel>>,
177 channel_id: u64,
178 message_id: u64,
179) {
180 if let Some(channel) = channels_by_id.get_mut(&channel_id) {
181 let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0);
182 *unseen_message_id = message_id.max(*unseen_message_id);
183 }
184}