1use collections::{BTreeMap, BTreeSet};
2use parking_lot::Mutex;
3use std::{fmt::Debug, mem, sync::Arc};
4use util::post_inc;
5
6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
7 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
8);
9
10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
11 fn clone(&self) -> Self {
12 SubscriberSet(self.0.clone())
13 }
14}
15
16struct SubscriberSetState<EmitterKey, Callback> {
17 subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Callback>>>,
18 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
19 next_subscriber_id: usize,
20}
21
22impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
23where
24 EmitterKey: 'static + Ord + Clone + Debug,
25 Callback: 'static,
26{
27 pub fn new() -> Self {
28 Self(Arc::new(Mutex::new(SubscriberSetState {
29 subscribers: Default::default(),
30 dropped_subscribers: Default::default(),
31 next_subscriber_id: 0,
32 })))
33 }
34
35 pub fn insert(&self, emitter_key: EmitterKey, callback: Callback) -> Subscription {
36 let mut lock = self.0.lock();
37 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
38 lock.subscribers
39 .entry(emitter_key.clone())
40 .or_default()
41 .get_or_insert_with(|| Default::default())
42 .insert(subscriber_id, callback);
43 let this = self.0.clone();
44 Subscription {
45 unsubscribe: Some(Box::new(move || {
46 let mut lock = this.lock();
47 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
48 // remove was called with this emitter_key
49 return;
50 };
51
52 if let Some(subscribers) = subscribers {
53 subscribers.remove(&subscriber_id);
54 if subscribers.is_empty() {
55 lock.subscribers.remove(&emitter_key);
56 }
57 return;
58 }
59
60 // We didn't manage to remove the subscription, which means it was dropped
61 // while invoking the callback. Mark it as dropped so that we can remove it
62 // later.
63 lock.dropped_subscribers
64 .insert((emitter_key, subscriber_id));
65 })),
66 }
67 }
68
69 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
70 let subscribers = self.0.lock().subscribers.remove(&emitter);
71 subscribers
72 .unwrap_or_default()
73 .map(|s| s.into_values())
74 .into_iter()
75 .flatten()
76 }
77
78 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
79 where
80 F: FnMut(&mut Callback) -> bool,
81 {
82 let Some(mut subscribers) = self
83 .0
84 .lock()
85 .subscribers
86 .get_mut(emitter)
87 .and_then(|s| s.take())
88 else {
89 return;
90 };
91
92 subscribers.retain(|_, callback| f(callback));
93 let mut lock = self.0.lock();
94
95 // Add any new subscribers that were added while invoking the callback.
96 if let Some(Some(new_subscribers)) = lock.subscribers.remove(&emitter) {
97 subscribers.extend(new_subscribers);
98 }
99
100 // Remove any dropped subscriptions that were dropped while invoking the callback.
101 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
102 debug_assert_eq!(*emitter, dropped_emitter);
103 subscribers.remove(&dropped_subscription_id);
104 }
105
106 if !subscribers.is_empty() {
107 lock.subscribers.insert(emitter.clone(), Some(subscribers));
108 }
109 }
110}
111
112#[must_use]
113pub struct Subscription {
114 unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
115}
116
117impl Subscription {
118 pub fn detach(mut self) {
119 self.unsubscribe.take();
120 }
121}
122
123impl Drop for Subscription {
124 fn drop(&mut self) {
125 if let Some(unsubscribe) = self.unsubscribe.take() {
126 unsubscribe();
127 }
128 }
129}