subscription.rs

  1use collections::BTreeMap;
  2use gpui_util::post_inc;
  3use std::{
  4    cell::{Cell, RefCell},
  5    fmt::Debug,
  6    rc::Rc,
  7};
  8
  9pub(crate) struct SubscriberSet<EmitterKey, Callback>(
 10    Rc<RefCell<SubscriberSetState<EmitterKey, Callback>>>,
 11);
 12
 13impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 14    fn clone(&self) -> Self {
 15        SubscriberSet(self.0.clone())
 16    }
 17}
 18
 19struct SubscriberSetState<EmitterKey, Callback> {
 20    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
 21    next_subscriber_id: usize,
 22}
 23
 24struct Subscriber<Callback> {
 25    active: Rc<Cell<bool>>,
 26    dropped: Rc<Cell<bool>>,
 27    callback: Callback,
 28}
 29
 30impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 31where
 32    EmitterKey: 'static + Ord + Clone + Debug,
 33    Callback: 'static,
 34{
 35    pub fn new() -> Self {
 36        Self(Rc::new(RefCell::new(SubscriberSetState {
 37            subscribers: Default::default(),
 38            next_subscriber_id: 0,
 39        })))
 40    }
 41
 42    /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
 43    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
 44    /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
 45    /// to activate the [`Subscription`].
 46    pub fn insert(
 47        &self,
 48        emitter_key: EmitterKey,
 49        callback: Callback,
 50    ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
 51        let active = Rc::new(Cell::new(false));
 52        let dropped = Rc::new(Cell::new(false));
 53        let mut lock = self.0.borrow_mut();
 54        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 55        lock.subscribers
 56            .entry(emitter_key.clone())
 57            .or_default()
 58            .get_or_insert_with(Default::default)
 59            .insert(
 60                subscriber_id,
 61                Subscriber {
 62                    active: active.clone(),
 63                    dropped: dropped.clone(),
 64                    callback,
 65                },
 66            );
 67        let this = self.0.clone();
 68
 69        let subscription = Subscription {
 70            unsubscribe: Some(Box::new(move || {
 71                dropped.set(true);
 72
 73                let mut lock = this.borrow_mut();
 74                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
 75                    return;
 76                };
 77
 78                if let Some(subscribers) = subscribers {
 79                    subscribers.remove(&subscriber_id);
 80                    if subscribers.is_empty() {
 81                        lock.subscribers.remove(&emitter_key);
 82                    }
 83                }
 84            })),
 85        };
 86        (subscription, move || active.set(true))
 87    }
 88
 89    pub fn remove(
 90        &self,
 91        emitter: &EmitterKey,
 92    ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
 93        let subscribers = self.0.borrow_mut().subscribers.remove(emitter);
 94        subscribers
 95            .unwrap_or_default()
 96            .map(|s| s.into_values())
 97            .into_iter()
 98            .flatten()
 99            .filter_map(|subscriber| {
100                if subscriber.active.get() {
101                    Some(subscriber.callback)
102                } else {
103                    None
104                }
105            })
106    }
107
108    /// Call the given callback for each subscriber to the given emitter.
109    /// If the callback returns false, the subscriber is removed.
110    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
111    where
112        F: FnMut(&mut Callback) -> bool,
113    {
114        let Some(mut subscribers) = self
115            .0
116            .borrow_mut()
117            .subscribers
118            .get_mut(emitter)
119            .and_then(|s| s.take())
120        else {
121            return;
122        };
123
124        subscribers.retain(|_, subscriber| {
125            if !subscriber.active.get() {
126                return true;
127            }
128            if subscriber.dropped.get() {
129                return false;
130            }
131            let keep = f(&mut subscriber.callback);
132            keep && !subscriber.dropped.get()
133        });
134        let mut lock = self.0.borrow_mut();
135
136        // Add any new subscribers that were added while invoking the callback.
137        if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
138            subscribers.extend(new_subscribers);
139        }
140
141        if !subscribers.is_empty() {
142            lock.subscribers.insert(emitter.clone(), Some(subscribers));
143        }
144    }
145}
146
/// A handle to a subscription created by GPUI.
///
/// Dropping the handle cancels the subscription, after which its callback is
/// no longer invoked.
#[must_use]
pub struct Subscription {
    /// Runs once when the handle is dropped; `None` once it has been taken.
    unsubscribe: Option<Box<dyn FnOnce()>>,
}
153
154impl Subscription {
155    /// Creates a new subscription with a callback that gets invoked when
156    /// this subscription is dropped.
157    pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
158        Self {
159            unsubscribe: Some(Box::new(unsubscribe)),
160        }
161    }
162
163    /// Detaches the subscription from this handle. The callback will
164    /// continue to be invoked until the entities it has been
165    /// subscribed to are dropped
166    pub fn detach(mut self) {
167        self.unsubscribe.take();
168    }
169
170    /// Joins two subscriptions into a single subscription. Detach will
171    /// detach both interior subscriptions.
172    pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
173        let a_unsubscribe = subscription_a.unsubscribe.take();
174        let b_unsubscribe = subscription_b.unsubscribe.take();
175        Self {
176            unsubscribe: Some(Box::new(move || {
177                if let Some(self_unsubscribe) = a_unsubscribe {
178                    self_unsubscribe();
179                }
180                if let Some(other_unsubscribe) = b_unsubscribe {
181                    other_unsubscribe();
182                }
183            })),
184        }
185    }
186}
187
188impl Drop for Subscription {
189    fn drop(&mut self) {
190        if let Some(unsubscribe) = self.unsubscribe.take() {
191            unsubscribe();
192        }
193    }
194}
195
196impl std::fmt::Debug for Subscription {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        f.debug_struct("Subscription").finish()
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Global, TestApp};

    /// Marker global observed by every test; re-setting it is what triggers the observers.
    struct TestGlobal;
    impl Global for TestGlobal {}

    /// A clearable slot for a subscription handle, so callbacks can drop handles mid-dispatch.
    type SubscriptionSlot = Rc<RefCell<Option<Subscription>>>;

    /// Adds one to a shared invocation counter.
    fn bump(counter: &Cell<usize>) {
        counter.set(counter.get() + 1);
    }

    #[test]
    fn test_unsubscribe_during_callback_with_insert() {
        let mut app = TestApp::new();
        app.set_global(TestGlobal);

        let a_calls = Rc::new(Cell::new(0usize));
        let b_calls = Rc::new(Cell::new(0usize));

        let slot_a = SubscriptionSlot::default();
        let slot_b = SubscriptionSlot::default();

        // Observer A fires first (lower subscriber_id). It drops itself and
        // inserts a new observer for the same global.
        let handle_a = app.update({
            let calls = a_calls.clone();
            let own_slot = slot_a.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |cx| {
                    bump(&calls);
                    own_slot.borrow_mut().take();
                    cx.observe_global::<TestGlobal>(|_| {}).detach();
                })
            }
        });
        *slot_a.borrow_mut() = Some(handle_a);

        // Observer B fires second. It just drops itself.
        let handle_b = app.update({
            let calls = b_calls.clone();
            let own_slot = slot_b.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                    own_slot.borrow_mut().take();
                })
            }
        });
        *slot_b.borrow_mut() = Some(handle_b);

        // Both fire once.
        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(a_calls.get(), 1);
        assert_eq!(b_calls.get(), 1);

        // Neither should fire again — both dropped their subscriptions.
        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(a_calls.get(), 1);
        assert_eq!(b_calls.get(), 1, "orphaned subscriber fired again");
    }

    #[test]
    fn test_callback_dropped_by_earlier_callback_does_not_fire() {
        let mut app = TestApp::new();
        app.set_global(TestGlobal);

        let b_calls = Rc::new(Cell::new(0usize));
        let slot_b = SubscriptionSlot::default();

        // Observer A fires first and drops B's subscription.
        app.update({
            let victim = slot_b.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    victim.borrow_mut().take();
                })
                .detach();
            }
        });

        // Observer B fires second — but A already dropped it.
        let handle_b = app.update({
            let calls = b_calls.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                })
            }
        });
        *slot_b.borrow_mut() = Some(handle_b);

        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(
            b_calls.get(),
            0,
            "B should not fire — A dropped its subscription"
        );
    }

    #[test]
    fn test_self_drop_during_callback() {
        let mut app = TestApp::new();
        app.set_global(TestGlobal);

        let calls = Rc::new(Cell::new(0usize));
        let slot = SubscriptionSlot::default();

        // The observer counts the notification and then drops its own handle.
        let handle = app.update({
            let calls = calls.clone();
            let own_slot = slot.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                    own_slot.borrow_mut().take();
                })
            }
        });
        *slot.borrow_mut() = Some(handle);

        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(calls.get(), 1);

        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(calls.get(), 1, "should not fire after self-drop");
    }

    #[test]
    fn test_subscription_drop() {
        let mut app = TestApp::new();
        app.set_global(TestGlobal);

        let calls = Rc::new(Cell::new(0usize));

        let handle = app.update({
            let calls = calls.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                })
            }
        });

        // Cancel before the global is ever re-set.
        drop(handle);

        app.update(|cx| cx.set_global(TestGlobal));
        assert_eq!(calls.get(), 0, "should not fire after drop");
    }
}