@@ -16,7 +16,17 @@ pub struct Subscription<K: Clone + Hash + Eq, F> {
struct Mapping<K, F> {
callbacks: HashMap<K, BTreeMap<usize, F>>,
- dropped_subscriptions: HashSet<(K, usize)>,
+ dropped_subscriptions: HashMap<K, HashSet<usize>>,
+}
+
+impl<K: Hash + Eq, F> Mapping<K, F> {
+ fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
+ if let Some(subscriptions) = self.dropped_subscriptions.get_mut(&key) {
+ subscriptions.remove(&subscription_id)
+ } else {
+ false
+ }
+ }
}
impl<K, F> Default for Mapping<K, F> {
@@ -50,6 +60,14 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
self.internal.lock().callbacks.is_empty()
}
+ pub fn count(&self, key: K) -> usize {
+ self.internal
+ .lock()
+ .callbacks
+ .get(&key)
+ .map_or(0, |callbacks| callbacks.len())
+ }
+
pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
Subscription {
key,
@@ -63,7 +81,7 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
// If this callback's subscription was dropped before the callback was
// added, then just drop the callback.
- if this.dropped_subscriptions.remove(&(key, subscription_id)) {
+ if this.clear_dropped_state(&key, subscription_id) {
return;
}
@@ -74,10 +92,12 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
}
pub fn remove(&mut self, key: K) {
- let callbacks = self.internal.lock().callbacks.remove(&key);
-
// Drop these callbacks after releasing the lock, in case one of them
// owns a subscription to this callback collection.
+ let mut this = self.internal.lock();
+ let callbacks = this.callbacks.remove(&key);
+ this.dropped_subscriptions.remove(&key);
+ drop(this);
drop(callbacks);
}
@@ -91,29 +111,26 @@ impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
if let Some(callbacks) = callbacks {
for (subscription_id, mut callback) in callbacks {
// If this callback's subscription was dropped while invoking an
- // earlier callback, then just drop this callback.
- if self
- .internal
- .lock()
- .dropped_subscriptions
- .remove(&(key, subscription_id))
- {
+ // earlier callback, then just drop the callback.
+ let mut this = self.internal.lock();
+ if this.clear_dropped_state(&key, subscription_id) {
continue;
}
- if call_callback(&mut callback, cx) {
- self.add_callback(key, subscription_id, callback);
- }
- }
- }
- }
+ drop(this);
+ let alive = call_callback(&mut callback, cx);
- pub fn gc(&mut self) {
- let mut this = self.internal.lock();
+ // If this callback's subscription was dropped while invoking the callback
+ // itself, or if the callback returns false, then just drop the callback.
+ let mut this = self.internal.lock();
+ if this.clear_dropped_state(&key, subscription_id) || !alive {
+ continue;
+ }
- for (key, id) in std::mem::take(&mut this.dropped_subscriptions) {
- if let Some(callbacks) = this.callbacks.get_mut(&key) {
- callbacks.remove(&id);
+ this.callbacks
+ .entry(key)
+ .or_default()
+ .insert(subscription_id, callback);
}
}
}
@@ -150,7 +167,9 @@ impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
// removed later.
mapping
.dropped_subscriptions
- .insert((self.key.clone(), self.id));
+ .entry(self.key.clone())
+ .or_default()
+ .insert(self.id);
}
}
}