port notifications2 and integration tests (#3283)

Mikayla Maki created

Release Notes:

- N/A

Change summary

Cargo.lock                                              |  21 
Cargo.toml                                              |   1 
crates/collab2/Cargo.toml                               |   4 
crates/collab2/src/tests/channel_message_tests.rs       | 204 ++--
crates/collab2/src/tests/channel_tests.rs               |   2 
crates/collab2/src/tests/notification_tests.rs          | 320 +++---
crates/collab2/src/tests/random_channel_buffer_tests.rs |   8 
crates/collab2/src/tests/randomized_test_helpers.rs     |   2 
crates/collab2/src/tests/test_server.rs                 |  17 
crates/notifications2/Cargo.toml                        |  42 
crates/notifications2/src/notification_store2.rs        | 466 +++++++++++
11 files changed, 803 insertions(+), 284 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1748,6 +1748,7 @@ dependencies = [
  "lsp2",
  "nanoid",
  "node_runtime",
+ "notifications2",
  "parking_lot 0.11.2",
  "pretty_assertions",
  "project2",
@@ -5484,6 +5485,26 @@ dependencies = [
  "util",
 ]
 
+[[package]]
+name = "notifications2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "channel2",
+ "client2",
+ "clock",
+ "collections",
+ "db2",
+ "feature_flags2",
+ "gpui2",
+ "rpc2",
+ "settings2",
+ "sum_tree",
+ "text2",
+ "time",
+ "util",
+]
+
 [[package]]
 name = "ntapi"
 version = "0.3.7"

Cargo.toml 🔗

@@ -67,6 +67,7 @@ members = [
     "crates/multi_buffer2",
     "crates/node_runtime",
     "crates/notifications",
+    "crates/notifications2",
     "crates/outline",
     "crates/picker",
     "crates/picker2",

crates/collab2/Cargo.toml 🔗

@@ -72,10 +72,8 @@ fs = { package = "fs2", path = "../fs2", features = ["test-support"] }
 git = { package = "git3", path = "../git3", features = ["test-support"] }
 live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] }
 lsp = { package = "lsp2", path = "../lsp2", features = ["test-support"] }
-
 node_runtime = { path = "../node_runtime" }
-#todo!(notifications)
-#notifications = { path = "../notifications", features = ["test-support"] }
+notifications = { package = "notifications2", path = "../notifications2", features = ["test-support"] }
 
 project = { package = "project2", path = "../project2", features = ["test-support"] }
 rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }

crates/collab2/src/tests/channel_message_tests.rs 🔗

@@ -1,115 +1,115 @@
 use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
-use channel::{ChannelChat, ChannelMessageId};
+use channel::{ChannelChat, ChannelMessageId, MessageParams};
 use gpui::{BackgroundExecutor, Model, TestAppContext};
+use rpc::Notification;
 
-// todo!(notifications)
-// #[gpui::test]
-// async fn test_basic_channel_messages(
-//     executor: BackgroundExecutor,
-//     mut cx_a: &mut TestAppContext,
-//     mut cx_b: &mut TestAppContext,
-//     mut cx_c: &mut TestAppContext,
-// ) {
-//     let mut server = TestServer::start(executor.clone()).await;
-//     let client_a = server.create_client(cx_a, "user_a").await;
-//     let client_b = server.create_client(cx_b, "user_b").await;
-//     let client_c = server.create_client(cx_c, "user_c").await;
+#[gpui::test]
+async fn test_basic_channel_messages(
+    executor: BackgroundExecutor,
+    mut cx_a: &mut TestAppContext,
+    mut cx_b: &mut TestAppContext,
+    mut cx_c: &mut TestAppContext,
+) {
+    let mut server = TestServer::start(executor.clone()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
 
-//     let channel_id = server
-//         .make_channel(
-//             "the-channel",
-//             None,
-//             (&client_a, cx_a),
-//             &mut [(&client_b, cx_b), (&client_c, cx_c)],
-//         )
-//         .await;
+    let channel_id = server
+        .make_channel(
+            "the-channel",
+            None,
+            (&client_a, cx_a),
+            &mut [(&client_b, cx_b), (&client_c, cx_c)],
+        )
+        .await;
 
-//     let channel_chat_a = client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
-//     let channel_chat_b = client_b
-//         .channel_store()
-//         .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
+    let channel_chat_a = client_a
+        .channel_store()
+        .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
+    let channel_chat_b = client_b
+        .channel_store()
+        .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
 
-//     let message_id = channel_chat_a
-//         .update(cx_a, |c, cx| {
-//             c.send_message(
-//                 MessageParams {
-//                     text: "hi @user_c!".into(),
-//                     mentions: vec![(3..10, client_c.id())],
-//                 },
-//                 cx,
-//             )
-//             .unwrap()
-//         })
-//         .await
-//         .unwrap();
-//     channel_chat_a
-//         .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
-//         .await
-//         .unwrap();
+    let message_id = channel_chat_a
+        .update(cx_a, |c, cx| {
+            c.send_message(
+                MessageParams {
+                    text: "hi @user_c!".into(),
+                    mentions: vec![(3..10, client_c.id())],
+                },
+                cx,
+            )
+            .unwrap()
+        })
+        .await
+        .unwrap();
+    channel_chat_a
+        .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
+        .await
+        .unwrap();
 
-//     executor.run_until_parked();
-//     channel_chat_b
-//         .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
-//         .await
-//         .unwrap();
+    executor.run_until_parked();
+    channel_chat_b
+        .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
+        .await
+        .unwrap();
 
-//     executor.run_until_parked();
+    executor.run_until_parked();
 
-//     let channel_chat_c = client_c
-//         .channel_store()
-//         .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
+    let channel_chat_c = client_c
+        .channel_store()
+        .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
 
-//     for (chat, cx) in [
-//         (&channel_chat_a, &mut cx_a),
-//         (&channel_chat_b, &mut cx_b),
-//         (&channel_chat_c, &mut cx_c),
-//     ] {
-//         chat.update(*cx, |c, _| {
-//             assert_eq!(
-//                 c.messages()
-//                     .iter()
-//                     .map(|m| (m.body.as_str(), m.mentions.as_slice()))
-//                     .collect::<Vec<_>>(),
-//                 vec![
-//                     ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
-//                     ("two", &[]),
-//                     ("three", &[])
-//                 ],
-//                 "results for user {}",
-//                 c.client().id(),
-//             );
-//         });
-//     }
-
-//     client_c.notification_store().update(cx_c, |store, _| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 1);
-//         assert_eq!(
-//             store.notification_at(0).unwrap().notification,
-//             Notification::ChannelMessageMention {
-//                 message_id,
-//                 sender_id: client_a.id(),
-//                 channel_id,
-//             }
-//         );
-//         assert_eq!(
-//             store.notification_at(1).unwrap().notification,
-//             Notification::ChannelInvitation {
-//                 channel_id,
-//                 channel_name: "the-channel".to_string(),
-//                 inviter_id: client_a.id()
-//             }
-//         );
-//     });
-// }
+    for (chat, cx) in [
+        (&channel_chat_a, &mut cx_a),
+        (&channel_chat_b, &mut cx_b),
+        (&channel_chat_c, &mut cx_c),
+    ] {
+        chat.update(*cx, |c, _| {
+            assert_eq!(
+                c.messages()
+                    .iter()
+                    .map(|m| (m.body.as_str(), m.mentions.as_slice()))
+                    .collect::<Vec<_>>(),
+                vec![
+                    ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
+                    ("two", &[]),
+                    ("three", &[])
+                ],
+                "results for user {}",
+                c.client().id(),
+            );
+        });
+    }
+
+    client_c.notification_store().update(cx_c, |store, _| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 1);
+        assert_eq!(
+            store.notification_at(0).unwrap().notification,
+            Notification::ChannelMessageMention {
+                message_id,
+                sender_id: client_a.id(),
+                channel_id,
+            }
+        );
+        assert_eq!(
+            store.notification_at(1).unwrap().notification,
+            Notification::ChannelInvitation {
+                channel_id,
+                channel_name: "the-channel".to_string(),
+                inviter_id: client_a.id()
+            }
+        );
+    });
+}
 
 #[gpui::test]
 async fn test_rejoin_channel_chat(

crates/collab2/src/tests/channel_tests.rs 🔗

@@ -1128,6 +1128,8 @@ async fn test_channel_link_notifications(
         .await
         .unwrap();
 
+    executor.run_until_parked();
+
     // the members-only channel is still shown for c, but hidden for b
     assert_channels_list_shape(
         client_b.channel_store(),

crates/collab2/src/tests/notification_tests.rs 🔗

@@ -1,160 +1,160 @@
-//todo!(notifications)
-// use crate::tests::TestServer;
-// use gpui::{executor::Deterministic, TestAppContext};
-// use notifications::NotificationEvent;
-// use parking_lot::Mutex;
-// use rpc::{proto, Notification};
-// use std::sync::Arc;
-
-// #[gpui::test]
-// async fn test_notifications(
-//     deterministic: Arc<Deterministic>,
-//     cx_a: &mut TestAppContext,
-//     cx_b: &mut TestAppContext,
-// ) {
-//     deterministic.forbid_parking();
-//     let mut server = TestServer::start(&deterministic).await;
-//     let client_a = server.create_client(cx_a, "user_a").await;
-//     let client_b = server.create_client(cx_b, "user_b").await;
-
-//     let notification_events_a = Arc::new(Mutex::new(Vec::new()));
-//     let notification_events_b = Arc::new(Mutex::new(Vec::new()));
-//     client_a.notification_store().update(cx_a, |_, cx| {
-//         let events = notification_events_a.clone();
-//         cx.subscribe(&cx.handle(), move |_, _, event, _| {
-//             events.lock().push(event.clone());
-//         })
-//         .detach()
-//     });
-//     client_b.notification_store().update(cx_b, |_, cx| {
-//         let events = notification_events_b.clone();
-//         cx.subscribe(&cx.handle(), move |_, _, event, _| {
-//             events.lock().push(event.clone());
-//         })
-//         .detach()
-//     });
-
-//     // Client A sends a contact request to client B.
-//     client_a
-//         .user_store()
-//         .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
-//         .await
-//         .unwrap();
-
-//     // Client B receives a contact request notification and responds to the
-//     // request, accepting it.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().update(cx_b, |store, cx| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ContactRequest {
-//                 sender_id: client_a.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-//         assert_eq!(
-//             &notification_events_b.lock()[0..],
-//             &[
-//                 NotificationEvent::NewNotification {
-//                     entry: entry.clone(),
-//                 },
-//                 NotificationEvent::NotificationsUpdated {
-//                     old_range: 0..0,
-//                     new_count: 1
-//                 }
-//             ]
-//         );
-
-//         store.respond_to_notification(entry.notification.clone(), true, cx);
-//     });
-
-//     // Client B sees the notification is now read, and that they responded.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().read_with(cx_b, |store, _| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 0);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert!(entry.is_read);
-//         assert_eq!(entry.response, Some(true));
-//         assert_eq!(
-//             &notification_events_b.lock()[2..],
-//             &[
-//                 NotificationEvent::NotificationRead {
-//                     entry: entry.clone(),
-//                 },
-//                 NotificationEvent::NotificationsUpdated {
-//                     old_range: 0..1,
-//                     new_count: 1
-//                 }
-//             ]
-//         );
-//     });
-
-//     // Client A receives a notification that client B accepted their request.
-//     client_a.notification_store().read_with(cx_a, |store, _| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ContactRequestAccepted {
-//                 responder_id: client_b.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-//     });
-
-//     // Client A creates a channel and invites client B to be a member.
-//     let channel_id = client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| {
-//             store.create_channel("the-channel", None, cx)
-//         })
-//         .await
-//         .unwrap();
-//     client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| {
-//             store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
-//         })
-//         .await
-//         .unwrap();
-
-//     // Client B receives a channel invitation notification and responds to the
-//     // invitation, accepting it.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().update(cx_b, |store, cx| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ChannelInvitation {
-//                 channel_id,
-//                 channel_name: "the-channel".to_string(),
-//                 inviter_id: client_a.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-
-//         store.respond_to_notification(entry.notification.clone(), true, cx);
-//     });
-
-//     // Client B sees the notification is now read, and that they responded.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().read_with(cx_b, |store, _| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 0);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert!(entry.is_read);
-//         assert_eq!(entry.response, Some(true));
-//     });
-// }
+use std::sync::Arc;
+
+use gpui::{BackgroundExecutor, TestAppContext};
+use notifications::NotificationEvent;
+use parking_lot::Mutex;
+use rpc::{proto, Notification};
+
+use crate::tests::TestServer;
+
+#[gpui::test]
+async fn test_notifications(
+    executor: BackgroundExecutor,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    let mut server = TestServer::start(executor.clone()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+
+    let notification_events_a = Arc::new(Mutex::new(Vec::new()));
+    let notification_events_b = Arc::new(Mutex::new(Vec::new()));
+    client_a.notification_store().update(cx_a, |_, cx| {
+        let events = notification_events_a.clone();
+        cx.subscribe(&cx.handle(), move |_, _, event, _| {
+            events.lock().push(event.clone());
+        })
+        .detach()
+    });
+    client_b.notification_store().update(cx_b, |_, cx| {
+        let events = notification_events_b.clone();
+        cx.subscribe(&cx.handle(), move |_, _, event, _| {
+            events.lock().push(event.clone());
+        })
+        .detach()
+    });
+
+    // Client A sends a contact request to client B.
+    client_a
+        .user_store()
+        .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
+        .await
+        .unwrap();
+
+    // Client B receives a contact request notification and responds to the
+    // request, accepting it.
+    executor.run_until_parked();
+    client_b.notification_store().update(cx_b, |store, cx| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ContactRequest {
+                sender_id: client_a.id()
+            }
+        );
+        assert!(!entry.is_read);
+        assert_eq!(
+            &notification_events_b.lock()[0..],
+            &[
+                NotificationEvent::NewNotification {
+                    entry: entry.clone(),
+                },
+                NotificationEvent::NotificationsUpdated {
+                    old_range: 0..0,
+                    new_count: 1
+                }
+            ]
+        );
+
+        store.respond_to_notification(entry.notification.clone(), true, cx);
+    });
+
+    // Client B sees the notification is now read, and that they responded.
+    executor.run_until_parked();
+    client_b.notification_store().read_with(cx_b, |store, _| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 0);
+
+        let entry = store.notification_at(0).unwrap();
+        assert!(entry.is_read);
+        assert_eq!(entry.response, Some(true));
+        assert_eq!(
+            &notification_events_b.lock()[2..],
+            &[
+                NotificationEvent::NotificationRead {
+                    entry: entry.clone(),
+                },
+                NotificationEvent::NotificationsUpdated {
+                    old_range: 0..1,
+                    new_count: 1
+                }
+            ]
+        );
+    });
+
+    // Client A receives a notification that client B accepted their request.
+    client_a.notification_store().read_with(cx_a, |store, _| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ContactRequestAccepted {
+                responder_id: client_b.id()
+            }
+        );
+        assert!(!entry.is_read);
+    });
+
+    // Client A creates a channel and invites client B to be a member.
+    let channel_id = client_a
+        .channel_store()
+        .update(cx_a, |store, cx| {
+            store.create_channel("the-channel", None, cx)
+        })
+        .await
+        .unwrap();
+    client_a
+        .channel_store()
+        .update(cx_a, |store, cx| {
+            store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
+        })
+        .await
+        .unwrap();
+
+    // Client B receives a channel invitation notification and responds to the
+    // invitation, accepting it.
+    executor.run_until_parked();
+    client_b.notification_store().update(cx_b, |store, cx| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ChannelInvitation {
+                channel_id,
+                channel_name: "the-channel".to_string(),
+                inviter_id: client_a.id()
+            }
+        );
+        assert!(!entry.is_read);
+
+        store.respond_to_notification(entry.notification.clone(), true, cx);
+    });
+
+    // Client B sees the notification is now read, and that they responded.
+    executor.run_until_parked();
+    client_b.notification_store().read_with(cx_b, |store, _| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 0);
+
+        let entry = store.notification_at(0).unwrap();
+        assert!(entry.is_read);
+        assert_eq!(entry.response, Some(true));
+    });
+}

crates/collab2/src/tests/random_channel_buffer_tests.rs 🔗

@@ -220,14 +220,6 @@ impl RandomizedTest for RandomChannelBufferTest {
         Ok(())
     }
 
-    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext) {
-        let channel_store = client.channel_store();
-        while channel_store.read_with(cx, |store, _| store.channel_count() == 0) {
-            // todo!(notifications)
-            // channel_store.next_notification(cx).await;
-        }
-    }
-
     async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
         let channels = server.app_state.db.all_channels().await.unwrap();
 

crates/collab2/src/tests/randomized_test_helpers.rs 🔗

@@ -115,7 +115,7 @@ pub trait RandomizedTest: 'static + Sized {
 
     async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);
 
-    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext);
+    async fn on_client_added(_client: &Rc<TestClient>, _cx: &mut TestAppContext) {}
 
     async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
 }

crates/collab2/src/tests/test_server.rs 🔗

@@ -17,6 +17,7 @@ use gpui::{BackgroundExecutor, Context, Model, TestAppContext, WindowHandle};
 use language::LanguageRegistry;
 use node_runtime::FakeNodeRuntime;
 
+use notifications::NotificationStore;
 use parking_lot::Mutex;
 use project::{Project, WorktreeId};
 use rpc::{proto::ChannelRole, RECEIVE_TIMEOUT};
@@ -47,8 +48,7 @@ pub struct TestClient {
     pub username: String,
     pub app_state: Arc<workspace::AppState>,
     channel_store: Model<ChannelStore>,
-    // todo!(notifications)
-    // notification_store: Model<NotificationStore>,
+    notification_store: Model<NotificationStore>,
     state: RefCell<TestClientState>,
 }
 
@@ -234,8 +234,7 @@ impl TestServer {
             audio::init((), cx);
             call::init(client.clone(), user_store.clone(), cx);
             channel::init(&client, user_store.clone(), cx);
-            //todo(notifications)
-            // notifications::init(client.clone(), user_store, cx);
+            notifications::init(client.clone(), user_store, cx);
         });
 
         client
@@ -247,8 +246,7 @@ impl TestServer {
             app_state,
             username: name.to_string(),
             channel_store: cx.read(ChannelStore::global).clone(),
-            // todo!(notifications)
-            // notification_store: cx.read(NotificationStore::global).clone(),
+            notification_store: cx.read(NotificationStore::global).clone(),
             state: Default::default(),
         };
         client.wait_for_current_user(cx).await;
@@ -456,10 +454,9 @@ impl TestClient {
         &self.channel_store
     }
 
-    // todo!(notifications)
-    // pub fn notification_store(&self) -> &Model<NotificationStore> {
-    //     &self.notification_store
-    // }
+    pub fn notification_store(&self) -> &Model<NotificationStore> {
+        &self.notification_store
+    }
 
     pub fn user_store(&self) -> &Model<UserStore> {
         &self.app_state.user_store

crates/notifications2/Cargo.toml 🔗

@@ -0,0 +1,42 @@
+[package]
+name = "notifications2"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+path = "src/notification_store2.rs"
+doctest = false
+
+[features]
+test-support = [
+    "channel/test-support",
+    "collections/test-support",
+    "gpui/test-support",
+    "rpc/test-support",
+]
+
+[dependencies]
+channel = { package = "channel2", path = "../channel2" }
+client = { package = "client2", path = "../client2" }
+clock = { path = "../clock" }
+collections = { path = "../collections" }
+db = { package = "db2", path = "../db2" }
+feature_flags = { package = "feature_flags2", path = "../feature_flags2" }
+gpui = { package = "gpui2", path = "../gpui2" }
+rpc = { package = "rpc2", path = "../rpc2" }
+settings = { package = "settings2", path = "../settings2" }
+sum_tree = { path = "../sum_tree" }
+text = { package = "text2", path = "../text2" }
+util = { path = "../util" }
+
+anyhow.workspace = true
+time.workspace = true
+
+[dev-dependencies]
+client = { package = "client2", path = "../client2", features = ["test-support"] }
+collections = { path = "../collections", features = ["test-support"] }
+gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] }
+rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }
+settings = { package = "settings2", path = "../settings2", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }

crates/notifications2/src/notification_store2.rs 🔗

@@ -0,0 +1,466 @@
+use anyhow::{Context, Result};
+use channel::{ChannelMessage, ChannelMessageId, ChannelStore};
+use client::{Client, UserStore};
+use collections::HashMap;
+use db::smol::stream::StreamExt;
+use gpui::{AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task};
+use rpc::{proto, Notification, TypedEnvelope};
+use std::{ops::Range, sync::Arc};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::ResultExt;
+
+pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+    let notification_store = cx.build_model(|cx| NotificationStore::new(client, user_store, cx));
+    cx.set_global(notification_store);
+}
+
+pub struct NotificationStore {
+    client: Arc<Client>,
+    user_store: Model<UserStore>,
+    channel_messages: HashMap<u64, ChannelMessage>,
+    channel_store: Model<ChannelStore>,
+    notifications: SumTree<NotificationEntry>,
+    loaded_all_notifications: bool,
+    _watch_connection_status: Task<Option<()>>,
+    _subscriptions: Vec<client::Subscription>,
+}
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum NotificationEvent {
+    NotificationsUpdated {
+        old_range: Range<usize>,
+        new_count: usize,
+    },
+    NewNotification {
+        entry: NotificationEntry,
+    },
+    NotificationRemoved {
+        entry: NotificationEntry,
+    },
+    NotificationRead {
+        entry: NotificationEntry,
+    },
+}
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct NotificationEntry {
+    pub id: u64,
+    pub notification: Notification,
+    pub timestamp: OffsetDateTime,
+    pub is_read: bool,
+    pub response: Option<bool>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct NotificationSummary {
+    max_id: u64,
+    count: usize,
+    unread_count: usize,
+}
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct UnreadCount(usize);
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct NotificationId(u64);
+
+impl NotificationStore {
+    pub fn global(cx: &AppContext) -> Model<Self> {
+        cx.global::<Model<Self>>().clone()
+    }
+
+    pub fn new(
+        client: Arc<Client>,
+        user_store: Model<UserStore>,
+        cx: &mut ModelContext<Self>,
+    ) -> Self {
+        let mut connection_status = client.status();
+        let watch_connection_status = cx.spawn(|this, mut cx| async move {
+            while let Some(status) = connection_status.next().await {
+                let this = this.upgrade()?;
+                match status {
+                    client::Status::Connected { .. } => {
+                        if let Some(task) = this
+                            .update(&mut cx, |this, cx| this.handle_connect(cx))
+                            .log_err()?
+                        {
+                            task.await.log_err()?;
+                        }
+                    }
+                    _ => this
+                        .update(&mut cx, |this, cx| this.handle_disconnect(cx))
+                        .log_err()?,
+                }
+            }
+            Some(())
+        });
+
+        Self {
+            channel_store: ChannelStore::global(cx),
+            notifications: Default::default(),
+            loaded_all_notifications: false,
+            channel_messages: Default::default(),
+            _watch_connection_status: watch_connection_status,
+            _subscriptions: vec![
+                client.add_message_handler(cx.weak_model(), Self::handle_new_notification),
+                client.add_message_handler(cx.weak_model(), Self::handle_delete_notification),
+            ],
+            user_store,
+            client,
+        }
+    }
+
+    pub fn notification_count(&self) -> usize {
+        self.notifications.summary().count
+    }
+
+    pub fn unread_notification_count(&self) -> usize {
+        self.notifications.summary().unread_count
+    }
+
+    pub fn channel_message_for_id(&self, id: u64) -> Option<&ChannelMessage> {
+        self.channel_messages.get(&id)
+    }
+
+    // Get the nth newest notification.
+    pub fn notification_at(&self, ix: usize) -> Option<&NotificationEntry> {
+        let count = self.notifications.summary().count;
+        if ix >= count {
+            return None;
+        }
+        let ix = count - 1 - ix;
+        let mut cursor = self.notifications.cursor::<Count>();
+        cursor.seek(&Count(ix), Bias::Right, &());
+        cursor.item()
+    }
+
+    pub fn notification_for_id(&self, id: u64) -> Option<&NotificationEntry> {
+        let mut cursor = self.notifications.cursor::<NotificationId>();
+        cursor.seek(&NotificationId(id), Bias::Left, &());
+        if let Some(item) = cursor.item() {
+            if item.id == id {
+                return Some(item);
+            }
+        }
+        None
+    }
+
+    pub fn load_more_notifications(
+        &self,
+        clear_old: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Option<Task<Result<()>>> {
+        if self.loaded_all_notifications && !clear_old {
+            return None;
+        }
+
+        let before_id = if clear_old {
+            None
+        } else {
+            self.notifications.first().map(|entry| entry.id)
+        };
+        let request = self.client.request(proto::GetNotifications { before_id });
+        Some(cx.spawn(|this, mut cx| async move {
+            let this = this
+                .upgrade()
+                .context("Notification store was dropped while loading notifications")?;
+
+            let response = request.await?;
+            this.update(&mut cx, |this, _| {
+                this.loaded_all_notifications = response.done
+            })?;
+            Self::add_notifications(
+                this,
+                response.notifications,
+                AddNotificationsOptions {
+                    is_new: false,
+                    clear_old,
+                    includes_first: response.done,
+                },
+                cx,
+            )
+            .await?;
+            Ok(())
+        }))
+    }
+
+    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Result<()>>> {
+        self.notifications = Default::default();
+        self.channel_messages = Default::default();
+        cx.notify();
+        self.load_more_notifications(true, cx)
+    }
+
+    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
+        cx.notify()
+    }
+
+    async fn handle_new_notification(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::AddNotification>,
+        _: Arc<Client>,
+        cx: AsyncAppContext,
+    ) -> Result<()> {
+        Self::add_notifications(
+            this,
+            envelope.payload.notification.into_iter().collect(),
+            AddNotificationsOptions {
+                is_new: true,
+                clear_old: false,
+                includes_first: false,
+            },
+            cx,
+        )
+        .await
+    }
+
+    async fn handle_delete_notification(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::DeleteNotification>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            this.splice_notifications([(envelope.payload.notification_id, None)], false, cx);
+            Ok(())
+        })?
+    }
+
+    async fn add_notifications(
+        this: Model<Self>,
+        notifications: Vec<proto::Notification>,
+        options: AddNotificationsOptions,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let mut user_ids = Vec::new();
+        let mut message_ids = Vec::new();
+
+        let notifications = notifications
+            .into_iter()
+            .filter_map(|message| {
+                Some(NotificationEntry {
+                    id: message.id,
+                    is_read: message.is_read,
+                    timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)
+                        .ok()?,
+                    notification: Notification::from_proto(&message)?,
+                    response: message.response,
+                })
+            })
+            .collect::<Vec<_>>();
+        if notifications.is_empty() {
+            return Ok(());
+        }
+
+        for entry in &notifications {
+            match entry.notification {
+                Notification::ChannelInvitation { inviter_id, .. } => {
+                    user_ids.push(inviter_id);
+                }
+                Notification::ContactRequest {
+                    sender_id: requester_id,
+                } => {
+                    user_ids.push(requester_id);
+                }
+                Notification::ContactRequestAccepted {
+                    responder_id: contact_id,
+                } => {
+                    user_ids.push(contact_id);
+                }
+                Notification::ChannelMessageMention {
+                    sender_id,
+                    message_id,
+                    ..
+                } => {
+                    user_ids.push(sender_id);
+                    message_ids.push(message_id);
+                }
+            }
+        }
+
+        let (user_store, channel_store) = this.read_with(&cx, |this, _| {
+            (this.user_store.clone(), this.channel_store.clone())
+        })?;
+
+        user_store
+            .update(&mut cx, |store, cx| store.get_users(user_ids, cx))?
+            .await?;
+        let messages = channel_store
+            .update(&mut cx, |store, cx| {
+                store.fetch_channel_messages(message_ids, cx)
+            })?
+            .await?;
+        this.update(&mut cx, |this, cx| {
+            if options.clear_old {
+                cx.emit(NotificationEvent::NotificationsUpdated {
+                    old_range: 0..this.notifications.summary().count,
+                    new_count: 0,
+                });
+                this.notifications = SumTree::default();
+                this.channel_messages.clear();
+                this.loaded_all_notifications = false;
+            }
+
+            if options.includes_first {
+                this.loaded_all_notifications = true;
+            }
+
+            this.channel_messages
+                .extend(messages.into_iter().filter_map(|message| {
+                    if let ChannelMessageId::Saved(id) = message.id {
+                        Some((id, message))
+                    } else {
+                        None
+                    }
+                }));
+
+            this.splice_notifications(
+                notifications
+                    .into_iter()
+                    .map(|notification| (notification.id, Some(notification))),
+                options.is_new,
+                cx,
+            );
+        })
+        .log_err();
+
+        Ok(())
+    }
+
+    fn splice_notifications(
+        &mut self,
+        notifications: impl IntoIterator<Item = (u64, Option<NotificationEntry>)>,
+        is_new: bool,
+        cx: &mut ModelContext<'_, NotificationStore>,
+    ) {
+        let mut cursor = self.notifications.cursor::<(NotificationId, Count)>();
+        let mut new_notifications = SumTree::new();
+        let mut old_range = 0..0;
+
+        for (i, (id, new_notification)) in notifications.into_iter().enumerate() {
+            new_notifications.append(cursor.slice(&NotificationId(id), Bias::Left, &()), &());
+
+            if i == 0 {
+                old_range.start = cursor.start().1 .0;
+            }
+
+            let old_notification = cursor.item();
+            if let Some(old_notification) = old_notification {
+                if old_notification.id == id {
+                    cursor.next(&());
+
+                    if let Some(new_notification) = &new_notification {
+                        if new_notification.is_read {
+                            cx.emit(NotificationEvent::NotificationRead {
+                                entry: new_notification.clone(),
+                            });
+                        }
+                    } else {
+                        cx.emit(NotificationEvent::NotificationRemoved {
+                            entry: old_notification.clone(),
+                        });
+                    }
+                }
+            } else if let Some(new_notification) = &new_notification {
+                if is_new {
+                    cx.emit(NotificationEvent::NewNotification {
+                        entry: new_notification.clone(),
+                    });
+                }
+            }
+
+            if let Some(notification) = new_notification {
+                new_notifications.push(notification, &());
+            }
+        }
+
+        old_range.end = cursor.start().1 .0;
+        let new_count = new_notifications.summary().count - old_range.start;
+        new_notifications.append(cursor.suffix(&()), &());
+        drop(cursor);
+
+        self.notifications = new_notifications;
+        cx.emit(NotificationEvent::NotificationsUpdated {
+            old_range,
+            new_count,
+        });
+    }
+
+    pub fn respond_to_notification(
+        &mut self,
+        notification: Notification,
+        response: bool,
+        cx: &mut ModelContext<Self>,
+    ) {
+        match notification {
+            Notification::ContactRequest { sender_id } => {
+                self.user_store
+                    .update(cx, |store, cx| {
+                        store.respond_to_contact_request(sender_id, response, cx)
+                    })
+                    .detach();
+            }
+            Notification::ChannelInvitation { channel_id, .. } => {
+                self.channel_store
+                    .update(cx, |store, cx| {
+                        store.respond_to_channel_invite(channel_id, response, cx)
+                    })
+                    .detach();
+            }
+            _ => {}
+        }
+    }
+}
+
+impl EventEmitter<NotificationEvent> for NotificationStore {}
+
+impl sum_tree::Item for NotificationEntry {
+    type Summary = NotificationSummary;
+
+    fn summary(&self) -> Self::Summary {
+        NotificationSummary {
+            max_id: self.id,
+            count: 1,
+            unread_count: if self.is_read { 0 } else { 1 },
+        }
+    }
+}
+
+impl sum_tree::Summary for NotificationSummary {
+    type Context = ();
+
+    fn add_summary(&mut self, summary: &Self, _: &()) {
+        self.max_id = self.max_id.max(summary.max_id);
+        self.count += summary.count;
+        self.unread_count += summary.unread_count;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for NotificationId {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        debug_assert!(summary.max_id > self.0);
+        self.0 = summary.max_id;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for Count {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        self.0 += summary.count;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for UnreadCount {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        self.0 += summary.unread_count;
+    }
+}
+
+struct AddNotificationsOptions {
+    is_new: bool,
+    clear_old: bool,
+    includes_first: bool,
+}