From ecb8d3d4dd821b61b9c16457897130ffcffc4eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yara=20=F0=9F=8F=B3=EF=B8=8F=E2=80=8D=E2=9A=A7=EF=B8=8F?= Date: Thu, 11 Dec 2025 16:16:43 +0100 Subject: [PATCH] Revert "Multiple priority scheduler" (#44637) Reverts zed-industries/zed#44575 --- Cargo.lock | 1 - crates/gpui/Cargo.toml | 5 +- crates/gpui/build.rs | 2 - crates/gpui/src/app.rs | 27 +- crates/gpui/src/app/context.rs | 23 +- crates/gpui/src/executor.rs | 162 +-------- crates/gpui/src/gpui.rs | 9 +- crates/gpui/src/platform.rs | 12 +- crates/gpui/src/platform/linux/dispatcher.rs | 329 +++--------------- crates/gpui/src/platform/linux/platform.rs | 10 +- .../gpui/src/platform/linux/wayland/client.rs | 10 +- crates/gpui/src/platform/linux/x11/client.rs | 8 +- crates/gpui/src/platform/mac/dispatcher.rs | 145 +------- crates/gpui/src/platform/test/dispatcher.rs | 12 +- .../gpui/src/platform/windows/dispatcher.rs | 85 ++--- crates/gpui/src/platform/windows/events.rs | 3 +- crates/gpui/src/platform/windows/platform.rs | 24 +- crates/gpui/src/platform/windows/window.rs | 4 +- crates/gpui/src/profiler.rs | 16 - crates/gpui/src/queue.rs | 329 ------------------ crates/gpui/src/window.rs | 38 +- crates/repl/src/repl.rs | 4 +- crates/worktree/src/worktree.rs | 5 +- typos.toml | 2 - 24 files changed, 154 insertions(+), 1111 deletions(-) delete mode 100644 crates/gpui/src/queue.rs diff --git a/Cargo.lock b/Cargo.lock index beeff87127cf51dad5fadf54d126a471a5053606..ff1041695e1f1e95bcbc05798d1a1e0f953533ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7239,7 +7239,6 @@ dependencies = [ "libc", "log", "lyon", - "mach2 0.5.0", "media", "metal", "naga", diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index 8fc37978683357e53ed9f9c3cf587fcd704431e2..4985cc07383aac56d6975fa09a410a0cee6c549d 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -21,6 +21,7 @@ default = ["font-kit", "wayland", "x11", "windows-manifest"] test-support = [ "leak-detection", "collections/test-support", + "rand", "util/test-support", "http_client/test-support", "wayland", @@ -108,7 +109,7 @@ parking = "2.0.0" parking_lot.workspace = true postage.workspace = true profiling.workspace = true -rand.workspace = true +rand = { optional = true, workspace = true } raw-window-handle = "0.6" refineable.workspace = true resvg = { version = "0.45.0", default-features = false, features = [ @@ -157,10 +158,8 @@ media.workspace = true objc.workspace = true objc2 = { version = "0.6", optional = true } objc2-metal = { version = "0.3", optional = true } -mach2.workspace = true #TODO: replace with "objc2" metal.workspace = true -flume = "0.11" [target.'cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))'.dependencies] pathfinder_geometry = "0.5" diff --git a/crates/gpui/build.rs b/crates/gpui/build.rs index c7ae7ac9f239f2f6ce3880f9329f2ba92b2174f3..ec35ec0bc63113582a945c71198cd7bc14301dcc 100644 --- a/crates/gpui/build.rs +++ b/crates/gpui/build.rs @@ -84,8 +84,6 @@ mod macos { .allowlist_var("_dispatch_main_q") .allowlist_var("_dispatch_source_type_data_add") .allowlist_var("DISPATCH_QUEUE_PRIORITY_HIGH") - .allowlist_var("DISPATCH_QUEUE_PRIORITY_DEFAULT") - .allowlist_var("DISPATCH_QUEUE_PRIORITY_LOW") .allowlist_var("DISPATCH_TIME_NOW") .allowlist_function("dispatch_get_global_queue") .allowlist_function("dispatch_async_f") diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index f7c57ef015e73618b8cfd9d5da8dbb717905577b..2f4c7611dcf9d24302b3dda1d05c4c2b8711a68d 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -38,11 +38,10 @@ use crate::{ AssetSource, BackgroundExecutor, Bounds, ClipboardItem, CursorStyle, DispatchPhase, DisplayId, EventEmitter, FocusHandle, FocusMap, ForegroundExecutor, Global, KeyBinding, KeyContext, Keymap, Keystroke, LayoutId, Menu, MenuItem, OwnedMenu, PathPromptOptions, Pixels, Platform, - PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, Priority, - PromptBuilder, PromptButton, PromptHandle, PromptLevel, Render, RenderImage, - RenderablePromptHandle, Reservation, ScreenCaptureSource, SharedString, SubscriberSet, - Subscription, SvgRenderer, Task, TextSystem, Window, WindowAppearance, WindowHandle, WindowId, - WindowInvalidator, + PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, PromptBuilder, + PromptButton, PromptHandle, PromptLevel, Render, RenderImage, RenderablePromptHandle, + Reservation, ScreenCaptureSource, SharedString, SubscriberSet, Subscription, SvgRenderer, Task, + TextSystem, Window, WindowAppearance, WindowHandle, WindowId, WindowInvalidator, colors::{Colors, GlobalColors}, current_platform, hash, init_app_menus, }; @@ -1495,24 +1494,6 @@ impl App { .spawn(async move { f(&mut cx).await }) } - /// Spawns the future returned by the given function on the main thread with - /// the given priority. The closure will be invoked with [AsyncApp], which - /// allows the application state to be accessed across await points. - pub fn spawn_with_priority(&self, priority: Priority, f: AsyncFn) -> Task - where - AsyncFn: AsyncFnOnce(&mut AsyncApp) -> R + 'static, - R: 'static, - { - if self.quitting { - debug_panic!("Can't spawn on main thread after on_app_quit") - }; - - let mut cx = self.to_async(); - - self.foreground_executor - .spawn_with_priority(priority, async move { f(&mut cx).await }) - } - /// Schedules the given function to be run at the end of the current effect cycle, allowing entities /// that are currently on the stack to be returned to the app. pub fn defer(&mut self, f: impl FnOnce(&mut App) + 'static) { diff --git a/crates/gpui/src/app/context.rs b/crates/gpui/src/app/context.rs index 27ccbecaf83cafe7bf7562c32a164268a74a396b..65bb5521e32bb6fcfac2bcd95009949499589df1 100644 --- a/crates/gpui/src/app/context.rs +++ b/crates/gpui/src/app/context.rs @@ -1,7 +1,7 @@ use crate::{ AnyView, AnyWindowHandle, AppContext, AsyncApp, DispatchPhase, Effect, EntityId, EventEmitter, - FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Priority, Reservation, - SubscriberSet, Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle, + FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Reservation, SubscriberSet, + Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle, }; use anyhow::Result; use futures::FutureExt; @@ -667,25 +667,6 @@ impl<'a, T: 'static> Context<'a, T> { window.spawn(self, async move |cx| f(view, cx).await) } - /// Schedule a future to be run asynchronously with the given priority. - /// The given callback is invoked with a [`WeakEntity`] to avoid leaking the entity for a long-running process. - /// It's also given an [`AsyncWindowContext`], which can be used to access the state of the entity across await points. - /// The returned future will be polled on the main thread. - #[track_caller] - pub fn spawn_in_with_priority( - &self, - priority: Priority, - window: &Window, - f: AsyncFn, - ) -> Task - where - R: 'static, - AsyncFn: AsyncFnOnce(WeakEntity, &mut AsyncWindowContext) -> R + 'static, - { - let view = self.weak_entity(); - window.spawn_with_priority(priority, self, async move |cx| f(view, cx).await) - } - /// Register a callback to be invoked when the given global state changes. pub fn observe_global_in( &mut self, diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 49d06f0b07b24371bd3dd43f8fc6717bc08bba50..c0aa978c8eb0b217aa1cf7cd734664dc0736c355 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -1,4 +1,4 @@ -use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler}; +use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant}; use async_task::Runnable; use futures::channel::mpsc; use smol::prelude::*; @@ -46,52 +46,6 @@ pub struct ForegroundExecutor { not_send: PhantomData>, } -/// Realtime task priority -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -#[repr(u8)] -pub enum RealtimePriority { - /// Audio task - Audio, - /// Other realtime task - #[default] - Other, -} - -/// Task priority -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -#[repr(u8)] -pub enum Priority { - /// Realtime priority - /// - /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task. - Realtime(RealtimePriority), - /// High priority - /// - /// Only use for tasks that are critical to the user experience / responsiveness of the editor. - High, - /// Medium priority, probably suits most of your use cases. - #[default] - Medium, - /// Low priority - /// - /// Prioritize this for background work that can come in large quantities - /// to not starve the executor of resources for high priority tasks - Low, -} - -impl Priority { - #[allow(dead_code)] - pub(crate) const fn probability(&self) -> u32 { - match self { - // realtime priorities are not considered for probability scheduling - Priority::Realtime(_) => 0, - Priority::High => 60, - Priority::Medium => 30, - Priority::Low => 10, - } - } -} - /// Task is a primitive that allows work to happen in the background. /// /// It implements [`Future`] so you can `.await` on it. @@ -197,20 +151,7 @@ impl BackgroundExecutor { where R: Send + 'static, { - self.spawn_with_priority(Priority::default(), future) - } - - /// Enqueues the given future to be run to completion on a background thread. - #[track_caller] - pub fn spawn_with_priority( - &self, - priority: Priority, - future: impl Future + Send + 'static, - ) -> Task - where - R: Send + 'static, - { - self.spawn_internal::(Box::pin(future), None, priority) + self.spawn_internal::(Box::pin(future), None) } /// Enqueues the given future to be run to completion on a background thread. @@ -224,7 +165,7 @@ impl BackgroundExecutor { where R: Send + 'static, { - self.spawn_internal::(Box::pin(future), Some(label), Priority::default()) + self.spawn_internal::(Box::pin(future), Some(label)) } #[track_caller] @@ -232,55 +173,15 @@ impl BackgroundExecutor { &self, future: AnyFuture, label: Option, - priority: Priority, ) -> Task { let dispatcher = self.dispatcher.clone(); - let (runnable, task) = if let Priority::Realtime(realtime) = priority { - let location = core::panic::Location::caller(); - let (mut tx, rx) = flume::bounded::>(1); - - dispatcher.spawn_realtime( - realtime, - Box::new(move || { - while let Ok(runnable) = rx.recv() { - let start = Instant::now(); - let location = runnable.metadata().location; - let mut timing = TaskTiming { - location, - start, - end: None, - }; - profiler::add_task_timing(timing); - - runnable.run(); - - let end = Instant::now(); - timing.end = Some(end); - profiler::add_task_timing(timing); - } - }), + let location = core::panic::Location::caller(); + let (runnable, task) = async_task::Builder::new() + .metadata(RunnableMeta { location }) + .spawn( + move |_| future, + move |runnable| dispatcher.dispatch(RunnableVariant::Meta(runnable), label), ); - - async_task::Builder::new() - .metadata(RunnableMeta { location }) - .spawn( - move |_| future, - move |runnable| { - let _ = tx.send(runnable); - }, - ) - } else { - let location = core::panic::Location::caller(); - async_task::Builder::new() - .metadata(RunnableMeta { location }) - .spawn( - move |_| future, - move |runnable| { - dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority) - }, - ) - }; - runnable.schedule(); Task(TaskState::Spawned(task)) } @@ -453,28 +354,11 @@ impl BackgroundExecutor { where F: FnOnce(&mut Scope<'scope>), { - let mut scope = Scope::new(self.clone(), Priority::default()); - (scheduler)(&mut scope); - let spawned = mem::take(&mut scope.futures) - .into_iter() - .map(|f| self.spawn_with_priority(scope.priority, f)) - .collect::>(); - for task in spawned { - task.await; - } - } - - /// Scoped lets you start a number of tasks and waits - /// for all of them to complete before returning. - pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F) - where - F: FnOnce(&mut Scope<'scope>), - { - let mut scope = Scope::new(self.clone(), priority); + let mut scope = Scope::new(self.clone()); (scheduler)(&mut scope); let spawned = mem::take(&mut scope.futures) .into_iter() - .map(|f| self.spawn_with_priority(scope.priority, f)) + .map(|f| self.spawn(f)) .collect::>(); for task in spawned { task.await; @@ -610,19 +494,6 @@ impl ForegroundExecutor { /// Enqueues the given Task to run on the main thread at some point in the future. #[track_caller] pub fn spawn(&self, future: impl Future + 'static) -> Task - where - R: 'static, - { - self.spawn_with_priority(Priority::default(), future) - } - - /// Enqueues the given Task to run on the main thread at some point in the future. - #[track_caller] - pub fn spawn_with_priority( - &self, - priority: Priority, - future: impl Future + 'static, - ) -> Task where R: 'static, { @@ -634,19 +505,16 @@ impl ForegroundExecutor { dispatcher: Arc, future: AnyLocalFuture, location: &'static core::panic::Location<'static>, - priority: Priority, ) -> Task { let (runnable, task) = spawn_local_with_source_location( future, - move |runnable| { - dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority) - }, + move |runnable| dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable)), RunnableMeta { location }, ); runnable.schedule(); Task(TaskState::Spawned(task)) } - inner::(dispatcher, Box::pin(future), location, priority) + inner::(dispatcher, Box::pin(future), location) } } @@ -722,7 +590,6 @@ where /// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`]. pub struct Scope<'a> { executor: BackgroundExecutor, - priority: Priority, futures: Vec + Send + 'static>>>, tx: Option>, rx: mpsc::Receiver<()>, @@ -730,11 +597,10 @@ pub struct Scope<'a> { } impl<'a> Scope<'a> { - fn new(executor: BackgroundExecutor, priority: Priority) -> Self { + fn new(executor: BackgroundExecutor) -> Self { let (tx, rx) = mpsc::channel(1); Self { executor, - priority, tx: Some(tx), rx, futures: Default::default(), diff --git a/crates/gpui/src/gpui.rs b/crates/gpui/src/gpui.rs index e5c726f58e117b76e2dbb2976089d5788baa848e..bc70362047d7826519f6f7c734b7c5a84281b31f 100644 --- a/crates/gpui/src/gpui.rs +++ b/crates/gpui/src/gpui.rs @@ -31,8 +31,6 @@ mod path_builder; mod platform; pub mod prelude; mod profiler; -#[cfg(any(target_os = "windows", target_os = "linux"))] -mod queue; mod scene; mod shared_string; mod shared_uri; @@ -91,20 +89,16 @@ pub use keymap::*; pub use path_builder::*; pub use platform::*; pub use profiler::*; -#[cfg(any(target_os = "windows", target_os = "linux"))] -pub(crate) use queue::{PriorityQueueReceiver, PriorityQueueSender}; pub use refineable::*; pub use scene::*; pub use shared_string::*; pub use shared_uri::*; pub use smol::Timer; -use std::{any::Any, future::Future}; pub use style::*; pub use styled::*; pub use subscription::*; pub use svg_renderer::*; pub(crate) use tab_stop::*; -use taffy::TaffyLayoutEngine; pub use taffy::{AvailableSpace, LayoutId}; #[cfg(any(test, feature = "test-support"))] pub use test::*; @@ -115,6 +109,9 @@ pub use util::{FutureExt, Timeout, arc_cow::ArcCow}; pub use view::*; pub use window::*; +use std::{any::Any, future::Future}; +use taffy::TaffyLayoutEngine; + /// The context trait, allows the different contexts in GPUI to be used /// interchangeably for certain operations. pub trait AppContext { diff --git a/crates/gpui/src/platform.rs b/crates/gpui/src/platform.rs index f120e075fea7f9336e2f6e10c51611d8ba03564d..922cfd13c16d098380c39f8d2d1f72e66624b78f 100644 --- a/crates/gpui/src/platform.rs +++ b/crates/gpui/src/platform.rs @@ -39,10 +39,9 @@ use crate::{ Action, AnyWindowHandle, App, AsyncWindowContext, BackgroundExecutor, Bounds, DEFAULT_WINDOW_SIZE, DevicePixels, DispatchEventResult, Font, FontId, FontMetrics, FontRun, ForegroundExecutor, GlyphId, GpuSpecs, ImageSource, Keymap, LineLayout, Pixels, PlatformInput, - Point, Priority, RealtimePriority, RenderGlyphParams, RenderImage, RenderImageParams, - RenderSvgParams, Scene, ShapedGlyph, ShapedRun, SharedString, Size, SvgRenderer, - SystemWindowTab, Task, TaskLabel, TaskTiming, ThreadTaskTimings, Window, WindowControlArea, - hash, point, px, size, + Point, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Scene, ShapedGlyph, + ShapedRun, SharedString, Size, SvgRenderer, SystemWindowTab, Task, TaskLabel, TaskTiming, + ThreadTaskTimings, Window, WindowControlArea, hash, point, px, size, }; use anyhow::Result; use async_task::Runnable; @@ -588,10 +587,9 @@ pub trait PlatformDispatcher: Send + Sync { fn get_all_timings(&self) -> Vec; fn get_current_thread_timings(&self) -> Vec; fn is_main_thread(&self) -> bool; - fn dispatch(&self, runnable: RunnableVariant, label: Option, priority: Priority); - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority); + fn dispatch(&self, runnable: RunnableVariant, label: Option); + fn dispatch_on_main_thread(&self, runnable: RunnableVariant); fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant); - fn spawn_realtime(&self, priority: RealtimePriority, f: Box); fn now(&self) -> Instant { Instant::now() diff --git a/crates/gpui/src/platform/linux/dispatcher.rs b/crates/gpui/src/platform/linux/dispatcher.rs index d88eefd2c8a7fc648b20f7a2e520fe40158acd51..d0c32140f3642e037df326f4e2beae16c59dd883 100644 --- a/crates/gpui/src/platform/linux/dispatcher.rs +++ b/crates/gpui/src/platform/linux/dispatcher.rs @@ -1,10 +1,9 @@ use crate::{ - GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver, - PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming, - ThreadTaskTimings, profiler, + GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel, + TaskTiming, ThreadTaskTimings, }; use calloop::{ - EventLoop, PostAction, + EventLoop, channel::{self, Sender}, timer::TimeoutAction, }; @@ -20,9 +19,9 @@ struct TimerAfter { } pub(crate) struct LinuxDispatcher { - main_sender: PriorityQueueCalloopSender, + main_sender: Sender, timer_sender: Sender, - background_sender: PriorityQueueSender, + background_sender: flume::Sender, _background_threads: Vec>, main_thread_id: thread::ThreadId, } @@ -30,20 +29,18 @@ pub(crate) struct LinuxDispatcher { const MIN_THREADS: usize = 2; impl LinuxDispatcher { - pub fn new(main_sender: PriorityQueueCalloopSender) -> Self { - let (background_sender, background_receiver) = PriorityQueueReceiver::new(); + pub fn new(main_sender: Sender) -> Self { + let (background_sender, background_receiver) = flume::unbounded::(); let thread_count = std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS)); - // These thread should really be lower prio then the foreground - // executor let mut background_threads = (0..thread_count) .map(|i| { - let mut receiver = background_receiver.clone(); + let receiver = background_receiver.clone(); std::thread::Builder::new() .name(format!("Worker-{i}")) .spawn(move || { - for runnable in receiver.iter() { + for runnable in receiver { let start = Instant::now(); let mut location = match runnable { @@ -54,7 +51,7 @@ impl LinuxDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); timing @@ -66,7 +63,7 @@ impl LinuxDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); timing @@ -75,7 +72,7 @@ impl LinuxDispatcher { let end = Instant::now(); location.end = Some(end); - profiler::add_task_timing(location); + Self::add_task_timing(location); log::trace!( "background thread {}: ran runnable. took: {:?}", @@ -116,7 +113,7 @@ impl LinuxDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); timing @@ -127,7 +124,7 @@ impl LinuxDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); timing @@ -136,7 +133,7 @@ impl LinuxDispatcher { let end = Instant::now(); timing.end = Some(end); - profiler::add_task_timing(timing); + Self::add_task_timing(timing); } TimeoutAction::Drop }, @@ -160,6 +157,22 @@ impl LinuxDispatcher { main_thread_id: thread::current().id(), } } + + pub(crate) fn add_task_timing(timing: TaskTiming) { + THREAD_TIMINGS.with(|timings| { + let mut timings = timings.lock(); + let timings = &mut timings.timings; + + if let Some(last_timing) = timings.iter_mut().rev().next() { + if last_timing.location == timing.location { + last_timing.end = timing.end; + return; + } + } + + timings.push_back(timing); + }); + } } impl PlatformDispatcher for LinuxDispatcher { @@ -186,26 +199,22 @@ impl PlatformDispatcher for LinuxDispatcher { thread::current().id() == self.main_thread_id } - fn dispatch(&self, runnable: RunnableVariant, _: Option, priority: Priority) { - self.background_sender - .send(priority, runnable) - .unwrap_or_else(|_| panic!("blocking sender returned without value")); + fn dispatch(&self, runnable: RunnableVariant, _: Option) { + self.background_sender.send(runnable).unwrap(); } - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) { - self.main_sender - .send(priority, runnable) - .unwrap_or_else(|runnable| { - // NOTE: Runnable may wrap a Future that is !Send. - // - // This is usually safe because we only poll it on the main thread. - // However if the send fails, we know that: - // 1. main_receiver has been dropped (which implies the app is shutting down) - // 2. we are on a background thread. - // It is not safe to drop something !Send on the wrong thread, and - // the app will exit soon anyway, so we must forget the runnable. - std::mem::forget(runnable); - }); + fn dispatch_on_main_thread(&self, runnable: RunnableVariant) { + self.main_sender.send(runnable).unwrap_or_else(|runnable| { + // NOTE: Runnable may wrap a Future that is !Send. + // + // This is usually safe because we only poll it on the main thread. + // However if the send fails, we know that: + // 1. main_receiver has been dropped (which implies the app is shutting down) + // 2. we are on a background thread. + // It is not safe to drop something !Send on the wrong thread, and + // the app will exit soon anyway, so we must forget the runnable. + std::mem::forget(runnable); + }); } fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) { @@ -213,252 +222,4 @@ impl PlatformDispatcher for LinuxDispatcher { .send(TimerAfter { duration, runnable }) .ok(); } - - fn spawn_realtime(&self, priority: RealtimePriority, f: Box) { - std::thread::spawn(move || { - // SAFETY: always safe to call - let thread_id = unsafe { libc::pthread_self() }; - - let policy = match priority { - RealtimePriority::Audio => libc::SCHED_FIFO, - RealtimePriority::Other => libc::SCHED_RR, - }; - let sched_priority = match priority { - RealtimePriority::Audio => 65, - RealtimePriority::Other => 45, - }; - - let sched_param = libc::sched_param { sched_priority }; - // SAFETY: sched_param is a valid initialized structure - let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) }; - if result != 0 { - log::warn!("failed to set realtime thread priority to {:?}", priority); - } - - f(); - }); - } -} - -pub struct PriorityQueueCalloopSender { - sender: PriorityQueueSender, - ping: calloop::ping::Ping, -} - -impl PriorityQueueCalloopSender { - fn new(tx: PriorityQueueSender, ping: calloop::ping::Ping) -> Self { - Self { sender: tx, ping } - } - - fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError> { - let res = self.sender.send(priority, item); - if res.is_ok() { - self.ping.ping(); - } - res - } -} - -impl Drop for PriorityQueueCalloopSender { - fn drop(&mut self) { - self.ping.ping(); - } -} - -pub struct PriorityQueueCalloopReceiver { - receiver: PriorityQueueReceiver, - source: calloop::ping::PingSource, - ping: calloop::ping::Ping, -} - -impl PriorityQueueCalloopReceiver { - pub fn new() -> (PriorityQueueCalloopSender, Self) { - let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping."); - - let (tx, rx) = PriorityQueueReceiver::new(); - - ( - PriorityQueueCalloopSender::new(tx, ping.clone()), - Self { - receiver: rx, - source, - ping, - }, - ) - } } - -use calloop::channel::Event; - -#[derive(Debug)] -pub struct ChannelError(calloop::ping::PingError); - -impl std::fmt::Display for ChannelError { - #[cfg_attr(feature = "nightly_coverage", coverage(off))] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) - } -} - -impl std::error::Error for ChannelError { - #[cfg_attr(feature = "nightly_coverage", coverage(off))] - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&self.0) - } -} - -impl calloop::EventSource for PriorityQueueCalloopReceiver { - type Event = Event; - type Metadata = (); - type Ret = (); - type Error = ChannelError; - - fn process_events( - &mut self, - readiness: calloop::Readiness, - token: calloop::Token, - mut callback: F, - ) -> Result - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - let mut clear_readiness = false; - let mut disconnected = false; - - let action = self - .source - .process_events(readiness, token, |(), &mut ()| { - let mut is_empty = true; - - let mut receiver = self.receiver.clone(); - for runnable in receiver.try_iter() { - match runnable { - Ok(r) => { - callback(Event::Msg(r), &mut ()); - is_empty = false; - } - Err(_) => { - disconnected = true; - } - } - } - - if disconnected { - callback(Event::Closed, &mut ()); - } - - if is_empty { - clear_readiness = true; - } - }) - .map_err(ChannelError)?; - - if disconnected { - Ok(PostAction::Remove) - } else if clear_readiness { - Ok(action) - } else { - // Re-notify the ping source so we can try again. - self.ping.ping(); - Ok(PostAction::Continue) - } - } - - fn register( - &mut self, - poll: &mut calloop::Poll, - token_factory: &mut calloop::TokenFactory, - ) -> calloop::Result<()> { - self.source.register(poll, token_factory) - } - - fn reregister( - &mut self, - poll: &mut calloop::Poll, - token_factory: &mut calloop::TokenFactory, - ) -> calloop::Result<()> { - self.source.reregister(poll, token_factory) - } - - fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> { - self.source.unregister(poll) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn calloop_works() { - let mut event_loop = calloop::EventLoop::try_new().unwrap(); - let handle = event_loop.handle(); - - let (tx, rx) = PriorityQueueCalloopReceiver::new(); - - struct Data { - got_msg: bool, - got_closed: bool, - } - - let mut data = Data { - got_msg: false, - got_closed: false, - }; - - let _channel_token = handle - .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt { - Event::Msg(()) => { - data.got_msg = true; - } - - Event::Closed => { - data.got_closed = true; - } - }) - .unwrap(); - - // nothing is sent, nothing is received - event_loop - .dispatch(Some(::std::time::Duration::ZERO), &mut data) - .unwrap(); - - assert!(!data.got_msg); - assert!(!data.got_closed); - // a message is send - - tx.send(Priority::Medium, ()).unwrap(); - event_loop - .dispatch(Some(::std::time::Duration::ZERO), &mut data) - .unwrap(); - - assert!(data.got_msg); - assert!(!data.got_closed); - - // the sender is dropped - drop(tx); - event_loop - .dispatch(Some(::std::time::Duration::ZERO), &mut data) - .unwrap(); - - assert!(data.got_msg); - assert!(data.got_closed); - } -} - -// running 1 test -// test platform::linux::dispatcher::tests::tomato ... FAILED - -// failures: - -// ---- platform::linux::dispatcher::tests::tomato stdout ---- -// [crates/gpui/src/platform/linux/dispatcher.rs:262:9] -// returning 1 tasks to process -// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg( -// (), -// ) -// returning 0 tasks to process - -// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9: -// assertion failed: data.got_closed -// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace diff --git a/crates/gpui/src/platform/linux/platform.rs b/crates/gpui/src/platform/linux/platform.rs index 06a81ec342e9d528a081456583f3ba0f3fb77b6f..51a1d5f5849d387a3f5855c12f50fce0a95d1cf4 100644 --- a/crates/gpui/src/platform/linux/platform.rs +++ b/crates/gpui/src/platform/linux/platform.rs @@ -14,7 +14,7 @@ use std::{ }; use anyhow::{Context as _, anyhow}; -use calloop::LoopSignal; +use calloop::{LoopSignal, channel::Channel}; use futures::channel::oneshot; use util::ResultExt as _; use util::command::{new_smol_command, new_std_command}; @@ -25,8 +25,8 @@ use crate::{ Action, AnyWindowHandle, BackgroundExecutor, ClipboardItem, CursorStyle, DisplayId, ForegroundExecutor, Keymap, LinuxDispatcher, Menu, MenuItem, OwnedMenu, PathPromptOptions, Pixels, Platform, PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, - PlatformTextSystem, PlatformWindow, Point, PriorityQueueCalloopReceiver, Result, - RunnableVariant, Task, WindowAppearance, WindowParams, px, + PlatformTextSystem, PlatformWindow, Point, Result, RunnableVariant, Task, WindowAppearance, + WindowParams, px, }; #[cfg(any(feature = "wayland", feature = "x11"))] @@ -149,8 +149,8 @@ pub(crate) struct LinuxCommon { } impl LinuxCommon { - pub fn new(signal: LoopSignal) -> (Self, PriorityQueueCalloopReceiver) { - let (main_sender, main_receiver) = PriorityQueueCalloopReceiver::new(); + pub fn new(signal: LoopSignal) -> (Self, Channel) { + let (main_sender, main_receiver) = calloop::channel::channel::(); #[cfg(any(feature = "wayland", feature = "x11"))] let text_system = Arc::new(crate::CosmicTextSystem::new()); diff --git a/crates/gpui/src/platform/linux/wayland/client.rs b/crates/gpui/src/platform/linux/wayland/client.rs index 0e7bf8fbf8880baf5876027e6e764d7411932577..1a7011c582ab162c8ed6c7277d3dd1f5b8c60239 100644 --- a/crates/gpui/src/platform/linux/wayland/client.rs +++ b/crates/gpui/src/platform/linux/wayland/client.rs @@ -77,10 +77,10 @@ use crate::{ LinuxKeyboardLayout, Modifiers, ModifiersChangedEvent, MouseButton, MouseDownEvent, MouseExitEvent, MouseMoveEvent, MouseUpEvent, NavigationDirection, Pixels, PlatformDisplay, PlatformInput, PlatformKeyboardLayout, Point, ResultExt as _, SCROLL_LINES, ScrollDelta, - ScrollWheelEvent, Size, TouchPhase, WindowParams, point, profiler, px, size, + ScrollWheelEvent, Size, TouchPhase, WindowParams, point, px, size, }; use crate::{ - RunnableVariant, TaskTiming, + LinuxDispatcher, RunnableVariant, TaskTiming, platform::{PlatformWindow, blade::BladeContext}, }; use crate::{ @@ -503,7 +503,7 @@ impl WaylandClient { start, end: None, }; - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); runnable.run(); timing @@ -515,7 +515,7 @@ impl WaylandClient { start, end: None, }; - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); runnable.run(); timing @@ -524,7 +524,7 @@ impl WaylandClient { let end = Instant::now(); timing.end = Some(end); - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); }); } } diff --git a/crates/gpui/src/platform/linux/x11/client.rs b/crates/gpui/src/platform/linux/x11/client.rs index 60400dada57775a295fdb36c7f1ddd9dd8b83a67..aa16dc7ad1d9030665ace646ba2ac295df8c27b3 100644 --- a/crates/gpui/src/platform/linux/x11/client.rs +++ b/crates/gpui/src/platform/linux/x11/client.rs @@ -1,4 +1,4 @@ -use crate::{Capslock, ResultExt as _, RunnableVariant, TaskTiming, profiler, xcb_flush}; +use crate::{Capslock, LinuxDispatcher, ResultExt as _, RunnableVariant, TaskTiming, xcb_flush}; use anyhow::{Context as _, anyhow}; use ashpd::WindowIdentifier; use calloop::{ @@ -322,7 +322,7 @@ impl X11Client { start, end: None, }; - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); runnable.run(); timing @@ -334,7 +334,7 @@ impl X11Client { start, end: None, }; - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); runnable.run(); timing @@ -343,7 +343,7 @@ impl X11Client { let end = Instant::now(); timing.end = Some(end); - profiler::add_task_timing(timing); + LinuxDispatcher::add_task_timing(timing); }); } } diff --git a/crates/gpui/src/platform/mac/dispatcher.rs b/crates/gpui/src/platform/mac/dispatcher.rs index 49a0f3d7611467803ddae9057fb9589b9bbda49f..8a2f42234eea960669cb212853c437ec680a7fd7 100644 --- a/crates/gpui/src/platform/mac/dispatcher.rs +++ b/crates/gpui/src/platform/mac/dispatcher.rs @@ -3,22 +3,11 @@ #![allow(non_snake_case)] use crate::{ - GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RealtimePriority, RunnableMeta, - RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming, ThreadTaskTimings, + GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableMeta, RunnableVariant, THREAD_TIMINGS, + TaskLabel, TaskTiming, ThreadTaskTimings, }; -use anyhow::Context; use async_task::Runnable; -use mach2::{ - kern_return::KERN_SUCCESS, - mach_time::mach_timebase_info_data_t, - thread_policy::{ - THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY, - THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY, - THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t, - thread_precedence_policy_data_t, thread_time_constraint_policy_data_t, - }, -}; use objc::{ class, msg_send, runtime::{BOOL, YES}, @@ -26,11 +15,9 @@ use objc::{ }; use std::{ ffi::c_void, - mem::MaybeUninit, ptr::{NonNull, addr_of}, time::{Duration, Instant}, }; -use util::ResultExt; /// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent /// these pub items from leaking into public API. @@ -69,7 +56,7 @@ impl PlatformDispatcher for MacDispatcher { is_main_thread == YES } - fn dispatch(&self, runnable: RunnableVariant, _: Option, priority: Priority) { + fn dispatch(&self, runnable: RunnableVariant, _: Option) { let (context, trampoline) = match runnable { RunnableVariant::Meta(runnable) => ( runnable.into_raw().as_ptr() as *mut c_void, @@ -80,24 +67,16 @@ impl PlatformDispatcher for MacDispatcher { Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)), ), }; - - let queue_priority = match priority { - Priority::Realtime(_) => unreachable!(), - Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH, - Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT, - Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as u32, - }; - unsafe { dispatch_async_f( - dispatch_get_global_queue(queue_priority.try_into().unwrap(), 0), + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0), context, trampoline, ); } } - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) { + fn dispatch_on_main_thread(&self, runnable: RunnableVariant) { let (context, trampoline) = match runnable { RunnableVariant::Meta(runnable) => ( runnable.into_raw().as_ptr() as *mut c_void, @@ -131,120 +110,6 @@ impl PlatformDispatcher for MacDispatcher { dispatch_after_f(when, queue, context, trampoline); } } - - fn spawn_realtime(&self, priority: RealtimePriority, f: Box) { - std::thread::spawn(move || { - match priority { - RealtimePriority::Audio => set_audio_thread_priority(), - RealtimePriority::Other => set_high_thread_priority(), - } - .context(format!("for priority {:?}", priority)) - .log_err(); - - f(); - }); - } -} - -fn set_high_thread_priority() -> anyhow::Result<()> { - // SAFETY: always safe to call - let thread_id = unsafe { libc::pthread_self() }; - - // SAFETY: all sched_param members are valid when initialized to zero. - let mut sched_param = unsafe { MaybeUninit::::zeroed().assume_init() }; - sched_param.sched_priority = 45; - - let result = unsafe { libc::pthread_setschedparam(thread_id, libc::SCHED_FIFO, &sched_param) }; - if result != 0 { - anyhow::bail!("failed to set realtime thread priority") - } - - Ok(()) -} - -fn set_audio_thread_priority() -> anyhow::Result<()> { - // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93 - - // SAFETY: always safe to call - let thread_id = unsafe { libc::pthread_self() }; - - // SAFETY: thread_id is a valid thread id - let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) }; - - // Fixed priority thread - let mut policy = thread_extended_policy_data_t { timeshare: 0 }; - - // SAFETY: thread_id is a valid thread id - // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY - let result = unsafe { - mach2::thread_policy::thread_policy_set( - thread_id, - THREAD_EXTENDED_POLICY, - &mut policy as *mut _ as *mut _, - THREAD_EXTENDED_POLICY_COUNT, - ) - }; - - if result != KERN_SUCCESS { - anyhow::bail!("failed to set thread extended policy"); - } - - // relatively high priority - let mut precedence = thread_precedence_policy_data_t { importance: 63 }; - - // SAFETY: thread_id is a valid thread id - // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY - let result = unsafe { - mach2::thread_policy::thread_policy_set( - thread_id, - THREAD_PRECEDENCE_POLICY, - &mut precedence as *mut _ as *mut _, - THREAD_PRECEDENCE_POLICY_COUNT, - ) - }; - - if result != KERN_SUCCESS { - anyhow::bail!("failed to set thread precedence policy"); - } - - const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75; - const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85; - - // ~128 frames @ 44.1KHz - const TIME_QUANTUM: f32 = 2.9; - - const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM; - const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM; - - let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 }; - // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct - unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) }; - - let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32; - - let mut time_constraints = thread_time_constraint_policy_data_t { - period: (TIME_QUANTUM * ms_to_abs_time) as u32, - computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32, - constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32, - preemptible: 0, - }; - - // SAFETY: thread_id is a valid thread id - // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY - let result = unsafe { - mach2::thread_policy::thread_policy_set( - thread_id, - THREAD_TIME_CONSTRAINT_POLICY, - &mut time_constraints as *mut _ as *mut _, - THREAD_TIME_CONSTRAINT_POLICY_COUNT, - ) - }; - - if result != KERN_SUCCESS { - anyhow::bail!("failed to set thread time constraint policy"); - } - - Ok(()) } extern "C" fn trampoline(runnable: *mut c_void) { diff --git a/crates/gpui/src/platform/test/dispatcher.rs b/crates/gpui/src/platform/test/dispatcher.rs index c271430586106abc93e0bb3258c9e25a06b12383..538aacda83a095449193db6aab63f3a06189ef7a 100644 --- a/crates/gpui/src/platform/test/dispatcher.rs +++ b/crates/gpui/src/platform/test/dispatcher.rs @@ -1,4 +1,4 @@ -use crate::{PlatformDispatcher, Priority, RunnableVariant, TaskLabel}; +use crate::{PlatformDispatcher, RunnableVariant, TaskLabel}; use backtrace::Backtrace; use collections::{HashMap, HashSet, VecDeque}; use parking::Unparker; @@ -284,7 +284,7 @@ impl PlatformDispatcher for TestDispatcher { state.start_time + state.time } - fn dispatch(&self, runnable: RunnableVariant, label: Option, _priority: Priority) { + fn dispatch(&self, runnable: RunnableVariant, label: Option) { { let mut state = self.state.lock(); if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) { @@ -296,7 +296,7 @@ impl PlatformDispatcher for TestDispatcher { self.unpark_all(); } - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) { + fn dispatch_on_main_thread(&self, runnable: RunnableVariant) { self.state .lock() .foreground @@ -318,10 +318,4 @@ impl PlatformDispatcher for TestDispatcher { fn as_test(&self) -> Option<&TestDispatcher> { Some(self) } - - fn spawn_realtime(&self, _priority: crate::RealtimePriority, f: Box) { - std::thread::spawn(move || { - f(); - }); - } } diff --git a/crates/gpui/src/platform/windows/dispatcher.rs b/crates/gpui/src/platform/windows/dispatcher.rs index 0720d414c9b44dec4a3bab5b50fd7dde47991989..6214e60e5b4b178c20b1fff655f4ac8b49be3f4c 100644 --- a/crates/gpui/src/platform/windows/dispatcher.rs +++ b/crates/gpui/src/platform/windows/dispatcher.rs @@ -4,31 +4,24 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::Context; +use flume::Sender; use util::ResultExt; use windows::{ - System::Threading::{ - ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority, - }, + System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler}, Win32::{ Foundation::{LPARAM, WPARAM}, - System::Threading::{ - GetCurrentThread, HIGH_PRIORITY_CLASS, SetPriorityClass, SetThreadPriority, - THREAD_PRIORITY_HIGHEST, THREAD_PRIORITY_TIME_CRITICAL, - }, UI::WindowsAndMessaging::PostMessageW, }, }; use crate::{ - GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, Priority, PriorityQueueSender, - RealtimePriority, RunnableVariant, SafeHwnd, THREAD_TIMINGS, TaskLabel, TaskTiming, - ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, profiler, + GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS, + TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, }; pub(crate) struct WindowsDispatcher { pub(crate) wake_posted: AtomicBool, - main_sender: PriorityQueueSender, + main_sender: Sender, main_thread_id: ThreadId, pub(crate) platform_window_handle: SafeHwnd, validation_number: usize, @@ -36,7 +29,7 @@ pub(crate) struct WindowsDispatcher { impl WindowsDispatcher { pub(crate) fn new( - main_sender: PriorityQueueSender, + main_sender: Sender, platform_window_handle: HWND, validation_number: usize, ) -> Self { @@ -52,7 +45,7 @@ impl WindowsDispatcher { } } - fn dispatch_on_threadpool(&self, priority: WorkItemPriority, runnable: RunnableVariant) { + fn dispatch_on_threadpool(&self, runnable: RunnableVariant) { let handler = { let mut task_wrapper = Some(runnable); WorkItemHandler::new(move |_| { @@ -60,8 +53,7 @@ impl WindowsDispatcher { Ok(()) }) }; - - ThreadPool::RunWithPriorityAsync(&handler, priority).log_err(); + ThreadPool::RunAsync(&handler).log_err(); } fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) { @@ -87,7 +79,7 @@ impl WindowsDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); @@ -99,7 +91,7 @@ impl WindowsDispatcher { start, end: None, }; - profiler::add_task_timing(timing); + Self::add_task_timing(timing); runnable.run(); @@ -110,7 +102,23 @@ impl WindowsDispatcher { let end = Instant::now(); timing.end = Some(end); - profiler::add_task_timing(timing); + Self::add_task_timing(timing); + } + + pub(crate) fn add_task_timing(timing: TaskTiming) { + THREAD_TIMINGS.with(|timings| { + let mut timings = timings.lock(); + let timings = &mut timings.timings; + + if let Some(last_timing) = timings.iter_mut().rev().next() { + if last_timing.location == timing.location { + last_timing.end = timing.end; + return; + } + } + + timings.push_back(timing); + }); } } @@ -138,22 +146,15 @@ impl PlatformDispatcher for WindowsDispatcher { current().id() == self.main_thread_id } - fn dispatch(&self, runnable: RunnableVariant, label: Option, priority: Priority) { - let priority = match priority { - Priority::Realtime(_) => unreachable!(), - Priority::High => WorkItemPriority::High, - Priority::Medium => WorkItemPriority::Normal, - Priority::Low => WorkItemPriority::Low, - }; - self.dispatch_on_threadpool(priority, runnable); - + fn dispatch(&self, runnable: RunnableVariant, label: Option) { + self.dispatch_on_threadpool(runnable); if let Some(label) = label { log::debug!("TaskLabel: {label:?}"); } } - fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) { - match self.main_sender.send(priority, runnable) { + fn dispatch_on_main_thread(&self, runnable: RunnableVariant) { + match self.main_sender.send(runnable) { Ok(_) => { if !self.wake_posted.swap(true, Ordering::AcqRel) { unsafe { @@ -184,28 +185,4 @@ impl PlatformDispatcher for WindowsDispatcher { fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) { self.dispatch_on_threadpool_after(runnable, duration); } - - fn spawn_realtime(&self, priority: RealtimePriority, f: Box) { - std::thread::spawn(move || { - // SAFETY: always safe to call - let thread_handle = unsafe { GetCurrentThread() }; - - let thread_priority = match priority { - RealtimePriority::Audio => THREAD_PRIORITY_TIME_CRITICAL, - RealtimePriority::Other => THREAD_PRIORITY_HIGHEST, - }; - - // SAFETY: thread_handle is a valid handle to a thread - unsafe { SetPriorityClass(thread_handle, HIGH_PRIORITY_CLASS) } - .context("thread priority class") - .log_err(); - - // SAFETY: thread_handle is a valid handle to a thread - unsafe { SetThreadPriority(thread_handle, thread_priority) } - .context("thread priority") - .log_err(); - - f(); - }); - } } diff --git a/crates/gpui/src/platform/windows/events.rs b/crates/gpui/src/platform/windows/events.rs index f648f45cf4bf632ae07784de8bdc1503f88d6177..e6fa6006eb95ec45f1634cb72ef63e2f622455a7 100644 --- a/crates/gpui/src/platform/windows/events.rs +++ b/crates/gpui/src/platform/windows/events.rs @@ -243,8 +243,7 @@ impl WindowsWindowInner { fn handle_timer_msg(&self, handle: HWND, wparam: WPARAM) -> Option { if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID { - let mut runnables = self.main_receiver.clone().try_iter(); - while let Some(Ok(runnable)) = runnables.next() { + for runnable in self.main_receiver.drain() { WindowsDispatcher::execute_runnable(runnable); } self.handle_paint_msg(handle) diff --git a/crates/gpui/src/platform/windows/platform.rs b/crates/gpui/src/platform/windows/platform.rs index fa847bca6b404538a9f75b757bf53a2e4e2a1418..af0cb89ecc94da70cc42c8d4c397aeb2a811d6fb 100644 --- a/crates/gpui/src/platform/windows/platform.rs +++ b/crates/gpui/src/platform/windows/platform.rs @@ -51,7 +51,7 @@ struct WindowsPlatformInner { raw_window_handles: std::sync::Weak>>, // The below members will never change throughout the entire lifecycle of the app. validation_number: usize, - main_receiver: PriorityQueueReceiver, + main_receiver: flume::Receiver, dispatcher: Arc, } @@ -98,7 +98,7 @@ impl WindowsPlatform { OleInitialize(None).context("unable to initialize Windows OLE")?; } let directx_devices = DirectXDevices::new().context("Creating DirectX devices")?; - let (main_sender, main_receiver) = PriorityQueueReceiver::new(); + let (main_sender, main_receiver) = flume::unbounded::(); let validation_number = if usize::BITS == 64 { rand::random::() as usize } else { @@ -857,24 +857,22 @@ impl WindowsPlatformInner { } break 'tasks; } - let mut main_receiver = self.main_receiver.clone(); - match main_receiver.try_pop() { - Ok(Some(runnable)) => WindowsDispatcher::execute_runnable(runnable), - _ => break 'timeout_loop, + match self.main_receiver.try_recv() { + Err(_) => break 'timeout_loop, + Ok(runnable) => WindowsDispatcher::execute_runnable(runnable), } } // Someone could enqueue a Runnable here. The flag is still true, so they will not PostMessage. // We need to check for those Runnables after we clear the flag. self.dispatcher.wake_posted.store(false, Ordering::Release); - let mut main_receiver = self.main_receiver.clone(); - match main_receiver.try_pop() { - Ok(Some(runnable)) => { + match self.main_receiver.try_recv() { + Err(_) => break 'tasks, + Ok(runnable) => { self.dispatcher.wake_posted.store(true, Ordering::Release); WindowsDispatcher::execute_runnable(runnable); } - _ => break 'tasks, } } @@ -936,7 +934,7 @@ pub(crate) struct WindowCreationInfo { pub(crate) windows_version: WindowsVersion, pub(crate) drop_target_helper: IDropTargetHelper, pub(crate) validation_number: usize, - pub(crate) main_receiver: PriorityQueueReceiver, + pub(crate) main_receiver: flume::Receiver, pub(crate) platform_window_handle: HWND, pub(crate) disable_direct_composition: bool, pub(crate) directx_devices: DirectXDevices, @@ -949,8 +947,8 @@ struct PlatformWindowCreateContext { inner: Option>>, raw_window_handles: std::sync::Weak>>, validation_number: usize, - main_sender: Option>, - main_receiver: Option>, + main_sender: Option>, + main_receiver: Option>, directx_devices: Option, dispatcher: Option>, } diff --git a/crates/gpui/src/platform/windows/window.rs b/crates/gpui/src/platform/windows/window.rs index 0cfa812b288406c5b4afcea37949eed3918f5c91..7ef92b4150e69424b68e9417dda377aa7f2e9cc0 100644 --- a/crates/gpui/src/platform/windows/window.rs +++ b/crates/gpui/src/platform/windows/window.rs @@ -81,7 +81,7 @@ pub(crate) struct WindowsWindowInner { pub(crate) executor: ForegroundExecutor, pub(crate) windows_version: WindowsVersion, pub(crate) validation_number: usize, - pub(crate) main_receiver: PriorityQueueReceiver, + pub(crate) main_receiver: flume::Receiver, pub(crate) platform_window_handle: HWND, } @@ -362,7 +362,7 @@ struct WindowCreateContext { windows_version: WindowsVersion, drop_target_helper: IDropTargetHelper, validation_number: usize, - main_receiver: PriorityQueueReceiver, + main_receiver: flume::Receiver, platform_window_handle: HWND, appearance: WindowAppearance, disable_direct_composition: bool, diff --git a/crates/gpui/src/profiler.rs b/crates/gpui/src/profiler.rs index 73f435d7e798c78d6c7320a49da804ebe703c434..4e3f00c412cd19c8269497ff292ce9dbdd785fbe 100644 --- a/crates/gpui/src/profiler.rs +++ b/crates/gpui/src/profiler.rs @@ -216,19 +216,3 @@ impl Drop for ThreadTimings { thread_timings.swap_remove(index); } } - -pub(crate) fn add_task_timing(timing: TaskTiming) { - THREAD_TIMINGS.with(|timings| { - let mut timings = timings.lock(); - let timings = &mut timings.timings; - - if let Some(last_timing) = timings.iter_mut().rev().next() { - if last_timing.location == timing.location { - last_timing.end = timing.end; - return; - } - } - - timings.push_back(timing); - }); -} diff --git a/crates/gpui/src/queue.rs b/crates/gpui/src/queue.rs deleted file mode 100644 index 3a4ef912ffd5fb85b80384454f7afd84cecb1648..0000000000000000000000000000000000000000 --- a/crates/gpui/src/queue.rs +++ /dev/null @@ -1,329 +0,0 @@ -use std::{ - fmt, - iter::FusedIterator, - sync::{Arc, atomic::AtomicUsize}, -}; - -use rand::{Rng, SeedableRng, rngs::SmallRng}; - -use crate::Priority; - -struct PriorityQueues { - high_priority: Vec, - medium_priority: Vec, - low_priority: Vec, -} - -impl PriorityQueues { - fn is_empty(&self) -> bool { - self.high_priority.is_empty() - && self.medium_priority.is_empty() - && self.low_priority.is_empty() - } -} - -struct PriorityQueueState { - queues: parking_lot::Mutex>, - condvar: parking_lot::Condvar, - receiver_count: AtomicUsize, - sender_count: AtomicUsize, -} - -impl PriorityQueueState { - fn send(&self, priority: Priority, item: T) -> Result<(), SendError> { - if self - .receiver_count - .load(std::sync::atomic::Ordering::Relaxed) - == 0 - { - return Err(SendError(item)); - } - - let mut queues = self.queues.lock(); - match priority { - Priority::Realtime(_) => unreachable!(), - Priority::High => queues.high_priority.push(item), - Priority::Medium => queues.medium_priority.push(item), - Priority::Low => queues.low_priority.push(item), - }; - self.condvar.notify_one(); - Ok(()) - } - - fn recv<'a>(&'a self) -> Result>, RecvError> { - let mut queues = self.queues.lock(); - - let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed); - if queues.is_empty() && sender_count == 0 { - return Err(crate::queue::RecvError); - } - - // parking_lot doesn't do spurious wakeups so an if is fine - if queues.is_empty() { - self.condvar.wait(&mut queues); - } - - Ok(queues) - } - - fn try_recv<'a>( - &'a self, - ) -> Result>>, RecvError> { - let mut queues = self.queues.lock(); - - let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed); - if queues.is_empty() && sender_count == 0 { - return Err(crate::queue::RecvError); - } - - if queues.is_empty() { - Ok(None) - } else { - Ok(Some(queues)) - } - } -} - -pub(crate) struct PriorityQueueSender { - state: Arc>, -} - -impl PriorityQueueSender { - fn new(state: Arc>) -> Self { - Self { state } - } - - pub(crate) fn send(&self, priority: Priority, item: T) -> Result<(), SendError> { - self.state.send(priority, item)?; - Ok(()) - } -} - -impl Drop for PriorityQueueSender { - fn drop(&mut self) { - self.state - .sender_count - .fetch_sub(1, std::sync::atomic::Ordering::AcqRel); - } -} - -pub(crate) struct PriorityQueueReceiver { - state: Arc>, - rand: SmallRng, - disconnected: bool, -} - -impl Clone for PriorityQueueReceiver { - fn clone(&self) -> Self { - self.state - .receiver_count - .fetch_add(1, std::sync::atomic::Ordering::AcqRel); - Self { - state: Arc::clone(&self.state), - rand: SmallRng::seed_from_u64(0), - disconnected: self.disconnected, - } - } -} - -pub(crate) struct SendError(T); - -impl fmt::Debug for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("SendError").field(&self.0).finish() - } -} - -#[derive(Debug)] -pub(crate) struct RecvError; - -#[allow(dead_code)] -impl PriorityQueueReceiver { - pub(crate) fn new() -> (PriorityQueueSender, Self) { - let state = PriorityQueueState { - queues: parking_lot::Mutex::new(PriorityQueues { - high_priority: Vec::new(), - medium_priority: Vec::new(), - low_priority: Vec::new(), - }), - condvar: parking_lot::Condvar::new(), - receiver_count: AtomicUsize::new(1), - sender_count: AtomicUsize::new(1), - }; - let state = Arc::new(state); - - let sender = PriorityQueueSender::new(Arc::clone(&state)); - - let receiver = PriorityQueueReceiver { - state, - rand: SmallRng::seed_from_u64(0), - disconnected: false, - }; - - (sender, receiver) - } - - /// Tries to pop one element from the priority queue without blocking. - /// - /// This will early return if there are no elements in the queue. - /// - /// This method is best suited if you only intend to pop one element, for better performance - /// on large queues see [`Self::try_iter`] - /// - /// # Errors - /// - /// If the sender was dropped - pub(crate) fn try_pop(&mut self) -> Result, RecvError> { - self.pop_inner(false) - } - - /// Pops an element from the priority queue blocking if necessary. - /// - /// This method is best suited if you only intend to pop one element, for better performance - /// on large queues see [`Self::iter``] - /// - /// # Errors - /// - /// If the sender was dropped - pub(crate) fn pop(&mut self) -> Result { - self.pop_inner(true).map(|e| e.unwrap()) - } - - /// Returns an iterator over the elements of the queue - /// this iterator will end when all elements have been consumed and will not wait for new ones. - pub(crate) fn try_iter(self) -> TryIter { - TryIter { - receiver: self, - ended: false, - } - } - - /// Returns an iterator over the elements of the queue - /// this iterator will wait for new elements if the queue is empty. - pub(crate) fn iter(self) -> Iter { - Iter(self) - } - - #[inline(always)] - // algorithm is the loaded die from biased coin from - // https://www.keithschwarz.com/darts-dice-coins/ - fn pop_inner(&mut self, block: bool) -> Result, RecvError> { - use Priority as P; - - let mut queues = if !block { - let Some(queues) = self.state.try_recv()? else { - return Ok(None); - }; - queues - } else { - self.state.recv()? - }; - - let high = P::High.probability() * !queues.high_priority.is_empty() as u32; - let medium = P::Medium.probability() * !queues.medium_priority.is_empty() as u32; - let low = P::Low.probability() * !queues.low_priority.is_empty() as u32; - let mut mass = high + medium + low; //% - - if !queues.high_priority.is_empty() { - let flip = self.rand.random_ratio(P::High.probability(), mass); - if flip { - return Ok(queues.high_priority.pop()); - } - mass -= P::High.probability(); - } - - if !queues.medium_priority.is_empty() { - let flip = self.rand.random_ratio(P::Medium.probability(), mass); - if flip { - return Ok(queues.medium_priority.pop()); - } - mass -= P::Medium.probability(); - } - - if !queues.low_priority.is_empty() { - let flip = self.rand.random_ratio(P::Low.probability(), mass); - if flip { - return Ok(queues.low_priority.pop()); - } - } - - Ok(None) - } -} - -impl Drop for PriorityQueueReceiver { - fn drop(&mut self) { - self.state - .receiver_count - .fetch_sub(1, std::sync::atomic::Ordering::AcqRel); - } -} - -/// If None is returned the sender disconnected -pub(crate) struct Iter(PriorityQueueReceiver); -impl Iterator for Iter { - type Item = T; - - fn next(&mut self) -> Option { - self.0.pop_inner(true).ok().flatten() - } -} -impl FusedIterator for Iter {} - -/// If None is returned there are no more elements in the queue -pub(crate) struct TryIter { - receiver: PriorityQueueReceiver, - ended: bool, -} -impl Iterator for TryIter { - type Item = Result; - - fn next(&mut self) -> Option { - if self.ended { - return None; - } - - let res = self.receiver.pop_inner(false); - self.ended = res.is_err(); - - res.transpose() - } -} -impl FusedIterator for TryIter {} - -#[cfg(test)] -mod tests { - use collections::HashSet; - - use super::*; - - #[test] - fn all_tasks_get_yielded() { - let (tx, mut rx) = PriorityQueueReceiver::new(); - tx.send(Priority::Medium, 20).unwrap(); - tx.send(Priority::High, 30).unwrap(); - tx.send(Priority::Low, 10).unwrap(); - tx.send(Priority::Medium, 21).unwrap(); - tx.send(Priority::High, 31).unwrap(); - - drop(tx); - - assert_eq!( - rx.iter().collect::>(), - [30, 31, 20, 21, 10].into_iter().collect::>() - ) - } - - #[test] - fn new_high_prio_task_get_scheduled_quickly() { - let (tx, mut rx) = PriorityQueueReceiver::new(); - for _ in 0..100 { - tx.send(Priority::Low, 1).unwrap(); - } - - assert_eq!(rx.pop().unwrap(), 1); - tx.send(Priority::High, 3).unwrap(); - assert_eq!(rx.pop().unwrap(), 3); - assert_eq!(rx.pop().unwrap(), 1); - } -} diff --git a/crates/gpui/src/window.rs b/crates/gpui/src/window.rs index 54fe99c2634f5afa2e1f1e224e969c21d4c38e34..1006b49c98d9d6c442c1406a6af6b0a7040e0b43 100644 --- a/crates/gpui/src/window.rs +++ b/crates/gpui/src/window.rs @@ -9,15 +9,14 @@ use crate::{ KeyBinding, KeyContext, KeyDownEvent, KeyEvent, Keystroke, KeystrokeEvent, LayoutId, LineLayoutIndex, Modifiers, ModifiersChangedEvent, MonochromeSprite, MouseButton, MouseEvent, MouseMoveEvent, MouseUpEvent, Path, Pixels, PlatformAtlas, PlatformDisplay, PlatformInput, - PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, Priority, PromptButton, - PromptLevel, Quad, Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, - Replay, ResizeEdge, SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y, - ScaledPixels, Scene, Shadow, SharedString, Size, StrikethroughStyle, Style, SubscriberSet, - Subscription, SystemWindowTab, SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task, - TextStyle, TextStyleRefinement, TransformationMatrix, Underline, UnderlineStyle, - WindowAppearance, WindowBackgroundAppearance, WindowBounds, WindowControls, WindowDecorations, - WindowOptions, WindowParams, WindowTextSystem, point, prelude::*, px, rems, size, - transparent_black, + PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, PromptButton, PromptLevel, Quad, + Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Replay, ResizeEdge, + SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y, ScaledPixels, Scene, Shadow, + SharedString, Size, StrikethroughStyle, Style, SubscriberSet, Subscription, SystemWindowTab, + SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task, TextStyle, TextStyleRefinement, + TransformationMatrix, Underline, UnderlineStyle, WindowAppearance, WindowBackgroundAppearance, + WindowBounds, WindowControls, WindowDecorations, WindowOptions, WindowParams, WindowTextSystem, + point, prelude::*, px, rems, size, transparent_black, }; use anyhow::{Context as _, Result, anyhow}; use collections::{FxHashMap, FxHashSet}; @@ -1726,27 +1725,6 @@ impl Window { }) } - /// Spawn the future returned by the given closure on the application thread - /// pool, with the given priority. The closure is provided a handle to the - /// current window and an `AsyncWindowContext` for use within your future. - #[track_caller] - pub fn spawn_with_priority( - &self, - priority: Priority, - cx: &App, - f: AsyncFn, - ) -> Task - where - R: 'static, - AsyncFn: AsyncFnOnce(&mut AsyncWindowContext) -> R + 'static, - { - let handle = self.handle; - cx.spawn_with_priority(priority, async move |app| { - let mut async_window_cx = AsyncWindowContext::new_context(app.clone(), handle); - f(&mut async_window_cx).await - }) - } - fn bounds_changed(&mut self, cx: &mut App) { self.scale_factor = self.platform_window.scale_factor(); self.viewport_size = self.platform_window.content_size(); diff --git a/crates/repl/src/repl.rs b/crates/repl/src/repl.rs index 346cca0211e43d6f254cb8300f8b0dae546b6004..db21e198cc726df306bd94503615aa8633e0cbd6 100644 --- a/crates/repl/src/repl.rs +++ b/crates/repl/src/repl.rs @@ -12,7 +12,7 @@ mod session; use std::{sync::Arc, time::Duration}; use async_dispatcher::{Dispatcher, Runnable, set_dispatcher}; -use gpui::{App, PlatformDispatcher, Priority, RunnableVariant}; +use gpui::{App, PlatformDispatcher, RunnableVariant}; use project::Fs; pub use runtimelib::ExecutionState; @@ -46,7 +46,7 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher { impl Dispatcher for ZedDispatcher { fn dispatch(&self, runnable: Runnable) { self.dispatcher - .dispatch(RunnableVariant::Compat(runnable), None, Priority::default()); + .dispatch(RunnableVariant::Compat(runnable), None); } fn dispatch_after(&self, duration: Duration, runnable: Runnable) { diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index ed1df19f83b2bd2f73622270bc8cea222c983613..4df7a93f13e3c1ff80f716141a2db727b7a5e693 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -22,8 +22,7 @@ use git::{ COMMIT_MESSAGE, DOT_GIT, FSMONITOR_DAEMON, GITIGNORE, INDEX_LOCK, LFS_DIR, status::GitSummary, }; use gpui::{ - App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Priority, - Task, + App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, }; use ignore::IgnoreStack; use language::DiskState; @@ -4124,7 +4123,7 @@ impl BackgroundScanner { let progress_update_count = AtomicUsize::new(0); self.executor - .scoped_priority(Priority::Low, |scope| { + .scoped(|scope| { for _ in 0..self.executor.num_cpus() { scope.spawn(async { let mut last_progress_update_count = 0; diff --git a/typos.toml b/typos.toml index 20a7b511a85676e3c5e49c23cab71c52e471cee9..cfc4ec86a853d1aeb16ca41fefd1d9fe368659d1 100644 --- a/typos.toml +++ b/typos.toml @@ -52,8 +52,6 @@ extend-exclude = [ "crates/project_panel/benches/linux_repo_snapshot.txt", # Some multibuffer test cases have word fragments that register as typos "crates/multi_buffer/src/multi_buffer_tests.rs", - # Macos apis - "crates/gpui/src/platform/mac/dispatcher.rs", ] [default]