Merge pull request #56 from zed-industries/async-ctx

Max Brunsfeld created

Simplify spawning and asynchronous code

Change summary

gpui/src/app.rs               | 598 +++++++++---------------------------
zed/src/editor/buffer/mod.rs  |  29 
zed/src/editor/buffer_view.rs |  29 -
zed/src/file_finder.rs        |   8 
zed/src/workspace.rs          | 102 +++---
zed/src/worktree.rs           |  50 ++
6 files changed, 271 insertions(+), 545 deletions(-)

Detailed changes

gpui/src/app.rs 🔗

@@ -8,6 +8,7 @@ use crate::{
     AssetCache, AssetSource, ClipboardItem, FontCache, PathPromptOptions, TextLayoutCache,
 };
 use anyhow::{anyhow, Result};
+use async_task::Task;
 use keymap::MatchResult;
 use parking_lot::{Mutex, RwLock};
 use pathfinder_geometry::{rect::RectF, vector::vec2f};
@@ -50,6 +51,14 @@ pub trait ReadModel {
     fn read_model<T: Entity>(&self, handle: &ModelHandle<T>) -> &T;
 }
 
+pub trait ReadModelWith {
+    fn read_model_with<E: Entity, F: FnOnce(&E, &AppContext) -> T, T>(
+        &self,
+        handle: &ModelHandle<E>,
+        read: F,
+    ) -> T;
+}
+
 pub trait UpdateModel {
     fn update_model<T, F, S>(&mut self, handle: &ModelHandle<T>, update: F) -> S
     where
@@ -61,6 +70,13 @@ pub trait ReadView {
     fn read_view<T: View>(&self, handle: &ViewHandle<T>) -> &T;
 }
 
+pub trait ReadViewWith {
+    fn read_view_with<V, F, T>(&self, handle: &ViewHandle<V>, read: F) -> T
+    where
+        V: View,
+        F: FnOnce(&V, &AppContext) -> T;
+}
+
 pub trait UpdateView {
     fn update_view<T, F, S>(&mut self, handle: &ViewHandle<T>, update: F) -> S
     where
@@ -86,6 +102,8 @@ pub enum MenuItem<'a> {
 #[derive(Clone)]
 pub struct App(Rc<RefCell<MutableAppContext>>);
 
+pub struct AsyncAppContext(Rc<RefCell<MutableAppContext>>);
+
 #[derive(Clone)]
 pub struct TestAppContext(Rc<RefCell<MutableAppContext>>, Rc<platform::test::Platform>);
 
@@ -345,6 +363,80 @@ impl TestAppContext {
     }
 }
 
+impl AsyncAppContext {
+    pub fn read<T, F: FnOnce(&AppContext) -> T>(&mut self, callback: F) -> T {
+        callback(self.0.borrow().as_ref())
+    }
+
+    pub fn update<T, F: FnOnce(&mut MutableAppContext) -> T>(&mut self, callback: F) -> T {
+        let mut state = self.0.borrow_mut();
+        state.pending_flushes += 1;
+        let result = callback(&mut *state);
+        state.flush_effects();
+        result
+    }
+
+    pub fn add_model<T, F>(&mut self, build_model: F) -> ModelHandle<T>
+    where
+        T: Entity,
+        F: FnOnce(&mut ModelContext<T>) -> T,
+    {
+        self.update(|ctx| ctx.add_model(build_model))
+    }
+}
+
+impl UpdateModel for AsyncAppContext {
+    fn update_model<T, F, S>(&mut self, handle: &ModelHandle<T>, update: F) -> S
+    where
+        T: Entity,
+        F: FnOnce(&mut T, &mut ModelContext<T>) -> S,
+    {
+        let mut state = self.0.borrow_mut();
+        state.pending_flushes += 1;
+        let result = state.update_model(handle, update);
+        state.flush_effects();
+        result
+    }
+}
+
+impl ReadModelWith for AsyncAppContext {
+    fn read_model_with<E: Entity, F: FnOnce(&E, &AppContext) -> T, T>(
+        &self,
+        handle: &ModelHandle<E>,
+        read: F,
+    ) -> T {
+        let ctx = self.0.borrow();
+        let ctx = ctx.as_ref();
+        read(handle.read(ctx), ctx)
+    }
+}
+
+impl UpdateView for AsyncAppContext {
+    fn update_view<T, F, S>(&mut self, handle: &ViewHandle<T>, update: F) -> S
+    where
+        T: View,
+        F: FnOnce(&mut T, &mut ViewContext<T>) -> S,
+    {
+        let mut state = self.0.borrow_mut();
+        state.pending_flushes += 1;
+        let result = state.update_view(handle, update);
+        state.flush_effects();
+        result
+    }
+}
+
+impl ReadViewWith for AsyncAppContext {
+    fn read_view_with<V, F, T>(&self, handle: &ViewHandle<V>, read: F) -> T
+    where
+        V: View,
+        F: FnOnce(&V, &AppContext) -> T,
+    {
+        let ctx = self.0.borrow();
+        let ctx = ctx.as_ref();
+        read(handle.read(ctx), ctx)
+    }
+}
+
 impl UpdateModel for TestAppContext {
     fn update_model<T, F, S>(&mut self, handle: &ModelHandle<T>, update: F) -> S
     where
@@ -359,6 +451,18 @@ impl UpdateModel for TestAppContext {
     }
 }
 
+impl ReadModelWith for TestAppContext {
+    fn read_model_with<E: Entity, F: FnOnce(&E, &AppContext) -> T, T>(
+        &self,
+        handle: &ModelHandle<E>,
+        read: F,
+    ) -> T {
+        let ctx = self.0.borrow();
+        let ctx = ctx.as_ref();
+        read(handle.read(ctx), ctx)
+    }
+}
+
 impl UpdateView for TestAppContext {
     fn update_view<T, F, S>(&mut self, handle: &ViewHandle<T>, update: F) -> S
     where
@@ -373,6 +477,18 @@ impl UpdateView for TestAppContext {
     }
 }
 
+impl ReadViewWith for TestAppContext {
+    fn read_view_with<V, F, T>(&self, handle: &ViewHandle<V>, read: F) -> T
+    where
+        V: View,
+        F: FnOnce(&V, &AppContext) -> T,
+    {
+        let ctx = self.0.borrow();
+        let ctx = ctx.as_ref();
+        read(handle.read(ctx), ctx)
+    }
+}
+
 type ActionCallback =
     dyn FnMut(&mut dyn AnyView, &dyn Any, &mut MutableAppContext, usize, usize) -> bool;
 
@@ -388,7 +504,6 @@ pub struct MutableAppContext {
     keystroke_matcher: keymap::Matcher,
     next_entity_id: usize,
     next_window_id: usize,
-    next_task_id: usize,
     subscriptions: HashMap<usize, Vec<Subscription>>,
     model_observations: HashMap<usize, Vec<ModelObservation>>,
     view_observations: HashMap<usize, Vec<ViewObservation>>,
@@ -396,8 +511,6 @@ pub struct MutableAppContext {
         HashMap<usize, (Rc<RefCell<Presenter>>, Box<dyn platform::Window>)>,
     debug_elements_callbacks: HashMap<usize, Box<dyn Fn(&AppContext) -> crate::json::Value>>,
     foreground: Rc<executor::Foreground>,
-    future_handlers: Rc<RefCell<HashMap<usize, FutureHandler>>>,
-    stream_handlers: Rc<RefCell<HashMap<usize, StreamHandler>>>,
     pending_effects: VecDeque<Effect>,
     pending_flushes: usize,
     flushing_effects: bool,
@@ -429,15 +542,12 @@ impl MutableAppContext {
             keystroke_matcher: keymap::Matcher::default(),
             next_entity_id: 0,
             next_window_id: 0,
-            next_task_id: 0,
             subscriptions: HashMap::new(),
             model_observations: HashMap::new(),
             view_observations: HashMap::new(),
             presenters_and_platform_windows: HashMap::new(),
             debug_elements_callbacks: HashMap::new(),
             foreground,
-            future_handlers: Default::default(),
-            stream_handlers: Default::default(),
             pending_effects: VecDeque::new(),
             pending_flushes: 0,
             flushing_effects: false,
@@ -1159,96 +1269,14 @@ impl MutableAppContext {
         self.flush_effects();
     }
 
-    fn spawn<F, T>(&mut self, spawner: Spawner, future: F) -> EntityTask<T>
-    where
-        F: 'static + Future,
-        T: 'static,
-    {
-        let task_id = post_inc(&mut self.next_task_id);
-        let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
-        let task = {
-            let app = app.clone();
-            self.foreground.spawn(async move {
-                let output = future.await;
-                app.borrow_mut()
-                    .handle_future_output(task_id, Box::new(output))
-                    .map(|output| *output.downcast::<T>().unwrap())
-            })
-        };
-        EntityTask::new(
-            task_id,
-            task,
-            spawner,
-            TaskHandlerMap::Future(self.future_handlers.clone()),
-        )
-    }
-
-    fn spawn_stream<F, T>(&mut self, spawner: Spawner, mut stream: F) -> EntityTask<T>
+    pub fn spawn<F, Fut, T>(&self, f: F) -> Task<T>
     where
-        F: 'static + Stream + Unpin,
+        F: FnOnce(AsyncAppContext) -> Fut,
+        Fut: 'static + Future<Output = T>,
         T: 'static,
     {
-        let task_id = post_inc(&mut self.next_task_id);
-        let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
-        let task = self.foreground.spawn(async move {
-            loop {
-                match stream.next().await {
-                    Some(item) => {
-                        let mut app = app.borrow_mut();
-                        if app.handle_stream_item(task_id, Box::new(item)) {
-                            break;
-                        }
-                    }
-                    None => {
-                        break;
-                    }
-                }
-            }
-
-            app.borrow_mut()
-                .stream_completed(task_id)
-                .map(|output| *output.downcast::<T>().unwrap())
-        });
-
-        EntityTask::new(
-            task_id,
-            task,
-            spawner,
-            TaskHandlerMap::Stream(self.stream_handlers.clone()),
-        )
-    }
-
-    fn handle_future_output(
-        &mut self,
-        task_id: usize,
-        output: Box<dyn Any>,
-    ) -> Option<Box<dyn Any>> {
-        self.pending_flushes += 1;
-        let future_callback = self.future_handlers.borrow_mut().remove(&task_id).unwrap();
-        let result = future_callback(output, self);
-        self.flush_effects();
-        result
-    }
-
-    fn handle_stream_item(&mut self, task_id: usize, output: Box<dyn Any>) -> bool {
-        self.pending_flushes += 1;
-
-        let mut handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap();
-        let halt = (handler.item_callback)(output, self);
-        self.stream_handlers.borrow_mut().insert(task_id, handler);
-
-        self.flush_effects();
-        halt
-    }
-
-    fn stream_completed(&mut self, task_id: usize) -> Option<Box<dyn Any>> {
-        self.pending_flushes += 1;
-
-        let handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap();
-        let result = (handler.done_callback)(self);
-
-        self.flush_effects();
-        result
+        let ctx = AsyncAppContext(self.weak_self.as_ref().unwrap().upgrade().unwrap());
+        self.foreground.spawn(f(ctx))
     }
 
     pub fn write_to_clipboard(&self, item: ClipboardItem) {
@@ -1624,76 +1652,14 @@ impl<'a, T: Entity> ModelContext<'a, T> {
         ModelHandle::new(self.model_id, &self.app.ctx.ref_counts)
     }
 
-    pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> EntityTask<U>
-    where
-        S: 'static + Future,
-        F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
-        U: 'static,
-    {
-        let handle = self.handle();
-        let weak_handle = handle.downgrade();
-        let task = self
-            .app
-            .spawn::<S, U>(Spawner::Model(handle.into()), future);
-
-        self.app.future_handlers.borrow_mut().insert(
-            task.id,
-            Box::new(move |output, ctx| {
-                weak_handle.upgrade(ctx.as_ref()).map(|handle| {
-                    let output = *output.downcast().unwrap();
-                    handle.update(ctx, |model, ctx| {
-                        Box::new(callback(model, output, ctx)) as Box<dyn Any>
-                    })
-                })
-            }),
-        );
-
-        task
-    }
-
-    pub fn spawn_stream<S, F, G, U>(
-        &mut self,
-        stream: S,
-        mut item_callback: F,
-        done_callback: G,
-    ) -> EntityTask<U>
+    pub fn spawn<F, Fut, S>(&self, f: F) -> Task<S>
     where
-        S: 'static + Stream + Unpin,
-        F: 'static + FnMut(&mut T, S::Item, &mut ModelContext<T>),
-        G: 'static + FnOnce(&mut T, &mut ModelContext<T>) -> U,
-        U: 'static + Any,
+        F: FnOnce(ModelHandle<T>, AsyncAppContext) -> Fut,
+        Fut: 'static + Future<Output = S>,
+        S: 'static,
     {
         let handle = self.handle();
-        let weak_handle = handle.downgrade();
-        let task = self.app.spawn_stream(Spawner::Model(handle.into()), stream);
-
-        self.app.stream_handlers.borrow_mut().insert(
-            task.id,
-            StreamHandler {
-                item_callback: {
-                    let weak_handle = weak_handle.clone();
-                    Box::new(move |output, app| {
-                        if let Some(handle) = weak_handle.upgrade(app.as_ref()) {
-                            let output = *output.downcast().unwrap();
-                            handle.update(app, |model, ctx| {
-                                item_callback(model, output, ctx);
-                                ctx.halt_stream
-                            })
-                        } else {
-                            true
-                        }
-                    })
-                },
-                done_callback: Box::new(move |app| {
-                    weak_handle.upgrade(app.as_ref()).map(|handle| {
-                        handle.update(app, |model, ctx| Box::new(done_callback(model, ctx)))
-                            as Box<dyn Any>
-                    })
-                }),
-            },
-        );
-
-        task
+        self.app.spawn(|ctx| f(handle, ctx))
     }
 }
 
@@ -1731,7 +1697,6 @@ pub struct ViewContext<'a, T: ?Sized> {
     view_id: usize,
     view_type: PhantomData<T>,
     halt_action_dispatch: bool,
-    halt_stream: bool,
 }
 
 impl<'a, T: View> ViewContext<'a, T> {
@@ -1742,7 +1707,6 @@ impl<'a, T: View> ViewContext<'a, T> {
             view_id,
             view_type: PhantomData,
             halt_action_dispatch: true,
-            halt_stream: false,
         }
     }
 
@@ -1944,77 +1908,14 @@ impl<'a, T: View> ViewContext<'a, T> {
         self.halt_action_dispatch = false;
     }
 
-    pub fn halt_stream(&mut self) {
-        self.halt_stream = true;
-    }
-
-    pub fn spawn<S, F, U>(&mut self, future: S, callback: F) -> EntityTask<U>
+    pub fn spawn<F, Fut, S>(&self, f: F) -> Task<S>
     where
-        S: 'static + Future,
-        F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext<T>) -> U,
-        U: 'static,
+        F: FnOnce(ViewHandle<T>, AsyncAppContext) -> Fut,
+        Fut: 'static + Future<Output = S>,
+        S: 'static,
     {
         let handle = self.handle();
-        let weak_handle = handle.downgrade();
-        let task = self.app.spawn(Spawner::View(handle.into()), future);
-
-        self.app.future_handlers.borrow_mut().insert(
-            task.id,
-            Box::new(move |output, app| {
-                weak_handle.upgrade(app.as_ref()).map(|handle| {
-                    let output = *output.downcast().unwrap();
-                    handle.update(app, |view, ctx| {
-                        Box::new(callback(view, output, ctx)) as Box<dyn Any>
-                    })
-                })
-            }),
-        );
-
-        task
-    }
-
-    pub fn spawn_stream<S, F, G, U>(
-        &mut self,
-        stream: S,
-        mut item_callback: F,
-        done_callback: G,
-    ) -> EntityTask<U>
-    where
-        S: 'static + Stream + Unpin,
-        F: 'static + FnMut(&mut T, S::Item, &mut ViewContext<T>),
-        G: 'static + FnOnce(&mut T, &mut ViewContext<T>) -> U,
-        U: 'static + Any,
-    {
-        let handle = self.handle();
-        let weak_handle = handle.downgrade();
-        let task = self.app.spawn_stream(Spawner::View(handle.into()), stream);
-        self.app.stream_handlers.borrow_mut().insert(
-            task.id,
-            StreamHandler {
-                item_callback: {
-                    let weak_handle = weak_handle.clone();
-                    Box::new(move |output, ctx| {
-                        if let Some(handle) = weak_handle.upgrade(ctx.as_ref()) {
-                            let output = *output.downcast().unwrap();
-                            handle.update(ctx, |view, ctx| {
-                                item_callback(view, output, ctx);
-                                ctx.halt_stream
-                            })
-                        } else {
-                            true
-                        }
-                    })
-                },
-                done_callback: Box::new(move |ctx| {
-                    weak_handle.upgrade(ctx.as_ref()).map(|handle| {
-                        handle.update(ctx, |view, ctx| {
-                            Box::new(done_callback(view, ctx)) as Box<dyn Any>
-                        })
-                    })
-                }),
-            },
-        );
-        task
+        self.app.spawn(|ctx| f(handle, ctx))
     }
 }
 
@@ -2107,6 +2008,14 @@ impl<T: Entity> ModelHandle<T> {
         app.read_model(self)
     }
 
+    pub fn read_with<'a, A, F, S>(&self, ctx: &A, read: F) -> S
+    where
+        A: ReadModelWith,
+        F: FnOnce(&T, &AppContext) -> S,
+    {
+        ctx.read_model_with(self, read)
+    }
+
     pub fn update<A, F, S>(&self, app: &mut A, update: F) -> S
     where
         A: UpdateModel,
@@ -2249,9 +2158,10 @@ impl<T: Entity> WeakModelHandle<T> {
         }
     }
 
-    pub fn upgrade(&self, app: &AppContext) -> Option<ModelHandle<T>> {
-        if app.models.contains_key(&self.model_id) {
-            Some(ModelHandle::new(self.model_id, &app.ref_counts))
+    pub fn upgrade(&self, ctx: impl AsRef<AppContext>) -> Option<ModelHandle<T>> {
+        let ctx = ctx.as_ref();
+        if ctx.models.contains_key(&self.model_id) {
+            Some(ModelHandle::new(self.model_id, &ctx.ref_counts))
         } else {
             None
         }
@@ -2301,6 +2211,14 @@ impl<T: View> ViewHandle<T> {
         app.read_view(self)
     }
 
+    pub fn read_with<A, F, S>(&self, ctx: &A, read: F) -> S
+    where
+        A: ReadViewWith,
+        F: FnOnce(&T, &AppContext) -> S,
+    {
+        ctx.read_view_with(self, read)
+    }
+
     pub fn update<A, F, S>(&self, app: &mut A, update: F) -> S
     where
         A: UpdateView,
@@ -2711,86 +2629,6 @@ struct ViewObservation {
     callback: Box<dyn FnMut(&mut dyn Any, usize, usize, &mut MutableAppContext, usize, usize)>,
 }
 
-type FutureHandler = Box<dyn FnOnce(Box<dyn Any>, &mut MutableAppContext) -> Option<Box<dyn Any>>>;
-
-struct StreamHandler {
-    item_callback: Box<dyn FnMut(Box<dyn Any>, &mut MutableAppContext) -> bool>,
-    done_callback: Box<dyn FnOnce(&mut MutableAppContext) -> Option<Box<dyn Any>>>,
-}
-
-#[must_use]
-pub struct EntityTask<T> {
-    id: usize,
-    task: Option<executor::Task<Option<T>>>,
-    _spawner: Spawner, // Keeps the spawning entity alive for as long as the task exists
-    handler_map: TaskHandlerMap,
-}
-
-pub enum Spawner {
-    Model(AnyModelHandle),
-    View(AnyViewHandle),
-}
-
-enum TaskHandlerMap {
-    Detached,
-    Future(Rc<RefCell<HashMap<usize, FutureHandler>>>),
-    Stream(Rc<RefCell<HashMap<usize, StreamHandler>>>),
-}
-
-impl<T> EntityTask<T> {
-    fn new(
-        id: usize,
-        task: executor::Task<Option<T>>,
-        spawner: Spawner,
-        handler_map: TaskHandlerMap,
-    ) -> Self {
-        Self {
-            id,
-            task: Some(task),
-            _spawner: spawner,
-            handler_map,
-        }
-    }
-
-    pub fn detach(mut self) {
-        self.handler_map = TaskHandlerMap::Detached;
-        self.task.take().unwrap().detach();
-    }
-
-    pub async fn cancel(mut self) -> Option<T> {
-        let task = self.task.take().unwrap();
-        task.cancel().await.unwrap()
-    }
-}
-
-impl<T> Future for EntityTask<T> {
-    type Output = T;
-
-    fn poll(
-        self: std::pin::Pin<&mut Self>,
-        ctx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Self::Output> {
-        let task = unsafe { self.map_unchecked_mut(|task| task.task.as_mut().unwrap()) };
-        task.poll(ctx).map(|output| output.unwrap())
-    }
-}
-
-impl<T> Drop for EntityTask<T> {
-    fn drop(self: &mut Self) {
-        match &self.handler_map {
-            TaskHandlerMap::Detached => {
-                return;
-            }
-            TaskHandlerMap::Future(map) => {
-                map.borrow_mut().remove(&self.id);
-            }
-            TaskHandlerMap::Stream(map) => {
-                map.borrow_mut().remove(&self.id);
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2926,61 +2764,6 @@ mod tests {
         assert_eq!(handle_1.read(app).events, vec![7, 10, 5])
     }
 
-    #[crate::test(self)]
-    async fn test_spawn_from_model(mut app: TestAppContext) {
-        #[derive(Default)]
-        struct Model {
-            count: usize,
-        }
-
-        impl Entity for Model {
-            type Event = ();
-        }
-
-        let handle = app.add_model(|_| Model::default());
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn(async { 7 }, |model, output, _| {
-                    model.count = output;
-                })
-            })
-            .await;
-        app.read(|ctx| assert_eq!(handle.read(ctx).count, 7));
-
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn(async { 14 }, |model, output, _| {
-                    model.count = output;
-                })
-            })
-            .await;
-        app.read(|ctx| assert_eq!(handle.read(ctx).count, 14));
-    }
-
-    #[crate::test(self)]
-    async fn test_spawn_stream_local_from_model(mut app: TestAppContext) {
-        #[derive(Default)]
-        struct Model {
-            events: Vec<Option<usize>>,
-        }
-
-        impl Entity for Model {
-            type Event = ();
-        }
-
-        let handle = app.add_model(|_| Model::default());
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn_stream(
-                    smol::stream::iter(vec![1, 2, 3]),
-                    |model, output, _| model.events.push(Some(output)),
-                    |model, _| model.events.push(None),
-                )
-            })
-            .await;
-        app.read(|ctx| assert_eq!(handle.read(ctx).events, [Some(1), Some(2), Some(3), None]));
-    }
-
     #[crate::test(self)]
     fn test_view_handles(app: &mut MutableAppContext) {
         struct View {
@@ -3297,85 +3080,6 @@ mod tests {
         );
     }
 
-    #[crate::test(self)]
-    async fn test_spawn_from_view(mut app: TestAppContext) {
-        #[derive(Default)]
-        struct View {
-            count: usize,
-        }
-
-        impl Entity for View {
-            type Event = ();
-        }
-
-        impl super::View for View {
-            fn render<'a>(&self, _: &AppContext) -> ElementBox {
-                Empty::new().boxed()
-            }
-
-            fn ui_name() -> &'static str {
-                "View"
-            }
-        }
-
-        let handle = app.add_window(|_| View::default()).1;
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn(async { 7 }, |me, output, _| {
-                    me.count = output;
-                })
-            })
-            .await;
-        app.read(|ctx| assert_eq!(handle.read(ctx).count, 7));
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn(async { 14 }, |me, output, _| {
-                    me.count = output;
-                })
-            })
-            .await;
-        app.read(|ctx| assert_eq!(handle.read(ctx).count, 14));
-    }
-
-    #[crate::test(self)]
-    async fn test_spawn_stream_local_from_view(mut app: TestAppContext) {
-        #[derive(Default)]
-        struct View {
-            events: Vec<Option<usize>>,
-        }
-
-        impl Entity for View {
-            type Event = ();
-        }
-
-        impl super::View for View {
-            fn render<'a>(&self, _: &AppContext) -> ElementBox {
-                Empty::new().boxed()
-            }
-
-            fn ui_name() -> &'static str {
-                "View"
-            }
-        }
-
-        let (_, handle) = app.add_window(|_| View::default());
-        handle
-            .update(&mut app, |_, c| {
-                c.spawn_stream(
-                    smol::stream::iter(vec![1_usize, 2, 3]),
-                    |me, output, _| {
-                        me.events.push(Some(output));
-                    },
-                    |me, _| {
-                        me.events.push(None);
-                    },
-                )
-            })
-            .await;
-
-        app.read(|ctx| assert_eq!(handle.read(ctx).events, [Some(1), Some(2), Some(3), None]))
-    }
-
     #[crate::test(self)]
     fn test_dispatch_action(app: &mut MutableAppContext) {
         struct ViewA {

zed/src/editor/buffer/mod.rs 🔗

@@ -4,11 +4,9 @@ mod selection;
 mod text;
 
 pub use anchor::*;
-use futures_core::future::LocalBoxFuture;
 pub use point::*;
 use seahash::SeaHasher;
 pub use selection::*;
-use smol::future::FutureExt;
 pub use text::*;
 
 use crate::{
@@ -19,7 +17,7 @@ use crate::{
     worktree::FileHandle,
 };
 use anyhow::{anyhow, Result};
-use gpui::{Entity, ModelContext};
+use gpui::{Entity, ModelContext, Task};
 use lazy_static::lazy_static;
 use rand::prelude::*;
 use std::{
@@ -475,21 +473,22 @@ impl Buffer {
         &mut self,
         new_file: Option<FileHandle>,
         ctx: &mut ModelContext<Self>,
-    ) -> LocalBoxFuture<'static, Result<()>> {
+    ) -> Task<Result<()>> {
         let snapshot = self.snapshot();
         let version = self.version.clone();
-        if let Some(file) = new_file.as_ref().or(self.file.as_ref()) {
-            let save_task = file.save(snapshot, ctx.as_ref());
-            ctx.spawn(save_task, |me, save_result, ctx| {
-                if save_result.is_ok() {
-                    me.did_save(version, new_file, ctx);
+        let file = self.file.clone();
+
+        ctx.spawn(|handle, mut ctx| async move {
+            if let Some(file) = new_file.as_ref().or(file.as_ref()) {
+                let result = ctx.read(|ctx| file.save(snapshot, ctx.as_ref())).await;
+                if result.is_ok() {
+                    handle.update(&mut ctx, |me, ctx| me.did_save(version, new_file, ctx));
                 }
-                save_result
-            })
-            .boxed_local()
-        } else {
-            async { Ok(()) }.boxed_local()
-        }
+                result
+            } else {
+                Ok(())
+            }
+        })
     }
 
     fn did_save(

zed/src/editor/buffer_view.rs 🔗

@@ -4,11 +4,10 @@ use super::{
 };
 use crate::{settings::Settings, util::post_inc, workspace, worktree::FileHandle};
 use anyhow::Result;
-use futures_core::future::LocalBoxFuture;
 use gpui::{
     fonts::Properties as FontProperties, geometry::vector::Vector2F, keymap::Binding, text_layout,
     AppContext, ClipboardItem, Element, ElementBox, Entity, FontCache, ModelHandle,
-    MutableAppContext, TextLayoutCache, View, ViewContext, WeakViewHandle,
+    MutableAppContext, Task, TextLayoutCache, View, ViewContext, WeakViewHandle,
 };
 use parking_lot::Mutex;
 use postage::watch;
@@ -2348,13 +2347,12 @@ impl BufferView {
         ctx.notify();
 
         let epoch = self.next_blink_epoch();
-        ctx.spawn(
-            async move {
-                Timer::after(CURSOR_BLINK_INTERVAL).await;
-                epoch
-            },
-            Self::resume_cursor_blinking,
-        )
+        ctx.spawn(|this, mut ctx| async move {
+            Timer::after(CURSOR_BLINK_INTERVAL).await;
+            this.update(&mut ctx, |this, ctx| {
+                this.resume_cursor_blinking(epoch, ctx);
+            })
+        })
         .detach();
     }
 
@@ -2371,13 +2369,10 @@ impl BufferView {
             ctx.notify();
 
             let epoch = self.next_blink_epoch();
-            ctx.spawn(
-                async move {
-                    Timer::after(CURSOR_BLINK_INTERVAL).await;
-                    epoch
-                },
-                Self::blink_cursors,
-            )
+            ctx.spawn(|this, mut ctx| async move {
+                Timer::after(CURSOR_BLINK_INTERVAL).await;
+                this.update(&mut ctx, |this, ctx| this.blink_cursors(epoch, ctx));
+            })
             .detach();
         }
     }
@@ -2498,7 +2493,7 @@ impl workspace::ItemView for BufferView {
         &mut self,
         new_file: Option<FileHandle>,
         ctx: &mut ViewContext<Self>,
-    ) -> LocalBoxFuture<'static, Result<()>> {
+    ) -> Task<Result<()>> {
         self.buffer.update(ctx, |b, ctx| b.save(new_file, ctx))
     }
 

zed/src/file_finder.rs 🔗

@@ -399,7 +399,7 @@ impl FileFinder {
         self.cancel_flag.store(true, atomic::Ordering::Relaxed);
         self.cancel_flag = Arc::new(AtomicBool::new(false));
         let cancel_flag = self.cancel_flag.clone();
-        let task = ctx.background_executor().spawn(async move {
+        let background_task = ctx.background_executor().spawn(async move {
             let include_root_name = snapshots.len() > 1;
             let matches = match_paths(
                 snapshots.iter(),
@@ -415,7 +415,11 @@ impl FileFinder {
             (search_id, did_cancel, query, matches)
         });
 
-        ctx.spawn(task, Self::update_matches).detach();
+        ctx.spawn(|this, mut ctx| async move {
+            let matches = background_task.await;
+            this.update(&mut ctx, |this, ctx| this.update_matches(matches, ctx));
+        })
+        .detach();
 
         Some(())
     }

zed/src/workspace.rs 🔗

@@ -6,10 +6,10 @@ use crate::{
     time::ReplicaId,
     worktree::{FileHandle, Worktree, WorktreeHandle},
 };
-use futures_core::{future::LocalBoxFuture, Future};
+use futures_core::Future;
 use gpui::{
     color::rgbu, elements::*, json::to_string_pretty, keymap::Binding, AnyViewHandle, AppContext,
-    ClipboardItem, Entity, EntityTask, ModelHandle, MutableAppContext, PathPromptOptions, View,
+    ClipboardItem, Entity, ModelHandle, MutableAppContext, PathPromptOptions, Task, View,
     ViewContext, ViewHandle, WeakModelHandle,
 };
 use log::error;
@@ -123,7 +123,7 @@ pub trait ItemView: View {
         &mut self,
         _: Option<FileHandle>,
         _: &mut ViewContext<Self>,
-    ) -> LocalBoxFuture<'static, anyhow::Result<()>>;
+    ) -> Task<anyhow::Result<()>>;
     fn should_activate_item_on_event(_: &Self::Event) -> bool {
         false
     }
@@ -161,7 +161,7 @@ pub trait ItemViewHandle: Send + Sync {
         &self,
         file: Option<FileHandle>,
         ctx: &mut MutableAppContext,
-    ) -> LocalBoxFuture<'static, anyhow::Result<()>>;
+    ) -> Task<anyhow::Result<()>>;
 }
 
 impl<T: Item> ItemHandle for ModelHandle<T> {
@@ -239,7 +239,7 @@ impl<T: ItemView> ItemViewHandle for ViewHandle<T> {
         &self,
         file: Option<FileHandle>,
         ctx: &mut MutableAppContext,
-    ) -> LocalBoxFuture<'static, anyhow::Result<()>> {
+    ) -> Task<anyhow::Result<()>> {
         self.update(ctx, |item, ctx| item.save(file, ctx))
     }
 
@@ -359,16 +359,17 @@ impl Workspace {
             .cloned()
             .zip(entries.into_iter())
             .map(|(abs_path, file)| {
-                ctx.spawn(
-                    bg.spawn(async move { abs_path.is_file() }),
-                    move |me, is_file, ctx| {
+                let is_file = bg.spawn(async move { abs_path.is_file() });
+                ctx.spawn(|this, mut ctx| async move {
+                    let is_file = is_file.await;
+                    this.update(&mut ctx, |this, ctx| {
                         if is_file {
-                            me.open_entry(file.entry_id(), ctx)
+                            this.open_entry(file.entry_id(), ctx)
                         } else {
                             None
                         }
-                    },
-                )
+                    })
+                })
             })
             .collect::<Vec<_>>();
         async move {
@@ -442,7 +443,7 @@ impl Workspace {
         &mut self,
         entry: (usize, Arc<Path>),
         ctx: &mut ViewContext<Self>,
-    ) -> Option<EntityTask<()>> {
+    ) -> Option<Task<()>> {
         // If the active pane contains a view for this file, then activate
         // that item view.
         if self
@@ -496,44 +497,46 @@ impl Workspace {
             let history = ctx
                 .background_executor()
                 .spawn(file.load_history(ctx.as_ref()));
-            ctx.spawn(history, move |_, history, ctx| {
-                *tx.borrow_mut() = Some(match history {
-                    Ok(history) => Ok(Box::new(ctx.add_model(|ctx| {
-                        Buffer::from_history(replica_id, history, Some(file), ctx)
-                    }))),
-                    Err(error) => Err(Arc::new(error)),
+
+            ctx.as_mut()
+                .spawn(|mut ctx| async move {
+                    *tx.borrow_mut() = Some(match history.await {
+                        Ok(history) => Ok(Box::new(ctx.add_model(|ctx| {
+                            Buffer::from_history(replica_id, history, Some(file), ctx)
+                        }))),
+                        Err(error) => Err(Arc::new(error)),
+                    })
                 })
-            })
-            .detach()
+                .detach();
         }
 
         let mut watch = self.loading_items.get(&entry).unwrap().clone();
-        Some(ctx.spawn(
-            async move {
-                loop {
-                    if let Some(load_result) = watch.borrow().as_ref() {
-                        return load_result.clone();
-                    }
-                    watch.next().await;
+
+        Some(ctx.spawn(|this, mut ctx| async move {
+            let load_result = loop {
+                if let Some(load_result) = watch.borrow().as_ref() {
+                    break load_result.clone();
                 }
-            },
-            move |me, load_result, ctx| {
-                me.loading_items.remove(&entry);
+                watch.next().await;
+            };
+
+            this.update(&mut ctx, |this, ctx| {
+                this.loading_items.remove(&entry);
                 match load_result {
                     Ok(item) => {
                         let weak_item = item.downgrade();
                         let view = weak_item
                             .add_view(ctx.window_id(), settings, ctx.as_mut())
                             .unwrap();
-                        me.items.push(weak_item);
-                        me.add_item_view(view, ctx);
+                        this.items.push(weak_item);
+                        this.add_item_view(view, ctx);
                     }
                     Err(error) => {
                         log::error!("error opening item: {}", error);
                     }
                 }
-            },
-        ))
+            })
+        }))
     }
 
     pub fn active_item(&self, ctx: &ViewContext<Self>) -> Option<Box<dyn ItemViewHandle>> {
@@ -552,28 +555,27 @@ impl Workspace {
                     .to_path_buf();
                 ctx.prompt_for_new_path(&start_path, move |path, ctx| {
                     if let Some(path) = path {
-                        handle.update(ctx, move |this, ctx| {
-                            let file = this.file_for_path(&path, ctx);
-                            let task = item.save(Some(file), ctx.as_mut());
-                            ctx.spawn(task, move |_, result, _| {
-                                if let Err(e) = result {
-                                    error!("failed to save item: {:?}, ", e);
-                                }
-                            })
-                            .detach()
+                        ctx.spawn(|mut ctx| async move {
+                            let file =
+                                handle.update(&mut ctx, |me, ctx| me.file_for_path(&path, ctx));
+                            if let Err(error) = ctx.update(|ctx| item.save(Some(file), ctx)).await {
+                                error!("failed to save item: {:?}, ", error);
+                            }
                         })
+                        .detach()
                     }
                 });
                 return;
             }
 
-            let task = item.save(None, ctx.as_mut());
-            ctx.spawn(task, |_, result, _| {
-                if let Err(e) = result {
-                    error!("failed to save item: {:?}, ", e);
-                }
-            })
-            .detach()
+            let save = item.save(None, ctx.as_mut());
+            ctx.foreground()
+                .spawn(async move {
+                    if let Err(e) = save.await {
+                        error!("failed to save item: {:?}, ", e);
+                    }
+                })
+                .detach();
         }
     }
 

zed/src/worktree.rs 🔗

@@ -16,7 +16,7 @@ use postage::{
     prelude::{Sink, Stream},
     watch,
 };
-use smol::{channel::Sender, Timer};
+use smol::channel::Sender;
 use std::{
     cmp,
     collections::{HashMap, HashSet},
@@ -98,8 +98,27 @@ impl Worktree {
             scanner.run(event_stream)
         });
 
-        ctx.spawn_stream(scan_state_rx, Self::observe_scan_state, |_, _| {})
-            .detach();
+        ctx.spawn(|this, mut ctx| {
+            let this = this.downgrade();
+            async move {
+                while let Ok(scan_state) = scan_state_rx.recv().await {
+                    let alive = ctx.update(|ctx| {
+                        if let Some(handle) = this.upgrade(&ctx) {
+                            handle
+                                .update(ctx, |this, ctx| this.observe_scan_state(scan_state, ctx));
+                            true
+                        } else {
+                            false
+                        }
+                    });
+
+                    if !alive {
+                        break;
+                    }
+                }
+            }
+        })
+        .detach();
 
         tree
     }
@@ -116,15 +135,16 @@ impl Worktree {
 
     pub fn next_scan_complete(&self, ctx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
         let scan_id = self.snapshot.scan_id;
-        ctx.spawn_stream(
-            self.scan_state.1.clone(),
-            move |this, scan_state, ctx| {
-                if matches!(scan_state, ScanState::Idle) && this.snapshot.scan_id > scan_id {
-                    ctx.halt_stream();
+        let mut scan_state = self.scan_state.1.clone();
+        ctx.spawn(|this, ctx| async move {
+            while let Some(scan_state) = scan_state.recv().await {
+                if this.read_with(&ctx, |this, _| {
+                    matches!(scan_state, ScanState::Idle) && this.snapshot.scan_id > scan_id
+                }) {
+                    break;
                 }
-            },
-            |_, _| {},
-        )
+            }
+        })
     }
 
     fn observe_scan_state(&mut self, scan_state: ScanState, ctx: &mut ModelContext<Self>) {
@@ -137,9 +157,11 @@ impl Worktree {
         ctx.notify();
 
         if self.is_scanning() && !self.poll_scheduled {
-            ctx.spawn(Timer::after(Duration::from_millis(100)), |this, _, ctx| {
-                this.poll_scheduled = false;
-                this.poll_entries(ctx);
+            ctx.spawn(|this, mut ctx| async move {
+                this.update(&mut ctx, |this, ctx| {
+                    this.poll_scheduled = false;
+                    this.poll_entries(ctx);
+                })
             })
             .detach();
             self.poll_scheduled = true;