1use crate::MutableAppContext;
2use collections::{BTreeMap, HashMap, HashSet};
3use parking_lot::Mutex;
4use std::sync::Arc;
5use std::{hash::Hash, sync::Weak};
6
7pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
8 internal: Arc<Mutex<Mapping<K, F>>>,
9}
10
11pub struct Subscription<K: Clone + Hash + Eq, F> {
12 key: K,
13 id: usize,
14 mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
15}
16
17struct Mapping<K, F> {
18 callbacks: HashMap<K, BTreeMap<usize, F>>,
19 dropped_subscriptions: HashSet<(K, usize)>,
20}
21
22impl<K, F> Default for Mapping<K, F> {
23 fn default() -> Self {
24 Self {
25 callbacks: Default::default(),
26 dropped_subscriptions: Default::default(),
27 }
28 }
29}
30
31impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
32 fn clone(&self) -> Self {
33 Self {
34 internal: self.internal.clone(),
35 }
36 }
37}
38
39impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
40 fn default() -> Self {
41 CallbackCollection {
42 internal: Arc::new(Mutex::new(Default::default())),
43 }
44 }
45}
46
47impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
48 #[cfg(test)]
49 pub fn is_empty(&self) -> bool {
50 self.internal.lock().callbacks.is_empty()
51 }
52
53 pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
54 Subscription {
55 key,
56 id: subscription_id,
57 mapping: Some(Arc::downgrade(&self.internal)),
58 }
59 }
60
61 pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
62 let mut this = self.internal.lock();
63 if !this.dropped_subscriptions.contains(&(key, subscription_id)) {
64 this.callbacks
65 .entry(key)
66 .or_default()
67 .insert(subscription_id, callback);
68 }
69 }
70
71 pub fn remove(&mut self, key: K) {
72 let callbacks = self.internal.lock().callbacks.remove(&key);
73 // drop these after releasing the lock
74 drop(callbacks);
75 }
76
77 pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
78 &mut self,
79 key: K,
80 cx: &mut MutableAppContext,
81 mut call_callback: C,
82 ) {
83 let callbacks = self.internal.lock().callbacks.remove(&key);
84 if let Some(callbacks) = callbacks {
85 for (subscription_id, mut callback) in callbacks {
86 if !self
87 .internal
88 .lock()
89 .dropped_subscriptions
90 .contains(&(key, subscription_id))
91 {
92 if call_callback(&mut callback, cx) {
93 self.add_callback(key, subscription_id, callback);
94 }
95 }
96 }
97 }
98 }
99
100 pub fn gc(&mut self) {
101 let mut this = self.internal.lock();
102
103 for (key, id) in std::mem::take(&mut this.dropped_subscriptions) {
104 if let Some(callbacks) = this.callbacks.get_mut(&key) {
105 callbacks.remove(&id);
106 }
107 }
108 }
109}
110
111impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
112 pub fn id(&self) -> usize {
113 self.id
114 }
115
116 pub fn detach(&mut self) {
117 self.mapping.take();
118 }
119}
120
121impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
122 // If the callback has been initialized (no callback in the list for the key and id),
123 // add this subscription id and key to the dropped subscriptions list
124 // Otherwise, just remove the associated callback from the callback collection
125 fn drop(&mut self) {
126 if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
127 let mut mapping = mapping.lock();
128 if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
129 if callbacks.remove(&self.id).is_some() {
130 return;
131 }
132 }
133 mapping
134 .dropped_subscriptions
135 .insert((self.key.clone(), self.id));
136 }
137 }
138}