From 794fa9d6d142b3d9a7aa3305842d6637964db5da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yara=20=F0=9F=8F=B3=EF=B8=8F=E2=80=8D=E2=9A=A7=EF=B8=8F?= Date: Mon, 5 Jan 2026 11:56:10 +0100 Subject: [PATCH] Revert "Revert windows implementation of "Multiple priority scheduler (#44701)"" (#46066) Reverts zed-industries/zed#44990 Release Notes: - N/A --- crates/gpui/src/executor.rs | 10 -- crates/gpui/src/gpui.rs | 4 +- .../gpui/src/platform/windows/dispatcher.rs | 91 +++++++++++-------- crates/gpui/src/platform/windows/events.rs | 3 +- crates/gpui/src/platform/windows/platform.rs | 24 ++--- crates/gpui/src/platform/windows/window.rs | 4 +- 6 files changed, 71 insertions(+), 65 deletions(-) diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index eb16cbd9a0bce1cc2444167cc793e1c8d55b7053..109e13d4b2ee601c9139f306b411702e984bb703 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -366,19 +366,9 @@ impl BackgroundExecutor { &self, future: AnyFuture, label: Option, - #[cfg_attr( - target_os = "windows", - expect( - unused_variables, - reason = "Multi priority scheduler is broken on windows" - ) - )] priority: Priority, ) -> Task { let dispatcher = self.dispatcher.clone(); - #[cfg(target_os = "windows")] - let priority = Priority::Medium; // multi-prio scheduler is broken on windows - let (runnable, task) = if let Priority::Realtime(realtime) = priority { let location = core::panic::Location::caller(); let (mut tx, rx) = flume::bounded::>(1); diff --git a/crates/gpui/src/gpui.rs b/crates/gpui/src/gpui.rs index 76a61e286d3fe6c1acae8e4e628d4c9130f1305f..e5c726f58e117b76e2dbb2976089d5788baa848e 100644 --- a/crates/gpui/src/gpui.rs +++ b/crates/gpui/src/gpui.rs @@ -31,7 +31,7 @@ mod path_builder; mod platform; pub mod prelude; mod profiler; -#[cfg(target_os = "linux")] +#[cfg(any(target_os = "windows", target_os = "linux"))] mod queue; mod scene; mod shared_string; @@ -91,7 +91,7 @@ pub use keymap::*; pub use path_builder::*; pub use platform::*; pub use profiler::*; -#[cfg(target_os = "linux")] +#[cfg(any(target_os = "windows", target_os = "linux"))] pub(crate) use queue::{PriorityQueueReceiver, PriorityQueueSender}; pub use refineable::*; pub use scene::*; diff --git a/crates/gpui/src/platform/windows/dispatcher.rs b/crates/gpui/src/platform/windows/dispatcher.rs index 14486ccee9843ef9c0792d62f22fa825f0db43ee..0720d414c9b44dec4a3bab5b50fd7dde47991989 100644 --- a/crates/gpui/src/platform/windows/dispatcher.rs +++ b/crates/gpui/src/platform/windows/dispatcher.rs @@ -4,24 +4,31 @@ use std::{ time::{Duration, Instant}, }; -use flume::Sender; +use anyhow::Context; use util::ResultExt; use windows::{ - System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler}, + System::Threading::{ + ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority, + }, Win32::{ Foundation::{LPARAM, WPARAM}, + System::Threading::{ + GetCurrentThread, HIGH_PRIORITY_CLASS, SetPriorityClass, SetThreadPriority, + THREAD_PRIORITY_HIGHEST, THREAD_PRIORITY_TIME_CRITICAL, + }, UI::WindowsAndMessaging::PostMessageW, }, }; use crate::{ - GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS, - TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, + GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, Priority, PriorityQueueSender, + RealtimePriority, RunnableVariant, SafeHwnd, THREAD_TIMINGS, TaskLabel, TaskTiming, + ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, profiler, }; pub(crate) struct WindowsDispatcher { pub(crate) wake_posted: AtomicBool, - main_sender: Sender, + main_sender: PriorityQueueSender, main_thread_id: ThreadId, pub(crate) platform_window_handle: SafeHwnd, validation_number: usize, @@ -29,7 +36,7 @@ pub(crate) struct WindowsDispatcher { impl WindowsDispatcher { pub(crate) fn new( - main_sender: Sender, + main_sender: PriorityQueueSender, platform_window_handle: HWND, validation_number: usize, ) -> Self { @@ -45,7 +52,7 @@ impl WindowsDispatcher { } } - fn dispatch_on_threadpool(&self, runnable: RunnableVariant) { + fn dispatch_on_threadpool(&self, priority: WorkItemPriority, runnable: RunnableVariant) { let handler = { let mut task_wrapper = Some(runnable); WorkItemHandler::new(move |_| { @@ -53,7 +60,8 @@ impl WindowsDispatcher { Ok(()) }) }; - ThreadPool::RunAsync(&handler).log_err(); + + ThreadPool::RunWithPriorityAsync(&handler, priority).log_err(); } fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) { @@ -79,7 +87,7 @@ impl WindowsDispatcher { start, end: None, }; - Self::add_task_timing(timing); + profiler::add_task_timing(timing); runnable.run(); @@ -91,7 +99,7 @@ impl WindowsDispatcher { start, end: None, }; - Self::add_task_timing(timing); + profiler::add_task_timing(timing); runnable.run(); @@ -102,23 +110,7 @@ impl WindowsDispatcher { let end = Instant::now(); timing.end = Some(end); - Self::add_task_timing(timing); - } - - pub(crate) fn add_task_timing(timing: TaskTiming) { - THREAD_TIMINGS.with(|timings| { - let mut timings = timings.lock(); - let timings = &mut timings.timings; - - if let Some(last_timing) = timings.iter_mut().rev().next() { - if last_timing.location == timing.location { - last_timing.end = timing.end; - return; - } - } - - timings.push_back(timing); - }); + profiler::add_task_timing(timing); } } @@ -146,20 +138,22 @@ impl PlatformDispatcher for WindowsDispatcher { current().id() == self.main_thread_id } - fn dispatch( - &self, - runnable: RunnableVariant, - label: Option, - _priority: gpui::Priority, - ) { - self.dispatch_on_threadpool(runnable); + fn dispatch(&self, runnable: RunnableVariant, label: Option, priority: Priority) { + let priority = match priority { + Priority::Realtime(_) => unreachable!(), + Priority::High => WorkItemPriority::High, + Priority::Medium => WorkItemPriority::Normal, + Priority::Low => WorkItemPriority::Low, + }; + self.dispatch_on_threadpool(priority, runnable); + if let Some(label) = label { log::debug!("TaskLabel: {label:?}"); } } - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: gpui::Priority) { - match self.main_sender.send(runnable) { + fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) { + match self.main_sender.send(priority, runnable) { Ok(_) => { if !self.wake_posted.swap(true, Ordering::AcqRel) { unsafe { @@ -191,8 +185,27 @@ impl PlatformDispatcher for WindowsDispatcher { self.dispatch_on_threadpool_after(runnable, duration); } - fn spawn_realtime(&self, _priority: crate::RealtimePriority, _f: Box) { - // disabled on windows for now. - unimplemented!(); + fn spawn_realtime(&self, priority: RealtimePriority, f: Box) { + std::thread::spawn(move || { + // SAFETY: always safe to call + let thread_handle = unsafe { GetCurrentThread() }; + + let thread_priority = match priority { + RealtimePriority::Audio => THREAD_PRIORITY_TIME_CRITICAL, + RealtimePriority::Other => THREAD_PRIORITY_HIGHEST, + }; + + // SAFETY: thread_handle is a valid handle to a thread + unsafe { SetPriorityClass(thread_handle, HIGH_PRIORITY_CLASS) } + .context("thread priority class") + .log_err(); + + // SAFETY: thread_handle is a valid handle to a thread + unsafe { SetThreadPriority(thread_handle, thread_priority) } + .context("thread priority") + .log_err(); + + f(); + }); } } diff --git a/crates/gpui/src/platform/windows/events.rs b/crates/gpui/src/platform/windows/events.rs index 8d44ba274ea7a3eec2714d83bb6dac63fbac370b..8cbe0430db5bb5d51e29c310fca957c3228e2af0 100644 --- a/crates/gpui/src/platform/windows/events.rs +++ b/crates/gpui/src/platform/windows/events.rs @@ -248,7 +248,8 @@ impl WindowsWindowInner { fn handle_timer_msg(&self, handle: HWND, wparam: WPARAM) -> Option { if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID { - for runnable in self.main_receiver.drain() { + let mut runnables = self.main_receiver.clone().try_iter(); + while let Some(Ok(runnable)) = runnables.next() { WindowsDispatcher::execute_runnable(runnable); } self.handle_paint_msg(handle) diff --git a/crates/gpui/src/platform/windows/platform.rs b/crates/gpui/src/platform/windows/platform.rs index 0e0fdd56c54d56587c09bca14f16dd8e5aef389d..75085d0b4ec3cae5a156e1d9029e5d79353477e0 100644 --- a/crates/gpui/src/platform/windows/platform.rs +++ b/crates/gpui/src/platform/windows/platform.rs @@ -51,7 +51,7 @@ struct WindowsPlatformInner { raw_window_handles: std::sync::Weak>>, // The below members will never change throughout the entire lifecycle of the app. validation_number: usize, - main_receiver: flume::Receiver, + main_receiver: PriorityQueueReceiver, dispatcher: Arc, } @@ -98,7 +98,7 @@ impl WindowsPlatform { OleInitialize(None).context("unable to initialize Windows OLE")?; } let directx_devices = DirectXDevices::new().context("Creating DirectX devices")?; - let (main_sender, main_receiver) = flume::unbounded::(); + let (main_sender, main_receiver) = PriorityQueueReceiver::new(); let validation_number = if usize::BITS == 64 { rand::random::() as usize } else { @@ -857,22 +857,24 @@ impl WindowsPlatformInner { } break 'tasks; } - match self.main_receiver.try_recv() { - Err(_) => break 'timeout_loop, - Ok(runnable) => WindowsDispatcher::execute_runnable(runnable), + let mut main_receiver = self.main_receiver.clone(); + match main_receiver.try_pop() { + Ok(Some(runnable)) => WindowsDispatcher::execute_runnable(runnable), + _ => break 'timeout_loop, } } // Someone could enqueue a Runnable here. The flag is still true, so they will not PostMessage. // We need to check for those Runnables after we clear the flag. self.dispatcher.wake_posted.store(false, Ordering::Release); - match self.main_receiver.try_recv() { - Err(_) => break 'tasks, - Ok(runnable) => { + let mut main_receiver = self.main_receiver.clone(); + match main_receiver.try_pop() { + Ok(Some(runnable)) => { self.dispatcher.wake_posted.store(true, Ordering::Release); WindowsDispatcher::execute_runnable(runnable); } + _ => break 'tasks, } } @@ -934,7 +936,7 @@ pub(crate) struct WindowCreationInfo { pub(crate) windows_version: WindowsVersion, pub(crate) drop_target_helper: IDropTargetHelper, pub(crate) validation_number: usize, - pub(crate) main_receiver: flume::Receiver, + pub(crate) main_receiver: PriorityQueueReceiver, pub(crate) platform_window_handle: HWND, pub(crate) disable_direct_composition: bool, pub(crate) directx_devices: DirectXDevices, @@ -947,8 +949,8 @@ struct PlatformWindowCreateContext { inner: Option>>, raw_window_handles: std::sync::Weak>>, validation_number: usize, - main_sender: Option>, - main_receiver: Option>, + main_sender: Option>, + main_receiver: Option>, directx_devices: Option, dispatcher: Option>, } diff --git a/crates/gpui/src/platform/windows/window.rs b/crates/gpui/src/platform/windows/window.rs index fa804e3d160a370851701ea0a7600103b66356a8..e9da322f5327606d141517bdd246bac8e74f2d58 100644 --- a/crates/gpui/src/platform/windows/window.rs +++ b/crates/gpui/src/platform/windows/window.rs @@ -81,7 +81,7 @@ pub(crate) struct WindowsWindowInner { pub(crate) executor: ForegroundExecutor, pub(crate) windows_version: WindowsVersion, pub(crate) validation_number: usize, - pub(crate) main_receiver: flume::Receiver, + pub(crate) main_receiver: PriorityQueueReceiver, pub(crate) platform_window_handle: HWND, pub(crate) parent_hwnd: Option, } @@ -364,7 +364,7 @@ struct WindowCreateContext { windows_version: WindowsVersion, drop_target_helper: IDropTargetHelper, validation_number: usize, - main_receiver: flume::Receiver, + main_receiver: PriorityQueueReceiver, platform_window_handle: HWND, appearance: WindowAppearance, disable_direct_composition: bool,