Coalesce dispatcher window messages on Windows (#41861)

John Tur created

Take 2 at https://github.com/zed-industries/zed/pull/41595

Release Notes:

- N/A

Change summary

crates/gpui/src/platform/windows/dispatcher.rs | 25 +++++++----
crates/gpui/src/platform/windows/platform.rs   | 45 ++++++++++++++++---
2 files changed, 53 insertions(+), 17 deletions(-)

Detailed changes

crates/gpui/src/platform/windows/dispatcher.rs 🔗

@@ -1,4 +1,5 @@
 use std::{
+    sync::atomic::{AtomicBool, Ordering},
     thread::{ThreadId, current},
     time::Duration,
 };
@@ -21,6 +22,7 @@ use crate::{
 };
 
 pub(crate) struct WindowsDispatcher {
+    pub(crate) wake_posted: AtomicBool,
     main_sender: Sender<Runnable>,
     main_thread_id: ThreadId,
     platform_window_handle: SafeHwnd,
@@ -41,6 +43,7 @@ impl WindowsDispatcher {
             main_thread_id,
             platform_window_handle,
             validation_number,
+            wake_posted: AtomicBool::new(false),
         }
     }
 
@@ -81,15 +84,19 @@ impl PlatformDispatcher for WindowsDispatcher {
 
     fn dispatch_on_main_thread(&self, runnable: Runnable) {
         match self.main_sender.send(runnable) {
-            Ok(_) => unsafe {
-                PostMessageW(
-                    Some(self.platform_window_handle.as_raw()),
-                    WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
-                    WPARAM(self.validation_number),
-                    LPARAM(0),
-                )
-                .log_err();
-            },
+            Ok(_) => {
+                if !self.wake_posted.swap(true, Ordering::AcqRel) {
+                    unsafe {
+                        PostMessageW(
+                            Some(self.platform_window_handle.as_raw()),
+                            WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
+                            WPARAM(self.validation_number),
+                            LPARAM(0),
+                        )
+                        .log_err();
+                    }
+                }
+            }
             Err(runnable) => {
                 // NOTE: Runnable may wrap a Future that is !Send.
                 //

crates/gpui/src/platform/windows/platform.rs 🔗

@@ -4,7 +4,7 @@ use std::{
     mem::ManuallyDrop,
     path::{Path, PathBuf},
     rc::{Rc, Weak},
-    sync::Arc,
+    sync::{Arc, atomic::Ordering},
 };
 
 use ::util::{ResultExt, paths::SanitizedPath};
@@ -48,6 +48,7 @@ struct WindowsPlatformInner {
     // The below members will never change throughout the entire lifecycle of the app.
     validation_number: usize,
     main_receiver: flume::Receiver<Runnable>,
+    dispatcher: Arc<WindowsDispatcher>,
 }
 
 pub(crate) struct WindowsPlatformState {
@@ -109,8 +110,10 @@ impl WindowsPlatform {
             inner: None,
             raw_window_handles: Arc::downgrade(&raw_window_handles),
             validation_number,
+            main_sender: Some(main_sender),
             main_receiver: Some(main_receiver),
             directx_devices: Some(directx_devices),
+            dispatcher: None,
         };
         let result = unsafe {
             CreateWindowExW(
@@ -129,12 +132,9 @@ impl WindowsPlatform {
             )
         };
         let inner = context.inner.take().unwrap()?;
+        let dispatcher = context.dispatcher.take().unwrap();
         let handle = result?;
-        let dispatcher = Arc::new(WindowsDispatcher::new(
-            main_sender,
-            handle,
-            validation_number,
-        ));
+
         let disable_direct_composition = std::env::var(DISABLE_DIRECT_COMPOSITION)
             .is_ok_and(|value| value == "true" || value == "1");
         let background_executor = BackgroundExecutor::new(dispatcher.clone());
@@ -682,6 +682,7 @@ impl WindowsPlatformInner {
         Ok(Rc::new(Self {
             state,
             raw_window_handles: context.raw_window_handles.clone(),
+            dispatcher: context.dispatcher.as_ref().unwrap().clone(),
             validation_number: context.validation_number,
             main_receiver: context.main_receiver.take().unwrap(),
         }))
@@ -746,9 +747,28 @@ impl WindowsPlatformInner {
 
     #[inline]
     fn run_foreground_task(&self) -> Option<isize> {
-        for runnable in self.main_receiver.drain() {
-            runnable.run();
+        loop {
+            for runnable in self.main_receiver.drain() {
+                runnable.run();
+            }
+
+            // Someone could enqueue a Runnable here. The flag is still true, so they will not PostMessage.
+            // We need to check for those Runnables after we clear the flag.
+            let dispatcher = self.dispatcher.clone();
+
+            dispatcher.wake_posted.store(false, Ordering::Release);
+            match self.main_receiver.try_recv() {
+                Ok(runnable) => {
+                    let _ = dispatcher.wake_posted.swap(true, Ordering::AcqRel);
+                    runnable.run();
+                    continue;
+                }
+                _ => {
+                    break;
+                }
+            }
         }
+
         Some(0)
     }
 
@@ -832,8 +852,10 @@ struct PlatformWindowCreateContext {
     inner: Option<Result<Rc<WindowsPlatformInner>>>,
     raw_window_handles: std::sync::Weak<RwLock<SmallVec<[SafeHwnd; 4]>>>,
     validation_number: usize,
+    main_sender: Option<flume::Sender<Runnable>>,
     main_receiver: Option<flume::Receiver<Runnable>>,
     directx_devices: Option<DirectXDevices>,
+    dispatcher: Option<Arc<WindowsDispatcher>>,
 }
 
 fn open_target(target: impl AsRef<OsStr>) -> Result<()> {
@@ -1115,6 +1137,13 @@ unsafe extern "system" fn window_procedure(
         let params = unsafe { &*params };
         let creation_context = params.lpCreateParams as *mut PlatformWindowCreateContext;
         let creation_context = unsafe { &mut *creation_context };
+
+        creation_context.dispatcher = Some(Arc::new(WindowsDispatcher::new(
+            creation_context.main_sender.take().unwrap(),
+            hwnd,
+            creation_context.validation_number,
+        )));
+
         return match WindowsPlatformInner::new(creation_context) {
             Ok(inner) => {
                 let weak = Box::new(Rc::downgrade(&inner));