callback_collection.rs

  1use crate::AppContext;
  2use collections::{BTreeMap, HashMap, HashSet};
  3use parking_lot::Mutex;
  4use std::sync::Arc;
  5use std::{hash::Hash, sync::Weak};
  6
  7pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
  8    internal: Arc<Mutex<Mapping<K, F>>>,
  9}
 10
 11pub struct Subscription<K: Clone + Hash + Eq, F> {
 12    key: K,
 13    id: usize,
 14    mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
 15}
 16
 17struct Mapping<K, F> {
 18    callbacks: HashMap<K, BTreeMap<usize, F>>,
 19    dropped_subscriptions: HashMap<K, HashSet<usize>>,
 20}
 21
 22impl<K: Hash + Eq, F> Mapping<K, F> {
 23    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
 24        if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
 25            subscriptions.remove(&subscription_id)
 26        } else {
 27            false
 28        }
 29    }
 30}
 31
 32impl<K, F> Default for Mapping<K, F> {
 33    fn default() -> Self {
 34        Self {
 35            callbacks: Default::default(),
 36            dropped_subscriptions: Default::default(),
 37        }
 38    }
 39}
 40
 41impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
 42    fn clone(&self) -> Self {
 43        Self {
 44            internal: self.internal.clone(),
 45        }
 46    }
 47}
 48
 49impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
 50    fn default() -> Self {
 51        CallbackCollection {
 52            internal: Arc::new(Mutex::new(Default::default())),
 53        }
 54    }
 55}
 56
 57impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
 58    #[cfg(test)]
 59    pub fn is_empty(&self) -> bool {
 60        self.internal.lock().callbacks.is_empty()
 61    }
 62
 63    pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
 64        Subscription {
 65            key,
 66            id: subscription_id,
 67            mapping: Some(Arc::downgrade(&self.internal)),
 68        }
 69    }
 70
 71    pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
 72        let mut this = self.internal.lock();
 73
 74        // If this callback's subscription was dropped before the callback was
 75        // added, then just drop the callback.
 76        if this.clear_dropped_state(&key, subscription_id) {
 77            return;
 78        }
 79
 80        this.callbacks
 81            .entry(key)
 82            .or_default()
 83            .insert(subscription_id, callback);
 84    }
 85
 86    pub fn remove(&mut self, key: K) {
 87        // Drop these callbacks after releasing the lock, in case one of them
 88        // owns a subscription to this callback collection.
 89        let mut this = self.internal.lock();
 90        let callbacks = this.callbacks.remove(&key);
 91        this.dropped_subscriptions.remove(&key);
 92        drop(this);
 93        drop(callbacks);
 94    }
 95
 96    pub fn emit<C: FnMut(&mut F, &mut AppContext) -> bool>(
 97        &mut self,
 98        key: K,
 99        cx: &mut AppContext,
100        mut call_callback: C,
101    ) {
102        let callbacks = self.internal.lock().callbacks.remove(&key);
103        if let Some(callbacks) = callbacks {
104            for (subscription_id, mut callback) in callbacks {
105                // If this callback's subscription was dropped while invoking an
106                // earlier callback, then just drop the callback.
107                let mut this = self.internal.lock();
108                if this.clear_dropped_state(&key, subscription_id) {
109                    continue;
110                }
111
112                drop(this);
113                let alive = call_callback(&mut callback, cx);
114
115                // If this callback's subscription was dropped while invoking the callback
116                // itself, or if the callback returns false, then just drop the callback.
117                let mut this = self.internal.lock();
118                if this.clear_dropped_state(&key, subscription_id) || !alive {
119                    continue;
120                }
121
122                this.callbacks
123                    .entry(key)
124                    .or_default()
125                    .insert(subscription_id, callback);
126            }
127        }
128    }
129}
130
131impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
132    pub fn id(&self) -> usize {
133        self.id
134    }
135
136    pub fn detach(&mut self) {
137        self.mapping.take();
138    }
139}
140
141impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
142    fn drop(&mut self) {
143        if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
144            let mut mapping = mapping.lock();
145
146            // If the callback is present in the mapping, then just remove it.
147            if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
148                let callback = callbacks.remove(&self.id);
149                if callback.is_some() {
150                    drop(mapping);
151                    drop(callback);
152                    return;
153                }
154            }
155
156            // If this subscription's callback is not present, then either it has been
157            // temporarily removed during emit, or it has not yet been added. Record
158            // that this subscription has been dropped so that the callback can be
159            // removed later.
160            mapping
161                .dropped_subscriptions
162                .entry(self.key.clone())
163                .or_default()
164                .insert(self.id);
165        }
166    }
167}