Use a CallbackCollection for action dispatch observations

Max Brunsfeld created

Change summary

crates/gpui/src/app.rs                     | 66 +++++++++++------------
crates/gpui/src/app/callback_collection.rs | 51 +++--------------
2 files changed, 41 insertions(+), 76 deletions(-)

Detailed changes

crates/gpui/src/app.rs 🔗

@@ -27,7 +27,7 @@ use smol::prelude::*;
 
 pub use action::*;
 use callback_collection::CallbackCollection;
-use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque};
+use collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
 use keymap::MatchResult;
 use platform::Event;
 #[cfg(any(test, feature = "test-support"))]
@@ -621,7 +621,7 @@ pub struct MutableAppContext {
     global_observations: CallbackCollection<TypeId, GlobalObservationCallback>,
     focus_observations: CallbackCollection<usize, FocusObservationCallback>,
     release_observations: CallbackCollection<usize, ReleaseObservationCallback>,
-    action_dispatch_observations: Arc<Mutex<BTreeMap<usize, ActionObservationCallback>>>,
+    action_dispatch_observations: CallbackCollection<(), ActionObservationCallback>,
     window_activation_observations: CallbackCollection<usize, WindowActivationCallback>,
     window_fullscreen_observations: CallbackCollection<usize, WindowFullscreenCallback>,
     keystroke_observations: CallbackCollection<usize, KeystrokeCallback>,
@@ -1189,14 +1189,13 @@ impl MutableAppContext {
     where
         F: 'static + FnMut(TypeId, &mut MutableAppContext),
     {
-        let id = post_inc(&mut self.next_subscription_id);
+        let subscription_id = post_inc(&mut self.next_subscription_id);
         self.action_dispatch_observations
-            .lock()
-            .insert(id, Box::new(callback));
-        Subscription::ActionObservation {
-            id,
-            observations: Some(Arc::downgrade(&self.action_dispatch_observations)),
-        }
+            .add_callback((), subscription_id, Box::new(callback));
+        Subscription::ActionObservation(
+            self.action_dispatch_observations
+                .subscribe((), subscription_id),
+        )
     }
 
     fn observe_window_activation<F>(&mut self, window_id: usize, callback: F) -> Subscription
@@ -2489,11 +2488,12 @@ impl MutableAppContext {
     }
 
     fn handle_action_dispatch_notification_effect(&mut self, action_id: TypeId) {
-        let mut callbacks = mem::take(&mut *self.action_dispatch_observations.lock());
-        for callback in callbacks.values_mut() {
-            callback(action_id, self);
-        }
-        self.action_dispatch_observations.lock().extend(callbacks);
+        self.action_dispatch_observations
+            .clone()
+            .emit((), self, |callback, this| {
+                callback(action_id, this);
+                true
+            });
     }
 
     fn handle_window_should_close_subscription_effect(
@@ -5091,14 +5091,25 @@ pub enum Subscription {
     WindowFullscreenObservation(callback_collection::Subscription<usize, WindowFullscreenCallback>),
     KeystrokeObservation(callback_collection::Subscription<usize, KeystrokeCallback>),
     ReleaseObservation(callback_collection::Subscription<usize, ReleaseObservationCallback>),
-
-    ActionObservation {
-        id: usize,
-        observations: Option<Weak<Mutex<BTreeMap<usize, ActionObservationCallback>>>>,
-    },
+    ActionObservation(callback_collection::Subscription<(), ActionObservationCallback>),
 }
 
 impl Subscription {
+    pub fn id(&self) -> usize {
+        match self {
+            Subscription::Subscription(subscription) => subscription.id(),
+            Subscription::Observation(subscription) => subscription.id(),
+            Subscription::GlobalSubscription(subscription) => subscription.id(),
+            Subscription::GlobalObservation(subscription) => subscription.id(),
+            Subscription::FocusObservation(subscription) => subscription.id(),
+            Subscription::WindowActivationObservation(subscription) => subscription.id(),
+            Subscription::WindowFullscreenObservation(subscription) => subscription.id(),
+            Subscription::KeystrokeObservation(subscription) => subscription.id(),
+            Subscription::ReleaseObservation(subscription) => subscription.id(),
+            Subscription::ActionObservation(subscription) => subscription.id(),
+        }
+    }
+
     pub fn detach(&mut self) {
         match self {
             Subscription::Subscription(subscription) => subscription.detach(),
@@ -5110,22 +5121,7 @@ impl Subscription {
             Subscription::WindowActivationObservation(subscription) => subscription.detach(),
             Subscription::WindowFullscreenObservation(subscription) => subscription.detach(),
             Subscription::ReleaseObservation(subscription) => subscription.detach(),
-            Subscription::ActionObservation { observations, .. } => {
-                observations.take();
-            }
-        }
-    }
-}
-
-impl Drop for Subscription {
-    fn drop(&mut self) {
-        match self {
-            Subscription::ActionObservation { id, observations } => {
-                if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
-                    observations.lock().remove(id);
-                }
-            }
-            _ => {}
+            Subscription::ActionObservation(subscription) => subscription.detach(),
         }
     }
 }

crates/gpui/src/app/callback_collection.rs 🔗

@@ -1,44 +1,13 @@
+use crate::MutableAppContext;
+use collections::{BTreeMap, HashMap, HashSet};
+use parking_lot::Mutex;
 use std::sync::Arc;
 use std::{hash::Hash, sync::Weak};
 
-use parking_lot::Mutex;
-
-use collections::{BTreeMap, HashMap, HashSet};
-
-use crate::MutableAppContext;
+pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
+    internal: Arc<Mutex<Mapping<K, F>>>,
+}
 
-// Problem 5: Current bug callbacks currently called many times after being dropped
-// update
-//   notify
-//   observe (push effect)
-//     subscription {id : 5}
-//     pending: [Effect::Notify, Effect::observe { id: 5 }]
-//   drop observation subscription (write None into subscriptions)
-// flush effects
-//   notify
-//   observe
-// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again
-// -----------------
-// Problem 1: Many very similar subscription enum variants
-// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status
-// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback
-// Problem 4: Calling callbacks requires removing all of them from the list and adding them back
-
-// Solution 1 CallbackState:
-//      Add more state to the CallbackCollection map to communicate dropped and initialized status
-//      Solves: P5
-// Solution 2 DroppedSubscriptionList:
-//      Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions
-//      which can be
-// Solution 3 GarbageCollection:
-//      Use some type of traditional garbage collection to handle dropping of callbacks
-//          atomic flag per callback which is looped over in remove dropped entities
-
-// TODO:
-//   - Move subscription id counter to CallbackCollection
-//   - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions
-//     can be a hashset of usize and the Subscription doesn't need the key
-//   - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions
 pub struct Subscription<K: Clone + Hash + Eq, F> {
     key: K,
     id: usize,
@@ -59,10 +28,6 @@ impl<K, F> Default for Mapping<K, F> {
     }
 }
 
-pub(crate) struct CallbackCollection<K: Clone + Hash + Eq, F> {
-    internal: Arc<Mutex<Mapping<K, F>>>,
-}
-
 impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
     fn clone(&self) -> Self {
         Self {
@@ -150,6 +115,10 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
 }
 
 impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
+    pub fn id(&self) -> usize {
+        self.id
+    }
+
     pub fn detach(&mut self) {
         self.mapping.take();
     }