callback_collection.rs

  1use std::sync::Arc;
  2use std::{hash::Hash, sync::Weak};
  3
  4use parking_lot::Mutex;
  5
  6use collections::{BTreeMap, HashMap, HashSet};
  7
  8use crate::MutableAppContext;
  9
 10// Problem 5: Current bug callbacks currently called many times after being dropped
 11// update
 12//   notify
 13//   observe (push effect)
 14//     subscription {id : 5}
 15//     pending: [Effect::Notify, Effect::observe { id: 5 }]
 16//   drop observation subscription (write None into subscriptions)
 17// flush effects
 18//   notify
 19//   observe
 20// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again
 21// -----------------
 22// Problem 1: Many very similar subscription enum variants
 23// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status
 24// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback
 25// Problem 4: Calling callbacks requires removing all of them from the list and adding them back
 26
 27// Solution 1 CallbackState:
 28//      Add more state to the CallbackCollection map to communicate dropped and initialized status
 29//      Solves: P5
 30// Solution 2 DroppedSubscriptionList:
 31//      Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions
 32//      which can be
 33// Solution 3 GarbageCollection:
 34//      Use some type of traditional garbage collection to handle dropping of callbacks
 35//          atomic flag per callback which is looped over in remove dropped entities
 36
 37// TODO:
 38//   - Move subscription id counter to CallbackCollection
 39//   - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions
 40//     can be a hashset of usize and the Subscription doesn't need the key
 41//   - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions
 42pub struct Subscription<K: Clone + Hash + Eq, F> {
 43    key: K,
 44    id: usize,
 45    mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
 46}
 47
 48struct Mapping<K, F> {
 49    callbacks: HashMap<K, BTreeMap<usize, F>>,
 50    dropped_subscriptions: HashSet<(K, usize)>,
 51}
 52
 53impl<K, F> Default for Mapping<K, F> {
 54    fn default() -> Self {
 55        Self {
 56            callbacks: Default::default(),
 57            dropped_subscriptions: Default::default(),
 58        }
 59    }
 60}
 61
 62pub(crate) struct CallbackCollection<K: Clone + Hash + Eq, F> {
 63    internal: Arc<Mutex<Mapping<K, F>>>,
 64}
 65
 66impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
 67    fn clone(&self) -> Self {
 68        Self {
 69            internal: self.internal.clone(),
 70        }
 71    }
 72}
 73
 74impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
 75    fn default() -> Self {
 76        CallbackCollection {
 77            internal: Arc::new(Mutex::new(Default::default())),
 78        }
 79    }
 80}
 81
 82impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
 83    #[cfg(test)]
 84    pub fn is_empty(&self) -> bool {
 85        self.internal.lock().callbacks.is_empty()
 86    }
 87
 88    pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
 89        Subscription {
 90            key,
 91            id: subscription_id,
 92            mapping: Some(Arc::downgrade(&self.internal)),
 93        }
 94    }
 95
 96    pub fn count(&mut self, key: K) -> usize {
 97        self.internal
 98            .lock()
 99            .callbacks
100            .get(&key)
101            .map_or(0, |callbacks| callbacks.len())
102    }
103
104    pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
105        let mut this = self.internal.lock();
106        if !this.dropped_subscriptions.contains(&(key, subscription_id)) {
107            this.callbacks
108                .entry(key)
109                .or_default()
110                .insert(subscription_id, callback);
111        }
112    }
113
114    pub fn remove(&mut self, key: K) {
115        self.internal.lock().callbacks.remove(&key);
116    }
117
118    pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
119        &mut self,
120        key: K,
121        cx: &mut MutableAppContext,
122        mut call_callback: C,
123    ) {
124        let callbacks = self.internal.lock().callbacks.remove(&key);
125        if let Some(callbacks) = callbacks {
126            for (subscription_id, mut callback) in callbacks {
127                if !self
128                    .internal
129                    .lock()
130                    .dropped_subscriptions
131                    .contains(&(key, subscription_id))
132                {
133                    if call_callback(&mut callback, cx) {
134                        self.add_callback(key, subscription_id, callback);
135                    }
136                }
137            }
138        }
139    }
140
141    pub fn gc(&mut self) {
142        let mut this = self.internal.lock();
143
144        for (key, id) in std::mem::take(&mut this.dropped_subscriptions) {
145            if let Some(callbacks) = this.callbacks.get_mut(&key) {
146                callbacks.remove(&id);
147            }
148        }
149    }
150}
151
152impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
153    pub fn detach(&mut self) {
154        self.mapping.take();
155    }
156}
157
158impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
159    // If the callback has been initialized (no callback in the list for the key and id),
160    // add this subscription id and key to the dropped subscriptions list
161    // Otherwise, just remove the associated callback from the callback collection
162    fn drop(&mut self) {
163        if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
164            let mut mapping = mapping.lock();
165            if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
166                if callbacks.remove(&self.id).is_some() {
167                    return;
168                }
169            }
170            mapping
171                .dropped_subscriptions
172                .insert((self.key.clone(), self.id));
173        }
174    }
175}