Refactor notification/observation callback pattern in MutableAppContext

ForLoveOfCats and Mikayla created

Pull out duplicate code and clarify some misc behavior. Some of this
existing API feels like it's probably incorrect but that needs more
thorough investigation

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

crates/gpui/src/app.rs | 531 +++++++++++++------------------------------
1 file changed, 164 insertions(+), 367 deletions(-)

Detailed changes

crates/gpui/src/app.rs 🔗

@@ -962,6 +962,92 @@ type WindowActivationCallback = Box<dyn FnMut(bool, &mut MutableAppContext) -> b
 type DeserializeActionCallback = fn(json: &str) -> anyhow::Result<Box<dyn Action>>;
 type WindowShouldCloseSubscriptionCallback = Box<dyn FnMut(&mut MutableAppContext) -> bool>;
 
+// type SubscriptionMappings<T, F> = Arc<Mutex<HashMap<T, BTreeMap<usize, Option<F>>>>>;
+struct SubscriptionMappings<K: Hash + Eq, F> {
+    internal: Arc<Mutex<HashMap<K, BTreeMap<usize, Option<F>>>>>,
+}
+
+impl<K: Hash + Eq + Copy, F> Default for SubscriptionMappings<K, F> {
+    fn default() -> Self {
+        SubscriptionMappings {
+            internal: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+}
+
+impl<K: Hash + Eq + Copy, F> SubscriptionMappings<K, F> {
+    fn clone_ref(&self) -> Self {
+        Self {
+            internal: self.internal.clone(),
+        }
+    }
+
+    fn add_callback(&mut self, id: K, subscription_id: usize, callback: F) {
+        self.internal
+            .lock()
+            .entry(id)
+            .or_default()
+            .insert(subscription_id, Some(callback));
+    }
+
+    fn remove(&mut self, id: K) {
+        self.internal.lock().remove(&id);
+    }
+
+    fn add_or_remove_callback(&mut self, id: K, subscription_id: usize, callback: F) {
+        match self
+            .internal
+            .lock()
+            .entry(id)
+            .or_default()
+            .entry(subscription_id)
+        {
+            btree_map::Entry::Vacant(entry) => {
+                entry.insert(Some(callback));
+            }
+
+            btree_map::Entry::Occupied(entry) => {
+                // TODO: This seems like it should never be called because no code
+                // should ever attempt to remove an existing callback
+                debug_assert!(entry.get().is_none());
+                entry.remove();
+            }
+        }
+    }
+
+    fn emit_and_cleanup<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
+        &mut self,
+        id: K,
+        cx: &mut MutableAppContext,
+        mut call_callback: C,
+    ) {
+        let callbacks = self.internal.lock().remove(&id);
+        if let Some(callbacks) = callbacks {
+            for (subscription_id, callback) in callbacks {
+                if let Some(mut callback) = callback {
+                    let alive = call_callback(&mut callback, cx);
+                    if alive {
+                        match self
+                            .internal
+                            .lock()
+                            .entry(id)
+                            .or_default()
+                            .entry(subscription_id)
+                        {
+                            btree_map::Entry::Vacant(entry) => {
+                                entry.insert(Some(callback));
+                            }
+                            btree_map::Entry::Occupied(entry) => {
+                                entry.remove();
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
 pub struct MutableAppContext {
     weak_self: Option<rc::Weak<RefCell<Self>>>,
     foreground_platform: Rc<dyn platform::ForegroundPlatform>,
@@ -976,18 +1062,17 @@ pub struct MutableAppContext {
     next_window_id: usize,
     next_subscription_id: usize,
     frame_count: usize,
-    subscriptions: Arc<Mutex<HashMap<usize, BTreeMap<usize, Option<SubscriptionCallback>>>>>,
-    global_subscriptions:
-        Arc<Mutex<HashMap<TypeId, BTreeMap<usize, Option<GlobalSubscriptionCallback>>>>>,
-    observations: Arc<Mutex<HashMap<usize, BTreeMap<usize, Option<ObservationCallback>>>>>,
-    focus_observations:
-        Arc<Mutex<HashMap<usize, BTreeMap<usize, Option<FocusObservationCallback>>>>>,
-    global_observations:
-        Arc<Mutex<HashMap<TypeId, BTreeMap<usize, Option<GlobalObservationCallback>>>>>,
+
+    focus_observations: SubscriptionMappings<usize, FocusObservationCallback>,
+    global_subscriptions: SubscriptionMappings<TypeId, GlobalSubscriptionCallback>,
+    global_observations: SubscriptionMappings<TypeId, GlobalObservationCallback>,
+    subscriptions: SubscriptionMappings<usize, SubscriptionCallback>,
+    observations: SubscriptionMappings<usize, ObservationCallback>,
+    window_activation_observations: SubscriptionMappings<usize, WindowActivationCallback>,
+    
     release_observations: Arc<Mutex<HashMap<usize, BTreeMap<usize, ReleaseObservationCallback>>>>,
-    window_activation_observations:
-        Arc<Mutex<HashMap<usize, BTreeMap<usize, Option<WindowActivationCallback>>>>>,
     action_dispatch_observations: Arc<Mutex<BTreeMap<usize, ActionObservationCallback>>>,
+
     presenters_and_platform_windows:
         HashMap<usize, (Rc<RefCell<Presenter>>, Box<dyn platform::Window>)>,
     foreground: Rc<executor::Foreground>,
@@ -1395,7 +1480,7 @@ impl MutableAppContext {
         Subscription::GlobalSubscription {
             id: subscription_id,
             type_id,
-            subscriptions: Some(Arc::downgrade(&self.global_subscriptions)),
+            subscriptions: Some(Arc::downgrade(&self.global_subscriptions.internal)),
         }
     }
 
@@ -1436,7 +1521,7 @@ impl MutableAppContext {
         Subscription::Subscription {
             id: subscription_id,
             entity_id: handle.id(),
-            subscriptions: Some(Arc::downgrade(&self.subscriptions)),
+            subscriptions: Some(Arc::downgrade(&self.subscriptions.internal)),
         }
     }
 
@@ -1464,7 +1549,7 @@ impl MutableAppContext {
         Subscription::Observation {
             id: subscription_id,
             entity_id,
-            observations: Some(Arc::downgrade(&self.observations)),
+            observations: Some(Arc::downgrade(&self.observations.internal)),
         }
     }
 
@@ -1476,6 +1561,7 @@ impl MutableAppContext {
         let subscription_id = post_inc(&mut self.next_subscription_id);
         let observed = handle.downgrade();
         let view_id = handle.id();
+
         self.pending_effects.push_back(Effect::FocusObservation {
             view_id,
             subscription_id,
@@ -1487,10 +1573,11 @@ impl MutableAppContext {
                 }
             }),
         });
+
         Subscription::FocusObservation {
             id: subscription_id,
             view_id,
-            observations: Some(Arc::downgrade(&self.focus_observations)),
+            observations: Some(Arc::downgrade(&self.focus_observations.internal)),
         }
     }
 
@@ -1502,20 +1589,16 @@ impl MutableAppContext {
         let type_id = TypeId::of::<G>();
         let id = post_inc(&mut self.next_subscription_id);
 
-        self.global_observations
-            .lock()
-            .entry(type_id)
-            .or_default()
-            .insert(
-                id,
-                Some(Box::new(move |cx: &mut MutableAppContext| observe(cx))
-                    as GlobalObservationCallback),
-            );
+        self.global_observations.add_callback(
+            type_id,
+            id,
+            Box::new(move |cx: &mut MutableAppContext| observe(cx)),
+        );
 
         Subscription::GlobalObservation {
             id,
             type_id,
-            observations: Some(Arc::downgrade(&self.global_observations)),
+            observations: Some(Arc::downgrade(&self.global_observations.internal)),
         }
     }
 
@@ -1573,7 +1656,9 @@ impl MutableAppContext {
         Subscription::WindowActivationObservation {
             id: subscription_id,
             window_id,
-            observations: Some(Arc::downgrade(&self.window_activation_observations)),
+            observations: Some(Arc::downgrade(
+                &self.window_activation_observations.internal,
+            )),
         }
     }
 
@@ -2107,8 +2192,8 @@ impl MutableAppContext {
             }
 
             for model_id in dropped_models {
-                self.subscriptions.lock().remove(&model_id);
-                self.observations.lock().remove(&model_id);
+                self.subscriptions.remove(model_id);
+                self.observations.remove(model_id);
                 let mut model = self.cx.models.remove(&model_id).unwrap();
                 model.release(self);
                 self.pending_effects
@@ -2116,8 +2201,8 @@ impl MutableAppContext {
             }
 
             for (window_id, view_id) in dropped_views {
-                self.subscriptions.lock().remove(&view_id);
-                self.observations.lock().remove(&view_id);
+                self.subscriptions.remove(view_id);
+                self.observations.remove(view_id);
                 let mut view = self.cx.views.remove(&(window_id, view_id)).unwrap();
                 view.release(self);
                 let change_focus_to = self.cx.windows.get_mut(&window_id).and_then(|window| {
@@ -2165,15 +2250,24 @@ impl MutableAppContext {
                             entity_id,
                             subscription_id,
                             callback,
-                        } => self.handle_subscription_effect(entity_id, subscription_id, callback),
+                        } => self.subscriptions.add_or_remove_callback(
+                            entity_id,
+                            subscription_id,
+                            callback,
+                        ),
 
-                        Effect::Event { entity_id, payload } => self.emit_event(entity_id, payload),
+                        Effect::Event { entity_id, payload } => {
+                            let mut subscriptions = self.subscriptions.clone_ref();
+                            subscriptions.emit_and_cleanup(entity_id, self, |callback, this| {
+                                callback(payload.as_ref(), this)
+                            })
+                        }
 
                         Effect::GlobalSubscription {
                             type_id,
                             subscription_id,
                             callback,
-                        } => self.handle_global_subscription_effect(
+                        } => self.global_subscriptions.add_or_remove_callback(
                             type_id,
                             subscription_id,
                             callback,
@@ -2185,10 +2279,16 @@ impl MutableAppContext {
                             entity_id,
                             subscription_id,
                             callback,
-                        } => self.handle_observation_effect(entity_id, subscription_id, callback),
+                        } => self.observations.add_or_remove_callback(
+                            entity_id,
+                            subscription_id,
+                            callback,
+                        ),
 
                         Effect::ModelNotification { model_id } => {
-                            self.handle_model_notification_effect(model_id)
+                            let mut observations = self.observations.clone_ref();
+                            observations
+                                .emit_and_cleanup(model_id, self, |callback, this| callback(this));
                         }
 
                         Effect::ViewNotification { window_id, view_id } => {
@@ -2196,7 +2296,11 @@ impl MutableAppContext {
                         }
 
                         Effect::GlobalNotification { type_id } => {
-                            self.handle_global_notification_effect(type_id)
+                            let mut subscriptions = self.global_observations.clone_ref();
+                            subscriptions.emit_and_cleanup(type_id, self, |callback, this| {
+                                callback(this);
+                                true
+                            });
                         }
 
                         Effect::Deferred {
@@ -2227,7 +2331,11 @@ impl MutableAppContext {
                             subscription_id,
                             callback,
                         } => {
-                            self.handle_focus_observation_effect(view_id, subscription_id, callback)
+                            self.focus_observations.add_or_remove_callback(
+                                view_id,
+                                subscription_id,
+                                callback,
+                            );
                         }
 
                         Effect::ResizeWindow { window_id } => {
@@ -2242,7 +2350,7 @@ impl MutableAppContext {
                             window_id,
                             subscription_id,
                             callback,
-                        } => self.handle_window_activation_observation_effect(
+                        } => self.window_activation_observations.add_or_remove_callback(
                             window_id,
                             subscription_id,
                             callback,
@@ -2369,182 +2477,14 @@ impl MutableAppContext {
         self.presenters_and_platform_windows = presenters;
     }
 
-    fn handle_subscription_effect(
-        &mut self,
-        entity_id: usize,
-        subscription_id: usize,
-        callback: SubscriptionCallback,
-    ) {
-        match self
-            .subscriptions
-            .lock()
-            .entry(entity_id)
-            .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
-            // Subscription was dropped before effect was processed
-            btree_map::Entry::Occupied(entry) => {
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
-    }
-
-    fn emit_event(&mut self, entity_id: usize, payload: Box<dyn Any>) {
-        let callbacks = self.subscriptions.lock().remove(&entity_id);
-        if let Some(callbacks) = callbacks {
-            for (id, callback) in callbacks {
-                if let Some(mut callback) = callback {
-                    let alive = callback(payload.as_ref(), self);
-                    if alive {
-                        match self
-                            .subscriptions
-                            .lock()
-                            .entry(entity_id)
-                            .or_default()
-                            .entry(id)
-                        {
-                            btree_map::Entry::Vacant(entry) => {
-                                entry.insert(Some(callback));
-                            }
-                            btree_map::Entry::Occupied(entry) => {
-                                entry.remove();
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    fn handle_global_subscription_effect(
-        &mut self,
-        type_id: TypeId,
-        subscription_id: usize,
-        callback: GlobalSubscriptionCallback,
-    ) {
-        match self
-            .global_subscriptions
-            .lock()
-            .entry(type_id)
-            .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
-            // Subscription was dropped before effect was processed
-            btree_map::Entry::Occupied(entry) => {
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
-    }
-
     fn emit_global_event(&mut self, payload: Box<dyn Any>) {
         let type_id = (&*payload).type_id();
-        let callbacks = self.global_subscriptions.lock().remove(&type_id);
-        if let Some(callbacks) = callbacks {
-            for (id, callback) in callbacks {
-                if let Some(mut callback) = callback {
-                    callback(payload.as_ref(), self);
-                    match self
-                        .global_subscriptions
-                        .lock()
-                        .entry(type_id)
-                        .or_default()
-                        .entry(id)
-                    {
-                        btree_map::Entry::Vacant(entry) => {
-                            entry.insert(Some(callback));
-                        }
-                        btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
-        }
-    }
 
-    fn handle_observation_effect(
-        &mut self,
-        entity_id: usize,
-        subscription_id: usize,
-        callback: ObservationCallback,
-    ) {
-        match self
-            .observations
-            .lock()
-            .entry(entity_id)
-            .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
-            // Observation was dropped before effect was processed
-            btree_map::Entry::Occupied(entry) => {
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
-    }
-
-    fn handle_focus_observation_effect(
-        &mut self,
-        view_id: usize,
-        subscription_id: usize,
-        callback: FocusObservationCallback,
-    ) {
-        match self
-            .focus_observations
-            .lock()
-            .entry(view_id)
-            .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
-            // Observation was dropped before effect was processed
-            btree_map::Entry::Occupied(entry) => {
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
-    }
-
-    fn handle_model_notification_effect(&mut self, observed_id: usize) {
-        let callbacks = self.observations.lock().remove(&observed_id);
-        if let Some(callbacks) = callbacks {
-            if self.cx.models.contains_key(&observed_id) {
-                for (id, callback) in callbacks {
-                    if let Some(mut callback) = callback {
-                        let alive = callback(self);
-                        if alive {
-                            match self
-                                .observations
-                                .lock()
-                                .entry(observed_id)
-                                .or_default()
-                                .entry(id)
-                            {
-                                btree_map::Entry::Vacant(entry) => {
-                                    entry.insert(Some(callback));
-                                }
-                                btree_map::Entry::Occupied(entry) => {
-                                    entry.remove();
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
+        let mut subscriptions = self.global_subscriptions.clone_ref();
+        subscriptions.emit_and_cleanup(type_id, self, |callback, this| {
+            callback(payload.as_ref(), this);
+            true //Always alive
+        });
     }
 
     fn handle_view_notification_effect(
@@ -2552,8 +2492,6 @@ impl MutableAppContext {
         observed_window_id: usize,
         observed_view_id: usize,
     ) {
-        let callbacks = self.observations.lock().remove(&observed_view_id);
-
         if self
             .cx
             .views
@@ -2567,54 +2505,8 @@ impl MutableAppContext {
                     .insert(observed_view_id);
             }
 
-            if let Some(callbacks) = callbacks {
-                for (id, callback) in callbacks {
-                    if let Some(mut callback) = callback {
-                        let alive = callback(self);
-                        if alive {
-                            match self
-                                .observations
-                                .lock()
-                                .entry(observed_view_id)
-                                .or_default()
-                                .entry(id)
-                            {
-                                btree_map::Entry::Vacant(entry) => {
-                                    entry.insert(Some(callback));
-                                }
-                                btree_map::Entry::Occupied(entry) => {
-                                    entry.remove();
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    fn handle_global_notification_effect(&mut self, observed_type_id: TypeId) {
-        let callbacks = self.global_observations.lock().remove(&observed_type_id);
-        if let Some(callbacks) = callbacks {
-            for (id, callback) in callbacks {
-                if let Some(mut callback) = callback {
-                    callback(self);
-                    match self
-                        .global_observations
-                        .lock()
-                        .entry(observed_type_id)
-                        .or_default()
-                        .entry(id)
-                    {
-                        collections::btree_map::Entry::Vacant(entry) => {
-                            entry.insert(Some(callback));
-                        }
-                        collections::btree_map::Entry::Occupied(entry) => {
-                            entry.remove();
-                        }
-                    }
-                }
-            }
+            let mut observations = self.observations.clone_ref();
+            observations.emit_and_cleanup(observed_view_id, self, |callback, this| callback(this));
         }
     }
 
@@ -2627,30 +2519,6 @@ impl MutableAppContext {
         }
     }
 
-    fn handle_window_activation_observation_effect(
-        &mut self,
-        window_id: usize,
-        subscription_id: usize,
-        callback: WindowActivationCallback,
-    ) {
-        match self
-            .window_activation_observations
-            .lock()
-            .entry(window_id)
-            .or_default()
-            .entry(subscription_id)
-        {
-            btree_map::Entry::Vacant(entry) => {
-                entry.insert(Some(callback));
-            }
-            // Observation was dropped before effect was processed
-            btree_map::Entry::Occupied(entry) => {
-                debug_assert!(entry.get().is_none());
-                entry.remove();
-            }
-        }
-    }
-
     fn handle_fullscreen_effect(&mut self, window_id: usize, is_fullscreen: bool) {
         //Short circuit evaluation if we're already g2g
         if self
@@ -2667,8 +2535,6 @@ impl MutableAppContext {
             let window = this.cx.windows.get_mut(&window_id)?;
             window.is_fullscreen = is_fullscreen;
 
-            // self.emit_event(entity_id, payload);
-
             Some(())
         });
     }
@@ -2700,35 +2566,8 @@ impl MutableAppContext {
                 this.cx.views.insert((window_id, view_id), view);
             }
 
-            //Deliver events
-            let callbacks = this
-                .window_activation_observations
-                .lock()
-                .remove(&window_id);
-            if let Some(callbacks) = callbacks {
-                for (id, callback) in callbacks {
-                    if let Some(mut callback) = callback {
-                        let alive = callback(active, this);
-                        //Put entry back
-                        if alive {
-                            match this
-                                .window_activation_observations
-                                .lock()
-                                .entry(window_id)
-                                .or_default()
-                                .entry(id)
-                            {
-                                btree_map::Entry::Vacant(entry) => {
-                                    entry.insert(Some(callback));
-                                }
-                                btree_map::Entry::Occupied(entry) => {
-                                    entry.remove();
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+            let mut observations = this.window_activation_observations.clone_ref();
+            observations.emit_and_cleanup(window_id, this, |callback, this| callback(active, this));
 
             Some(())
         });
@@ -2759,30 +2598,9 @@ impl MutableAppContext {
                     blurred_view.on_blur(this, window_id, blurred_id);
                     this.cx.views.insert((window_id, blurred_id), blurred_view);
 
-                    let callbacks = this.focus_observations.lock().remove(&blurred_id);
-                    if let Some(callbacks) = callbacks {
-                        for (id, callback) in callbacks {
-                            if let Some(mut callback) = callback {
-                                let alive = callback(false, this);
-                                if alive {
-                                    match this
-                                        .focus_observations
-                                        .lock()
-                                        .entry(blurred_id)
-                                        .or_default()
-                                        .entry(id)
-                                    {
-                                        btree_map::Entry::Vacant(entry) => {
-                                            entry.insert(Some(callback));
-                                        }
-                                        btree_map::Entry::Occupied(entry) => {
-                                            entry.remove();
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
+                    let mut subscriptions = this.focus_observations.clone_ref();
+                    subscriptions
+                        .emit_and_cleanup(blurred_id, this, |callback, this| callback(false, this));
                 }
             }
 
@@ -2791,30 +2609,9 @@ impl MutableAppContext {
                     focused_view.on_focus(this, window_id, focused_id);
                     this.cx.views.insert((window_id, focused_id), focused_view);
 
-                    let callbacks = this.focus_observations.lock().remove(&focused_id);
-                    if let Some(callbacks) = callbacks {
-                        for (id, callback) in callbacks {
-                            if let Some(mut callback) = callback {
-                                let alive = callback(true, this);
-                                if alive {
-                                    match this
-                                        .focus_observations
-                                        .lock()
-                                        .entry(focused_id)
-                                        .or_default()
-                                        .entry(id)
-                                    {
-                                        btree_map::Entry::Vacant(entry) => {
-                                            entry.insert(Some(callback));
-                                        }
-                                        btree_map::Entry::Occupied(entry) => {
-                                            entry.remove();
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
+                    let mut subscriptions = this.focus_observations.clone_ref();
+                    subscriptions
+                        .emit_and_cleanup(focused_id, this, |callback, this| callback(true, this));
                 }
             }
         })
@@ -5768,8 +5565,8 @@ mod tests {
         });
 
         assert_eq!(cx.cx.models.len(), 1);
-        assert!(cx.subscriptions.lock().is_empty());
-        assert!(cx.observations.lock().is_empty());
+        assert!(cx.subscriptions.internal.lock().is_empty());
+        assert!(cx.observations.internal.lock().is_empty());
     }
 
     #[crate::test(self)]
@@ -6019,8 +5816,8 @@ mod tests {
         });
 
         assert_eq!(cx.cx.views.len(), 2);
-        assert!(cx.subscriptions.lock().is_empty());
-        assert!(cx.observations.lock().is_empty());
+        assert!(cx.subscriptions.internal.lock().is_empty());
+        assert!(cx.observations.internal.lock().is_empty());
     }
 
     #[crate::test(self)]