Complete finish_pending_tasks future when tasks are cancelled

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

gpui/src/app.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 58 insertions(+), 4 deletions(-)

Detailed changes

gpui/src/app.rs 🔗

@@ -983,6 +983,8 @@ impl MutableAppContext {
             task_id,
             task,
             TaskHandlerMap::Future(self.future_handlers.clone()),
+            self.task_done.0.clone(),
+            self.background.clone(),
         )
     }
 
@@ -1020,6 +1022,8 @@ impl MutableAppContext {
             task_id,
             task,
             TaskHandlerMap::Stream(self.stream_handlers.clone()),
+            self.task_done.0.clone(),
+            self.background.clone(),
         )
     }
 
@@ -1039,7 +1043,6 @@ impl MutableAppContext {
                     result = Some(callback(model.as_any_mut(), output, self, model_id));
                     self.ctx.models.insert(model_id, model);
                 }
-                self.task_done(task_id);
             }
             FutureHandler::View {
                 window_id,
@@ -1060,11 +1063,11 @@ impl MutableAppContext {
                         .views
                         .insert(view_id, view);
                 }
-                self.task_done(task_id);
             }
         };
 
         self.flush_effects();
+        self.task_done(task_id);
         result
     }
 
@@ -1119,6 +1122,8 @@ impl MutableAppContext {
     }
 
     fn stream_completed(&mut self, task_id: usize) -> Option<Box<dyn Any>> {
+        self.pending_flushes += 1;
+
         let stream_handler = self.stream_handlers.borrow_mut().remove(&task_id).unwrap();
         let result = match stream_handler {
             StreamHandler::Model {
@@ -1159,6 +1164,8 @@ impl MutableAppContext {
                 }
             }
         };
+
+        self.flush_effects();
         self.task_done(task_id);
         result
     }
@@ -2350,6 +2357,8 @@ pub struct Task<T> {
     id: usize,
     task: Option<executor::Task<T>>,
     handler_map: TaskHandlerMap,
+    background: Arc<executor::Background>,
+    task_done: channel::Sender<usize>,
 }
 
 enum TaskHandlerMap {
@@ -2359,11 +2368,19 @@ enum TaskHandlerMap {
 }
 
 impl<T> Task<T> {
-    fn new(id: usize, task: executor::Task<T>, handler_map: TaskHandlerMap) -> Self {
+    fn new(
+        id: usize,
+        task: executor::Task<T>,
+        handler_map: TaskHandlerMap,
+        task_done: channel::Sender<usize>,
+        background: Arc<executor::Background>,
+    ) -> Self {
         Self {
             id,
             task: Some(task),
             handler_map,
+            task_done,
+            background,
         }
     }
 
@@ -2393,7 +2410,9 @@ impl<T> Future for Task<T> {
 impl<T> Drop for Task<T> {
     fn drop(self: &mut Self) {
         match &self.handler_map {
-            TaskHandlerMap::Detached => {}
+            TaskHandlerMap::Detached => {
+                return;
+            }
             TaskHandlerMap::Future(map) => {
                 map.borrow_mut().remove(&self.id);
             }
@@ -2401,6 +2420,13 @@ impl<T> Drop for Task<T> {
                 map.borrow_mut().remove(&self.id);
             }
         }
+        let task_done = self.task_done.clone();
+        let task_id = self.id;
+        self.background
+            .spawn(async move {
+                let _ = task_done.send(task_id).await;
+            })
+            .detach()
     }
 }
 
@@ -3382,6 +3408,34 @@ mod tests {
             app.finish_pending_tasks().await;
             assert!(app.0.borrow().stream_handlers.borrow().is_empty());
             app.finish_pending_tasks().await; // Don't block if there are no tasks
+
+            // Tasks are considered finished when we drop handles
+            let mut tasks = Vec::new();
+            model.update(&mut app, |_, ctx| {
+                tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {})));
+                tasks.push(Box::new(ctx.spawn_stream(
+                    smol::stream::iter(vec![1, 2, 3]),
+                    |_, _, _| {},
+                    |_, _| {},
+                )));
+            });
+
+            view.update(&mut app, |_, ctx| {
+                tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {})));
+                tasks.push(Box::new(ctx.spawn_stream(
+                    smol::stream::iter(vec![1, 2, 3]),
+                    |_, _, _| {},
+                    |_, _| {},
+                )));
+            });
+
+            assert!(!app.0.borrow().stream_handlers.borrow().is_empty());
+
+            let finish_pending_tasks = app.finish_pending_tasks();
+            drop(tasks);
+            finish_pending_tasks.await;
+            assert!(app.0.borrow().stream_handlers.borrow().is_empty());
+            app.finish_pending_tasks().await; // Don't block if there are no tasks
         });
     }
 }