@@ -636,9 +636,11 @@ impl ReadViewWith for TestAppContext {
type ActionCallback =
dyn FnMut(&mut dyn AnyView, &dyn AnyAction, &mut MutableAppContext, usize, usize) -> bool;
-
type GlobalActionCallback = dyn FnMut(&dyn AnyAction, &mut MutableAppContext);
+type SubscriptionCallback = Box<dyn FnMut(&dyn Any, &mut MutableAppContext) -> bool>;
+type ObservationCallback = Box<dyn FnMut(&mut MutableAppContext) -> bool>;
+
pub struct MutableAppContext {
weak_self: Option<rc::Weak<RefCell<Self>>>,
foreground_platform: Rc<dyn platform::ForegroundPlatform>,
@@ -649,8 +651,9 @@ pub struct MutableAppContext {
keystroke_matcher: keymap::Matcher,
next_entity_id: usize,
next_window_id: usize,
- subscriptions: HashMap<usize, Vec<Box<dyn FnMut(&dyn Any, &mut MutableAppContext) -> bool>>>,
- observations: HashMap<usize, Vec<Box<dyn FnMut(&mut MutableAppContext) -> bool>>>,
+ next_subscription_id: usize,
+ subscriptions: Arc<Mutex<HashMap<usize, HashMap<usize, SubscriptionCallback>>>>,
+ observations: Arc<Mutex<HashMap<usize, HashMap<usize, ObservationCallback>>>>,
presenters_and_platform_windows:
HashMap<usize, (Rc<RefCell<Presenter>>, Box<dyn platform::Window>)>,
debug_elements_callbacks: HashMap<usize, Box<dyn Fn(&AppContext) -> crate::json::Value>>,
@@ -688,8 +691,9 @@ impl MutableAppContext {
keystroke_matcher: keymap::Matcher::default(),
next_entity_id: 0,
next_window_id: 0,
- subscriptions: HashMap::new(),
- observations: HashMap::new(),
+ next_subscription_id: 0,
+ subscriptions: Default::default(),
+ observations: Default::default(),
presenters_and_platform_windows: HashMap::new(),
debug_elements_callbacks: HashMap::new(),
foreground,
@@ -877,7 +881,7 @@ impl MutableAppContext {
);
}
- pub fn subscribe<E, H, F>(&mut self, handle: &H, mut callback: F)
+ pub fn subscribe<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
E::Event: 'static,
@@ -890,7 +894,7 @@ impl MutableAppContext {
})
}
- fn observe<E, H, F>(&mut self, handle: &H, mut callback: F)
+ fn observe<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
E::Event: 'static,
@@ -903,45 +907,65 @@ impl MutableAppContext {
})
}
- pub fn subscribe_internal<E, H, F>(&mut self, handle: &H, mut callback: F)
+ pub fn subscribe_internal<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
E::Event: 'static,
H: Handle<E>,
F: 'static + FnMut(H, &E::Event, &mut Self) -> bool,
{
+ let id = post_inc(&mut self.next_subscription_id);
let emitter = handle.downgrade();
self.subscriptions
+ .lock()
.entry(handle.id())
.or_default()
- .push(Box::new(move |payload, cx| {
- if let Some(emitter) = H::upgrade_from(&emitter, cx.as_ref()) {
- let payload = payload.downcast_ref().expect("downcast is type safe");
- callback(emitter, payload, cx)
- } else {
- false
- }
- }))
+ .insert(
+ id,
+ Box::new(move |payload, cx| {
+ if let Some(emitter) = H::upgrade_from(&emitter, cx.as_ref()) {
+ let payload = payload.downcast_ref().expect("downcast is type safe");
+ callback(emitter, payload, cx)
+ } else {
+ false
+ }
+ }),
+ );
+ Subscription::Subscription {
+ id,
+ entity_id: handle.id(),
+ subscriptions: Some(Arc::downgrade(&self.subscriptions)),
+ }
}
- fn observe_internal<E, H, F>(&mut self, handle: &H, mut callback: F)
+ fn observe_internal<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
E::Event: 'static,
H: Handle<E>,
F: 'static + FnMut(H, &mut Self) -> bool,
{
+ let id = post_inc(&mut self.next_subscription_id);
let observed = handle.downgrade();
self.observations
+ .lock()
.entry(handle.id())
.or_default()
- .push(Box::new(move |cx| {
- if let Some(observed) = H::upgrade_from(&observed, cx) {
- callback(observed, cx)
- } else {
- false
- }
- }))
+ .insert(
+ id,
+ Box::new(move |cx| {
+ if let Some(observed) = H::upgrade_from(&observed, cx) {
+ callback(observed, cx)
+ } else {
+ false
+ }
+ }),
+ );
+ Subscription::Observation {
+ id,
+ entity_id: handle.id(),
+ observations: Some(Arc::downgrade(&self.observations)),
+ }
}
pub(crate) fn notify_view(&mut self, window_id: usize, view_id: usize) {
@@ -1248,15 +1272,15 @@ impl MutableAppContext {
}
for model_id in dropped_models {
- self.subscriptions.remove(&model_id);
- self.observations.remove(&model_id);
+ self.subscriptions.lock().remove(&model_id);
+ self.observations.lock().remove(&model_id);
let mut model = self.cx.models.remove(&model_id).unwrap();
model.release(self);
}
for (window_id, view_id) in dropped_views {
- self.subscriptions.remove(&view_id);
- self.observations.remove(&view_id);
+ self.subscriptions.lock().remove(&view_id);
+ self.observations.lock().remove(&view_id);
let mut view = self.cx.views.remove(&(window_id, view_id)).unwrap();
view.release(self);
let change_focus_to = self.cx.windows.get_mut(&window_id).and_then(|window| {
@@ -1343,29 +1367,33 @@ impl MutableAppContext {
}
fn emit_event(&mut self, entity_id: usize, payload: Box<dyn Any>) {
- if let Some(callbacks) = self.subscriptions.remove(&entity_id) {
- for mut callback in callbacks {
+ let callbacks = self.subscriptions.lock().remove(&entity_id);
+ if let Some(callbacks) = callbacks {
+ for (id, mut callback) in callbacks {
let alive = callback(payload.as_ref(), self);
if alive {
self.subscriptions
+ .lock()
.entry(entity_id)
.or_default()
- .push(callback);
+ .insert(id, callback);
}
}
}
}
fn notify_model_observers(&mut self, observed_id: usize) {
- if let Some(callbacks) = self.observations.remove(&observed_id) {
+ let callbacks = self.observations.lock().remove(&observed_id);
+ if let Some(callbacks) = callbacks {
if self.cx.models.contains_key(&observed_id) {
- for mut callback in callbacks {
+ for (id, mut callback) in callbacks {
let alive = callback(self);
if alive {
self.observations
+ .lock()
.entry(observed_id)
.or_default()
- .push(callback);
+ .insert(id, callback);
}
}
}
@@ -1381,19 +1409,21 @@ impl MutableAppContext {
.insert(observed_view_id);
}
- if let Some(callbacks) = self.observations.remove(&observed_view_id) {
+ let callbacks = self.observations.lock().remove(&observed_view_id);
+ if let Some(callbacks) = callbacks {
if self
.cx
.views
.contains_key(&(observed_window_id, observed_view_id))
{
- for mut callback in callbacks {
+ for (id, mut callback) in callbacks {
let alive = callback(self);
if alive {
self.observations
+ .lock()
.entry(observed_view_id)
.or_default()
- .push(callback);
+ .insert(id, callback);
}
}
}
@@ -1873,7 +1903,11 @@ impl<'a, T: Entity> ModelContext<'a, T> {
});
}
- pub fn subscribe<S: Entity, F>(&mut self, handle: &ModelHandle<S>, mut callback: F)
+ pub fn subscribe<S: Entity, F>(
+ &mut self,
+ handle: &ModelHandle<S>,
+ mut callback: F,
+ ) -> Subscription
where
S::Event: 'static,
F: 'static + FnMut(&mut T, ModelHandle<S>, &S::Event, &mut ModelContext<T>),
@@ -1889,10 +1923,10 @@ impl<'a, T: Entity> ModelContext<'a, T> {
} else {
false
}
- });
+ })
}
- pub fn observe<S, F>(&mut self, handle: &ModelHandle<S>, mut callback: F)
+ pub fn observe<S, F>(&mut self, handle: &ModelHandle<S>, mut callback: F) -> Subscription
where
S: Entity,
F: 'static + FnMut(&mut T, ModelHandle<S>, &mut ModelContext<T>),
@@ -1907,7 +1941,7 @@ impl<'a, T: Entity> ModelContext<'a, T> {
} else {
false
}
- });
+ })
}
pub fn handle(&self) -> ModelHandle<T> {
@@ -2097,7 +2131,7 @@ impl<'a, T: View> ViewContext<'a, T> {
self.app.add_option_view(self.window_id, build_view)
}
- pub fn subscribe<E, H, F>(&mut self, handle: &H, mut callback: F)
+ pub fn subscribe<E, H, F>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
E::Event: 'static,
@@ -2115,10 +2149,10 @@ impl<'a, T: View> ViewContext<'a, T> {
} else {
false
}
- });
+ })
}
- pub fn observe<E, F, H>(&mut self, handle: &H, mut callback: F)
+ pub fn observe<E, F, H>(&mut self, handle: &H, mut callback: F) -> Subscription
where
E: Entity,
H: Handle<E>,
@@ -2134,7 +2168,7 @@ impl<'a, T: View> ViewContext<'a, T> {
} else {
false
}
- });
+ })
}
pub fn emit(&mut self, payload: T::Event) {
@@ -2330,18 +2364,20 @@ impl<T: Entity> ModelHandle<T> {
let (tx, mut rx) = mpsc::channel(1024);
let mut cx = cx.cx.borrow_mut();
- cx.observe(self, {
- let mut tx = tx.clone();
- move |_, _| {
- tx.blocking_send(()).ok();
- }
- });
- cx.subscribe(self, {
- let mut tx = tx.clone();
- move |_, _, _| {
- tx.blocking_send(()).ok();
- }
- });
+ let subscriptions = (
+ cx.observe(self, {
+ let mut tx = tx.clone();
+ move |_, _| {
+ tx.blocking_send(()).ok();
+ }
+ }),
+ cx.subscribe(self, {
+ let mut tx = tx.clone();
+ move |_, _, _| {
+ tx.blocking_send(()).ok();
+ }
+ }),
+ );
let cx = cx.weak_self.as_ref().unwrap().upgrade().unwrap();
let handle = self.downgrade();
@@ -2375,6 +2411,7 @@ impl<T: Entity> ModelHandle<T> {
})
.await
.expect("condition timed out");
+ drop(subscriptions);
}
}
}
@@ -2559,20 +2596,21 @@ impl<T: View> ViewHandle<T> {
let (tx, mut rx) = mpsc::channel(1024);
let mut cx = cx.cx.borrow_mut();
- self.update(&mut *cx, |_, cx| {
- cx.observe(self, {
- let mut tx = tx.clone();
- move |_, _, _| {
- tx.blocking_send(()).ok();
- }
- });
-
- cx.subscribe(self, {
- let mut tx = tx.clone();
- move |_, _, _, _| {
- tx.blocking_send(()).ok();
- }
- })
+ let subscriptions = self.update(&mut *cx, |_, cx| {
+ (
+ cx.observe(self, {
+ let mut tx = tx.clone();
+ move |_, _, _| {
+ tx.blocking_send(()).ok();
+ }
+ }),
+ cx.subscribe(self, {
+ let mut tx = tx.clone();
+ move |_, _, _, _| {
+ tx.blocking_send(()).ok();
+ }
+ }),
+ )
});
let cx = cx.weak_self.as_ref().unwrap().upgrade().unwrap();
@@ -2607,6 +2645,7 @@ impl<T: View> ViewHandle<T> {
})
.await
.expect("condition timed out");
+ drop(subscriptions);
}
}
}
@@ -2876,6 +2915,62 @@ impl<T> Drop for ValueHandle<T> {
}
}
+#[must_use]
+pub enum Subscription {
+ Subscription {
+ id: usize,
+ entity_id: usize,
+ subscriptions: Option<Weak<Mutex<HashMap<usize, HashMap<usize, SubscriptionCallback>>>>>,
+ },
+ Observation {
+ id: usize,
+ entity_id: usize,
+ observations: Option<Weak<Mutex<HashMap<usize, HashMap<usize, ObservationCallback>>>>>,
+ },
+}
+
+impl Subscription {
+ pub fn detach(&mut self) {
+ match self {
+ Subscription::Subscription { subscriptions, .. } => {
+ subscriptions.take();
+ }
+ Subscription::Observation { observations, .. } => {
+ observations.take();
+ }
+ }
+ }
+}
+
+impl Drop for Subscription {
+ fn drop(&mut self) {
+ match self {
+ Subscription::Observation {
+ id,
+ entity_id,
+ observations,
+ } => {
+ if let Some(observations) = observations.as_ref().and_then(Weak::upgrade) {
+ if let Some(observations) = observations.lock().get_mut(entity_id) {
+ observations.remove(id);
+ }
+ }
+ }
+ Subscription::Subscription {
+ id,
+ entity_id,
+ subscriptions,
+ } => {
+ if let Some(subscriptions) = subscriptions.as_ref().and_then(Weak::upgrade) {
+ if let Some(subscriptions) = subscriptions.lock().get_mut(entity_id) {
+ subscriptions.remove(id);
+ }
+ }
+ }
+ }
+ }
+}
+
#[derive(Default)]
struct RefCounts {
entity_counts: HashMap<usize, usize>,
@@ -2982,10 +3077,12 @@ mod tests {
if let Some(other) = other.as_ref() {
cx.observe(other, |me, _, _| {
me.events.push("notified".into());
- });
+ })
+ .detach();
cx.subscribe(other, |me, _, event, _| {
me.events.push(format!("observed event {}", event));
- });
+ })
+ .detach();
}
Self {
@@ -3021,8 +3118,8 @@ mod tests {
});
assert_eq!(cx.cx.models.len(), 1);
- assert!(cx.subscriptions.is_empty());
- assert!(cx.observations.is_empty());
+ assert!(cx.subscriptions.lock().is_empty());
+ assert!(cx.observations.lock().is_empty());
}
#[crate::test(self)]
@@ -3046,8 +3143,10 @@ mod tests {
c.subscribe(&handle_2b, |model, _, event, _| {
model.events.push(*event * 2);
- });
- });
+ })
+ .detach();
+ })
+ .detach();
});
handle_2.update(cx, |_, c| c.emit(7));
@@ -3078,8 +3177,10 @@ mod tests {
model.events.push(observed.read(c).count);
c.observe(&handle_2b, |model, observed, c| {
model.events.push(observed.read(c).count * 2);
- });
- });
+ })
+ .detach();
+ })
+ .detach();
});
handle_2.update(cx, |model, c| {
@@ -3121,7 +3222,8 @@ mod tests {
if let Some(other) = other.as_ref() {
cx.subscribe(other, |me, _, event, _| {
me.events.push(format!("observed event {}", event));
- });
+ })
+ .detach();
}
Self {
other,
@@ -3155,8 +3257,8 @@ mod tests {
});
assert_eq!(cx.cx.views.len(), 2);
- assert!(cx.subscriptions.is_empty());
- assert!(cx.observations.is_empty());
+ assert!(cx.subscriptions.lock().is_empty());
+ assert!(cx.observations.lock().is_empty());
}
#[crate::test(self)]
@@ -3298,12 +3400,15 @@ mod tests {
c.subscribe(&handle_2b, |me, _, event, _| {
me.events.push(*event * 2);
- });
- });
+ })
+ .detach();
+ })
+ .detach();
c.subscribe(&handle_3, |me, _, event, _| {
me.events.push(*event);
})
+ .detach();
});
handle_2.update(cx, |_, c| c.emit(7));
@@ -3347,11 +3452,11 @@ mod tests {
let observed_model = cx.add_model(|_| Model);
observing_view.update(cx, |_, cx| {
- cx.subscribe(&emitting_view, |_, _, _, _| {});
- cx.subscribe(&observed_model, |_, _, _, _| {});
+ cx.subscribe(&emitting_view, |_, _, _, _| {}).detach();
+ cx.subscribe(&observed_model, |_, _, _, _| {}).detach();
});
observing_model.update(cx, |_, cx| {
- cx.subscribe(&observed_model, |_, _, _, _| {});
+ cx.subscribe(&observed_model, |_, _, _, _| {}).detach();
});
cx.update(|| {
@@ -3399,7 +3504,8 @@ mod tests {
view.update(cx, |_, c| {
c.observe(&model, |me, observed, c| {
me.events.push(observed.read(c).count)
- });
+ })
+ .detach();
});
model.update(cx, |model, c| {
@@ -3439,10 +3545,10 @@ mod tests {
let observed_model = cx.add_model(|_| Model);
observing_view.update(cx, |_, cx| {
- cx.observe(&observed_model, |_, _, _| {});
+ cx.observe(&observed_model, |_, _, _| {}).detach();
});
observing_model.update(cx, |_, cx| {
- cx.observe(&observed_model, |_, _, _| {});
+ cx.observe(&observed_model, |_, _, _| {}).detach();
});
cx.update(|| {