1use collections::{BTreeMap, BTreeSet};
2use parking_lot::Mutex;
3use std::{cell::Cell, fmt::Debug, mem, rc::Rc, sync::Arc};
4use util::post_inc;
5
/// A handle to a set of subscribers grouped by emitter key.
///
/// The state lives behind an `Arc<Mutex<..>>`, so every clone of this handle
/// refers to the same underlying [`SubscriberSetState`].
pub(crate) struct SubscriberSet<EmitterKey, Callback>(
    Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
);
9
10impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
11 fn clone(&self) -> Self {
12 SubscriberSet(self.0.clone())
13 }
14}
15
/// The mutex-protected state behind a [`SubscriberSet`].
struct SubscriberSetState<EmitterKey, Callback> {
    /// Subscribers for each emitter key, keyed by subscriber id. The inner `Option` is
    /// `None` while `SubscriberSet::retain` has taken the map out to invoke callbacks.
    subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
    /// `(emitter key, subscriber id)` pairs recorded when a subscription is dropped while
    /// its emitter's map is taken out; `SubscriberSet::retain` drains them afterwards.
    dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
    /// Source of subscriber ids; starts at 0 and is advanced through `post_inc` on
    /// every `SubscriberSet::insert`.
    next_subscriber_id: usize,
}
21
/// A single registered callback together with its activation flag.
struct Subscriber<Callback> {
    /// Starts out `false` in `SubscriberSet::insert` and is set to `true` by the
    /// activation closure that `insert` returns. While the flag is unset, `remove`
    /// does not yield the callback and `retain` does not invoke it.
    active: Rc<Cell<bool>>,
    /// The callback stored for this subscriber.
    callback: Callback,
}
26
27impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
28where
29 EmitterKey: 'static + Ord + Clone + Debug,
30 Callback: 'static,
31{
32 pub fn new() -> Self {
33 Self(Arc::new(Mutex::new(SubscriberSetState {
34 subscribers: Default::default(),
35 dropped_subscribers: Default::default(),
36 next_subscriber_id: 0,
37 })))
38 }
39
40 /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
41 /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
42 /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
43 /// to activate the [`Subscription`].
44 pub fn insert(
45 &self,
46 emitter_key: EmitterKey,
47 callback: Callback,
48 ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
49 let active = Rc::new(Cell::new(false));
50 let mut lock = self.0.lock();
51 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
52 lock.subscribers
53 .entry(emitter_key.clone())
54 .or_default()
55 .get_or_insert_with(Default::default)
56 .insert(
57 subscriber_id,
58 Subscriber {
59 active: active.clone(),
60 callback,
61 },
62 );
63 let this = self.0.clone();
64
65 let subscription = Subscription {
66 unsubscribe: Some(Box::new(move || {
67 let mut lock = this.lock();
68 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
69 // remove was called with this emitter_key
70 return;
71 };
72
73 if let Some(subscribers) = subscribers {
74 subscribers.remove(&subscriber_id);
75 if subscribers.is_empty() {
76 lock.subscribers.remove(&emitter_key);
77 }
78 return;
79 }
80
81 // We didn't manage to remove the subscription, which means it was dropped
82 // while invoking the callback. Mark it as dropped so that we can remove it
83 // later.
84 lock.dropped_subscribers
85 .insert((emitter_key, subscriber_id));
86 })),
87 };
88 (subscription, move || active.set(true))
89 }
90
91 pub fn remove(
92 &self,
93 emitter: &EmitterKey,
94 ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
95 let subscribers = self.0.lock().subscribers.remove(emitter);
96 subscribers
97 .unwrap_or_default()
98 .map(|s| s.into_values())
99 .into_iter()
100 .flatten()
101 .filter_map(|subscriber| {
102 if subscriber.active.get() {
103 Some(subscriber.callback)
104 } else {
105 None
106 }
107 })
108 }
109
110 /// Call the given callback for each subscriber to the given emitter.
111 /// If the callback returns false, the subscriber is removed.
112 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
113 where
114 F: FnMut(&mut Callback) -> bool,
115 {
116 let Some(mut subscribers) = self
117 .0
118 .lock()
119 .subscribers
120 .get_mut(emitter)
121 .and_then(|s| s.take())
122 else {
123 return;
124 };
125
126 subscribers.retain(|_, subscriber| {
127 if subscriber.active.get() {
128 f(&mut subscriber.callback)
129 } else {
130 true
131 }
132 });
133 let mut lock = self.0.lock();
134
135 // Add any new subscribers that were added while invoking the callback.
136 if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
137 subscribers.extend(new_subscribers);
138 }
139
140 // Remove any dropped subscriptions that were dropped while invoking the callback.
141 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
142 debug_assert_eq!(*emitter, dropped_emitter);
143 subscribers.remove(&dropped_subscription_id);
144 }
145
146 if !subscribers.is_empty() {
147 lock.subscribers.insert(emitter.clone(), Some(subscribers));
148 }
149 }
150}
151
/// A handle to a subscription created by GPUI. When dropped, the subscription
/// is cancelled and the callback will no longer be invoked.
#[must_use]
pub struct Subscription {
    /// Cleanup to run when the handle is dropped. It is `None` once the handle has
    /// been detached, folded into a joined subscription, or already run.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}

impl Subscription {
    /// Creates a new subscription with a callback that gets invoked when
    /// this subscription is dropped.
    pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
        let unsubscribe: Box<dyn FnOnce() + 'static> = Box::new(unsubscribe);
        Self {
            unsubscribe: Some(unsubscribe),
        }
    }

    /// Detaches the subscription from this handle. The callback will
    /// continue to be invoked until the entities it has been
    /// subscribed to are dropped
    pub fn detach(mut self) {
        // Discard the cleanup without running it, so the drop below is a no-op.
        self.unsubscribe = None;
    }

    /// Joins two subscriptions into a single subscription. Detach will
    /// detach both interior subscriptions.
    ///
    /// Dropping the joined subscription runs the cleanup of `subscription_a`
    /// first and that of `subscription_b` second.
    pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
        // Take both cleanups so the original handles drop without running them.
        let first = subscription_a.unsubscribe.take();
        let second = subscription_b.unsubscribe.take();
        Self::new(move || {
            first
                .into_iter()
                .chain(second)
                .for_each(|unsubscribe| unsubscribe());
        })
    }
}

impl Drop for Subscription {
    /// Runs the cleanup callback, if one is still attached.
    fn drop(&mut self) {
        let Some(unsubscribe) = self.unsubscribe.take() else {
            return;
        };
        unsubscribe();
    }
}