subscription.rs

  1use collections::{BTreeMap, BTreeSet};
  2use parking_lot::Mutex;
  3use std::{cell::Cell, fmt::Debug, mem, rc::Rc, sync::Arc};
  4use util::post_inc;
  5
  6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
  7    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
  8);
  9
 10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 11    fn clone(&self) -> Self {
 12        SubscriberSet(self.0.clone())
 13    }
 14}
 15
 16struct SubscriberSetState<EmitterKey, Callback> {
 17    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
 18    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
 19    next_subscriber_id: usize,
 20}
 21
 22struct Subscriber<Callback> {
 23    active: Rc<Cell<bool>>,
 24    callback: Callback,
 25}
 26
 27impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 28where
 29    EmitterKey: 'static + Ord + Clone + Debug,
 30    Callback: 'static,
 31{
 32    pub fn new() -> Self {
 33        Self(Arc::new(Mutex::new(SubscriberSetState {
 34            subscribers: Default::default(),
 35            dropped_subscribers: Default::default(),
 36            next_subscriber_id: 0,
 37        })))
 38    }
 39
 40    /// Inserts a new `[Subscription]` for the given `emitter_key`. By default, subscriptions
 41    /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
 42    /// This method returns a tuple of a `[Subscription]` and an `impl FnOnce`, and you can use the latter
 43    /// to activate the `[Subscription]`.
 44    #[must_use]
 45    pub fn insert(
 46        &self,
 47        emitter_key: EmitterKey,
 48        callback: Callback,
 49    ) -> (Subscription, impl FnOnce()) {
 50        let active = Rc::new(Cell::new(false));
 51        let mut lock = self.0.lock();
 52        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 53        lock.subscribers
 54            .entry(emitter_key.clone())
 55            .or_default()
 56            .get_or_insert_with(|| Default::default())
 57            .insert(
 58                subscriber_id,
 59                Subscriber {
 60                    active: active.clone(),
 61                    callback,
 62                },
 63            );
 64        let this = self.0.clone();
 65
 66        let subscription = Subscription {
 67            unsubscribe: Some(Box::new(move || {
 68                let mut lock = this.lock();
 69                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
 70                    // remove was called with this emitter_key
 71                    return;
 72                };
 73
 74                if let Some(subscribers) = subscribers {
 75                    subscribers.remove(&subscriber_id);
 76                    if subscribers.is_empty() {
 77                        lock.subscribers.remove(&emitter_key);
 78                    }
 79                    return;
 80                }
 81
 82                // We didn't manage to remove the subscription, which means it was dropped
 83                // while invoking the callback. Mark it as dropped so that we can remove it
 84                // later.
 85                lock.dropped_subscribers
 86                    .insert((emitter_key, subscriber_id));
 87            })),
 88        };
 89        (subscription, move || active.set(true))
 90    }
 91
 92    pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
 93        let subscribers = self.0.lock().subscribers.remove(&emitter);
 94        subscribers
 95            .unwrap_or_default()
 96            .map(|s| s.into_values())
 97            .into_iter()
 98            .flatten()
 99            .filter_map(|subscriber| {
100                if subscriber.active.get() {
101                    Some(subscriber.callback)
102                } else {
103                    None
104                }
105            })
106    }
107
108    /// Call the given callback for each subscriber to the given emitter.
109    /// If the callback returns false, the subscriber is removed.
110    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
111    where
112        F: FnMut(&mut Callback) -> bool,
113    {
114        let Some(mut subscribers) = self
115            .0
116            .lock()
117            .subscribers
118            .get_mut(emitter)
119            .and_then(|s| s.take())
120        else {
121            return;
122        };
123
124        subscribers.retain(|_, subscriber| {
125            if subscriber.active.get() {
126                f(&mut subscriber.callback)
127            } else {
128                true
129            }
130        });
131        let mut lock = self.0.lock();
132
133        // Add any new subscribers that were added while invoking the callback.
134        if let Some(Some(new_subscribers)) = lock.subscribers.remove(&emitter) {
135            subscribers.extend(new_subscribers);
136        }
137
138        // Remove any dropped subscriptions that were dropped while invoking the callback.
139        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
140            debug_assert_eq!(*emitter, dropped_emitter);
141            subscribers.remove(&dropped_subscription_id);
142        }
143
144        if !subscribers.is_empty() {
145            lock.subscribers.insert(emitter.clone(), Some(subscribers));
146        }
147    }
148}
149
/// A handle whose drop runs a stored `unsubscribe` closure.
///
/// Call [`Subscription::detach`] to give up the handle without running it.
#[must_use]
pub struct Subscription {
    /// Cleanup closure, run at most once. `None` after it has been consumed
    /// by `drop` or discarded by `detach`.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}

impl Subscription {
    /// Consumes the handle and discards its `unsubscribe` closure without
    /// calling it, so dropping the handle afterwards does nothing.
    pub fn detach(mut self) {
        self.unsubscribe = None;
    }
}

impl Drop for Subscription {
    /// Runs the `unsubscribe` closure if it is still present.
    fn drop(&mut self) {
        let Some(unsubscribe) = self.unsubscribe.take() else {
            return;
        };
        unsubscribe();
    }
}