@@ -1,6 +1,6 @@
use crate::{
elements::Element,
- executor::{self},
+ executor::{self, ForegroundTask},
keymap::{self, Keystroke},
platform::{self, App as _},
util::post_inc,
@@ -292,7 +292,8 @@ pub struct MutableAppContext {
HashMap<usize, Box<dyn FnMut(WindowInvalidation, &mut MutableAppContext)>>,
foreground: Rc<executor::Foreground>,
background: Arc<executor::Background>,
- task_callbacks: HashMap<usize, TaskCallback>,
+ future_handlers: HashMap<usize, FutureHandler>,
+ stream_handlers: HashMap<usize, StreamHandler>,
task_done: (channel::Sender<usize>, channel::Receiver<usize>),
pending_effects: VecDeque<Effect>,
pending_flushes: usize,
@@ -321,7 +322,8 @@ impl MutableAppContext {
invalidation_callbacks: HashMap::new(),
foreground,
background: Arc::new(executor::Background::new()),
- task_callbacks: HashMap::new(),
+ future_handlers: HashMap::new(),
+ stream_handlers: HashMap::new(),
task_done: channel::unbounded(),
pending_effects: VecDeque::new(),
pending_flushes: 0,
@@ -869,97 +871,71 @@ impl MutableAppContext {
self.flush_effects();
}
- fn spawn_local<F>(&mut self, future: F) -> usize
+ fn spawn<F, T>(&mut self, future: F) -> (usize, ForegroundTask<Option<T>>)
where
F: 'static + Future,
+ T: 'static,
{
let task_id = post_inc(&mut self.next_task_id);
- let app = self.weak_self.as_ref().unwrap().clone();
- self.foreground
- .spawn(async move {
- let output = future.await;
- if let Some(app) = app.upgrade() {
- app.borrow_mut()
- .relay_task_output(task_id, Box::new(output));
- }
- })
- .detach();
- task_id
+ let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
+ let task = self.foreground.spawn(async move {
+ let output = future.await;
+ app.borrow_mut()
+ .handle_future_output(task_id, Box::new(output))
+ .map(|result| *result.downcast::<T>().unwrap())
+ });
+ (task_id, task)
}
- fn spawn_stream_local<F>(&mut self, mut stream: F, done_tx: channel::Sender<()>) -> usize
+ fn spawn_stream<F, T>(&mut self, mut stream: F) -> (usize, ForegroundTask<Option<T>>)
where
F: 'static + Stream + Unpin,
+ T: 'static,
{
let task_id = post_inc(&mut self.next_task_id);
- let app = self.weak_self.as_ref().unwrap().clone();
- self.foreground
- .spawn(async move {
- loop {
- match stream.next().await {
- item @ Some(_) => {
- if let Some(app) = app.upgrade() {
- let mut app = app.borrow_mut();
- if app.relay_task_output(task_id, Box::new(item)) {
- app.stream_completed(task_id);
- break;
- }
- } else {
- break;
- }
- }
- item @ None => {
- if let Some(app) = app.upgrade() {
- let mut app = app.borrow_mut();
- app.relay_task_output(task_id, Box::new(item));
- app.stream_completed(task_id);
- }
- let _ = done_tx.send(()).await;
+ let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
+ let task = self.foreground.spawn(async move {
+ loop {
+ match stream.next().await {
+ Some(item) => {
+ let mut app = app.borrow_mut();
+ if app.handle_stream_item(task_id, Box::new(item)) {
break;
}
}
+ None => {
+ break;
+ }
}
- })
- .detach();
- task_id
+ }
+
+ app.borrow_mut()
+ .stream_completed(task_id)
+ .map(|result| *result.downcast::<T>().unwrap())
+ });
+
+ (task_id, task)
}
- fn relay_task_output(&mut self, task_id: usize, output: Box<dyn Any>) -> bool {
+ fn handle_future_output(
+ &mut self,
+ task_id: usize,
+ output: Box<dyn Any>,
+ ) -> Option<Box<dyn Any>> {
self.pending_flushes += 1;
- let task_callback = self.task_callbacks.remove(&task_id).unwrap();
+ let future_callback = self.future_handlers.remove(&task_id).unwrap();
+
+ let mut result = None;
- let halt = match task_callback {
- TaskCallback::OnModelFromFuture { model_id, callback } => {
+ match future_callback {
+ FutureHandler::Model { model_id, callback } => {
if let Some(mut model) = self.ctx.models.remove(&model_id) {
- callback(
- model.as_any_mut(),
- output,
- self,
- model_id,
- self.foreground.clone(),
- );
+ result = Some(callback(model.as_any_mut(), output, self, model_id));
self.ctx.models.insert(model_id, model);
}
self.task_done(task_id);
- true
}
- TaskCallback::OnModelFromStream {
- model_id,
- mut callback,
- } => {
- if let Some(mut model) = self.ctx.models.remove(&model_id) {
- let halt = callback(model.as_any_mut(), output, self, model_id);
- self.ctx.models.insert(model_id, model);
- self.task_callbacks.insert(
- task_id,
- TaskCallback::OnModelFromStream { model_id, callback },
- );
- halt
- } else {
- true
- }
- }
- TaskCallback::OnViewFromFuture {
+ FutureHandler::View {
window_id,
view_id,
callback,
@@ -970,14 +946,7 @@ impl MutableAppContext {
.get_mut(&window_id)
.and_then(|w| w.views.remove(&view_id))
{
- callback(
- view.as_mut(),
- output,
- self,
- window_id,
- view_id,
- self.foreground.clone(),
- );
+ result = Some(callback(view.as_mut(), output, self, window_id, view_id));
self.ctx
.windows
.get_mut(&window_id)
@@ -986,12 +955,37 @@ impl MutableAppContext {
.insert(view_id, view);
}
self.task_done(task_id);
- true
}
- TaskCallback::OnViewFromStream {
+ };
+
+ self.flush_effects();
+ result
+ }
+
+ fn handle_stream_item(&mut self, task_id: usize, output: Box<dyn Any>) -> bool {
+ self.pending_flushes += 1;
+
+ let mut handler = self.stream_handlers.remove(&task_id).unwrap();
+ let halt = match &mut handler {
+ StreamHandler::Model {
+ model_id,
+ item_callback,
+ ..
+ } => {
+ if let Some(mut model) = self.ctx.models.remove(&model_id) {
+ let halt = item_callback(model.as_any_mut(), output, self, *model_id);
+ self.ctx.models.insert(*model_id, model);
+ self.stream_handlers.insert(task_id, handler);
+ halt
+ } else {
+ true
+ }
+ }
+ StreamHandler::View {
window_id,
view_id,
- mut callback,
+ item_callback,
+ ..
} => {
if let Some(mut view) = self
.ctx
@@ -999,34 +993,67 @@ impl MutableAppContext {
.get_mut(&window_id)
.and_then(|w| w.views.remove(&view_id))
{
- let halt = callback(view.as_mut(), output, self, window_id, view_id);
+ let halt = item_callback(view.as_mut(), output, self, *window_id, *view_id);
self.ctx
.windows
.get_mut(&window_id)
.unwrap()
.views
- .insert(view_id, view);
- self.task_callbacks.insert(
- task_id,
- TaskCallback::OnViewFromStream {
- window_id,
- view_id,
- callback,
- },
- );
+ .insert(*view_id, view);
+ self.stream_handlers.insert(task_id, handler);
halt
} else {
true
}
}
};
+
self.flush_effects();
halt
}
- fn stream_completed(&mut self, task_id: usize) {
- self.task_callbacks.remove(&task_id);
+ fn stream_completed(&mut self, task_id: usize) -> Option<Box<dyn Any>> {
+ let result = match self.stream_handlers.remove(&task_id).unwrap() {
+ StreamHandler::Model {
+ model_id,
+ done_callback,
+ ..
+ } => {
+ if let Some(mut model) = self.ctx.models.remove(&model_id) {
+ let result = done_callback(model.as_any_mut(), self, model_id);
+ self.ctx.models.insert(model_id, model);
+ Some(result)
+ } else {
+ None
+ }
+ }
+ StreamHandler::View {
+ window_id,
+ view_id,
+ done_callback,
+ ..
+ } => {
+ if let Some(mut view) = self
+ .ctx
+ .windows
+ .get_mut(&window_id)
+ .and_then(|w| w.views.remove(&view_id))
+ {
+ let result = done_callback(view.as_mut(), self, window_id, view_id);
+ self.ctx
+ .windows
+ .get_mut(&window_id)
+ .unwrap()
+ .views
+ .insert(view_id, view);
+ Some(result)
+ } else {
+ None
+ }
+ }
+ };
self.task_done(task_id);
+ result
}
fn task_done(&self, task_id: usize) {
@@ -1039,7 +1066,7 @@ impl MutableAppContext {
}
pub fn finish_pending_tasks(&self) -> impl Future<Output = ()> {
- let mut pending_tasks = self.task_callbacks.keys().cloned().collect::<HashSet<_>>();
+ let mut pending_tasks = self.future_handlers.keys().cloned().collect::<HashSet<_>>();
let task_done = self.task_done.1.clone();
async move {
@@ -1404,82 +1431,68 @@ impl<'a, T: Entity> ModelContext<'a, T> {
});
}
- pub fn spawn_local<S, F, U>(&mut self, future: S, callback: F) -> impl Future<Output = U>
+ pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> ForegroundTask<Option<U>>
where
S: 'static + Future,
F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
U: 'static,
{
- let (tx, rx) = channel::bounded(1);
+ let (task_id, task) = self.app.spawn::<S, U>(future);
- let task_id = self.app.spawn_local(future);
-
- self.app.task_callbacks.insert(
+ self.app.future_handlers.insert(
task_id,
- TaskCallback::OnModelFromFuture {
+ FutureHandler::Model {
model_id: self.model_id,
- callback: Box::new(move |model, output, app, model_id, executor| {
+ callback: Box::new(move |model, output, app, model_id| {
let model = model.downcast_mut().unwrap();
let output = *output.downcast().unwrap();
- let result = callback(model, output, &mut ModelContext::new(app, model_id));
- executor
- .spawn(async move { tx.send(result).await })
- .detach();
+ Box::new(callback(
+ model,
+ output,
+ &mut ModelContext::new(app, model_id),
+ ))
}),
},
);
- async move { rx.recv().await.unwrap() }
+ task
}
- pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> impl Future<Output = U>
- where
- S: 'static + Future + Send,
- S::Output: Send,
- F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
- U: 'static,
- {
- let (tx, rx) = channel::bounded(1);
-
- self.app
- .background
- .spawn(async move {
- if let Err(e) = tx.send(future.await).await {
- log::error!("error sending background task result to main thread: {}", e);
- }
- })
- .detach();
-
- self.spawn_local(async move { rx.recv().await.unwrap() }, callback)
- }
-
- pub fn spawn_stream_local<S, F>(
+ pub fn spawn_stream<S, F, G, U>(
&mut self,
stream: S,
- mut callback: F,
- ) -> impl Future<Output = ()>
+ mut item_callback: F,
+ done_callback: G,
+ ) -> ForegroundTask<Option<U>>
where
S: 'static + Stream + Unpin,
- F: 'static + FnMut(&mut T, Option<S::Item>, &mut ModelContext<T>),
+ F: 'static + FnMut(&mut T, S::Item, &mut ModelContext<T>),
+ G: 'static + FnOnce(&mut T, &mut ModelContext<T>) -> U,
+ U: 'static + Any,
{
- let (tx, rx) = channel::bounded(1);
-
- let task_id = self.app.spawn_stream_local(stream, tx);
- self.app.task_callbacks.insert(
+ let (task_id, task) = self.app.spawn_stream(stream);
+ self.app.stream_handlers.insert(
task_id,
- TaskCallback::OnModelFromStream {
+ StreamHandler::Model {
model_id: self.model_id,
- callback: Box::new(move |model, output, app, model_id| {
+ item_callback: Box::new(move |model, output, app, model_id| {
let model = model.downcast_mut().unwrap();
let output = *output.downcast().unwrap();
let mut ctx = ModelContext::new(app, model_id);
- callback(model, output, &mut ctx);
+ item_callback(model, output, &mut ctx);
ctx.halt_stream
}),
+ done_callback: Box::new(
+ move |model: &mut dyn Any, app: &mut MutableAppContext, model_id| {
+ let model = model.downcast_mut().unwrap();
+ let mut ctx = ModelContext::new(app, model_id);
+ Box::new(done_callback(model, &mut ctx))
+ },
+ ),
},
);
- async move { rx.recv().await.unwrap() }
+ task
}
}
@@ -1674,85 +1687,67 @@ impl<'a, T: View> ViewContext<'a, T> {
self.halt_stream = true;
}
- pub fn spawn_local<S, F, U>(&mut self, future: S, callback: F) -> impl Future<Output = U>
+ pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> ForegroundTask<Option<U>>
where
S: 'static + Future,
F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext<T>) -> U,
U: 'static,
{
- let (tx, rx) = channel::bounded(1);
+ let (task_id, task) = self.app.spawn(future);
- let task_id = self.app.spawn_local(future);
-
- self.app.task_callbacks.insert(
+ self.app.future_handlers.insert(
task_id,
- TaskCallback::OnViewFromFuture {
+ FutureHandler::View {
window_id: self.window_id,
view_id: self.view_id,
- callback: Box::new(move |view, output, app, window_id, view_id, executor| {
+ callback: Box::new(move |view, output, app, window_id, view_id| {
let view = view.as_any_mut().downcast_mut().unwrap();
let output = *output.downcast().unwrap();
- let result =
- callback(view, output, &mut ViewContext::new(app, window_id, view_id));
- executor
- .spawn(async move { tx.send(result).await })
- .detach();
+ Box::new(callback(
+ view,
+ output,
+ &mut ViewContext::new(app, window_id, view_id),
+ ))
}),
},
);
- async move { rx.recv().await.unwrap() }
+ task
}
- pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> impl Future<Output = U>
- where
- S: 'static + Future + Send,
- S::Output: Send,
- F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext<T>) -> U,
- U: 'static,
- {
- let (tx, rx) = channel::bounded(1);
-
- self.app
- .background
- .spawn(async move {
- if let Err(_) = tx.send(future.await).await {
- log::error!("Error sending background task result to main thread",);
- }
- })
- .detach();
-
- self.spawn_local(async move { rx.recv().await.unwrap() }, callback)
- }
-
- pub fn spawn_stream_local<S, F>(
+ pub fn spawn_stream<S, F, G, U>(
&mut self,
stream: S,
- mut callback: F,
- ) -> impl Future<Output = ()>
+ mut item_callback: F,
+ done_callback: G,
+ ) -> ForegroundTask<Option<U>>
where
S: 'static + Stream + Unpin,
- F: 'static + FnMut(&mut T, Option<S::Item>, &mut ViewContext<T>),
+ F: 'static + FnMut(&mut T, S::Item, &mut ViewContext<T>),
+ G: 'static + FnOnce(&mut T, &mut ViewContext<T>) -> U,
+ U: 'static + Any,
{
- let (tx, rx) = channel::bounded(1);
-
- let task_id = self.app.spawn_stream_local(stream, tx);
- self.app.task_callbacks.insert(
+ let (task_id, task) = self.app.spawn_stream(stream);
+ self.app.stream_handlers.insert(
task_id,
- TaskCallback::OnViewFromStream {
+ StreamHandler::View {
window_id: self.window_id,
view_id: self.view_id,
- callback: Box::new(move |view, output, app, window_id, view_id| {
+ item_callback: Box::new(move |view, output, app, window_id, view_id| {
let view = view.as_any_mut().downcast_mut().unwrap();
let output = *output.downcast().unwrap();
let mut ctx = ViewContext::new(app, window_id, view_id);
- callback(view, output, &mut ctx);
+ item_callback(view, output, &mut ctx);
ctx.halt_stream
}),
+ done_callback: Box::new(move |view, app, window_id, view_id| {
+ let view = view.as_any_mut().downcast_mut().unwrap();
+ let mut ctx = ViewContext::new(app, window_id, view_id);
+ Box::new(done_callback(view, &mut ctx))
+ }),
},
);
-
- async move { rx.recv().await.unwrap() }
+ task
}
}
@@ -2192,24 +2187,14 @@ enum Observation {
},
}
-enum TaskCallback {
- OnModelFromFuture {
+enum FutureHandler {
+ Model {
model_id: usize,
callback: Box<
- dyn FnOnce(
- &mut dyn Any,
- Box<dyn Any>,
- &mut MutableAppContext,
- usize,
- Rc<executor::Foreground>,
- ),
+ dyn FnOnce(&mut dyn Any, Box<dyn Any>, &mut MutableAppContext, usize) -> Box<dyn Any>,
>,
},
- OnModelFromStream {
- model_id: usize,
- callback: Box<dyn FnMut(&mut dyn Any, Box<dyn Any>, &mut MutableAppContext, usize) -> bool>,
- },
- OnViewFromFuture {
+ View {
window_id: usize,
view_id: usize,
callback: Box<
@@ -2219,16 +2204,26 @@ enum TaskCallback {
&mut MutableAppContext,
usize,
usize,
- Rc<executor::Foreground>,
- ),
+ ) -> Box<dyn Any>,
>,
},
- OnViewFromStream {
+}
+
+enum StreamHandler {
+ Model {
+ model_id: usize,
+ item_callback:
+ Box<dyn FnMut(&mut dyn Any, Box<dyn Any>, &mut MutableAppContext, usize) -> bool>,
+ done_callback: Box<dyn FnOnce(&mut dyn Any, &mut MutableAppContext, usize) -> Box<dyn Any>>,
+ },
+ View {
window_id: usize,
view_id: usize,
- callback: Box<
+ item_callback: Box<
dyn FnMut(&mut dyn AnyView, Box<dyn Any>, &mut MutableAppContext, usize, usize) -> bool,
>,
+ done_callback:
+ Box<dyn FnOnce(&mut dyn AnyView, &mut MutableAppContext, usize, usize) -> Box<dyn Any>>,
},
}
@@ -2395,7 +2390,7 @@ mod tests {
let handle = app.add_model(|_| Model::default());
handle
.update(&mut app, |_, c| {
- c.spawn_local(async { 7 }, |model, output, _| {
+ c.spawn(async { 7 }, |model, output, _| {
model.count = output;
})
})
@@ -2428,9 +2423,15 @@ mod tests {
let handle = app.add_model(|_| Model::default());
handle
.update(&mut app, |_, c| {
- c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |model, output, _| {
- model.events.push(output);
- })
+ c.spawn_stream(
+ smol::stream::iter(vec![1, 2, 3]),
+ |model, output, _| {
+ model.events.push(Some(output));
+ },
+ |model, _| {
+ model.events.push(None);
+ },
+ )
})
.await;
@@ -2802,7 +2803,7 @@ mod tests {
let (_, handle) = app.add_window(|_| View::default());
handle
.update(&mut app, |_, c| {
- c.spawn_local(async { 7 }, |me, output, _| {
+ c.spawn(async { 7 }, |me, output, _| {
me.count = output;
})
})
@@ -2844,9 +2845,15 @@ mod tests {
let (_, handle) = app.add_window(|_| View::default());
handle
.update(&mut app, |_, c| {
- c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |me, output, _| {
- me.events.push(output);
- })
+ c.spawn_stream(
+ smol::stream::iter(vec![1_usize, 2, 3]),
+ |me, output, _| {
+ me.events.push(Some(output));
+ },
+ |me, _| {
+ me.events.push(None);
+ },
+ )
})
.await;
@@ -3159,19 +3166,21 @@ mod tests {
model.update(&mut app, |_, ctx| {
let _ = ctx.spawn(async {}, |_, _, _| {});
- let _ = ctx.spawn_local(async {}, |_, _, _| {});
- let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {});
+ let _ = ctx.spawn(async {}, |_, _, _| {});
+ let _ =
+ ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {});
});
view.update(&mut app, |_, ctx| {
let _ = ctx.spawn(async {}, |_, _, _| {});
- let _ = ctx.spawn_local(async {}, |_, _, _| {});
- let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {});
+ let _ = ctx.spawn(async {}, |_, _, _| {});
+ let _ =
+ ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {});
});
- assert!(!app.0.borrow().task_callbacks.is_empty());
+ assert!(!app.0.borrow().future_handlers.is_empty());
app.finish_pending_tasks().await;
- assert!(app.0.borrow().task_callbacks.is_empty());
+ assert!(app.0.borrow().future_handlers.is_empty());
app.finish_pending_tasks().await; // Don't block if there are no tasks
});
}