Remove future/stream handlers when task is dropped

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

gpui/src/app.rs | 193 ++++++++++++++++++++++++++++++++++++--------------
1 file changed, 139 insertions(+), 54 deletions(-)

Detailed changes

gpui/src/app.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{
     elements::ElementBox,
-    executor::{self, Task},
+    executor,
     keymap::{self, Keystroke},
     platform::{self, App as _, WindowOptions},
     presenter::Presenter,
@@ -311,8 +311,8 @@ pub struct MutableAppContext {
         HashMap<usize, Box<dyn FnMut(WindowInvalidation, &mut MutableAppContext)>>,
     foreground: Rc<executor::Foreground>,
     background: Arc<executor::Background>,
-    future_handlers: HashMap<usize, FutureHandler>,
-    stream_handlers: HashMap<usize, StreamHandler>,
+    future_handlers: Rc<RefCell<HashMap<usize, FutureHandler>>>,
+    stream_handlers: Rc<RefCell<HashMap<usize, StreamHandler>>>,
     task_done: (channel::Sender<usize>, channel::Receiver<usize>),
     pending_effects: VecDeque<Effect>,
     pending_flushes: usize,
@@ -348,8 +348,8 @@ impl MutableAppContext {
             invalidation_callbacks: HashMap::new(),
             foreground,
             background: Arc::new(executor::Background::new()),
-            future_handlers: HashMap::new(),
-            stream_handlers: HashMap::new(),
+            future_handlers: Default::default(),
+            stream_handlers: Default::default(),
             task_done: channel::unbounded(),
             pending_effects: VecDeque::new(),
             pending_flushes: 0,
@@ -963,50 +963,64 @@ impl MutableAppContext {
         self.flush_effects();
     }
 
-    fn spawn<F, T>(&mut self, future: F) -> (usize, Task<Option<T>>)
+    fn spawn<F, T>(&mut self, future: F) -> Task<Option<T>>
     where
         F: 'static + Future,
         T: 'static,
     {
         let task_id = post_inc(&mut self.next_task_id);
         let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
-        let task = self.foreground.spawn(async move {
-            let output = future.await;
-            app.borrow_mut()
-                .handle_future_output(task_id, Box::new(output))
-                .map(|result| *result.downcast::<T>().unwrap())
-        });
-        (task_id, task)
+        let task = {
+            let app = app.clone();
+            self.foreground.spawn(async move {
+                let output = future.await;
+                app.borrow_mut()
+                    .handle_future_output(task_id, Box::new(output))
+                    .map(|result| *result.downcast::<T>().unwrap())
+            })
+        };
+        Task::new(
+            task_id,
+            task,
+            TaskHandlerMap::Future(self.future_handlers.clone()),
+        )
     }
 
-    fn spawn_stream<F, T>(&mut self, mut stream: F) -> (usize, Task<Option<T>>)
+    fn spawn_stream<F, T>(&mut self, mut stream: F) -> Task<Option<T>>
     where
         F: 'static + Stream + Unpin,
         T: 'static,
     {
         let task_id = post_inc(&mut self.next_task_id);
         let app = self.weak_self.as_ref().unwrap().upgrade().unwrap();
-        let task = self.foreground.spawn(async move {
-            loop {
-                match stream.next().await {
-                    Some(item) => {
-                        let mut app = app.borrow_mut();
-                        if app.handle_stream_item(task_id, Box::new(item)) {
+        let task = {
+            let app = app.clone();
+            self.foreground.spawn(async move {
+                loop {
+                    match stream.next().await {
+                        Some(item) => {
+                            let mut app = app.borrow_mut();
+                            if app.handle_stream_item(task_id, Box::new(item)) {
+                                break;
+                            }
+                        }
+                        None => {
                             break;
                         }
                     }
-                    None => {
-                        break;
-                    }
                 }
-            }
 
-            app.borrow_mut()
-                .stream_completed(task_id)
-                .map(|result| *result.downcast::<T>().unwrap())
-        });
+                app.borrow_mut()
+                    .stream_completed(task_id)
+                    .map(|result| *result.downcast::<T>().unwrap())
+            })
+        };
 
-        (task_id, task)
+        Task::new(
+            task_id,
+            task,
+            TaskHandlerMap::Stream(self.stream_handlers.clone()),
+        )
     }
 
     fn handle_future_output(
@@ -1015,7 +1029,7 @@ impl MutableAppContext {
         output: Box<dyn Any>,
     ) -> Option<Box<dyn Any>> {
         self.pending_flushes += 1;
-        let future_callback = self.future_handlers.remove(&task_id).unwrap();
+        let future_callback = self.future_handlers.borrow_mut().remove(&task_id).unwrap();
 
         let mut result = None;
 
@@ -1057,7 +1071,7 @@ impl MutableAppContext {
     fn handle_stream_item(&mut self, task_id: usize, output: Box<dyn Any>) -> bool {
         self.pending_flushes += 1;
 
-        let mut handler = self.stream_handlers.remove(&task_id).unwrap();
+        let mut handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap();
         let halt = match &mut handler {
             StreamHandler::Model {
                 model_id,
@@ -1067,7 +1081,7 @@ impl MutableAppContext {
                 if let Some(mut model) = self.ctx.models.remove(&model_id) {
                     let halt = item_callback(model.as_any_mut(), output, self, *model_id);
                     self.ctx.models.insert(*model_id, model);
-                    self.stream_handlers.insert(task_id, handler);
+                    self.stream_handlers.borrow_mut().insert(task_id, handler);
                     halt
                 } else {
                     true
@@ -1092,7 +1106,7 @@ impl MutableAppContext {
                         .unwrap()
                         .views
                         .insert(*view_id, view);
-                    self.stream_handlers.insert(task_id, handler);
+                    self.stream_handlers.borrow_mut().insert(task_id, handler);
                     halt
                 } else {
                     true
@@ -1105,7 +1119,8 @@ impl MutableAppContext {
     }
 
     fn stream_completed(&mut self, task_id: usize) -> Option<Box<dyn Any>> {
-        let result = match self.stream_handlers.remove(&task_id).unwrap() {
+        let stream_handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap();
+        let result = match stream_handler {
             StreamHandler::Model {
                 model_id,
                 done_callback,
@@ -1158,8 +1173,13 @@ impl MutableAppContext {
     }
 
     pub fn finish_pending_tasks(&self) -> impl Future<Output = ()> {
-        let mut pending_tasks = self.future_handlers.keys().cloned().collect::<HashSet<_>>();
-        pending_tasks.extend(self.stream_handlers.keys());
+        let mut pending_tasks = self
+            .future_handlers
+            .borrow()
+            .keys()
+            .cloned()
+            .collect::<HashSet<_>>();
+        pending_tasks.extend(self.stream_handlers.borrow().keys());
 
         let task_done = self.task_done.1.clone();
 
@@ -1531,10 +1551,10 @@ impl<'a, T: Entity> ModelContext<'a, T> {
         F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext<T>) -> U,
         U: 'static,
     {
-        let (task_id, task) = self.app.spawn::<S, U>(future);
+        let task = self.app.spawn::<S, U>(future);
 
-        self.app.future_handlers.insert(
-            task_id,
+        self.app.future_handlers.borrow_mut().insert(
+            task.id,
             FutureHandler::Model {
                 model_id: self.model_id,
                 callback: Box::new(move |model, output, app, model_id| {
@@ -1564,9 +1584,9 @@ impl<'a, T: Entity> ModelContext<'a, T> {
         G: 'static + FnOnce(&mut T, &mut ModelContext<T>) -> U,
         U: 'static + Any,
     {
-        let (task_id, task) = self.app.spawn_stream(stream);
-        self.app.stream_handlers.insert(
-            task_id,
+        let task = self.app.spawn_stream(stream);
+        self.app.stream_handlers.borrow_mut().insert(
+            task.id,
             StreamHandler::Model {
                 model_id: self.model_id,
                 item_callback: Box::new(move |model, output, app, model_id| {
@@ -1791,10 +1811,10 @@ impl<'a, T: View> ViewContext<'a, T> {
         F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext<T>) -> U,
         U: 'static,
     {
-        let (task_id, task) = self.app.spawn(future);
+        let task = self.app.spawn(future);
 
-        self.app.future_handlers.insert(
-            task_id,
+        self.app.future_handlers.borrow_mut().insert(
+            task.id,
             FutureHandler::View {
                 window_id: self.window_id,
                 view_id: self.view_id,
@@ -1825,9 +1845,9 @@ impl<'a, T: View> ViewContext<'a, T> {
         G: 'static + FnOnce(&mut T, &mut ViewContext<T>) -> U,
         U: 'static + Any,
     {
-        let (task_id, task) = self.app.spawn_stream(stream);
-        self.app.stream_handlers.insert(
-            task_id,
+        let task = self.app.spawn_stream(stream);
+        self.app.stream_handlers.borrow_mut().insert(
+            task.id,
             StreamHandler::View {
                 window_id: self.window_id,
                 view_id: self.view_id,
@@ -2325,6 +2345,65 @@ enum StreamHandler {
     },
 }
 
+#[must_use]
+pub struct Task<T> {
+    id: usize,
+    task: Option<executor::Task<T>>,
+    handler_map: TaskHandlerMap,
+}
+
+enum TaskHandlerMap {
+    Detached,
+    Future(Rc<RefCell<HashMap<usize, FutureHandler>>>),
+    Stream(Rc<RefCell<HashMap<usize, StreamHandler>>>),
+}
+
+impl<T> Task<T> {
+    fn new(id: usize, task: executor::Task<T>, handler_map: TaskHandlerMap) -> Self {
+        Self {
+            id,
+            task: Some(task),
+            handler_map,
+        }
+    }
+
+    pub fn detach(mut self) {
+        self.handler_map = TaskHandlerMap::Detached;
+        self.task.take().unwrap().detach();
+    }
+
+    pub async fn cancel(mut self) -> Option<T> {
+        let task = self.task.take().unwrap();
+        task.cancel().await
+    }
+}
+
+impl<T> Future for Task<T> {
+    type Output = T;
+
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        ctx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        let task = unsafe { self.map_unchecked_mut(|task| task.task.as_mut().unwrap()) };
+        task.poll(ctx)
+    }
+}
+
+impl<T> Drop for Task<T> {
+    fn drop(self: &mut Self) {
+        match &self.handler_map {
+            TaskHandlerMap::Detached => {}
+            TaskHandlerMap::Future(map) => {
+                map.borrow_mut().remove(&self.id);
+            }
+            TaskHandlerMap::Stream(map) => {
+                map.borrow_mut().remove(&self.id);
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -3270,32 +3349,38 @@ mod tests {
 
             model.update(&mut app, |_, ctx| {
                 ctx.spawn(async {}, |_, _, _| {}).detach();
-                ctx.spawn(async {}, |_, _, _| {}).detach();
+                // Cancel this task
+                drop(ctx.spawn(async {}, |_, _, _| {}));
             });
 
             view.update(&mut app, |_, ctx| {
                 ctx.spawn(async {}, |_, _, _| {}).detach();
-                ctx.spawn(async {}, |_, _, _| {}).detach();
+                // Cancel this task
+                drop(ctx.spawn(async {}, |_, _, _| {}));
             });
 
-            assert!(!app.0.borrow().future_handlers.is_empty());
+            assert!(!app.0.borrow().future_handlers.borrow().is_empty());
             app.finish_pending_tasks().await;
-            assert!(app.0.borrow().future_handlers.is_empty());
+            assert!(app.0.borrow().future_handlers.borrow().is_empty());
             app.finish_pending_tasks().await; // Don't block if there are no tasks
 
             model.update(&mut app, |_, ctx| {
                 ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})
                     .detach();
+                // Cancel this task
+                drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}));
             });
 
             view.update(&mut app, |_, ctx| {
                 ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})
                     .detach();
+                // Cancel this task
+                drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}));
             });
 
-            assert!(!app.0.borrow().stream_handlers.is_empty());
+            assert!(!app.0.borrow().stream_handlers.borrow().is_empty());
             app.finish_pending_tasks().await;
-            assert!(app.0.borrow().stream_handlers.is_empty());
+            assert!(app.0.borrow().stream_handlers.borrow().is_empty());
             app.finish_pending_tasks().await; // Don't block if there are no tasks
         });
     }