1use anyhow::{Context, Result};
2use channel::{ChannelMessage, ChannelMessageId, ChannelStore};
3use client::{ChannelId, Client, UserStore};
4use collections::HashMap;
5use db::smol::stream::StreamExt;
6use gpui::{
7 AppContext, AsyncAppContext, Context as _, EventEmitter, Global, Model, ModelContext, Task,
8};
9use rpc::{proto, Notification, TypedEnvelope};
10use std::{ops::Range, sync::Arc};
11use sum_tree::{Bias, SumTree};
12use time::OffsetDateTime;
13use util::ResultExt;
14
15pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
16 let notification_store = cx.new_model(|cx| NotificationStore::new(client, user_store, cx));
17 cx.set_global(GlobalNotificationStore(notification_store));
18}
19
20struct GlobalNotificationStore(Model<NotificationStore>);
21
22impl Global for GlobalNotificationStore {}
23
24pub struct NotificationStore {
25 client: Arc<Client>,
26 user_store: Model<UserStore>,
27 channel_messages: HashMap<u64, ChannelMessage>,
28 channel_store: Model<ChannelStore>,
29 notifications: SumTree<NotificationEntry>,
30 loaded_all_notifications: bool,
31 _watch_connection_status: Task<Option<()>>,
32 _subscriptions: Vec<client::Subscription>,
33}
34
35#[derive(Clone, PartialEq, Eq, Debug)]
36pub enum NotificationEvent {
37 NotificationsUpdated {
38 old_range: Range<usize>,
39 new_count: usize,
40 },
41 NewNotification {
42 entry: NotificationEntry,
43 },
44 NotificationRemoved {
45 entry: NotificationEntry,
46 },
47 NotificationRead {
48 entry: NotificationEntry,
49 },
50}
51
52#[derive(Debug, PartialEq, Eq, Clone)]
53pub struct NotificationEntry {
54 pub id: u64,
55 pub notification: Notification,
56 pub timestamp: OffsetDateTime,
57 pub is_read: bool,
58 pub response: Option<bool>,
59}
60
61#[derive(Clone, Debug, Default)]
62pub struct NotificationSummary {
63 max_id: u64,
64 count: usize,
65 unread_count: usize,
66}
67
68#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
69struct Count(usize);
70
71#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
72struct NotificationId(u64);
73
74impl NotificationStore {
75 pub fn global(cx: &AppContext) -> Model<Self> {
76 cx.global::<GlobalNotificationStore>().0.clone()
77 }
78
79 pub fn new(
80 client: Arc<Client>,
81 user_store: Model<UserStore>,
82 cx: &mut ModelContext<Self>,
83 ) -> Self {
84 let mut connection_status = client.status();
85 let watch_connection_status = cx.spawn(|this, mut cx| async move {
86 while let Some(status) = connection_status.next().await {
87 let this = this.upgrade()?;
88 match status {
89 client::Status::Connected { .. } => {
90 if let Some(task) = this
91 .update(&mut cx, |this, cx| this.handle_connect(cx))
92 .log_err()?
93 {
94 task.await.log_err()?;
95 }
96 }
97 _ => this
98 .update(&mut cx, |this, cx| this.handle_disconnect(cx))
99 .log_err()?,
100 }
101 }
102 Some(())
103 });
104
105 Self {
106 channel_store: ChannelStore::global(cx),
107 notifications: Default::default(),
108 loaded_all_notifications: false,
109 channel_messages: Default::default(),
110 _watch_connection_status: watch_connection_status,
111 _subscriptions: vec![
112 client.add_message_handler(cx.weak_model(), Self::handle_new_notification),
113 client.add_message_handler(cx.weak_model(), Self::handle_delete_notification),
114 client.add_message_handler(cx.weak_model(), Self::handle_update_notification),
115 ],
116 user_store,
117 client,
118 }
119 }
120
121 pub fn notification_count(&self) -> usize {
122 self.notifications.summary().count
123 }
124
125 pub fn unread_notification_count(&self) -> usize {
126 self.notifications.summary().unread_count
127 }
128
129 pub fn channel_message_for_id(&self, id: u64) -> Option<&ChannelMessage> {
130 self.channel_messages.get(&id)
131 }
132
133 // Get the nth newest notification.
134 pub fn notification_at(&self, ix: usize) -> Option<&NotificationEntry> {
135 let count = self.notifications.summary().count;
136 if ix >= count {
137 return None;
138 }
139 let ix = count - 1 - ix;
140 let mut cursor = self.notifications.cursor::<Count>();
141 cursor.seek(&Count(ix), Bias::Right, &());
142 cursor.item()
143 }
144
145 pub fn notification_for_id(&self, id: u64) -> Option<&NotificationEntry> {
146 let mut cursor = self.notifications.cursor::<NotificationId>();
147 cursor.seek(&NotificationId(id), Bias::Left, &());
148 if let Some(item) = cursor.item() {
149 if item.id == id {
150 return Some(item);
151 }
152 }
153 None
154 }
155
156 pub fn load_more_notifications(
157 &self,
158 clear_old: bool,
159 cx: &mut ModelContext<Self>,
160 ) -> Option<Task<Result<()>>> {
161 if self.loaded_all_notifications && !clear_old {
162 return None;
163 }
164
165 let before_id = if clear_old {
166 None
167 } else {
168 self.notifications.first().map(|entry| entry.id)
169 };
170 let request = self.client.request(proto::GetNotifications { before_id });
171 Some(cx.spawn(|this, mut cx| async move {
172 let this = this
173 .upgrade()
174 .context("Notification store was dropped while loading notifications")?;
175
176 let response = request.await?;
177 this.update(&mut cx, |this, _| {
178 this.loaded_all_notifications = response.done
179 })?;
180 Self::add_notifications(
181 this,
182 response.notifications,
183 AddNotificationsOptions {
184 is_new: false,
185 clear_old,
186 includes_first: response.done,
187 },
188 cx,
189 )
190 .await?;
191 Ok(())
192 }))
193 }
194
195 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Result<()>>> {
196 self.notifications = Default::default();
197 self.channel_messages = Default::default();
198 cx.notify();
199 self.load_more_notifications(true, cx)
200 }
201
202 fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
203 cx.notify()
204 }
205
206 async fn handle_new_notification(
207 this: Model<Self>,
208 envelope: TypedEnvelope<proto::AddNotification>,
209 cx: AsyncAppContext,
210 ) -> Result<()> {
211 Self::add_notifications(
212 this,
213 envelope.payload.notification.into_iter().collect(),
214 AddNotificationsOptions {
215 is_new: true,
216 clear_old: false,
217 includes_first: false,
218 },
219 cx,
220 )
221 .await
222 }
223
224 async fn handle_delete_notification(
225 this: Model<Self>,
226 envelope: TypedEnvelope<proto::DeleteNotification>,
227 mut cx: AsyncAppContext,
228 ) -> Result<()> {
229 this.update(&mut cx, |this, cx| {
230 this.splice_notifications([(envelope.payload.notification_id, None)], false, cx);
231 Ok(())
232 })?
233 }
234
235 async fn handle_update_notification(
236 this: Model<Self>,
237 envelope: TypedEnvelope<proto::UpdateNotification>,
238 mut cx: AsyncAppContext,
239 ) -> Result<()> {
240 this.update(&mut cx, |this, cx| {
241 if let Some(notification) = envelope.payload.notification {
242 if let Some(rpc::Notification::ChannelMessageMention {
243 message_id,
244 sender_id: _,
245 channel_id: _,
246 }) = Notification::from_proto(¬ification)
247 {
248 let fetch_message_task = this.channel_store.update(cx, |this, cx| {
249 this.fetch_channel_messages(vec![message_id], cx)
250 });
251
252 cx.spawn(|this, mut cx| async move {
253 let messages = fetch_message_task.await?;
254 this.update(&mut cx, move |this, cx| {
255 for message in messages {
256 this.channel_messages.insert(message_id, message);
257 }
258 cx.notify();
259 })
260 })
261 .detach_and_log_err(cx)
262 }
263 }
264 Ok(())
265 })?
266 }
267
268 async fn add_notifications(
269 this: Model<Self>,
270 notifications: Vec<proto::Notification>,
271 options: AddNotificationsOptions,
272 mut cx: AsyncAppContext,
273 ) -> Result<()> {
274 let mut user_ids = Vec::new();
275 let mut message_ids = Vec::new();
276
277 let notifications = notifications
278 .into_iter()
279 .filter_map(|message| {
280 Some(NotificationEntry {
281 id: message.id,
282 is_read: message.is_read,
283 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)
284 .ok()?,
285 notification: Notification::from_proto(&message)?,
286 response: message.response,
287 })
288 })
289 .collect::<Vec<_>>();
290 if notifications.is_empty() {
291 return Ok(());
292 }
293
294 for entry in ¬ifications {
295 match entry.notification {
296 Notification::ChannelInvitation { inviter_id, .. } => {
297 user_ids.push(inviter_id);
298 }
299 Notification::ContactRequest {
300 sender_id: requester_id,
301 } => {
302 user_ids.push(requester_id);
303 }
304 Notification::ContactRequestAccepted {
305 responder_id: contact_id,
306 } => {
307 user_ids.push(contact_id);
308 }
309 Notification::ChannelMessageMention {
310 sender_id,
311 message_id,
312 ..
313 } => {
314 user_ids.push(sender_id);
315 message_ids.push(message_id);
316 }
317 }
318 }
319
320 let (user_store, channel_store) = this.read_with(&cx, |this, _| {
321 (this.user_store.clone(), this.channel_store.clone())
322 })?;
323
324 user_store
325 .update(&mut cx, |store, cx| store.get_users(user_ids, cx))?
326 .await?;
327 let messages = channel_store
328 .update(&mut cx, |store, cx| {
329 store.fetch_channel_messages(message_ids, cx)
330 })?
331 .await?;
332 this.update(&mut cx, |this, cx| {
333 if options.clear_old {
334 cx.emit(NotificationEvent::NotificationsUpdated {
335 old_range: 0..this.notifications.summary().count,
336 new_count: 0,
337 });
338 this.notifications = SumTree::default();
339 this.channel_messages.clear();
340 this.loaded_all_notifications = false;
341 }
342
343 if options.includes_first {
344 this.loaded_all_notifications = true;
345 }
346
347 this.channel_messages
348 .extend(messages.into_iter().filter_map(|message| {
349 if let ChannelMessageId::Saved(id) = message.id {
350 Some((id, message))
351 } else {
352 None
353 }
354 }));
355
356 this.splice_notifications(
357 notifications
358 .into_iter()
359 .map(|notification| (notification.id, Some(notification))),
360 options.is_new,
361 cx,
362 );
363 })
364 .log_err();
365
366 Ok(())
367 }
368
369 fn splice_notifications(
370 &mut self,
371 notifications: impl IntoIterator<Item = (u64, Option<NotificationEntry>)>,
372 is_new: bool,
373 cx: &mut ModelContext<'_, NotificationStore>,
374 ) {
375 let mut cursor = self.notifications.cursor::<(NotificationId, Count)>();
376 let mut new_notifications = SumTree::new();
377 let mut old_range = 0..0;
378
379 for (i, (id, new_notification)) in notifications.into_iter().enumerate() {
380 new_notifications.append(cursor.slice(&NotificationId(id), Bias::Left, &()), &());
381
382 if i == 0 {
383 old_range.start = cursor.start().1 .0;
384 }
385
386 let old_notification = cursor.item();
387 if let Some(old_notification) = old_notification {
388 if old_notification.id == id {
389 cursor.next(&());
390
391 if let Some(new_notification) = &new_notification {
392 if new_notification.is_read {
393 cx.emit(NotificationEvent::NotificationRead {
394 entry: new_notification.clone(),
395 });
396 }
397 } else {
398 cx.emit(NotificationEvent::NotificationRemoved {
399 entry: old_notification.clone(),
400 });
401 }
402 }
403 } else if let Some(new_notification) = &new_notification {
404 if is_new {
405 cx.emit(NotificationEvent::NewNotification {
406 entry: new_notification.clone(),
407 });
408 }
409 }
410
411 if let Some(notification) = new_notification {
412 new_notifications.push(notification, &());
413 }
414 }
415
416 old_range.end = cursor.start().1 .0;
417 let new_count = new_notifications.summary().count - old_range.start;
418 new_notifications.append(cursor.suffix(&()), &());
419 drop(cursor);
420
421 self.notifications = new_notifications;
422 cx.emit(NotificationEvent::NotificationsUpdated {
423 old_range,
424 new_count,
425 });
426 }
427
428 pub fn respond_to_notification(
429 &mut self,
430 notification: Notification,
431 response: bool,
432 cx: &mut ModelContext<Self>,
433 ) {
434 match notification {
435 Notification::ContactRequest { sender_id } => {
436 self.user_store
437 .update(cx, |store, cx| {
438 store.respond_to_contact_request(sender_id, response, cx)
439 })
440 .detach();
441 }
442 Notification::ChannelInvitation { channel_id, .. } => {
443 self.channel_store
444 .update(cx, |store, cx| {
445 store.respond_to_channel_invite(ChannelId(channel_id), response, cx)
446 })
447 .detach();
448 }
449 _ => {}
450 }
451 }
452}
453
454impl EventEmitter<NotificationEvent> for NotificationStore {}
455
456impl sum_tree::Item for NotificationEntry {
457 type Summary = NotificationSummary;
458
459 fn summary(&self) -> Self::Summary {
460 NotificationSummary {
461 max_id: self.id,
462 count: 1,
463 unread_count: if self.is_read { 0 } else { 1 },
464 }
465 }
466}
467
468impl sum_tree::Summary for NotificationSummary {
469 type Context = ();
470
471 fn add_summary(&mut self, summary: &Self, _: &()) {
472 self.max_id = self.max_id.max(summary.max_id);
473 self.count += summary.count;
474 self.unread_count += summary.unread_count;
475 }
476}
477
478impl<'a> sum_tree::Dimension<'a, NotificationSummary> for NotificationId {
479 fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
480 debug_assert!(summary.max_id > self.0);
481 self.0 = summary.max_id;
482 }
483}
484
485impl<'a> sum_tree::Dimension<'a, NotificationSummary> for Count {
486 fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
487 self.0 += summary.count;
488 }
489}
490
491struct AddNotificationsOptions {
492 is_new: bool,
493 clear_old: bool,
494 includes_first: bool,
495}