1use std::sync::Arc;
2use std::{hash::Hash, sync::Weak};
3
4use parking_lot::Mutex;
5
6use collections::{BTreeMap, HashMap, HashSet};
7
8use crate::MutableAppContext;
9
// Problem 5 (current bug): callbacks can currently be called many times after being dropped
11// update
12// notify
13// observe (push effect)
14// subscription {id : 5}
15// pending: [Effect::Notify, Effect::observe { id: 5 }]
16// drop observation subscription (write None into subscriptions)
17// flush effects
18// notify
19// observe
20// Problem 6: Key-value pair is leaked if you drop a callback while calling it, and then never call that set of callbacks again
21// -----------------
22// Problem 1: Many very similar subscription enum variants
23// Problem 2: Subscriptions and CallbackCollections use a shared mutex to update the callback status
24// Problem 3: Current implementation is error prone with regard to uninitialized callbacks or dropping during callback
25// Problem 4: Calling callbacks requires removing all of them from the list and adding them back
26
27// Solution 1 CallbackState:
28// Add more state to the CallbackCollection map to communicate dropped and initialized status
29// Solves: P5
30// Solution 2 DroppedSubscriptionList:
31// Store a parallel set of dropped subscriptions in the Mapping which stores the key and subscription id for all dropped subscriptions
32// which can be
33// Solution 3 GarbageCollection:
34// Use some type of traditional garbage collection to handle dropping of callbacks
35// atomic flag per callback which is looped over in remove dropped entities
36
37// TODO:
38// - Move subscription id counter to CallbackCollection
39// - Consider adding a reverse map in Mapping from subscription id to key so that the dropped subscriptions
40// can be a hashset of usize and the Subscription doesn't need the key
41// - Investigate why the remaining two types of callback lists can't use the same callback collection and subscriptions
42pub struct Subscription<K: Clone + Hash + Eq, F> {
43 key: K,
44 id: usize,
45 mapping: Option<Weak<Mutex<Mapping<K, F>>>>,
46}
47
/// Shared state behind a callback collection and its subscriptions.
struct Mapping<K, F> {
    /// Registered callbacks, grouped by key and ordered by subscription id.
    callbacks: HashMap<K, BTreeMap<usize, F>>,
    /// `(key, subscription id)` pairs whose subscription was dropped while the
    /// callback was not present in `callbacks`.
    dropped_subscriptions: HashSet<(K, usize)>,
}

// Written by hand rather than derived: `#[derive(Default)]` would add
// `K: Default` and `F: Default` bounds that the empty containers do not need.
impl<K, F> Default for Mapping<K, F> {
    /// Creates a mapping with no callbacks and no dropped subscriptions.
    fn default() -> Self {
        let callbacks = HashMap::default();
        let dropped_subscriptions = HashSet::default();
        Self {
            callbacks,
            dropped_subscriptions,
        }
    }
}
61
62pub(crate) struct CallbackCollection<K: Clone + Hash + Eq, F> {
63 internal: Arc<Mutex<Mapping<K, F>>>,
64}
65
66impl<K: Clone + Hash + Eq, F> Clone for CallbackCollection<K, F> {
67 fn clone(&self) -> Self {
68 Self {
69 internal: self.internal.clone(),
70 }
71 }
72}
73
74impl<K: Clone + Hash + Eq + Copy, F> Default for CallbackCollection<K, F> {
75 fn default() -> Self {
76 CallbackCollection {
77 internal: Arc::new(Mutex::new(Default::default())),
78 }
79 }
80}
81
82impl<K: Clone + Hash + Eq + Copy, F> CallbackCollection<K, F> {
83 #[cfg(test)]
84 pub fn is_empty(&self) -> bool {
85 self.internal.lock().callbacks.is_empty()
86 }
87
88 pub fn subscribe(&mut self, key: K, subscription_id: usize) -> Subscription<K, F> {
89 Subscription {
90 key,
91 id: subscription_id,
92 mapping: Some(Arc::downgrade(&self.internal)),
93 }
94 }
95
96 pub fn count(&mut self, key: K) -> usize {
97 self.internal
98 .lock()
99 .callbacks
100 .get(&key)
101 .map_or(0, |callbacks| callbacks.len())
102 }
103
104 pub fn add_callback(&mut self, key: K, subscription_id: usize, callback: F) {
105 let mut this = self.internal.lock();
106 if !this.dropped_subscriptions.contains(&(key, subscription_id)) {
107 this.callbacks
108 .entry(key)
109 .or_default()
110 .insert(subscription_id, callback);
111 }
112 }
113
114 pub fn remove(&mut self, key: K) {
115 self.internal.lock().callbacks.remove(&key);
116 }
117
118 pub fn emit<C: FnMut(&mut F, &mut MutableAppContext) -> bool>(
119 &mut self,
120 key: K,
121 cx: &mut MutableAppContext,
122 mut call_callback: C,
123 ) {
124 let callbacks = self.internal.lock().callbacks.remove(&key);
125 if let Some(callbacks) = callbacks {
126 for (subscription_id, mut callback) in callbacks {
127 if !self
128 .internal
129 .lock()
130 .dropped_subscriptions
131 .contains(&(key, subscription_id))
132 {
133 if call_callback(&mut callback, cx) {
134 self.add_callback(key, subscription_id, callback);
135 }
136 }
137 }
138 }
139 }
140
141 pub fn gc(&mut self) {
142 let mut this = self.internal.lock();
143
144 for (key, id) in std::mem::take(&mut this.dropped_subscriptions) {
145 if let Some(callbacks) = this.callbacks.get_mut(&key) {
146 callbacks.remove(&id);
147 }
148 }
149 }
150}
151
152impl<K: Clone + Hash + Eq, F> Subscription<K, F> {
153 pub fn detach(&mut self) {
154 self.mapping.take();
155 }
156}
157
158impl<K: Clone + Hash + Eq, F> Drop for Subscription<K, F> {
159 // If the callback has been initialized (no callback in the list for the key and id),
160 // add this subscription id and key to the dropped subscriptions list
161 // Otherwise, just remove the associated callback from the callback collection
162 fn drop(&mut self) {
163 if let Some(mapping) = self.mapping.as_ref().and_then(|mapping| mapping.upgrade()) {
164 let mut mapping = mapping.lock();
165 if let Some(callbacks) = mapping.callbacks.get_mut(&self.key) {
166 if callbacks.remove(&self.id).is_some() {
167 return;
168 }
169 }
170 mapping
171 .dropped_subscriptions
172 .insert((self.key.clone(), self.id));
173 }
174 }
175}