Fix interleaved subscription operations (#52369)

Mikayla Maki and max created

## Context

Since https://github.com/zed-industries/zed/pull/52232/, there have been
a lot of problems related to panels randomly hiding and showing
themselves. However, the code in that PR is pretty innocuous. It
shouldn't be having that kind of effect. However, it turns out there's a
bug in how GPUI subscriptions work that caused there to be orphaned
observe callbacks firing spuriously. The core sequence is captured in
`test_unsubscribe_during_callback_with_insert`, essentially you have to
have two observers on the same item (`SettingsStore`, in this case),
that both drop themselves. If the first observer _adds_ a callback onto
the _same item_, then the second observer will never successfully drop
itself.

While in there, I also fixed an unrelated bug that @maxbrunsfeld noticed
where if you have two callbacks on the same item, and an earlier
callback drops a later one, then the second callback will spuriously
fire.

I also added a few extra smoke tests to the subscription code, and a
test capturing the observed bug at the workspace level.

## Self-Review Checklist

<!-- Check before requesting review: -->
- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Release Notes:

- N/A

---------

Co-authored-by: max <max@zed.dev>

Change summary

crates/gpui/src/subscription.rs   | 186 +++++++++++++++++++++++++++++---
crates/workspace/src/workspace.rs |  68 ++++++++++++
2 files changed, 232 insertions(+), 22 deletions(-)

Detailed changes

crates/gpui/src/subscription.rs 🔗

@@ -1,9 +1,8 @@
-use collections::{BTreeMap, BTreeSet};
+use collections::BTreeMap;
 use gpui_util::post_inc;
 use std::{
     cell::{Cell, RefCell},
     fmt::Debug,
-    mem,
     rc::Rc,
 };
 
@@ -19,12 +18,12 @@ impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 
 struct SubscriberSetState<EmitterKey, Callback> {
     subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
-    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
     next_subscriber_id: usize,
 }
 
 struct Subscriber<Callback> {
     active: Rc<Cell<bool>>,
+    dropped: Rc<Cell<bool>>,
     callback: Callback,
 }
 
@@ -36,7 +35,6 @@ where
     pub fn new() -> Self {
         Self(Rc::new(RefCell::new(SubscriberSetState {
             subscribers: Default::default(),
-            dropped_subscribers: Default::default(),
             next_subscriber_id: 0,
         })))
     }
@@ -51,6 +49,7 @@ where
         callback: Callback,
     ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
         let active = Rc::new(Cell::new(false));
+        let dropped = Rc::new(Cell::new(false));
         let mut lock = self.0.borrow_mut();
         let subscriber_id = post_inc(&mut lock.next_subscriber_id);
         lock.subscribers
@@ -61,6 +60,7 @@ where
                 subscriber_id,
                 Subscriber {
                     active: active.clone(),
+                    dropped: dropped.clone(),
                     callback,
                 },
             );
@@ -68,9 +68,10 @@ where
 
         let subscription = Subscription {
             unsubscribe: Some(Box::new(move || {
+                dropped.set(true);
+
                 let mut lock = this.borrow_mut();
                 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
-                    // remove was called with this emitter_key
                     return;
                 };
 
@@ -79,14 +80,7 @@ where
                     if subscribers.is_empty() {
                         lock.subscribers.remove(&emitter_key);
                     }
-                    return;
                 }
-
-                // We didn't manage to remove the subscription, which means it was dropped
-                // while invoking the callback. Mark it as dropped so that we can remove it
-                // later.
-                lock.dropped_subscribers
-                    .insert((emitter_key, subscriber_id));
             })),
         };
         (subscription, move || active.set(true))
@@ -128,11 +122,14 @@ where
         };
 
         subscribers.retain(|_, subscriber| {
-            if subscriber.active.get() {
-                f(&mut subscriber.callback)
-            } else {
-                true
+            if !subscriber.active.get() {
+                return true;
             }
+            if subscriber.dropped.get() {
+                return false;
+            }
+            let keep = f(&mut subscriber.callback);
+            keep && !subscriber.dropped.get()
         });
         let mut lock = self.0.borrow_mut();
 
@@ -141,12 +138,6 @@ where
             subscribers.extend(new_subscribers);
         }
 
-        // Remove any dropped subscriptions that were dropped while invoking the callback.
-        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
-            debug_assert_eq!(*emitter, dropped_emitter);
-            subscribers.remove(&dropped_subscription_id);
-        }
-
         if !subscribers.is_empty() {
             lock.subscribers.insert(emitter.clone(), Some(subscribers));
         }
@@ -207,3 +198,154 @@ impl std::fmt::Debug for Subscription {
         f.debug_struct("Subscription").finish()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{Global, TestApp};
+
+    #[test]
+    fn test_unsubscribe_during_callback_with_insert() {
+        struct TestGlobal;
+        impl Global for TestGlobal {}
+
+        let mut app = TestApp::new();
+        app.set_global(TestGlobal);
+
+        let observer_a_count = Rc::new(Cell::new(0usize));
+        let observer_b_count = Rc::new(Cell::new(0usize));
+
+        let sub_a: Rc<RefCell<Option<Subscription>>> = Default::default();
+        let sub_b: Rc<RefCell<Option<Subscription>>> = Default::default();
+
+        // Observer A fires first (lower subscriber_id). It drops itself and
+        // inserts a new observer for the same global.
+        *sub_a.borrow_mut() = Some(app.update({
+            let count = observer_a_count.clone();
+            let sub_a = sub_a.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |cx| {
+                    count.set(count.get() + 1);
+                    sub_a.borrow_mut().take();
+                    cx.observe_global::<TestGlobal>(|_| {}).detach();
+                })
+            }
+        }));
+
+        // Observer B fires second. It just drops itself.
+        *sub_b.borrow_mut() = Some(app.update({
+            let count = observer_b_count.clone();
+            let sub_b = sub_b.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |_cx| {
+                    count.set(count.get() + 1);
+                    sub_b.borrow_mut().take();
+                })
+            }
+        }));
+
+        // Both fire once.
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(observer_a_count.get(), 1);
+        assert_eq!(observer_b_count.get(), 1);
+
+        // Neither should fire again — both dropped their subscriptions.
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(observer_a_count.get(), 1);
+        assert_eq!(observer_b_count.get(), 1, "orphaned subscriber fired again");
+    }
+
+    #[test]
+    fn test_callback_dropped_by_earlier_callback_does_not_fire() {
+        struct TestGlobal;
+        impl Global for TestGlobal {}
+
+        let mut app = TestApp::new();
+        app.set_global(TestGlobal);
+
+        let observer_b_count = Rc::new(Cell::new(0usize));
+        let sub_b: Rc<RefCell<Option<Subscription>>> = Default::default();
+
+        // Observer A fires first and drops B's subscription.
+        app.update({
+            let sub_b = sub_b.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |_cx| {
+                    sub_b.borrow_mut().take();
+                })
+                .detach();
+            }
+        });
+
+        // Observer B fires second — but A already dropped it.
+        *sub_b.borrow_mut() = Some(app.update({
+            let count = observer_b_count.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |_cx| {
+                    count.set(count.get() + 1);
+                })
+            }
+        }));
+
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(
+            observer_b_count.get(),
+            0,
+            "B should not fire — A dropped its subscription"
+        );
+    }
+
+    #[test]
+    fn test_self_drop_during_callback() {
+        struct TestGlobal;
+        impl Global for TestGlobal {}
+
+        let mut app = TestApp::new();
+        app.set_global(TestGlobal);
+
+        let count = Rc::new(Cell::new(0usize));
+        let sub: Rc<RefCell<Option<Subscription>>> = Default::default();
+
+        *sub.borrow_mut() = Some(app.update({
+            let count = count.clone();
+            let sub = sub.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |_cx| {
+                    count.set(count.get() + 1);
+                    sub.borrow_mut().take();
+                })
+            }
+        }));
+
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(count.get(), 1);
+
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(count.get(), 1, "should not fire after self-drop");
+    }
+
+    #[test]
+    fn test_subscription_drop() {
+        struct TestGlobal;
+        impl Global for TestGlobal {}
+
+        let mut app = TestApp::new();
+        app.set_global(TestGlobal);
+
+        let count = Rc::new(Cell::new(0usize));
+
+        let subscription = app.update({
+            let count = count.clone();
+            move |cx| {
+                cx.observe_global::<TestGlobal>(move |_cx| {
+                    count.set(count.get() + 1);
+                })
+            }
+        });
+
+        drop(subscription);
+
+        app.update(|cx| cx.set_global(TestGlobal));
+        assert_eq!(count.get(), 0, "should not fire after drop");
+    }
+}

crates/workspace/src/workspace.rs 🔗

@@ -14542,4 +14542,72 @@ mod tests {
             assert!(panel.is_zoomed(window, cx));
         });
     }
+
+    #[gpui::test]
+    async fn test_panels_stay_open_after_position_change_and_settings_update(
+        cx: &mut gpui::TestAppContext,
+    ) {
+        init_test(cx);
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs, [], cx).await;
+        let (workspace, cx) =
+            cx.add_window_view(|window, cx| Workspace::test_new(project, window, cx));
+
+        // Add two panels to the left dock and open it.
+        let (panel_a, panel_b) = workspace.update_in(cx, |workspace, window, cx| {
+            let panel_a = cx.new(|cx| TestPanel::new(DockPosition::Left, 100, cx));
+            let panel_b = cx.new(|cx| TestPanel::new(DockPosition::Left, 101, cx));
+            workspace.add_panel(panel_a.clone(), window, cx);
+            workspace.add_panel(panel_b.clone(), window, cx);
+            workspace.left_dock().update(cx, |dock, cx| {
+                dock.set_open(true, window, cx);
+                dock.activate_panel(0, window, cx);
+            });
+            (panel_a, panel_b)
+        });
+
+        workspace.update_in(cx, |workspace, _, cx| {
+            assert!(workspace.left_dock().read(cx).is_open());
+        });
+
+        // Simulate a feature flag changing default dock positions: both panels
+        // move from Left to Right.
+        workspace.update_in(cx, |_workspace, _window, cx| {
+            panel_a.update(cx, |p, _cx| p.position = DockPosition::Right);
+            panel_b.update(cx, |p, _cx| p.position = DockPosition::Right);
+            cx.update_global::<SettingsStore, _>(|_, _| {});
+        });
+
+        // Both panels should now be in the right dock.
+        workspace.update_in(cx, |workspace, _, cx| {
+            let right_dock = workspace.right_dock().read(cx);
+            assert_eq!(right_dock.panels_len(), 2);
+        });
+
+        // Open the right dock and activate panel_b (simulating the user
+        // opening the panel after it moved).
+        workspace.update_in(cx, |workspace, window, cx| {
+            workspace.right_dock().update(cx, |dock, cx| {
+                dock.set_open(true, window, cx);
+                dock.activate_panel(1, window, cx);
+            });
+        });
+
+        // Now trigger another SettingsStore change
+        workspace.update_in(cx, |_workspace, _window, cx| {
+            cx.update_global::<SettingsStore, _>(|_, _| {});
+        });
+
+        workspace.update_in(cx, |workspace, _, cx| {
+            assert!(
+                workspace.right_dock().read(cx).is_open(),
+                "Right dock should still be open after a settings change"
+            );
+            assert_eq!(
+                workspace.right_dock().read(cx).panels_len(),
+                2,
+                "Both panels should still be in the right dock"
+            );
+        });
+    }
 }