Make block_with_timeout more robust (#11670)

Conrad Irwin created

The previous implementation relied on a background thread to wake up the
main thread,
which was prone to priority inversion under heavy load.

In a synthetic test, where we spawn 200 git processes while doing a 5ms
timeout, the old version blocked for 5-80ms, the new version blocks for
5.1-5.4ms.

Release Notes:

- Improved responsiveness of the main thread under high system load

Change summary

crates/gpui/src/executor.rs                    | 125 +++++++++++-------
crates/gpui/src/platform.rs                    |   3 
crates/gpui/src/platform/linux/dispatcher.rs   |  13 +
crates/gpui/src/platform/mac/dispatcher.rs     |  13 +
crates/gpui/src/platform/test/dispatcher.rs    | 128 ++++++++++----------
crates/gpui/src/platform/windows/dispatcher.rs |  13 +
6 files changed, 160 insertions(+), 135 deletions(-)

Detailed changes

crates/gpui/src/executor.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AppContext, PlatformDispatcher};
-use futures::{channel::mpsc, pin_mut, FutureExt};
+use futures::channel::mpsc;
 use smol::prelude::*;
 use std::{
     fmt::Debug,
@@ -9,7 +9,7 @@ use std::{
     pin::Pin,
     rc::Rc,
     sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
+        atomic::{AtomicUsize, Ordering::SeqCst},
         Arc,
     },
     task::{Context, Poll},
@@ -164,7 +164,7 @@ impl BackgroundExecutor {
     #[cfg(any(test, feature = "test-support"))]
     #[track_caller]
     pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
-        if let Ok(value) = self.block_internal(false, future, usize::MAX) {
+        if let Ok(value) = self.block_internal(false, future, None) {
             value
         } else {
             unreachable!()
@@ -174,24 +174,75 @@ impl BackgroundExecutor {
     /// Block the current thread until the given future resolves.
     /// Consider using `block_with_timeout` instead.
     pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
-        if let Ok(value) = self.block_internal(true, future, usize::MAX) {
+        if let Ok(value) = self.block_internal(true, future, None) {
             value
         } else {
             unreachable!()
         }
     }
 
+    #[cfg(not(any(test, feature = "test-support")))]
+    pub(crate) fn block_internal<R>(
+        &self,
+        _background_only: bool,
+        future: impl Future<Output = R>,
+        timeout: Option<Duration>,
+    ) -> Result<R, impl Future<Output = R>> {
+        use std::time::Instant;
+
+        let mut future = Box::pin(future);
+        if timeout == Some(Duration::ZERO) {
+            return Err(future);
+        }
+        let deadline = timeout.map(|timeout| Instant::now() + timeout);
+
+        let unparker = self.dispatcher.unparker();
+        let waker = waker_fn(move || {
+            unparker.unpark();
+        });
+        let mut cx = std::task::Context::from_waker(&waker);
+
+        loop {
+            match future.as_mut().poll(&mut cx) {
+                Poll::Ready(result) => return Ok(result),
+                Poll::Pending => {
+                    let timeout =
+                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
+                    if !self.dispatcher.park(timeout) {
+                        if deadline.is_some_and(|deadline| deadline < Instant::now()) {
+                            return Err(future);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
     #[track_caller]
     pub(crate) fn block_internal<R>(
         &self,
         background_only: bool,
         future: impl Future<Output = R>,
-        mut max_ticks: usize,
-    ) -> Result<R, ()> {
-        pin_mut!(future);
+        timeout: Option<Duration>,
+    ) -> Result<R, impl Future<Output = R>> {
+        use std::sync::atomic::AtomicBool;
+
+        let mut future = Box::pin(future);
+        if timeout == Some(Duration::ZERO) {
+            return Err(future);
+        }
+        let Some(dispatcher) = self.dispatcher.as_test() else {
+            return Err(future);
+        };
+
+        let mut max_ticks = if timeout.is_some() {
+            dispatcher.gen_block_on_ticks()
+        } else {
+            usize::MAX
+        };
         let unparker = self.dispatcher.unparker();
         let awoken = Arc::new(AtomicBool::new(false));
-
         let waker = waker_fn({
             let awoken = awoken.clone();
             move || {
@@ -206,34 +257,30 @@ impl BackgroundExecutor {
                 Poll::Ready(result) => return Ok(result),
                 Poll::Pending => {
                     if max_ticks == 0 {
-                        return Err(());
+                        return Err(future);
                     }
                     max_ticks -= 1;
 
-                    if !self.dispatcher.tick(background_only) {
+                    if !dispatcher.tick(background_only) {
                         if awoken.swap(false, SeqCst) {
                             continue;
                         }
 
-                        #[cfg(any(test, feature = "test-support"))]
-                        if let Some(test) = self.dispatcher.as_test() {
-                            if !test.parking_allowed() {
-                                let mut backtrace_message = String::new();
-                                let mut waiting_message = String::new();
-                                if let Some(backtrace) = test.waiting_backtrace() {
-                                    backtrace_message =
-                                        format!("\nbacktrace of waiting future:\n{:?}", backtrace);
-                                }
-                                if let Some(waiting_hint) = test.waiting_hint() {
-                                    waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
-                                }
-                                panic!(
+                        if !dispatcher.parking_allowed() {
+                            let mut backtrace_message = String::new();
+                            let mut waiting_message = String::new();
+                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
+                                backtrace_message =
+                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
+                            }
+                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
+                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
+                            }
+                            panic!(
                                     "parked with nothing left to run{waiting_message}{backtrace_message}",
                                 )
-                            }
                         }
-
-                        self.dispatcher.park();
+                        self.dispatcher.park(None);
                     }
                 }
             }
@@ -247,31 +294,7 @@ impl BackgroundExecutor {
         duration: Duration,
         future: impl Future<Output = R>,
     ) -> Result<R, impl Future<Output = R>> {
-        let mut future = Box::pin(future.fuse());
-        if duration.is_zero() {
-            return Err(future);
-        }
-
-        #[cfg(any(test, feature = "test-support"))]
-        let max_ticks = self
-            .dispatcher
-            .as_test()
-            .map_or(usize::MAX, |dispatcher| dispatcher.gen_block_on_ticks());
-        #[cfg(not(any(test, feature = "test-support")))]
-        let max_ticks = usize::MAX;
-
-        let mut timer = self.timer(duration).fuse();
-
-        let timeout = async {
-            futures::select_biased! {
-                value = future => Ok(value),
-                _ = timer => Err(()),
-            }
-        };
-        match self.block_internal(true, timeout, max_ticks) {
-            Ok(Ok(value)) => Ok(value),
-            _ => Err(future),
-        }
+        self.block_internal(true, future, Some(duration))
     }
 
     /// Scoped lets you start a number of tasks and waits

crates/gpui/src/platform.rs 🔗

@@ -240,8 +240,7 @@ pub trait PlatformDispatcher: Send + Sync {
     fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>);
     fn dispatch_on_main_thread(&self, runnable: Runnable);
     fn dispatch_after(&self, duration: Duration, runnable: Runnable);
-    fn tick(&self, background_only: bool) -> bool;
-    fn park(&self);
+    fn park(&self, timeout: Option<Duration>) -> bool;
     fn unparker(&self) -> Unparker;
 
     #[cfg(any(test, feature = "test-support"))]

crates/gpui/src/platform/linux/dispatcher.rs 🔗

@@ -110,12 +110,13 @@ impl PlatformDispatcher for LinuxDispatcher {
             .ok();
     }
 
-    fn tick(&self, background_only: bool) -> bool {
-        false
-    }
-
-    fn park(&self) {
-        self.parker.lock().park();
+    fn park(&self, timeout: Option<Duration>) -> bool {
+        if let Some(timeout) = timeout {
+            self.parker.lock().park_timeout(timeout)
+        } else {
+            self.parker.lock().park();
+            true
+        }
     }
 
     fn unparker(&self) -> Unparker {

crates/gpui/src/platform/mac/dispatcher.rs 🔗

@@ -87,12 +87,13 @@ impl PlatformDispatcher for MacDispatcher {
         }
     }
 
-    fn tick(&self, _background_only: bool) -> bool {
-        false
-    }
-
-    fn park(&self) {
-        self.parker.lock().park()
+    fn park(&self, timeout: Option<Duration>) -> bool {
+        if let Some(timeout) = timeout {
+            self.parker.lock().park_timeout(timeout)
+        } else {
+            self.parker.lock().park();
+            true
+        }
     }
 
     fn unparker(&self) -> Unparker {

crates/gpui/src/platform/test/dispatcher.rs 🔗

@@ -111,6 +111,68 @@ impl TestDispatcher {
         }
     }
 
+    pub fn tick(&self, background_only: bool) -> bool {
+        let mut state = self.state.lock();
+
+        while let Some((deadline, _)) = state.delayed.first() {
+            if *deadline > state.time {
+                break;
+            }
+            let (_, runnable) = state.delayed.remove(0);
+            state.background.push(runnable);
+        }
+
+        let foreground_len: usize = if background_only {
+            0
+        } else {
+            state
+                .foreground
+                .values()
+                .map(|runnables| runnables.len())
+                .sum()
+        };
+        let background_len = state.background.len();
+
+        let runnable;
+        let main_thread;
+        if foreground_len == 0 && background_len == 0 {
+            let deprioritized_background_len = state.deprioritized_background.len();
+            if deprioritized_background_len == 0 {
+                return false;
+            }
+            let ix = state.random.gen_range(0..deprioritized_background_len);
+            main_thread = false;
+            runnable = state.deprioritized_background.swap_remove(ix);
+        } else {
+            main_thread = state.random.gen_ratio(
+                foreground_len as u32,
+                (foreground_len + background_len) as u32,
+            );
+            if main_thread {
+                let state = &mut *state;
+                runnable = state
+                    .foreground
+                    .values_mut()
+                    .filter(|runnables| !runnables.is_empty())
+                    .choose(&mut state.random)
+                    .unwrap()
+                    .pop_front()
+                    .unwrap();
+            } else {
+                let ix = state.random.gen_range(0..background_len);
+                runnable = state.background.swap_remove(ix);
+            };
+        };
+
+        let was_main_thread = state.is_main_thread;
+        state.is_main_thread = main_thread;
+        drop(state);
+        runnable.run();
+        self.state.lock().is_main_thread = was_main_thread;
+
+        true
+    }
+
     pub fn deprioritize(&self, task_label: TaskLabel) {
         self.state
             .lock()
@@ -221,71 +283,9 @@ impl PlatformDispatcher for TestDispatcher {
         };
         state.delayed.insert(ix, (next_time, runnable));
     }
-
-    fn tick(&self, background_only: bool) -> bool {
-        let mut state = self.state.lock();
-
-        while let Some((deadline, _)) = state.delayed.first() {
-            if *deadline > state.time {
-                break;
-            }
-            let (_, runnable) = state.delayed.remove(0);
-            state.background.push(runnable);
-        }
-
-        let foreground_len: usize = if background_only {
-            0
-        } else {
-            state
-                .foreground
-                .values()
-                .map(|runnables| runnables.len())
-                .sum()
-        };
-        let background_len = state.background.len();
-
-        let runnable;
-        let main_thread;
-        if foreground_len == 0 && background_len == 0 {
-            let deprioritized_background_len = state.deprioritized_background.len();
-            if deprioritized_background_len == 0 {
-                return false;
-            }
-            let ix = state.random.gen_range(0..deprioritized_background_len);
-            main_thread = false;
-            runnable = state.deprioritized_background.swap_remove(ix);
-        } else {
-            main_thread = state.random.gen_ratio(
-                foreground_len as u32,
-                (foreground_len + background_len) as u32,
-            );
-            if main_thread {
-                let state = &mut *state;
-                runnable = state
-                    .foreground
-                    .values_mut()
-                    .filter(|runnables| !runnables.is_empty())
-                    .choose(&mut state.random)
-                    .unwrap()
-                    .pop_front()
-                    .unwrap();
-            } else {
-                let ix = state.random.gen_range(0..background_len);
-                runnable = state.background.swap_remove(ix);
-            };
-        };
-
-        let was_main_thread = state.is_main_thread;
-        state.is_main_thread = main_thread;
-        drop(state);
-        runnable.run();
-        self.state.lock().is_main_thread = was_main_thread;
-
-        true
-    }
-
-    fn park(&self) {
+    fn park(&self, _: Option<std::time::Duration>) -> bool {
         self.parker.lock().park();
+        true
     }
 
     fn unparker(&self) -> Unparker {

crates/gpui/src/platform/windows/dispatcher.rs 🔗

@@ -122,12 +122,13 @@ impl PlatformDispatcher for WindowsDispatcher {
         self.dispatch_on_threadpool_after(runnable, duration);
     }
 
-    fn tick(&self, _background_only: bool) -> bool {
-        false
-    }
-
-    fn park(&self) {
-        self.parker.lock().park();
+    fn park(&self, timeout: Option<Duration>) -> bool {
+        if let Some(timeout) = timeout {
+            self.parker.lock().park_timeout(timeout)
+        } else {
+            self.parker.lock().park();
+            true
+        }
     }
 
     fn unparker(&self) -> parking::Unparker {