callback_collection.rs

  1use crate::MutableAppContext;
  2use collections::{BTreeMap, HashMap, HashSet};
  3use parking_lot::Mutex;
  4use std::sync::Arc;
  5use std::{hash::Hash, sync::Weak};
  6
  7pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
  8    internal: Arc<Mutex<Mapping<K, F>>>,
  9}
 10
 11pub struct Subscription<K: Clone + Hash + Eq, F> {
 12    key: K,
 13    id: usize,
 14    mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
 15}
 16
 17struct Mapping<K, F> {
 18    callbacks: HashMap<K, BTreeMap<usize, F>>,
 19    dropped_subscriptions: HashMap<K, HashSet<usize>>,
 20}
 21
 22impl<K: Hash + Eq, F> Mapping<K, F> {
 23    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
 24        if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
 25            subscriptions.remove(&subscription_id)
 26        } else {
 27            false
 28        }
 29    }
 30}
 31
 32impl<K, F> Default for Mapping<K, F> {
 33    fn default() -> Self {
 34        Self {
 35            callbacks: Default::default(),
 36            dropped_subscriptions: Default::default(),
 37        }
 38    }
 39}
 40
 41impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
 42    fn clone(&self) -> Self {
 43        Self {
 44            internal: self.internal.clone(),
 45        }
 46    }
 47}
 48
 49impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
 50    fn default() -> Self {
 51        CallbackCollection {
 52            internal: Arc::new(Mutex::new(Default::default())),
 53        }
 54    }
 55}
 56
 57impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
 58    #[cfg(test)]
 59    pub fn is_empty(&self) -> bool {
 60        self.internal.lock().callbacks.is_empty()
 61    }
 62
 63    pub fn count(&self, key: K) -> usize {
 64        self.internal
 65            .lock()
 66            .callbacks
 67            .get(&key)
 68            .map_or(0, |callbacks| callbacks.len())
 69    }
 70
 71    pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
 72        Subscription {
 73            key,
 74            id: subscription_id,
 75            mapping: Some(Arc::downgrade(&self.internal)),
 76        }
 77    }
 78
 79    pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
 80        let mut this = self.internal.lock();
 81
 82        // If this callback's subscription was dropped before the callback was
 83        // added, then just drop the callback.
 84        if this.clear_dropped_state(&key, subscription_id) {
 85            return;
 86        }
 87
 88        this.callbacks
 89            .entry(key)
 90            .or_default()
 91            .insert(subscription_id, callback);
 92    }
 93
 94    pub fn remove(&mut self, key: K) {
 95        // Drop these callbacks after releasing the lock, in case one of them
 96        // owns a subscription to this callback collection.
 97        let mut this = self.internal.lock();
 98        let callbacks = this.callbacks.remove(&key);
 99        this.dropped_subscriptions.remove(&key);
100        drop(this);
101        drop(callbacks);
102    }
103
104    pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
105        &mut self,
106        key: K,
107        cx: &mut MutableAppContext,
108        mut call_callback: C,
109    ) {
110        let callbacks = self.internal.lock().callbacks.remove(&key);
111        if let Some(callbacks) = callbacks {
112            for (subscription_id, mut callback) in callbacks {
113                // If this callback's subscription was dropped while invoking an
114                // earlier callback, then just drop the callback.
115                let mut this = self.internal.lock();
116                if this.clear_dropped_state(&key, subscription_id) {
117                    continue;
118                }
119
120                drop(this);
121                let alive = call_callback(&mut callback, cx);
122
123                // If this callback's subscription was dropped while invoking the callback
124                // itself, or if the callback returns false, then just drop the callback.
125                let mut this = self.internal.lock();
126                if this.clear_dropped_state(&key, subscription_id) || !alive {
127                    continue;
128                }
129
130                this.callbacks
131                    .entry(key)
132                    .or_default()
133                    .insert(subscription_id, callback);
134            }
135        }
136    }
137}
138
139impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
140    pub fn id(&self) -> usize {
141        self.id
142    }
143
144    pub fn detach(&mut self) {
145        self.mapping.take();
146    }
147}
148
149impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
150    fn drop(&mut self) {
151        if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
152            let mut mapping = mapping.lock();
153
154            // If the callback is present in the mapping, then just remove it.
155            if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
156                let callback = callbacks.remove(&self.id);
157                if callback.is_some() {
158                    drop(mapping);
159                    drop(callback);
160                    return;
161                }
162            }
163
164            // If this subscription's callback is not present, then either it has been
165            // temporarily removed during emit, or it has not yet been added. Record
166            // that this subscription has been dropped so that the callback can be
167            // removed later.
168            mapping
169                .dropped_subscriptions
170                .entry(self.key.clone())
171                .or_default()
172                .insert(self.id);
173        }
174    }
175}