From 95e246ac1cab4df7b7718744e8c146679c8db128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=B0=8F=E7=99=BD?= <364772080@qq.com> Date: Fri, 10 May 2024 05:24:57 +0800 Subject: [PATCH] windows: Better dispatcher (#11485) This PR leverages a more modern Windows API to implement `WindowsDispatcher`, aligning its implementation more closely with that of the `macOS` platform. The following improvements have been made: 1. Similar to `macOS`, there is no longer a need to use `sender` and `receiver` to dispatch a `Runnable` on the main thread. 2. There is also no longer a need to use an `Event` for synchronization. 3. Consistent with #7506 and #11269, `Runnable` is now executed with high priority. However, this PR raises the minimum Windows version requirement of `GPUI` to Windows 10, specifically Windows 10 Fall Creators Update (10.0.16299). However, the `alacritty_terminal` dependency in Zed relies on `conPTY` on Windows, an API introduced in the Windows 10 Fall Creators Update. Therefore, the impact of this PR on Zed should be minimal. I'd like to hear your voices about this PR, especially about the minimum Windows version bumping. Release Notes: - N/A --- Cargo.toml | 5 +- .../gpui/src/platform/windows/dispatcher.rs | 197 +++++++----------- crates/gpui/src/platform/windows/events.rs | 3 - crates/gpui/src/platform/windows/platform.rs | 30 +-- crates/gpui/src/platform/windows/window.rs | 6 - 5 files changed, 86 insertions(+), 155 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ceab28c4fb74e84b62378189b5e624dca3fc4073..498ba467df8b3459ceaf786b1ab26ac6a08caba4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -227,7 +227,7 @@ snippet = { path = "crates/snippet" } sqlez = { path = "crates/sqlez" } sqlez_macros = { path = "crates/sqlez_macros" } supermaven = { path = "crates/supermaven" } -supermaven_api = { path = "crates/supermaven_api"} +supermaven_api = { path = "crates/supermaven_api" } story = { path = "crates/story" } storybook = { path = "crates/storybook" } sum_tree = { path = "crates/sum_tree" } @@ -386,6 +386,8 @@ version = "0.53.0" features = [ "implement", "Foundation_Numerics", + "System", + "System_Threading", "Wdk_System_SystemServices", "Win32_Globalization", "Win32_Graphics_Direct2D", @@ -409,6 +411,7 @@ features = [ "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_Time", + "Win32_System_WinRT", "Win32_UI_Controls", "Win32_UI_HiDpi", "Win32_UI_Input_Ime", diff --git a/crates/gpui/src/platform/windows/dispatcher.rs b/crates/gpui/src/platform/windows/dispatcher.rs index 5a7e2b8ee1d18dc3cea39acc7198923549bfe8e1..593213221035fe684600c005327f5e6f4e4da950 100644 --- a/crates/gpui/src/platform/windows/dispatcher.rs +++ b/crates/gpui/src/platform/windows/dispatcher.rs @@ -1,71 +1,97 @@ use std::{ - sync::{ - atomic::{AtomicIsize, Ordering}, - Arc, - }, thread::{current, ThreadId}, + time::Duration, }; use async_task::Runnable; -use flume::Sender; use parking::Parker; use parking_lot::Mutex; -use windows::Win32::{Foundation::*, System::Threading::*}; +use util::ResultExt; +use windows::{ + Foundation::TimeSpan, + System::{ + DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler, + Threading::{ + ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions, + WorkItemPriority, + }, + }, + Win32::System::WinRT::{ + CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE, + DQTYPE_THREAD_CURRENT, + }, +}; use crate::{PlatformDispatcher, TaskLabel}; pub(crate) struct WindowsDispatcher { - threadpool: PTP_POOL, - main_sender: Sender, + controller: DispatcherQueueController, + main_queue: DispatcherQueue, parker: Mutex, main_thread_id: ThreadId, - dispatch_event: HANDLE, } +unsafe impl Send for WindowsDispatcher {} +unsafe impl Sync for WindowsDispatcher {} + impl WindowsDispatcher { - pub(crate) fn new(main_sender: Sender, dispatch_event: HANDLE) -> Self { - let parker = Mutex::new(Parker::new()); - let threadpool = unsafe { - let ret = CreateThreadpool(None); - if ret.0 == 0 { - panic!( - "unable to initialize a thread pool: {}", - std::io::Error::last_os_error() - ); - } - // set minimum 1 thread in threadpool - let _ = SetThreadpoolThreadMinimum(ret, 1) - .inspect_err(|_| log::error!("unable to configure thread pool")); - - ret + pub(crate) fn new() -> Self { + let controller = unsafe { + let options = DispatcherQueueOptions { + dwSize: std::mem::size_of::() as u32, + threadType: DQTYPE_THREAD_CURRENT, + apartmentType: DQTAT_COM_NONE, + }; + CreateDispatcherQueueController(options).unwrap() }; + let main_queue = controller.DispatcherQueue().unwrap(); + let parker = Mutex::new(Parker::new()); let main_thread_id = current().id(); + WindowsDispatcher { - threadpool, - main_sender, + controller, + main_queue, parker, main_thread_id, - dispatch_event, } } fn dispatch_on_threadpool(&self, runnable: Runnable) { - unsafe { - let ptr = Box::into_raw(Box::new(runnable)); - let environment = get_threadpool_environment(self.threadpool); - let Ok(work) = - CreateThreadpoolWork(Some(threadpool_runner), Some(ptr as _), Some(&environment)) - .inspect_err(|_| { - log::error!( - "unable to dispatch work on thread pool: {}", - std::io::Error::last_os_error() - ) - }) - else { - return; - }; - SubmitThreadpoolWork(work); - } + let handler = { + let mut task_wrapper = Some(runnable); + WorkItemHandler::new(move |_| { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + ThreadPool::RunWithPriorityAndOptionsAsync( + &handler, + WorkItemPriority::High, + WorkItemOptions::TimeSliced, + ) + .log_err(); + } + + fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) { + let handler = { + let mut task_wrapper = Some(runnable); + TimerElapsedHandler::new(move |_| { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + let delay = TimeSpan { + // A time period expressed in 100-nanosecond units. + // 10,000,000 ticks per second + Duration: (duration.as_nanos() / 100) as i64, + }; + ThreadPoolTimer::CreateTimer(&handler, delay).log_err(); + } +} + +impl Drop for WindowsDispatcher { + fn drop(&mut self) { + self.controller.ShutdownQueueAsync().log_err(); } } @@ -82,38 +108,18 @@ impl PlatformDispatcher for WindowsDispatcher { } fn dispatch_on_main_thread(&self, runnable: Runnable) { - self.main_sender - .send(runnable) - .inspect_err(|e| log::error!("Dispatch failed: {e}")) - .ok(); - unsafe { SetEvent(self.dispatch_event) }.ok(); + let handler = { + let mut task_wrapper = Some(runnable); + DispatcherQueueHandler::new(move || { + task_wrapper.take().unwrap().run(); + Ok(()) + }) + }; + self.main_queue.TryEnqueue(&handler).log_err(); } - fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) { - if duration.as_millis() == 0 { - self.dispatch_on_threadpool(runnable); - return; - } - unsafe { - let mut handle = std::mem::zeroed(); - let task = Arc::new(DelayedTask::new(runnable)); - let _ = CreateTimerQueueTimer( - &mut handle, - None, - Some(timer_queue_runner), - Some(Arc::into_raw(task.clone()) as _), - duration.as_millis() as u32, - 0, - WT_EXECUTEONLYONCE, - ) - .inspect_err(|_| { - log::error!( - "unable to dispatch delayed task: {}", - std::io::Error::last_os_error() - ) - }); - task.raw_timer_handle.store(handle.0, Ordering::SeqCst); - } + fn dispatch_after(&self, duration: Duration, runnable: Runnable) { + self.dispatch_on_threadpool_after(runnable, duration); } fn tick(&self, _background_only: bool) -> bool { @@ -128,48 +134,3 @@ impl PlatformDispatcher for WindowsDispatcher { self.parker.lock().unparker() } } - -extern "system" fn threadpool_runner( - _: PTP_CALLBACK_INSTANCE, - ptr: *mut std::ffi::c_void, - _: PTP_WORK, -) { - unsafe { - let runnable = Box::from_raw(ptr as *mut Runnable); - runnable.run(); - } -} - -unsafe extern "system" fn timer_queue_runner(ptr: *mut std::ffi::c_void, _: BOOLEAN) { - let task = Arc::from_raw(ptr as *mut DelayedTask); - task.runnable.lock().take().unwrap().run(); - unsafe { - let timer = task.raw_timer_handle.load(Ordering::SeqCst); - let _ = DeleteTimerQueueTimer(None, HANDLE(timer), None); - } -} - -struct DelayedTask { - runnable: Mutex>, - raw_timer_handle: AtomicIsize, -} - -impl DelayedTask { - pub fn new(runnable: Runnable) -> Self { - DelayedTask { - runnable: Mutex::new(Some(runnable)), - raw_timer_handle: AtomicIsize::new(0), - } - } -} - -#[inline] -fn get_threadpool_environment(pool: PTP_POOL) -> TP_CALLBACK_ENVIRON_V3 { - TP_CALLBACK_ENVIRON_V3 { - Version: 3, // Win7+, otherwise this value should be 1 - Pool: pool, - CallbackPriority: TP_CALLBACK_PRIORITY_NORMAL, - Size: std::mem::size_of::() as _, - ..Default::default() - } -} diff --git a/crates/gpui/src/platform/windows/events.rs b/crates/gpui/src/platform/windows/events.rs index 89315eac071fca8fa9f622b0356cadafd60159fe..b79ddced5ded1a9b9ea893e8df0fc5d354acc146 100644 --- a/crates/gpui/src/platform/windows/events.rs +++ b/crates/gpui/src/platform/windows/events.rs @@ -171,9 +171,6 @@ fn handle_timer_msg( state_ptr: Rc, ) -> Option { if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID { - for runnable in state_ptr.main_receiver.drain() { - runnable.run(); - } handle_paint_msg(handle, state_ptr) } else { None diff --git a/crates/gpui/src/platform/windows/platform.rs b/crates/gpui/src/platform/windows/platform.rs index a38e0d18b4ed93bdafdb1ae2f183beef2d6ed081..470d6be0e46602115c0b7a7c73e9aa88d3acc52d 100644 --- a/crates/gpui/src/platform/windows/platform.rs +++ b/crates/gpui/src/platform/windows/platform.rs @@ -13,7 +13,6 @@ use std::{ use ::util::ResultExt; use anyhow::{anyhow, Context, Result}; -use async_task::Runnable; use copypasta::{ClipboardContext, ClipboardProvider}; use futures::channel::oneshot::{self, Receiver}; use itertools::Itertools; @@ -42,11 +41,9 @@ pub(crate) struct WindowsPlatform { raw_window_handles: RwLock>, // The below members will never change throughout the entire lifecycle of the app. icon: HICON, - main_receiver: flume::Receiver, background_executor: BackgroundExecutor, foreground_executor: ForegroundExecutor, text_system: Arc, - dispatch_event: OwnedHandle, } pub(crate) struct WindowsPlatformState { @@ -85,10 +82,7 @@ impl WindowsPlatform { unsafe { OleInitialize(None).expect("unable to initialize Windows OLE"); } - let (main_sender, main_receiver) = flume::unbounded::(); - let dispatch_event = - OwnedHandle::new(unsafe { CreateEventW(None, false, false, None) }.unwrap()); - let dispatcher = Arc::new(WindowsDispatcher::new(main_sender, dispatch_event.to_raw())); + let dispatcher = Arc::new(WindowsDispatcher::new()); let background_executor = BackgroundExecutor::new(dispatcher.clone()); let foreground_executor = ForegroundExecutor::new(dispatcher); let text_system = if let Some(direct_write) = DirectWriteTextSystem::new().log_err() { @@ -106,18 +100,9 @@ impl WindowsPlatform { state, raw_window_handles, icon, - main_receiver, background_executor, foreground_executor, text_system, - dispatch_event, - } - } - - #[inline] - fn run_foreground_tasks(&self) { - for runnable in self.main_receiver.drain() { - runnable.run(); } } @@ -201,7 +186,6 @@ impl Platform for WindowsPlatform { fn run(&self, on_finish_launching: Box) { on_finish_launching(); - let dispatch_event = self.dispatch_event.to_raw(); let vsync_event = create_event().unwrap(); let timer_stop_event = create_event().unwrap(); let raw_timer_stop_event = timer_stop_event.to_raw(); @@ -209,7 +193,7 @@ impl Platform for WindowsPlatform { 'a: loop { let wait_result = unsafe { MsgWaitForMultipleObjects( - Some(&[vsync_event.to_raw(), dispatch_event]), + Some(&[vsync_event.to_raw()]), false, INFINITE, QS_ALLINPUT, @@ -221,12 +205,8 @@ impl Platform for WindowsPlatform { WAIT_EVENT(0) => { self.redraw_all(); } - // foreground tasks are dispatched - WAIT_EVENT(1) => { - self.run_foreground_tasks(); - } // Windows thread messages are posted - WAIT_EVENT(2) => { + WAIT_EVENT(1) => { let mut msg = MSG::default(); unsafe { while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() { @@ -245,9 +225,6 @@ impl Platform for WindowsPlatform { } } } - - // foreground tasks may have been queued in the message handlers - self.run_foreground_tasks(); } _ => { log::error!("Something went wrong while waiting {:?}", wait_result); @@ -344,7 +321,6 @@ impl Platform for WindowsPlatform { options, self.icon, self.foreground_executor.clone(), - self.main_receiver.clone(), lock.settings.mouse_wheel_settings, lock.current_cursor, ); diff --git a/crates/gpui/src/platform/windows/window.rs b/crates/gpui/src/platform/windows/window.rs index 753abe56d8c3c0499d75c6ae26fa101470013591..55ecd0829c641ef1631076fcaedbbae3a8818325 100644 --- a/crates/gpui/src/platform/windows/window.rs +++ b/crates/gpui/src/platform/windows/window.rs @@ -12,7 +12,6 @@ use std::{ use ::util::ResultExt; use anyhow::Context; -use async_task::Runnable; use futures::channel::oneshot::{self, Receiver}; use itertools::Itertools; use raw_window_handle as rwh; @@ -58,7 +57,6 @@ pub(crate) struct WindowsWindowStatePtr { pub(crate) handle: AnyWindowHandle, pub(crate) hide_title_bar: bool, pub(crate) executor: ForegroundExecutor, - pub(crate) main_receiver: flume::Receiver, } impl WindowsWindowState { @@ -208,7 +206,6 @@ impl WindowsWindowStatePtr { handle: context.handle, hide_title_bar: context.hide_title_bar, executor: context.executor.clone(), - main_receiver: context.main_receiver.clone(), }) } } @@ -232,7 +229,6 @@ struct WindowCreateContext { display: WindowsDisplay, transparent: bool, executor: ForegroundExecutor, - main_receiver: flume::Receiver, mouse_wheel_settings: MouseWheelSettings, current_cursor: HCURSOR, } @@ -243,7 +239,6 @@ impl WindowsWindow { params: WindowParams, icon: HICON, executor: ForegroundExecutor, - main_receiver: flume::Receiver, mouse_wheel_settings: MouseWheelSettings, current_cursor: HCURSOR, ) -> Self { @@ -272,7 +267,6 @@ impl WindowsWindow { display: WindowsDisplay::primary_monitor().unwrap(), transparent: params.window_background != WindowBackgroundAppearance::Opaque, executor, - main_receiver, mouse_wheel_settings, current_cursor, };