windows: Improve foreground task dispatching on Windows (#23283)

张小白 created

Closes #22653

After some investigation, I found this bug is due to that sometimes
`foreground_task` is not dispatched to the main thread unless there is
user input. The current Windows implementation works as follows: when
the `WindowsDispatcher` receives a `foreground_task`, it adds the task
to a queue and uses `SetEvent(dispatch_event)` to notify the main
thread.

The main thread then listens for notifications using
`MsgWaitForMultipleObjects(&[dispatch_event])`.

Essentially, this is a synchronous method, but it is not robust. For
example, if 100 `foreground_task`s are sent, `dispatch_event` should
theoretically be triggered 100 times, and
`MsgWaitForMultipleObjects(&[dispatch_event])` should receive 100
notifications, causing the main thread to execute all 100 tasks.
However, in practice, some `foreground_task`s may not get a chance to
execute due to certain reasons.

As shown in the attached video, when I don't move the mouse, there are
about 20-30 `foreground_task`s waiting in the queue to be executed. When
I move the mouse, `run_foreground_tasks()` is called, which processes
the tasks in the queue.



https://github.com/user-attachments/assets/83cd09ca-4b17-4a1f-9a2a-5d1569b23483



To address this, this PR adopts an approach similar to `winit`. In
`winit`, an invisible window is created for message passing. In this PR,
we use `PostThreadMessage` to directly send messages to the main thread.

With this implementation, when 100 `foreground_task`s are sent, the
`WindowsDispatcher` uses `PostThreadMessageW(thread_id,
RUNNABLE_DISPATCHED)` to notify the main thread. This approach enqueues
100 `RUNNABLE_DISPATCHED` messages in the main thread's message queue,
ensuring that each `foreground_task` is executed as expected. The main
thread continuously processes these messages, guaranteeing that all 100
tasks are executed.

Release Notes:

- N/A

Change summary

crates/gpui/src/platform/windows/dispatcher.rs |  38 ++++-
crates/gpui/src/platform/windows/events.rs     |  13 +-
crates/gpui/src/platform/windows/platform.rs   | 120 +++++++++++--------
crates/gpui/src/platform/windows/window.rs     |   5 
4 files changed, 110 insertions(+), 66 deletions(-)

Detailed changes

crates/gpui/src/platform/windows/dispatcher.rs 🔗

@@ -15,29 +15,37 @@ use windows::{
         ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
         WorkItemPriority,
     },
-    Win32::{Foundation::HANDLE, System::Threading::SetEvent},
+    Win32::{
+        Foundation::{LPARAM, WPARAM},
+        UI::WindowsAndMessaging::PostThreadMessageW,
+    },
 };
 
-use crate::{PlatformDispatcher, SafeHandle, TaskLabel};
+use crate::{PlatformDispatcher, TaskLabel, WM_ZED_EVENT_DISPATCHED_ON_MAIN_THREAD};
 
 pub(crate) struct WindowsDispatcher {
     main_sender: Sender<Runnable>,
-    dispatch_event: SafeHandle,
     parker: Mutex<Parker>,
     main_thread_id: ThreadId,
+    main_thread_id_win32: u32,
+    validation_number: usize,
 }
 
 impl WindowsDispatcher {
-    pub(crate) fn new(main_sender: Sender<Runnable>, dispatch_event: HANDLE) -> Self {
-        let dispatch_event = dispatch_event.into();
+    pub(crate) fn new(
+        main_sender: Sender<Runnable>,
+        main_thread_id_win32: u32,
+        validation_number: usize,
+    ) -> Self {
         let parker = Mutex::new(Parker::new());
         let main_thread_id = current().id();
 
         WindowsDispatcher {
             main_sender,
-            dispatch_event,
             parker,
             main_thread_id,
+            main_thread_id_win32,
+            validation_number,
         }
     }
 
@@ -87,11 +95,23 @@ impl PlatformDispatcher for WindowsDispatcher {
     }
 
     fn dispatch_on_main_thread(&self, runnable: Runnable) {
-        self.main_sender
+        if self
+            .main_sender
             .send(runnable)
             .context("Dispatch on main thread failed")
-            .log_err();
-        unsafe { SetEvent(*self.dispatch_event).log_err() };
+            .log_err()
+            .is_some()
+        {
+            unsafe {
+                PostThreadMessageW(
+                    self.main_thread_id_win32,
+                    WM_ZED_EVENT_DISPATCHED_ON_MAIN_THREAD,
+                    WPARAM(self.validation_number),
+                    LPARAM(0),
+                )
+                .log_err();
+            }
+        }
     }
 
     fn dispatch_after(&self, duration: Duration, runnable: Runnable) {

crates/gpui/src/platform/windows/events.rs 🔗

@@ -16,8 +16,9 @@ use windows::Win32::{
 
 use crate::*;
 
-pub(crate) const CURSOR_STYLE_CHANGED: u32 = WM_USER + 1;
-pub(crate) const CLOSE_ONE_WINDOW: u32 = WM_USER + 2;
+pub(crate) const WM_ZED_CURSOR_STYLE_CHANGED: u32 = WM_USER + 1;
+pub(crate) const WM_ZED_CLOSE_ONE_WINDOW: u32 = WM_USER + 2;
+pub(crate) const WM_ZED_EVENT_DISPATCHED_ON_MAIN_THREAD: u32 = WM_USER + 3;
 
 const SIZE_MOVE_LOOP_TIMER_ID: usize = 1;
 const AUTO_HIDE_TASKBAR_THICKNESS_PX: i32 = 1;
@@ -89,7 +90,7 @@ pub(crate) fn handle_msg(
         WM_SETCURSOR => handle_set_cursor(lparam, state_ptr),
         WM_SETTINGCHANGE => handle_system_settings_changed(handle, state_ptr),
         WM_DWMCOLORIZATIONCOLORCHANGED => handle_system_theme_changed(state_ptr),
-        CURSOR_STYLE_CHANGED => handle_cursor_changed(lparam, state_ptr),
+        WM_ZED_CURSOR_STYLE_CHANGED => handle_cursor_changed(lparam, state_ptr),
         _ => None,
     };
     if let Some(n) = handled {
@@ -243,9 +244,9 @@ fn handle_destroy_msg(handle: HWND, state_ptr: Rc<WindowsWindowStatePtr>) -> Opt
         callback();
     }
     unsafe {
-        PostMessageW(
-            None,
-            CLOSE_ONE_WINDOW,
+        PostThreadMessageW(
+            state_ptr.main_thread_id_win32,
+            WM_ZED_CLOSE_ONE_WINDOW,
             WPARAM(state_ptr.validation_number),
             LPARAM(handle.0 as isize),
         )

crates/gpui/src/platform/windows/platform.rs 🔗

@@ -37,13 +37,13 @@ pub(crate) struct WindowsPlatform {
     // The below members will never change throughout the entire lifecycle of the app.
     icon: HICON,
     main_receiver: flume::Receiver<Runnable>,
-    dispatch_event: HANDLE,
     background_executor: BackgroundExecutor,
     foreground_executor: ForegroundExecutor,
     text_system: Arc<DirectWriteTextSystem>,
     windows_version: WindowsVersion,
     bitmap_factory: ManuallyDrop<IWICImagingFactory>,
     validation_number: usize,
+    main_thread_id_win32: u32,
 }
 
 pub(crate) struct WindowsPlatformState {
@@ -82,8 +82,13 @@ impl WindowsPlatform {
             OleInitialize(None).expect("unable to initialize Windows OLE");
         }
         let (main_sender, main_receiver) = flume::unbounded::<Runnable>();
-        let dispatch_event = unsafe { CreateEventW(None, false, false, None) }.unwrap();
-        let dispatcher = Arc::new(WindowsDispatcher::new(main_sender, dispatch_event));
+        let main_thread_id_win32 = unsafe { GetCurrentThreadId() };
+        let validation_number = rand::random::<usize>();
+        let dispatcher = Arc::new(WindowsDispatcher::new(
+            main_sender,
+            main_thread_id_win32,
+            validation_number,
+        ));
         let background_executor = BackgroundExecutor::new(dispatcher.clone());
         let foreground_executor = ForegroundExecutor::new(dispatcher);
         let bitmap_factory = ManuallyDrop::new(unsafe {
@@ -99,7 +104,6 @@ impl WindowsPlatform {
         let raw_window_handles = RwLock::new(SmallVec::new());
         let gpu_context = BladeContext::new().expect("Unable to init GPU context");
         let windows_version = WindowsVersion::new().expect("Error retrieve windows version");
-        let validation_number = rand::random::<usize>();
 
         Self {
             state,
@@ -107,13 +111,13 @@ impl WindowsPlatform {
             gpu_context,
             icon,
             main_receiver,
-            dispatch_event,
             background_executor,
             foreground_executor,
             text_system,
             windows_version,
             bitmap_factory,
             validation_number,
+            main_thread_id_win32,
         }
     }
 
@@ -150,16 +154,7 @@ impl WindowsPlatform {
             });
     }
 
-    fn close_one_window(
-        &self,
-        target_window: HWND,
-        validation_number: usize,
-        msg: *const MSG,
-    ) -> bool {
-        if validation_number != self.validation_number {
-            unsafe { DispatchMessageW(msg) };
-            return false;
-        }
+    fn close_one_window(&self, target_window: HWND) -> bool {
         let mut lock = self.raw_window_handles.write();
         let index = lock
             .iter()
@@ -171,8 +166,8 @@ impl WindowsPlatform {
     }
 
     #[inline]
-    fn run_foreground_tasks(&self) {
-        for runnable in self.main_receiver.drain() {
+    fn run_foreground_task(&self) {
+        if let Ok(runnable) = self.main_receiver.try_recv() {
             runnable.run();
         }
     }
@@ -185,7 +180,54 @@ impl WindowsPlatform {
             windows_version: self.windows_version,
             validation_number: self.validation_number,
             main_receiver: self.main_receiver.clone(),
+            main_thread_id_win32: self.main_thread_id_win32,
+        }
+    }
+
+    fn handle_events(&self) -> bool {
+        let mut msg = MSG::default();
+        unsafe {
+            while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() {
+                match msg.message {
+                    WM_QUIT => return true,
+                    WM_ZED_CLOSE_ONE_WINDOW | WM_ZED_EVENT_DISPATCHED_ON_MAIN_THREAD => {
+                        if self.handle_zed_evnets(msg.message, msg.wParam, msg.lParam, &msg) {
+                            return true;
+                        }
+                    }
+                    _ => {
+                        // todo(windows)
+                        // crate `windows 0.56` reports true as Err
+                        TranslateMessage(&msg).as_bool();
+                        DispatchMessageW(&msg);
+                    }
+                }
+            }
         }
+        false
+    }
+
+    fn handle_zed_evnets(
+        &self,
+        message: u32,
+        wparam: WPARAM,
+        lparam: LPARAM,
+        msg: *const MSG,
+    ) -> bool {
+        if wparam.0 != self.validation_number {
+            unsafe { DispatchMessageW(msg) };
+            return false;
+        }
+        match message {
+            WM_ZED_CLOSE_ONE_WINDOW => {
+                if self.close_one_window(HWND(lparam.0 as _)) {
+                    return true;
+                }
+            }
+            WM_ZED_EVENT_DISPATCHED_ON_MAIN_THREAD => self.run_foreground_task(),
+            _ => unreachable!(),
+        }
+        false
     }
 }
 
@@ -216,46 +258,17 @@ impl Platform for WindowsPlatform {
         begin_vsync(*vsync_event);
         'a: loop {
             let wait_result = unsafe {
-                MsgWaitForMultipleObjects(
-                    Some(&[*vsync_event, self.dispatch_event]),
-                    false,
-                    INFINITE,
-                    QS_ALLINPUT,
-                )
+                MsgWaitForMultipleObjects(Some(&[*vsync_event]), false, INFINITE, QS_ALLINPUT)
             };
 
             match wait_result {
                 // compositor clock ticked so we should draw a frame
                 WAIT_EVENT(0) => self.redraw_all(),
-                // foreground tasks are dispatched
-                WAIT_EVENT(1) => self.run_foreground_tasks(),
                 // Windows thread messages are posted
-                WAIT_EVENT(2) => {
-                    let mut msg = MSG::default();
-                    unsafe {
-                        while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() {
-                            match msg.message {
-                                WM_QUIT => break 'a,
-                                CLOSE_ONE_WINDOW => {
-                                    if self.close_one_window(
-                                        HWND(msg.lParam.0 as _),
-                                        msg.wParam.0,
-                                        &msg,
-                                    ) {
-                                        break 'a;
-                                    }
-                                }
-                                _ => {
-                                    // todo(windows)
-                                    // crate `windows 0.56` reports true as Err
-                                    TranslateMessage(&msg).as_bool();
-                                    DispatchMessageW(&msg);
-                                }
-                            }
-                        }
+                WAIT_EVENT(1) => {
+                    if self.handle_events() {
+                        break 'a;
                     }
-                    // foreground tasks may have been queued in the message handlers
-                    self.run_foreground_tasks();
                 }
                 _ => {
                     log::error!("Something went wrong while waiting {:?}", wait_result);
@@ -492,7 +505,11 @@ impl Platform for WindowsPlatform {
         let hcursor = load_cursor(style);
         let mut lock = self.state.borrow_mut();
         if lock.current_cursor.0 != hcursor.0 {
-            self.post_message(CURSOR_STYLE_CHANGED, WPARAM(0), LPARAM(hcursor.0 as isize));
+            self.post_message(
+                WM_ZED_CURSOR_STYLE_CHANGED,
+                WPARAM(0),
+                LPARAM(hcursor.0 as isize),
+            );
             lock.current_cursor = hcursor;
         }
     }
@@ -598,6 +615,7 @@ pub(crate) struct WindowCreationInfo {
     pub(crate) windows_version: WindowsVersion,
     pub(crate) validation_number: usize,
     pub(crate) main_receiver: flume::Receiver<Runnable>,
+    pub(crate) main_thread_id_win32: u32,
 }
 
 fn open_target(target: &str) {

crates/gpui/src/platform/windows/window.rs 🔗

@@ -69,6 +69,7 @@ pub(crate) struct WindowsWindowStatePtr {
     pub(crate) windows_version: WindowsVersion,
     pub(crate) validation_number: usize,
     pub(crate) main_receiver: flume::Receiver<Runnable>,
+    pub(crate) main_thread_id_win32: u32,
 }
 
 impl WindowsWindowState {
@@ -242,6 +243,7 @@ impl WindowsWindowStatePtr {
             windows_version: context.windows_version,
             validation_number: context.validation_number,
             main_receiver: context.main_receiver.clone(),
+            main_thread_id_win32: context.main_thread_id_win32,
         }))
     }
 
@@ -355,6 +357,7 @@ struct WindowCreateContext<'a> {
     validation_number: usize,
     main_receiver: flume::Receiver<Runnable>,
     gpu_context: &'a BladeContext,
+    main_thread_id_win32: u32,
 }
 
 impl WindowsWindow {
@@ -371,6 +374,7 @@ impl WindowsWindow {
             windows_version,
             validation_number,
             main_receiver,
+            main_thread_id_win32,
         } = creation_info;
         let classname = register_wnd_class(icon);
         let hide_title_bar = params
@@ -415,6 +419,7 @@ impl WindowsWindow {
             validation_number,
             main_receiver,
             gpu_context,
+            main_thread_id_win32,
         };
         let lpparam = Some(&context as *const _ as *const _);
         let creation_result = unsafe {