callback_collection.rs

  1use collections::{BTreeMap, HashMap, HashSet};
  2use parking_lot::Mutex;
  3use std::sync::Arc;
  4use std::{hash::Hash, sync::Weak};
  5
  6pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
  7    internal: Arc<Mutex<Mapping<K, F>>>,
  8}
  9
 10pub struct Subscription<K: Clone + Hash + Eq, F> {
 11    key: K,
 12    id: usize,
 13    mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
 14}
 15
 16struct Mapping<K, F> {
 17    callbacks: HashMap<K, BTreeMap<usize, F>>,
 18    dropped_subscriptions: HashMap<K, HashSet<usize>>,
 19}
 20
 21impl<K: Hash + Eq, F> Mapping<K, F> {
 22    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
 23        if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
 24            subscriptions.remove(&subscription_id)
 25        } else {
 26            false
 27        }
 28    }
 29}
 30
 31impl<K, F> Default for Mapping<K, F> {
 32    fn default() -> Self {
 33        Self {
 34            callbacks: Default::default(),
 35            dropped_subscriptions: Default::default(),
 36        }
 37    }
 38}
 39
 40impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
 41    fn clone(&self) -> Self {
 42        Self {
 43            internal: self.internal.clone(),
 44        }
 45    }
 46}
 47
 48impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
 49    fn default() -> Self {
 50        CallbackCollection {
 51            internal: Arc::new(Mutex::new(Default::default())),
 52        }
 53    }
 54}
 55
 56impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
 57    #[cfg(test)]
 58    pub fn is_empty(&self) -> bool {
 59        self.internal.lock().callbacks.is_empty()
 60    }
 61
 62    pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
 63        Subscription {
 64            key,
 65            id: subscription_id,
 66            mapping: Some(Arc::downgrade(&self.internal)),
 67        }
 68    }
 69
 70    pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
 71        let mut this = self.internal.lock();
 72
 73        // If this callback's subscription was dropped before the callback was
 74        // added, then just drop the callback.
 75        if this.clear_dropped_state(&key, subscription_id) {
 76            return;
 77        }
 78
 79        this.callbacks
 80            .entry(key)
 81            .or_default()
 82            .insert(subscription_id, callback);
 83    }
 84
 85    pub fn remove(&mut self, key: K) {
 86        // Drop these callbacks after releasing the lock, in case one of them
 87        // owns a subscription to this callback collection.
 88        let mut this = self.internal.lock();
 89        let callbacks = this.callbacks.remove(&key);
 90        this.dropped_subscriptions.remove(&key);
 91        drop(this);
 92        drop(callbacks);
 93    }
 94
 95    pub fn emit<C>(&mut self, key: K, mut call_callback: C)
 96    where
 97        C: FnMut(&mut F) -> bool,
 98    {
 99        let callbacks = self.internal.lock().callbacks.remove(&key);
100        if let Some(callbacks) = callbacks {
101            for (subscription_id, mut callback) in callbacks {
102                // If this callback's subscription was dropped while invoking an
103                // earlier callback, then just drop the callback.
104                let mut this = self.internal.lock();
105                if this.clear_dropped_state(&key, subscription_id) {
106                    continue;
107                }
108
109                drop(this);
110                let alive = call_callback(&mut callback);
111
112                // If this callback's subscription was dropped while invoking the callback
113                // itself, or if the callback returns false, then just drop the callback.
114                let mut this = self.internal.lock();
115                if this.clear_dropped_state(&key, subscription_id) || !alive {
116                    continue;
117                }
118
119                this.callbacks
120                    .entry(key)
121                    .or_default()
122                    .insert(subscription_id, callback);
123            }
124        }
125    }
126}
127
128impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
129    pub fn id(&self) -> usize {
130        self.id
131    }
132
133    pub fn detach(&mut self) {
134        self.mapping.take();
135    }
136}
137
138impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
139    fn drop(&mut self) {
140        if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
141            let mut mapping = mapping.lock();
142
143            // If the callback is present in the mapping, then just remove it.
144            if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
145                let callback = callbacks.remove(&self.id);
146                if callback.is_some() {
147                    drop(mapping);
148                    drop(callback);
149                    return;
150                }
151            }
152
153            // If this subscription's callback is not present, then either it has been
154            // temporarily removed during emit, or it has not yet been added. Record
155            // that this subscription has been dropped so that the callback can be
156            // removed later.
157            mapping
158                .dropped_subscriptions
159                .entry(self.key.clone())
160                .or_default()
161                .insert(self.id);
162        }
163    }
164}