@@ -14,7 +14,7 @@ impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
}
struct SubscriberSetState<EmitterKey, Callback> {
- subscribers: BTreeMap<EmitterKey, BTreeMap<usize, Callback>>,
+ subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Callback>>>,
dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
next_subscriber_id: usize,
}
@@ -38,12 +38,18 @@ where
lock.subscribers
.entry(emitter_key.clone())
.or_default()
+ .insert(Default::default())
.insert(subscriber_id, callback);
let this = self.0.clone();
Subscription {
unsubscribe: Some(Box::new(move || {
let mut lock = this.lock();
- if let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) {
+ let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
+ // remove was called with this emitter_key
+ return;
+ };
+
+ if let Some(subscribers) = subscribers {
subscribers.remove(&subscriber_id);
if subscribers.is_empty() {
lock.subscribers.remove(&emitter_key);
@@ -62,34 +68,43 @@ where
pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
let subscribers = self.0.lock().subscribers.remove(&emitter);
- subscribers.unwrap_or_default().into_values()
+ subscribers
+ .unwrap_or_default()
+ .map(|s| s.into_values())
+ .into_iter()
+ .flatten()
}
pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
where
F: FnMut(&mut Callback) -> bool,
{
- let entry = self.0.lock().subscribers.remove_entry(emitter);
- if let Some((emitter, mut subscribers)) = entry {
- subscribers.retain(|_, callback| f(callback));
- let mut lock = self.0.lock();
-
- // Add any new subscribers that were added while invoking the callback.
- if let Some(new_subscribers) = lock.subscribers.remove(&emitter) {
- subscribers.extend(new_subscribers);
- }
-
- // Remove any dropped subscriptions that were dropped while invoking the callback.
- for (dropped_emitter, dropped_subscription_id) in
- mem::take(&mut lock.dropped_subscribers)
- {
- debug_assert_eq!(emitter, dropped_emitter);
- subscribers.remove(&dropped_subscription_id);
- }
-
- if !subscribers.is_empty() {
- lock.subscribers.insert(emitter, subscribers);
- }
+ let Some(mut subscribers) = self
+ .0
+ .lock()
+ .subscribers
+ .get_mut(emitter)
+ .and_then(|s| s.take())
+ else {
+ return;
+ };
+
+ subscribers.retain(|_, callback| f(callback));
+ let mut lock = self.0.lock();
+
+ // Add any new subscribers that were added while invoking the callback.
+ if let Some(Some(new_subscribers)) = lock.subscribers.remove(&emitter) {
+ subscribers.extend(new_subscribers);
+ }
+
+ // Remove any dropped subscriptions that were dropped while invoking the callback.
+ for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
+ debug_assert_eq!(*emitter, dropped_emitter);
+ subscribers.remove(&dropped_subscription_id);
+ }
+
+ if !subscribers.is_empty() {
+ lock.subscribers.insert(emitter.clone(), Some(subscribers));
}
}
}