subscription.rs

  1use collections::{BTreeMap, BTreeSet};
  2use parking_lot::Mutex;
  3use std::{fmt::Debug, mem, sync::Arc};
  4use util::post_inc;
  5
  6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
  7    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
  8);
  9
 10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 11    fn clone(&self) -> Self {
 12        SubscriberSet(self.0.clone())
 13    }
 14}
 15
 16struct SubscriberSetState<EmitterKey, Callback> {
 17    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Callback>>>,
 18    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
 19    next_subscriber_id: usize,
 20}
 21
 22impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 23where
 24    EmitterKey: 'static + Ord + Clone + Debug,
 25    Callback: 'static,
 26{
 27    pub fn new() -> Self {
 28        Self(Arc::new(Mutex::new(SubscriberSetState {
 29            subscribers: Default::default(),
 30            dropped_subscribers: Default::default(),
 31            next_subscriber_id: 0,
 32        })))
 33    }
 34
 35    pub fn insert(&self, emitter_key: EmitterKey, callback: Callback) -> Subscription {
 36        let mut lock = self.0.lock();
 37        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 38        lock.subscribers
 39            .entry(emitter_key.clone())
 40            .or_default()
 41            .get_or_insert_with(|| Default::default())
 42            .insert(subscriber_id, callback);
 43        let this = self.0.clone();
 44        Subscription {
 45            unsubscribe: Some(Box::new(move || {
 46                let mut lock = this.lock();
 47                let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
 48                    // remove was called with this emitter_key
 49                    return;
 50                };
 51
 52                if let Some(subscribers) = subscribers {
 53                    subscribers.remove(&subscriber_id);
 54                    if subscribers.is_empty() {
 55                        lock.subscribers.remove(&emitter_key);
 56                    }
 57                    return;
 58                }
 59
 60                // We didn't manage to remove the subscription, which means it was dropped
 61                // while invoking the callback. Mark it as dropped so that we can remove it
 62                // later.
 63                lock.dropped_subscribers
 64                    .insert((emitter_key, subscriber_id));
 65            })),
 66        }
 67    }
 68
 69    pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
 70        let subscribers = self.0.lock().subscribers.remove(&emitter);
 71        subscribers
 72            .unwrap_or_default()
 73            .map(|s| s.into_values())
 74            .into_iter()
 75            .flatten()
 76    }
 77
 78    /// Call the given callback for each subscriber to the given emitter.
 79    /// If the callback returns false, the subscriber is removed.
 80    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
 81    where
 82        F: FnMut(&mut Callback) -> bool,
 83    {
 84        let Some(mut subscribers) = self
 85            .0
 86            .lock()
 87            .subscribers
 88            .get_mut(emitter)
 89            .and_then(|s| s.take())
 90        else {
 91            return;
 92        };
 93
 94        subscribers.retain(|_, callback| f(callback));
 95        let mut lock = self.0.lock();
 96
 97        // Add any new subscribers that were added while invoking the callback.
 98        if let Some(Some(new_subscribers)) = lock.subscribers.remove(&emitter) {
 99            subscribers.extend(new_subscribers);
100        }
101
102        // Remove any dropped subscriptions that were dropped while invoking the callback.
103        for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
104            debug_assert_eq!(*emitter, dropped_emitter);
105            subscribers.remove(&dropped_subscription_id);
106        }
107
108        if !subscribers.is_empty() {
109            lock.subscribers.insert(emitter.clone(), Some(subscribers));
110        }
111    }
112}
113
114#[must_use]
115pub struct Subscription {
116    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
117}
118
119impl Subscription {
120    pub fn detach(mut self) {
121        self.unsubscribe.take();
122    }
123}
124
125impl Drop for Subscription {
126    fn drop(&mut self) {
127        if let Some(unsubscribe) = self.unsubscribe.take() {
128            unsubscribe();
129        }
130    }
131}