subscription.rs

  1use collections::{BTreeMap, BTreeSet};
  2use parking_lot::Mutex;
  3use std::{cell::Cell, fmt::Debug, mem, rc::Rc, sync::Arc};
  4use util::post_inc;
  5
/// A collection of subscriber callbacks grouped by the emitter they are
/// registered for.
///
/// The state is held behind an `Arc<Mutex<..>>`, so cloning a `SubscriberSet`
/// produces another handle to the same underlying subscribers rather than a
/// copy of them.
pub(crate) struct SubscriberSet<EmitterKey, Callback>(
    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
);
  9
 10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 11    fn clone(&self) -> Self {
 12        SubscriberSet(self.0.clone())
 13    }
 14}
 15
/// The mutable state shared by every handle to a `SubscriberSet`.
struct SubscriberSetState<EmitterKey, Callback> {
    /// Subscribers for each emitter, keyed by subscriber id. The inner `Option`
    /// is `None` while `retain` has taken the map out in order to invoke the
    /// callbacks without holding the lock.
    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
    /// `(emitter, subscriber id)` pairs for subscriptions that were dropped while
    /// their emitter's map was taken out by `retain`. `retain` removes these
    /// subscribers once it has finished invoking callbacks.
    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
    /// Source of subscriber ids, advanced via `post_inc` on every `insert`
    /// (presumably yielding the previous value; confirm against `util::post_inc`).
    next_subscriber_id: usize,
}
 21
/// A single registered callback together with its activation flag.
struct Subscriber<Callback> {
    /// Starts out `false` and is set to `true` by the activation closure that
    /// `insert` returns. `remove` and `retain` skip subscribers whose flag is
    /// still `false`.
    active: Rc<Cell<bool>>,
    /// The value handed back by `remove` and passed to the closure in `retain`.
    callback: Callback,
}
 26
 27impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 28where
 29    EmitterKey: 'static + Ord + Clone + Debug,
 30    Callback: 'static,
 31{
 32    pub fn new() -> Self {
 33        Self(Arc::new(Mutex::new(SubscriberSetState {
 34            subscribers: Default::default(),
 35            dropped_subscribers: Default::default(),
 36            next_subscriber_id: 0,
 37        })))
 38    }
 39
 40    /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
 41    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
 42    /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
 43    /// to activate the [`Subscription`].
 44    pub fn insert(
 45        &self,
 46        emitter_key: EmitterKey,
 47        callback: Callback,
 48    ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
 49        let active = Rc::new(Cell::new(false));
 50        let mut lock = self.0.lock();
 51        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 52        lock.subscribers
 53            .entry(emitter_key.clone())
 54            .or_default()
 55            .get_or_insert_with(Default::default)
 56            .insert(
 57                subscriber_id,
 58                Subscriber {
 59                    active: active.clone(),
 60                    callback,
 61                },
 62            );
 63        let this = self.0.clone();
 64
 65        let subscription = Subscription {
 66            unsubscribe: Some(Box::new(move || {
 67                let mut lock = this.lock();
 68                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
 69                    // remove was called with this emitter_key
 70                    return;
 71                };
 72
 73                if let Some(subscribers) = subscribers {
 74                    subscribers.remove(&subscriber_id);
 75                    if subscribers.is_empty() {
 76                        lock.subscribers.remove(&emitter_key);
 77                    }
 78                    return;
 79                }
 80
 81                // We didn't manage to remove the subscription, which means it was dropped
 82                // while invoking the callback. Mark it as dropped so that we can remove it
 83                // later.
 84                lock.dropped_subscribers
 85                    .insert((emitter_key, subscriber_id));
 86            })),
 87        };
 88        (subscription, move || active.set(true))
 89    }
 90
 91    pub fn remove(
 92        &self,
 93        emitter: &EmitterKey,
 94    ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
 95        let subscribers = self.0.lock().subscribers.remove(emitter);
 96        subscribers
 97            .unwrap_or_default()
 98            .map(|s| s.into_values())
 99            .into_iter()
100            .flatten()
101            .filter_map(|subscriber| {
102                if subscriber.active.get() {
103                    Some(subscriber.callback)
104                } else {
105                    None
106                }
107            })
108    }
109
110    /// Call the given callback for each subscriber to the given emitter.
111    /// If the callback returns false, the subscriber is removed.
112    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
113    where
114        F: FnMut(&mut Callback) -> bool,
115    {
116        let Some(mut subscribers) = self
117            .0
118            .lock()
119            .subscribers
120            .get_mut(emitter)
121            .and_then(|s| s.take())
122        else {
123            return;
124        };
125
126        subscribers.retain(|_, subscriber| {
127            if subscriber.active.get() {
128                f(&mut subscriber.callback)
129            } else {
130                true
131            }
132        });
133        let mut lock = self.0.lock();
134
135        // Add any new subscribers that were added while invoking the callback.
136        if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
137            subscribers.extend(new_subscribers);
138        }
139
140        // Remove any dropped subscriptions that were dropped while invoking the callback.
141        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
142            debug_assert_eq!(*emitter, dropped_emitter);
143            subscribers.remove(&dropped_subscription_id);
144        }
145
146        if !subscribers.is_empty() {
147            lock.subscribers.insert(emitter.clone(), Some(subscribers));
148        }
149    }
150}
151
/// A handle to a subscription created by GPUI. When dropped, the subscription
/// is cancelled and the callback will no longer be invoked.
#[must_use]
pub struct Subscription {
    /// The cancellation callback. It is taken and run when the handle is
    /// dropped, and is `None` once it has been taken (for example by `detach`
    /// or `join`), in which case dropping the handle does nothing.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}
158
159impl Subscription {
160    /// Creates a new subscription with a callback that gets invoked when
161    /// this subscription is dropped.
162    pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
163        Self {
164            unsubscribe: Some(Box::new(unsubscribe)),
165        }
166    }
167
168    /// Detaches the subscription from this handle. The callback will
169    /// continue to be invoked until the entities it has been
170    /// subscribed to are dropped
171    pub fn detach(mut self) {
172        self.unsubscribe.take();
173    }
174
175    /// Joins two subscriptions into a single subscription. Detach will
176    /// detach both interior subscriptions.
177    pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
178        let a_unsubscribe = subscription_a.unsubscribe.take();
179        let b_unsubscribe = subscription_b.unsubscribe.take();
180        Self {
181            unsubscribe: Some(Box::new(move || {
182                if let Some(self_unsubscribe) = a_unsubscribe {
183                    self_unsubscribe();
184                }
185                if let Some(other_unsubscribe) = b_unsubscribe {
186                    other_unsubscribe();
187                }
188            })),
189        }
190    }
191}
192
193impl Drop for Subscription {
194    fn drop(&mut self) {
195        if let Some(unsubscribe) = self.unsubscribe.take() {
196            unsubscribe();
197        }
198    }
199}