1use collections::{BTreeMap, HashMap, HashSet};
2use parking_lot::Mutex;
3use std::sync::Arc;
4use std::{hash::Hash, sync::Weak};
5
6pub struct CallbackCollection<K: Clone + Hash + Eq, F> {
7 internal: Arc<Mutex<Mapping<K, F>>>,
8}
9
10pub struct Subscription<K: Clone + Hash + Eq, F> {
11 key: K,
12 id: usize,
13 mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
14}
15
/// State shared between a callback collection and its subscriptions.
struct Mapping<K, F> {
    /// Stored callbacks, grouped by key and then indexed by subscription id.
    callbacks: HashMap<K, BTreeMap<usize, F>>,
    /// Ids of subscriptions that were dropped while their callback was not
    /// present in `callbacks`, grouped by key.
    dropped_subscriptions: HashMap<K, HashSet<usize>>,
}

impl<K: Hash + Eq, F> Mapping<K, F> {
    /// Consumes the "dropped" marker for `subscription_id` under `key`.
    ///
    /// Returns `true` if a marker was recorded (and has now been removed), and
    /// `false` otherwise. When the last marker for `key` is consumed, the
    /// key's now-empty set is removed as well, so `dropped_subscriptions` does
    /// not accumulate empty entries for keys that no longer have markers.
    fn clear_dropped_state(&mut self, key: &K, subscription_id: usize) -> bool {
        let (was_dropped, now_empty) = match self.dropped_subscriptions.get_mut(key) {
            Some(dropped_ids) => (dropped_ids.remove(&subscription_id), dropped_ids.is_empty()),
            None => return false,
        };

        if now_empty {
            self.dropped_subscriptions.remove(key);
        }

        was_dropped
    }
}

// Implemented by hand because `#[derive(Default)]` would add `K: Default` and
// `F: Default` bounds, which empty maps do not need.
impl<K, F> Default for Mapping<K, F> {
    /// Creates a mapping with no callbacks and no dropped-subscription markers.
    fn default() -> Self {
        Self {
            callbacks: Default::default(),
            dropped_subscriptions: Default::default(),
        }
    }
}
39
40impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
41 fn clone(&self) -> Self {
42 Self {
43 internal: self.internal.clone(),
44 }
45 }
46}
47
48impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
49 fn default() -> Self {
50 CallbackCollection {
51 internal: Arc::new(Mutex::new(Default::default())),
52 }
53 }
54}
55
56impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
57 #[cfg(test)]
58 pub fn is_empty(&self) -> bool {
59 self.internal.lock().callbacks.is_empty()
60 }
61
62 pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
63 Subscription {
64 key,
65 id: subscription_id,
66 mapping: Some(Arc::downgrade(&self.internal)),
67 }
68 }
69
70 pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
71 let mut this = self.internal.lock();
72
73 // If this callback's subscription was dropped before the callback was
74 // added, then just drop the callback.
75 if this.clear_dropped_state(&key, subscription_id) {
76 return;
77 }
78
79 this.callbacks
80 .entry(key)
81 .or_default()
82 .insert(subscription_id, callback);
83 }
84
85 pub fn remove(&mut self, key: K) {
86 // Drop these callbacks after releasing the lock, in case one of them
87 // owns a subscription to this callback collection.
88 let mut this = self.internal.lock();
89 let callbacks = this.callbacks.remove(&key);
90 this.dropped_subscriptions.remove(&key);
91 drop(this);
92 drop(callbacks);
93 }
94
95 pub fn emit<C>(&mut self, key: K, mut call_callback: C)
96 where
97 C: FnMut(&mut F) -> bool,
98 {
99 let callbacks = self.internal.lock().callbacks.remove(&key);
100 if let Some(callbacks) = callbacks {
101 for (subscription_id, mut callback) in callbacks {
102 // If this callback's subscription was dropped while invoking an
103 // earlier callback, then just drop the callback.
104 let mut this = self.internal.lock();
105 if this.clear_dropped_state(&key, subscription_id) {
106 continue;
107 }
108
109 drop(this);
110 let alive = call_callback(&mut callback);
111
112 // If this callback's subscription was dropped while invoking the callback
113 // itself, or if the callback returns false, then just drop the callback.
114 let mut this = self.internal.lock();
115 if this.clear_dropped_state(&key, subscription_id) || !alive {
116 continue;
117 }
118
119 this.callbacks
120 .entry(key)
121 .or_default()
122 .insert(subscription_id, callback);
123 }
124 }
125 }
126}
127
128impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
129 pub fn id(&self) -> usize {
130 self.id
131 }
132
133 pub fn detach(&mut self) {
134 self.mapping.take();
135 }
136}
137
138impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
139 fn drop(&mut self) {
140 if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
141 let mut mapping = mapping.lock();
142
143 // If the callback is present in the mapping, then just remove it.
144 if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
145 let callback = callbacks.remove(&self.id);
146 if callback.is_some() {
147 drop(mapping);
148 drop(callback);
149 return;
150 }
151 }
152
153 // If this subscription's callback is not present, then either it has been
154 // temporarily removed during emit, or it has not yet been added. Record
155 // that this subscription has been dropped so that the callback can be
156 // removed later.
157 mapping
158 .dropped_subscriptions
159 .entry(self.key.clone())
160 .or_default()
161 .insert(self.id);
162 }
163 }
164}