1use collections::{BTreeMap, BTreeSet};
2use parking_lot::Mutex;
3use std::{fmt::Debug, mem, sync::Arc};
4use util::post_inc;
5
6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
7 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
8);
9
10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
11 fn clone(&self) -> Self {
12 SubscriberSet(self.0.clone())
13 }
14}
15
16struct SubscriberSetState<EmitterKey, Callback> {
17 subscribers: BTreeMap<EmitterKey, BTreeMap<usize, Callback>>,
18 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
19 next_subscriber_id: usize,
20}
21
22impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
23where
24 EmitterKey: 'static + Send + Ord + Clone + Debug,
25 Callback: 'static + Send,
26{
27 pub fn new() -> Self {
28 Self(Arc::new(Mutex::new(SubscriberSetState {
29 subscribers: Default::default(),
30 dropped_subscribers: Default::default(),
31 next_subscriber_id: 0,
32 })))
33 }
34
35 pub fn insert(&self, emitter_key: EmitterKey, callback: Callback) -> Subscription {
36 let mut lock = self.0.lock();
37 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
38 lock.subscribers
39 .entry(emitter_key.clone())
40 .or_default()
41 .insert(subscriber_id, callback);
42 let this = self.0.clone();
43 Subscription {
44 unsubscribe: Some(Box::new(move || {
45 let mut lock = this.lock();
46 if let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) {
47 subscribers.remove(&subscriber_id);
48 if subscribers.is_empty() {
49 lock.subscribers.remove(&emitter_key);
50 return;
51 }
52 }
53
54 // We didn't manage to remove the subscription, which means it was dropped
55 // while invoking the callback. Mark it as dropped so that we can remove it
56 // later.
57 lock.dropped_subscribers
58 .insert((emitter_key, subscriber_id));
59 })),
60 }
61 }
62
63 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
64 let subscribers = self.0.lock().subscribers.remove(&emitter);
65 subscribers.unwrap_or_default().into_values()
66 }
67
68 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
69 where
70 F: FnMut(&mut Callback) -> bool,
71 {
72 let entry = self.0.lock().subscribers.remove_entry(emitter);
73 if let Some((emitter, mut subscribers)) = entry {
74 subscribers.retain(|_, callback| f(callback));
75 let mut lock = self.0.lock();
76
77 // Add any new subscribers that were added while invoking the callback.
78 if let Some(new_subscribers) = lock.subscribers.remove(&emitter) {
79 subscribers.extend(new_subscribers);
80 }
81
82 // Remove any dropped subscriptions that were dropped while invoking the callback.
83 for (dropped_emitter, dropped_subscription_id) in
84 mem::take(&mut lock.dropped_subscribers)
85 {
86 debug_assert_eq!(emitter, dropped_emitter);
87 subscribers.remove(&dropped_subscription_id);
88 }
89
90 if !subscribers.is_empty() {
91 lock.subscribers.insert(emitter, subscribers);
92 }
93 }
94 }
95}
96
97#[must_use]
98pub struct Subscription {
99 unsubscribe: Option<Box<dyn FnOnce() + Send + 'static>>,
100}
101
102impl Subscription {
103 pub fn detach(mut self) {
104 self.unsubscribe.take();
105 }
106}
107
108impl Drop for Subscription {
109 fn drop(&mut self) {
110 if let Some(unsubscribe) = self.unsubscribe.take() {
111 unsubscribe();
112 }
113 }
114}