From 6c9b813f38182a368de6dcecd59e60b162a6ae0c Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 6 Mar 2026 21:11:45 -0700 Subject: [PATCH] Remove Executor::close() (#50970) Co-Authored-By: Eric Holk In app drop we had been calling `.close()` on the executors. This caused problems with the BackgroundExecutor on Linux because it raced with concurrent work: If task A was running and about to poll task B, the poll to task B would panic with "Task polled after completion". This didn't really matter (because the app was shutting down anyway) but inflated our panic metrics on Linux. It turns out that the call to `.close()` is not needed. It was added to prevent foreground tasks being scheduled after the app was dropped; but on all platforms the App run method does not return until after the ForegroundExecutor is stopped (so no further tasks will run anyway). The background case is more interesting. In test code it didn't matter (the background executor is simulated on the main thread so tests can't leak tasks); in app code it also didn't really make a difference. When `fn main` returns (which it does immediately after the app is dropped) all the background threads will be cancelled anyway. Further confounding debugging, it turns out that the App does not get dropped on macOS and Windows due to a reference cycle; so this was only happening on Linux where the app quit callback is dropped instead of retained after being called. (Fix in #50985) Release Notes: - N/A --------- Co-authored-by: Eric Holk --- crates/gpui/Cargo.toml | 1 + crates/gpui/src/app.rs | 7 - crates/gpui/src/executor.rs | 154 +--------------------- crates/gpui/src/platform_scheduler.rs | 5 +- crates/gpui_linux/src/linux/dispatcher.rs | 10 -- crates/gpui_macos/src/dispatcher.rs | 9 +- crates/gpui_web/src/dispatcher.rs | 20 +-- crates/gpui_windows/src/dispatcher.rs | 8 -- crates/repl/src/repl.rs | 8 +- crates/scheduler/src/executor.rs | 45 +------ crates/scheduler/src/scheduler.rs | 16 +-- crates/scheduler/src/test_scheduler.rs | 4 - 12 files changed, 16 insertions(+), 271 deletions(-) diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index a07eb08576c31236df26787c9c9ade4186c466d6..28350e55702a88a0aef6686f16f45303c99a75d0 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -151,6 +151,7 @@ rand.workspace = true scheduler = { workspace = true, features = ["test-support"] } unicode-segmentation.workspace = true gpui_util = { workspace = true } +proptest = { workspace = true } [target.'cfg(not(target_family = "wasm"))'.dev-dependencies] http_client = { workspace = true, features = ["test-support"] } diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index f1fe264f4ef4ccb09081a6672c7c4ddb1d24dc97..dbe221911a2619aad11dbd31f5bbf07ca8b9fb93 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -2613,13 +2613,6 @@ impl<'a, T> Drop for GpuiBorrow<'a, T> { } } -impl Drop for App { - fn drop(&mut self) { - self.foreground_executor.close(); - self.background_executor.close(); - } -} - #[cfg(test)] mod test { use std::{cell::RefCell, rc::Rc}; diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 31c1ed80b92efb5dfa9ead6dcaf9050fe68ea399..cb65f758d5a521f15f77e7be266b1b4ed0480d03 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -129,11 +129,6 @@ impl BackgroundExecutor { } } - /// Close this executor. Tasks will not run after this is called. - pub fn close(&self) { - self.inner.close(); - } - /// Enqueues the given future to be run to completion on a background thread. #[track_caller] pub fn spawn(&self, future: impl Future + Send + 'static) -> Task @@ -173,7 +168,6 @@ impl BackgroundExecutor { { use crate::RunnableMeta; use parking_lot::{Condvar, Mutex}; - use std::sync::{Arc, atomic::AtomicBool}; struct NotifyOnDrop<'a>(&'a (Condvar, Mutex)); @@ -197,14 +191,13 @@ impl BackgroundExecutor { let dispatcher = self.dispatcher.clone(); let location = core::panic::Location::caller(); - let closed = Arc::new(AtomicBool::new(false)); let pair = &(Condvar::new(), Mutex::new(false)); let _wait_guard = WaitOnDrop(pair); let (runnable, task) = unsafe { async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn_unchecked( move |_| async { let _notify_guard = NotifyOnDrop(pair); @@ -404,11 +397,6 @@ impl ForegroundExecutor { } } - /// Close this executor. Tasks will not run after this is called. - pub fn close(&self) { - self.inner.close(); - } - /// Enqueues the given Task to run on the main thread. #[track_caller] pub fn spawn(&self, future: impl Future + 'static) -> Task @@ -595,144 +583,4 @@ mod test { "Task should run normally when app is alive" ); } - - #[test] - fn test_task_cancelled_when_app_dropped() { - let (dispatcher, _background_executor, app) = create_test_app(); - let foreground_executor = app.borrow().foreground_executor.clone(); - let app_weak = Rc::downgrade(&app); - - let task_ran = Rc::new(RefCell::new(false)); - let task_ran_clone = Rc::clone(&task_ran); - - foreground_executor - .spawn(async move { - *task_ran_clone.borrow_mut() = true; - }) - .detach(); - - drop(app); - - assert!(app_weak.upgrade().is_none(), "App should have been dropped"); - - dispatcher.run_until_parked(); - - // The task should have been cancelled, not run - assert!( - !*task_ran.borrow(), - "Task should have been cancelled when app was dropped, but it ran!" - ); - } - - #[test] - fn test_nested_tasks_both_cancel() { - let (dispatcher, _background_executor, app) = create_test_app(); - let foreground_executor = app.borrow().foreground_executor.clone(); - let app_weak = Rc::downgrade(&app); - - let outer_completed = Rc::new(RefCell::new(false)); - let inner_completed = Rc::new(RefCell::new(false)); - let reached_await = Rc::new(RefCell::new(false)); - - let outer_flag = Rc::clone(&outer_completed); - let inner_flag = Rc::clone(&inner_completed); - let await_flag = Rc::clone(&reached_await); - - // Channel to block the inner task until we're ready - let (tx, rx) = futures::channel::oneshot::channel::<()>(); - - let inner_executor = foreground_executor.clone(); - - foreground_executor - .spawn(async move { - let inner_task = inner_executor.spawn({ - let inner_flag = Rc::clone(&inner_flag); - async move { - rx.await.ok(); - *inner_flag.borrow_mut() = true; - } - }); - - *await_flag.borrow_mut() = true; - - inner_task.await; - - *outer_flag.borrow_mut() = true; - }) - .detach(); - - // Run dispatcher until outer task reaches the await point - // The inner task will be blocked on the channel - dispatcher.run_until_parked(); - - // Verify we actually reached the await point before dropping the app - assert!( - *reached_await.borrow(), - "Outer task should have reached the await point" - ); - - // Neither task should have completed yet - assert!( - !*outer_completed.borrow(), - "Outer task should not have completed yet" - ); - assert!( - !*inner_completed.borrow(), - "Inner task should not have completed yet" - ); - - // Drop the channel sender and app while outer is awaiting inner - drop(tx); - drop(app); - assert!(app_weak.upgrade().is_none(), "App should have been dropped"); - - // Run dispatcher - both tasks should be cancelled - dispatcher.run_until_parked(); - - // Neither task should have completed (both were cancelled) - assert!( - !*outer_completed.borrow(), - "Outer task should have been cancelled, not completed" - ); - assert!( - !*inner_completed.borrow(), - "Inner task should have been cancelled, not completed" - ); - } - - #[test] - #[should_panic] - fn test_polling_cancelled_task_panics() { - let (dispatcher, _background_executor, app) = create_test_app(); - let foreground_executor = app.borrow().foreground_executor.clone(); - let app_weak = Rc::downgrade(&app); - - let task = foreground_executor.spawn(async move { 42 }); - - drop(app); - - assert!(app_weak.upgrade().is_none(), "App should have been dropped"); - - dispatcher.run_until_parked(); - - foreground_executor.block_on(task); - } - - #[test] - fn test_polling_cancelled_task_returns_none_with_fallible() { - let (dispatcher, _background_executor, app) = create_test_app(); - let foreground_executor = app.borrow().foreground_executor.clone(); - let app_weak = Rc::downgrade(&app); - - let task = foreground_executor.spawn(async move { 42 }).fallible(); - - drop(app); - - assert!(app_weak.upgrade().is_none(), "App should have been dropped"); - - dispatcher.run_until_parked(); - - let result = foreground_executor.block_on(task); - assert_eq!(result, None, "Cancelled task should return None"); - } } diff --git a/crates/gpui/src/platform_scheduler.rs b/crates/gpui/src/platform_scheduler.rs index 900cd6041d38380f4d9cb3ff9b87a3605b0ebd78..0087c588d8d6381fa1fe590a2366c2e35ffe0a7a 100644 --- a/crates/gpui/src/platform_scheduler.rs +++ b/crates/gpui/src/platform_scheduler.rs @@ -109,16 +109,13 @@ impl Scheduler for PlatformScheduler { #[track_caller] fn timer(&self, duration: Duration) -> Timer { - use std::sync::{Arc, atomic::AtomicBool}; - let (tx, rx) = oneshot::channel(); let dispatcher = self.dispatcher.clone(); // Create a runnable that will send the completion signal let location = std::panic::Location::caller(); - let closed = Arc::new(AtomicBool::new(false)); let (runnable, _task) = async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn( move |_| async move { let _ = tx.send(()); diff --git a/crates/gpui_linux/src/linux/dispatcher.rs b/crates/gpui_linux/src/linux/dispatcher.rs index ff17fd238ae2a4b40ebdf8e36133c05f3e41f9b3..a72276cc7658a399505fa62bd2d5fe7b41e43e14 100644 --- a/crates/gpui_linux/src/linux/dispatcher.rs +++ b/crates/gpui_linux/src/linux/dispatcher.rs @@ -44,11 +44,6 @@ impl LinuxDispatcher { .name(format!("Worker-{i}")) .spawn(move || { for runnable in receiver.iter() { - // Check if the executor that spawned this task was closed - if runnable.metadata().is_closed() { - continue; - } - let start = Instant::now(); let location = runnable.metadata().location; @@ -94,11 +89,6 @@ impl LinuxDispatcher { calloop::timer::Timer::from_duration(timer.duration), move |_, _, _| { if let Some(runnable) = runnable.take() { - // Check if the executor that spawned this task was closed - if runnable.metadata().is_closed() { - return TimeoutAction::Drop; - } - let start = Instant::now(); let location = runnable.metadata().location; let mut timing = TaskTiming { diff --git a/crates/gpui_macos/src/dispatcher.rs b/crates/gpui_macos/src/dispatcher.rs index 07638639e4bf5d3f002c1babfc213bc330e63dce..dd6f546f68b88efe6babc13e2d923d634eff5825 100644 --- a/crates/gpui_macos/src/dispatcher.rs +++ b/crates/gpui_macos/src/dispatcher.rs @@ -201,14 +201,7 @@ extern "C" fn trampoline(context: *mut c_void) { let runnable = unsafe { Runnable::::from_raw(NonNull::new_unchecked(context as *mut ())) }; - let metadata = runnable.metadata(); - - // Check if the executor that spawned this task was closed - if metadata.is_closed() { - return; - } - - let location = metadata.location; + let location = runnable.metadata().location; let start = Instant::now(); let timing = TaskTiming { diff --git a/crates/gpui_web/src/dispatcher.rs b/crates/gpui_web/src/dispatcher.rs index d9419fb35353cfadd809b0bbc1cb9e7dbf124cda..5a0911f7ef1a33d1959de6d03f9f9797978b7a9b 100644 --- a/crates/gpui_web/src/dispatcher.rs +++ b/crates/gpui_web/src/dispatcher.rs @@ -184,10 +184,6 @@ impl WebDispatcher { } }; - if runnable.metadata().is_closed() { - continue; - } - runnable.run(); } }) @@ -263,9 +259,7 @@ impl PlatformDispatcher for WebDispatcher { let millis = duration.as_millis().min(i32::MAX as u128) as i32; if self.on_main_thread() { let callback = Closure::once_into_js(move || { - if !runnable.metadata().is_closed() { - runnable.run(); - } + runnable.run(); }); self.browser_window .set_timeout_with_callback_and_timeout_and_arguments_0( @@ -300,15 +294,11 @@ impl PlatformDispatcher for WebDispatcher { fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) { match item { MainThreadItem::Runnable(runnable) => { - if !runnable.metadata().is_closed() { - runnable.run(); - } + runnable.run(); } MainThreadItem::Delayed { runnable, millis } => { let callback = Closure::once_into_js(move || { - if !runnable.metadata().is_closed() { - runnable.run(); - } + runnable.run(); }); window .set_timeout_with_callback_and_timeout_and_arguments_0( @@ -325,9 +315,7 @@ fn execute_on_main_thread(window: &web_sys::Window, item: MainThreadItem) { fn schedule_runnable(window: &web_sys::Window, runnable: RunnableVariant, priority: Priority) { let callback = Closure::once_into_js(move || { - if !runnable.metadata().is_closed() { - runnable.run(); - } + runnable.run(); }); let callback: &js_sys::Function = callback.unchecked_ref(); diff --git a/crates/gpui_windows/src/dispatcher.rs b/crates/gpui_windows/src/dispatcher.rs index 060cdb7ba626133b9c201980e54bd0479694faa6..a5cfd9dc10d9afcce9580565943c28cb83dc9dab 100644 --- a/crates/gpui_windows/src/dispatcher.rs +++ b/crates/gpui_windows/src/dispatcher.rs @@ -58,10 +58,6 @@ impl WindowsDispatcher { let mut task_wrapper = Some(runnable); WorkItemHandler::new(move |_| { let runnable = task_wrapper.take().unwrap(); - // Check if the executor that spawned this task was closed - if runnable.metadata().is_closed() { - return Ok(()); - } Self::execute_runnable(runnable); Ok(()) }) @@ -75,10 +71,6 @@ impl WindowsDispatcher { let mut task_wrapper = Some(runnable); TimerElapsedHandler::new(move |_| { let runnable = task_wrapper.take().unwrap(); - // Check if the executor that spawned this task was closed - if runnable.metadata().is_closed() { - return Ok(()); - } Self::execute_runnable(runnable); Ok(()) }) diff --git a/crates/repl/src/repl.rs b/crates/repl/src/repl.rs index f17cf8dfba5f5e0e950bd5f2967a6b20d2eebb51..8c3d15a2ad2dfdd18976d750c71e2b3cfb0393a4 100644 --- a/crates/repl/src/repl.rs +++ b/crates/repl/src/repl.rs @@ -46,11 +46,9 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher { impl Dispatcher for ZedDispatcher { #[track_caller] fn dispatch(&self, runnable: Runnable) { - use std::sync::{Arc, atomic::AtomicBool}; let location = core::panic::Location::caller(); - let closed = Arc::new(AtomicBool::new(false)); let (wrapper, task) = async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn(|_| async move { runnable.run() }, { let dispatcher = self.dispatcher.clone(); move |r| dispatcher.dispatch(r, Priority::default()) @@ -61,11 +59,9 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher { #[track_caller] fn dispatch_after(&self, duration: Duration, runnable: Runnable) { - use std::sync::{Arc, atomic::AtomicBool}; let location = core::panic::Location::caller(); - let closed = Arc::new(AtomicBool::new(false)); let (wrapper, task) = async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn(|_| async move { runnable.run() }, { let dispatcher = self.dispatcher.clone(); move |r| dispatcher.dispatch_after(duration, r) diff --git a/crates/scheduler/src/executor.rs b/crates/scheduler/src/executor.rs index 76df2e69f66398e3709e1db58a847b1cd0079fc4..602404142a1f4d19bbce841b3b06996cc2a7427b 100644 --- a/crates/scheduler/src/executor.rs +++ b/crates/scheduler/src/executor.rs @@ -6,10 +6,7 @@ use std::{ panic::Location, pin::Pin, rc::Rc, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, + sync::Arc, task::{Context, Poll}, thread::{self, ThreadId}, time::Duration, @@ -19,7 +16,6 @@ use std::{ pub struct ForegroundExecutor { session_id: SessionId, scheduler: Arc, - closed: Arc, not_send: PhantomData>, } @@ -28,7 +24,6 @@ impl ForegroundExecutor { Self { session_id, scheduler, - closed: Arc::new(AtomicBool::new(false)), not_send: PhantomData, } } @@ -41,16 +36,6 @@ impl ForegroundExecutor { &self.scheduler } - /// Returns the closed flag for this executor. - pub fn closed(&self) -> &Arc { - &self.closed - } - - /// Close this executor. Tasks will not run after this is called. - pub fn close(&self) { - self.closed.store(true, Ordering::SeqCst); - } - #[track_caller] pub fn spawn(&self, future: F) -> Task where @@ -60,13 +45,12 @@ impl ForegroundExecutor { let session_id = self.session_id; let scheduler = Arc::clone(&self.scheduler); let location = Location::caller(); - let closed = self.closed.clone(); let (runnable, task) = spawn_local_with_source_location( future, move |runnable| { scheduler.schedule_foreground(session_id, runnable); }, - RunnableMeta { location, closed }, + RunnableMeta { location }, ); runnable.schedule(); Task(TaskState::Spawned(task)) @@ -129,25 +113,11 @@ impl ForegroundExecutor { #[derive(Clone)] pub struct BackgroundExecutor { scheduler: Arc, - closed: Arc, } impl BackgroundExecutor { pub fn new(scheduler: Arc) -> Self { - Self { - scheduler, - closed: Arc::new(AtomicBool::new(false)), - } - } - - /// Returns the closed flag for this executor. - pub fn closed(&self) -> &Arc { - &self.closed - } - - /// Close this executor. Tasks will not run after this is called. - pub fn close(&self) { - self.closed.store(true, Ordering::SeqCst); + Self { scheduler } } #[track_caller] @@ -167,9 +137,8 @@ impl BackgroundExecutor { { let scheduler = Arc::clone(&self.scheduler); let location = Location::caller(); - let closed = self.closed.clone(); let (runnable, task) = async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn( move |_| future, move |runnable| { @@ -188,20 +157,16 @@ impl BackgroundExecutor { F::Output: Send + 'static, { let location = Location::caller(); - let closed = self.closed.clone(); let (tx, rx) = flume::bounded::>(1); self.scheduler.spawn_realtime(Box::new(move || { while let Ok(runnable) = rx.recv() { - if runnable.metadata().is_closed() { - continue; - } runnable.run(); } })); let (runnable, task) = async_task::Builder::new() - .metadata(RunnableMeta { location, closed }) + .metadata(RunnableMeta { location }) .spawn( move |_| future, move |runnable| { diff --git a/crates/scheduler/src/scheduler.rs b/crates/scheduler/src/scheduler.rs index 5b1fac258d088d3be7a2254bbf68431cdb507c70..05d285df8d9622ac901618f5543d2f219290ee0d 100644 --- a/crates/scheduler/src/scheduler.rs +++ b/crates/scheduler/src/scheduler.rs @@ -14,10 +14,7 @@ use std::{ future::Future, panic::Location, pin::Pin, - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, + sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -62,23 +59,12 @@ impl Priority { pub struct RunnableMeta { /// The source location where the task was spawned. pub location: &'static Location<'static>, - /// Shared flag indicating whether the scheduler has been closed. - /// When true, tasks should be dropped without running. - pub closed: Arc, -} - -impl RunnableMeta { - /// Returns true if the scheduler has been closed and this task should not run. - pub fn is_closed(&self) -> bool { - self.closed.load(Ordering::SeqCst) - } } impl std::fmt::Debug for RunnableMeta { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RunnableMeta") .field("location", &self.location) - .field("closed", &self.is_closed()) .finish() } } diff --git a/crates/scheduler/src/test_scheduler.rs b/crates/scheduler/src/test_scheduler.rs index e4c330dcd162ad6512da05c9e66449fd7da36083..5a14f9c335bfaaa16cbac2344a2d89dd585225a7 100644 --- a/crates/scheduler/src/test_scheduler.rs +++ b/crates/scheduler/src/test_scheduler.rs @@ -320,10 +320,6 @@ impl TestScheduler { }; if let Some(runnable) = runnable { - // Check if the executor that spawned this task was closed - if runnable.runnable.metadata().is_closed() { - return true; - } let is_foreground = runnable.session_id.is_some(); let was_main_thread = self.state.lock().is_main_thread; self.state.lock().is_main_thread = is_foreground;