Cargo.lock 🔗
@@ -7239,6 +7239,7 @@ dependencies = [
"libc",
"log",
"lyon",
+ "mach2 0.5.0",
"media",
"metal",
"naga",
localcc , Yara , and dvdsk created
Improves the scheduler by allowing tasks to have a set priority which
will significantly improve responsiveness.
Release notes:
- N/A
---------
Co-authored-by: Yara <git@yara.blue>
Co-authored-by: dvdsk <noreply@davidsk.dev>
Cargo.lock | 1
crates/gpui/Cargo.toml | 5
crates/gpui/build.rs | 2
crates/gpui/src/app.rs | 27 +
crates/gpui/src/app/context.rs | 23 +
crates/gpui/src/executor.rs | 170 ++++++++
crates/gpui/src/gpui.rs | 9
crates/gpui/src/platform.rs | 12
crates/gpui/src/platform/linux/dispatcher.rs | 329 +++++++++++++++--
crates/gpui/src/platform/linux/platform.rs | 10
crates/gpui/src/platform/linux/wayland/client.rs | 10
crates/gpui/src/platform/linux/x11/client.rs | 8
crates/gpui/src/platform/mac/dispatcher.rs | 145 +++++++
crates/gpui/src/platform/test/dispatcher.rs | 12
crates/gpui/src/platform/windows/dispatcher.rs | 85 ++-
crates/gpui/src/platform/windows/events.rs | 3
crates/gpui/src/platform/windows/platform.rs | 24
crates/gpui/src/platform/windows/window.rs | 4
crates/gpui/src/profiler.rs | 16
crates/gpui/src/queue.rs | 329 ++++++++++++++++++
crates/gpui/src/window.rs | 38 +
crates/repl/src/repl.rs | 4
crates/worktree/src/worktree.rs | 5
typos.toml | 2
24 files changed, 1,118 insertions(+), 155 deletions(-)
@@ -7239,6 +7239,7 @@ dependencies = [
"libc",
"log",
"lyon",
+ "mach2 0.5.0",
"media",
"metal",
"naga",
@@ -21,7 +21,6 @@ default = ["font-kit", "wayland", "x11", "windows-manifest"]
test-support = [
"leak-detection",
"collections/test-support",
- "rand",
"util/test-support",
"http_client/test-support",
"wayland",
@@ -109,7 +108,7 @@ parking = "2.0.0"
parking_lot.workspace = true
postage.workspace = true
profiling.workspace = true
-rand = { optional = true, workspace = true }
+rand.workspace = true
raw-window-handle = "0.6"
refineable.workspace = true
resvg = { version = "0.45.0", default-features = false, features = [
@@ -158,8 +157,10 @@ media.workspace = true
objc.workspace = true
objc2 = { version = "0.6", optional = true }
objc2-metal = { version = "0.3", optional = true }
+mach2.workspace = true
#TODO: replace with "objc2"
metal.workspace = true
+flume = "0.11"
[target.'cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))'.dependencies]
pathfinder_geometry = "0.5"
@@ -84,6 +84,8 @@ mod macos {
.allowlist_var("_dispatch_main_q")
.allowlist_var("_dispatch_source_type_data_add")
.allowlist_var("DISPATCH_QUEUE_PRIORITY_HIGH")
+ .allowlist_var("DISPATCH_QUEUE_PRIORITY_DEFAULT")
+ .allowlist_var("DISPATCH_QUEUE_PRIORITY_LOW")
.allowlist_var("DISPATCH_TIME_NOW")
.allowlist_function("dispatch_get_global_queue")
.allowlist_function("dispatch_async_f")
@@ -38,10 +38,11 @@ use crate::{
AssetSource, BackgroundExecutor, Bounds, ClipboardItem, CursorStyle, DispatchPhase, DisplayId,
EventEmitter, FocusHandle, FocusMap, ForegroundExecutor, Global, KeyBinding, KeyContext,
Keymap, Keystroke, LayoutId, Menu, MenuItem, OwnedMenu, PathPromptOptions, Pixels, Platform,
- PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, PromptBuilder,
- PromptButton, PromptHandle, PromptLevel, Render, RenderImage, RenderablePromptHandle,
- Reservation, ScreenCaptureSource, SharedString, SubscriberSet, Subscription, SvgRenderer, Task,
- TextSystem, Window, WindowAppearance, WindowHandle, WindowId, WindowInvalidator,
+ PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, Priority,
+ PromptBuilder, PromptButton, PromptHandle, PromptLevel, Render, RenderImage,
+ RenderablePromptHandle, Reservation, ScreenCaptureSource, SharedString, SubscriberSet,
+ Subscription, SvgRenderer, Task, TextSystem, Window, WindowAppearance, WindowHandle, WindowId,
+ WindowInvalidator,
colors::{Colors, GlobalColors},
current_platform, hash, init_app_menus,
};
@@ -1494,6 +1495,24 @@ impl App {
.spawn(async move { f(&mut cx).await })
}
+ /// Spawns the future returned by the given function on the main thread with
+ /// the given priority. The closure will be invoked with [AsyncApp], which
+ /// allows the application state to be accessed across await points.
+ pub fn spawn_with_priority<AsyncFn, R>(&self, priority: Priority, f: AsyncFn) -> Task<R>
+ where
+ AsyncFn: AsyncFnOnce(&mut AsyncApp) -> R + 'static,
+ R: 'static,
+ {
+ if self.quitting {
+ debug_panic!("Can't spawn on main thread after on_app_quit")
+ };
+
+ let mut cx = self.to_async();
+
+ self.foreground_executor
+ .spawn_with_priority(priority, async move { f(&mut cx).await })
+ }
+
/// Schedules the given function to be run at the end of the current effect cycle, allowing entities
/// that are currently on the stack to be returned to the app.
pub fn defer(&mut self, f: impl FnOnce(&mut App) + 'static) {
@@ -1,7 +1,7 @@
use crate::{
AnyView, AnyWindowHandle, AppContext, AsyncApp, DispatchPhase, Effect, EntityId, EventEmitter,
- FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Reservation, SubscriberSet,
- Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle,
+ FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Priority, Reservation,
+ SubscriberSet, Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle,
};
use anyhow::Result;
use futures::FutureExt;
@@ -667,6 +667,25 @@ impl<'a, T: 'static> Context<'a, T> {
window.spawn(self, async move |cx| f(view, cx).await)
}
+ /// Schedule a future to be run asynchronously with the given priority.
+ /// The given callback is invoked with a [`WeakEntity<V>`] to avoid leaking the entity for a long-running process.
+ /// It's also given an [`AsyncWindowContext`], which can be used to access the state of the entity across await points.
+ /// The returned future will be polled on the main thread.
+ #[track_caller]
+ pub fn spawn_in_with_priority<AsyncFn, R>(
+ &self,
+ priority: Priority,
+ window: &Window,
+ f: AsyncFn,
+ ) -> Task<R>
+ where
+ R: 'static,
+ AsyncFn: AsyncFnOnce(WeakEntity<T>, &mut AsyncWindowContext) -> R + 'static,
+ {
+ let view = self.weak_entity();
+ window.spawn_with_priority(priority, self, async move |cx| f(view, cx).await)
+ }
+
/// Register a callback to be invoked when the given global state changes.
pub fn observe_global_in<G: Global>(
&mut self,
@@ -1,4 +1,4 @@
-use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant};
+use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
use async_task::Runnable;
use futures::channel::mpsc;
use parking_lot::{Condvar, Mutex};
@@ -47,6 +47,52 @@ pub struct ForegroundExecutor {
not_send: PhantomData<Rc<()>>,
}
+/// Realtime task priority
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+#[repr(u8)]
+pub enum RealtimePriority {
+ /// Audio task
+ Audio,
+ /// Other realtime task
+ #[default]
+ Other,
+}
+
+/// Task priority
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
+#[repr(u8)]
+pub enum Priority {
+ /// Realtime priority
+ ///
+ /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
+ Realtime(RealtimePriority),
+ /// High priority
+ ///
+ /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
+ High,
+ /// Medium priority, probably suits most of your use cases.
+ #[default]
+ Medium,
+ /// Low priority
+ ///
+ /// Prioritize this for background work that can come in large quantities
+ /// to not starve the executor of resources for high priority tasks
+ Low,
+}
+
+impl Priority {
+ #[allow(dead_code)]
+ pub(crate) const fn probability(&self) -> u32 {
+ match self {
+ // realtime priorities are not considered for probability scheduling
+ Priority::Realtime(_) => 0,
+ Priority::High => 60,
+ Priority::Medium => 30,
+ Priority::Low => 10,
+ }
+ }
+}
+
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
@@ -152,7 +198,20 @@ impl BackgroundExecutor {
where
R: Send + 'static,
{
- self.spawn_internal::<R>(Box::pin(future), None)
+ self.spawn_with_priority(Priority::default(), future)
+ }
+
+ /// Enqueues the given future to be run to completion on a background thread.
+ #[track_caller]
+ pub fn spawn_with_priority<R>(
+ &self,
+ priority: Priority,
+ future: impl Future<Output = R> + Send + 'static,
+ ) -> Task<R>
+ where
+ R: Send + 'static,
+ {
+ self.spawn_internal::<R>(Box::pin(future), None, priority)
}
/// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
@@ -199,7 +258,13 @@ impl BackgroundExecutor {
let _notify_guard = NotifyOnDrop(pair);
future.await
},
- move |runnable| dispatcher.dispatch(RunnableVariant::Meta(runnable), None),
+ move |runnable| {
+ dispatcher.dispatch(
+ RunnableVariant::Meta(runnable),
+ None,
+ Priority::default(),
+ )
+ },
)
};
runnable.schedule();
@@ -217,7 +282,7 @@ impl BackgroundExecutor {
where
R: Send + 'static,
{
- self.spawn_internal::<R>(Box::pin(future), Some(label))
+ self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
}
#[track_caller]
@@ -225,15 +290,55 @@ impl BackgroundExecutor {
&self,
future: AnyFuture<R>,
label: Option<TaskLabel>,
+ priority: Priority,
) -> Task<R> {
let dispatcher = self.dispatcher.clone();
- let location = core::panic::Location::caller();
- let (runnable, task) = async_task::Builder::new()
- .metadata(RunnableMeta { location })
- .spawn(
- move |_| future,
- move |runnable| dispatcher.dispatch(RunnableVariant::Meta(runnable), label),
+ let (runnable, task) = if let Priority::Realtime(realtime) = priority {
+ let location = core::panic::Location::caller();
+ let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
+
+ dispatcher.spawn_realtime(
+ realtime,
+ Box::new(move || {
+ while let Ok(runnable) = rx.recv() {
+ let start = Instant::now();
+ let location = runnable.metadata().location;
+ let mut timing = TaskTiming {
+ location,
+ start,
+ end: None,
+ };
+ profiler::add_task_timing(timing);
+
+ runnable.run();
+
+ let end = Instant::now();
+ timing.end = Some(end);
+ profiler::add_task_timing(timing);
+ }
+ }),
);
+
+ async_task::Builder::new()
+ .metadata(RunnableMeta { location })
+ .spawn(
+ move |_| future,
+ move |runnable| {
+ let _ = tx.send(runnable);
+ },
+ )
+ } else {
+ let location = core::panic::Location::caller();
+ async_task::Builder::new()
+ .metadata(RunnableMeta { location })
+ .spawn(
+ move |_| future,
+ move |runnable| {
+ dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
+ },
+ )
+ };
+
runnable.schedule();
Task(TaskState::Spawned(task))
}
@@ -406,11 +511,28 @@ impl BackgroundExecutor {
where
F: FnOnce(&mut Scope<'scope>),
{
- let mut scope = Scope::new(self.clone());
+ let mut scope = Scope::new(self.clone(), Priority::default());
(scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures)
.into_iter()
- .map(|f| self.spawn(f))
+ .map(|f| self.spawn_with_priority(scope.priority, f))
+ .collect::<Vec<_>>();
+ for task in spawned {
+ task.await;
+ }
+ }
+
+ /// Scoped lets you start a number of tasks and waits
+ /// for all of them to complete before returning.
+ pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
+ where
+ F: FnOnce(&mut Scope<'scope>),
+ {
+ let mut scope = Scope::new(self.clone(), priority);
+ (scheduler)(&mut scope);
+ let spawned = mem::take(&mut scope.futures)
+ .into_iter()
+ .map(|f| self.spawn_with_priority(scope.priority, f))
.collect::<Vec<_>>();
for task in spawned {
task.await;
@@ -546,6 +668,19 @@ impl ForegroundExecutor {
/// Enqueues the given Task to run on the main thread at some point in the future.
#[track_caller]
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
+ where
+ R: 'static,
+ {
+ self.spawn_with_priority(Priority::default(), future)
+ }
+
+ /// Enqueues the given Task to run on the main thread at some point in the future.
+ #[track_caller]
+ pub fn spawn_with_priority<R>(
+ &self,
+ priority: Priority,
+ future: impl Future<Output = R> + 'static,
+ ) -> Task<R>
where
R: 'static,
{
@@ -557,16 +692,19 @@ impl ForegroundExecutor {
dispatcher: Arc<dyn PlatformDispatcher>,
future: AnyLocalFuture<R>,
location: &'static core::panic::Location<'static>,
+ priority: Priority,
) -> Task<R> {
let (runnable, task) = spawn_local_with_source_location(
future,
- move |runnable| dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable)),
+ move |runnable| {
+ dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
+ },
RunnableMeta { location },
);
runnable.schedule();
Task(TaskState::Spawned(task))
}
- inner::<R>(dispatcher, Box::pin(future), location)
+ inner::<R>(dispatcher, Box::pin(future), location, priority)
}
}
@@ -642,6 +780,7 @@ where
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
executor: BackgroundExecutor,
+ priority: Priority,
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
tx: Option<mpsc::Sender<()>>,
rx: mpsc::Receiver<()>,
@@ -649,10 +788,11 @@ pub struct Scope<'a> {
}
impl<'a> Scope<'a> {
- fn new(executor: BackgroundExecutor) -> Self {
+ fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
executor,
+ priority,
tx: Some(tx),
rx,
futures: Default::default(),
@@ -31,6 +31,8 @@ mod path_builder;
mod platform;
pub mod prelude;
mod profiler;
+#[cfg(any(target_os = "windows", target_os = "linux"))]
+mod queue;
mod scene;
mod shared_string;
mod shared_uri;
@@ -89,16 +91,20 @@ pub use keymap::*;
pub use path_builder::*;
pub use platform::*;
pub use profiler::*;
+#[cfg(any(target_os = "windows", target_os = "linux"))]
+pub(crate) use queue::{PriorityQueueReceiver, PriorityQueueSender};
pub use refineable::*;
pub use scene::*;
pub use shared_string::*;
pub use shared_uri::*;
pub use smol::Timer;
+use std::{any::Any, future::Future};
pub use style::*;
pub use styled::*;
pub use subscription::*;
pub use svg_renderer::*;
pub(crate) use tab_stop::*;
+use taffy::TaffyLayoutEngine;
pub use taffy::{AvailableSpace, LayoutId};
#[cfg(any(test, feature = "test-support"))]
pub use test::*;
@@ -109,9 +115,6 @@ pub use util::{FutureExt, Timeout, arc_cow::ArcCow};
pub use view::*;
pub use window::*;
-use std::{any::Any, future::Future};
-use taffy::TaffyLayoutEngine;
-
/// The context trait, allows the different contexts in GPUI to be used
/// interchangeably for certain operations.
pub trait AppContext {
@@ -39,9 +39,10 @@ use crate::{
Action, AnyWindowHandle, App, AsyncWindowContext, BackgroundExecutor, Bounds,
DEFAULT_WINDOW_SIZE, DevicePixels, DispatchEventResult, Font, FontId, FontMetrics, FontRun,
ForegroundExecutor, GlyphId, GpuSpecs, ImageSource, Keymap, LineLayout, Pixels, PlatformInput,
- Point, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Scene, ShapedGlyph,
- ShapedRun, SharedString, Size, SvgRenderer, SystemWindowTab, Task, TaskLabel, TaskTiming,
- ThreadTaskTimings, Window, WindowControlArea, hash, point, px, size,
+ Point, Priority, RealtimePriority, RenderGlyphParams, RenderImage, RenderImageParams,
+ RenderSvgParams, Scene, ShapedGlyph, ShapedRun, SharedString, Size, SvgRenderer,
+ SystemWindowTab, Task, TaskLabel, TaskTiming, ThreadTaskTimings, Window, WindowControlArea,
+ hash, point, px, size,
};
use anyhow::Result;
use async_task::Runnable;
@@ -587,9 +588,10 @@ pub trait PlatformDispatcher: Send + Sync {
fn get_all_timings(&self) -> Vec<ThreadTaskTimings>;
fn get_current_thread_timings(&self) -> Vec<TaskTiming>;
fn is_main_thread(&self) -> bool;
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>);
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant);
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, priority: Priority);
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority);
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant);
+ fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>);
fn now(&self) -> Instant {
Instant::now()
@@ -1,9 +1,10 @@
use crate::{
- GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
- TaskTiming, ThreadTaskTimings,
+ GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
+ PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
+ ThreadTaskTimings, profiler,
};
use calloop::{
- EventLoop,
+ EventLoop, PostAction,
channel::{self, Sender},
timer::TimeoutAction,
};
@@ -19,9 +20,9 @@ struct TimerAfter {
}
pub(crate) struct LinuxDispatcher {
- main_sender: Sender<RunnableVariant>,
+ main_sender: PriorityQueueCalloopSender<RunnableVariant>,
timer_sender: Sender<TimerAfter>,
- background_sender: flume::Sender<RunnableVariant>,
+ background_sender: PriorityQueueSender<RunnableVariant>,
_background_threads: Vec<thread::JoinHandle<()>>,
main_thread_id: thread::ThreadId,
}
@@ -29,18 +30,20 @@ pub(crate) struct LinuxDispatcher {
const MIN_THREADS: usize = 2;
impl LinuxDispatcher {
- pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
- let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
+ pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
+ let (background_sender, background_receiver) = PriorityQueueReceiver::new();
let thread_count =
std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
+ // These thread should really be lower prio then the foreground
+ // executor
let mut background_threads = (0..thread_count)
.map(|i| {
- let receiver = background_receiver.clone();
+ let mut receiver = background_receiver.clone();
std::thread::Builder::new()
.name(format!("Worker-{i}"))
.spawn(move || {
- for runnable in receiver {
+ for runnable in receiver.iter() {
let start = Instant::now();
let mut location = match runnable {
@@ -51,7 +54,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -63,7 +66,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -72,7 +75,7 @@ impl LinuxDispatcher {
let end = Instant::now();
location.end = Some(end);
- Self::add_task_timing(location);
+ profiler::add_task_timing(location);
log::trace!(
"background thread {}: ran runnable. took: {:?}",
@@ -113,7 +116,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -124,7 +127,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -133,7 +136,7 @@ impl LinuxDispatcher {
let end = Instant::now();
timing.end = Some(end);
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
}
TimeoutAction::Drop
},
@@ -157,22 +160,6 @@ impl LinuxDispatcher {
main_thread_id: thread::current().id(),
}
}
-
- pub(crate) fn add_task_timing(timing: TaskTiming) {
- THREAD_TIMINGS.with(|timings| {
- let mut timings = timings.lock();
- let timings = &mut timings.timings;
-
- if let Some(last_timing) = timings.iter_mut().rev().next() {
- if last_timing.location == timing.location {
- last_timing.end = timing.end;
- return;
- }
- }
-
- timings.push_back(timing);
- });
- }
}
impl PlatformDispatcher for LinuxDispatcher {
@@ -199,22 +186,26 @@ impl PlatformDispatcher for LinuxDispatcher {
thread::current().id() == self.main_thread_id
}
- fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
- self.background_sender.send(runnable).unwrap();
+ fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
+ self.background_sender
+ .send(priority, runnable)
+ .unwrap_or_else(|_| panic!("blocking sender returned without value"));
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
- self.main_sender.send(runnable).unwrap_or_else(|runnable| {
- // NOTE: Runnable may wrap a Future that is !Send.
- //
- // This is usually safe because we only poll it on the main thread.
- // However if the send fails, we know that:
- // 1. main_receiver has been dropped (which implies the app is shutting down)
- // 2. we are on a background thread.
- // It is not safe to drop something !Send on the wrong thread, and
- // the app will exit soon anyway, so we must forget the runnable.
- std::mem::forget(runnable);
- });
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
+ self.main_sender
+ .send(priority, runnable)
+ .unwrap_or_else(|runnable| {
+ // NOTE: Runnable may wrap a Future that is !Send.
+ //
+ // This is usually safe because we only poll it on the main thread.
+ // However if the send fails, we know that:
+ // 1. main_receiver has been dropped (which implies the app is shutting down)
+ // 2. we are on a background thread.
+ // It is not safe to drop something !Send on the wrong thread, and
+ // the app will exit soon anyway, so we must forget the runnable.
+ std::mem::forget(runnable);
+ });
}
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
@@ -222,4 +213,252 @@ impl PlatformDispatcher for LinuxDispatcher {
.send(TimerAfter { duration, runnable })
.ok();
}
+
+ fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
+ std::thread::spawn(move || {
+ // SAFETY: always safe to call
+ let thread_id = unsafe { libc::pthread_self() };
+
+ let policy = match priority {
+ RealtimePriority::Audio => libc::SCHED_FIFO,
+ RealtimePriority::Other => libc::SCHED_RR,
+ };
+ let sched_priority = match priority {
+ RealtimePriority::Audio => 65,
+ RealtimePriority::Other => 45,
+ };
+
+ let sched_param = libc::sched_param { sched_priority };
+ // SAFETY: sched_param is a valid initialized structure
+ let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
+ if result != 0 {
+ log::warn!("failed to set realtime thread priority to {:?}", priority);
+ }
+
+ f();
+ });
+ }
+}
+
+pub struct PriorityQueueCalloopSender<T> {
+ sender: PriorityQueueSender<T>,
+ ping: calloop::ping::Ping,
+}
+
+impl<T> PriorityQueueCalloopSender<T> {
+ fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
+ Self { sender: tx, ping }
+ }
+
+ fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
+ let res = self.sender.send(priority, item);
+ if res.is_ok() {
+ self.ping.ping();
+ }
+ res
+ }
+}
+
+impl<T> Drop for PriorityQueueCalloopSender<T> {
+ fn drop(&mut self) {
+ self.ping.ping();
+ }
+}
+
+pub struct PriorityQueueCalloopReceiver<T> {
+ receiver: PriorityQueueReceiver<T>,
+ source: calloop::ping::PingSource,
+ ping: calloop::ping::Ping,
+}
+
+impl<T> PriorityQueueCalloopReceiver<T> {
+ pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
+ let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
+
+ let (tx, rx) = PriorityQueueReceiver::new();
+
+ (
+ PriorityQueueCalloopSender::new(tx, ping.clone()),
+ Self {
+ receiver: rx,
+ source,
+ ping,
+ },
+ )
+ }
}
+
+use calloop::channel::Event;
+
+#[derive(Debug)]
+pub struct ChannelError(calloop::ping::PingError);
+
+impl std::fmt::Display for ChannelError {
+ #[cfg_attr(feature = "nightly_coverage", coverage(off))]
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ std::fmt::Display::fmt(&self.0, f)
+ }
+}
+
+impl std::error::Error for ChannelError {
+ #[cfg_attr(feature = "nightly_coverage", coverage(off))]
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ Some(&self.0)
+ }
+}
+
+impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
+ type Event = Event<T>;
+ type Metadata = ();
+ type Ret = ();
+ type Error = ChannelError;
+
+ fn process_events<F>(
+ &mut self,
+ readiness: calloop::Readiness,
+ token: calloop::Token,
+ mut callback: F,
+ ) -> Result<calloop::PostAction, Self::Error>
+ where
+ F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
+ {
+ let mut clear_readiness = false;
+ let mut disconnected = false;
+
+ let action = self
+ .source
+ .process_events(readiness, token, |(), &mut ()| {
+ let mut is_empty = true;
+
+ let mut receiver = self.receiver.clone();
+ for runnable in receiver.try_iter() {
+ match runnable {
+ Ok(r) => {
+ callback(Event::Msg(r), &mut ());
+ is_empty = false;
+ }
+ Err(_) => {
+ disconnected = true;
+ }
+ }
+ }
+
+ if disconnected {
+ callback(Event::Closed, &mut ());
+ }
+
+ if is_empty {
+ clear_readiness = true;
+ }
+ })
+ .map_err(ChannelError)?;
+
+ if disconnected {
+ Ok(PostAction::Remove)
+ } else if clear_readiness {
+ Ok(action)
+ } else {
+ // Re-notify the ping source so we can try again.
+ self.ping.ping();
+ Ok(PostAction::Continue)
+ }
+ }
+
+ fn register(
+ &mut self,
+ poll: &mut calloop::Poll,
+ token_factory: &mut calloop::TokenFactory,
+ ) -> calloop::Result<()> {
+ self.source.register(poll, token_factory)
+ }
+
+ fn reregister(
+ &mut self,
+ poll: &mut calloop::Poll,
+ token_factory: &mut calloop::TokenFactory,
+ ) -> calloop::Result<()> {
+ self.source.reregister(poll, token_factory)
+ }
+
+ fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
+ self.source.unregister(poll)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn calloop_works() {
+ let mut event_loop = calloop::EventLoop::try_new().unwrap();
+ let handle = event_loop.handle();
+
+ let (tx, rx) = PriorityQueueCalloopReceiver::new();
+
+ struct Data {
+ got_msg: bool,
+ got_closed: bool,
+ }
+
+ let mut data = Data {
+ got_msg: false,
+ got_closed: false,
+ };
+
+ let _channel_token = handle
+ .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
+ Event::Msg(()) => {
+ data.got_msg = true;
+ }
+
+ Event::Closed => {
+ data.got_closed = true;
+ }
+ })
+ .unwrap();
+
+ // nothing is sent, nothing is received
+ event_loop
+ .dispatch(Some(::std::time::Duration::ZERO), &mut data)
+ .unwrap();
+
+ assert!(!data.got_msg);
+ assert!(!data.got_closed);
+ // a message is send
+
+ tx.send(Priority::Medium, ()).unwrap();
+ event_loop
+ .dispatch(Some(::std::time::Duration::ZERO), &mut data)
+ .unwrap();
+
+ assert!(data.got_msg);
+ assert!(!data.got_closed);
+
+ // the sender is dropped
+ drop(tx);
+ event_loop
+ .dispatch(Some(::std::time::Duration::ZERO), &mut data)
+ .unwrap();
+
+ assert!(data.got_msg);
+ assert!(data.got_closed);
+ }
+}
+
+// running 1 test
+// test platform::linux::dispatcher::tests::tomato ... FAILED
+
+// failures:
+
+// ---- platform::linux::dispatcher::tests::tomato stdout ----
+// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
+// returning 1 tasks to process
+// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
+// (),
+// )
+// returning 0 tasks to process
+
+// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
+// assertion failed: data.got_closed
+// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
@@ -14,7 +14,7 @@ use std::{
};
use anyhow::{Context as _, anyhow};
-use calloop::{LoopSignal, channel::Channel};
+use calloop::LoopSignal;
use futures::channel::oneshot;
use util::ResultExt as _;
use util::command::{new_smol_command, new_std_command};
@@ -25,8 +25,8 @@ use crate::{
Action, AnyWindowHandle, BackgroundExecutor, ClipboardItem, CursorStyle, DisplayId,
ForegroundExecutor, Keymap, LinuxDispatcher, Menu, MenuItem, OwnedMenu, PathPromptOptions,
Pixels, Platform, PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper,
- PlatformTextSystem, PlatformWindow, Point, Result, RunnableVariant, Task, WindowAppearance,
- WindowParams, px,
+ PlatformTextSystem, PlatformWindow, Point, PriorityQueueCalloopReceiver, Result,
+ RunnableVariant, Task, WindowAppearance, WindowParams, px,
};
#[cfg(any(feature = "wayland", feature = "x11"))]
@@ -149,8 +149,8 @@ pub(crate) struct LinuxCommon {
}
impl LinuxCommon {
- pub fn new(signal: LoopSignal) -> (Self, Channel<RunnableVariant>) {
- let (main_sender, main_receiver) = calloop::channel::channel::<RunnableVariant>();
+ pub fn new(signal: LoopSignal) -> (Self, PriorityQueueCalloopReceiver<RunnableVariant>) {
+ let (main_sender, main_receiver) = PriorityQueueCalloopReceiver::new();
#[cfg(any(feature = "wayland", feature = "x11"))]
let text_system = Arc::new(crate::CosmicTextSystem::new());
@@ -77,10 +77,10 @@ use crate::{
LinuxKeyboardLayout, Modifiers, ModifiersChangedEvent, MouseButton, MouseDownEvent,
MouseExitEvent, MouseMoveEvent, MouseUpEvent, NavigationDirection, Pixels, PlatformDisplay,
PlatformInput, PlatformKeyboardLayout, Point, ResultExt as _, SCROLL_LINES, ScrollDelta,
- ScrollWheelEvent, Size, TouchPhase, WindowParams, point, px, size,
+ ScrollWheelEvent, Size, TouchPhase, WindowParams, point, profiler, px, size,
};
use crate::{
- LinuxDispatcher, RunnableVariant, TaskTiming,
+ RunnableVariant, TaskTiming,
platform::{PlatformWindow, blade::BladeContext},
};
use crate::{
@@ -503,7 +503,7 @@ impl WaylandClient {
start,
end: None,
};
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -515,7 +515,7 @@ impl WaylandClient {
start,
end: None,
};
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -524,7 +524,7 @@ impl WaylandClient {
let end = Instant::now();
timing.end = Some(end);
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
});
}
}
@@ -1,4 +1,4 @@
-use crate::{Capslock, LinuxDispatcher, ResultExt as _, RunnableVariant, TaskTiming, xcb_flush};
+use crate::{Capslock, ResultExt as _, RunnableVariant, TaskTiming, profiler, xcb_flush};
use anyhow::{Context as _, anyhow};
use ashpd::WindowIdentifier;
use calloop::{
@@ -322,7 +322,7 @@ impl X11Client {
start,
end: None,
};
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -334,7 +334,7 @@ impl X11Client {
start,
end: None,
};
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
timing
@@ -343,7 +343,7 @@ impl X11Client {
let end = Instant::now();
timing.end = Some(end);
- LinuxDispatcher::add_task_timing(timing);
+ profiler::add_task_timing(timing);
});
}
}
@@ -3,11 +3,22 @@
#![allow(non_snake_case)]
use crate::{
- GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableMeta, RunnableVariant, THREAD_TIMINGS,
- TaskLabel, TaskTiming, ThreadTaskTimings,
+ GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RealtimePriority, RunnableMeta,
+ RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming, ThreadTaskTimings,
};
+use anyhow::Context;
use async_task::Runnable;
+use mach2::{
+ kern_return::KERN_SUCCESS,
+ mach_time::mach_timebase_info_data_t,
+ thread_policy::{
+ THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
+ THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
+ THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
+ thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
+ },
+};
use objc::{
class, msg_send,
runtime::{BOOL, YES},
@@ -15,9 +26,11 @@ use objc::{
};
use std::{
ffi::c_void,
+ mem::MaybeUninit,
ptr::{NonNull, addr_of},
time::{Duration, Instant},
};
+use util::ResultExt;
/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
/// these pub items from leaking into public API.
@@ -56,7 +69,7 @@ impl PlatformDispatcher for MacDispatcher {
is_main_thread == YES
}
- fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
+ fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
let (context, trampoline) = match runnable {
RunnableVariant::Meta(runnable) => (
runnable.into_raw().as_ptr() as *mut c_void,
@@ -67,16 +80,24 @@ impl PlatformDispatcher for MacDispatcher {
Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)),
),
};
+
+ let queue_priority = match priority {
+ Priority::Realtime(_) => unreachable!(),
+ Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH as isize,
+ Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT as isize,
+ Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as isize,
+ };
+
unsafe {
dispatch_async_f(
- dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
+ dispatch_get_global_queue(queue_priority, 0),
context,
trampoline,
);
}
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
let (context, trampoline) = match runnable {
RunnableVariant::Meta(runnable) => (
runnable.into_raw().as_ptr() as *mut c_void,
@@ -110,6 +131,120 @@ impl PlatformDispatcher for MacDispatcher {
dispatch_after_f(when, queue, context, trampoline);
}
}
+
+ fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
+ std::thread::spawn(move || {
+ match priority {
+ RealtimePriority::Audio => set_audio_thread_priority(),
+ RealtimePriority::Other => set_high_thread_priority(),
+ }
+ .context(format!("for priority {:?}", priority))
+ .log_err();
+
+ f();
+ });
+ }
+}
+
+fn set_high_thread_priority() -> anyhow::Result<()> {
+ // SAFETY: always safe to call
+ let thread_id = unsafe { libc::pthread_self() };
+
+ // SAFETY: all sched_param members are valid when initialized to zero.
+ let mut sched_param = unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
+ sched_param.sched_priority = 45;
+
+ let result = unsafe { libc::pthread_setschedparam(thread_id, libc::SCHED_FIFO, &sched_param) };
+ if result != 0 {
+ anyhow::bail!("failed to set realtime thread priority")
+ }
+
+ Ok(())
+}
+
+fn set_audio_thread_priority() -> anyhow::Result<()> {
+ // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
+
+ // SAFETY: always safe to call
+ let thread_id = unsafe { libc::pthread_self() };
+
+ // SAFETY: thread_id is a valid thread id
+ let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
+
+ // Fixed priority thread
+ let mut policy = thread_extended_policy_data_t { timeshare: 0 };
+
+ // SAFETY: thread_id is a valid thread id
+ // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
+ let result = unsafe {
+ mach2::thread_policy::thread_policy_set(
+ thread_id,
+ THREAD_EXTENDED_POLICY,
+ &mut policy as *mut _ as *mut _,
+ THREAD_EXTENDED_POLICY_COUNT,
+ )
+ };
+
+ if result != KERN_SUCCESS {
+ anyhow::bail!("failed to set thread extended policy");
+ }
+
+ // relatively high priority
+ let mut precedence = thread_precedence_policy_data_t { importance: 63 };
+
+ // SAFETY: thread_id is a valid thread id
+ // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
+ let result = unsafe {
+ mach2::thread_policy::thread_policy_set(
+ thread_id,
+ THREAD_PRECEDENCE_POLICY,
+ &mut precedence as *mut _ as *mut _,
+ THREAD_PRECEDENCE_POLICY_COUNT,
+ )
+ };
+
+ if result != KERN_SUCCESS {
+ anyhow::bail!("failed to set thread precedence policy");
+ }
+
+ const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
+ const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
+
+ // ~128 frames @ 44.1KHz
+ const TIME_QUANTUM: f32 = 2.9;
+
+ const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
+ const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
+
+ let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
+ // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
+ unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
+
+ let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
+
+ let mut time_constraints = thread_time_constraint_policy_data_t {
+ period: (TIME_QUANTUM * ms_to_abs_time) as u32,
+ computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
+ constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
+ preemptible: 0,
+ };
+
+ // SAFETY: thread_id is a valid thread id
+ // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
+ let result = unsafe {
+ mach2::thread_policy::thread_policy_set(
+ thread_id,
+ THREAD_TIME_CONSTRAINT_POLICY,
+ &mut time_constraints as *mut _ as *mut _,
+ THREAD_TIME_CONSTRAINT_POLICY_COUNT,
+ )
+ };
+
+ if result != KERN_SUCCESS {
+ anyhow::bail!("failed to set thread time constraint policy");
+ }
+
+ Ok(())
}
extern "C" fn trampoline(runnable: *mut c_void) {
@@ -1,4 +1,4 @@
-use crate::{PlatformDispatcher, RunnableVariant, TaskLabel};
+use crate::{PlatformDispatcher, Priority, RunnableVariant, TaskLabel};
use backtrace::Backtrace;
use collections::{HashMap, HashSet, VecDeque};
use parking::Unparker;
@@ -284,7 +284,7 @@ impl PlatformDispatcher for TestDispatcher {
state.start_time + state.time
}
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>) {
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, _priority: Priority) {
{
let mut state = self.state.lock();
if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) {
@@ -296,7 +296,7 @@ impl PlatformDispatcher for TestDispatcher {
self.unpark_all();
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
self.state
.lock()
.foreground
@@ -318,4 +318,10 @@ impl PlatformDispatcher for TestDispatcher {
fn as_test(&self) -> Option<&TestDispatcher> {
Some(self)
}
+
+ fn spawn_realtime(&self, _priority: crate::RealtimePriority, f: Box<dyn FnOnce() + Send>) {
+ std::thread::spawn(move || {
+ f();
+ });
+ }
}
@@ -4,24 +4,31 @@ use std::{
time::{Duration, Instant},
};
-use flume::Sender;
+use anyhow::Context;
use util::ResultExt;
use windows::{
- System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler},
+ System::Threading::{
+ ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
+ },
Win32::{
Foundation::{LPARAM, WPARAM},
+ System::Threading::{
+ GetCurrentThread, HIGH_PRIORITY_CLASS, SetPriorityClass, SetThreadPriority,
+ THREAD_PRIORITY_HIGHEST, THREAD_PRIORITY_TIME_CRITICAL,
+ },
UI::WindowsAndMessaging::PostMessageW,
},
};
use crate::{
- GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS,
- TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
+ GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, Priority, PriorityQueueSender,
+ RealtimePriority, RunnableVariant, SafeHwnd, THREAD_TIMINGS, TaskLabel, TaskTiming,
+ ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, profiler,
};
pub(crate) struct WindowsDispatcher {
pub(crate) wake_posted: AtomicBool,
- main_sender: Sender<RunnableVariant>,
+ main_sender: PriorityQueueSender<RunnableVariant>,
main_thread_id: ThreadId,
pub(crate) platform_window_handle: SafeHwnd,
validation_number: usize,
@@ -29,7 +36,7 @@ pub(crate) struct WindowsDispatcher {
impl WindowsDispatcher {
pub(crate) fn new(
- main_sender: Sender<RunnableVariant>,
+ main_sender: PriorityQueueSender<RunnableVariant>,
platform_window_handle: HWND,
validation_number: usize,
) -> Self {
@@ -45,7 +52,7 @@ impl WindowsDispatcher {
}
}
- fn dispatch_on_threadpool(&self, runnable: RunnableVariant) {
+ fn dispatch_on_threadpool(&self, priority: WorkItemPriority, runnable: RunnableVariant) {
let handler = {
let mut task_wrapper = Some(runnable);
WorkItemHandler::new(move |_| {
@@ -53,7 +60,8 @@ impl WindowsDispatcher {
Ok(())
})
};
- ThreadPool::RunAsync(&handler).log_err();
+
+ ThreadPool::RunWithPriorityAsync(&handler, priority).log_err();
}
fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) {
@@ -79,7 +87,7 @@ impl WindowsDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
@@ -91,7 +99,7 @@ impl WindowsDispatcher {
start,
end: None,
};
- Self::add_task_timing(timing);
+ profiler::add_task_timing(timing);
runnable.run();
@@ -102,23 +110,7 @@ impl WindowsDispatcher {
let end = Instant::now();
timing.end = Some(end);
- Self::add_task_timing(timing);
- }
-
- pub(crate) fn add_task_timing(timing: TaskTiming) {
- THREAD_TIMINGS.with(|timings| {
- let mut timings = timings.lock();
- let timings = &mut timings.timings;
-
- if let Some(last_timing) = timings.iter_mut().rev().next() {
- if last_timing.location == timing.location {
- last_timing.end = timing.end;
- return;
- }
- }
-
- timings.push_back(timing);
- });
+ profiler::add_task_timing(timing);
}
}
@@ -146,15 +138,22 @@ impl PlatformDispatcher for WindowsDispatcher {
current().id() == self.main_thread_id
}
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>) {
- self.dispatch_on_threadpool(runnable);
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, priority: Priority) {
+ let priority = match priority {
+ Priority::Realtime(_) => unreachable!(),
+ Priority::High => WorkItemPriority::High,
+ Priority::Medium => WorkItemPriority::Normal,
+ Priority::Low => WorkItemPriority::Low,
+ };
+ self.dispatch_on_threadpool(priority, runnable);
+
if let Some(label) = label {
log::debug!("TaskLabel: {label:?}");
}
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
- match self.main_sender.send(runnable) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
+ match self.main_sender.send(priority, runnable) {
Ok(_) => {
if !self.wake_posted.swap(true, Ordering::AcqRel) {
unsafe {
@@ -185,4 +184,28 @@ impl PlatformDispatcher for WindowsDispatcher {
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
self.dispatch_on_threadpool_after(runnable, duration);
}
+
+ fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
+ std::thread::spawn(move || {
+ // SAFETY: always safe to call
+ let thread_handle = unsafe { GetCurrentThread() };
+
+ let thread_priority = match priority {
+ RealtimePriority::Audio => THREAD_PRIORITY_TIME_CRITICAL,
+ RealtimePriority::Other => THREAD_PRIORITY_HIGHEST,
+ };
+
+ // SAFETY: thread_handle is a valid handle to a thread
+ unsafe { SetPriorityClass(thread_handle, HIGH_PRIORITY_CLASS) }
+ .context("thread priority class")
+ .log_err();
+
+ // SAFETY: thread_handle is a valid handle to a thread
+ unsafe { SetThreadPriority(thread_handle, thread_priority) }
+ .context("thread priority")
+ .log_err();
+
+ f();
+ });
+ }
}
@@ -243,7 +243,8 @@ impl WindowsWindowInner {
fn handle_timer_msg(&self, handle: HWND, wparam: WPARAM) -> Option<isize> {
if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID {
- for runnable in self.main_receiver.drain() {
+ let mut runnables = self.main_receiver.clone().try_iter();
+ while let Some(Ok(runnable)) = runnables.next() {
WindowsDispatcher::execute_runnable(runnable);
}
self.handle_paint_msg(handle)
@@ -51,7 +51,7 @@ struct WindowsPlatformInner {
raw_window_handles: std::sync::Weak<RwLock<SmallVec<[SafeHwnd; 4]>>>,
// The below members will never change throughout the entire lifecycle of the app.
validation_number: usize,
- main_receiver: flume::Receiver<RunnableVariant>,
+ main_receiver: PriorityQueueReceiver<RunnableVariant>,
dispatcher: Arc<WindowsDispatcher>,
}
@@ -98,7 +98,7 @@ impl WindowsPlatform {
OleInitialize(None).context("unable to initialize Windows OLE")?;
}
let directx_devices = DirectXDevices::new().context("Creating DirectX devices")?;
- let (main_sender, main_receiver) = flume::unbounded::<RunnableVariant>();
+ let (main_sender, main_receiver) = PriorityQueueReceiver::new();
let validation_number = if usize::BITS == 64 {
rand::random::<u64>() as usize
} else {
@@ -857,22 +857,24 @@ impl WindowsPlatformInner {
}
break 'tasks;
}
- match self.main_receiver.try_recv() {
- Err(_) => break 'timeout_loop,
- Ok(runnable) => WindowsDispatcher::execute_runnable(runnable),
+ let mut main_receiver = self.main_receiver.clone();
+ match main_receiver.try_pop() {
+ Ok(Some(runnable)) => WindowsDispatcher::execute_runnable(runnable),
+ _ => break 'timeout_loop,
}
}
// Someone could enqueue a Runnable here. The flag is still true, so they will not PostMessage.
// We need to check for those Runnables after we clear the flag.
self.dispatcher.wake_posted.store(false, Ordering::Release);
- match self.main_receiver.try_recv() {
- Err(_) => break 'tasks,
- Ok(runnable) => {
+ let mut main_receiver = self.main_receiver.clone();
+ match main_receiver.try_pop() {
+ Ok(Some(runnable)) => {
self.dispatcher.wake_posted.store(true, Ordering::Release);
WindowsDispatcher::execute_runnable(runnable);
}
+ _ => break 'tasks,
}
}
@@ -934,7 +936,7 @@ pub(crate) struct WindowCreationInfo {
pub(crate) windows_version: WindowsVersion,
pub(crate) drop_target_helper: IDropTargetHelper,
pub(crate) validation_number: usize,
- pub(crate) main_receiver: flume::Receiver<RunnableVariant>,
+ pub(crate) main_receiver: PriorityQueueReceiver<RunnableVariant>,
pub(crate) platform_window_handle: HWND,
pub(crate) disable_direct_composition: bool,
pub(crate) directx_devices: DirectXDevices,
@@ -947,8 +949,8 @@ struct PlatformWindowCreateContext {
inner: Option<Result<Rc<WindowsPlatformInner>>>,
raw_window_handles: std::sync::Weak<RwLock<SmallVec<[SafeHwnd; 4]>>>,
validation_number: usize,
- main_sender: Option<flume::Sender<RunnableVariant>>,
- main_receiver: Option<flume::Receiver<RunnableVariant>>,
+ main_sender: Option<PriorityQueueSender<RunnableVariant>>,
+ main_receiver: Option<PriorityQueueReceiver<RunnableVariant>>,
directx_devices: Option<DirectXDevices>,
dispatcher: Option<Arc<WindowsDispatcher>>,
}
@@ -81,7 +81,7 @@ pub(crate) struct WindowsWindowInner {
pub(crate) executor: ForegroundExecutor,
pub(crate) windows_version: WindowsVersion,
pub(crate) validation_number: usize,
- pub(crate) main_receiver: flume::Receiver<RunnableVariant>,
+ pub(crate) main_receiver: PriorityQueueReceiver<RunnableVariant>,
pub(crate) platform_window_handle: HWND,
}
@@ -362,7 +362,7 @@ struct WindowCreateContext {
windows_version: WindowsVersion,
drop_target_helper: IDropTargetHelper,
validation_number: usize,
- main_receiver: flume::Receiver<RunnableVariant>,
+ main_receiver: PriorityQueueReceiver<RunnableVariant>,
platform_window_handle: HWND,
appearance: WindowAppearance,
disable_direct_composition: bool,
@@ -216,3 +216,19 @@ impl Drop for ThreadTimings {
thread_timings.swap_remove(index);
}
}
+
+pub(crate) fn add_task_timing(timing: TaskTiming) {
+ THREAD_TIMINGS.with(|timings| {
+ let mut timings = timings.lock();
+ let timings = &mut timings.timings;
+
+ if let Some(last_timing) = timings.iter_mut().rev().next() {
+ if last_timing.location == timing.location {
+ last_timing.end = timing.end;
+ return;
+ }
+ }
+
+ timings.push_back(timing);
+ });
+}
@@ -0,0 +1,329 @@
+use std::{
+ fmt,
+ iter::FusedIterator,
+ sync::{Arc, atomic::AtomicUsize},
+};
+
+use rand::{Rng, SeedableRng, rngs::SmallRng};
+
+use crate::Priority;
+
+struct PriorityQueues<T> {
+ high_priority: Vec<T>,
+ medium_priority: Vec<T>,
+ low_priority: Vec<T>,
+}
+
+impl<T> PriorityQueues<T> {
+ fn is_empty(&self) -> bool {
+ self.high_priority.is_empty()
+ && self.medium_priority.is_empty()
+ && self.low_priority.is_empty()
+ }
+}
+
+struct PriorityQueueState<T> {
+ queues: parking_lot::Mutex<PriorityQueues<T>>,
+ condvar: parking_lot::Condvar,
+ receiver_count: AtomicUsize,
+ sender_count: AtomicUsize,
+}
+
+impl<T> PriorityQueueState<T> {
+ fn send(&self, priority: Priority, item: T) -> Result<(), SendError<T>> {
+ if self
+ .receiver_count
+ .load(std::sync::atomic::Ordering::Relaxed)
+ == 0
+ {
+ return Err(SendError(item));
+ }
+
+ let mut queues = self.queues.lock();
+ match priority {
+ Priority::Realtime(_) => unreachable!(),
+ Priority::High => queues.high_priority.push(item),
+ Priority::Medium => queues.medium_priority.push(item),
+ Priority::Low => queues.low_priority.push(item),
+ };
+ self.condvar.notify_one();
+ Ok(())
+ }
+
+ fn recv<'a>(&'a self) -> Result<parking_lot::MutexGuard<'a, PriorityQueues<T>>, RecvError> {
+ let mut queues = self.queues.lock();
+
+ let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed);
+ if queues.is_empty() && sender_count == 0 {
+ return Err(crate::queue::RecvError);
+ }
+
+ // parking_lot doesn't do spurious wakeups so an if is fine
+ if queues.is_empty() {
+ self.condvar.wait(&mut queues);
+ }
+
+ Ok(queues)
+ }
+
+ fn try_recv<'a>(
+ &'a self,
+ ) -> Result<Option<parking_lot::MutexGuard<'a, PriorityQueues<T>>>, RecvError> {
+ let mut queues = self.queues.lock();
+
+ let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed);
+ if queues.is_empty() && sender_count == 0 {
+ return Err(crate::queue::RecvError);
+ }
+
+ if queues.is_empty() {
+ Ok(None)
+ } else {
+ Ok(Some(queues))
+ }
+ }
+}
+
+pub(crate) struct PriorityQueueSender<T> {
+ state: Arc<PriorityQueueState<T>>,
+}
+
+impl<T> PriorityQueueSender<T> {
+ fn new(state: Arc<PriorityQueueState<T>>) -> Self {
+ Self { state }
+ }
+
+ pub(crate) fn send(&self, priority: Priority, item: T) -> Result<(), SendError<T>> {
+ self.state.send(priority, item)?;
+ Ok(())
+ }
+}
+
+impl<T> Drop for PriorityQueueSender<T> {
+ fn drop(&mut self) {
+ self.state
+ .sender_count
+ .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
+ }
+}
+
+pub(crate) struct PriorityQueueReceiver<T> {
+ state: Arc<PriorityQueueState<T>>,
+ rand: SmallRng,
+ disconnected: bool,
+}
+
+impl<T> Clone for PriorityQueueReceiver<T> {
+ fn clone(&self) -> Self {
+ self.state
+ .receiver_count
+ .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+ Self {
+ state: Arc::clone(&self.state),
+ rand: SmallRng::seed_from_u64(0),
+ disconnected: self.disconnected,
+ }
+ }
+}
+
+pub(crate) struct SendError<T>(T);
+
+impl<T: fmt::Debug> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("SendError").field(&self.0).finish()
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct RecvError;
+
+#[allow(dead_code)]
+impl<T> PriorityQueueReceiver<T> {
+ pub(crate) fn new() -> (PriorityQueueSender<T>, Self) {
+ let state = PriorityQueueState {
+ queues: parking_lot::Mutex::new(PriorityQueues {
+ high_priority: Vec::new(),
+ medium_priority: Vec::new(),
+ low_priority: Vec::new(),
+ }),
+ condvar: parking_lot::Condvar::new(),
+ receiver_count: AtomicUsize::new(1),
+ sender_count: AtomicUsize::new(1),
+ };
+ let state = Arc::new(state);
+
+ let sender = PriorityQueueSender::new(Arc::clone(&state));
+
+ let receiver = PriorityQueueReceiver {
+ state,
+ rand: SmallRng::seed_from_u64(0),
+ disconnected: false,
+ };
+
+ (sender, receiver)
+ }
+
+ /// Tries to pop one element from the priority queue without blocking.
+ ///
+ /// This will early return if there are no elements in the queue.
+ ///
+ /// This method is best suited if you only intend to pop one element, for better performance
+ /// on large queues see [`Self::try_iter`]
+ ///
+ /// # Errors
+ ///
+ /// If the sender was dropped
+ pub(crate) fn try_pop(&mut self) -> Result<Option<T>, RecvError> {
+ self.pop_inner(false)
+ }
+
+ /// Pops an element from the priority queue blocking if necessary.
+ ///
+ /// This method is best suited if you only intend to pop one element, for better performance
+ /// on large queues see [`Self::iter``]
+ ///
+ /// # Errors
+ ///
+ /// If the sender was dropped
+ pub(crate) fn pop(&mut self) -> Result<T, RecvError> {
+ self.pop_inner(true).map(|e| e.unwrap())
+ }
+
+ /// Returns an iterator over the elements of the queue
+ /// this iterator will end when all elements have been consumed and will not wait for new ones.
+ pub(crate) fn try_iter(self) -> TryIter<T> {
+ TryIter {
+ receiver: self,
+ ended: false,
+ }
+ }
+
+ /// Returns an iterator over the elements of the queue
+ /// this iterator will wait for new elements if the queue is empty.
+ pub(crate) fn iter(self) -> Iter<T> {
+ Iter(self)
+ }
+
+ #[inline(always)]
+ // algorithm is the loaded die from biased coin from
+ // https://www.keithschwarz.com/darts-dice-coins/
+ fn pop_inner(&mut self, block: bool) -> Result<Option<T>, RecvError> {
+ use Priority as P;
+
+ let mut queues = if !block {
+ let Some(queues) = self.state.try_recv()? else {
+ return Ok(None);
+ };
+ queues
+ } else {
+ self.state.recv()?
+ };
+
+ let high = P::High.probability() * !queues.high_priority.is_empty() as u32;
+ let medium = P::Medium.probability() * !queues.medium_priority.is_empty() as u32;
+ let low = P::Low.probability() * !queues.low_priority.is_empty() as u32;
+ let mut mass = high + medium + low; //%
+
+ if !queues.high_priority.is_empty() {
+ let flip = self.rand.random_ratio(P::High.probability(), mass);
+ if flip {
+ return Ok(queues.high_priority.pop());
+ }
+ mass -= P::High.probability();
+ }
+
+ if !queues.medium_priority.is_empty() {
+ let flip = self.rand.random_ratio(P::Medium.probability(), mass);
+ if flip {
+ return Ok(queues.medium_priority.pop());
+ }
+ mass -= P::Medium.probability();
+ }
+
+ if !queues.low_priority.is_empty() {
+ let flip = self.rand.random_ratio(P::Low.probability(), mass);
+ if flip {
+ return Ok(queues.low_priority.pop());
+ }
+ }
+
+ Ok(None)
+ }
+}
+
+impl<T> Drop for PriorityQueueReceiver<T> {
+ fn drop(&mut self) {
+ self.state
+ .receiver_count
+ .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
+ }
+}
+
+/// If None is returned the sender disconnected
+pub(crate) struct Iter<T>(PriorityQueueReceiver<T>);
+impl<T> Iterator for Iter<T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.0.pop_inner(true).ok().flatten()
+ }
+}
+impl<T> FusedIterator for Iter<T> {}
+
+/// If None is returned there are no more elements in the queue
+pub(crate) struct TryIter<T> {
+ receiver: PriorityQueueReceiver<T>,
+ ended: bool,
+}
+impl<T> Iterator for TryIter<T> {
+ type Item = Result<T, RecvError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.ended {
+ return None;
+ }
+
+ let res = self.receiver.pop_inner(false);
+ self.ended = res.is_err();
+
+ res.transpose()
+ }
+}
+impl<T> FusedIterator for TryIter<T> {}
+
+#[cfg(test)]
+mod tests {
+ use collections::HashSet;
+
+ use super::*;
+
+ #[test]
+ fn all_tasks_get_yielded() {
+ let (tx, mut rx) = PriorityQueueReceiver::new();
+ tx.send(Priority::Medium, 20).unwrap();
+ tx.send(Priority::High, 30).unwrap();
+ tx.send(Priority::Low, 10).unwrap();
+ tx.send(Priority::Medium, 21).unwrap();
+ tx.send(Priority::High, 31).unwrap();
+
+ drop(tx);
+
+ assert_eq!(
+ rx.iter().collect::<HashSet<_>>(),
+ [30, 31, 20, 21, 10].into_iter().collect::<HashSet<_>>()
+ )
+ }
+
+ #[test]
+ fn new_high_prio_task_get_scheduled_quickly() {
+ let (tx, mut rx) = PriorityQueueReceiver::new();
+ for _ in 0..100 {
+ tx.send(Priority::Low, 1).unwrap();
+ }
+
+ assert_eq!(rx.pop().unwrap(), 1);
+ tx.send(Priority::High, 3).unwrap();
+ assert_eq!(rx.pop().unwrap(), 3);
+ assert_eq!(rx.pop().unwrap(), 1);
+ }
+}
@@ -9,14 +9,15 @@ use crate::{
KeyBinding, KeyContext, KeyDownEvent, KeyEvent, Keystroke, KeystrokeEvent, LayoutId,
LineLayoutIndex, Modifiers, ModifiersChangedEvent, MonochromeSprite, MouseButton, MouseEvent,
MouseMoveEvent, MouseUpEvent, Path, Pixels, PlatformAtlas, PlatformDisplay, PlatformInput,
- PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, PromptButton, PromptLevel, Quad,
- Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Replay, ResizeEdge,
- SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y, ScaledPixels, Scene, Shadow,
- SharedString, Size, StrikethroughStyle, Style, SubscriberSet, Subscription, SystemWindowTab,
- SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task, TextStyle, TextStyleRefinement,
- TransformationMatrix, Underline, UnderlineStyle, WindowAppearance, WindowBackgroundAppearance,
- WindowBounds, WindowControls, WindowDecorations, WindowOptions, WindowParams, WindowTextSystem,
- point, prelude::*, px, rems, size, transparent_black,
+ PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, Priority, PromptButton,
+ PromptLevel, Quad, Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams,
+ Replay, ResizeEdge, SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y,
+ ScaledPixels, Scene, Shadow, SharedString, Size, StrikethroughStyle, Style, SubscriberSet,
+ Subscription, SystemWindowTab, SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task,
+ TextStyle, TextStyleRefinement, TransformationMatrix, Underline, UnderlineStyle,
+ WindowAppearance, WindowBackgroundAppearance, WindowBounds, WindowControls, WindowDecorations,
+ WindowOptions, WindowParams, WindowTextSystem, point, prelude::*, px, rems, size,
+ transparent_black,
};
use anyhow::{Context as _, Result, anyhow};
use collections::{FxHashMap, FxHashSet};
@@ -1725,6 +1726,27 @@ impl Window {
})
}
+ /// Spawn the future returned by the given closure on the application thread
+ /// pool, with the given priority. The closure is provided a handle to the
+ /// current window and an `AsyncWindowContext` for use within your future.
+ #[track_caller]
+ pub fn spawn_with_priority<AsyncFn, R>(
+ &self,
+ priority: Priority,
+ cx: &App,
+ f: AsyncFn,
+ ) -> Task<R>
+ where
+ R: 'static,
+ AsyncFn: AsyncFnOnce(&mut AsyncWindowContext) -> R + 'static,
+ {
+ let handle = self.handle;
+ cx.spawn_with_priority(priority, async move |app| {
+ let mut async_window_cx = AsyncWindowContext::new_context(app.clone(), handle);
+ f(&mut async_window_cx).await
+ })
+ }
+
fn bounds_changed(&mut self, cx: &mut App) {
self.scale_factor = self.platform_window.scale_factor();
self.viewport_size = self.platform_window.content_size();
@@ -12,7 +12,7 @@ mod session;
use std::{sync::Arc, time::Duration};
use async_dispatcher::{Dispatcher, Runnable, set_dispatcher};
-use gpui::{App, PlatformDispatcher, RunnableVariant};
+use gpui::{App, PlatformDispatcher, Priority, RunnableVariant};
use project::Fs;
pub use runtimelib::ExecutionState;
@@ -46,7 +46,7 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
impl Dispatcher for ZedDispatcher {
fn dispatch(&self, runnable: Runnable) {
self.dispatcher
- .dispatch(RunnableVariant::Compat(runnable), None);
+ .dispatch(RunnableVariant::Compat(runnable), None, Priority::default());
}
fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
@@ -22,7 +22,8 @@ use git::{
COMMIT_MESSAGE, DOT_GIT, FSMONITOR_DAEMON, GITIGNORE, INDEX_LOCK, LFS_DIR, status::GitSummary,
};
use gpui::{
- App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task,
+ App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Priority,
+ Task,
};
use ignore::IgnoreStack;
use language::DiskState;
@@ -4144,7 +4145,7 @@ impl BackgroundScanner {
let progress_update_count = AtomicUsize::new(0);
self.executor
- .scoped(|scope| {
+ .scoped_priority(Priority::Low, |scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
let mut last_progress_update_count = 0;
@@ -52,6 +52,8 @@ extend-exclude = [
"crates/project_panel/benches/linux_repo_snapshot.txt",
# Some multibuffer test cases have word fragments that register as typos
"crates/multi_buffer/src/multi_buffer_tests.rs",
+ # Macos apis
+ "crates/gpui/src/platform/mac/dispatcher.rs",
]
[default]