1use collections::{BTreeMap, BTreeSet};
2use parking_lot::Mutex;
3use std::{cell::Cell, fmt::Debug, mem, rc::Rc, sync::Arc};
4use util::post_inc;
5
6pub(crate) struct SubscriberSet<EmitterKey, Callback>(
7 Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
8);
9
10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
11 fn clone(&self) -> Self {
12 SubscriberSet(self.0.clone())
13 }
14}
15
16struct SubscriberSetState<EmitterKey, Callback> {
17 subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
18 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
19 next_subscriber_id: usize,
20}
21
22struct Subscriber<Callback> {
23 active: Rc<Cell<bool>>,
24 callback: Callback,
25}
26
27impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
28where
29 EmitterKey: 'static + Ord + Clone + Debug,
30 Callback: 'static,
31{
32 pub fn new() -> Self {
33 Self(Arc::new(Mutex::new(SubscriberSetState {
34 subscribers: Default::default(),
35 dropped_subscribers: Default::default(),
36 next_subscriber_id: 0,
37 })))
38 }
39
40 /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
41 /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
42 /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
43 /// to activate the [`Subscription`].
44 #[must_use]
45 pub fn insert(
46 &self,
47 emitter_key: EmitterKey,
48 callback: Callback,
49 ) -> (Subscription, impl FnOnce()) {
50 let active = Rc::new(Cell::new(false));
51 let mut lock = self.0.lock();
52 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
53 lock.subscribers
54 .entry(emitter_key.clone())
55 .or_default()
56 .get_or_insert_with(Default::default)
57 .insert(
58 subscriber_id,
59 Subscriber {
60 active: active.clone(),
61 callback,
62 },
63 );
64 let this = self.0.clone();
65
66 let subscription = Subscription {
67 unsubscribe: Some(Box::new(move || {
68 let mut lock = this.lock();
69 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
70 // remove was called with this emitter_key
71 return;
72 };
73
74 if let Some(subscribers) = subscribers {
75 subscribers.remove(&subscriber_id);
76 if subscribers.is_empty() {
77 lock.subscribers.remove(&emitter_key);
78 }
79 return;
80 }
81
82 // We didn't manage to remove the subscription, which means it was dropped
83 // while invoking the callback. Mark it as dropped so that we can remove it
84 // later.
85 lock.dropped_subscribers
86 .insert((emitter_key, subscriber_id));
87 })),
88 };
89 (subscription, move || active.set(true))
90 }
91
92 pub fn remove(&self, emitter: &EmitterKey) -> impl IntoIterator<Item = Callback> {
93 let subscribers = self.0.lock().subscribers.remove(emitter);
94 subscribers
95 .unwrap_or_default()
96 .map(|s| s.into_values())
97 .into_iter()
98 .flatten()
99 .filter_map(|subscriber| {
100 if subscriber.active.get() {
101 Some(subscriber.callback)
102 } else {
103 None
104 }
105 })
106 }
107
108 /// Call the given callback for each subscriber to the given emitter.
109 /// If the callback returns false, the subscriber is removed.
110 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
111 where
112 F: FnMut(&mut Callback) -> bool,
113 {
114 let Some(mut subscribers) = self
115 .0
116 .lock()
117 .subscribers
118 .get_mut(emitter)
119 .and_then(|s| s.take())
120 else {
121 return;
122 };
123
124 subscribers.retain(|_, subscriber| {
125 if subscriber.active.get() {
126 f(&mut subscriber.callback)
127 } else {
128 true
129 }
130 });
131 let mut lock = self.0.lock();
132
133 // Add any new subscribers that were added while invoking the callback.
134 if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
135 subscribers.extend(new_subscribers);
136 }
137
138 // Remove any dropped subscriptions that were dropped while invoking the callback.
139 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
140 debug_assert_eq!(*emitter, dropped_emitter);
141 subscribers.remove(&dropped_subscription_id);
142 }
143
144 if !subscribers.is_empty() {
145 lock.subscribers.insert(emitter.clone(), Some(subscribers));
146 }
147 }
148}
149
150#[must_use]
151pub struct Subscription {
152 unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
153}
154
155impl Subscription {
156 pub fn detach(mut self) {
157 self.unsubscribe.take();
158 }
159}
160
161impl Drop for Subscription {
162 fn drop(&mut self) {
163 if let Some(unsubscribe) = self.unsubscribe.take() {
164 unsubscribe();
165 }
166 }
167}