Fix hangs in new dispatcher

Max Brunsfeld and Nathan Sobo created

Co-authored-by: Nathan Sobo <nathan@zed.dev>

Change summary

crates/gpui2/src/app/test_context.rs         |  4 ++
crates/gpui2/src/executor.rs                 | 31 +++++-----------
crates/gpui2/src/platform.rs                 |  3 +
crates/gpui2/src/platform/mac/dispatcher.rs  | 23 ++++++++++++
crates/gpui2/src/platform/mac/platform.rs    |  4 +-
crates/gpui2/src/platform/test/dispatcher.rs | 18 ++++++++++
crates/gpui2/src/test.rs                     |  2 
crates/gpui2_macros/src/test.rs              |  2 
crates/project2/src/project2.rs              |  5 --
crates/project2/src/project_tests.rs         | 39 +++++++++++++++++----
crates/project2/src/worktree.rs              |  6 --
11 files changed, 93 insertions(+), 44 deletions(-)

Detailed changes

crates/gpui2/src/app/test_context.rs 🔗

@@ -70,6 +70,10 @@ impl TestAppContext {
         &self.background_executor
     }
 
+    pub fn foreground_executor(&self) -> &ForegroundExecutor {
+        &self.foreground_executor
+    }
+
     pub fn update<R>(&self, f: impl FnOnce(&mut AppContext) -> R) -> R {
         let mut cx = self.app.borrow_mut();
         cx.update(f)

crates/gpui2/src/executor.rs 🔗

@@ -88,16 +88,7 @@ impl BackgroundExecutor {
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
-        let (runnable, task) = unsafe {
-            async_task::spawn_unchecked(future, {
-                let dispatcher = self.dispatcher.clone();
-                move |runnable| dispatcher.dispatch_on_main_thread(runnable)
-            })
-        };
-
-        runnable.schedule();
-
-        self.block_internal(false, task)
+        self.block_internal(false, future)
     }
 
     pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
@@ -109,20 +100,19 @@ impl BackgroundExecutor {
         background_only: bool,
         future: impl Future<Output = R>,
     ) -> R {
-        dbg!("block_internal");
         pin_mut!(future);
-        let (parker, unparker) = parking::pair();
+        let unparker = self.dispatcher.unparker();
         let awoken = Arc::new(AtomicBool::new(false));
-        let awoken2 = awoken.clone();
 
-        let waker = waker_fn(move || {
-            dbg!("WAKING UP.");
-            awoken2.store(true, SeqCst);
-            unparker.unpark();
+        let waker = waker_fn({
+            let awoken = awoken.clone();
+            move || {
+                awoken.store(true, SeqCst);
+                unparker.unpark();
+            }
         });
         let mut cx = std::task::Context::from_waker(&waker);
 
-        dbg!("BOOOP");
         loop {
             match future.as_mut().poll(&mut cx) {
                 Poll::Ready(result) => return result,
@@ -143,9 +133,8 @@ impl BackgroundExecutor {
                                 panic!("parked with nothing left to run\n{:?}", backtrace_message)
                             }
                         }
-                        dbg!("PARKING!");
-                        parker.park();
-                        dbg!("CONTINUING!");
+
+                        self.dispatcher.park();
                     }
                 }
             }

crates/gpui2/src/platform.rs 🔗

@@ -12,6 +12,7 @@ use crate::{
 use anyhow::anyhow;
 use async_task::Runnable;
 use futures::channel::oneshot;
+use parking::Unparker;
 use seahash::SeaHasher;
 use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
@@ -163,6 +164,8 @@ pub trait PlatformDispatcher: Send + Sync {
     fn dispatch_on_main_thread(&self, runnable: Runnable);
     fn dispatch_after(&self, duration: Duration, runnable: Runnable);
     fn poll(&self, background_only: bool) -> bool;
+    fn park(&self);
+    fn unparker(&self) -> Unparker;
 
     #[cfg(any(test, feature = "test-support"))]
     fn as_test(&self) -> Option<&TestDispatcher> {

crates/gpui2/src/platform/mac/dispatcher.rs 🔗

@@ -9,8 +9,11 @@ use objc::{
     runtime::{BOOL, YES},
     sel, sel_impl,
 };
+use parking::{Parker, Unparker};
+use parking_lot::Mutex;
 use std::{
     ffi::c_void,
+    sync::Arc,
     time::{Duration, SystemTime},
 };
 
@@ -20,7 +23,17 @@ pub fn dispatch_get_main_queue() -> dispatch_queue_t {
     unsafe { &_dispatch_main_q as *const _ as dispatch_queue_t }
 }
 
-pub struct MacDispatcher;
+pub struct MacDispatcher {
+    parker: Arc<Mutex<Parker>>,
+}
+
+impl MacDispatcher {
+    pub fn new() -> Self {
+        MacDispatcher {
+            parker: Arc::new(Mutex::new(Parker::new())),
+        }
+    }
+}
 
 impl PlatformDispatcher for MacDispatcher {
     fn is_main_thread(&self) -> bool {
@@ -71,6 +84,14 @@ impl PlatformDispatcher for MacDispatcher {
     fn poll(&self, _background_only: bool) -> bool {
         false
     }
+
+    fn park(&self) {
+        self.parker.lock().park()
+    }
+
+    fn unparker(&self) -> Unparker {
+        self.parker.lock().unparker()
+    }
 }
 
 extern "C" fn trampoline(runnable: *mut c_void) {

crates/gpui2/src/platform/mac/platform.rs 🔗

@@ -165,10 +165,10 @@ pub struct MacPlatformState {
 
 impl MacPlatform {
     pub fn new() -> Self {
-        let dispatcher = Arc::new(MacDispatcher);
+        let dispatcher = Arc::new(MacDispatcher::new());
         Self(Mutex::new(MacPlatformState {
             background_executor: BackgroundExecutor::new(dispatcher.clone()),
-            foreground_executor: ForegroundExecutor::new(dispatcher.clone()),
+            foreground_executor: ForegroundExecutor::new(dispatcher),
             text_system: Arc::new(MacTextSystem::new()),
             display_linker: MacDisplayLinker::new(),
             pasteboard: unsafe { NSPasteboard::generalPasteboard(nil) },

crates/gpui2/src/platform/test/dispatcher.rs 🔗

@@ -2,6 +2,7 @@ use crate::PlatformDispatcher;
 use async_task::Runnable;
 use backtrace::Backtrace;
 use collections::{HashMap, VecDeque};
+use parking::{Parker, Unparker};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use std::{
@@ -19,6 +20,8 @@ struct TestDispatcherId(usize);
 pub struct TestDispatcher {
     id: TestDispatcherId,
     state: Arc<Mutex<TestDispatcherState>>,
+    parker: Arc<Mutex<Parker>>,
+    unparker: Unparker,
 }
 
 struct TestDispatcherState {
@@ -35,6 +38,7 @@ struct TestDispatcherState {
 
 impl TestDispatcher {
     pub fn new(random: StdRng) -> Self {
+        let (parker, unparker) = parking::pair();
         let state = TestDispatcherState {
             random,
             foreground: HashMap::default(),
@@ -50,6 +54,8 @@ impl TestDispatcher {
         TestDispatcher {
             id: TestDispatcherId(0),
             state: Arc::new(Mutex::new(state)),
+            parker: Arc::new(Mutex::new(parker)),
+            unparker,
         }
     }
 
@@ -129,6 +135,8 @@ impl Clone for TestDispatcher {
         Self {
             id: TestDispatcherId(id),
             state: self.state.clone(),
+            parker: self.parker.clone(),
+            unparker: self.unparker.clone(),
         }
     }
 }
@@ -140,6 +148,7 @@ impl PlatformDispatcher for TestDispatcher {
 
     fn dispatch(&self, runnable: Runnable) {
         self.state.lock().background.push(runnable);
+        self.unparker.unpark();
     }
 
     fn dispatch_on_main_thread(&self, runnable: Runnable) {
@@ -149,6 +158,7 @@ impl PlatformDispatcher for TestDispatcher {
             .entry(self.id)
             .or_default()
             .push_back(runnable);
+        self.unparker.unpark();
     }
 
     fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) {
@@ -215,6 +225,14 @@ impl PlatformDispatcher for TestDispatcher {
         true
     }
 
+    fn park(&self) {
+        self.parker.lock().park();
+    }
+
+    fn unparker(&self) -> Unparker {
+        self.unparker.clone()
+    }
+
     fn as_test(&self) -> Option<&TestDispatcher> {
         Some(self)
     }

crates/gpui2/src/test.rs 🔗

@@ -28,7 +28,7 @@ pub fn run_test(
             }
             let result = panic::catch_unwind(|| {
                 let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(seed));
-                test_fn(dispatcher.clone(), seed);
+                test_fn(dispatcher, seed);
             });
 
             match result {

crates/gpui2_macros/src/test.rs 🔗

@@ -91,7 +91,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
                         }
                         Some("BackgroundExecutor") => {
                             inner_fn_args.extend(quote!(gpui2::BackgroundExecutor::new(
-                                std::sync::Arc::new(dispatcher.clone())
+                                std::sync::Arc::new(dispatcher.clone()),
                             ),));
                             continue;
                         }

crates/project2/src/project2.rs 🔗

@@ -877,17 +877,14 @@ impl Project {
             )
         });
         for path in root_paths {
-            dbg!(&path);
             let (tree, _) = project
                 .update(cx, |project, cx| {
                     project.find_or_create_local_worktree(path, true, cx)
                 })
                 .await
                 .unwrap();
-            dbg!("aaa");
             tree.update(cx, |tree, _| tree.as_local().unwrap().scan_complete())
                 .await;
-            dbg!("bbb");
         }
         project
     }
@@ -5993,10 +5990,8 @@ impl Project {
     ) -> Task<Result<(Model<Worktree>, PathBuf)>> {
         let abs_path = abs_path.as_ref();
         if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
-            dbg!("shortcut");
             Task::ready(Ok((tree, relative_path)))
         } else {
-            dbg!("long cut");
             let worktree = self.create_local_worktree(abs_path, visible, cx);
             cx.background_executor()
                 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })

crates/project2/src/project_tests.rs 🔗

@@ -15,6 +15,36 @@ use std::{os, task::Poll};
 use unindent::Unindent as _;
 use util::{assert_set_eq, test::temp_tree};
 
+#[gpui2::test]
+async fn test_block_via_channel(cx: &mut gpui2::TestAppContext) {
+    cx.executor().allow_parking();
+
+    let (tx, mut rx) = futures::channel::mpsc::unbounded();
+    let _thread = std::thread::spawn(move || {
+        std::fs::metadata("/Users").unwrap();
+        std::thread::sleep(Duration::from_millis(1000));
+        tx.unbounded_send(1).unwrap();
+    });
+    rx.next().await.unwrap();
+}
+
+#[gpui2::test]
+async fn test_block_via_smol(cx: &mut gpui2::TestAppContext) {
+    cx.executor().allow_parking();
+
+    let io_task = smol::unblock(move || {
+        println!("sleeping on thread {:?}", std::thread::current().id());
+        std::thread::sleep(Duration::from_millis(10));
+        1
+    });
+
+    let task = cx.foreground_executor().spawn(async move {
+        io_task.await;
+    });
+
+    task.await;
+}
+
 #[gpui2::test]
 async fn test_symlinks(cx: &mut gpui2::TestAppContext) {
     init_test(cx);
@@ -35,8 +65,6 @@ async fn test_symlinks(cx: &mut gpui2::TestAppContext) {
         }
     }));
 
-    dbg!("GOT HERE");
-
     let root_link_path = dir.path().join("root_link");
     os::unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
     os::unix::fs::symlink(
@@ -45,11 +73,8 @@ async fn test_symlinks(cx: &mut gpui2::TestAppContext) {
     )
     .unwrap();
 
-    dbg!("GOT HERE 2");
-
     let project = Project::test(Arc::new(RealFs), [root_link_path.as_ref()], cx).await;
 
-    dbg!("GOT HERE 2.5");
     project.update(cx, |project, cx| {
         let tree = project.worktrees().next().unwrap().read(cx);
         assert_eq!(tree.file_count(), 5);
@@ -58,8 +83,6 @@ async fn test_symlinks(cx: &mut gpui2::TestAppContext) {
             tree.inode_for_path("finnochio/grape")
         );
     });
-
-    dbg!("GOT HERE 3");
 }
 
 #[gpui2::test]
@@ -2706,7 +2729,7 @@ async fn test_save_as(cx: &mut gpui2::TestAppContext) {
 #[gpui2::test(retries = 5)]
 async fn test_rescan_and_remote_updates(cx: &mut gpui2::TestAppContext) {
     init_test(cx);
-    // cx.executor().allow_parking();
+    cx.executor().allow_parking();
 
     let dir = temp_tree(json!({
         "a": {

crates/project2/src/worktree.rs 🔗

@@ -297,15 +297,12 @@ impl Worktree {
         // After determining whether the root entry is a file or a directory, populate the
         // snapshot's "root name", which will be used for the purpose of fuzzy matching.
         let abs_path = path.into();
-        eprintln!("get root metadata");
 
         let metadata = fs
             .metadata(&abs_path)
             .await
             .context("failed to stat worktree path")?;
 
-        eprintln!("got root metadata");
-
         cx.build_model(move |cx: &mut ModelContext<Worktree>| {
             let root_name = abs_path
                 .file_name()
@@ -4067,13 +4064,12 @@ impl WorktreeModelHandle for Model<Worktree> {
             fs.create_file(&root_path.join(filename), Default::default())
                 .await
                 .unwrap();
-            cx.executor().run_until_parked();
+
             assert!(tree.update(cx, |tree, _| tree.entry_for_path(filename).is_some()));
 
             fs.remove_file(&root_path.join(filename), Default::default())
                 .await
                 .unwrap();
-            cx.executor().run_until_parked();
             assert!(tree.update(cx, |tree, _| tree.entry_for_path(filename).is_none()));
 
             cx.update(|cx| tree.read(cx).as_local().unwrap().scan_complete())