1use collections::{BTreeMap, BTreeSet};
2use parking_lot::Mutex;
3use std::{fmt::Debug, mem, sync::Arc};
4use util::post_inc;
5
6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
7 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
8);
9
10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
11 fn clone(&self) -> Self {
12 SubscriberSet(self.0.clone())
13 }
14}
15
16struct SubscriberSetState<EmitterKey, Callback> {
17 subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Callback>>>,
18 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
19 next_subscriber_id: usize,
20}
21
22impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
23where
24 EmitterKey: 'static + Ord + Clone + Debug,
25 Callback: 'static,
26{
27 pub fn new() -> Self {
28 Self(Arc::new(Mutex::new(SubscriberSetState {
29 subscribers: Default::default(),
30 dropped_subscribers: Default::default(),
31 next_subscriber_id: 0,
32 })))
33 }
34
35 pub fn insert(&self, emitter_key: EmitterKey, callback: Callback) -> Subscription {
36 let mut lock = self.0.lock();
37 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
38 lock.subscribers
39 .entry(emitter_key.clone())
40 .or_default()
41 .get_or_insert_with(|| Default::default())
42 .insert(subscriber_id, callback);
43 let this = self.0.clone();
44 Subscription {
45 unsubscribe: Some(Box::new(move || {
46 let mut lock = this.lock();
47 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
48 // remove was called with this emitter_key
49 return;
50 };
51
52 if let Some(subscribers) = subscribers {
53 subscribers.remove(&subscriber_id);
54 if subscribers.is_empty() {
55 lock.subscribers.remove(&emitter_key);
56 }
57 return;
58 }
59
60 // We didn't manage to remove the subscription, which means it was dropped
61 // while invoking the callback. Mark it as dropped so that we can remove it
62 // later.
63 lock.dropped_subscribers
64 .insert((emitter_key, subscriber_id));
65 })),
66 }
67 }
68
69 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
70 let subscribers = self.0.lock().subscribers.remove(&emitter);
71 subscribers
72 .unwrap_or_default()
73 .map(|s| s.into_values())
74 .into_iter()
75 .flatten()
76 }
77
78 /// Call the given callback for each subscriber to the given emitter.
79 /// If the callback returns false, the subscriber is removed.
80 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
81 where
82 F: FnMut(&mut Callback) -> bool,
83 {
84 let Some(mut subscribers) = self
85 .0
86 .lock()
87 .subscribers
88 .get_mut(emitter)
89 .and_then(|s| s.take())
90 else {
91 return;
92 };
93
94 subscribers.retain(|_, callback| f(callback));
95 let mut lock = self.0.lock();
96
97 // Add any new subscribers that were added while invoking the callback.
98 if let Some(Some(new_subscribers)) = lock.subscribers.remove(&emitter) {
99 subscribers.extend(new_subscribers);
100 }
101
102 // Remove any dropped subscriptions that were dropped while invoking the callback.
103 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
104 debug_assert_eq!(*emitter, dropped_emitter);
105 subscribers.remove(&dropped_subscription_id);
106 }
107
108 if !subscribers.is_empty() {
109 lock.subscribers.insert(emitter.clone(), Some(subscribers));
110 }
111 }
112}
113
114#[must_use]
115pub struct Subscription {
116 unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
117}
118
119impl Subscription {
120 pub fn detach(mut self) {
121 self.unsubscribe.take();
122 }
123}
124
125impl Drop for Subscription {
126 fn drop(&mut self) {
127 if let Some(unsubscribe) = self.unsubscribe.take() {
128 unsubscribe();
129 }
130 }
131}