1use collections::{BTreeMap, BTreeSet};
2use std::{
3 cell::{Cell, RefCell},
4 fmt::Debug,
5 mem,
6 rc::Rc,
7};
8use util::post_inc;
9
10pub(crate) struct SubscriberSet<EmitterKey, Callback>(
11 Rc<RefCell<SubscriberSetState<EmitterKey, Callback>>>,
12);
13
14impl<EmitterKey, Callback> Clone for SubscriberSet<EmitterKey, Callback> {
15 fn clone(&self) -> Self {
16 SubscriberSet(self.0.clone())
17 }
18}
19
20struct SubscriberSetState<EmitterKey, Callback> {
21 subscribers: BTreeMap<EmitterKey, Option<BTreeMap<usize, Subscriber<Callback>>>>,
22 dropped_subscribers: BTreeSet<(EmitterKey, usize)>,
23 next_subscriber_id: usize,
24}
25
/// A registered callback together with its activation flag.
struct Subscriber<Callback> {
    /// Shared with the activation closure returned by `SubscriberSet::insert`:
    /// it starts out `false` and becomes `true` when that closure is called.
    /// `SubscriberSet::remove` and `SubscriberSet::retain` skip the callback
    /// while the flag is `false`.
    active: Rc<Cell<bool>>,
    /// The callback supplied to `SubscriberSet::insert`.
    callback: Callback,
}
30
31impl<EmitterKey, Callback> SubscriberSet<EmitterKey, Callback>
32where
33 EmitterKey: 'static + Ord + Clone + Debug,
34 Callback: 'static,
35{
36 pub fn new() -> Self {
37 Self(Rc::new(RefCell::new(SubscriberSetState {
38 subscribers: Default::default(),
39 dropped_subscribers: Default::default(),
40 next_subscriber_id: 0,
41 })))
42 }
43
44 /// Inserts a new [`Subscription`] for the given `emitter_key`. By default, subscriptions
45 /// are inert, meaning that they won't be listed when calling `[SubscriberSet::remove]` or `[SubscriberSet::retain]`.
46 /// This method returns a tuple of a [`Subscription`] and an `impl FnOnce`, and you can use the latter
47 /// to activate the [`Subscription`].
48 pub fn insert(
49 &self,
50 emitter_key: EmitterKey,
51 callback: Callback,
52 ) -> (Subscription, impl FnOnce() + use<EmitterKey, Callback>) {
53 let active = Rc::new(Cell::new(false));
54 let mut lock = self.0.borrow_mut();
55 let subscriber_id = post_inc(&mut lock.next_subscriber_id);
56 lock.subscribers
57 .entry(emitter_key.clone())
58 .or_default()
59 .get_or_insert_with(Default::default)
60 .insert(
61 subscriber_id,
62 Subscriber {
63 active: active.clone(),
64 callback,
65 },
66 );
67 let this = self.0.clone();
68
69 let subscription = Subscription {
70 unsubscribe: Some(Box::new(move || {
71 let mut lock = this.borrow_mut();
72 let Some(subscribers) = lock.subscribers.get_mut(&emitter_key) else {
73 // remove was called with this emitter_key
74 return;
75 };
76
77 if let Some(subscribers) = subscribers {
78 subscribers.remove(&subscriber_id);
79 if subscribers.is_empty() {
80 lock.subscribers.remove(&emitter_key);
81 }
82 return;
83 }
84
85 // We didn't manage to remove the subscription, which means it was dropped
86 // while invoking the callback. Mark it as dropped so that we can remove it
87 // later.
88 lock.dropped_subscribers
89 .insert((emitter_key, subscriber_id));
90 })),
91 };
92 (subscription, move || active.set(true))
93 }
94
95 pub fn remove(
96 &self,
97 emitter: &EmitterKey,
98 ) -> impl IntoIterator<Item = Callback> + use<EmitterKey, Callback> {
99 let subscribers = self.0.borrow_mut().subscribers.remove(emitter);
100 subscribers
101 .unwrap_or_default()
102 .map(|s| s.into_values())
103 .into_iter()
104 .flatten()
105 .filter_map(|subscriber| {
106 if subscriber.active.get() {
107 Some(subscriber.callback)
108 } else {
109 None
110 }
111 })
112 }
113
114 /// Call the given callback for each subscriber to the given emitter.
115 /// If the callback returns false, the subscriber is removed.
116 pub fn retain<F>(&self, emitter: &EmitterKey, mut f: F)
117 where
118 F: FnMut(&mut Callback) -> bool,
119 {
120 let Some(mut subscribers) = self
121 .0
122 .borrow_mut()
123 .subscribers
124 .get_mut(emitter)
125 .and_then(|s| s.take())
126 else {
127 return;
128 };
129
130 subscribers.retain(|_, subscriber| {
131 if subscriber.active.get() {
132 f(&mut subscriber.callback)
133 } else {
134 true
135 }
136 });
137 let mut lock = self.0.borrow_mut();
138
139 // Add any new subscribers that were added while invoking the callback.
140 if let Some(Some(new_subscribers)) = lock.subscribers.remove(emitter) {
141 subscribers.extend(new_subscribers);
142 }
143
144 // Remove any dropped subscriptions that were dropped while invoking the callback.
145 for (dropped_emitter, dropped_subscription_id) in mem::take(&mut lock.dropped_subscribers) {
146 debug_assert_eq!(*emitter, dropped_emitter);
147 subscribers.remove(&dropped_subscription_id);
148 }
149
150 if !subscribers.is_empty() {
151 lock.subscribers.insert(emitter.clone(), Some(subscribers));
152 }
153 }
154}
155
/// A handle to a subscription created by GPUI.
///
/// Dropping the handle cancels the subscription, after which the callback
/// will no longer be invoked.
#[must_use]
pub struct Subscription {
    /// Callback that cancels the subscription. It is taken out (leaving
    /// `None`) when the handle is dropped, detached, or joined.
    unsubscribe: Option<Box<dyn FnOnce() + 'static>>,
}
162
163impl Subscription {
164 /// Creates a new subscription with a callback that gets invoked when
165 /// this subscription is dropped.
166 pub fn new(unsubscribe: impl 'static + FnOnce()) -> Self {
167 Self {
168 unsubscribe: Some(Box::new(unsubscribe)),
169 }
170 }
171
172 /// Detaches the subscription from this handle. The callback will
173 /// continue to be invoked until the entities it has been
174 /// subscribed to are dropped
175 pub fn detach(mut self) {
176 self.unsubscribe.take();
177 }
178
179 /// Joins two subscriptions into a single subscription. Detach will
180 /// detach both interior subscriptions.
181 pub fn join(mut subscription_a: Self, mut subscription_b: Self) -> Self {
182 let a_unsubscribe = subscription_a.unsubscribe.take();
183 let b_unsubscribe = subscription_b.unsubscribe.take();
184 Self {
185 unsubscribe: Some(Box::new(move || {
186 if let Some(self_unsubscribe) = a_unsubscribe {
187 self_unsubscribe();
188 }
189 if let Some(other_unsubscribe) = b_unsubscribe {
190 other_unsubscribe();
191 }
192 })),
193 }
194 }
195}
196
197impl Drop for Subscription {
198 fn drop(&mut self) {
199 if let Some(unsubscribe) = self.unsubscribe.take() {
200 unsubscribe();
201 }
202 }
203}
204
205impl std::fmt::Debug for Subscription {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 f.debug_struct("Subscription").finish()
208 }
209}