WIP

Antonio Scandurra created

Change summary

Cargo.lock                            |   2 
crates/client2/Cargo.toml             |   4 
crates/client2/src/client2.rs         | 803 +++++++++++++++-------------
crates/client2/src/telemetry.rs       |   4 
crates/client2/src/test.rs            | 430 +++++++-------
crates/client2/src/user.rs            |  12 
crates/gpui2/src/app.rs               |  45 +
crates/gpui2/src/app/async_context.rs |  33 +
crates/gpui2/src/app/entity_map.rs    | 153 ++++-
crates/gpui2/src/app/model_context.rs |   8 
crates/gpui2/src/executor.rs          |   6 
crates/gpui2/src/window.rs            | 109 ++-
12 files changed, 916 insertions(+), 693 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1460,7 +1460,6 @@ dependencies = [
  "db",
  "feature_flags",
  "futures 0.3.28",
- "gpui",
  "gpui2",
  "image",
  "lazy_static",
@@ -1473,6 +1472,7 @@ dependencies = [
  "serde",
  "serde_derive",
  "settings",
+ "settings2",
  "smol",
  "sum_tree",
  "sysinfo",

crates/client2/Cargo.toml 🔗

@@ -9,7 +9,7 @@ path = "src/client2.rs"
 doctest = false
 
 [features]
-test-support = ["collections/test-support", "gpui/test-support", "rpc/test-support"]
+test-support = ["collections/test-support", "gpui2/test-support", "rpc/test-support"]
 
 [dependencies]
 collections = { path = "../collections" }
@@ -18,7 +18,7 @@ gpui2 = { path = "../gpui2" }
 util = { path = "../util" }
 rpc = { path = "../rpc" }
 text = { path = "../text" }
-settings = { path = "../settings" }
+settings2 = { path = "../settings2" }
 feature_flags = { path = "../feature_flags" }
 sum_tree = { path = "../sum_tree" }
 

crates/client2/src/client2.rs 🔗

@@ -4,7 +4,7 @@ pub mod test;
 pub mod telemetry;
 pub mod user;
 
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, Context as _, Result};
 use async_recursion::async_recursion;
 use async_tungstenite::tungstenite::{
     error::Error as WebsocketError,
@@ -14,10 +14,9 @@ use futures::{
     future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _,
     TryStreamExt,
 };
-use gpui::{
-    actions, platform::AppVersion, serde_json, AnyModelHandle, AnyWeakModelHandle,
-    AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelHandle, Task, View, ViewContext,
-    WeakViewHandle,
+use gpui2::{
+    serde_json, AnyHandle, AnyWeakHandle, AnyWindowHandle, AppContext, AsyncAppContext,
+    AsyncWindowContext, Handle, SemanticVersion, Task, ViewContext, WeakHandle, WindowId,
 };
 use lazy_static::lazy_static;
 use parking_lot::RwLock;
@@ -57,7 +56,7 @@ lazy_static! {
     pub static ref ADMIN_API_TOKEN: Option<String> = std::env::var("ZED_ADMIN_API_TOKEN")
         .ok()
         .and_then(|s| if s.is_empty() { None } else { Some(s) });
-    pub static ref ZED_APP_VERSION: Option<AppVersion> = std::env::var("ZED_APP_VERSION")
+    pub static ref ZED_APP_VERSION: Option<SemanticVersion> = std::env::var("ZED_APP_VERSION")
         .ok()
         .and_then(|v| v.parse().ok());
     pub static ref ZED_APP_PATH: Option<PathBuf> =
@@ -70,17 +69,25 @@ pub const ZED_SECRET_CLIENT_TOKEN: &str = "618033988749894";
 pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100);
 pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
 
-actions!(client, [SignIn, SignOut, Reconnect]);
+#[derive(Clone, Default, PartialEq, Deserialize)]
+pub struct SignIn;
+
+#[derive(Clone, Default, PartialEq, Deserialize)]
+pub struct SignOut;
+
+#[derive(Clone, Default, PartialEq, Deserialize)]
+pub struct Reconnect;
 
 pub fn init_settings(cx: &mut AppContext) {
-    settings::register::<TelemetrySettings>(cx);
+    settings2::register::<TelemetrySettings>(cx);
 }
 
 pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
     init_settings(cx);
 
     let client = Arc::downgrade(client);
-    cx.add_global_action({
+    cx.register_action_type::<SignIn>();
+    cx.on_action({
         let client = client.clone();
         move |_: &SignIn, cx| {
             if let Some(client) = client.upgrade() {
@@ -91,7 +98,9 @@ pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
             }
         }
     });
-    cx.add_global_action({
+
+    cx.register_action_type::<SignOut>();
+    cx.on_action({
         let client = client.clone();
         move |_: &SignOut, cx| {
             if let Some(client) = client.upgrade() {
@@ -102,7 +111,9 @@ pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
             }
         }
     });
-    cx.add_global_action({
+
+    cx.register_action_type::<Reconnect>();
+    cx.on_action({
         let client = client.clone();
         move |_: &Reconnect, cx| {
             if let Some(client) = client.upgrade() {
@@ -216,7 +227,7 @@ struct ClientState {
     _reconnect_task: Option<Task<()>>,
     reconnect_interval: Duration,
     entities_by_type_and_remote_id: HashMap<(TypeId, u64), WeakSubscriber>,
-    models_by_message_type: HashMap<TypeId, AnyWeakModelHandle>,
+    models_by_message_type: HashMap<TypeId, AnyWeakHandle>,
     entity_types_by_message_type: HashMap<TypeId, TypeId>,
     #[allow(clippy::type_complexity)]
     message_handlers: HashMap<
@@ -235,14 +246,16 @@ struct ClientState {
 }
 
 enum WeakSubscriber {
-    Model(AnyWeakModelHandle),
-    View(AnyWeakViewHandle),
+    Entity {
+        handle: AnyWeakHandle,
+        window_id: Option<WindowId>,
+    },
     Pending(Vec<Box<dyn AnyTypedEnvelope>>),
 }
 
-enum Subscriber {
-    Model(AnyModelHandle),
-    View(AnyWeakViewHandle),
+struct Subscriber {
+    handle: AnyHandle,
+    window_handle: Option<AnyWindowHandle>,
 }
 
 #[derive(Clone, Debug)]
@@ -298,15 +311,18 @@ impl Drop for Subscription {
     }
 }
 
-pub struct PendingEntitySubscription<T: Entity> {
+pub struct PendingEntitySubscription<T> {
     client: Arc<Client>,
     remote_id: u64,
     _entity_type: PhantomData<T>,
     consumed: bool,
 }
 
-impl<T: Entity> PendingEntitySubscription<T> {
-    pub fn set_model(mut self, model: &ModelHandle<T>, cx: &mut AsyncAppContext) -> Subscription {
+impl<T> PendingEntitySubscription<T>
+where
+    T: 'static + Send + Sync,
+{
+    pub fn set_model(mut self, model: &Handle<T>, cx: &mut AsyncAppContext) -> Subscription {
         self.consumed = true;
         let mut state = self.client.state.write();
         let id = (TypeId::of::<T>(), self.remote_id);
@@ -316,9 +332,13 @@ impl<T: Entity> PendingEntitySubscription<T> {
             unreachable!()
         };
 
-        state
-            .entities_by_type_and_remote_id
-            .insert(id, WeakSubscriber::Model(model.downgrade().into_any()));
+        state.entities_by_type_and_remote_id.insert(
+            id,
+            WeakSubscriber::Entity {
+                handle: model.downgrade().into(),
+                window_id: None,
+            },
+        );
         drop(state);
         for message in messages {
             self.client.handle_message(message, cx);
@@ -330,7 +350,7 @@ impl<T: Entity> PendingEntitySubscription<T> {
     }
 }
 
-impl<T: Entity> Drop for PendingEntitySubscription<T> {
+impl<T> Drop for PendingEntitySubscription<T> {
     fn drop(&mut self) {
         if !self.consumed {
             let mut state = self.client.state.write();
@@ -358,7 +378,7 @@ pub struct TelemetrySettingsContent {
     pub metrics: Option<bool>,
 }
 
-impl settings::Setting for TelemetrySettings {
+impl settings2::Setting for TelemetrySettings {
     const KEY: Option<&'static str> = Some("telemetry");
 
     type FileContent = TelemetrySettingsContent;
@@ -509,23 +529,26 @@ impl Client {
         }
     }
 
-    pub fn add_view_for_remote_entity<T: View>(
+    pub fn add_view_for_remote_entity<T>(
         self: &Arc<Self>,
         remote_id: u64,
         cx: &mut ViewContext<T>,
     ) -> Subscription {
         let id = (TypeId::of::<T>(), remote_id);
-        self.state
-            .write()
-            .entities_by_type_and_remote_id
-            .insert(id, WeakSubscriber::View(cx.weak_handle().into_any()));
+        self.state.write().entities_by_type_and_remote_id.insert(
+            id,
+            WeakSubscriber::Entity {
+                handle: cx.handle().into_any(),
+                window_id: Some(cx.window_id()),
+            },
+        );
         Subscription::Entity {
             client: Arc::downgrade(self),
             id,
         }
     }
 
-    pub fn subscribe_to_entity<T: Entity>(
+    pub fn subscribe_to_entity<T>(
         self: &Arc<Self>,
         remote_id: u64,
     ) -> Result<PendingEntitySubscription<T>> {
@@ -550,16 +573,13 @@ impl Client {
     #[track_caller]
     pub fn add_message_handler<M, E, H, F>(
         self: &Arc<Self>,
-        model: ModelHandle<E>,
+        model: Handle<E>,
         handler: H,
     ) -> Subscription
     where
         M: EnvelopedMessage,
-        E: Entity,
-        H: 'static
-            + Send
-            + Sync
-            + Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+        E: 'static + Send + Sync,
+        H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
         F: 'static + Future<Output = Result<()>>,
     {
         let message_type_id = TypeId::of::<M>();
@@ -600,16 +620,13 @@ impl Client {
 
     pub fn add_request_handler<M, E, H, F>(
         self: &Arc<Self>,
-        model: ModelHandle<E>,
+        model: Handle<E>,
         handler: H,
     ) -> Subscription
     where
         M: RequestMessage,
-        E: Entity,
-        H: 'static
-            + Send
-            + Sync
-            + Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+        E: 'static + Send + Sync,
+        H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
         F: 'static + Future<Output = Result<M::Response>>,
     {
         self.add_message_handler(model, move |handle, envelope, this, cx| {
@@ -624,18 +641,24 @@ impl Client {
     pub fn add_view_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
     where
         M: EntityMessage,
-        E: View,
         H: 'static
             + Send
             + Sync
-            + Fn(WeakViewHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+            + Fn(WeakHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncWindowContext) -> F,
         F: 'static + Future<Output = Result<()>>,
     {
-        self.add_entity_message_handler::<M, E, _, _>(move |handle, message, client, cx| {
-            if let Subscriber::View(handle) = handle {
-                handler(handle.downcast::<E>().unwrap(), message, client, cx)
+        self.add_entity_message_handler::<M, E, _, _>(move |subscriber, message, client, cx| {
+            if let Some(window_handle) = subscriber.window_handle {
+                cx.update_window(subscriber, |cx| {
+                    handler(
+                        subscriber.handle.downcast::<E>().unwrap(),
+                        message,
+                        client,
+                        cx,
+                    )
+                })
             } else {
-                unreachable!();
+                panic!()
             }
         })
     }
@@ -643,26 +666,23 @@ impl Client {
     pub fn add_model_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
     where
         M: EntityMessage,
-        E: Entity,
-        H: 'static
-            + Send
-            + Sync
-            + Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+        E: 'static + Send + Sync,
+        H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
         F: 'static + Future<Output = Result<()>>,
     {
-        self.add_entity_message_handler::<M, E, _, _>(move |handle, message, client, cx| {
-            if let Subscriber::Model(handle) = handle {
-                handler(handle.downcast::<E>().unwrap(), message, client, cx)
-            } else {
-                unreachable!();
-            }
+        self.add_entity_message_handler::<M, E, _, _>(move |subscriber, message, client, cx| {
+            handler(
+                subscriber.handle.downcast::<E>().unwrap(),
+                message,
+                client,
+                cx,
+            )
         })
     }
 
     fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
     where
         M: EntityMessage,
-        E: Entity,
         H: 'static
             + Send
             + Sync
@@ -704,11 +724,8 @@ impl Client {
     pub fn add_model_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
     where
         M: EntityMessage + RequestMessage,
-        E: Entity,
-        H: 'static
-            + Send
-            + Sync
-            + Fn(ModelHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+        E: 'static + Send + Sync,
+        H: 'static + Send + Sync + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
         F: 'static + Future<Output = Result<M::Response>>,
     {
         self.add_model_message_handler(move |entity, envelope, client, cx| {
@@ -723,11 +740,11 @@ impl Client {
     pub fn add_view_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
     where
         M: EntityMessage + RequestMessage,
-        E: View,
+        E: 'static + Send + Sync,
         H: 'static
             + Send
             + Sync
-            + Fn(WeakViewHandle<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F,
+            + Fn(Handle<E>, TypedEnvelope<M>, Arc<Self>, AsyncWindowContext) -> F,
         F: 'static + Future<Output = Result<M::Response>>,
     {
         self.add_view_message_handler(move |entity, envelope, client, cx| {
@@ -823,14 +840,14 @@ impl Client {
             self.set_status(Status::Reconnecting, cx);
         }
 
-        let mut timeout = cx.background().timer(CONNECTION_TIMEOUT).fuse();
+        let mut timeout = futures::FutureExt::fuse(cx.executor()?.timer(CONNECTION_TIMEOUT));
         futures::select_biased! {
             connection = self.establish_connection(&credentials, cx).fuse() => {
                 match connection {
                     Ok(conn) => {
                         self.state.write().credentials = Some(credentials.clone());
                         if !read_from_keychain && IMPERSONATE_LOGIN.is_none() {
-                            write_credentials_to_keychain(&credentials, cx).log_err();
+                            write_credentials_to_keychain(credentials, cx).log_err();
                         }
 
                         futures::select_biased! {
@@ -844,7 +861,7 @@ impl Client {
                     Err(EstablishConnectionError::Unauthorized) => {
                         self.state.write().credentials.take();
                         if read_from_keychain {
-                            cx.platform().delete_credentials(&ZED_SERVER_URL).log_err();
+                            delete_credentials_from_keychain(cx).log_err();
                             self.set_status(Status::SignedOut, cx);
                             self.authenticate_and_connect(false, cx).await
                         } else {
@@ -874,12 +891,12 @@ impl Client {
         conn: Connection,
         cx: &AsyncAppContext,
     ) -> Result<()> {
-        let executor = cx.background();
+        let executor = cx.executor()?;
         log::info!("add connection to peer");
         let (connection_id, handle_io, mut incoming) = self
             .peer
             .add_connection(conn, move |duration| executor.timer(duration));
-        let handle_io = cx.background().spawn(handle_io);
+        let handle_io = executor.spawn(handle_io);
 
         let peer_id = async {
             log::info!("waiting for server hello");
@@ -925,10 +942,10 @@ impl Client {
             },
             cx,
         );
-        cx.foreground()
-            .spawn({
-                let cx = cx.clone();
-                let this = self.clone();
+
+        cx.spawn({
+            let this = self.clone();
+            |cx| {
                 async move {
                     while let Some(message) = incoming.next().await {
                         this.handle_message(message, &cx);
@@ -936,13 +953,13 @@ impl Client {
                         smol::future::yield_now().await;
                     }
                 }
-            })
-            .detach();
+            }
+        })?
+        .detach();
 
-        let this = self.clone();
-        let cx = cx.clone();
-        cx.foreground()
-            .spawn(async move {
+        cx.spawn({
+            let this = self.clone();
+            move |cx| async move {
                 match handle_io.await {
                     Ok(()) => {
                         if this.status().borrow().clone()
@@ -959,8 +976,8 @@ impl Client {
                         this.set_status(Status::ConnectionLost, &cx);
                     }
                 }
-            })
-            .detach();
+            }
+        })?;
 
         Ok(())
     }
@@ -1299,12 +1316,15 @@ impl Client {
 
         let mut subscriber = None;
 
-        if let Some(message_model) = state
+        if let Some(handle) = state
             .models_by_message_type
             .get(&payload_type_id)
-            .and_then(|model| model.upgrade(cx))
+            .and_then(|handle| handle.upgrade())
         {
-            subscriber = Some(Subscriber::Model(message_model));
+            subscriber = Some(Subscriber {
+                handle,
+                window_handle: None,
+            });
         } else if let Some((extract_entity_id, entity_type_id)) =
             state.entity_id_extractors.get(&payload_type_id).zip(
                 state
@@ -1393,28 +1413,39 @@ impl Client {
     }
 }
 
-fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credentials> {
+async fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option<Credentials> {
     if IMPERSONATE_LOGIN.is_some() {
         return None;
     }
 
     let (user_id, access_token) = cx
-        .platform()
-        .read_credentials(&ZED_SERVER_URL)
-        .log_err()
-        .flatten()?;
+        .run_on_main(|cx| cx.read_credentials(&ZED_SERVER_URL).log_err().flatten())
+        .ok()?
+        .await?;
+
     Some(Credentials {
         user_id: user_id.parse().ok()?,
         access_token: String::from_utf8(access_token).ok()?,
     })
 }
 
-fn write_credentials_to_keychain(credentials: &Credentials, cx: &AsyncAppContext) -> Result<()> {
-    cx.platform().write_credentials(
-        &ZED_SERVER_URL,
-        &credentials.user_id.to_string(),
-        credentials.access_token.as_bytes(),
-    )
+async fn write_credentials_to_keychain(
+    credentials: Credentials,
+    cx: &AsyncAppContext,
+) -> Result<()> {
+    cx.run_on_main(move |cx| {
+        cx.write_credentials(
+            &ZED_SERVER_URL,
+            &credentials.user_id.to_string(),
+            credentials.access_token.as_bytes(),
+        )
+    })?
+    .await
+}
+
+async fn delete_credentials_from_keychain(cx: &AsyncAppContext) -> Result<()> {
+    cx.run_on_main(move |cx| cx.delete_credentials(&ZED_SERVER_URL))?
+        .await
 }
 
 const WORKTREE_URL_PREFIX: &str = "zed://worktrees/";
@@ -1434,290 +1465,290 @@ pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> {
     Some((id, access_token.to_string()))
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::test::FakeServer;
-    use gpui::{executor::Deterministic, TestAppContext};
-    use parking_lot::Mutex;
-    use std::future;
-    use util::http::FakeHttpClient;
-
-    #[gpui::test(iterations = 10)]
-    async fn test_reconnection(cx: &mut TestAppContext) {
-        cx.foreground().forbid_parking();
-
-        let user_id = 5;
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        let server = FakeServer::for_client(user_id, &client, cx).await;
-        let mut status = client.status();
-        assert!(matches!(
-            status.next().await,
-            Some(Status::Connected { .. })
-        ));
-        assert_eq!(server.auth_count(), 1);
-
-        server.forbid_connections();
-        server.disconnect();
-        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-        server.allow_connections();
-        cx.foreground().advance_clock(Duration::from_secs(10));
-        while !matches!(status.next().await, Some(Status::Connected { .. })) {}
-        assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting
-
-        server.forbid_connections();
-        server.disconnect();
-        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-        // Clear cached credentials after authentication fails
-        server.roll_access_token();
-        server.allow_connections();
-        cx.foreground().advance_clock(Duration::from_secs(10));
-        while !matches!(status.next().await, Some(Status::Connected { .. })) {}
-        assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
-    }
-
-    #[gpui::test(iterations = 10)]
-    async fn test_connection_timeout(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
-        deterministic.forbid_parking();
-
-        let user_id = 5;
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        let mut status = client.status();
-
-        // Time out when client tries to connect.
-        client.override_authenticate(move |cx| {
-            cx.foreground().spawn(async move {
-                Ok(Credentials {
-                    user_id,
-                    access_token: "token".into(),
-                })
-            })
-        });
-        client.override_establish_connection(|_, cx| {
-            cx.foreground().spawn(async move {
-                future::pending::<()>().await;
-                unreachable!()
-            })
-        });
-        let auth_and_connect = cx.spawn({
-            let client = client.clone();
-            |cx| async move { client.authenticate_and_connect(false, &cx).await }
-        });
-        deterministic.run_until_parked();
-        assert!(matches!(status.next().await, Some(Status::Connecting)));
-
-        deterministic.advance_clock(CONNECTION_TIMEOUT);
-        assert!(matches!(
-            status.next().await,
-            Some(Status::ConnectionError { .. })
-        ));
-        auth_and_connect.await.unwrap_err();
-
-        // Allow the connection to be established.
-        let server = FakeServer::for_client(user_id, &client, cx).await;
-        assert!(matches!(
-            status.next().await,
-            Some(Status::Connected { .. })
-        ));
-
-        // Disconnect client.
-        server.forbid_connections();
-        server.disconnect();
-        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
-
-        // Time out when re-establishing the connection.
-        server.allow_connections();
-        client.override_establish_connection(|_, cx| {
-            cx.foreground().spawn(async move {
-                future::pending::<()>().await;
-                unreachable!()
-            })
-        });
-        deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
-        assert!(matches!(
-            status.next().await,
-            Some(Status::Reconnecting { .. })
-        ));
-
-        deterministic.advance_clock(CONNECTION_TIMEOUT);
-        assert!(matches!(
-            status.next().await,
-            Some(Status::ReconnectionError { .. })
-        ));
-    }
-
-    #[gpui::test(iterations = 10)]
-    async fn test_authenticating_more_than_once(
-        cx: &mut TestAppContext,
-        deterministic: Arc<Deterministic>,
-    ) {
-        cx.foreground().forbid_parking();
-
-        let auth_count = Arc::new(Mutex::new(0));
-        let dropped_auth_count = Arc::new(Mutex::new(0));
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        client.override_authenticate({
-            let auth_count = auth_count.clone();
-            let dropped_auth_count = dropped_auth_count.clone();
-            move |cx| {
-                let auth_count = auth_count.clone();
-                let dropped_auth_count = dropped_auth_count.clone();
-                cx.foreground().spawn(async move {
-                    *auth_count.lock() += 1;
-                    let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
-                    future::pending::<()>().await;
-                    unreachable!()
-                })
-            }
-        });
-
-        let _authenticate = cx.spawn(|cx| {
-            let client = client.clone();
-            async move { client.authenticate_and_connect(false, &cx).await }
-        });
-        deterministic.run_until_parked();
-        assert_eq!(*auth_count.lock(), 1);
-        assert_eq!(*dropped_auth_count.lock(), 0);
-
-        let _authenticate = cx.spawn(|cx| {
-            let client = client.clone();
-            async move { client.authenticate_and_connect(false, &cx).await }
-        });
-        deterministic.run_until_parked();
-        assert_eq!(*auth_count.lock(), 2);
-        assert_eq!(*dropped_auth_count.lock(), 1);
-    }
-
-    #[test]
-    fn test_encode_and_decode_worktree_url() {
-        let url = encode_worktree_url(5, "deadbeef");
-        assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
-        assert_eq!(
-            decode_worktree_url(&format!("\n {}\t", url)),
-            Some((5, "deadbeef".to_string()))
-        );
-        assert_eq!(decode_worktree_url("not://the-right-format"), None);
-    }
-
-    #[gpui::test]
-    async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
-        cx.foreground().forbid_parking();
-
-        let user_id = 5;
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        let server = FakeServer::for_client(user_id, &client, cx).await;
-
-        let (done_tx1, mut done_rx1) = smol::channel::unbounded();
-        let (done_tx2, mut done_rx2) = smol::channel::unbounded();
-        client.add_model_message_handler(
-            move |model: ModelHandle<Model>, _: TypedEnvelope<proto::JoinProject>, _, cx| {
-                match model.read_with(&cx, |model, _| model.id) {
-                    1 => done_tx1.try_send(()).unwrap(),
-                    2 => done_tx2.try_send(()).unwrap(),
-                    _ => unreachable!(),
-                }
-                async { Ok(()) }
-            },
-        );
-        let model1 = cx.add_model(|_| Model {
-            id: 1,
-            subscription: None,
-        });
-        let model2 = cx.add_model(|_| Model {
-            id: 2,
-            subscription: None,
-        });
-        let model3 = cx.add_model(|_| Model {
-            id: 3,
-            subscription: None,
-        });
-
-        let _subscription1 = client
-            .subscribe_to_entity(1)
-            .unwrap()
-            .set_model(&model1, &mut cx.to_async());
-        let _subscription2 = client
-            .subscribe_to_entity(2)
-            .unwrap()
-            .set_model(&model2, &mut cx.to_async());
-        // Ensure dropping a subscription for the same entity type still allows receiving of
-        // messages for other entity IDs of the same type.
-        let subscription3 = client
-            .subscribe_to_entity(3)
-            .unwrap()
-            .set_model(&model3, &mut cx.to_async());
-        drop(subscription3);
-
-        server.send(proto::JoinProject { project_id: 1 });
-        server.send(proto::JoinProject { project_id: 2 });
-        done_rx1.next().await.unwrap();
-        done_rx2.next().await.unwrap();
-    }
-
-    #[gpui::test]
-    async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
-        cx.foreground().forbid_parking();
-
-        let user_id = 5;
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        let server = FakeServer::for_client(user_id, &client, cx).await;
-
-        let model = cx.add_model(|_| Model::default());
-        let (done_tx1, _done_rx1) = smol::channel::unbounded();
-        let (done_tx2, mut done_rx2) = smol::channel::unbounded();
-        let subscription1 = client.add_message_handler(
-            model.clone(),
-            move |_, _: TypedEnvelope<proto::Ping>, _, _| {
-                done_tx1.try_send(()).unwrap();
-                async { Ok(()) }
-            },
-        );
-        drop(subscription1);
-        let _subscription2 = client.add_message_handler(
-            model.clone(),
-            move |_, _: TypedEnvelope<proto::Ping>, _, _| {
-                done_tx2.try_send(()).unwrap();
-                async { Ok(()) }
-            },
-        );
-        server.send(proto::Ping {});
-        done_rx2.next().await.unwrap();
-    }
-
-    #[gpui::test]
-    async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
-        cx.foreground().forbid_parking();
-
-        let user_id = 5;
-        let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
-        let server = FakeServer::for_client(user_id, &client, cx).await;
-
-        let model = cx.add_model(|_| Model::default());
-        let (done_tx, mut done_rx) = smol::channel::unbounded();
-        let subscription = client.add_message_handler(
-            model.clone(),
-            move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
-                model.update(&mut cx, |model, _| model.subscription.take());
-                done_tx.try_send(()).unwrap();
-                async { Ok(()) }
-            },
-        );
-        model.update(cx, |model, _| {
-            model.subscription = Some(subscription);
-        });
-        server.send(proto::Ping {});
-        done_rx.next().await.unwrap();
-    }
-
-    #[derive(Default)]
-    struct Model {
-        id: usize,
-        subscription: Option<Subscription>,
-    }
-
-    impl Entity for Model {
-        type Event = ();
-    }
-}
+// #[cfg(test)]
+// mod tests {
+//     use super::*;
+//     use crate::test::FakeServer;
+//     use gpui::{executor::Deterministic, TestAppContext};
+//     use parking_lot::Mutex;
+//     use std::future;
+//     use util::http::FakeHttpClient;
+
+//     #[gpui::test(iterations = 10)]
+//     async fn test_reconnection(cx: &mut TestAppContext) {
+//         cx.foreground().forbid_parking();
+
+//         let user_id = 5;
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         let server = FakeServer::for_client(user_id, &client, cx).await;
+//         let mut status = client.status();
+//         assert!(matches!(
+//             status.next().await,
+//             Some(Status::Connected { .. })
+//         ));
+//         assert_eq!(server.auth_count(), 1);
+
+//         server.forbid_connections();
+//         server.disconnect();
+//         while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+//         server.allow_connections();
+//         cx.foreground().advance_clock(Duration::from_secs(10));
+//         while !matches!(status.next().await, Some(Status::Connected { .. })) {}
+//         assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting
+
+//         server.forbid_connections();
+//         server.disconnect();
+//         while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+//         // Clear cached credentials after authentication fails
+//         server.roll_access_token();
+//         server.allow_connections();
+//         cx.foreground().advance_clock(Duration::from_secs(10));
+//         while !matches!(status.next().await, Some(Status::Connected { .. })) {}
+//         assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
+//     }
+
+//     #[gpui::test(iterations = 10)]
+//     async fn test_connection_timeout(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
+//         deterministic.forbid_parking();
+
+//         let user_id = 5;
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         let mut status = client.status();
+
+//         // Time out when client tries to connect.
+//         client.override_authenticate(move |cx| {
+//             cx.foreground().spawn(async move {
+//                 Ok(Credentials {
+//                     user_id,
+//                     access_token: "token".into(),
+//                 })
+//             })
+//         });
+//         client.override_establish_connection(|_, cx| {
+//             cx.foreground().spawn(async move {
+//                 future::pending::<()>().await;
+//                 unreachable!()
+//             })
+//         });
+//         let auth_and_connect = cx.spawn({
+//             let client = client.clone();
+//             |cx| async move { client.authenticate_and_connect(false, &cx).await }
+//         });
+//         deterministic.run_until_parked();
+//         assert!(matches!(status.next().await, Some(Status::Connecting)));
+
+//         deterministic.advance_clock(CONNECTION_TIMEOUT);
+//         assert!(matches!(
+//             status.next().await,
+//             Some(Status::ConnectionError { .. })
+//         ));
+//         auth_and_connect.await.unwrap_err();
+
+//         // Allow the connection to be established.
+//         let server = FakeServer::for_client(user_id, &client, cx).await;
+//         assert!(matches!(
+//             status.next().await,
+//             Some(Status::Connected { .. })
+//         ));
+
+//         // Disconnect client.
+//         server.forbid_connections();
+//         server.disconnect();
+//         while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}
+
+//         // Time out when re-establishing the connection.
+//         server.allow_connections();
+//         client.override_establish_connection(|_, cx| {
+//             cx.foreground().spawn(async move {
+//                 future::pending::<()>().await;
+//                 unreachable!()
+//             })
+//         });
+//         deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
+//         assert!(matches!(
+//             status.next().await,
+//             Some(Status::Reconnecting { .. })
+//         ));
+
+//         deterministic.advance_clock(CONNECTION_TIMEOUT);
+//         assert!(matches!(
+//             status.next().await,
+//             Some(Status::ReconnectionError { .. })
+//         ));
+//     }
+
+//     #[gpui::test(iterations = 10)]
+//     async fn test_authenticating_more_than_once(
+//         cx: &mut TestAppContext,
+//         deterministic: Arc<Deterministic>,
+//     ) {
+//         cx.foreground().forbid_parking();
+
+//         let auth_count = Arc::new(Mutex::new(0));
+//         let dropped_auth_count = Arc::new(Mutex::new(0));
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         client.override_authenticate({
+//             let auth_count = auth_count.clone();
+//             let dropped_auth_count = dropped_auth_count.clone();
+//             move |cx| {
+//                 let auth_count = auth_count.clone();
+//                 let dropped_auth_count = dropped_auth_count.clone();
+//                 cx.foreground().spawn(async move {
+//                     *auth_count.lock() += 1;
+//                     let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
+//                     future::pending::<()>().await;
+//                     unreachable!()
+//                 })
+//             }
+//         });
+
+//         let _authenticate = cx.spawn(|cx| {
+//             let client = client.clone();
+//             async move { client.authenticate_and_connect(false, &cx).await }
+//         });
+//         deterministic.run_until_parked();
+//         assert_eq!(*auth_count.lock(), 1);
+//         assert_eq!(*dropped_auth_count.lock(), 0);
+
+//         let _authenticate = cx.spawn(|cx| {
+//             let client = client.clone();
+//             async move { client.authenticate_and_connect(false, &cx).await }
+//         });
+//         deterministic.run_until_parked();
+//         assert_eq!(*auth_count.lock(), 2);
+//         assert_eq!(*dropped_auth_count.lock(), 1);
+//     }
+
+//     #[test]
+//     fn test_encode_and_decode_worktree_url() {
+//         let url = encode_worktree_url(5, "deadbeef");
+//         assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string())));
+//         assert_eq!(
+//             decode_worktree_url(&format!("\n {}\t", url)),
+//             Some((5, "deadbeef".to_string()))
+//         );
+//         assert_eq!(decode_worktree_url("not://the-right-format"), None);
+//     }
+
+//     #[gpui::test]
+//     async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
+//         cx.foreground().forbid_parking();
+
+//         let user_id = 5;
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         let server = FakeServer::for_client(user_id, &client, cx).await;
+
+//         let (done_tx1, mut done_rx1) = smol::channel::unbounded();
+//         let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+//         client.add_model_message_handler(
+//             move |model: ModelHandle<Model>, _: TypedEnvelope<proto::JoinProject>, _, cx| {
+//                 match model.read_with(&cx, |model, _| model.id) {
+//                     1 => done_tx1.try_send(()).unwrap(),
+//                     2 => done_tx2.try_send(()).unwrap(),
+//                     _ => unreachable!(),
+//                 }
+//                 async { Ok(()) }
+//             },
+//         );
+//         let model1 = cx.add_model(|_| Model {
+//             id: 1,
+//             subscription: None,
+//         });
+//         let model2 = cx.add_model(|_| Model {
+//             id: 2,
+//             subscription: None,
+//         });
+//         let model3 = cx.add_model(|_| Model {
+//             id: 3,
+//             subscription: None,
+//         });
+
+//         let _subscription1 = client
+//             .subscribe_to_entity(1)
+//             .unwrap()
+//             .set_model(&model1, &mut cx.to_async());
+//         let _subscription2 = client
+//             .subscribe_to_entity(2)
+//             .unwrap()
+//             .set_model(&model2, &mut cx.to_async());
+//         // Ensure dropping a subscription for the same entity type still allows receiving of
+//         // messages for other entity IDs of the same type.
+//         let subscription3 = client
+//             .subscribe_to_entity(3)
+//             .unwrap()
+//             .set_model(&model3, &mut cx.to_async());
+//         drop(subscription3);
+
+//         server.send(proto::JoinProject { project_id: 1 });
+//         server.send(proto::JoinProject { project_id: 2 });
+//         done_rx1.next().await.unwrap();
+//         done_rx2.next().await.unwrap();
+//     }
+
+//     #[gpui::test]
+//     async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
+//         cx.foreground().forbid_parking();
+
+//         let user_id = 5;
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         let server = FakeServer::for_client(user_id, &client, cx).await;
+
+//         let model = cx.add_model(|_| Model::default());
+//         let (done_tx1, _done_rx1) = smol::channel::unbounded();
+//         let (done_tx2, mut done_rx2) = smol::channel::unbounded();
+//         let subscription1 = client.add_message_handler(
+//             model.clone(),
+//             move |_, _: TypedEnvelope<proto::Ping>, _, _| {
+//                 done_tx1.try_send(()).unwrap();
+//                 async { Ok(()) }
+//             },
+//         );
+//         drop(subscription1);
+//         let _subscription2 = client.add_message_handler(
+//             model.clone(),
+//             move |_, _: TypedEnvelope<proto::Ping>, _, _| {
+//                 done_tx2.try_send(()).unwrap();
+//                 async { Ok(()) }
+//             },
+//         );
+//         server.send(proto::Ping {});
+//         done_rx2.next().await.unwrap();
+//     }
+
+//     #[gpui::test]
+//     async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
+//         cx.foreground().forbid_parking();
+
+//         let user_id = 5;
+//         let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx));
+//         let server = FakeServer::for_client(user_id, &client, cx).await;
+
+//         let model = cx.add_model(|_| Model::default());
+//         let (done_tx, mut done_rx) = smol::channel::unbounded();
+//         let subscription = client.add_message_handler(
+//             model.clone(),
+//             move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
+//                 model.update(&mut cx, |model, _| model.subscription.take());
+//                 done_tx.try_send(()).unwrap();
+//                 async { Ok(()) }
+//             },
+//         );
+//         model.update(cx, |model, _| {
+//             model.subscription = Some(subscription);
+//         });
+//         server.send(proto::Ping {});
+//         done_rx.next().await.unwrap();
+//     }
+
+//     #[derive(Default)]
+//     struct Model {
+//         id: usize,
+//         subscription: Option<Subscription>,
+//     }
+
+//     impl Entity for Model {
+//         type Event = ();
+//     }
+// }

crates/client2/src/telemetry.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
-use gpui::{executor::Background, serde_json, AppContext, Task};
+use gpui2::{serde_json, AppContext, Executor, Task};
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use serde::Serialize;
@@ -11,7 +11,7 @@ use util::{channel::ReleaseChannel, TryFutureExt};
 
 pub struct Telemetry {
     http_client: Arc<dyn HttpClient>,
-    executor: Arc<Background>,
+    executor: Executor,
     state: Mutex<TelemetryState>,
 }
 

crates/client2/src/test.rs 🔗

@@ -1,215 +1,215 @@
-use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore};
-use anyhow::{anyhow, Result};
-use futures::{stream::BoxStream, StreamExt};
-use gpui::{executor, ModelHandle, TestAppContext};
-use parking_lot::Mutex;
-use rpc::{
-    proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse},
-    ConnectionId, Peer, Receipt, TypedEnvelope,
-};
-use std::{rc::Rc, sync::Arc};
-use util::http::FakeHttpClient;
-
-pub struct FakeServer {
-    peer: Arc<Peer>,
-    state: Arc<Mutex<FakeServerState>>,
-    user_id: u64,
-    executor: Rc<executor::Foreground>,
-}
-
-#[derive(Default)]
-struct FakeServerState {
-    incoming: Option<BoxStream<'static, Box<dyn proto::AnyTypedEnvelope>>>,
-    connection_id: Option<ConnectionId>,
-    forbid_connections: bool,
-    auth_count: usize,
-    access_token: usize,
-}
-
-impl FakeServer {
-    pub async fn for_client(
-        client_user_id: u64,
-        client: &Arc<Client>,
-        cx: &TestAppContext,
-    ) -> Self {
-        let server = Self {
-            peer: Peer::new(0),
-            state: Default::default(),
-            user_id: client_user_id,
-            executor: cx.foreground(),
-        };
-
-        client
-            .override_authenticate({
-                let state = Arc::downgrade(&server.state);
-                move |cx| {
-                    let state = state.clone();
-                    cx.spawn(move |_| async move {
-                        let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-                        let mut state = state.lock();
-                        state.auth_count += 1;
-                        let access_token = state.access_token.to_string();
-                        Ok(Credentials {
-                            user_id: client_user_id,
-                            access_token,
-                        })
-                    })
-                }
-            })
-            .override_establish_connection({
-                let peer = Arc::downgrade(&server.peer);
-                let state = Arc::downgrade(&server.state);
-                move |credentials, cx| {
-                    let peer = peer.clone();
-                    let state = state.clone();
-                    let credentials = credentials.clone();
-                    cx.spawn(move |cx| async move {
-                        let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-                        let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
-                        if state.lock().forbid_connections {
-                            Err(EstablishConnectionError::Other(anyhow!(
-                                "server is forbidding connections"
-                            )))?
-                        }
-
-                        assert_eq!(credentials.user_id, client_user_id);
-
-                        if credentials.access_token != state.lock().access_token.to_string() {
-                            Err(EstablishConnectionError::Unauthorized)?
-                        }
-
-                        let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
-                        let (connection_id, io, incoming) =
-                            peer.add_test_connection(server_conn, cx.background());
-                        cx.background().spawn(io).detach();
-                        {
-                            let mut state = state.lock();
-                            state.connection_id = Some(connection_id);
-                            state.incoming = Some(incoming);
-                        }
-                        peer.send(
-                            connection_id,
-                            proto::Hello {
-                                peer_id: Some(connection_id.into()),
-                            },
-                        )
-                        .unwrap();
-
-                        Ok(client_conn)
-                    })
-                }
-            });
-
-        client
-            .authenticate_and_connect(false, &cx.to_async())
-            .await
-            .unwrap();
-
-        server
-    }
-
-    pub fn disconnect(&self) {
-        if self.state.lock().connection_id.is_some() {
-            self.peer.disconnect(self.connection_id());
-            let mut state = self.state.lock();
-            state.connection_id.take();
-            state.incoming.take();
-        }
-    }
-
-    pub fn auth_count(&self) -> usize {
-        self.state.lock().auth_count
-    }
-
-    pub fn roll_access_token(&self) {
-        self.state.lock().access_token += 1;
-    }
-
-    pub fn forbid_connections(&self) {
-        self.state.lock().forbid_connections = true;
-    }
-
-    pub fn allow_connections(&self) {
-        self.state.lock().forbid_connections = false;
-    }
-
-    pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
-        self.peer.send(self.connection_id(), message).unwrap();
-    }
-
-    #[allow(clippy::await_holding_lock)]
-    pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
-        self.executor.start_waiting();
-
-        loop {
-            let message = self
-                .state
-                .lock()
-                .incoming
-                .as_mut()
-                .expect("not connected")
-                .next()
-                .await
-                .ok_or_else(|| anyhow!("other half hung up"))?;
-            self.executor.finish_waiting();
-            let type_name = message.payload_type_name();
-            let message = message.into_any();
-
-            if message.is::<TypedEnvelope<M>>() {
-                return Ok(*message.downcast().unwrap());
-            }
-
-            if message.is::<TypedEnvelope<GetPrivateUserInfo>>() {
-                self.respond(
-                    message
-                        .downcast::<TypedEnvelope<GetPrivateUserInfo>>()
-                        .unwrap()
-                        .receipt(),
-                    GetPrivateUserInfoResponse {
-                        metrics_id: "the-metrics-id".into(),
-                        staff: false,
-                        flags: Default::default(),
-                    },
-                );
-                continue;
-            }
-
-            panic!(
-                "fake server received unexpected message type: {:?}",
-                type_name
-            );
-        }
-    }
-
-    pub fn respond<T: proto::RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) {
-        self.peer.respond(receipt, response).unwrap()
-    }
-
-    fn connection_id(&self) -> ConnectionId {
-        self.state.lock().connection_id.expect("not connected")
-    }
-
-    pub async fn build_user_store(
-        &self,
-        client: Arc<Client>,
-        cx: &mut TestAppContext,
-    ) -> ModelHandle<UserStore> {
-        let http_client = FakeHttpClient::with_404_response();
-        let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx));
-        assert_eq!(
-            self.receive::<proto::GetUsers>()
-                .await
-                .unwrap()
-                .payload
-                .user_ids,
-            &[self.user_id]
-        );
-        user_store
-    }
-}
-
-impl Drop for FakeServer {
-    fn drop(&mut self) {
-        self.disconnect();
-    }
-}
+// use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore};
+// use anyhow::{anyhow, Result};
+// use futures::{stream::BoxStream, StreamExt};
+// use gpui2::{Executor, Handle, TestAppContext};
+// use parking_lot::Mutex;
+// use rpc::{
+//     proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse},
+//     ConnectionId, Peer, Receipt, TypedEnvelope,
+// };
+// use std::{rc::Rc, sync::Arc};
+// use util::http::FakeHttpClient;
+
+// pub struct FakeServer {
+//     peer: Arc<Peer>,
+//     state: Arc<Mutex<FakeServerState>>,
+//     user_id: u64,
+//     executor: Executor,
+// }
+
+// #[derive(Default)]
+// struct FakeServerState {
+//     incoming: Option<BoxStream<'static, Box<dyn proto::AnyTypedEnvelope>>>,
+//     connection_id: Option<ConnectionId>,
+//     forbid_connections: bool,
+//     auth_count: usize,
+//     access_token: usize,
+// }
+
+// impl FakeServer {
+//     pub async fn for_client(
+//         client_user_id: u64,
+//         client: &Arc<Client>,
+//         cx: &TestAppContext,
+//     ) -> Self {
+//         let server = Self {
+//             peer: Peer::new(0),
+//             state: Default::default(),
+//             user_id: client_user_id,
+//             executor: cx.foreground(),
+//         };
+
+//         client
+//             .override_authenticate({
+//                 let state = Arc::downgrade(&server.state);
+//                 move |cx| {
+//                     let state = state.clone();
+//                     cx.spawn(move |_| async move {
+//                         let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+//                         let mut state = state.lock();
+//                         state.auth_count += 1;
+//                         let access_token = state.access_token.to_string();
+//                         Ok(Credentials {
+//                             user_id: client_user_id,
+//                             access_token,
+//                         })
+//                     })
+//                 }
+//             })
+//             .override_establish_connection({
+//                 let peer = Arc::downgrade(&server.peer);
+//                 let state = Arc::downgrade(&server.state);
+//                 move |credentials, cx| {
+//                     let peer = peer.clone();
+//                     let state = state.clone();
+//                     let credentials = credentials.clone();
+//                     cx.spawn(move |cx| async move {
+//                         let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+//                         let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?;
+//                         if state.lock().forbid_connections {
+//                             Err(EstablishConnectionError::Other(anyhow!(
+//                                 "server is forbidding connections"
+//                             )))?
+//                         }
+
+//                         assert_eq!(credentials.user_id, client_user_id);
+
+//                         if credentials.access_token != state.lock().access_token.to_string() {
+//                             Err(EstablishConnectionError::Unauthorized)?
+//                         }
+
+//                         let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
+//                         let (connection_id, io, incoming) =
+//                             peer.add_test_connection(server_conn, cx.background());
+//                         cx.background().spawn(io).detach();
+//                         {
+//                             let mut state = state.lock();
+//                             state.connection_id = Some(connection_id);
+//                             state.incoming = Some(incoming);
+//                         }
+//                         peer.send(
+//                             connection_id,
+//                             proto::Hello {
+//                                 peer_id: Some(connection_id.into()),
+//                             },
+//                         )
+//                         .unwrap();
+
+//                         Ok(client_conn)
+//                     })
+//                 }
+//             });
+
+//         client
+//             .authenticate_and_connect(false, &cx.to_async())
+//             .await
+//             .unwrap();
+
+//         server
+//     }
+
+//     pub fn disconnect(&self) {
+//         if self.state.lock().connection_id.is_some() {
+//             self.peer.disconnect(self.connection_id());
+//             let mut state = self.state.lock();
+//             state.connection_id.take();
+//             state.incoming.take();
+//         }
+//     }
+
+//     pub fn auth_count(&self) -> usize {
+//         self.state.lock().auth_count
+//     }
+
+//     pub fn roll_access_token(&self) {
+//         self.state.lock().access_token += 1;
+//     }
+
+//     pub fn forbid_connections(&self) {
+//         self.state.lock().forbid_connections = true;
+//     }
+
+//     pub fn allow_connections(&self) {
+//         self.state.lock().forbid_connections = false;
+//     }
+
+//     pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
+//         self.peer.send(self.connection_id(), message).unwrap();
+//     }
+
+//     #[allow(clippy::await_holding_lock)]
+//     pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
+//         self.executor.start_waiting();
+
+//         loop {
+//             let message = self
+//                 .state
+//                 .lock()
+//                 .incoming
+//                 .as_mut()
+//                 .expect("not connected")
+//                 .next()
+//                 .await
+//                 .ok_or_else(|| anyhow!("other half hung up"))?;
+//             self.executor.finish_waiting();
+//             let type_name = message.payload_type_name();
+//             let message = message.into_any();
+
+//             if message.is::<TypedEnvelope<M>>() {
+//                 return Ok(*message.downcast().unwrap());
+//             }
+
+//             if message.is::<TypedEnvelope<GetPrivateUserInfo>>() {
+//                 self.respond(
+//                     message
+//                         .downcast::<TypedEnvelope<GetPrivateUserInfo>>()
+//                         .unwrap()
+//                         .receipt(),
+//                     GetPrivateUserInfoResponse {
+//                         metrics_id: "the-metrics-id".into(),
+//                         staff: false,
+//                         flags: Default::default(),
+//                     },
+//                 );
+//                 continue;
+//             }
+
+//             panic!(
+//                 "fake server received unexpected message type: {:?}",
+//                 type_name
+//             );
+//         }
+//     }
+
+//     pub fn respond<T: proto::RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) {
+//         self.peer.respond(receipt, response).unwrap()
+//     }
+
+//     fn connection_id(&self) -> ConnectionId {
+//         self.state.lock().connection_id.expect("not connected")
+//     }
+
+//     pub async fn build_user_store(
+//         &self,
+//         client: Arc<Client>,
+//         cx: &mut TestAppContext,
+//     ) -> ModelHandle<UserStore> {
+//         let http_client = FakeHttpClient::with_404_response();
+//         let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx));
+//         assert_eq!(
+//             self.receive::<proto::GetUsers>()
+//                 .await
+//                 .unwrap()
+//                 .payload
+//                 .user_ids,
+//             &[self.user_id]
+//         );
+//         user_store
+//     }
+// }
+
+// impl Drop for FakeServer {
+//     fn drop(&mut self) {
+//         self.disconnect();
+//     }
+// }

crates/client2/src/user.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{anyhow, Context, Result};
 use collections::{hash_map::Entry, HashMap, HashSet};
 use feature_flags::FeatureFlagAppExt;
 use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
-use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task};
+use gpui2::{AsyncAppContext, EventEmitter, Handle, ImageData, ModelContext, Task};
 use postage::{sink::Sink, watch};
 use rpc::proto::{RequestMessage, UsersResponse};
 use std::sync::{Arc, Weak};
@@ -103,7 +103,7 @@ pub enum ContactEventKind {
     Cancelled,
 }
 
-impl Entity for UserStore {
+impl EventEmitter for UserStore {
     type Event = Event;
 }
 
@@ -217,7 +217,7 @@ impl UserStore {
     }
 
     async fn handle_update_invite_info(
-        this: ModelHandle<Self>,
+        this: Handle<Self>,
         message: TypedEnvelope<proto::UpdateInviteInfo>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
@@ -233,7 +233,7 @@ impl UserStore {
     }
 
     async fn handle_show_contacts(
-        this: ModelHandle<Self>,
+        this: Handle<Self>,
         _: TypedEnvelope<proto::ShowContacts>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
@@ -247,7 +247,7 @@ impl UserStore {
     }
 
     async fn handle_update_contacts(
-        this: ModelHandle<Self>,
+        this: Handle<Self>,
         message: TypedEnvelope<proto::UpdateContacts>,
         _: Arc<Client>,
         mut cx: AsyncAppContext,
@@ -689,7 +689,7 @@ impl User {
 impl Contact {
     async fn from_proto(
         contact: proto::Contact,
-        user_store: &ModelHandle<UserStore>,
+        user_store: &Handle<UserStore>,
         cx: &mut AsyncAppContext,
     ) -> Result<Self> {
         let user = user_store

crates/gpui2/src/app.rs 🔗

@@ -9,10 +9,11 @@ use refineable::Refineable;
 use smallvec::SmallVec;
 
 use crate::{
-    current_platform, image_cache::ImageCache, Action, AssetSource, Context, DisplayId, Executor,
-    FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId, MainThread, MainThreadOnly,
-    Platform, SemanticVersion, SharedString, SubscriberSet, SvgRenderer, Task, TextStyle,
-    TextStyleRefinement, TextSystem, View, Window, WindowContext, WindowHandle, WindowId,
+    current_platform, image_cache::ImageCache, Action, AssetSource, Context, DispatchPhase,
+    DisplayId, Executor, FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId,
+    MainThread, MainThreadOnly, Platform, SemanticVersion, SharedString, SubscriberSet,
+    SvgRenderer, Task, TextStyle, TextStyleRefinement, TextSystem, View, Window, WindowContext,
+    WindowHandle, WindowId,
 };
 use anyhow::{anyhow, Result};
 use collections::{HashMap, HashSet, VecDeque};
@@ -67,6 +68,7 @@ impl App {
                 entities,
                 windows: SlotMap::with_key(),
                 keymap: Arc::new(RwLock::new(Keymap::default())),
+                global_action_listeners: HashMap::default(),
                 action_builders: HashMap::default(),
                 pending_notifications: Default::default(),
                 pending_effects: Default::default(),
@@ -74,6 +76,7 @@ impl App {
                 event_handlers: SubscriberSet::new(),
                 release_handlers: SubscriberSet::new(),
                 layout_id_buffer: Default::default(),
+                propagate_event: true,
             })
         }))
     }
@@ -168,6 +171,8 @@ pub struct AppContext {
     pub(crate) entities: EntityMap,
     pub(crate) windows: SlotMap<WindowId, Option<Window>>,
     pub(crate) keymap: Arc<RwLock<Keymap>>,
+    pub(crate) global_action_listeners:
+        HashMap<TypeId, Vec<Box<dyn Fn(&dyn Action, DispatchPhase, &mut Self) + Send + Sync>>>,
     action_builders: HashMap<SharedString, ActionBuilder>,
     pub(crate) pending_notifications: HashSet<EntityId>,
     pending_effects: VecDeque<Effect>,
@@ -175,6 +180,7 @@ pub struct AppContext {
     pub(crate) event_handlers: SubscriberSet<EntityId, EventHandler>,
     pub(crate) release_handlers: SubscriberSet<EntityId, ReleaseHandler>,
     pub(crate) layout_id_buffer: Vec<LayoutId>, // We recycle this memory across layout requests.
+    pub(crate) propagate_event: bool,
 }
 
 impl AppContext {
@@ -508,6 +514,21 @@ impl AppContext {
         self.push_effect(Effect::Refresh);
     }
 
+    pub fn on_action<A: Action>(
+        &mut self,
+        listener: impl Fn(&A, &mut Self) + Send + Sync + 'static,
+    ) {
+        self.global_action_listeners
+            .entry(TypeId::of::<A>())
+            .or_default()
+            .push(Box::new(move |action, phase, cx| {
+                if phase == DispatchPhase::Bubble {
+                    let action = action.as_any().downcast_ref().unwrap();
+                    listener(action, cx)
+                }
+            }));
+    }
+
     pub fn register_action_type<A: Action>(&mut self) {
         self.action_builders.insert(A::qualified_name(), A::build);
     }
@@ -523,6 +544,10 @@ impl AppContext {
             .ok_or_else(|| anyhow!("no action type registered for {}", name))?;
         (build)(params)
     }
+
+    pub fn stop_propagation(&mut self) {
+        self.propagate_event = false;
+    }
 }
 
 impl Context for AppContext {
@@ -610,6 +635,18 @@ impl MainThread<AppContext> {
         self.platform().activate(ignoring_other_apps);
     }
 
+    pub fn write_credentials(&self, url: &str, username: &str, password: &[u8]) -> Result<()> {
+        self.platform().write_credentials(url, username, password)
+    }
+
+    pub fn read_credentials(&self, url: &str) -> Result<Option<(String, Vec<u8>)>> {
+        self.platform().read_credentials(url)
+    }
+
+    pub fn delete_credentials(&self, url: &str) -> Result<()> {
+        self.platform().delete_credentials(url)
+    }
+
     pub fn open_window<S: 'static + Send + Sync>(
         &mut self,
         options: crate::WindowOptions,

crates/gpui2/src/app/async_context.rs 🔗

@@ -1,8 +1,9 @@
 use crate::{
-    AnyWindowHandle, AppContext, Context, Handle, ModelContext, Result, Task, ViewContext,
-    WindowContext,
+    AnyWindowHandle, AppContext, Context, Executor, Handle, MainThread, ModelContext, Result, Task,
+    ViewContext, WindowContext,
 };
 use anyhow::anyhow;
+use derive_more::{Deref, DerefMut};
 use parking_lot::Mutex;
 use std::{future::Future, sync::Weak};
 
@@ -75,6 +76,15 @@ impl Context for AsyncAppContext {
 }
 
 impl AsyncAppContext {
+    pub fn executor(&self) -> Result<Executor> {
+        let app = self
+            .0
+            .upgrade()
+            .ok_or_else(|| anyhow!("app was released"))?;
+        let lock = app.lock(); // Need this to compile
+        Ok(lock.executor().clone())
+    }
+
     pub fn read_window<R>(
         &self,
         handle: AnyWindowHandle,
@@ -116,10 +126,27 @@ impl AsyncAppContext {
         let app_context = app.lock();
         Ok(app_context.spawn(f))
     }
+
+    pub fn run_on_main<R>(
+        &self,
+        f: impl FnOnce(&mut MainThread<AppContext>) -> R + Send + 'static,
+    ) -> Result<Task<R>>
+    where
+        R: Send + 'static,
+    {
+        let app = self
+            .0
+            .upgrade()
+            .ok_or_else(|| anyhow!("app was released"))?;
+        let mut app_context = app.lock();
+        Ok(app_context.run_on_main(f))
+    }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Deref, DerefMut)]
 pub struct AsyncWindowContext {
+    #[deref]
+    #[deref_mut]
     app: AsyncAppContext,
     window: AnyWindowHandle,
 }

crates/gpui2/src/app/entity_map.rs 🔗

@@ -4,7 +4,7 @@ use derive_more::{Deref, DerefMut};
 use parking_lot::{RwLock, RwLockUpgradableReadGuard};
 use slotmap::{SecondaryMap, SlotMap};
 use std::{
-    any::Any,
+    any::{Any, TypeId},
     marker::PhantomData,
     mem,
     sync::{
@@ -70,9 +70,12 @@ impl EntityMap {
 
     pub fn weak_handle<T: 'static + Send + Sync>(&self, id: EntityId) -> WeakHandle<T> {
         WeakHandle {
-            id,
+            any_handle: AnyWeakHandle {
+                id,
+                entity_type: TypeId::of::<T>(),
+                entity_map: Arc::downgrade(&self.0),
+            },
             entity_type: PhantomData,
-            entity_map: Arc::downgrade(&self.0),
         }
     }
 
@@ -112,44 +115,45 @@ impl<T> Drop for Lease<T> {
 #[derive(Deref, DerefMut)]
 pub struct Slot<T: Send + Sync + 'static>(Handle<T>);
 
-pub struct Handle<T: Send + Sync> {
+pub struct AnyHandle {
     pub(crate) id: EntityId,
-    entity_type: PhantomData<T>,
+    entity_type: TypeId,
     entity_map: Weak<RwLock<EntityMapState>>,
 }
 
-impl<T: 'static + Send + Sync> Handle<T> {
-    fn new(id: EntityId, entity_map: Weak<RwLock<EntityMapState>>) -> Self {
+impl AnyHandle {
+    fn new(id: EntityId, entity_type: TypeId, entity_map: Weak<RwLock<EntityMapState>>) -> Self {
         Self {
             id,
-            entity_type: PhantomData,
+            entity_type,
             entity_map,
         }
     }
 
-    pub fn downgrade(&self) -> WeakHandle<T> {
-        WeakHandle {
+    pub fn downgrade(&self) -> AnyWeakHandle {
+        AnyWeakHandle {
             id: self.id,
             entity_type: self.entity_type,
             entity_map: self.entity_map.clone(),
         }
     }
 
-    /// Update the entity referenced by this handle with the given function.
-    ///
-    /// The update function receives a context appropriate for its environment.
-    /// When updating in an `AppContext`, it receives a `ModelContext`.
-    /// When updating an a `WindowContext`, it receives a `ViewContext`.
-    pub fn update<C: Context, R>(
-        &self,
-        cx: &mut C,
-        update: impl FnOnce(&mut T, &mut C::EntityContext<'_, '_, T>) -> R,
-    ) -> C::Result<R> {
-        cx.update_entity(self, update)
+    pub fn downcast<T>(&self) -> Option<Handle<T>>
+    where
+        T: 'static + Send + Sync,
+    {
+        if TypeId::of::<T>() == self.entity_type {
+            Some(Handle {
+                any_handle: self.clone(),
+                entity_type: PhantomData,
+            })
+        } else {
+            None
+        }
     }
 }
 
-impl<T: Send + Sync> Clone for Handle<T> {
+impl Clone for AnyHandle {
     fn clone(&self) -> Self {
         if let Some(entity_map) = self.entity_map.upgrade() {
             let entity_map = entity_map.read();
@@ -163,13 +167,13 @@ impl<T: Send + Sync> Clone for Handle<T> {
 
         Self {
             id: self.id,
-            entity_type: PhantomData,
+            entity_type: self.entity_type,
             entity_map: self.entity_map.clone(),
         }
     }
 }
 
-impl<T: Send + Sync> Drop for Handle<T> {
+impl Drop for AnyHandle {
     fn drop(&mut self) {
         if let Some(entity_map) = self.entity_map.upgrade() {
             let entity_map = entity_map.upgradable_read();
@@ -193,36 +197,117 @@ impl<T: Send + Sync> Drop for Handle<T> {
     }
 }
 
-pub struct WeakHandle<T> {
-    pub(crate) id: EntityId,
+impl<T> From<Handle<T>> for AnyHandle
+where
+    T: 'static + Send + Sync,
+{
+    fn from(handle: Handle<T>) -> Self {
+        handle.any_handle
+    }
+}
+
+#[derive(Deref, DerefMut)]
+pub struct Handle<T: Send + Sync> {
+    #[deref]
+    #[deref_mut]
+    any_handle: AnyHandle,
     entity_type: PhantomData<T>,
-    entity_map: Weak<RwLock<EntityMapState>>,
 }
 
-impl<T: 'static + Send + Sync> Clone for WeakHandle<T> {
+impl<T: 'static + Send + Sync> Handle<T> {
+    fn new(id: EntityId, entity_map: Weak<RwLock<EntityMapState>>) -> Self {
+        Self {
+            any_handle: AnyHandle::new(id, TypeId::of::<T>(), entity_map),
+            entity_type: PhantomData,
+        }
+    }
+
+    pub fn downgrade(&self) -> WeakHandle<T> {
+        WeakHandle {
+            any_handle: self.any_handle.downgrade(),
+            entity_type: self.entity_type,
+        }
+    }
+
+    /// Update the entity referenced by this handle with the given function.
+    ///
+    /// The update function receives a context appropriate for its environment.
+    /// When updating in an `AppContext`, it receives a `ModelContext`.
+    /// When updating an a `WindowContext`, it receives a `ViewContext`.
+    pub fn update<C: Context, R>(
+        &self,
+        cx: &mut C,
+        update: impl FnOnce(&mut T, &mut C::EntityContext<'_, '_, T>) -> R,
+    ) -> C::Result<R> {
+        cx.update_entity(self, update)
+    }
+}
+
+impl<T: Send + Sync> Clone for Handle<T> {
     fn clone(&self) -> Self {
         Self {
-            id: self.id,
+            any_handle: self.any_handle.clone(),
             entity_type: self.entity_type,
-            entity_map: self.entity_map.clone(),
         }
     }
 }
 
-impl<T: Send + Sync + 'static> WeakHandle<T> {
-    pub fn upgrade(&self, _: &impl Context) -> Option<Handle<T>> {
+#[derive(Clone)]
+pub struct AnyWeakHandle {
+    pub(crate) id: EntityId,
+    entity_type: TypeId,
+    entity_map: Weak<RwLock<EntityMapState>>,
+}
+
+impl AnyWeakHandle {
+    pub fn upgrade(&self) -> Option<AnyHandle> {
         let entity_map = &self.entity_map.upgrade()?;
         entity_map
             .read()
             .ref_counts
             .get(self.id)?
             .fetch_add(1, SeqCst);
-        Some(Handle {
+        Some(AnyHandle {
             id: self.id,
             entity_type: self.entity_type,
             entity_map: self.entity_map.clone(),
         })
     }
+}
+
+impl<T> From<WeakHandle<T>> for AnyWeakHandle
+where
+    T: 'static + Send + Sync,
+{
+    fn from(handle: WeakHandle<T>) -> Self {
+        handle.any_handle
+    }
+}
+
+#[derive(Deref, DerefMut)]
+pub struct WeakHandle<T> {
+    #[deref]
+    #[deref_mut]
+    any_handle: AnyWeakHandle,
+    entity_type: PhantomData<T>,
+}
+
+impl<T: 'static + Send + Sync> Clone for WeakHandle<T> {
+    fn clone(&self) -> Self {
+        Self {
+            any_handle: self.any_handle.clone(),
+            entity_type: self.entity_type,
+        }
+    }
+}
+
+impl<T: Send + Sync + 'static> WeakHandle<T> {
+    pub fn upgrade(&self) -> Option<Handle<T>> {
+        Some(Handle {
+            any_handle: self.any_handle.upgrade()?,
+            entity_type: self.entity_type,
+        })
+    }
 
     /// Update the entity referenced by this handle with the given function if
     /// the referenced entity still exists. Returns an error if the entity has
@@ -240,7 +325,7 @@ impl<T: Send + Sync + 'static> WeakHandle<T> {
         Result<C::Result<R>>: crate::Flatten<R>,
     {
         crate::Flatten::flatten(
-            self.upgrade(cx)
+            self.upgrade()
                 .ok_or_else(|| anyhow!("entity release"))
                 .map(|this| cx.update_entity(&this, update)),
         )

crates/gpui2/src/app/model_context.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{
-    AppContext, Context, Effect, EntityId, EventEmitter, Handle, Reference, Subscription,
+    AppContext, Context, Effect, EntityId, EventEmitter, Executor, Handle, Reference, Subscription,
     WeakHandle,
 };
 use std::marker::PhantomData;
@@ -51,7 +51,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> {
         self.app.observers.insert(
             handle.id,
             Box::new(move |cx| {
-                if let Some((this, handle)) = this.upgrade(cx).zip(handle.upgrade(cx)) {
+                if let Some((this, handle)) = this.upgrade().zip(handle.upgrade()) {
                     this.update(cx, |this, cx| on_notify(this, handle, cx));
                     true
                 } else {
@@ -75,7 +75,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> {
             handle.id,
             Box::new(move |event, cx| {
                 let event = event.downcast_ref().expect("invalid event type");
-                if let Some((this, handle)) = this.upgrade(cx).zip(handle.upgrade(cx)) {
+                if let Some((this, handle)) = this.upgrade().zip(handle.upgrade()) {
                     this.update(cx, |this, cx| on_event(this, handle, event, cx));
                     true
                 } else {
@@ -108,7 +108,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> {
             handle.id,
             Box::new(move |entity, cx| {
                 let entity = entity.downcast_mut().expect("invalid entity type");
-                if let Some(this) = this.upgrade(cx) {
+                if let Some(this) = this.upgrade() {
                     this.update(cx, |this, cx| on_release(this, entity, cx));
                 }
             }),

crates/gpui2/src/executor.rs 🔗

@@ -8,6 +8,7 @@ use std::{
     pin::Pin,
     sync::Arc,
     task::{Context, Poll},
+    time::Duration,
 };
 use util::TryFutureExt;
 
@@ -151,6 +152,11 @@ impl Executor {
         }
     }
 
+    pub fn timer(&self, duration: Duration) -> smol::Timer {
+        // todo!("integrate with deterministic dispatcher")
+        smol::Timer::after(duration)
+    }
+
     pub fn is_main_thread(&self) -> bool {
         self.dispatcher.is_main_thread()
     }

crates/gpui2/src/window.rs 🔗

@@ -1,13 +1,13 @@
 use crate::{
     px, size, Action, AnyBox, AnyView, AppContext, AsyncWindowContext, AvailableSpace,
     BorrowAppContext, Bounds, BoxShadow, Context, Corners, DevicePixels, DispatchContext,
-    DisplayId, Edges, Effect, Element, EntityId, EventEmitter, FocusEvent, FontId, GlobalElementId,
-    GlyphId, Handle, Hsla, ImageData, InputEvent, IsZero, KeyListener, KeyMatch, KeyMatcher,
-    Keystroke, LayoutId, MainThread, MainThreadOnly, MonochromeSprite, MouseMoveEvent, Path,
-    Pixels, Platform, PlatformAtlas, PlatformWindow, Point, PolychromeSprite, Quad, Reference,
-    RenderGlyphParams, RenderImageParams, RenderSvgParams, ScaledPixels, SceneBuilder, Shadow,
-    SharedString, Size, Style, Subscription, TaffyLayoutEngine, Task, Underline, UnderlineStyle,
-    WeakHandle, WindowOptions, SUBPIXEL_VARIANTS,
+    DisplayId, Edges, Effect, Element, EntityId, EventEmitter, Executor, FocusEvent, FontId,
+    GlobalElementId, GlyphId, Handle, Hsla, ImageData, InputEvent, IsZero, KeyListener, KeyMatch,
+    KeyMatcher, Keystroke, LayoutId, MainThread, MainThreadOnly, MonochromeSprite, MouseMoveEvent,
+    Path, Pixels, Platform, PlatformAtlas, PlatformWindow, Point, PolychromeSprite, Quad,
+    Reference, RenderGlyphParams, RenderImageParams, RenderSvgParams, ScaledPixels, SceneBuilder,
+    Shadow, SharedString, Size, Style, Subscription, TaffyLayoutEngine, Task, Underline,
+    UnderlineStyle, WeakHandle, WindowOptions, SUBPIXEL_VARIANTS,
 };
 use anyhow::Result;
 use collections::HashMap;
@@ -167,7 +167,6 @@ pub struct Window {
     focus_parents_by_child: HashMap<FocusId, FocusId>,
     pub(crate) focus_listeners: Vec<AnyFocusListener>,
     pub(crate) focus_handles: Arc<RwLock<SlotMap<FocusId, AtomicUsize>>>,
-    propagate: bool,
     default_prevented: bool,
     mouse_position: Point<Pixels>,
     scale_factor: f32,
@@ -243,7 +242,6 @@ impl Window {
             focus_parents_by_child: HashMap::default(),
             focus_listeners: Vec::new(),
             focus_handles: Arc::new(RwLock::new(SlotMap::with_key())),
-            propagate: true,
             default_prevented: true,
             mouse_position,
             scale_factor,
@@ -302,6 +300,10 @@ impl<'a, 'w> WindowContext<'a, 'w> {
         }
     }
 
+    pub fn window_handle(&self) -> AnyWindowHandle {
+        self.window.handle
+    }
+
     pub fn notify(&mut self) {
         self.window.dirty = true;
     }
@@ -477,10 +479,6 @@ impl<'a, 'w> WindowContext<'a, 'w> {
             .to_pixels(text_style.font_size.into(), rem_size)
     }
 
-    pub fn stop_propagation(&mut self) {
-        self.window.propagate = false;
-    }
-
     pub fn prevent_default(&mut self) {
         self.window.default_prevented = true;
     }
@@ -878,7 +876,7 @@ impl<'a, 'w> WindowContext<'a, 'w> {
             }
 
             // Handlers may set this to false by calling `stop_propagation`
-            self.window.propagate = true;
+            self.app.propagate_event = true;
             self.window.default_prevented = false;
 
             if let Some(mut handlers) = self
@@ -893,16 +891,16 @@ impl<'a, 'w> WindowContext<'a, 'w> {
                 // special purposes, such as detecting events outside of a given Bounds.
                 for (_, handler) in &handlers {
                     handler(any_mouse_event, DispatchPhase::Capture, self);
-                    if !self.window.propagate {
+                    if !self.app.propagate_event {
                         break;
                     }
                 }
 
                 // Bubble phase, where most normal handlers do their work.
-                if self.window.propagate {
+                if self.app.propagate_event {
                     for (_, handler) in handlers.iter().rev() {
                         handler(any_mouse_event, DispatchPhase::Bubble, self);
-                        if !self.window.propagate {
+                        if !self.app.propagate_event {
                             break;
                         }
                     }
@@ -940,7 +938,7 @@ impl<'a, 'w> WindowContext<'a, 'w> {
                             ) {
                                 self.dispatch_action(action, &key_dispatch_stack[..ix]);
                             }
-                            if !self.window.propagate {
+                            if !self.app.propagate_event {
                                 break;
                             }
                         }
@@ -951,7 +949,7 @@ impl<'a, 'w> WindowContext<'a, 'w> {
                 }
             }
 
-            if self.window.propagate {
+            if self.app.propagate_event {
                 for (ix, frame) in key_dispatch_stack.iter().enumerate().rev() {
                     match frame {
                         KeyDispatchStackFrame::Listener {
@@ -968,7 +966,7 @@ impl<'a, 'w> WindowContext<'a, 'w> {
                                     self.dispatch_action(action, &key_dispatch_stack[..ix]);
                                 }
 
-                                if !self.window.propagate {
+                                if !self.app.propagate_event {
                                     break;
                                 }
                             }
@@ -1015,22 +1013,41 @@ impl<'a, 'w> WindowContext<'a, 'w> {
         dispatch_stack: &[KeyDispatchStackFrame],
     ) {
         let action_type = action.as_any().type_id();
-        for stack_frame in dispatch_stack {
-            if let KeyDispatchStackFrame::Listener {
-                event_type,
-                listener,
-            } = stack_frame
-            {
-                if action_type == *event_type {
-                    listener(action.as_any(), &[], DispatchPhase::Capture, self);
-                    if !self.window.propagate {
-                        break;
+
+        if let Some(mut global_listeners) = self.app.global_action_listeners.remove(&action_type) {
+            for listener in &global_listeners {
+                listener(action.as_ref(), DispatchPhase::Capture, self);
+                if !self.app.propagate_event {
+                    break;
+                }
+            }
+            global_listeners.extend(
+                self.global_action_listeners
+                    .remove(&action_type)
+                    .unwrap_or_default(),
+            );
+            self.global_action_listeners
+                .insert(action_type, global_listeners);
+        }
+
+        if self.app.propagate_event {
+            for stack_frame in dispatch_stack {
+                if let KeyDispatchStackFrame::Listener {
+                    event_type,
+                    listener,
+                } = stack_frame
+                {
+                    if action_type == *event_type {
+                        listener(action.as_any(), &[], DispatchPhase::Capture, self);
+                        if !self.app.propagate_event {
+                            break;
+                        }
                     }
                 }
             }
         }
 
-        if self.window.propagate {
+        if self.app.propagate_event {
             for stack_frame in dispatch_stack.iter().rev() {
                 if let KeyDispatchStackFrame::Listener {
                     event_type,
@@ -1039,13 +1056,33 @@ impl<'a, 'w> WindowContext<'a, 'w> {
                 {
                     if action_type == *event_type {
                         listener(action.as_any(), &[], DispatchPhase::Bubble, self);
-                        if !self.window.propagate {
+                        if !self.app.propagate_event {
                             break;
                         }
                     }
                 }
             }
         }
+
+        if self.app.propagate_event {
+            if let Some(mut global_listeners) =
+                self.app.global_action_listeners.remove(&action_type)
+            {
+                for listener in global_listeners.iter().rev() {
+                    listener(action.as_ref(), DispatchPhase::Bubble, self);
+                    if !self.app.propagate_event {
+                        break;
+                    }
+                }
+                global_listeners.extend(
+                    self.global_action_listeners
+                        .remove(&action_type)
+                        .unwrap_or_default(),
+                );
+                self.global_action_listeners
+                    .insert(action_type, global_listeners);
+            }
+        }
     }
 }
 
@@ -1313,7 +1350,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> {
             handle.id,
             Box::new(move |cx| {
                 cx.update_window(window_handle.id, |cx| {
-                    if let Some(handle) = handle.upgrade(cx) {
+                    if let Some(handle) = handle.upgrade() {
                         this.update(cx, |this, cx| on_notify(this, handle, cx))
                             .is_ok()
                     } else {
@@ -1340,7 +1377,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> {
             handle.id,
             Box::new(move |event, cx| {
                 cx.update_window(window_handle.id, |cx| {
-                    if let Some(handle) = handle.upgrade(cx) {
+                    if let Some(handle) = handle.upgrade() {
                         let event = event.downcast_ref().expect("invalid event type");
                         this.update(cx, |this, cx| on_event(this, handle, event, cx))
                             .is_ok()
@@ -1504,7 +1541,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> {
             let cx = unsafe { mem::transmute::<&mut Self, &mut MainThread<Self>>(self) };
             Task::ready(Ok(f(view, cx)))
         } else {
-            let handle = self.handle().upgrade(self).unwrap();
+            let handle = self.handle().upgrade().unwrap();
             self.window_cx.run_on_main(move |cx| handle.update(cx, f))
         }
     }
@@ -1528,7 +1565,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> {
         &mut self,
         handler: impl Fn(&mut V, &Event, DispatchPhase, &mut ViewContext<V>) + Send + Sync + 'static,
     ) {
-        let handle = self.handle().upgrade(self).unwrap();
+        let handle = self.handle().upgrade().unwrap();
         self.window_cx.on_mouse_event(move |event, phase, cx| {
             handle.update(cx, |view, cx| {
                 handler(view, event, phase, cx);