gpui: Do not use a single shared parker within a Dispatcher (#40417)

Piotr Osiewicz and Cole Miller created

This caused issues with #40172, as it made Zed execute and block on tad
few more background tasks. Parker is ~cheap to create, hence we should
be ok to just create it at the time it is needed.

Release Notes:

- N/A

---------

Co-authored-by: Cole Miller <cole@zed.dev>

Change summary

crates/gpui/src/executor.rs                    | 26 ++++++++++---
crates/gpui/src/platform.rs                    |  3 -
crates/gpui/src/platform/linux/dispatcher.rs   | 17 --------
crates/gpui/src/platform/mac/dispatcher.rs     | 34 -----------------
crates/gpui/src/platform/mac/platform.rs       |  2 
crates/gpui/src/platform/test/dispatcher.rs    | 38 ++++++++++---------
crates/gpui/src/platform/windows/dispatcher.rs | 18 ---------
7 files changed, 41 insertions(+), 97 deletions(-)

Detailed changes

crates/gpui/src/executor.rs 🔗

@@ -210,7 +210,8 @@ impl BackgroundExecutor {
         }
         let deadline = timeout.map(|timeout| Instant::now() + timeout);
 
-        let unparker = self.dispatcher.unparker();
+        let parker = parking::Parker::new();
+        let unparker = parker.unparker();
         let waker = waker_fn(move || {
             unparker.unpark();
         });
@@ -222,10 +223,14 @@ impl BackgroundExecutor {
                 Poll::Pending => {
                     let timeout =
                         deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
-                    if !self.dispatcher.park(timeout)
-                        && deadline.is_some_and(|deadline| deadline < Instant::now())
-                    {
-                        return Err(future);
+                    if let Some(timeout) = timeout {
+                        if !parker.park_timeout(timeout)
+                            && deadline.is_some_and(|deadline| deadline < Instant::now())
+                        {
+                            return Err(future);
+                        }
+                    } else {
+                        parker.park();
                     }
                 }
             }
@@ -242,6 +247,8 @@ impl BackgroundExecutor {
     ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
         use std::sync::atomic::AtomicBool;
 
+        use parking::Parker;
+
         let mut future = Box::pin(future);
         if timeout == Some(Duration::ZERO) {
             return Err(future);
@@ -255,10 +262,14 @@ impl BackgroundExecutor {
         } else {
             usize::MAX
         };
-        let unparker = self.dispatcher.unparker();
+
+        let parker = Parker::new();
+        let unparker = parker.unparker();
+
         let awoken = Arc::new(AtomicBool::new(false));
         let waker = waker_fn({
             let awoken = awoken.clone();
+            let unparker = unparker.clone();
             move || {
                 awoken.store(true, SeqCst);
                 unparker.unpark();
@@ -297,7 +308,8 @@ impl BackgroundExecutor {
                                 "parked with nothing left to run{waiting_message}{backtrace_message}",
                             )
                         }
-                        self.dispatcher.park(None);
+                        dispatcher.set_unparker(unparker.clone());
+                        parker.park();
                     }
                 }
             }

crates/gpui/src/platform.rs 🔗

@@ -48,7 +48,6 @@ use async_task::Runnable;
 use futures::channel::oneshot;
 use image::codecs::gif::GifDecoder;
 use image::{AnimationDecoder as _, Frame};
-use parking::Unparker;
 use raw_window_handle::{HasDisplayHandle, HasWindowHandle};
 use schemars::JsonSchema;
 use seahash::SeaHasher;
@@ -564,8 +563,6 @@ pub trait PlatformDispatcher: Send + Sync {
     fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>);
     fn dispatch_on_main_thread(&self, runnable: Runnable);
     fn dispatch_after(&self, duration: Duration, runnable: Runnable);
-    fn park(&self, timeout: Option<Duration>) -> bool;
-    fn unparker(&self) -> Unparker;
     fn now(&self) -> Instant {
         Instant::now()
     }

crates/gpui/src/platform/linux/dispatcher.rs 🔗

@@ -5,8 +5,6 @@ use calloop::{
     channel::{self, Sender},
     timer::TimeoutAction,
 };
-use parking::{Parker, Unparker};
-use parking_lot::Mutex;
 use std::{
     thread,
     time::{Duration, Instant},
@@ -19,7 +17,6 @@ struct TimerAfter {
 }
 
 pub(crate) struct LinuxDispatcher {
-    parker: Mutex<Parker>,
     main_sender: Sender<Runnable>,
     timer_sender: Sender<TimerAfter>,
     background_sender: flume::Sender<Runnable>,
@@ -92,7 +89,6 @@ impl LinuxDispatcher {
         background_threads.push(timer_thread);
 
         Self {
-            parker: Mutex::new(Parker::new()),
             main_sender,
             timer_sender,
             background_sender,
@@ -130,17 +126,4 @@ impl PlatformDispatcher for LinuxDispatcher {
             .send(TimerAfter { duration, runnable })
             .ok();
     }
-
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        if let Some(timeout) = timeout {
-            self.parker.lock().park_timeout(timeout)
-        } else {
-            self.parker.lock().park();
-            true
-        }
-    }
-
-    fn unparker(&self) -> Unparker {
-        self.parker.lock().unparker()
-    }
 }

crates/gpui/src/platform/mac/dispatcher.rs 🔗

@@ -9,12 +9,9 @@ use objc::{
     runtime::{BOOL, YES},
     sel, sel_impl,
 };
-use parking::{Parker, Unparker};
-use parking_lot::Mutex;
 use std::{
     ffi::c_void,
     ptr::{NonNull, addr_of},
-    sync::Arc,
     time::Duration,
 };
 
@@ -29,23 +26,7 @@ pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
     addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
 }
 
-pub(crate) struct MacDispatcher {
-    parker: Arc<Mutex<Parker>>,
-}
-
-impl Default for MacDispatcher {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl MacDispatcher {
-    pub fn new() -> Self {
-        MacDispatcher {
-            parker: Arc::new(Mutex::new(Parker::new())),
-        }
-    }
-}
+pub(crate) struct MacDispatcher;
 
 impl PlatformDispatcher for MacDispatcher {
     fn is_main_thread(&self) -> bool {
@@ -86,19 +67,6 @@ impl PlatformDispatcher for MacDispatcher {
             );
         }
     }
-
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        if let Some(timeout) = timeout {
-            self.parker.lock().park_timeout(timeout)
-        } else {
-            self.parker.lock().park();
-            true
-        }
-    }
-
-    fn unparker(&self) -> Unparker {
-        self.parker.lock().unparker()
-    }
 }
 
 extern "C" fn trampoline(runnable: *mut c_void) {

crates/gpui/src/platform/mac/platform.rs 🔗

@@ -187,7 +187,7 @@ impl Default for MacPlatform {
 
 impl MacPlatform {
     pub(crate) fn new(headless: bool) -> Self {
-        let dispatcher = Arc::new(MacDispatcher::new());
+        let dispatcher = Arc::new(MacDispatcher);
 
         #[cfg(feature = "font-kit")]
         let text_system = Arc::new(crate::MacTextSystem::new());

crates/gpui/src/platform/test/dispatcher.rs 🔗

@@ -2,7 +2,7 @@ use crate::{PlatformDispatcher, TaskLabel};
 use async_task::Runnable;
 use backtrace::Backtrace;
 use collections::{HashMap, HashSet, VecDeque};
-use parking::{Parker, Unparker};
+use parking::Unparker;
 use parking_lot::Mutex;
 use rand::prelude::*;
 use std::{
@@ -22,8 +22,6 @@ struct TestDispatcherId(usize);
 pub struct TestDispatcher {
     id: TestDispatcherId,
     state: Arc<Mutex<TestDispatcherState>>,
-    parker: Arc<Mutex<Parker>>,
-    unparker: Unparker,
 }
 
 struct TestDispatcherState {
@@ -41,11 +39,11 @@ struct TestDispatcherState {
     waiting_backtrace: Option<Backtrace>,
     deprioritized_task_labels: HashSet<TaskLabel>,
     block_on_ticks: RangeInclusive<usize>,
+    last_parked: Option<Unparker>,
 }
 
 impl TestDispatcher {
     pub fn new(random: StdRng) -> Self {
-        let (parker, unparker) = parking::pair();
         let state = TestDispatcherState {
             random,
             foreground: HashMap::default(),
@@ -61,13 +59,12 @@ impl TestDispatcher {
             waiting_backtrace: None,
             deprioritized_task_labels: Default::default(),
             block_on_ticks: 0..=1000,
+            last_parked: None,
         };
 
         TestDispatcher {
             id: TestDispatcherId(0),
             state: Arc::new(Mutex::new(state)),
-            parker: Arc::new(Mutex::new(parker)),
-            unparker,
         }
     }
 
@@ -243,6 +240,21 @@ impl TestDispatcher {
         let block_on_ticks = lock.block_on_ticks.clone();
         lock.random.random_range(block_on_ticks)
     }
+    pub fn unpark_last(&self) {
+        self.state
+            .lock()
+            .last_parked
+            .take()
+            .as_ref()
+            .map(Unparker::unpark);
+    }
+
+    pub fn set_unparker(&self, unparker: Unparker) {
+        let last = { self.state.lock().last_parked.replace(unparker) };
+        if let Some(last) = last {
+            last.unpark();
+        }
+    }
 }
 
 impl Clone for TestDispatcher {
@@ -251,8 +263,6 @@ impl Clone for TestDispatcher {
         Self {
             id: TestDispatcherId(id),
             state: self.state.clone(),
-            parker: self.parker.clone(),
-            unparker: self.unparker.clone(),
         }
     }
 }
@@ -276,7 +286,7 @@ impl PlatformDispatcher for TestDispatcher {
                 state.background.push(runnable);
             }
         }
-        self.unparker.unpark();
+        self.unpark_last();
     }
 
     fn dispatch_on_main_thread(&self, runnable: Runnable) {
@@ -286,7 +296,7 @@ impl PlatformDispatcher for TestDispatcher {
             .entry(self.id)
             .or_default()
             .push_back(runnable);
-        self.unparker.unpark();
+        self.unpark_last();
     }
 
     fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) {
@@ -297,14 +307,6 @@ impl PlatformDispatcher for TestDispatcher {
         };
         state.delayed.insert(ix, (next_time, runnable));
     }
-    fn park(&self, _: Option<std::time::Duration>) -> bool {
-        self.parker.lock().park();
-        true
-    }
-
-    fn unparker(&self) -> Unparker {
-        self.unparker.clone()
-    }
 
     fn as_test(&self) -> Option<&TestDispatcher> {
         Some(self)

crates/gpui/src/platform/windows/dispatcher.rs 🔗

@@ -5,8 +5,6 @@ use std::{
 
 use async_task::Runnable;
 use flume::Sender;
-use parking::Parker;
-use parking_lot::Mutex;
 use util::ResultExt;
 use windows::{
     System::Threading::{
@@ -24,7 +22,6 @@ use crate::{
 
 pub(crate) struct WindowsDispatcher {
     main_sender: Sender<Runnable>,
-    parker: Mutex<Parker>,
     main_thread_id: ThreadId,
     platform_window_handle: SafeHwnd,
     validation_number: usize,
@@ -36,13 +33,11 @@ impl WindowsDispatcher {
         platform_window_handle: HWND,
         validation_number: usize,
     ) -> Self {
-        let parker = Mutex::new(Parker::new());
         let main_thread_id = current().id();
         let platform_window_handle = platform_window_handle.into();
 
         WindowsDispatcher {
             main_sender,
-            parker,
             main_thread_id,
             platform_window_handle,
             validation_number,
@@ -112,17 +107,4 @@ impl PlatformDispatcher for WindowsDispatcher {
     fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
         self.dispatch_on_threadpool_after(runnable, duration);
     }
-
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        if let Some(timeout) = timeout {
-            self.parker.lock().park_timeout(timeout)
-        } else {
-            self.parker.lock().park();
-            true
-        }
-    }
-
-    fn unparker(&self) -> parking::Unparker {
-        self.parker.lock().unparker()
-    }
 }