1use crate::MutableAppContext;
2use collections::{BTreeMap, HashMap, HashSet};
3use parking_lot::Mutex;
4use std::sync::Arc;
5use std::{hash::Hash, sync::Weak};
6
7pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
8 internal: Arc<Mutex<Mapping<K, F>>>,
9}
10
11pub struct Subscription<K: Clone + Hash + Eq, F> {
12 key: K,
13 id: usize,
14 mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
15}
16
/// Lock-protected state behind a `CallbackCollection`.
struct Mapping<K, F> {
    /// Registered callbacks, grouped by key and ordered by subscription id.
    callbacks: HashMap<K, BTreeMap<usize, F>>,
    /// Ids of subscriptions that were dropped while their callback was not
    /// present in `callbacks` (not yet added, or temporarily taken out).
    dropped_subscriptions: HashMap<K, HashSet<usize>>,
}

impl<K: Hash + Eq, F> Mapping<K, F> {
    /// Forgets that `subscription_id` under `key` was recorded as dropped.
    ///
    /// Returns `true` if such a record existed, meaning the caller should
    /// discard the corresponding callback instead of keeping it.
    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
        if let Some(subscriptions) = self.dropped_subscriptions.get_mut(key) {
            let was_dropped = subscriptions.remove(&subscription_id);
            // Prune the per-key set once it is empty so that stale keys do not
            // pile up in `dropped_subscriptions` over time.
            if subscriptions.is_empty() {
                self.dropped_subscriptions.remove(key);
            }
            was_dropped
        } else {
            false
        }
    }
}

impl<K, F> Default for Mapping<K, F> {
    /// Creates a mapping with no callbacks and no dropped-subscription records.
    fn default() -> Self {
        Self {
            callbacks: Default::default(),
            dropped_subscriptions: Default::default(),
        }
    }
}
40
41impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
42 fn clone(&self) -> Self {
43 Self {
44 internal: self.internal.clone(),
45 }
46 }
47}
48
49impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
50 fn default() -> Self {
51 CallbackCollection {
52 internal: Arc::new(Mutex::new(Default::default())),
53 }
54 }
55}
56
57impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
58 #[cfg(test)]
59 pub fn is_empty(&self) -> bool {
60 self.internal.lock().callbacks.is_empty()
61 }
62
63 pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
64 Subscription {
65 key,
66 id: subscription_id,
67 mapping: Some(Arc::downgrade(&self.internal)),
68 }
69 }
70
71 pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
72 let mut this = self.internal.lock();
73
74 // If this callback's subscription was dropped before the callback was
75 // added, then just drop the callback.
76 if this.clear_dropped_state(&key, subscription_id) {
77 return;
78 }
79
80 this.callbacks
81 .entry(key)
82 .or_default()
83 .insert(subscription_id, callback);
84 }
85
86 pub fn remove(&mut self, key: K) {
87 // Drop these callbacks after releasing the lock, in case one of them
88 // owns a subscription to this callback collection.
89 let mut this = self.internal.lock();
90 let callbacks = this.callbacks.remove(&key);
91 this.dropped_subscriptions.remove(&key);
92 drop(this);
93 drop(callbacks);
94 }
95
96 pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
97 &mut self,
98 key: K,
99 cx: &mut MutableAppContext,
100 mut call_callback: C,
101 ) {
102 let callbacks = self.internal.lock().callbacks.remove(&key);
103 if let Some(callbacks) = callbacks {
104 for (subscription_id, mut callback) in callbacks {
105 // If this callback's subscription was dropped while invoking an
106 // earlier callback, then just drop the callback.
107 let mut this = self.internal.lock();
108 if this.clear_dropped_state(&key, subscription_id) {
109 continue;
110 }
111
112 drop(this);
113 let alive = call_callback(&mut callback, cx);
114
115 // If this callback's subscription was dropped while invoking the callback
116 // itself, or if the callback returns false, then just drop the callback.
117 let mut this = self.internal.lock();
118 if this.clear_dropped_state(&key, subscription_id) || !alive {
119 continue;
120 }
121
122 this.callbacks
123 .entry(key)
124 .or_default()
125 .insert(subscription_id, callback);
126 }
127 }
128 }
129}
130
131impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
132 pub fn id(&self) -> usize {
133 self.id
134 }
135
136 pub fn detach(&mut self) {
137 self.mapping.take();
138 }
139}
140
141impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
142 fn drop(&mut self) {
143 if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
144 let mut mapping = mapping.lock();
145
146 // If the callback is present in the mapping, then just remove it.
147 if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
148 let callback = callbacks.remove(&self.id);
149 if callback.is_some() {
150 drop(mapping);
151 drop(callback);
152 return;
153 }
154 }
155
156 // If this subscription's callback is not present, then either it has been
157 // temporarily removed during emit, or it has not yet been added. Record
158 // that this subscription has been dropped so that the callback can be
159 // removed later.
160 mapping
161 .dropped_subscriptions
162 .entry(self.key.clone())
163 .or_default()
164 .insert(self.id);
165 }
166 }
167}