subscription.rs

  1use collections::{BTreeMap, BTreeSet};
  2use std::{
  3    cell::{Cell, RefCell},
  4    fmt::Debug,
  5    mem,
  6    rc::Rc,
  7};
  8use util::post_inc;
  9
 10pub(crate) struct SubscriberSet<EmitterKey, Callback>(
 11    Rc<RefCell<SubscriberSetState<EmitterKey, Callback>>>,
 12);
 13
 14impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 15    fn clone(&self) -> Self {
 16        SubscriberSet(self.0.clone())
 17    }
 18}
 19
 20struct SubscriberSetState<EmitterKey, Callback> {
 21    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
 22    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
 23    next_subscriber_id: usize,
 24}
 25
 26struct Subscriber<Callback> {
 27    active: Rc<Cell<bool>>,
 28    callback: Callback,
 29}
 30
 31impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 32where
 33    EmitterKey: 'static + Ord + Clone + Debug,
 34    Callback: 'static,
 35{
 36    pub fn new() -> Self {
 37        Self(Rc::new(RefCell::new(SubscriberSetState {
 38            subscribers: Default::default(),
 39            dropped_subscribers: Default::default(),
 40            next_subscriber_id: 0,
 41        })))
 42    }
 43
 44    /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
 45    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
 46    /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
 47    /// to activate the [`Subscription`].
 48    pub fn insert(
 49        &self,
 50        emitter_key: EmitterKey,
 51        callback: Callback,
 52    ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
 53        let active = Rc::new(Cell::new(false));
 54        let mut lock = self.0.borrow_mut();
 55        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 56        lock.subscribers
 57            .entry(emitter_key.clone())
 58            .or_default()
 59            .get_or_insert_with(Default::default)
 60            .insert(
 61                subscriber_id,
 62                Subscriber {
 63                    active: active.clone(),
 64                    callback,
 65                },
 66            );
 67        let this = self.0.clone();
 68
 69        let subscription = Subscription {
 70            unsubscribe: Some(Box::new(move || {
 71                let mut lock = this.borrow_mut();
 72                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
 73                    // remove was called with this emitter_key
 74                    return;
 75                };
 76
 77                if let Some(subscribers) = subscribers {
 78                    subscribers.remove(&subscriber_id);
 79                    if subscribers.is_empty() {
 80                        lock.subscribers.remove(&emitter_key);
 81                    }
 82                    return;
 83                }
 84
 85                // We didn't manage to remove the subscription, which means it was dropped
 86                // while invoking the callback. Mark it as dropped so that we can remove it
 87                // later.
 88                lock.dropped_subscribers
 89                    .insert((emitter_key, subscriber_id));
 90            })),
 91        };
 92        (subscription, move || active.set(true))
 93    }
 94
 95    pub fn remove(
 96        &self,
 97        emitter: &EmitterKey,
 98    ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
 99        let subscribers = self.0.borrow_mut().subscribers.remove(emitter);
100        subscribers
101            .unwrap_or_default()
102            .map(|s| s.into_values())
103            .into_iter()
104            .flatten()
105            .filter_map(|subscriber| {
106                if subscriber.active.get() {
107                    Some(subscriber.callback)
108                } else {
109                    None
110                }
111            })
112    }
113
114    /// Call the given callback for each subscriber to the given emitter.
115    /// If the callback returns false, the subscriber is removed.
116    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
117    where
118        F: FnMut(&mut Callback) -> bool,
119    {
120        let Some(mut subscribers) = self
121            .0
122            .borrow_mut()
123            .subscribers
124            .get_mut(emitter)
125            .and_then(|s| s.take())
126        else {
127            return;
128        };
129
130        subscribers.retain(|_, subscriber| {
131            if subscriber.active.get() {
132                f(&mut subscriber.callback)
133            } else {
134                true
135            }
136        });
137        let mut lock = self.0.borrow_mut();
138
139        // Add any new subscribers that were added while invoking the callback.
140        if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
141            subscribers.extend(new_subscribers);
142        }
143
144        // Remove any dropped subscriptions that were dropped while invoking the callback.
145        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
146            debug_assert_eq!(*emitter, dropped_emitter);
147            subscribers.remove(&dropped_subscription_id);
148        }
149
150        if !subscribers.is_empty() {
151            lock.subscribers.insert(emitter.clone(), Some(subscribers));
152        }
153    }
154}
155
/// A handle to a subscription created by GPUI.
///
/// Dropping the handle cancels the subscription, after which its callback
/// will no longer be invoked.
#[must_use]
pub struct Subscription {
    /// The cancellation routine. It is `None` once it has been taken, either to run it
    /// or to discard it.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}
162
163impl Subscription {
164    /// Creates a new subscription with a callback that gets invoked when
165    /// this subscription is dropped.
166    pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
167        Self {
168            unsubscribe: Some(Box::new(unsubscribe)),
169        }
170    }
171
172    /// Detaches the subscription from this handle. The callback will
173    /// continue to be invoked until the entities it has been
174    /// subscribed to are dropped
175    pub fn detach(mut self) {
176        self.unsubscribe.take();
177    }
178
179    /// Joins two subscriptions into a single subscription. Detach will
180    /// detach both interior subscriptions.
181    pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
182        let a_unsubscribe = subscription_a.unsubscribe.take();
183        let b_unsubscribe = subscription_b.unsubscribe.take();
184        Self {
185            unsubscribe: Some(Box::new(move || {
186                if let Some(self_unsubscribe) = a_unsubscribe {
187                    self_unsubscribe();
188                }
189                if let Some(other_unsubscribe) = b_unsubscribe {
190                    other_unsubscribe();
191                }
192            })),
193        }
194    }
195}
196
197impl Drop for Subscription {
198    fn drop(&mut self) {
199        if let Some(unsubscribe) = self.unsubscribe.take() {
200            unsubscribe();
201        }
202    }
203}
204
205impl std::fmt::Debug for Subscription {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        f.debug_struct("Subscription").finish()
208    }
209}