1use collections::BTreeMap;
2use gpui_util::post_inc;
3use std::{
4 cell::{Cell, RefCell},
5 fmt::Debug,
6 rc::Rc,
7};
8
9pub(crate) struct SubscriberSet<EmitterKey, Callback>(
10 Rc<RefCell<SubscriberSetState<EmitterKey, Callback>>>,
11);
12
13impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
14 fn clone(&self) -> Self {
15 SubscriberSet(self.0.clone())
16 }
17}
18
19struct SubscriberSetState<EmitterKey, Callback> {
20 subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
21 next_subscriber_id: usize,
22}
23
/// A single registered callback together with its lifecycle flags.
struct Subscriber<Callback> {
    /// Starts out `false`; set to `true` by the activation closure that
    /// `SubscriberSet::insert` returns. Inactive subscribers are never invoked.
    active: Rc<Cell<bool>>,
    /// Set to `true` as soon as the owning `Subscription` is dropped, even when
    /// the subscriber cannot be removed from the table at that moment.
    dropped: Rc<Cell<bool>>,
    /// The value handed back to callers of `remove` and `retain`.
    callback: Callback,
}
29
30impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
31where
32 EmitterKey: 'static + Ord + Clone + Debug,
33 Callback: 'static,
34{
35 pub fn new() -> Self {
36 Self(Rc::new(RefCell::new(SubscriberSetState {
37 subscribers: Default::default(),
38 next_subscriber_id: 0,
39 })))
40 }
41
42 /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
43 /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
44 /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
45 /// to activate the [`Subscription`].
46 pub fn insert(
47 &self,
48 emitter_key: EmitterKey,
49 callback: Callback,
50 ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
51 let active = Rc::new(Cell::new(false));
52 let dropped = Rc::new(Cell::new(false));
53 let mut lock = self.0.borrow_mut();
54 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
55 lock.subscribers
56 .entry(emitter_key.clone())
57 .or_default()
58 .get_or_insert_with(Default::default)
59 .insert(
60 subscriber_id,
61 Subscriber {
62 active: active.clone(),
63 dropped: dropped.clone(),
64 callback,
65 },
66 );
67 let this = self.0.clone();
68
69 let subscription = Subscription {
70 unsubscribe: Some(Box::new(move || {
71 dropped.set(true);
72
73 let mut lock = this.borrow_mut();
74 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
75 return;
76 };
77
78 if let Some(subscribers) = subscribers {
79 subscribers.remove(&subscriber_id);
80 if subscribers.is_empty() {
81 lock.subscribers.remove(&emitter_key);
82 }
83 }
84 })),
85 };
86 (subscription, move || active.set(true))
87 }
88
89 pub fn remove(
90 &self,
91 emitter: &EmitterKey,
92 ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
93 let subscribers = self.0.borrow_mut().subscribers.remove(emitter);
94 subscribers
95 .unwrap_or_default()
96 .map(|s| s.into_values())
97 .into_iter()
98 .flatten()
99 .filter_map(|subscriber| {
100 if subscriber.active.get() {
101 Some(subscriber.callback)
102 } else {
103 None
104 }
105 })
106 }
107
108 /// Call the given callback for each subscriber to the given emitter.
109 /// If the callback returns false, the subscriber is removed.
110 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
111 where
112 F: FnMut(&mut Callback) -> bool,
113 {
114 let Some(mut subscribers) = self
115 .0
116 .borrow_mut()
117 .subscribers
118 .get_mut(emitter)
119 .and_then(|s| s.take())
120 else {
121 return;
122 };
123
124 subscribers.retain(|_, subscriber| {
125 if !subscriber.active.get() {
126 return true;
127 }
128 if subscriber.dropped.get() {
129 return false;
130 }
131 let keep = f(&mut subscriber.callback);
132 keep && !subscriber.dropped.get()
133 });
134 let mut lock = self.0.borrow_mut();
135
136 // Add any new subscribers that were added while invoking the callback.
137 if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
138 subscribers.extend(new_subscribers);
139 }
140
141 if !subscribers.is_empty() {
142 lock.subscribers.insert(emitter.clone(), Some(subscribers));
143 }
144 }
145}
146
/// A handle to a subscription created by GPUI. Dropping the handle cancels the
/// subscription, after which its callback is no longer invoked.
#[must_use]
pub struct Subscription {
    /// Teardown closure; `None` once it has been run or detached.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}

impl Subscription {
    /// Creates a new subscription that runs `unsubscribe` when the returned
    /// handle is dropped.
    pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
        let unsubscribe: Box<dyn FnOnce() + 'static> = Box::new(unsubscribe);
        Self {
            unsubscribe: Some(unsubscribe),
        }
    }

    /// Detaches the subscription from this handle. The callback will
    /// continue to be invoked until the entities it has been
    /// subscribed to are dropped.
    pub fn detach(mut self) {
        // Discard the teardown closure without calling it, so the `Drop`
        // impl finds nothing left to run.
        self.unsubscribe = None;
    }

    /// Joins two subscriptions into a single subscription. Dropping the result
    /// unsubscribes `subscription_a` first and then `subscription_b`; detaching
    /// it detaches both interior subscriptions.
    pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
        // Empty both handles so that their own `Drop` impls become no-ops.
        let first = subscription_a.unsubscribe.take();
        let second = subscription_b.unsubscribe.take();
        Self::new(move || {
            for teardown in first.into_iter().chain(second) {
                teardown();
            }
        })
    }
}

impl Drop for Subscription {
    /// Runs the teardown closure, unless it was already taken by `detach` or `join`.
    fn drop(&mut self) {
        let Some(unsubscribe) = self.unsubscribe.take() else {
            return;
        };
        unsubscribe();
    }
}

impl std::fmt::Debug for Subscription {
    /// Prints only the type name; the boxed closure cannot be formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Subscription").finish()
    }
}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Global, TestApp};

    /// Marker global used by every test. The tests re-set it to trigger the
    /// observers registered through `observe_global`.
    struct TestGlobal;
    impl Global for TestGlobal {}

    /// Shared slot through which a callback can drop a subscription handle.
    type SubscriptionSlot = Rc<RefCell<Option<Subscription>>>;

    /// Builds a fresh app that already has `TestGlobal` set.
    fn app_with_global() -> TestApp {
        let mut app = TestApp::new();
        app.set_global(TestGlobal);
        app
    }

    /// Sets `TestGlobal` again inside an update.
    fn reset_global(app: &mut TestApp) {
        app.update(|cx| cx.set_global(TestGlobal));
    }

    /// Adds one to an invocation counter.
    fn bump(counter: &Cell<usize>) {
        counter.set(counter.get() + 1);
    }

    #[test]
    fn test_unsubscribe_during_callback_with_insert() {
        let mut app = app_with_global();

        let a_calls = Rc::new(Cell::new(0usize));
        let b_calls = Rc::new(Cell::new(0usize));
        let a_slot = SubscriptionSlot::default();
        let b_slot = SubscriptionSlot::default();

        // A is registered first, so it has the lower subscriber id and fires
        // first. It drops its own handle and then registers a fresh observer
        // for the same global.
        let a_subscription = app.update({
            let calls = a_calls.clone();
            let own_slot = a_slot.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |cx| {
                    bump(&calls);
                    drop(own_slot.borrow_mut().take());
                    cx.observe_global::<TestGlobal>(|_| {}).detach();
                })
            }
        });
        *a_slot.borrow_mut() = Some(a_subscription);

        // B fires second and only drops its own handle.
        let b_subscription = app.update({
            let calls = b_calls.clone();
            let own_slot = b_slot.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                    drop(own_slot.borrow_mut().take());
                })
            }
        });
        *b_slot.borrow_mut() = Some(b_subscription);

        // First notification: each observer runs exactly once.
        reset_global(&mut app);
        assert_eq!(a_calls.get(), 1);
        assert_eq!(b_calls.get(), 1);

        // Second notification: both handles are gone, so nothing may run.
        reset_global(&mut app);
        assert_eq!(a_calls.get(), 1);
        assert_eq!(b_calls.get(), 1, "orphaned subscriber fired again");
    }

    #[test]
    fn test_callback_dropped_by_earlier_callback_does_not_fire() {
        let mut app = app_with_global();

        let b_calls = Rc::new(Cell::new(0usize));
        let b_slot = SubscriptionSlot::default();

        // A is registered first and, when notified, drops B's handle.
        app.update({
            let b_slot = b_slot.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    drop(b_slot.borrow_mut().take());
                })
                .detach();
            }
        });

        // B is registered second, so A has already dropped it by B's turn.
        let b_subscription = app.update({
            let calls = b_calls.clone();
            move |cx| cx.observe_global::<TestGlobal>(move |_cx| bump(&calls))
        });
        *b_slot.borrow_mut() = Some(b_subscription);

        reset_global(&mut app);
        assert_eq!(
            b_calls.get(),
            0,
            "B should not fire — A dropped its subscription"
        );
    }

    #[test]
    fn test_self_drop_during_callback() {
        let mut app = app_with_global();

        let calls = Rc::new(Cell::new(0usize));
        let slot = SubscriptionSlot::default();

        // The observer drops its own handle the first time it runs.
        let subscription = app.update({
            let calls = calls.clone();
            let own_slot = slot.clone();
            move |cx| {
                cx.observe_global::<TestGlobal>(move |_cx| {
                    bump(&calls);
                    drop(own_slot.borrow_mut().take());
                })
            }
        });
        *slot.borrow_mut() = Some(subscription);

        reset_global(&mut app);
        assert_eq!(calls.get(), 1);

        reset_global(&mut app);
        assert_eq!(calls.get(), 1, "should not fire after self-drop");
    }

    #[test]
    fn test_subscription_drop() {
        let mut app = app_with_global();

        let calls = Rc::new(Cell::new(0usize));

        let subscription = app.update({
            let calls = calls.clone();
            move |cx| cx.observe_global::<TestGlobal>(move |_cx| bump(&calls))
        });

        // Dropping the handle before any notification cancels the observer.
        drop(subscription);

        reset_global(&mut app);
        assert_eq!(calls.get(), 0, "should not fire after drop");
    }
}