Merge pull request #2008 from zed-industries/callback-leaks

Max Brunsfeld created

Fix callback leaks when subscriptions are added and dropped in the same effect cycle

Change summary

crates/gpui/src/app.rs                     | 693 ++++++-----------------
crates/gpui/src/app/callback_collection.rs | 185 ++++--
2 files changed, 301 insertions(+), 577 deletions(-)

Detailed changes

crates/gpui/src/app.rs 🔗

@@ -26,8 +26,8 @@ use smallvec::SmallVec;
 use smol::prelude::*;
 
 pub use action::*;
-use callback_collection::{CallbackCollection, Mapping};
-use collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
+use callback_collection::CallbackCollection;
+use collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
 use keymap::MatchResult;
 use platform::Event;
 #[cfg(any(test, feature = "test-support"))]
@@ -588,9 +588,9 @@ type GlobalActionCallback = dyn FnMut(&dyn Action, &mut MutableAppContext);
 type SubscriptionCallback = Box<dyn FnMut(&dyn Any, &mut MutableAppContext) -> bool>;
 type GlobalSubscriptionCallback = Box<dyn FnMut(&dyn Any, &mut MutableAppContext)>;
 type ObservationCallback = Box<dyn FnMut(&mut MutableAppContext) -> bool>;
-type FocusObservationCallback = Box<dyn FnMut(bool, &mut MutableAppContext) -> bool>;
 type GlobalObservationCallback = Box<dyn FnMut(&mut MutableAppContext)>;
-type ReleaseObservationCallback = Box<dyn FnOnce(&dyn Any, &mut MutableAppContext)>;
+type FocusObservationCallback = Box<dyn FnMut(bool, &mut MutableAppContext) -> bool>;
+type ReleaseObservationCallback = Box<dyn FnMut(&dyn Any, &mut MutableAppContext)>;
 type ActionObservationCallback = Box<dyn FnMut(TypeId, &mut MutableAppContext)>;
 type WindowActivationCallback = Box<dyn FnMut(bool, &mut MutableAppContext) -> bool>;
 type WindowFullscreenCallback = Box<dyn FnMut(bool, &mut MutableAppContext) -> bool>;
@@ -615,18 +615,17 @@ pub struct MutableAppContext {
     next_subscription_id: usize,
     frame_count: usize,
 
-    focus_observations: CallbackCollection<usize, FocusObservationCallback>,
-    global_subscriptions: CallbackCollection<TypeId, GlobalSubscriptionCallback>,
-    global_observations: CallbackCollection<TypeId, GlobalObservationCallback>,
     subscriptions: CallbackCollection<usize, SubscriptionCallback>,
+    global_subscriptions: CallbackCollection<TypeId, GlobalSubscriptionCallback>,
     observations: CallbackCollection<usize, ObservationCallback>,
+    global_observations: CallbackCollection<TypeId, GlobalObservationCallback>,
+    focus_observations: CallbackCollection<usize, FocusObservationCallback>,
+    release_observations: CallbackCollection<usize, ReleaseObservationCallback>,
+    action_dispatch_observations: CallbackCollection<(), ActionObservationCallback>,
     window_activation_observations: CallbackCollection<usize, WindowActivationCallback>,
     window_fullscreen_observations: CallbackCollection<usize, WindowFullscreenCallback>,
     keystroke_observations: CallbackCollection<usize, KeystrokeCallback>,
 
-    release_observations: Arc<Mutex<HashMap<usize, BTreeMap<usize, ReleaseObservationCallback>>>>,
-    action_dispatch_observations: Arc<Mutex<BTreeMap<usize, ActionObservationCallback>>>,
-
     #[allow(clippy::type_complexity)]
     presenters_and_platform_windows:
         HashMap<usize, (Rc<RefCell<Presenter>>, Box<dyn platform::Window>)>,
@@ -1047,12 +1046,10 @@ impl MutableAppContext {
                 callback(payload, cx)
             }),
         });
-
-        Subscription::GlobalSubscription {
-            id: subscription_id,
-            type_id,
-            subscriptions: Some(self.global_subscriptions.downgrade()),
-        }
+        Subscription::GlobalSubscription(
+            self.global_subscriptions
+                .subscribe(type_id, subscription_id),
+        )
     }
 
     pub fn observe<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
@@ -1089,11 +1086,7 @@ impl MutableAppContext {
                 }
             }),
         });
-        Subscription::Subscription {
-            id: subscription_id,
-            entity_id: handle.id(),
-            subscriptions: Some(self.subscriptions.downgrade()),
-        }
+        Subscription::Subscription(self.subscriptions.subscribe(handle.id(), subscription_id))
     }
 
     fn observe_internal<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
@@ -1117,11 +1110,7 @@ impl MutableAppContext {
                 }
             }),
         });
-        Subscription::Observation {
-            id: subscription_id,
-            entity_id,
-            observations: Some(self.observations.downgrade()),
-        }
+        Subscription::Observation(self.observations.subscribe(entity_id, subscription_id))
     }
 
     fn observe_focus<F, V>(&mut self, handle: &ViewHandle<V>, mut callback: F) -> Subscription
@@ -1144,12 +1133,7 @@ impl MutableAppContext {
                 }
             }),
         });
-
-        Subscription::FocusObservation {
-            id: subscription_id,
-            view_id,
-            observations: Some(self.focus_observations.downgrade()),
-        }
+        Subscription::FocusObservation(self.focus_observations.subscribe(view_id, subscription_id))
     }
 
     pub fn observe_global<G, F>(&mut self, mut observe: F) -> Subscription
@@ -1165,12 +1149,7 @@ impl MutableAppContext {
             id,
             Box::new(move |cx: &mut MutableAppContext| observe(cx)),
         );
-
-        Subscription::GlobalObservation {
-            id,
-            type_id,
-            observations: Some(self.global_observations.downgrade()),
-        }
+        Subscription::GlobalObservation(self.global_observations.subscribe(type_id, id))
     }
 
     pub fn observe_default_global<G, F>(&mut self, observe: F) -> Subscription
@@ -1192,36 +1171,31 @@ impl MutableAppContext {
         F: 'static + FnOnce(&E, &mut Self),
     {
         let id = post_inc(&mut self.next_subscription_id);
-        self.release_observations
-            .lock()
-            .entry(handle.id())
-            .or_default()
-            .insert(
-                id,
-                Box::new(move |released, cx| {
-                    let released = released.downcast_ref().unwrap();
-                    callback(released, cx)
-                }),
-            );
-        Subscription::ReleaseObservation {
+        let mut callback = Some(callback);
+        self.release_observations.add_callback(
+            handle.id(),
             id,
-            entity_id: handle.id(),
-            observations: Some(Arc::downgrade(&self.release_observations)),
-        }
+            Box::new(move |released, cx| {
+                let released = released.downcast_ref().unwrap();
+                if let Some(callback) = callback.take() {
+                    callback(released, cx)
+                }
+            }),
+        );
+        Subscription::ReleaseObservation(self.release_observations.subscribe(handle.id(), id))
     }
 
     pub fn observe_actions<F>(&mut self, callback: F) -> Subscription
     where
         F: 'static + FnMut(TypeId, &mut MutableAppContext),
     {
-        let id = post_inc(&mut self.next_subscription_id);
+        let subscription_id = post_inc(&mut self.next_subscription_id);
         self.action_dispatch_observations
-            .lock()
-            .insert(id, Box::new(callback));
-        Subscription::ActionObservation {
-            id,
-            observations: Some(Arc::downgrade(&self.action_dispatch_observations)),
-        }
+            .add_callback((), subscription_id, Box::new(callback));
+        Subscription::ActionObservation(
+            self.action_dispatch_observations
+                .subscribe((), subscription_id),
+        )
     }
 
     fn observe_window_activation<F>(&mut self, window_id: usize, callback: F) -> Subscription
@@ -1235,11 +1209,10 @@ impl MutableAppContext {
                 subscription_id,
                 callback: Box::new(callback),
             });
-        Subscription::WindowActivationObservation {
-            id: subscription_id,
-            window_id,
-            observations: Some(self.window_activation_observations.downgrade()),
-        }
+        Subscription::WindowActivationObservation(
+            self.window_activation_observations
+                .subscribe(window_id, subscription_id),
+        )
     }
 
     fn observe_fullscreen<F>(&mut self, window_id: usize, callback: F) -> Subscription
@@ -1253,11 +1226,10 @@ impl MutableAppContext {
                 subscription_id,
                 callback: Box::new(callback),
             });
-        Subscription::WindowFullscreenObservation {
-            id: subscription_id,
-            window_id,
-            observations: Some(self.window_activation_observations.downgrade()),
-        }
+        Subscription::WindowActivationObservation(
+            self.window_activation_observations
+                .subscribe(window_id, subscription_id),
+        )
     }
 
     pub fn observe_keystrokes<F>(&mut self, window_id: usize, callback: F) -> Subscription
@@ -1273,12 +1245,10 @@ impl MutableAppContext {
         let subscription_id = post_inc(&mut self.next_subscription_id);
         self.keystroke_observations
             .add_callback(window_id, subscription_id, Box::new(callback));
-
-        Subscription::KeystrokeObservation {
-            id: subscription_id,
-            window_id,
-            observations: Some(self.keystroke_observations.downgrade()),
-        }
+        Subscription::KeystrokeObservation(
+            self.keystroke_observations
+                .subscribe(window_id, subscription_id),
+        )
     }
 
     pub fn defer(&mut self, callback: impl 'static + FnOnce(&mut MutableAppContext)) {
@@ -1999,15 +1969,13 @@ impl MutableAppContext {
                             entity_id,
                             subscription_id,
                             callback,
-                        } => self.subscriptions.add_or_remove_callback(
-                            entity_id,
-                            subscription_id,
-                            callback,
-                        ),
+                        } => self
+                            .subscriptions
+                            .add_callback(entity_id, subscription_id, callback),
 
                         Effect::Event { entity_id, payload } => {
                             let mut subscriptions = self.subscriptions.clone();
-                            subscriptions.emit_and_cleanup(entity_id, self, |callback, this| {
+                            subscriptions.emit(entity_id, self, |callback, this| {
                                 callback(payload.as_ref(), this)
                             })
                         }
@@ -2016,7 +1984,7 @@ impl MutableAppContext {
                             type_id,
                             subscription_id,
                             callback,
-                        } => self.global_subscriptions.add_or_remove_callback(
+                        } => self.global_subscriptions.add_callback(
                             type_id,
                             subscription_id,
                             callback,
@@ -2028,16 +1996,13 @@ impl MutableAppContext {
                             entity_id,
                             subscription_id,
                             callback,
-                        } => self.observations.add_or_remove_callback(
-                            entity_id,
-                            subscription_id,
-                            callback,
-                        ),
+                        } => self
+                            .observations
+                            .add_callback(entity_id, subscription_id, callback),
 
                         Effect::ModelNotification { model_id } => {
                             let mut observations = self.observations.clone();
-                            observations
-                                .emit_and_cleanup(model_id, self, |callback, this| callback(this));
+                            observations.emit(model_id, self, |callback, this| callback(this));
                         }
 
                         Effect::ViewNotification { window_id, view_id } => {
@@ -2046,7 +2011,7 @@ impl MutableAppContext {
 
                         Effect::GlobalNotification { type_id } => {
                             let mut subscriptions = self.global_observations.clone();
-                            subscriptions.emit_and_cleanup(type_id, self, |callback, this| {
+                            subscriptions.emit(type_id, self, |callback, this| {
                                 callback(this);
                                 true
                             });
@@ -2080,7 +2045,7 @@ impl MutableAppContext {
                             subscription_id,
                             callback,
                         } => {
-                            self.focus_observations.add_or_remove_callback(
+                            self.focus_observations.add_callback(
                                 view_id,
                                 subscription_id,
                                 callback,
@@ -2099,7 +2064,7 @@ impl MutableAppContext {
                             window_id,
                             subscription_id,
                             callback,
-                        } => self.window_activation_observations.add_or_remove_callback(
+                        } => self.window_activation_observations.add_callback(
                             window_id,
                             subscription_id,
                             callback,
@@ -2114,7 +2079,7 @@ impl MutableAppContext {
                             window_id,
                             subscription_id,
                             callback,
-                        } => self.window_fullscreen_observations.add_or_remove_callback(
+                        } => self.window_fullscreen_observations.add_callback(
                             window_id,
                             subscription_id,
                             callback,
@@ -2159,6 +2124,7 @@ impl MutableAppContext {
                     self.remove_dropped_entities();
                 } else {
                     self.remove_dropped_entities();
+
                     if refreshing {
                         self.perform_window_refresh();
                     } else {
@@ -2295,7 +2261,7 @@ impl MutableAppContext {
         let type_id = (&*payload).type_id();
 
         let mut subscriptions = self.global_subscriptions.clone();
-        subscriptions.emit_and_cleanup(type_id, self, |callback, this| {
+        subscriptions.emit(type_id, self, |callback, this| {
             callback(payload.as_ref(), this);
             true //Always alive
         });
@@ -2320,17 +2286,18 @@ impl MutableAppContext {
             }
 
             let mut observations = self.observations.clone();
-            observations.emit_and_cleanup(observed_view_id, self, |callback, this| callback(this));
+            observations.emit(observed_view_id, self, |callback, this| callback(this));
         }
     }
 
     fn handle_entity_release_effect(&mut self, entity_id: usize, entity: &dyn Any) {
-        let callbacks = self.release_observations.lock().remove(&entity_id);
-        if let Some(callbacks) = callbacks {
-            for (_, callback) in callbacks {
-                callback(entity, self);
-            }
-        }
+        self.release_observations
+            .clone()
+            .emit(entity_id, self, |callback, this| {
+                callback(entity, this);
+                // Release observations happen one time. So clear the callback by returning false
+                false
+            })
     }
 
     fn handle_fullscreen_effect(&mut self, window_id: usize, is_fullscreen: bool) {
@@ -2350,7 +2317,7 @@ impl MutableAppContext {
             window.is_fullscreen = is_fullscreen;
 
             let mut observations = this.window_fullscreen_observations.clone();
-            observations.emit_and_cleanup(window_id, this, |callback, this| {
+            observations.emit(window_id, this, |callback, this| {
                 callback(is_fullscreen, this)
             });
 
@@ -2367,7 +2334,7 @@ impl MutableAppContext {
     ) {
         self.update(|this| {
             let mut observations = this.keystroke_observations.clone();
-            observations.emit_and_cleanup(window_id, this, {
+            observations.emit(window_id, this, {
                 move |callback, this| callback(&keystroke, &result, handled_by.as_ref(), this)
             });
         });
@@ -2403,7 +2370,7 @@ impl MutableAppContext {
             }
 
             let mut observations = this.window_activation_observations.clone();
-            observations.emit_and_cleanup(window_id, this, |callback, this| callback(active, this));
+            observations.emit(window_id, this, |callback, this| callback(active, this));
 
             Some(())
         });
@@ -2443,8 +2410,7 @@ impl MutableAppContext {
                 }
 
                 let mut subscriptions = this.focus_observations.clone();
-                subscriptions
-                    .emit_and_cleanup(blurred_id, this, |callback, this| callback(false, this));
+                subscriptions.emit(blurred_id, this, |callback, this| callback(false, this));
             }
 
             if let Some(focused_id) = focused_id {
@@ -2456,8 +2422,7 @@ impl MutableAppContext {
                 }
 
                 let mut subscriptions = this.focus_observations.clone();
-                subscriptions
-                    .emit_and_cleanup(focused_id, this, |callback, this| callback(true, this));
+                subscriptions.emit(focused_id, this, |callback, this| callback(true, this));
             }
         })
     }
@@ -2513,11 +2478,12 @@ impl MutableAppContext {
     }
 
     fn handle_action_dispatch_notification_effect(&mut self, action_id: TypeId) {
-        let mut callbacks = mem::take(&mut *self.action_dispatch_observations.lock());
-        for callback in callbacks.values_mut() {
-            callback(action_id, self);
-        }
-        self.action_dispatch_observations.lock().extend(callbacks);
+        self.action_dispatch_observations
+            .clone()
+            .emit((), self, |callback, this| {
+                callback(action_id, this);
+                true
+            });
     }
 
     fn handle_window_should_close_subscription_effect(
@@ -5106,269 +5072,46 @@ impl<T> Drop for ElementStateHandle<T> {
 
 #[must_use]
 pub enum Subscription {
-    Subscription {
-        id: usize,
-        entity_id: usize,
-        subscriptions: Option<Weak<Mapping<usize, SubscriptionCallback>>>,
-    },
-    GlobalSubscription {
-        id: usize,
-        type_id: TypeId,
-        subscriptions: Option<Weak<Mapping<TypeId, GlobalSubscriptionCallback>>>,
-    },
-    Observation {
-        id: usize,
-        entity_id: usize,
-        observations: Option<Weak<Mapping<usize, ObservationCallback>>>,
-    },
-    GlobalObservation {
-        id: usize,
-        type_id: TypeId,
-        observations: Option<Weak<Mapping<TypeId, GlobalObservationCallback>>>,
-    },
-    FocusObservation {
-        id: usize,
-        view_id: usize,
-        observations: Option<Weak<Mapping<usize, FocusObservationCallback>>>,
-    },
-    WindowActivationObservation {
-        id: usize,
-        window_id: usize,
-        observations: Option<Weak<Mapping<usize, WindowActivationCallback>>>,
-    },
-    WindowFullscreenObservation {
-        id: usize,
-        window_id: usize,
-        observations: Option<Weak<Mapping<usize, WindowFullscreenCallback>>>,
-    },
-    KeystrokeObservation {
-        id: usize,
-        window_id: usize,
-        observations: Option<Weak<Mapping<usize, KeystrokeCallback>>>,
-    },
-
-    ReleaseObservation {
-        id: usize,
-        entity_id: usize,
-        #[allow(clippy::type_complexity)]
-        observations:
-            Option<Weak<Mutex<HashMap<usize, BTreeMap<usize, ReleaseObservationCallback>>>>>,
-    },
-    ActionObservation {
-        id: usize,
-        observations: Option<Weak<Mutex<BTreeMap<usize, ActionObservationCallback>>>>,
-    },
+    Subscription(callback_collection::Subscription<usize, SubscriptionCallback>),
+    Observation(callback_collection::Subscription<usize, ObservationCallback>),
+    GlobalSubscription(callback_collection::Subscription<TypeId, GlobalSubscriptionCallback>),
+    GlobalObservation(callback_collection::Subscription<TypeId, GlobalObservationCallback>),
+    FocusObservation(callback_collection::Subscription<usize, FocusObservationCallback>),
+    WindowActivationObservation(callback_collection::Subscription<usize, WindowActivationCallback>),
+    WindowFullscreenObservation(callback_collection::Subscription<usize, WindowFullscreenCallback>),
+    KeystrokeObservation(callback_collection::Subscription<usize, KeystrokeCallback>),
+    ReleaseObservation(callback_collection::Subscription<usize, ReleaseObservationCallback>),
+    ActionObservation(callback_collection::Subscription<(), ActionObservationCallback>),
 }
 
 impl Subscription {
-    pub fn detach(&mut self) {
+    pub fn id(&self) -> usize {
         match self {
-            Subscription::Subscription { subscriptions, .. } => {
-                subscriptions.take();
-            }
-            Subscription::GlobalSubscription { subscriptions, .. } => {
-                subscriptions.take();
-            }
-            Subscription::Observation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::GlobalObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::ReleaseObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::FocusObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::ActionObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::KeystrokeObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::WindowActivationObservation { observations, .. } => {
-                observations.take();
-            }
-            Subscription::WindowFullscreenObservation { observations, .. } => {
-                observations.take();
-            }
+            Subscription::Subscription(subscription) => subscription.id(),
+            Subscription::Observation(subscription) => subscription.id(),
+            Subscription::GlobalSubscription(subscription) => subscription.id(),
+            Subscription::GlobalObservation(subscription) => subscription.id(),
+            Subscription::FocusObservation(subscription) => subscription.id(),
+            Subscription::WindowActivationObservation(subscription) => subscription.id(),
+            Subscription::WindowFullscreenObservation(subscription) => subscription.id(),
+            Subscription::KeystrokeObservation(subscription) => subscription.id(),
+            Subscription::ReleaseObservation(subscription) => subscription.id(),
+            Subscription::ActionObservation(subscription) => subscription.id(),
         }
     }
-}
 
-impl Drop for Subscription {
-    fn drop(&mut self) {
+    pub fn detach(&mut self) {
         match self {
-            Subscription::Subscription {
-                id,
-                entity_id,
-                subscriptions,
-            } => {
-                if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) {
-                    match subscriptions
-                        .lock()
-                        .entry(*entity_id)
-                        .or_default()
-                        .entry(*id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::GlobalSubscription {
-                id,
-                type_id,
-                subscriptions,
-            } => {
-                if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) {
-                    match subscriptions.lock().entry(*type_id).or_default().entry(*id) {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::Observation {
-                id,
-                entity_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations
-                        .lock()
-                        .entry(*entity_id)
-                        .or_default()
-                        .entry(*id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::GlobalObservation {
-                id,
-                type_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations.lock().entry(*type_id).or_default().entry(*id) {
-                        collections::btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        collections::btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::ReleaseObservation {
-                id,
-                entity_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    if let Some(observations) = observations.lock().get_mut(entity_id) {
-                        observations.remove(id);
-                    }
-                }
-            }
-            Subscription::FocusObservation {
-                id,
-                view_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations.lock().entry(*view_id).or_default().entry(*id) {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::ActionObservation { id, observations } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    observations.lock().remove(id);
-                }
-            }
-            Subscription::KeystrokeObservation {
-                id,
-                window_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations
-                        .lock()
-                        .entry(*window_id)
-                        .or_default()
-                        .entry(*id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::WindowActivationObservation {
-                id,
-                window_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations
-                        .lock()
-                        .entry(*window_id)
-                        .or_default()
-                        .entry(*id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-            Subscription::WindowFullscreenObservation {
-                id,
-                window_id,
-                observations,
-            } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    match observations
-                        .lock()
-                        .entry(*window_id)
-                        .or_default()
-                        .entry(*id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(None);
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
+            Subscription::Subscription(subscription) => subscription.detach(),
+            Subscription::GlobalSubscription(subscription) => subscription.detach(),
+            Subscription::Observation(subscription) => subscription.detach(),
+            Subscription::GlobalObservation(subscription) => subscription.detach(),
+            Subscription::FocusObservation(subscription) => subscription.detach(),
+            Subscription::KeystrokeObservation(subscription) => subscription.detach(),
+            Subscription::WindowActivationObservation(subscription) => subscription.detach(),
+            Subscription::WindowFullscreenObservation(subscription) => subscription.detach(),
+            Subscription::ReleaseObservation(subscription) => subscription.detach(),
+            Subscription::ActionObservation(subscription) => subscription.detach(),
         }
     }
 }
@@ -6015,60 +5758,44 @@ mod tests {
 
     #[crate::test(self)]
     fn test_view_events(cx: &mut MutableAppContext) {
-        #[derive(Default)]
-        struct View {
-            events: Vec<usize>,
-        }
-
-        impl Entity for View {
-            type Event = usize;
-        }
-
-        impl super::View for View {
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
-
-            fn ui_name() -> &'static str {
-                "View"
-            }
-        }
-
         struct Model;
 
         impl Entity for Model {
-            type Event = usize;
+            type Event = String;
         }
 
-        let (_, handle_1) = cx.add_window(Default::default(), |_| View::default());
-        let handle_2 = cx.add_view(&handle_1, |_| View::default());
+        let (_, handle_1) = cx.add_window(Default::default(), |_| TestView::default());
+        let handle_2 = cx.add_view(&handle_1, |_| TestView::default());
         let handle_3 = cx.add_model(|_| Model);
 
         handle_1.update(cx, |_, cx| {
             cx.subscribe(&handle_2, move |me, emitter, event, cx| {
-                me.events.push(*event);
+                me.events.push(event.clone());
 
                 cx.subscribe(&emitter, |me, _, event, _| {
-                    me.events.push(*event * 2);
+                    me.events.push(format!("{event} from inner"));
                 })
                 .detach();
             })
             .detach();
 
             cx.subscribe(&handle_3, |me, _, event, _| {
-                me.events.push(*event);
+                me.events.push(event.clone());
             })
             .detach();
         });
 
-        handle_2.update(cx, |_, c| c.emit(7));
-        assert_eq!(handle_1.read(cx).events, vec![7]);
+        handle_2.update(cx, |_, c| c.emit("7".into()));
+        assert_eq!(handle_1.read(cx).events, vec!["7"]);
 
-        handle_2.update(cx, |_, c| c.emit(5));
-        assert_eq!(handle_1.read(cx).events, vec![7, 5, 10]);
+        handle_2.update(cx, |_, c| c.emit("5".into()));
+        assert_eq!(handle_1.read(cx).events, vec!["7", "5", "5 from inner"]);
 
-        handle_3.update(cx, |_, c| c.emit(9));
-        assert_eq!(handle_1.read(cx).events, vec![7, 5, 10, 9]);
+        handle_3.update(cx, |_, c| c.emit("9".into()));
+        assert_eq!(
+            handle_1.read(cx).events,
+            vec!["7", "5", "5 from inner", "9"]
+        );
     }
 
     #[crate::test(self)]
@@ -6259,31 +5986,15 @@ mod tests {
 
     #[crate::test(self)]
     fn test_dropping_subscribers(cx: &mut MutableAppContext) {
-        struct View;
-
-        impl Entity for View {
-            type Event = ();
-        }
-
-        impl super::View for View {
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
-
-            fn ui_name() -> &'static str {
-                "View"
-            }
-        }
-
         struct Model;
 
         impl Entity for Model {
             type Event = ();
         }
 
-        let (_, root_view) = cx.add_window(Default::default(), |_| View);
-        let observing_view = cx.add_view(&root_view, |_| View);
-        let emitting_view = cx.add_view(&root_view, |_| View);
+        let (_, root_view) = cx.add_window(Default::default(), |_| TestView::default());
+        let observing_view = cx.add_view(&root_view, |_| TestView::default());
+        let emitting_view = cx.add_view(&root_view, |_| TestView::default());
         let observing_model = cx.add_model(|_| Model);
         let observed_model = cx.add_model(|_| Model);
 
@@ -6300,165 +6011,117 @@ mod tests {
             drop(observing_model);
         });
 
-        emitting_view.update(cx, |_, cx| cx.emit(()));
+        emitting_view.update(cx, |_, cx| cx.emit(Default::default()));
         observed_model.update(cx, |_, cx| cx.emit(()));
     }
 
     #[crate::test(self)]
     fn test_view_emit_before_subscribe_in_same_update_cycle(cx: &mut MutableAppContext) {
-        #[derive(Default)]
-        struct TestView;
-
-        impl Entity for TestView {
-            type Event = ();
-        }
-
-        impl View for TestView {
-            fn ui_name() -> &'static str {
-                "TestView"
-            }
-
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
-        }
-
-        let events = Rc::new(RefCell::new(Vec::new()));
-        cx.add_window(Default::default(), |cx| {
+        let (_, view) = cx.add_window::<TestView, _>(Default::default(), |cx| {
             drop(cx.subscribe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _, _| events.borrow_mut().push("dropped before flush")
+                move |this, _, _, _| this.events.push("dropped before flush".into())
             }));
             cx.subscribe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _, _| events.borrow_mut().push("before emit")
+                move |this, _, _, _| this.events.push("before emit".into())
             })
             .detach();
-            cx.emit(());
+            cx.emit("the event".into());
             cx.subscribe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _, _| events.borrow_mut().push("after emit")
+                move |this, _, _, _| this.events.push("after emit".into())
             })
             .detach();
-            TestView
+            TestView { events: Vec::new() }
         });
-        assert_eq!(*events.borrow(), ["before emit"]);
+
+        assert_eq!(view.read(cx).events, ["before emit"]);
     }
 
     #[crate::test(self)]
     fn test_observe_and_notify_from_view(cx: &mut MutableAppContext) {
-        #[derive(Default)]
-        struct View {
-            events: Vec<usize>,
-        }
-
-        impl Entity for View {
-            type Event = usize;
-        }
-
-        impl super::View for View {
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
-
-            fn ui_name() -> &'static str {
-                "View"
-            }
-        }
-
         #[derive(Default)]
         struct Model {
-            count: usize,
+            state: String,
         }
 
         impl Entity for Model {
             type Event = ();
         }
 
-        let (_, view) = cx.add_window(Default::default(), |_| View::default());
-        let model = cx.add_model(|_| Model::default());
+        let (_, view) = cx.add_window(Default::default(), |_| TestView::default());
+        let model = cx.add_model(|_| Model {
+            state: "old-state".into(),
+        });
 
         view.update(cx, |_, c| {
             c.observe(&model, |me, observed, c| {
-                me.events.push(observed.read(c).count)
+                me.events.push(observed.read(c).state.clone())
             })
             .detach();
         });
 
-        model.update(cx, |model, c| {
-            model.count = 11;
-            c.notify();
+        model.update(cx, |model, cx| {
+            model.state = "new-state".into();
+            cx.notify();
         });
-        assert_eq!(view.read(cx).events, vec![11]);
+        assert_eq!(view.read(cx).events, vec!["new-state"]);
     }
 
     #[crate::test(self)]
     fn test_view_notify_before_observe_in_same_update_cycle(cx: &mut MutableAppContext) {
-        #[derive(Default)]
-        struct TestView;
-
-        impl Entity for TestView {
-            type Event = ();
-        }
-
-        impl View for TestView {
-            fn ui_name() -> &'static str {
-                "TestView"
-            }
-
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
-        }
-
-        let events = Rc::new(RefCell::new(Vec::new()));
-        cx.add_window(Default::default(), |cx| {
+        let (_, view) = cx.add_window::<TestView, _>(Default::default(), |cx| {
             drop(cx.observe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _| events.borrow_mut().push("dropped before flush")
+                move |this, _, _| this.events.push("dropped before flush".into())
             }));
             cx.observe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _| events.borrow_mut().push("before notify")
+                move |this, _, _| this.events.push("before notify".into())
             })
             .detach();
             cx.notify();
             cx.observe(&cx.handle(), {
-                let events = events.clone();
-                move |_, _, _| events.borrow_mut().push("after notify")
+                move |this, _, _| this.events.push("after notify".into())
             })
             .detach();
-            TestView
+            TestView { events: Vec::new() }
         });
-        assert_eq!(*events.borrow(), ["before notify"]);
+
+        assert_eq!(view.read(cx).events, ["before notify"]);
     }
 
     #[crate::test(self)]
-    fn test_dropping_observers(cx: &mut MutableAppContext) {
-        struct View;
-
-        impl Entity for View {
+    fn test_notify_and_drop_observe_subscription_in_same_update_cycle(cx: &mut MutableAppContext) {
+        struct Model;
+        impl Entity for Model {
             type Event = ();
         }
 
-        impl super::View for View {
-            fn render(&mut self, _: &mut RenderContext<Self>) -> ElementBox {
-                Empty::new().boxed()
-            }
+        let model = cx.add_model(|_| Model);
+        let (_, view) = cx.add_window(Default::default(), |_| TestView::default());
 
-            fn ui_name() -> &'static str {
-                "View"
-            }
+        view.update(cx, |_, cx| {
+            model.update(cx, |_, cx| cx.notify());
+            drop(cx.observe(&model, move |this, _, _| {
+                this.events.push("model notified".into());
+            }));
+            model.update(cx, |_, cx| cx.notify());
+        });
+
+        for _ in 0..3 {
+            model.update(cx, |_, cx| cx.notify());
         }
 
+        assert_eq!(view.read(cx).events, Vec::<String>::new());
+    }
+
+    #[crate::test(self)]
+    fn test_dropping_observers(cx: &mut MutableAppContext) {
         struct Model;
 
         impl Entity for Model {
             type Event = ();
         }
 
-        let (_, root_view) = cx.add_window(Default::default(), |_| View);
-        let observing_view = cx.add_view(root_view, |_| View);
+        let (_, root_view) = cx.add_window(Default::default(), |_| TestView::default());
+        let observing_view = cx.add_view(root_view, |_| TestView::default());
         let observing_model = cx.add_model(|_| Model);
         let observed_model = cx.add_model(|_| Model);
 

crates/gpui/src/app/callback_collection.rs 🔗

@@ -1,19 +1,44 @@
+use crate::MutableAppContext;
+use collections::{BTreeMap, HashMap, HashSet};
+use parking_lot::Mutex;
 use std::sync::Arc;
 use std::{hash::Hash, sync::Weak};
 
-use parking_lot::Mutex;
+pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
+    internal: Arc<Mutex<Mapping<K, F>>>,
+}
 
-use collections::{btree_map, BTreeMap, HashMap};
+pub struct Subscription<K: Clone + Hash + Eq, F> {
+    key: K,
+    id: usize,
+    mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
+}
 
-use crate::MutableAppContext;
+struct Mapping<K, F> {
+    callbacks: HashMap<K, BTreeMap<usize, F>>,
+    dropped_subscriptions: HashMap<K, HashSet<usize>>,
+}
 
-pub type Mapping<K, F> = Mutex<HashMap<K, BTreeMap<usize, Option<F>>>>;
+impl<K: Hash + Eq, F> Mapping<K, F> {
+    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
+        if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
+            subscriptions.remove(&subscription_id)
+        } else {
+            false
+        }
+    }
+}
 
-pub struct CallbackCollection<K: Hash + Eq, F> {
-    internal: Arc<Mapping<K, F>>,
+impl<K, F> Default for Mapping<K, F> {
+    fn default() -> Self {
+        Self {
+            callbacks: Default::default(),
+            dropped_subscriptions: Default::default(),
+        }
+    }
 }
 
-impl<K: Hash + Eq, F> Clone for CallbackCollection<K, F> {
+impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
     fn clone(&self) -> Self {
         Self {
             internal: self.internal.clone(),
@@ -21,7 +46,7 @@ impl<K: Hash + Eq, F> Clone for CallbackCollection<K, F> {
     }
 }
 
-impl<K: Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
+impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
     fn default() -> Self {
         CallbackCollection {
             internal: Arc::new(Mutex::new(Default::default())),
@@ -29,78 +54,114 @@ impl<K: Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
     }
 }
 
-impl<K: Hash + Eq + Copy, F> CallbackCollection<K, F> {
-    pub fn downgrade(&self) -> Weak<Mapping<K, F>> {
-        Arc::downgrade(&self.internal)
-    }
-
+impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
     #[cfg(test)]
     pub fn is_empty(&self) -> bool {
-        self.internal.lock().is_empty()
+        self.internal.lock().callbacks.is_empty()
     }
 
-    pub fn add_callback(&mut self, id: K, subscription_id: usize, callback: F) {
-        self.internal
-            .lock()
-            .entry(id)
-            .or_default()
-            .insert(subscription_id, Some(callback));
+    pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
+        Subscription {
+            key,
+            id: subscription_id,
+            mapping: Some(Arc::downgrade(&self.internal)),
+        }
     }
 
-    pub fn remove(&mut self, id: K) {
-        self.internal.lock().remove(&id);
-    }
+    pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
+        let mut this = self.internal.lock();
+
+        // If this callback's subscription was dropped before the callback was
+        // added, then just drop the callback.
+        if this.clear_dropped_state(&key, subscription_id) {
+            return;
+        }
 
-    pub fn add_or_remove_callback(&mut self, id: K, subscription_id: usize, callback: F) {
-        match self
-            .internal
-            .lock()
-            .entry(id)
+        this.callbacks
+            .entry(key)
             .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
+            .insert(subscription_id, callback);
+    }
 
-            btree_map::Entry::Occupied(entry) => {
-                // TODO: This seems like it should never be called because no code
-                // should ever attempt to remove an existing callback
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
+    pub fn remove(&mut self, key: K) {
+        // Drop these callbacks after releasing the lock, in case one of them
+        // owns a subscription to this callback collection.
+        let mut this = self.internal.lock();
+        let callbacks = this.callbacks.remove(&key);
+        this.dropped_subscriptions.remove(&key);
+        drop(this);
+        drop(callbacks);
     }
 
-    pub fn emit_and_cleanup<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
+    pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
         &mut self,
-        id: K,
+        key: K,
         cx: &mut MutableAppContext,
         mut call_callback: C,
     ) {
-        let callbacks = self.internal.lock().remove(&id);
+        let callbacks = self.internal.lock().callbacks.remove(&key);
         if let Some(callbacks) = callbacks {
-            for (subscription_id, callback) in callbacks {
-                if let Some(mut callback) = callback {
-                    let alive = call_callback(&mut callback, cx);
-                    if alive {
-                        match self
-                            .internal
-                            .lock()
-                            .entry(id)
-                            .or_default()
-                            .entry(subscription_id)
-                        {
-                            btree_map::Entry::Vacant(entry) => {
-                                entry.insert(Some(callback));
-                            }
-                            btree_map::Entry::Occupied(entry) => {
-                                entry.remove();
-                            }
-                        }
-                    }
+            for (subscription_id, mut callback) in callbacks {
+                // If this callback's subscription was dropped while invoking an
+                // earlier callback, then just drop the callback.
+                let mut this = self.internal.lock();
+                if this.clear_dropped_state(&key, subscription_id) {
+                    continue;
                 }
+
+                drop(this);
+                let alive = call_callback(&mut callback, cx);
+
+                // If this callback's subscription was dropped while invoking the callback
+                // itself, or if the callback returns false, then just drop the callback.
+                let mut this = self.internal.lock();
+                if this.clear_dropped_state(&key, subscription_id) || !alive {
+                    continue;
+                }
+
+                this.callbacks
+                    .entry(key)
+                    .or_default()
+                    .insert(subscription_id, callback);
             }
         }
     }
 }
+
+impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
+    pub fn id(&self) -> usize {
+        self.id
+    }
+
+    pub fn detach(&mut self) {
+        self.mapping.take();
+    }
+}
+
+impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
+    fn drop(&mut self) {
+        if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
+            let mut mapping = mapping.lock();
+
+            // If the callback is present in the mapping, then just remove it.
+            if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
+                let callback = callbacks.remove(&self.id);
+                if callback.is_some() {
+                    drop(mapping);
+                    drop(callback);
+                    return;
+                }
+            }
+
+            // If this subscription's callback is not present, then either it has been
+            // temporarily removed during emit, or it has not yet been added. Record
+            // that this subscription has been dropped so that the callback can be
+            // removed later.
+            mapping
+                .dropped_subscriptions
+                .entry(self.key.clone())
+                .or_default()
+                .insert(self.id);
+        }
+    }
+}