subscription.rs

  1use collections::{BTreeMap, BTreeSet};
  2use parking_lot::Mutex;
  3use std::{fmt::Debug, mem, sync::Arc};
  4use util::post_inc;
  5
  6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
  7    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
  8);
  9
 10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
 11    fn clone(&self) -> Self {
 12        SubscriberSet(self.0.clone())
 13    }
 14}
 15
 16struct SubscriberSetState<EmitterKey, Callback> {
 17    subscribers: BTreeMap<EmitterKey, BTreeMap<usize, Callback>>,
 18    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
 19    next_subscriber_id: usize,
 20}
 21
 22impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
 23where
 24    EmitterKey: 'static + Ord + Clone + Debug,
 25    Callback: 'static,
 26{
 27    pub fn new() -> Self {
 28        Self(Arc::new(Mutex::new(SubscriberSetState {
 29            subscribers: Default::default(),
 30            dropped_subscribers: Default::default(),
 31            next_subscriber_id: 0,
 32        })))
 33    }
 34
 35    pub fn insert(&self, emitter: EmitterKey, callback: Callback) -> Subscription {
 36        let mut lock = self.0.lock();
 37        let subscriber_id = post_inc(&mut lock.next_subscriber_id);
 38        lock.subscribers
 39            .entry(emitter.clone())
 40            .or_default()
 41            .insert(subscriber_id, callback);
 42        let this = self.0.clone();
 43        Subscription {
 44            unsubscribe: Some(Box::new(move || {
 45                let mut lock = this.lock();
 46                if let Some(subscribers) = lock.subscribers.get_mut(&emitter) {
 47                    subscribers.remove(&subscriber_id);
 48                    if subscribers.is_empty() {
 49                        lock.subscribers.remove(&emitter);
 50                        return;
 51                    }
 52                }
 53
 54                // We didn't manage to remove the subscription, which means it was dropped
 55                // while invoking the callback. Mark it as dropped so that we can remove it
 56                // later.
 57                lock.dropped_subscribers.insert((emitter, subscriber_id));
 58            })),
 59        }
 60    }
 61
 62    pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
 63        let subscribers = self.0.lock().subscribers.remove(&emitter);
 64        subscribers.unwrap_or_default().into_values()
 65    }
 66
 67    pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
 68    where
 69        F: FnMut(&mut Callback) -> bool,
 70    {
 71        let entry = self.0.lock().subscribers.remove_entry(emitter);
 72        if let Some((emitter, mut subscribers)) = entry {
 73            subscribers.retain(|_, callback| f(callback));
 74            let mut lock = self.0.lock();
 75
 76            // Add any new subscribers that were added while invoking the callback.
 77            if let Some(new_subscribers) = lock.subscribers.remove(&emitter) {
 78                subscribers.extend(new_subscribers);
 79            }
 80
 81            // Remove any dropped subscriptions that were dropped while invoking the callback.
 82            for (dropped_emitter, dropped_subscription_id) in
 83                mem::take(&mut lock.dropped_subscribers)
 84            {
 85                debug_assert_eq!(emitter, dropped_emitter);
 86                subscribers.remove(&dropped_subscription_id);
 87            }
 88
 89            if !subscribers.is_empty() {
 90                lock.subscribers.insert(emitter, subscribers);
 91            }
 92        }
 93    }
 94}
 95
 96#[must_use]
 97pub struct Subscription {
 98    unsubscribe: Option<Box<dyn FnOnce()>>,
 99}
100
101impl Subscription {
102    pub fn detach(mut self) {
103        self.unsubscribe.take();
104    }
105}
106
107impl Drop for Subscription {
108    fn drop(&mut self) {
109        if let Some(unsubscribe) = self.unsubscribe.take() {
110            unsubscribe();
111        }
112    }
113}