Merge branch 'main' into editor-integration

Mikayla created

Change summary

Cargo.lock                                              |  21 
Cargo.toml                                              |   1 
crates/collab2/Cargo.toml                               |   4 
crates/collab2/src/tests/channel_message_tests.rs       | 204 ++--
crates/collab2/src/tests/channel_tests.rs               |   2 
crates/collab2/src/tests/notification_tests.rs          | 320 +++---
crates/collab2/src/tests/random_channel_buffer_tests.rs |   8 
crates/collab2/src/tests/randomized_test_helpers.rs     |   2 
crates/collab2/src/tests/test_server.rs                 |  17 
crates/editor2/src/editor.rs                            |   4 
crates/editor2/src/element.rs                           |  19 
crates/gpui2/src/element.rs                             |  25 
crates/gpui2/src/gpui2.rs                               |   4 
crates/gpui2/src/input.rs                               | 114 ++
crates/gpui2/src/platform.rs                            |  10 
crates/gpui2/src/window.rs                              |  20 
crates/gpui2/src/window_input_handler.rs                | 167 ---
crates/notifications2/Cargo.toml                        |  42 
crates/notifications2/src/notification_store2.rs        | 466 +++++++++++
19 files changed, 948 insertions(+), 502 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1748,6 +1748,7 @@ dependencies = [
  "lsp2",
  "nanoid",
  "node_runtime",
+ "notifications2",
  "parking_lot 0.11.2",
  "pretty_assertions",
  "project2",
@@ -5484,6 +5485,26 @@ dependencies = [
  "util",
 ]
 
+[[package]]
+name = "notifications2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "channel2",
+ "client2",
+ "clock",
+ "collections",
+ "db2",
+ "feature_flags2",
+ "gpui2",
+ "rpc2",
+ "settings2",
+ "sum_tree",
+ "text2",
+ "time",
+ "util",
+]
+
 [[package]]
 name = "ntapi"
 version = "0.3.7"

Cargo.toml 🔗

@@ -67,6 +67,7 @@ members = [
     "crates/multi_buffer2",
     "crates/node_runtime",
     "crates/notifications",
+    "crates/notifications2",
     "crates/outline",
     "crates/picker",
     "crates/picker2",

crates/collab2/Cargo.toml 🔗

@@ -72,10 +72,8 @@ fs = { package = "fs2", path = "../fs2", features = ["test-support"] }
 git = { package = "git3", path = "../git3", features = ["test-support"] }
 live_kit_client = { package = "live_kit_client2", path = "../live_kit_client2", features = ["test-support"] }
 lsp = { package = "lsp2", path = "../lsp2", features = ["test-support"] }
-
 node_runtime = { path = "../node_runtime" }
-#todo!(notifications)
-#notifications = { path = "../notifications", features = ["test-support"] }
+notifications = { package = "notifications2", path = "../notifications2", features = ["test-support"] }
 
 project = { package = "project2", path = "../project2", features = ["test-support"] }
 rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }

crates/collab2/src/tests/channel_message_tests.rs 🔗

@@ -1,115 +1,115 @@
 use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
-use channel::{ChannelChat, ChannelMessageId};
+use channel::{ChannelChat, ChannelMessageId, MessageParams};
 use gpui::{BackgroundExecutor, Model, TestAppContext};
+use rpc::Notification;
 
-// todo!(notifications)
-// #[gpui::test]
-// async fn test_basic_channel_messages(
-//     executor: BackgroundExecutor,
-//     mut cx_a: &mut TestAppContext,
-//     mut cx_b: &mut TestAppContext,
-//     mut cx_c: &mut TestAppContext,
-// ) {
-//     let mut server = TestServer::start(executor.clone()).await;
-//     let client_a = server.create_client(cx_a, "user_a").await;
-//     let client_b = server.create_client(cx_b, "user_b").await;
-//     let client_c = server.create_client(cx_c, "user_c").await;
+#[gpui::test]
+async fn test_basic_channel_messages(
+    executor: BackgroundExecutor,
+    mut cx_a: &mut TestAppContext,
+    mut cx_b: &mut TestAppContext,
+    mut cx_c: &mut TestAppContext,
+) {
+    let mut server = TestServer::start(executor.clone()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
 
-//     let channel_id = server
-//         .make_channel(
-//             "the-channel",
-//             None,
-//             (&client_a, cx_a),
-//             &mut [(&client_b, cx_b), (&client_c, cx_c)],
-//         )
-//         .await;
+    let channel_id = server
+        .make_channel(
+            "the-channel",
+            None,
+            (&client_a, cx_a),
+            &mut [(&client_b, cx_b), (&client_c, cx_c)],
+        )
+        .await;
 
-//     let channel_chat_a = client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
-//     let channel_chat_b = client_b
-//         .channel_store()
-//         .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
+    let channel_chat_a = client_a
+        .channel_store()
+        .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
+    let channel_chat_b = client_b
+        .channel_store()
+        .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
 
-//     let message_id = channel_chat_a
-//         .update(cx_a, |c, cx| {
-//             c.send_message(
-//                 MessageParams {
-//                     text: "hi @user_c!".into(),
-//                     mentions: vec![(3..10, client_c.id())],
-//                 },
-//                 cx,
-//             )
-//             .unwrap()
-//         })
-//         .await
-//         .unwrap();
-//     channel_chat_a
-//         .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
-//         .await
-//         .unwrap();
+    let message_id = channel_chat_a
+        .update(cx_a, |c, cx| {
+            c.send_message(
+                MessageParams {
+                    text: "hi @user_c!".into(),
+                    mentions: vec![(3..10, client_c.id())],
+                },
+                cx,
+            )
+            .unwrap()
+        })
+        .await
+        .unwrap();
+    channel_chat_a
+        .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap())
+        .await
+        .unwrap();
 
-//     executor.run_until_parked();
-//     channel_chat_b
-//         .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
-//         .await
-//         .unwrap();
+    executor.run_until_parked();
+    channel_chat_b
+        .update(cx_b, |c, cx| c.send_message("three".into(), cx).unwrap())
+        .await
+        .unwrap();
 
-//     executor.run_until_parked();
+    executor.run_until_parked();
 
-//     let channel_chat_c = client_c
-//         .channel_store()
-//         .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
-//         .await
-//         .unwrap();
+    let channel_chat_c = client_c
+        .channel_store()
+        .update(cx_c, |store, cx| store.open_channel_chat(channel_id, cx))
+        .await
+        .unwrap();
 
-//     for (chat, cx) in [
-//         (&channel_chat_a, &mut cx_a),
-//         (&channel_chat_b, &mut cx_b),
-//         (&channel_chat_c, &mut cx_c),
-//     ] {
-//         chat.update(*cx, |c, _| {
-//             assert_eq!(
-//                 c.messages()
-//                     .iter()
-//                     .map(|m| (m.body.as_str(), m.mentions.as_slice()))
-//                     .collect::<Vec<_>>(),
-//                 vec![
-//                     ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
-//                     ("two", &[]),
-//                     ("three", &[])
-//                 ],
-//                 "results for user {}",
-//                 c.client().id(),
-//             );
-//         });
-//     }
-
-//     client_c.notification_store().update(cx_c, |store, _| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 1);
-//         assert_eq!(
-//             store.notification_at(0).unwrap().notification,
-//             Notification::ChannelMessageMention {
-//                 message_id,
-//                 sender_id: client_a.id(),
-//                 channel_id,
-//             }
-//         );
-//         assert_eq!(
-//             store.notification_at(1).unwrap().notification,
-//             Notification::ChannelInvitation {
-//                 channel_id,
-//                 channel_name: "the-channel".to_string(),
-//                 inviter_id: client_a.id()
-//             }
-//         );
-//     });
-// }
+    for (chat, cx) in [
+        (&channel_chat_a, &mut cx_a),
+        (&channel_chat_b, &mut cx_b),
+        (&channel_chat_c, &mut cx_c),
+    ] {
+        chat.update(*cx, |c, _| {
+            assert_eq!(
+                c.messages()
+                    .iter()
+                    .map(|m| (m.body.as_str(), m.mentions.as_slice()))
+                    .collect::<Vec<_>>(),
+                vec![
+                    ("hi @user_c!", [(3..10, client_c.id())].as_slice()),
+                    ("two", &[]),
+                    ("three", &[])
+                ],
+                "results for user {}",
+                c.client().id(),
+            );
+        });
+    }
+
+    client_c.notification_store().update(cx_c, |store, _| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 1);
+        assert_eq!(
+            store.notification_at(0).unwrap().notification,
+            Notification::ChannelMessageMention {
+                message_id,
+                sender_id: client_a.id(),
+                channel_id,
+            }
+        );
+        assert_eq!(
+            store.notification_at(1).unwrap().notification,
+            Notification::ChannelInvitation {
+                channel_id,
+                channel_name: "the-channel".to_string(),
+                inviter_id: client_a.id()
+            }
+        );
+    });
+}
 
 #[gpui::test]
 async fn test_rejoin_channel_chat(

crates/collab2/src/tests/channel_tests.rs 🔗

@@ -1128,6 +1128,8 @@ async fn test_channel_link_notifications(
         .await
         .unwrap();
 
+    executor.run_until_parked();
+
     // the members-only channel is still shown for c, but hidden for b
     assert_channels_list_shape(
         client_b.channel_store(),

crates/collab2/src/tests/notification_tests.rs 🔗

@@ -1,160 +1,160 @@
-//todo!(notifications)
-// use crate::tests::TestServer;
-// use gpui::{executor::Deterministic, TestAppContext};
-// use notifications::NotificationEvent;
-// use parking_lot::Mutex;
-// use rpc::{proto, Notification};
-// use std::sync::Arc;
-
-// #[gpui::test]
-// async fn test_notifications(
-//     deterministic: Arc<Deterministic>,
-//     cx_a: &mut TestAppContext,
-//     cx_b: &mut TestAppContext,
-// ) {
-//     deterministic.forbid_parking();
-//     let mut server = TestServer::start(&deterministic).await;
-//     let client_a = server.create_client(cx_a, "user_a").await;
-//     let client_b = server.create_client(cx_b, "user_b").await;
-
-//     let notification_events_a = Arc::new(Mutex::new(Vec::new()));
-//     let notification_events_b = Arc::new(Mutex::new(Vec::new()));
-//     client_a.notification_store().update(cx_a, |_, cx| {
-//         let events = notification_events_a.clone();
-//         cx.subscribe(&cx.handle(), move |_, _, event, _| {
-//             events.lock().push(event.clone());
-//         })
-//         .detach()
-//     });
-//     client_b.notification_store().update(cx_b, |_, cx| {
-//         let events = notification_events_b.clone();
-//         cx.subscribe(&cx.handle(), move |_, _, event, _| {
-//             events.lock().push(event.clone());
-//         })
-//         .detach()
-//     });
-
-//     // Client A sends a contact request to client B.
-//     client_a
-//         .user_store()
-//         .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
-//         .await
-//         .unwrap();
-
-//     // Client B receives a contact request notification and responds to the
-//     // request, accepting it.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().update(cx_b, |store, cx| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ContactRequest {
-//                 sender_id: client_a.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-//         assert_eq!(
-//             &notification_events_b.lock()[0..],
-//             &[
-//                 NotificationEvent::NewNotification {
-//                     entry: entry.clone(),
-//                 },
-//                 NotificationEvent::NotificationsUpdated {
-//                     old_range: 0..0,
-//                     new_count: 1
-//                 }
-//             ]
-//         );
-
-//         store.respond_to_notification(entry.notification.clone(), true, cx);
-//     });
-
-//     // Client B sees the notification is now read, and that they responded.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().read_with(cx_b, |store, _| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 0);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert!(entry.is_read);
-//         assert_eq!(entry.response, Some(true));
-//         assert_eq!(
-//             &notification_events_b.lock()[2..],
-//             &[
-//                 NotificationEvent::NotificationRead {
-//                     entry: entry.clone(),
-//                 },
-//                 NotificationEvent::NotificationsUpdated {
-//                     old_range: 0..1,
-//                     new_count: 1
-//                 }
-//             ]
-//         );
-//     });
-
-//     // Client A receives a notification that client B accepted their request.
-//     client_a.notification_store().read_with(cx_a, |store, _| {
-//         assert_eq!(store.notification_count(), 1);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ContactRequestAccepted {
-//                 responder_id: client_b.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-//     });
-
-//     // Client A creates a channel and invites client B to be a member.
-//     let channel_id = client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| {
-//             store.create_channel("the-channel", None, cx)
-//         })
-//         .await
-//         .unwrap();
-//     client_a
-//         .channel_store()
-//         .update(cx_a, |store, cx| {
-//             store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
-//         })
-//         .await
-//         .unwrap();
-
-//     // Client B receives a channel invitation notification and responds to the
-//     // invitation, accepting it.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().update(cx_b, |store, cx| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 1);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert_eq!(
-//             entry.notification,
-//             Notification::ChannelInvitation {
-//                 channel_id,
-//                 channel_name: "the-channel".to_string(),
-//                 inviter_id: client_a.id()
-//             }
-//         );
-//         assert!(!entry.is_read);
-
-//         store.respond_to_notification(entry.notification.clone(), true, cx);
-//     });
-
-//     // Client B sees the notification is now read, and that they responded.
-//     deterministic.run_until_parked();
-//     client_b.notification_store().read_with(cx_b, |store, _| {
-//         assert_eq!(store.notification_count(), 2);
-//         assert_eq!(store.unread_notification_count(), 0);
-
-//         let entry = store.notification_at(0).unwrap();
-//         assert!(entry.is_read);
-//         assert_eq!(entry.response, Some(true));
-//     });
-// }
+use std::sync::Arc;
+
+use gpui::{BackgroundExecutor, TestAppContext};
+use notifications::NotificationEvent;
+use parking_lot::Mutex;
+use rpc::{proto, Notification};
+
+use crate::tests::TestServer;
+
+#[gpui::test]
+async fn test_notifications(
+    executor: BackgroundExecutor,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    let mut server = TestServer::start(executor.clone()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+
+    let notification_events_a = Arc::new(Mutex::new(Vec::new()));
+    let notification_events_b = Arc::new(Mutex::new(Vec::new()));
+    client_a.notification_store().update(cx_a, |_, cx| {
+        let events = notification_events_a.clone();
+        cx.subscribe(&cx.handle(), move |_, _, event, _| {
+            events.lock().push(event.clone());
+        })
+        .detach()
+    });
+    client_b.notification_store().update(cx_b, |_, cx| {
+        let events = notification_events_b.clone();
+        cx.subscribe(&cx.handle(), move |_, _, event, _| {
+            events.lock().push(event.clone());
+        })
+        .detach()
+    });
+
+    // Client A sends a contact request to client B.
+    client_a
+        .user_store()
+        .update(cx_a, |store, cx| store.request_contact(client_b.id(), cx))
+        .await
+        .unwrap();
+
+    // Client B receives a contact request notification and responds to the
+    // request, accepting it.
+    executor.run_until_parked();
+    client_b.notification_store().update(cx_b, |store, cx| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ContactRequest {
+                sender_id: client_a.id()
+            }
+        );
+        assert!(!entry.is_read);
+        assert_eq!(
+            &notification_events_b.lock()[0..],
+            &[
+                NotificationEvent::NewNotification {
+                    entry: entry.clone(),
+                },
+                NotificationEvent::NotificationsUpdated {
+                    old_range: 0..0,
+                    new_count: 1
+                }
+            ]
+        );
+
+        store.respond_to_notification(entry.notification.clone(), true, cx);
+    });
+
+    // Client B sees the notification is now read, and that they responded.
+    executor.run_until_parked();
+    client_b.notification_store().read_with(cx_b, |store, _| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 0);
+
+        let entry = store.notification_at(0).unwrap();
+        assert!(entry.is_read);
+        assert_eq!(entry.response, Some(true));
+        assert_eq!(
+            &notification_events_b.lock()[2..],
+            &[
+                NotificationEvent::NotificationRead {
+                    entry: entry.clone(),
+                },
+                NotificationEvent::NotificationsUpdated {
+                    old_range: 0..1,
+                    new_count: 1
+                }
+            ]
+        );
+    });
+
+    // Client A receives a notification that client B accepted their request.
+    client_a.notification_store().read_with(cx_a, |store, _| {
+        assert_eq!(store.notification_count(), 1);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ContactRequestAccepted {
+                responder_id: client_b.id()
+            }
+        );
+        assert!(!entry.is_read);
+    });
+
+    // Client A creates a channel and invites client B to be a member.
+    let channel_id = client_a
+        .channel_store()
+        .update(cx_a, |store, cx| {
+            store.create_channel("the-channel", None, cx)
+        })
+        .await
+        .unwrap();
+    client_a
+        .channel_store()
+        .update(cx_a, |store, cx| {
+            store.invite_member(channel_id, client_b.id(), proto::ChannelRole::Member, cx)
+        })
+        .await
+        .unwrap();
+
+    // Client B receives a channel invitation notification and responds to the
+    // invitation, accepting it.
+    executor.run_until_parked();
+    client_b.notification_store().update(cx_b, |store, cx| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 1);
+
+        let entry = store.notification_at(0).unwrap();
+        assert_eq!(
+            entry.notification,
+            Notification::ChannelInvitation {
+                channel_id,
+                channel_name: "the-channel".to_string(),
+                inviter_id: client_a.id()
+            }
+        );
+        assert!(!entry.is_read);
+
+        store.respond_to_notification(entry.notification.clone(), true, cx);
+    });
+
+    // Client B sees the notification is now read, and that they responded.
+    executor.run_until_parked();
+    client_b.notification_store().read_with(cx_b, |store, _| {
+        assert_eq!(store.notification_count(), 2);
+        assert_eq!(store.unread_notification_count(), 0);
+
+        let entry = store.notification_at(0).unwrap();
+        assert!(entry.is_read);
+        assert_eq!(entry.response, Some(true));
+    });
+}

crates/collab2/src/tests/random_channel_buffer_tests.rs 🔗

@@ -220,14 +220,6 @@ impl RandomizedTest for RandomChannelBufferTest {
         Ok(())
     }
 
-    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext) {
-        let channel_store = client.channel_store();
-        while channel_store.read_with(cx, |store, _| store.channel_count() == 0) {
-            // todo!(notifications)
-            // channel_store.next_notification(cx).await;
-        }
-    }
-
     async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc<TestClient>, TestAppContext)]) {
         let channels = server.app_state.db.all_channels().await.unwrap();
 

crates/collab2/src/tests/randomized_test_helpers.rs 🔗

@@ -115,7 +115,7 @@ pub trait RandomizedTest: 'static + Sized {
 
     async fn initialize(server: &mut TestServer, users: &[UserTestPlan]);
 
-    async fn on_client_added(client: &Rc<TestClient>, cx: &mut TestAppContext);
+    async fn on_client_added(_client: &Rc<TestClient>, _cx: &mut TestAppContext) {}
 
     async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc<TestClient>, TestAppContext)]);
 }

crates/collab2/src/tests/test_server.rs 🔗

@@ -17,6 +17,7 @@ use gpui::{BackgroundExecutor, Context, Model, TestAppContext, WindowHandle};
 use language::LanguageRegistry;
 use node_runtime::FakeNodeRuntime;
 
+use notifications::NotificationStore;
 use parking_lot::Mutex;
 use project::{Project, WorktreeId};
 use rpc::{proto::ChannelRole, RECEIVE_TIMEOUT};
@@ -47,8 +48,7 @@ pub struct TestClient {
     pub username: String,
     pub app_state: Arc<workspace::AppState>,
     channel_store: Model<ChannelStore>,
-    // todo!(notifications)
-    // notification_store: Model<NotificationStore>,
+    notification_store: Model<NotificationStore>,
     state: RefCell<TestClientState>,
 }
 
@@ -234,8 +234,7 @@ impl TestServer {
             audio::init((), cx);
             call::init(client.clone(), user_store.clone(), cx);
             channel::init(&client, user_store.clone(), cx);
-            //todo(notifications)
-            // notifications::init(client.clone(), user_store, cx);
+            notifications::init(client.clone(), user_store, cx);
         });
 
         client
@@ -247,8 +246,7 @@ impl TestServer {
             app_state,
             username: name.to_string(),
             channel_store: cx.read(ChannelStore::global).clone(),
-            // todo!(notifications)
-            // notification_store: cx.read(NotificationStore::global).clone(),
+            notification_store: cx.read(NotificationStore::global).clone(),
             state: Default::default(),
         };
         client.wait_for_current_user(cx).await;
@@ -456,10 +454,9 @@ impl TestClient {
         &self.channel_store
     }
 
-    // todo!(notifications)
-    // pub fn notification_store(&self) -> &Model<NotificationStore> {
-    //     &self.notification_store
-    // }
+    pub fn notification_store(&self) -> &Model<NotificationStore> {
+        &self.notification_store
+    }
 
     pub fn user_store(&self) -> &Model<UserStore> {
         &self.app_state.user_store

crates/editor2/src/editor.rs 🔗

@@ -9580,7 +9580,7 @@ impl Render for Editor {
 
 impl InputHandler for Editor {
     fn text_for_range(
-        &self,
+        &mut self,
         range_utf16: Range<usize>,
         cx: &mut ViewContext<Self>,
     ) -> Option<String> {
@@ -9593,7 +9593,7 @@ impl InputHandler for Editor {
         )
     }
 
-    fn selected_text_range(&self, cx: &mut ViewContext<Self>) -> Option<Range<usize>> {
+    fn selected_text_range(&mut self, cx: &mut ViewContext<Self>) -> Option<Range<usize>> {
         // Prevent the IME menu from appearing when holding down an alphabetic key
         // while input is disabled.
         if !self.input_enabled {

crates/editor2/src/element.rs 🔗

@@ -17,11 +17,10 @@ use collections::{BTreeMap, HashMap};
 use gpui::{
     black, hsla, point, px, relative, size, transparent_black, Action, AnyElement,
     BorrowAppContext, BorrowWindow, Bounds, ContentMask, Corners, DispatchContext, DispatchPhase,
-    Edges, Element, ElementId, Entity, FocusHandle, GlobalElementId, Hsla, InputHandler,
-    InputHandlerView, KeyDownEvent, KeyListener, KeyMatch, Line, LineLayout, Modifiers,
-    MouseButton, MouseDownEvent, MouseMoveEvent, MouseUpEvent, Pixels, ScrollWheelEvent,
-    ShapedGlyph, Size, Style, TextRun, TextStyle, TextSystem, ViewContext, WindowContext,
-    WrappedLineLayout,
+    Edges, Element, ElementId, ElementInputHandler, Entity, FocusHandle, GlobalElementId, Hsla,
+    InputHandler, KeyDownEvent, KeyListener, KeyMatch, Line, LineLayout, Modifiers, MouseButton,
+    MouseDownEvent, MouseMoveEvent, MouseUpEvent, Pixels, ScrollWheelEvent, ShapedGlyph, Size,
+    Style, TextRun, TextStyle, TextSystem, ViewContext, WindowContext, WrappedLineLayout,
 };
 use itertools::Itertools;
 use language::language_settings::ShowWhitespaceSetting;
@@ -2517,16 +2516,10 @@ impl Element<Editor> for EditorElement {
                 self.paint_gutter(gutter_bounds, &layout, editor, cx);
             }
             self.paint_text(text_bounds, &layout, editor, cx);
+            let input_handler = ElementInputHandler::new(bounds, cx);
+            cx.handle_input(&editor.focus_handle, input_handler);
         });
     }
-
-    fn handle_text_input<'a>(
-        &self,
-        editor: &'a mut Editor,
-        cx: &mut ViewContext<Editor>,
-    ) -> Option<(Box<dyn InputHandlerView>, &'a FocusHandle)> {
-        Some((Box::new(cx.view()), &editor.focus_handle))
-    }
 }
 
 // impl EditorElement {

crates/gpui2/src/element.rs 🔗

@@ -1,7 +1,4 @@
-use crate::{
-    BorrowWindow, Bounds, ElementId, FocusHandle, InputHandlerView, LayoutId, Pixels, ViewContext,
-    WindowInputHandler,
-};
+use crate::{BorrowWindow, Bounds, ElementId, LayoutId, Pixels, ViewContext};
 use derive_more::{Deref, DerefMut};
 pub(crate) use smallvec::SmallVec;
 use std::{any::Any, mem};
@@ -34,14 +31,6 @@ pub trait Element<V: 'static> {
         element_state: &mut Self::ElementState,
         cx: &mut ViewContext<V>,
     );
-
-    fn handle_text_input<'a>(
-        &self,
-        _view_state: &'a mut V,
-        _cx: &mut ViewContext<V>,
-    ) -> Option<(Box<dyn InputHandlerView>, &'a FocusHandle)> {
-        None
-    }
 }
 
 #[derive(Deref, DerefMut, Default, Clone, Debug, Eq, PartialEq, Hash)]
@@ -165,18 +154,6 @@ where
                 mut frame_state,
             } => {
                 let bounds = cx.layout_bounds(layout_id);
-                if let Some((input_handler, focus_handle)) =
-                    self.element.handle_text_input(view_state, cx)
-                {
-                    if focus_handle.is_focused(cx) {
-                        cx.window.requested_input_handler = Some(Box::new(WindowInputHandler {
-                            cx: cx.app.this.clone(),
-                            window: cx.window_handle(),
-                            input_handler,
-                            element_bounds: bounds,
-                        }));
-                    }
-                }
                 if let Some(id) = self.element.id() {
                     cx.with_element_state(id, |element_state, cx| {
                         let mut element_state = element_state.unwrap();

crates/gpui2/src/gpui2.rs 🔗

@@ -9,6 +9,7 @@ mod executor;
 mod focusable;
 mod geometry;
 mod image_cache;
+mod input;
 mod interactive;
 mod keymap;
 mod platform;
@@ -24,7 +25,6 @@ mod text_system;
 mod util;
 mod view;
 mod window;
-mod window_input_handler;
 
 mod private {
     /// A mechanism for restricting implementations of a trait to only those in GPUI.
@@ -45,6 +45,7 @@ pub use focusable::*;
 pub use geometry::*;
 pub use gpui2_macros::*;
 pub use image_cache::*;
+pub use input::*;
 pub use interactive::*;
 pub use keymap::*;
 pub use platform::*;
@@ -66,7 +67,6 @@ pub use text_system::*;
 pub use util::arc_cow::ArcCow;
 pub use view::*;
 pub use window::*;
-pub use window_input_handler::*;
 
 use derive_more::{Deref, DerefMut};
 use std::{

crates/gpui2/src/input.rs 🔗

@@ -0,0 +1,114 @@
+use crate::{AsyncWindowContext, Bounds, Pixels, PlatformInputHandler, View, ViewContext};
+use std::ops::Range;
+
+/// Implement this trait to allow views to handle textual input when implementing an editor, field, etc.
+///
+/// Once your view `V` implements this trait, you can use it to construct an [ElementInputHandler<V>].
+/// This input handler can then be assigned during paint by calling [WindowContext::handle_input].
+pub trait InputHandler: 'static + Sized {
+    fn text_for_range(&mut self, range: Range<usize>, cx: &mut ViewContext<Self>)
+        -> Option<String>;
+    fn selected_text_range(&mut self, cx: &mut ViewContext<Self>) -> Option<Range<usize>>;
+    fn marked_text_range(&self, cx: &mut ViewContext<Self>) -> Option<Range<usize>>;
+    fn unmark_text(&mut self, cx: &mut ViewContext<Self>);
+    fn replace_text_in_range(
+        &mut self,
+        range: Option<Range<usize>>,
+        text: &str,
+        cx: &mut ViewContext<Self>,
+    );
+    fn replace_and_mark_text_in_range(
+        &mut self,
+        range: Option<Range<usize>>,
+        new_text: &str,
+        new_selected_range: Option<Range<usize>>,
+        cx: &mut ViewContext<Self>,
+    );
+    fn bounds_for_range(
+        &mut self,
+        range_utf16: Range<usize>,
+        element_bounds: Bounds<Pixels>,
+        cx: &mut ViewContext<Self>,
+    ) -> Option<Bounds<Pixels>>;
+}
+
+/// The canonical implementation of `PlatformInputHandler`. Call `WindowContext::handle_input`
+/// with an instance during your element's paint.
+pub struct ElementInputHandler<V> {
+    view: View<V>,
+    element_bounds: Bounds<Pixels>,
+    cx: AsyncWindowContext,
+}
+
+impl<V: 'static> ElementInputHandler<V> {
+    /// Used in [Element::paint] with the element's bounds and a view context for its
+    /// containing view.
+    pub fn new(element_bounds: Bounds<Pixels>, cx: &mut ViewContext<V>) -> Self {
+        ElementInputHandler {
+            view: cx.view(),
+            element_bounds,
+            cx: cx.to_async(),
+        }
+    }
+}
+
+impl<V: InputHandler> PlatformInputHandler for ElementInputHandler<V> {
+    fn selected_text_range(&mut self) -> Option<Range<usize>> {
+        self.view
+            .update(&mut self.cx, |view, cx| view.selected_text_range(cx))
+            .ok()
+            .flatten()
+    }
+
+    fn marked_text_range(&mut self) -> Option<Range<usize>> {
+        self.view
+            .update(&mut self.cx, |view, cx| view.marked_text_range(cx))
+            .ok()
+            .flatten()
+    }
+
+    fn text_for_range(&mut self, range_utf16: Range<usize>) -> Option<String> {
+        self.view
+            .update(&mut self.cx, |view, cx| {
+                view.text_for_range(range_utf16, cx)
+            })
+            .ok()
+            .flatten()
+    }
+
+    fn replace_text_in_range(&mut self, replacement_range: Option<Range<usize>>, text: &str) {
+        self.view
+            .update(&mut self.cx, |view, cx| {
+                view.replace_text_in_range(replacement_range, text, cx)
+            })
+            .ok();
+    }
+
+    fn replace_and_mark_text_in_range(
+        &mut self,
+        range_utf16: Option<Range<usize>>,
+        new_text: &str,
+        new_selected_range: Option<Range<usize>>,
+    ) {
+        self.view
+            .update(&mut self.cx, |view, cx| {
+                view.replace_and_mark_text_in_range(range_utf16, new_text, new_selected_range, cx)
+            })
+            .ok();
+    }
+
+    fn unmark_text(&mut self) {
+        self.view
+            .update(&mut self.cx, |view, cx| view.unmark_text(cx))
+            .ok();
+    }
+
+    fn bounds_for_range(&mut self, range_utf16: Range<usize>) -> Option<Bounds<Pixels>> {
+        self.view
+            .update(&mut self.cx, |view, cx| {
+                view.bounds_for_range(range_utf16, self.element_bounds, cx)
+            })
+            .ok()
+            .flatten()
+    }
+}

crates/gpui2/src/platform.rs 🔗

@@ -293,10 +293,10 @@ impl From<TileId> for etagere::AllocId {
     }
 }
 
-pub trait PlatformInputHandler {
-    fn selected_text_range(&self) -> Option<Range<usize>>;
-    fn marked_text_range(&self) -> Option<Range<usize>>;
-    fn text_for_range(&self, range_utf16: Range<usize>) -> Option<String>;
+pub trait PlatformInputHandler: 'static {
+    fn selected_text_range(&mut self) -> Option<Range<usize>>;
+    fn marked_text_range(&mut self) -> Option<Range<usize>>;
+    fn text_for_range(&mut self, range_utf16: Range<usize>) -> Option<String>;
     fn replace_text_in_range(&mut self, replacement_range: Option<Range<usize>>, text: &str);
     fn replace_and_mark_text_in_range(
         &mut self,
@@ -305,7 +305,7 @@ pub trait PlatformInputHandler {
         new_selected_range: Option<Range<usize>>,
     );
     fn unmark_text(&mut self);
-    fn bounds_for_range(&self, range_utf16: Range<usize>) -> Option<Bounds<Pixels>>;
+    fn bounds_for_range(&mut self, range_utf16: Range<usize>) -> Option<Bounds<Pixels>>;
 }
 
 #[derive(Debug)]

crates/gpui2/src/window.rs 🔗

@@ -211,7 +211,6 @@ pub struct Window {
     default_prevented: bool,
     mouse_position: Point<Pixels>,
     requested_cursor_style: Option<CursorStyle>,
-    pub(crate) requested_input_handler: Option<Box<dyn PlatformInputHandler>>,
     scale_factor: f32,
     bounds: WindowBounds,
     bounds_observers: SubscriberSet<(), AnyObserver>,
@@ -311,7 +310,6 @@ impl Window {
             default_prevented: true,
             mouse_position,
             requested_cursor_style: None,
-            requested_input_handler: None,
             scale_factor,
             bounds,
             bounds_observers: SubscriberSet::new(),
@@ -1052,9 +1050,6 @@ impl<'a> WindowContext<'a> {
             .take()
             .unwrap_or(CursorStyle::Arrow);
         self.platform.set_cursor_style(cursor_style);
-        if let Some(handler) = self.window.requested_input_handler.take() {
-            self.window.platform_window.set_input_handler(handler);
-        }
 
         self.window.dirty = false;
     }
@@ -2182,6 +2177,21 @@ impl<'a, V: 'static> ViewContext<'a, V> {
             })
         });
     }
+
+    /// Set an input handler, such as [ElementInputHandler], which interfaces with the
+    /// platform to receive textual input with proper integration with concerns such
+    /// as IME interactions.
+    pub fn handle_input(
+        &mut self,
+        focus_handle: &FocusHandle,
+        input_handler: impl PlatformInputHandler,
+    ) {
+        if focus_handle.is_focused(self) {
+            self.window
+                .platform_window
+                .set_input_handler(Box::new(input_handler));
+        }
+    }
 }
 
 impl<V> ViewContext<'_, V> {

crates/gpui2/src/window_input_handler.rs 🔗

@@ -1,167 +0,0 @@
-use crate::{
-    AnyWindowHandle, AppCell, Bounds, Context, Pixels, PlatformInputHandler, View, ViewContext,
-    WindowContext,
-};
-use std::{ops::Range, rc::Weak};
-
-pub struct WindowInputHandler {
-    pub cx: Weak<AppCell>,
-    pub input_handler: Box<dyn InputHandlerView>,
-    pub window: AnyWindowHandle,
-    pub element_bounds: Bounds<Pixels>,
-}
-
-pub trait InputHandlerView {
-    fn text_for_range(&self, range: Range<usize>, cx: &mut WindowContext) -> Option<String>;
-    fn selected_text_range(&self, cx: &mut WindowContext) -> Option<Range<usize>>;
-    fn marked_text_range(&self, cx: &mut WindowContext) -> Option<Range<usize>>;
-    fn unmark_text(&self, cx: &mut WindowContext);
-    fn replace_text_in_range(
-        &self,
-        range: Option<Range<usize>>,
-        text: &str,
-        cx: &mut WindowContext,
-    );
-    fn replace_and_mark_text_in_range(
-        &self,
-        range: Option<Range<usize>>,
-        new_text: &str,
-        new_selected_range: Option<Range<usize>>,
-        cx: &mut WindowContext,
-    );
-    fn bounds_for_range(
-        &self,
-        range_utf16: std::ops::Range<usize>,
-        element_bounds: crate::Bounds<Pixels>,
-        cx: &mut WindowContext,
-    ) -> Option<crate::Bounds<Pixels>>;
-}
-
-pub trait InputHandler: Sized {
-    fn text_for_range(&self, range: Range<usize>, cx: &mut ViewContext<Self>) -> Option<String>;
-    fn selected_text_range(&self, cx: &mut ViewContext<Self>) -> Option<Range<usize>>;
-    fn marked_text_range(&self, cx: &mut ViewContext<Self>) -> Option<Range<usize>>;
-    fn unmark_text(&mut self, cx: &mut ViewContext<Self>);
-    fn replace_text_in_range(
-        &mut self,
-        range: Option<Range<usize>>,
-        text: &str,
-        cx: &mut ViewContext<Self>,
-    );
-    fn replace_and_mark_text_in_range(
-        &mut self,
-        range: Option<Range<usize>>,
-        new_text: &str,
-        new_selected_range: Option<Range<usize>>,
-        cx: &mut ViewContext<Self>,
-    );
-    fn bounds_for_range(
-        &mut self,
-        range_utf16: std::ops::Range<usize>,
-        element_bounds: crate::Bounds<Pixels>,
-        cx: &mut ViewContext<Self>,
-    ) -> Option<crate::Bounds<Pixels>>;
-}
-
-impl<V: InputHandler + 'static> InputHandlerView for View<V> {
-    fn text_for_range(&self, range: Range<usize>, cx: &mut WindowContext) -> Option<String> {
-        self.update(cx, |this, cx| this.text_for_range(range, cx))
-    }
-
-    fn selected_text_range(&self, cx: &mut WindowContext) -> Option<Range<usize>> {
-        self.update(cx, |this, cx| this.selected_text_range(cx))
-    }
-
-    fn marked_text_range(&self, cx: &mut WindowContext) -> Option<Range<usize>> {
-        self.update(cx, |this, cx| this.marked_text_range(cx))
-    }
-
-    fn unmark_text(&self, cx: &mut WindowContext) {
-        self.update(cx, |this, cx| this.unmark_text(cx))
-    }
-
-    fn replace_text_in_range(
-        &self,
-        range: Option<Range<usize>>,
-        text: &str,
-        cx: &mut WindowContext,
-    ) {
-        self.update(cx, |this, cx| this.replace_text_in_range(range, text, cx))
-    }
-
-    fn replace_and_mark_text_in_range(
-        &self,
-        range: Option<Range<usize>>,
-        new_text: &str,
-        new_selected_range: Option<Range<usize>>,
-        cx: &mut WindowContext,
-    ) {
-        self.update(cx, |this, cx| {
-            this.replace_and_mark_text_in_range(range, new_text, new_selected_range, cx)
-        })
-    }
-
-    fn bounds_for_range(
-        &self,
-        range_utf16: std::ops::Range<usize>,
-        element_bounds: crate::Bounds<Pixels>,
-        cx: &mut WindowContext,
-    ) -> Option<crate::Bounds<Pixels>> {
-        self.update(cx, |this, cx| {
-            this.bounds_for_range(range_utf16, element_bounds, cx)
-        })
-    }
-}
-
-impl PlatformInputHandler for WindowInputHandler {
-    fn selected_text_range(&self) -> Option<Range<usize>> {
-        self.update(|handler, cx| handler.selected_text_range(cx))
-            .flatten()
-    }
-
-    fn marked_text_range(&self) -> Option<Range<usize>> {
-        self.update(|handler, cx| handler.marked_text_range(cx))
-            .flatten()
-    }
-
-    fn text_for_range(&self, range_utf16: Range<usize>) -> Option<String> {
-        self.update(|handler, cx| handler.text_for_range(range_utf16, cx))
-            .flatten()
-    }
-
-    fn replace_text_in_range(&mut self, replacement_range: Option<Range<usize>>, text: &str) {
-        self.update(|handler, cx| handler.replace_text_in_range(replacement_range, text, cx));
-    }
-
-    fn replace_and_mark_text_in_range(
-        &mut self,
-        range_utf16: Option<Range<usize>>,
-        new_text: &str,
-        new_selected_range: Option<Range<usize>>,
-    ) {
-        self.update(|handler, cx| {
-            handler.replace_and_mark_text_in_range(range_utf16, new_text, new_selected_range, cx)
-        });
-    }
-
-    fn unmark_text(&mut self) {
-        self.update(|handler, cx| handler.unmark_text(cx));
-    }
-
-    fn bounds_for_range(&self, range_utf16: Range<usize>) -> Option<Bounds<Pixels>> {
-        self.update(|handler, cx| handler.bounds_for_range(range_utf16, self.element_bounds, cx))
-            .flatten()
-    }
-}
-
-impl WindowInputHandler {
-    fn update<R>(
-        &self,
-        f: impl FnOnce(&dyn InputHandlerView, &mut WindowContext) -> R,
-    ) -> Option<R> {
-        let cx = self.cx.upgrade()?;
-        let mut cx = cx.borrow_mut();
-        cx.update_window(self.window, |_, cx| f(&*self.input_handler, cx))
-            .ok()
-    }
-}

crates/notifications2/Cargo.toml 🔗

@@ -0,0 +1,42 @@
+[package]
+name = "notifications2"
+version = "0.1.0"
+edition = "2021"
+publish = false
+
+[lib]
+path = "src/notification_store2.rs"
+doctest = false
+
+[features]
+test-support = [
+    "channel/test-support",
+    "collections/test-support",
+    "gpui/test-support",
+    "rpc/test-support",
+]
+
+[dependencies]
+channel = { package = "channel2", path = "../channel2" }
+client = { package = "client2", path = "../client2" }
+clock = { path = "../clock" }
+collections = { path = "../collections" }
+db = { package = "db2", path = "../db2" }
+feature_flags = { package = "feature_flags2", path = "../feature_flags2" }
+gpui = { package = "gpui2", path = "../gpui2" }
+rpc = { package = "rpc2", path = "../rpc2" }
+settings = { package = "settings2", path = "../settings2" }
+sum_tree = { path = "../sum_tree" }
+text = { package = "text2", path = "../text2" }
+util = { path = "../util" }
+
+anyhow.workspace = true
+time.workspace = true
+
+[dev-dependencies]
+client = { package = "client2", path = "../client2", features = ["test-support"] }
+collections = { path = "../collections", features = ["test-support"] }
+gpui = { package = "gpui2", path = "../gpui2", features = ["test-support"] }
+rpc = { package = "rpc2", path = "../rpc2", features = ["test-support"] }
+settings = { package = "settings2", path = "../settings2", features = ["test-support"] }
+util = { path = "../util", features = ["test-support"] }

crates/notifications2/src/notification_store2.rs 🔗

@@ -0,0 +1,466 @@
+use anyhow::{Context, Result};
+use channel::{ChannelMessage, ChannelMessageId, ChannelStore};
+use client::{Client, UserStore};
+use collections::HashMap;
+use db::smol::stream::StreamExt;
+use gpui::{AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task};
+use rpc::{proto, Notification, TypedEnvelope};
+use std::{ops::Range, sync::Arc};
+use sum_tree::{Bias, SumTree};
+use time::OffsetDateTime;
+use util::ResultExt;
+
+pub fn init(client: Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
+    let notification_store = cx.build_model(|cx| NotificationStore::new(client, user_store, cx));
+    cx.set_global(notification_store);
+}
+
+pub struct NotificationStore {
+    client: Arc<Client>,
+    user_store: Model<UserStore>,
+    channel_messages: HashMap<u64, ChannelMessage>,
+    channel_store: Model<ChannelStore>,
+    notifications: SumTree<NotificationEntry>,
+    loaded_all_notifications: bool,
+    _watch_connection_status: Task<Option<()>>,
+    _subscriptions: Vec<client::Subscription>,
+}
+
+#[derive(Clone, PartialEq, Eq, Debug)]
+pub enum NotificationEvent {
+    NotificationsUpdated {
+        old_range: Range<usize>,
+        new_count: usize,
+    },
+    NewNotification {
+        entry: NotificationEntry,
+    },
+    NotificationRemoved {
+        entry: NotificationEntry,
+    },
+    NotificationRead {
+        entry: NotificationEntry,
+    },
+}
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct NotificationEntry {
+    pub id: u64,
+    pub notification: Notification,
+    pub timestamp: OffsetDateTime,
+    pub is_read: bool,
+    pub response: Option<bool>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct NotificationSummary {
+    max_id: u64,
+    count: usize,
+    unread_count: usize,
+}
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct Count(usize);
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct UnreadCount(usize);
+
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
+struct NotificationId(u64);
+
+impl NotificationStore {
+    pub fn global(cx: &AppContext) -> Model<Self> {
+        cx.global::<Model<Self>>().clone()
+    }
+
+    pub fn new(
+        client: Arc<Client>,
+        user_store: Model<UserStore>,
+        cx: &mut ModelContext<Self>,
+    ) -> Self {
+        let mut connection_status = client.status();
+        let watch_connection_status = cx.spawn(|this, mut cx| async move {
+            while let Some(status) = connection_status.next().await {
+                let this = this.upgrade()?;
+                match status {
+                    client::Status::Connected { .. } => {
+                        if let Some(task) = this
+                            .update(&mut cx, |this, cx| this.handle_connect(cx))
+                            .log_err()?
+                        {
+                            task.await.log_err()?;
+                        }
+                    }
+                    _ => this
+                        .update(&mut cx, |this, cx| this.handle_disconnect(cx))
+                        .log_err()?,
+                }
+            }
+            Some(())
+        });
+
+        Self {
+            channel_store: ChannelStore::global(cx),
+            notifications: Default::default(),
+            loaded_all_notifications: false,
+            channel_messages: Default::default(),
+            _watch_connection_status: watch_connection_status,
+            _subscriptions: vec![
+                client.add_message_handler(cx.weak_model(), Self::handle_new_notification),
+                client.add_message_handler(cx.weak_model(), Self::handle_delete_notification),
+            ],
+            user_store,
+            client,
+        }
+    }
+
+    pub fn notification_count(&self) -> usize {
+        self.notifications.summary().count
+    }
+
+    pub fn unread_notification_count(&self) -> usize {
+        self.notifications.summary().unread_count
+    }
+
+    pub fn channel_message_for_id(&self, id: u64) -> Option<&ChannelMessage> {
+        self.channel_messages.get(&id)
+    }
+
+    // Get the nth newest notification.
+    pub fn notification_at(&self, ix: usize) -> Option<&NotificationEntry> {
+        let count = self.notifications.summary().count;
+        if ix >= count {
+            return None;
+        }
+        let ix = count - 1 - ix;
+        let mut cursor = self.notifications.cursor::<Count>();
+        cursor.seek(&Count(ix), Bias::Right, &());
+        cursor.item()
+    }
+
+    pub fn notification_for_id(&self, id: u64) -> Option<&NotificationEntry> {
+        let mut cursor = self.notifications.cursor::<NotificationId>();
+        cursor.seek(&NotificationId(id), Bias::Left, &());
+        if let Some(item) = cursor.item() {
+            if item.id == id {
+                return Some(item);
+            }
+        }
+        None
+    }
+
+    pub fn load_more_notifications(
+        &self,
+        clear_old: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Option<Task<Result<()>>> {
+        if self.loaded_all_notifications && !clear_old {
+            return None;
+        }
+
+        let before_id = if clear_old {
+            None
+        } else {
+            self.notifications.first().map(|entry| entry.id)
+        };
+        let request = self.client.request(proto::GetNotifications { before_id });
+        Some(cx.spawn(|this, mut cx| async move {
+            let this = this
+                .upgrade()
+                .context("Notification store was dropped while loading notifications")?;
+
+            let response = request.await?;
+            this.update(&mut cx, |this, _| {
+                this.loaded_all_notifications = response.done
+            })?;
+            Self::add_notifications(
+                this,
+                response.notifications,
+                AddNotificationsOptions {
+                    is_new: false,
+                    clear_old,
+                    includes_first: response.done,
+                },
+                cx,
+            )
+            .await?;
+            Ok(())
+        }))
+    }
+
+    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Option<Task<Result<()>>> {
+        self.notifications = Default::default();
+        self.channel_messages = Default::default();
+        cx.notify();
+        self.load_more_notifications(true, cx)
+    }
+
+    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
+        cx.notify()
+    }
+
+    async fn handle_new_notification(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::AddNotification>,
+        _: Arc<Client>,
+        cx: AsyncAppContext,
+    ) -> Result<()> {
+        Self::add_notifications(
+            this,
+            envelope.payload.notification.into_iter().collect(),
+            AddNotificationsOptions {
+                is_new: true,
+                clear_old: false,
+                includes_first: false,
+            },
+            cx,
+        )
+        .await
+    }
+
+    async fn handle_delete_notification(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::DeleteNotification>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            this.splice_notifications([(envelope.payload.notification_id, None)], false, cx);
+            Ok(())
+        })?
+    }
+
+    async fn add_notifications(
+        this: Model<Self>,
+        notifications: Vec<proto::Notification>,
+        options: AddNotificationsOptions,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let mut user_ids = Vec::new();
+        let mut message_ids = Vec::new();
+
+        let notifications = notifications
+            .into_iter()
+            .filter_map(|message| {
+                Some(NotificationEntry {
+                    id: message.id,
+                    is_read: message.is_read,
+                    timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)
+                        .ok()?,
+                    notification: Notification::from_proto(&message)?,
+                    response: message.response,
+                })
+            })
+            .collect::<Vec<_>>();
+        if notifications.is_empty() {
+            return Ok(());
+        }
+
+        for entry in &notifications {
+            match entry.notification {
+                Notification::ChannelInvitation { inviter_id, .. } => {
+                    user_ids.push(inviter_id);
+                }
+                Notification::ContactRequest {
+                    sender_id: requester_id,
+                } => {
+                    user_ids.push(requester_id);
+                }
+                Notification::ContactRequestAccepted {
+                    responder_id: contact_id,
+                } => {
+                    user_ids.push(contact_id);
+                }
+                Notification::ChannelMessageMention {
+                    sender_id,
+                    message_id,
+                    ..
+                } => {
+                    user_ids.push(sender_id);
+                    message_ids.push(message_id);
+                }
+            }
+        }
+
+        let (user_store, channel_store) = this.read_with(&cx, |this, _| {
+            (this.user_store.clone(), this.channel_store.clone())
+        })?;
+
+        user_store
+            .update(&mut cx, |store, cx| store.get_users(user_ids, cx))?
+            .await?;
+        let messages = channel_store
+            .update(&mut cx, |store, cx| {
+                store.fetch_channel_messages(message_ids, cx)
+            })?
+            .await?;
+        this.update(&mut cx, |this, cx| {
+            if options.clear_old {
+                cx.emit(NotificationEvent::NotificationsUpdated {
+                    old_range: 0..this.notifications.summary().count,
+                    new_count: 0,
+                });
+                this.notifications = SumTree::default();
+                this.channel_messages.clear();
+                this.loaded_all_notifications = false;
+            }
+
+            if options.includes_first {
+                this.loaded_all_notifications = true;
+            }
+
+            this.channel_messages
+                .extend(messages.into_iter().filter_map(|message| {
+                    if let ChannelMessageId::Saved(id) = message.id {
+                        Some((id, message))
+                    } else {
+                        None
+                    }
+                }));
+
+            this.splice_notifications(
+                notifications
+                    .into_iter()
+                    .map(|notification| (notification.id, Some(notification))),
+                options.is_new,
+                cx,
+            );
+        })
+        .log_err();
+
+        Ok(())
+    }
+
+    fn splice_notifications(
+        &mut self,
+        notifications: impl IntoIterator<Item = (u64, Option<NotificationEntry>)>,
+        is_new: bool,
+        cx: &mut ModelContext<'_, NotificationStore>,
+    ) {
+        let mut cursor = self.notifications.cursor::<(NotificationId, Count)>();
+        let mut new_notifications = SumTree::new();
+        let mut old_range = 0..0;
+
+        for (i, (id, new_notification)) in notifications.into_iter().enumerate() {
+            new_notifications.append(cursor.slice(&NotificationId(id), Bias::Left, &()), &());
+
+            if i == 0 {
+                old_range.start = cursor.start().1 .0;
+            }
+
+            let old_notification = cursor.item();
+            if let Some(old_notification) = old_notification {
+                if old_notification.id == id {
+                    cursor.next(&());
+
+                    if let Some(new_notification) = &new_notification {
+                        if new_notification.is_read {
+                            cx.emit(NotificationEvent::NotificationRead {
+                                entry: new_notification.clone(),
+                            });
+                        }
+                    } else {
+                        cx.emit(NotificationEvent::NotificationRemoved {
+                            entry: old_notification.clone(),
+                        });
+                    }
+                }
+            } else if let Some(new_notification) = &new_notification {
+                if is_new {
+                    cx.emit(NotificationEvent::NewNotification {
+                        entry: new_notification.clone(),
+                    });
+                }
+            }
+
+            if let Some(notification) = new_notification {
+                new_notifications.push(notification, &());
+            }
+        }
+
+        old_range.end = cursor.start().1 .0;
+        let new_count = new_notifications.summary().count - old_range.start;
+        new_notifications.append(cursor.suffix(&()), &());
+        drop(cursor);
+
+        self.notifications = new_notifications;
+        cx.emit(NotificationEvent::NotificationsUpdated {
+            old_range,
+            new_count,
+        });
+    }
+
+    pub fn respond_to_notification(
+        &mut self,
+        notification: Notification,
+        response: bool,
+        cx: &mut ModelContext<Self>,
+    ) {
+        match notification {
+            Notification::ContactRequest { sender_id } => {
+                self.user_store
+                    .update(cx, |store, cx| {
+                        store.respond_to_contact_request(sender_id, response, cx)
+                    })
+                    .detach();
+            }
+            Notification::ChannelInvitation { channel_id, .. } => {
+                self.channel_store
+                    .update(cx, |store, cx| {
+                        store.respond_to_channel_invite(channel_id, response, cx)
+                    })
+                    .detach();
+            }
+            _ => {}
+        }
+    }
+}
+
+impl EventEmitter<NotificationEvent> for NotificationStore {}
+
+impl sum_tree::Item for NotificationEntry {
+    type Summary = NotificationSummary;
+
+    fn summary(&self) -> Self::Summary {
+        NotificationSummary {
+            max_id: self.id,
+            count: 1,
+            unread_count: if self.is_read { 0 } else { 1 },
+        }
+    }
+}
+
+impl sum_tree::Summary for NotificationSummary {
+    type Context = ();
+
+    fn add_summary(&mut self, summary: &Self, _: &()) {
+        self.max_id = self.max_id.max(summary.max_id);
+        self.count += summary.count;
+        self.unread_count += summary.unread_count;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for NotificationId {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        debug_assert!(summary.max_id > self.0);
+        self.0 = summary.max_id;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for Count {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        self.0 += summary.count;
+    }
+}
+
+impl<'a> sum_tree::Dimension<'a, NotificationSummary> for UnreadCount {
+    fn add_summary(&mut self, summary: &NotificationSummary, _: &()) {
+        self.0 += summary.unread_count;
+    }
+}
+
+struct AddNotificationsOptions {
+    is_new: bool,
+    clear_old: bool,
+    includes_first: bool,
+}