Remove Executor::close() (#50970)

Conrad Irwin and Eric Holk created

Co-Authored-By: Eric Holk <eric@zed.dev>

In app drop we had been calling `.close()` on the executors. This caused
problems with the BackgroundExecutor on Linux because it raced with
concurrent work: If task A was running and about to poll task B, the
poll to task B would panic with "Task polled after completion". This
didn't really matter (because the app was shutting down anyway) but
inflated our panic metrics on Linux.

It turns out that the call to `.close()` is not needed. It was added to
prevent foreground tasks being scheduled after the app was dropped; but
on all platforms the App run method does not return until after the
ForegroundExecutor is stopped (so no further tasks will run anyway).

The background case is more interesting. In test code it didn't matter
(the background executor is simulated on the main thread so tests can't
leak tasks); in app code it also didn't really make a difference. When
`fn main` returns (which it does immediately after the app is dropped)
all the background threads will be cancelled anyway.

Further confounding debugging, it turns out that the App does not get
dropped on macOS and Windows due to a reference cycle; so this was only
happening on Linux where the app quit callback is dropped instead of
retained after being called. (Fix in #50985)

Release Notes:

- N/A

---------

Co-authored-by: Eric Holk <eric@zed.dev>

Change summary

crates/gpui/Cargo.toml                    |   1 
crates/gpui/src/app.rs                    |   7 -
crates/gpui/src/executor.rs               | 154 ------------------------
crates/gpui/src/platform_scheduler.rs     |   5 
crates/gpui_linux/src/linux/dispatcher.rs |  10 -
crates/gpui_macos/src/dispatcher.rs       |   9 -
crates/gpui_web/src/dispatcher.rs         |  20 --
crates/gpui_windows/src/dispatcher.rs     |   8 -
crates/repl/src/repl.rs                   |   8 
crates/scheduler/src/executor.rs          |  45 ------
crates/scheduler/src/scheduler.rs         |  16 --
crates/scheduler/src/test_scheduler.rs    |   4 
12 files changed, 16 insertions(+), 271 deletions(-)

Detailed changes

crates/gpui/Cargo.toml 🔗

@@ -151,6 +151,7 @@ rand.workspace = true
 scheduler = { workspace = true, features = ["test-support"] }
 unicode-segmentation.workspace = true
 gpui_util = { workspace = true }
+proptest = { workspace = true }
 
 [target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
 http_client = { workspace = true, features = ["test-support"] }

crates/gpui/src/app.rs 🔗

@@ -2613,13 +2613,6 @@ impl<'a, T> Drop for GpuiBorrow<'a, T> {
     }
 }
 
-impl Drop for App {
-    fn drop(&mut self) {
-        self.foreground_executor.close();
-        self.background_executor.close();
-    }
-}
-
 #[cfg(test)]
 mod test {
     use std::{cell::RefCell, rc::Rc};

crates/gpui/src/executor.rs 🔗

@@ -129,11 +129,6 @@ impl BackgroundExecutor {
         }
     }
 
-    /// Close this executor. Tasks will not run after this is called.
-    pub fn close(&self) {
-        self.inner.close();
-    }
-
     /// Enqueues the given future to be run to completion on a background thread.
     #[track_caller]
     pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
@@ -173,7 +168,6 @@ impl BackgroundExecutor {
     {
         use crate::RunnableMeta;
         use parking_lot::{Condvar, Mutex};
-        use std::sync::{Arc, atomic::AtomicBool};
 
         struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 
@@ -197,14 +191,13 @@ impl BackgroundExecutor {
 
         let dispatcher = self.dispatcher.clone();
         let location = core::panic::Location::caller();
-        let closed = Arc::new(AtomicBool::new(false));
 
         let pair = &(Condvar::new(), Mutex::new(false));
         let _wait_guard = WaitOnDrop(pair);
 
         let (runnable, task) = unsafe {
             async_task::Builder::new()
-                .metadata(RunnableMeta { location, closed })
+                .metadata(RunnableMeta { location })
                 .spawn_unchecked(
                     move |_| async {
                         let _notify_guard = NotifyOnDrop(pair);
@@ -404,11 +397,6 @@ impl ForegroundExecutor {
         }
     }
 
-    /// Close this executor. Tasks will not run after this is called.
-    pub fn close(&self) {
-        self.inner.close();
-    }
-
     /// Enqueues the given Task to run on the main thread.
     #[track_caller]
     pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
@@ -595,144 +583,4 @@ mod test {
             "Task should run normally when app is alive"
         );
     }
-
-    #[test]
-    fn test_task_cancelled_when_app_dropped() {
-        let (dispatcher, _background_executor, app) = create_test_app();
-        let foreground_executor = app.borrow().foreground_executor.clone();
-        let app_weak = Rc::downgrade(&app);
-
-        let task_ran = Rc::new(RefCell::new(false));
-        let task_ran_clone = Rc::clone(&task_ran);
-
-        foreground_executor
-            .spawn(async move {
-                *task_ran_clone.borrow_mut() = true;
-            })
-            .detach();
-
-        drop(app);
-
-        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
-
-        dispatcher.run_until_parked();
-
-        // The task should have been cancelled, not run
-        assert!(
-            !*task_ran.borrow(),
-            "Task should have been cancelled when app was dropped, but it ran!"
-        );
-    }
-
-    #[test]
-    fn test_nested_tasks_both_cancel() {
-        let (dispatcher, _background_executor, app) = create_test_app();
-        let foreground_executor = app.borrow().foreground_executor.clone();
-        let app_weak = Rc::downgrade(&app);
-
-        let outer_completed = Rc::new(RefCell::new(false));
-        let inner_completed = Rc::new(RefCell::new(false));
-        let reached_await = Rc::new(RefCell::new(false));
-
-        let outer_flag = Rc::clone(&outer_completed);
-        let inner_flag = Rc::clone(&inner_completed);
-        let await_flag = Rc::clone(&reached_await);
-
-        // Channel to block the inner task until we're ready
-        let (tx, rx) = futures::channel::oneshot::channel::<()>();
-
-        let inner_executor = foreground_executor.clone();
-
-        foreground_executor
-            .spawn(async move {
-                let inner_task = inner_executor.spawn({
-                    let inner_flag = Rc::clone(&inner_flag);
-                    async move {
-                        rx.await.ok();
-                        *inner_flag.borrow_mut() = true;
-                    }
-                });
-
-                *await_flag.borrow_mut() = true;
-
-                inner_task.await;
-
-                *outer_flag.borrow_mut() = true;
-            })
-            .detach();
-
-        // Run dispatcher until outer task reaches the await point
-        // The inner task will be blocked on the channel
-        dispatcher.run_until_parked();
-
-        // Verify we actually reached the await point before dropping the app
-        assert!(
-            *reached_await.borrow(),
-            "Outer task should have reached the await point"
-        );
-
-        // Neither task should have completed yet
-        assert!(
-            !*outer_completed.borrow(),
-            "Outer task should not have completed yet"
-        );
-        assert!(
-            !*inner_completed.borrow(),
-            "Inner task should not have completed yet"
-        );
-
-        // Drop the channel sender and app while outer is awaiting inner
-        drop(tx);
-        drop(app);
-        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
-
-        // Run dispatcher - both tasks should be cancelled
-        dispatcher.run_until_parked();
-
-        // Neither task should have completed (both were cancelled)
-        assert!(
-            !*outer_completed.borrow(),
-            "Outer task should have been cancelled, not completed"
-        );
-        assert!(
-            !*inner_completed.borrow(),
-            "Inner task should have been cancelled, not completed"
-        );
-    }
-
-    #[test]
-    #[should_panic]
-    fn test_polling_cancelled_task_panics() {
-        let (dispatcher, _background_executor, app) = create_test_app();
-        let foreground_executor = app.borrow().foreground_executor.clone();
-        let app_weak = Rc::downgrade(&app);
-
-        let task = foreground_executor.spawn(async move { 42 });
-
-        drop(app);
-
-        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
-
-        dispatcher.run_until_parked();
-
-        foreground_executor.block_on(task);
-    }
-
-    #[test]
-    fn test_polling_cancelled_task_returns_none_with_fallible() {
-        let (dispatcher, _background_executor, app) = create_test_app();
-        let foreground_executor = app.borrow().foreground_executor.clone();
-        let app_weak = Rc::downgrade(&app);
-
-        let task = foreground_executor.spawn(async move { 42 }).fallible();
-
-        drop(app);
-
-        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
-
-        dispatcher.run_until_parked();
-
-        let result = foreground_executor.block_on(task);
-        assert_eq!(result, None, "Cancelled task should return None");
-    }
 }

crates/gpui/src/platform_scheduler.rs 🔗

@@ -109,16 +109,13 @@ impl Scheduler for PlatformScheduler {
 
     #[track_caller]
     fn timer(&self, duration: Duration) -> Timer {
-        use std::sync::{Arc, atomic::AtomicBool};
-
         let (tx, rx) = oneshot::channel();
         let dispatcher = self.dispatcher.clone();
 
         // Create a runnable that will send the completion signal
         let location = std::panic::Location::caller();
-        let closed = Arc::new(AtomicBool::new(false));
         let (runnable, _task) = async_task::Builder::new()
-            .metadata(RunnableMeta { location, closed })
+            .metadata(RunnableMeta { location })
             .spawn(
                 move |_| async move {
                     let _ = tx.send(());

crates/gpui_linux/src/linux/dispatcher.rs 🔗

@@ -44,11 +44,6 @@ impl LinuxDispatcher {
                     .name(format!("Worker-{i}"))
                     .spawn(move || {
                         for runnable in receiver.iter() {
-                            // Check if the executor that spawned this task was closed
-                            if runnable.metadata().is_closed() {
-                                continue;
-                            }
-
                             let start = Instant::now();
 
                             let location = runnable.metadata().location;
@@ -94,11 +89,6 @@ impl LinuxDispatcher {
                                     calloop::timer::Timer::from_duration(timer.duration),
                                     move |_, _, _| {
                                         if let Some(runnable) = runnable.take() {
-                                            // Check if the executor that spawned this task was closed
-                                            if runnable.metadata().is_closed() {
-                                                return TimeoutAction::Drop;
-                                            }
-
                                             let start = Instant::now();
                                             let location = runnable.metadata().location;
                                             let mut timing = TaskTiming {

crates/gpui_macos/src/dispatcher.rs 🔗

@@ -201,14 +201,7 @@ extern "C" fn trampoline(context: *mut c_void) {
     let runnable =
         unsafe { Runnable::<RunnableMeta>::from_raw(NonNull::new_unchecked(context as *mut ())) };
 
-    let metadata = runnable.metadata();
-
-    // Check if the executor that spawned this task was closed
-    if metadata.is_closed() {
-        return;
-    }
-
-    let location = metadata.location;
+    let location = runnable.metadata().location;
 
     let start = Instant::now();
     let timing = TaskTiming {

crates/gpui_web/src/dispatcher.rs 🔗

@@ -184,10 +184,6 @@ impl WebDispatcher {
                                     }
                                 };
 
-                                if runnable.metadata().is_closed() {

-                                    continue;

-                                }

-

                                 runnable.run();
                             }
                         })
@@ -263,9 +259,7 @@ impl PlatformDispatcher for WebDispatcher {
         let millis = duration.as_millis().min(i32::MAX as u128) as i32;
         if self.on_main_thread() {
             let callback = Closure::once_into_js(move || {
-                if !runnable.metadata().is_closed() {

-                    runnable.run();

-                }

+                runnable.run();

             });
             self.browser_window
                 .set_timeout_with_callback_and_timeout_and_arguments_0(
@@ -300,15 +294,11 @@ impl PlatformDispatcher for WebDispatcher {
 fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
     match item {
         MainThreadItem::Runnable(runnable) => {
-            if !runnable.metadata().is_closed() {

-                runnable.run();

-            }

+            runnable.run();

         }
         MainThreadItem::Delayed { runnable, millis } => {
             let callback = Closure::once_into_js(move || {
-                if !runnable.metadata().is_closed() {

-                    runnable.run();

-                }

+                runnable.run();

             });
             window
                 .set_timeout_with_callback_and_timeout_and_arguments_0(
@@ -325,9 +315,7 @@ fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) {
 
 fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) {
     let callback = Closure::once_into_js(move || {
-        if !runnable.metadata().is_closed() {

-            runnable.run();

-        }

+        runnable.run();

     });
     let callback: &js_sys::Function = callback.unchecked_ref();
 

crates/gpui_windows/src/dispatcher.rs 🔗

@@ -58,10 +58,6 @@ impl WindowsDispatcher {
             let mut task_wrapper = Some(runnable);
             WorkItemHandler::new(move |_| {
                 let runnable = task_wrapper.take().unwrap();
-                // Check if the executor that spawned this task was closed
-                if runnable.metadata().is_closed() {
-                    return Ok(());
-                }
                 Self::execute_runnable(runnable);
                 Ok(())
             })
@@ -75,10 +71,6 @@ impl WindowsDispatcher {
             let mut task_wrapper = Some(runnable);
             TimerElapsedHandler::new(move |_| {
                 let runnable = task_wrapper.take().unwrap();
-                // Check if the executor that spawned this task was closed
-                if runnable.metadata().is_closed() {
-                    return Ok(());
-                }
                 Self::execute_runnable(runnable);
                 Ok(())
             })

crates/repl/src/repl.rs 🔗

@@ -46,11 +46,9 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
     impl Dispatcher for ZedDispatcher {
         #[track_caller]
         fn dispatch(&self, runnable: Runnable) {
-            use std::sync::{Arc, atomic::AtomicBool};
             let location = core::panic::Location::caller();
-            let closed = Arc::new(AtomicBool::new(false));
             let (wrapper, task) = async_task::Builder::new()
-                .metadata(RunnableMeta { location, closed })
+                .metadata(RunnableMeta { location })
                 .spawn(|_| async move { runnable.run() }, {
                     let dispatcher = self.dispatcher.clone();
                     move |r| dispatcher.dispatch(r, Priority::default())
@@ -61,11 +59,9 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
 
         #[track_caller]
         fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
-            use std::sync::{Arc, atomic::AtomicBool};
             let location = core::panic::Location::caller();
-            let closed = Arc::new(AtomicBool::new(false));
             let (wrapper, task) = async_task::Builder::new()
-                .metadata(RunnableMeta { location, closed })
+                .metadata(RunnableMeta { location })
                 .spawn(|_| async move { runnable.run() }, {
                     let dispatcher = self.dispatcher.clone();
                     move |r| dispatcher.dispatch_after(duration, r)

crates/scheduler/src/executor.rs 🔗

@@ -6,10 +6,7 @@ use std::{
     panic::Location,
     pin::Pin,
     rc::Rc,
-    sync::{
-        Arc,
-        atomic::{AtomicBool, Ordering},
-    },
+    sync::Arc,
     task::{Context, Poll},
     thread::{self, ThreadId},
     time::Duration,
@@ -19,7 +16,6 @@ use std::{
 pub struct ForegroundExecutor {
     session_id: SessionId,
     scheduler: Arc<dyn Scheduler>,
-    closed: Arc<AtomicBool>,
     not_send: PhantomData<Rc<()>>,
 }
 
@@ -28,7 +24,6 @@ impl ForegroundExecutor {
         Self {
             session_id,
             scheduler,
-            closed: Arc::new(AtomicBool::new(false)),
             not_send: PhantomData,
         }
     }
@@ -41,16 +36,6 @@ impl ForegroundExecutor {
         &self.scheduler
     }
 
-    /// Returns the closed flag for this executor.
-    pub fn closed(&self) -> &Arc<AtomicBool> {
-        &self.closed
-    }
-
-    /// Close this executor. Tasks will not run after this is called.
-    pub fn close(&self) {
-        self.closed.store(true, Ordering::SeqCst);
-    }
-
     #[track_caller]
     pub fn spawn<F>(&self, future: F) -> Task<F::Output>
     where
@@ -60,13 +45,12 @@ impl ForegroundExecutor {
         let session_id = self.session_id;
         let scheduler = Arc::clone(&self.scheduler);
         let location = Location::caller();
-        let closed = self.closed.clone();
         let (runnable, task) = spawn_local_with_source_location(
             future,
             move |runnable| {
                 scheduler.schedule_foreground(session_id, runnable);
             },
-            RunnableMeta { location, closed },
+            RunnableMeta { location },
         );
         runnable.schedule();
         Task(TaskState::Spawned(task))
@@ -129,25 +113,11 @@ impl ForegroundExecutor {
 #[derive(Clone)]
 pub struct BackgroundExecutor {
     scheduler: Arc<dyn Scheduler>,
-    closed: Arc<AtomicBool>,
 }
 
 impl BackgroundExecutor {
     pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
-        Self {
-            scheduler,
-            closed: Arc::new(AtomicBool::new(false)),
-        }
-    }
-
-    /// Returns the closed flag for this executor.
-    pub fn closed(&self) -> &Arc<AtomicBool> {
-        &self.closed
-    }
-
-    /// Close this executor. Tasks will not run after this is called.
-    pub fn close(&self) {
-        self.closed.store(true, Ordering::SeqCst);
+        Self { scheduler }
     }
 
     #[track_caller]
@@ -167,9 +137,8 @@ impl BackgroundExecutor {
     {
         let scheduler = Arc::clone(&self.scheduler);
         let location = Location::caller();
-        let closed = self.closed.clone();
         let (runnable, task) = async_task::Builder::new()
-            .metadata(RunnableMeta { location, closed })
+            .metadata(RunnableMeta { location })
             .spawn(
                 move |_| future,
                 move |runnable| {
@@ -188,20 +157,16 @@ impl BackgroundExecutor {
         F::Output: Send + 'static,
     {
         let location = Location::caller();
-        let closed = self.closed.clone();
         let (tx, rx) = flume::bounded::<async_task::Runnable<RunnableMeta>>(1);
 
         self.scheduler.spawn_realtime(Box::new(move || {
             while let Ok(runnable) = rx.recv() {
-                if runnable.metadata().is_closed() {
-                    continue;
-                }
                 runnable.run();
             }
         }));
 
         let (runnable, task) = async_task::Builder::new()
-            .metadata(RunnableMeta { location, closed })
+            .metadata(RunnableMeta { location })
             .spawn(
                 move |_| future,
                 move |runnable| {

crates/scheduler/src/scheduler.rs 🔗

@@ -14,10 +14,7 @@ use std::{
     future::Future,
     panic::Location,
     pin::Pin,
-    sync::{
-        Arc,
-        atomic::{AtomicBool, Ordering},
-    },
+    sync::Arc,
     task::{Context, Poll},
     time::Duration,
 };
@@ -62,23 +59,12 @@ impl Priority {
 pub struct RunnableMeta {
     /// The source location where the task was spawned.
     pub location: &'static Location<'static>,
-    /// Shared flag indicating whether the scheduler has been closed.
-    /// When true, tasks should be dropped without running.
-    pub closed: Arc<AtomicBool>,
-}
-
-impl RunnableMeta {
-    /// Returns true if the scheduler has been closed and this task should not run.
-    pub fn is_closed(&self) -> bool {
-        self.closed.load(Ordering::SeqCst)
-    }
 }
 
 impl std::fmt::Debug for RunnableMeta {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("RunnableMeta")
             .field("location", &self.location)
-            .field("closed", &self.is_closed())
             .finish()
     }
 }

crates/scheduler/src/test_scheduler.rs 🔗

@@ -320,10 +320,6 @@ impl TestScheduler {
         };
 
         if let Some(runnable) = runnable {
-            // Check if the executor that spawned this task was closed
-            if runnable.runnable.metadata().is_closed() {
-                return true;
-            }
             let is_foreground = runnable.session_id.is_some();
             let was_main_thread = self.state.lock().is_main_thread;
             self.state.lock().is_main_thread = is_foreground;