Defer activating Subscriptions that are invoked as a part of an effect.

Piotr Osiewicz and Antonio created

Fixes test test_edit_events.

Co-authored-by: Antonio <antonio@zed.dev>

Change summary

crates/gpui2/src/app.rs               | 37 ++++++++----
crates/gpui2/src/app/model_context.rs | 24 +++++--
crates/gpui2/src/subscription.rs      | 50 +++++++++++++++--
crates/gpui2/src/window.rs            | 78 +++++++++++++++++++---------
4 files changed, 135 insertions(+), 54 deletions(-)

Detailed changes

crates/gpui2/src/app.rs 🔗

@@ -358,7 +358,7 @@ impl AppContext {
     {
         let entity_id = entity.entity_id();
         let handle = entity.downgrade();
-        self.observers.insert(
+        let (subscription, activate) = self.observers.insert(
             entity_id,
             Box::new(move |cx| {
                 if let Some(handle) = E::upgrade_from(&handle) {
@@ -367,7 +367,9 @@ impl AppContext {
                     false
                 }
             }),
-        )
+        );
+        self.defer(move |_| activate());
+        subscription
     }
 
     pub fn subscribe<T, E, Evt>(
@@ -398,8 +400,7 @@ impl AppContext {
     {
         let entity_id = entity.entity_id();
         let entity = entity.downgrade();
-
-        self.event_listeners.insert(
+        let (subscription, activate) = self.event_listeners.insert(
             entity_id,
             (
                 TypeId::of::<Evt>(),
@@ -412,7 +413,9 @@ impl AppContext {
                     }
                 }),
             ),
-        )
+        );
+        self.defer(move |_| activate());
+        subscription
     }
 
     pub fn windows(&self) -> Vec<AnyWindowHandle> {
@@ -873,13 +876,15 @@ impl AppContext {
         &mut self,
         mut f: impl FnMut(&mut Self) + 'static,
     ) -> Subscription {
-        self.global_observers.insert(
+        let (subscription, activate) = self.global_observers.insert(
             TypeId::of::<G>(),
             Box::new(move |cx| {
                 f(cx);
                 true
             }),
-        )
+        );
+        self.defer(move |_| activate());
+        subscription
     }
 
     /// Move the global of the given type to the stack.
@@ -903,7 +908,7 @@ impl AppContext {
         &mut self,
         on_new: impl 'static + Fn(&mut V, &mut ViewContext<V>),
     ) -> Subscription {
-        self.new_view_observers.insert(
+        let (subscription, activate) = self.new_view_observers.insert(
             TypeId::of::<V>(),
             Box::new(move |any_view: AnyView, cx: &mut WindowContext| {
                 any_view
@@ -913,7 +918,9 @@ impl AppContext {
                         on_new(view_state, cx);
                     })
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn observe_release<E, T>(
@@ -925,13 +932,15 @@ impl AppContext {
         E: Entity<T>,
         T: 'static,
     {
-        self.release_listeners.insert(
+        let (subscription, activate) = self.release_listeners.insert(
             handle.entity_id(),
             Box::new(move |entity, cx| {
                 let entity = entity.downcast_mut().expect("invalid entity type");
                 on_release(entity, cx)
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub(crate) fn push_text_style(&mut self, text_style: TextStyleRefinement) {
@@ -996,13 +1005,15 @@ impl AppContext {
     where
         Fut: 'static + Future<Output = ()>,
     {
-        self.quit_observers.insert(
+        let (subscription, activate) = self.quit_observers.insert(
             (),
             Box::new(move |cx| {
                 let future = on_quit(cx);
                 async move { future.await }.boxed_local()
             }),
-        )
+        );
+        activate();
+        subscription
     }
 }
 

crates/gpui2/src/app/model_context.rs 🔗

@@ -88,13 +88,15 @@ impl<'a, T: 'static> ModelContext<'a, T> {
     where
         T: 'static,
     {
-        self.app.release_listeners.insert(
+        let (subscription, activate) = self.app.release_listeners.insert(
             self.model_state.entity_id,
             Box::new(move |this, cx| {
                 let this = this.downcast_mut().expect("invalid entity type");
                 on_release(this, cx);
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn observe_release<T2, E>(
@@ -109,7 +111,7 @@ impl<'a, T: 'static> ModelContext<'a, T> {
     {
         let entity_id = entity.entity_id();
         let this = self.weak_model();
-        self.app.release_listeners.insert(
+        let (subscription, activate) = self.app.release_listeners.insert(
             entity_id,
             Box::new(move |entity, cx| {
                 let entity = entity.downcast_mut().expect("invalid entity type");
@@ -117,7 +119,9 @@ impl<'a, T: 'static> ModelContext<'a, T> {
                     this.update(cx, |this, cx| on_release(this, entity, cx));
                 }
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn observe_global<G: 'static>(
@@ -128,10 +132,12 @@ impl<'a, T: 'static> ModelContext<'a, T> {
         T: 'static,
     {
         let handle = self.weak_model();
-        self.global_observers.insert(
+        let (subscription, activate) = self.global_observers.insert(
             TypeId::of::<G>(),
             Box::new(move |cx| handle.update(cx, |view, cx| f(view, cx)).is_ok()),
-        )
+        );
+        self.defer(move |_| activate());
+        subscription
     }
 
     pub fn on_app_quit<Fut>(
@@ -143,7 +149,7 @@ impl<'a, T: 'static> ModelContext<'a, T> {
         T: 'static,
     {
         let handle = self.weak_model();
-        self.app.quit_observers.insert(
+        let (subscription, activate) = self.app.quit_observers.insert(
             (),
             Box::new(move |cx| {
                 let future = handle.update(cx, |entity, cx| on_quit(entity, cx)).ok();
@@ -154,7 +160,9 @@ impl<'a, T: 'static> ModelContext<'a, T> {
                 }
                 .boxed_local()
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn notify(&mut self) {

crates/gpui2/src/subscription.rs 🔗

@@ -1,6 +1,6 @@
 use collections::{BTreeMap, BTreeSet};
 use parking_lot::Mutex;
-use std::{fmt::Debug, mem, sync::Arc};
+use std::{cell::Cell, fmt::Debug, mem, rc::Rc, sync::Arc};
 use util::post_inc;
 
 pub(crate) struct SubscriberSet<EmitterKey, Callback>(
@@ -14,11 +14,16 @@ impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 }
 
 struct SubscriberSetState<EmitterKey, Callback> {
-    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Callback>>>,
+    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
     dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
     next_subscriber_id: usize,
 }
 
+struct Subscriber<Callback> {
+    active: Rc<Cell<bool>>,
+    callback: Callback,
+}
+
 impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 where
     EmitterKey: 'static + Ord + Clone + Debug,
@@ -32,16 +37,33 @@ where
         })))
     }
 
-    pub fn insert(&self, emitter_key: EmitterKey, callback: Callback) -> Subscription {
+    /// Inserts a new `[Subscription]` for the given `emitter_key`. By default, subscriptions
+    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
+    /// This method returns a tuple of a `[Subscription]` and an `impl FnOnce`, and you can use the latter
+    /// to activate the `[Subscription]`.
+    #[must_use]
+    pub fn insert(
+        &self,
+        emitter_key: EmitterKey,
+        callback: Callback,
+    ) -> (Subscription, impl FnOnce()) {
+        let active = Rc::new(Cell::new(false));
         let mut lock = self.0.lock();
         let subscriber_id = post_inc(&mut lock.next_subscriber_id);
         lock.subscribers
             .entry(emitter_key.clone())
             .or_default()
             .get_or_insert_with(|| Default::default())
-            .insert(subscriber_id, callback);
+            .insert(
+                subscriber_id,
+                Subscriber {
+                    active: active.clone(),
+                    callback,
+                },
+            );
         let this = self.0.clone();
-        Subscription {
+
+        let subscription = Subscription {
             unsubscribe: Some(Box::new(move || {
                 let mut lock = this.lock();
                 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
@@ -63,7 +85,8 @@ where
                 lock.dropped_subscribers
                     .insert((emitter_key, subscriber_id));
             })),
-        }
+        };
+        (subscription, move || active.set(true))
     }
 
     pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
@@ -73,6 +96,13 @@ where
             .map(|s| s.into_values())
             .into_iter()
             .flatten()
+            .filter_map(|subscriber| {
+                if subscriber.active.get() {
+                    Some(subscriber.callback)
+                } else {
+                    None
+                }
+            })
     }
 
     /// Call the given callback for each subscriber to the given emitter.
@@ -91,7 +121,13 @@ where
             return;
         };
 
-        subscribers.retain(|_, callback| f(callback));
+        subscribers.retain(|_, subscriber| {
+            if subscriber.active.get() {
+                f(&mut subscriber.callback)
+            } else {
+                true
+            }
+        });
         let mut lock = self.0.lock();
 
         // Add any new subscribers that were added while invoking the callback.

crates/gpui2/src/window.rs 🔗

@@ -490,7 +490,7 @@ impl<'a> WindowContext<'a> {
         let entity_id = entity.entity_id();
         let entity = entity.downgrade();
         let window_handle = self.window.handle;
-        self.app.event_listeners.insert(
+        let (subscription, activate) = self.app.event_listeners.insert(
             entity_id,
             (
                 TypeId::of::<Evt>(),
@@ -508,7 +508,9 @@ impl<'a> WindowContext<'a> {
                         .unwrap_or(false)
                 }),
             ),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     /// Create an `AsyncWindowContext`, which has a static lifetime and can be held across
@@ -1453,10 +1455,12 @@ impl<'a> WindowContext<'a> {
         f: impl Fn(&mut WindowContext<'_>) + 'static,
     ) -> Subscription {
         let window_handle = self.window.handle;
-        self.global_observers.insert(
+        let (subscription, activate) = self.global_observers.insert(
             TypeId::of::<G>(),
             Box::new(move |cx| window_handle.update(cx, |_, cx| f(cx)).is_ok()),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     pub fn activate_window(&self) {
@@ -2096,7 +2100,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         let entity_id = entity.entity_id();
         let entity = entity.downgrade();
         let window_handle = self.window.handle;
-        self.app.observers.insert(
+        let (subscription, activate) = self.app.observers.insert(
             entity_id,
             Box::new(move |cx| {
                 window_handle
@@ -2110,7 +2114,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                     })
                     .unwrap_or(false)
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     pub fn subscribe<V2, E, Evt>(
@@ -2127,7 +2133,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         let entity_id = entity.entity_id();
         let handle = entity.downgrade();
         let window_handle = self.window.handle;
-        self.app.event_listeners.insert(
+        let (subscription, activate) = self.app.event_listeners.insert(
             entity_id,
             (
                 TypeId::of::<Evt>(),
@@ -2145,7 +2151,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                         .unwrap_or(false)
                 }),
             ),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     pub fn on_release(
@@ -2153,13 +2161,15 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         on_release: impl FnOnce(&mut V, &mut WindowContext) + 'static,
     ) -> Subscription {
         let window_handle = self.window.handle;
-        self.app.release_listeners.insert(
+        let (subscription, activate) = self.app.release_listeners.insert(
             self.view.model.entity_id,
             Box::new(move |this, cx| {
                 let this = this.downcast_mut().expect("invalid entity type");
                 let _ = window_handle.update(cx, |_, cx| on_release(this, cx));
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn observe_release<V2, E>(
@@ -2175,7 +2185,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         let view = self.view().downgrade();
         let entity_id = entity.entity_id();
         let window_handle = self.window.handle;
-        self.app.release_listeners.insert(
+        let (subscription, activate) = self.app.release_listeners.insert(
             entity_id,
             Box::new(move |entity, cx| {
                 let entity = entity.downcast_mut().expect("invalid entity type");
@@ -2183,7 +2193,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                     view.update(cx, |this, cx| on_release(this, entity, cx))
                 });
             }),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn notify(&mut self) {
@@ -2198,10 +2210,12 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         mut callback: impl FnMut(&mut V, &mut ViewContext<V>) + 'static,
     ) -> Subscription {
         let view = self.view.downgrade();
-        self.window.bounds_observers.insert(
+        let (subscription, activate) = self.window.bounds_observers.insert(
             (),
             Box::new(move |cx| view.update(cx, |view, cx| callback(view, cx)).is_ok()),
-        )
+        );
+        activate();
+        subscription
     }
 
     pub fn observe_window_activation(
@@ -2209,10 +2223,12 @@ impl<'a, V: 'static> ViewContext<'a, V> {
         mut callback: impl FnMut(&mut V, &mut ViewContext<V>) + 'static,
     ) -> Subscription {
         let view = self.view.downgrade();
-        self.window.activation_observers.insert(
+        let (subscription, activate) = self.window.activation_observers.insert(
             (),
             Box::new(move |cx| view.update(cx, |view, cx| callback(view, cx)).is_ok()),
-        )
+        );
+        activate();
+        subscription
     }
 
     /// Register a listener to be called when the given focus handle receives focus.
@@ -2225,7 +2241,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
     ) -> Subscription {
         let view = self.view.downgrade();
         let focus_id = handle.id;
-        self.window.focus_listeners.insert(
+        let (subscription, activate) = self.window.focus_listeners.insert(
             (),
             Box::new(move |event, cx| {
                 view.update(cx, |view, cx| {
@@ -2235,7 +2251,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                 })
                 .is_ok()
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     /// Register a listener to be called when the given focus handle or one of its descendants receives focus.
@@ -2248,7 +2266,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
     ) -> Subscription {
         let view = self.view.downgrade();
         let focus_id = handle.id;
-        self.window.focus_listeners.insert(
+        let (subscription, activate) = self.window.focus_listeners.insert(
             (),
             Box::new(move |event, cx| {
                 view.update(cx, |view, cx| {
@@ -2262,7 +2280,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                 })
                 .is_ok()
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     /// Register a listener to be called when the given focus handle loses focus.
@@ -2275,7 +2295,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
     ) -> Subscription {
         let view = self.view.downgrade();
         let focus_id = handle.id;
-        self.window.focus_listeners.insert(
+        let (subscription, activate) = self.window.focus_listeners.insert(
             (),
             Box::new(move |event, cx| {
                 view.update(cx, |view, cx| {
@@ -2285,7 +2305,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                 })
                 .is_ok()
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     /// Register a listener to be called when the given focus handle or one of its descendants loses focus.
@@ -2298,7 +2320,7 @@ impl<'a, V: 'static> ViewContext<'a, V> {
     ) -> Subscription {
         let view = self.view.downgrade();
         let focus_id = handle.id;
-        self.window.focus_listeners.insert(
+        let (subscription, activate) = self.window.focus_listeners.insert(
             (),
             Box::new(move |event, cx| {
                 view.update(cx, |view, cx| {
@@ -2312,7 +2334,9 @@ impl<'a, V: 'static> ViewContext<'a, V> {
                 })
                 .is_ok()
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     pub fn spawn<Fut, R>(
@@ -2343,14 +2367,16 @@ impl<'a, V: 'static> ViewContext<'a, V> {
     ) -> Subscription {
         let window_handle = self.window.handle;
         let view = self.view().downgrade();
-        self.global_observers.insert(
+        let (subscription, activate) = self.global_observers.insert(
             TypeId::of::<G>(),
             Box::new(move |cx| {
                 window_handle
                     .update(cx, |_, cx| view.update(cx, |view, cx| f(view, cx)).is_ok())
                     .unwrap_or(false)
             }),
-        )
+        );
+        self.app.defer(move |_| activate());
+        subscription
     }
 
     pub fn on_mouse_event<Event: 'static>(