Cargo.lock 🔗
@@ -7239,7 +7239,6 @@ dependencies = [
"libc",
"log",
"lyon",
- "mach2 0.5.0",
"media",
"metal",
"naga",
Yara 🏳️⚧️ created
Reverts zed-industries/zed#44575
Cargo.lock | 1
crates/gpui/Cargo.toml | 5
crates/gpui/build.rs | 2
crates/gpui/src/app.rs | 27 -
crates/gpui/src/app/context.rs | 23 -
crates/gpui/src/executor.rs | 162 --------
crates/gpui/src/gpui.rs | 9
crates/gpui/src/platform.rs | 12
crates/gpui/src/platform/linux/dispatcher.rs | 329 ++---------------
crates/gpui/src/platform/linux/platform.rs | 10
crates/gpui/src/platform/linux/wayland/client.rs | 10
crates/gpui/src/platform/linux/x11/client.rs | 8
crates/gpui/src/platform/mac/dispatcher.rs | 145 -------
crates/gpui/src/platform/test/dispatcher.rs | 12
crates/gpui/src/platform/windows/dispatcher.rs | 85 +--
crates/gpui/src/platform/windows/events.rs | 3
crates/gpui/src/platform/windows/platform.rs | 24
crates/gpui/src/platform/windows/window.rs | 4
crates/gpui/src/profiler.rs | 16
crates/gpui/src/queue.rs | 329 ------------------
crates/gpui/src/window.rs | 38 -
crates/repl/src/repl.rs | 4
crates/worktree/src/worktree.rs | 5
typos.toml | 2
24 files changed, 154 insertions(+), 1,111 deletions(-)
@@ -7239,7 +7239,6 @@ dependencies = [
"libc",
"log",
"lyon",
- "mach2 0.5.0",
"media",
"metal",
"naga",
@@ -21,6 +21,7 @@ default = ["font-kit", "wayland", "x11", "windows-manifest"]
test-support = [
"leak-detection",
"collections/test-support",
+ "rand",
"util/test-support",
"http_client/test-support",
"wayland",
@@ -108,7 +109,7 @@ parking = "2.0.0"
parking_lot.workspace = true
postage.workspace = true
profiling.workspace = true
-rand.workspace = true
+rand = { optional = true, workspace = true }
raw-window-handle = "0.6"
refineable.workspace = true
resvg = { version = "0.45.0", default-features = false, features = [
@@ -157,10 +158,8 @@ media.workspace = true
objc.workspace = true
objc2 = { version = "0.6", optional = true }
objc2-metal = { version = "0.3", optional = true }
-mach2.workspace = true
#TODO: replace with "objc2"
metal.workspace = true
-flume = "0.11"
[target.'cfg(any(target_os = "linux", target_os = "freebsd", target_os = "macos"))'.dependencies]
pathfinder_geometry = "0.5"
@@ -84,8 +84,6 @@ mod macos {
.allowlist_var("_dispatch_main_q")
.allowlist_var("_dispatch_source_type_data_add")
.allowlist_var("DISPATCH_QUEUE_PRIORITY_HIGH")
- .allowlist_var("DISPATCH_QUEUE_PRIORITY_DEFAULT")
- .allowlist_var("DISPATCH_QUEUE_PRIORITY_LOW")
.allowlist_var("DISPATCH_TIME_NOW")
.allowlist_function("dispatch_get_global_queue")
.allowlist_function("dispatch_async_f")
@@ -38,11 +38,10 @@ use crate::{
AssetSource, BackgroundExecutor, Bounds, ClipboardItem, CursorStyle, DispatchPhase, DisplayId,
EventEmitter, FocusHandle, FocusMap, ForegroundExecutor, Global, KeyBinding, KeyContext,
Keymap, Keystroke, LayoutId, Menu, MenuItem, OwnedMenu, PathPromptOptions, Pixels, Platform,
- PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, Priority,
- PromptBuilder, PromptButton, PromptHandle, PromptLevel, Render, RenderImage,
- RenderablePromptHandle, Reservation, ScreenCaptureSource, SharedString, SubscriberSet,
- Subscription, SvgRenderer, Task, TextSystem, Window, WindowAppearance, WindowHandle, WindowId,
- WindowInvalidator,
+ PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper, Point, PromptBuilder,
+ PromptButton, PromptHandle, PromptLevel, Render, RenderImage, RenderablePromptHandle,
+ Reservation, ScreenCaptureSource, SharedString, SubscriberSet, Subscription, SvgRenderer, Task,
+ TextSystem, Window, WindowAppearance, WindowHandle, WindowId, WindowInvalidator,
colors::{Colors, GlobalColors},
current_platform, hash, init_app_menus,
};
@@ -1495,24 +1494,6 @@ impl App {
.spawn(async move { f(&mut cx).await })
}
- /// Spawns the future returned by the given function on the main thread with
- /// the given priority. The closure will be invoked with [AsyncApp], which
- /// allows the application state to be accessed across await points.
- pub fn spawn_with_priority<AsyncFn, R>(&self, priority: Priority, f: AsyncFn) -> Task<R>
- where
- AsyncFn: AsyncFnOnce(&mut AsyncApp) -> R + 'static,
- R: 'static,
- {
- if self.quitting {
- debug_panic!("Can't spawn on main thread after on_app_quit")
- };
-
- let mut cx = self.to_async();
-
- self.foreground_executor
- .spawn_with_priority(priority, async move { f(&mut cx).await })
- }
-
/// Schedules the given function to be run at the end of the current effect cycle, allowing entities
/// that are currently on the stack to be returned to the app.
pub fn defer(&mut self, f: impl FnOnce(&mut App) + 'static) {
@@ -1,7 +1,7 @@
use crate::{
AnyView, AnyWindowHandle, AppContext, AsyncApp, DispatchPhase, Effect, EntityId, EventEmitter,
- FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Priority, Reservation,
- SubscriberSet, Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle,
+ FocusHandle, FocusOutEvent, Focusable, Global, KeystrokeObserver, Reservation, SubscriberSet,
+ Subscription, Task, WeakEntity, WeakFocusHandle, Window, WindowHandle,
};
use anyhow::Result;
use futures::FutureExt;
@@ -667,25 +667,6 @@ impl<'a, T: 'static> Context<'a, T> {
window.spawn(self, async move |cx| f(view, cx).await)
}
- /// Schedule a future to be run asynchronously with the given priority.
- /// The given callback is invoked with a [`WeakEntity<V>`] to avoid leaking the entity for a long-running process.
- /// It's also given an [`AsyncWindowContext`], which can be used to access the state of the entity across await points.
- /// The returned future will be polled on the main thread.
- #[track_caller]
- pub fn spawn_in_with_priority<AsyncFn, R>(
- &self,
- priority: Priority,
- window: &Window,
- f: AsyncFn,
- ) -> Task<R>
- where
- R: 'static,
- AsyncFn: AsyncFnOnce(WeakEntity<T>, &mut AsyncWindowContext) -> R + 'static,
- {
- let view = self.weak_entity();
- window.spawn_with_priority(priority, self, async move |cx| f(view, cx).await)
- }
-
/// Register a callback to be invoked when the given global state changes.
pub fn observe_global_in<G: Global>(
&mut self,
@@ -1,4 +1,4 @@
-use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
+use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant};
use async_task::Runnable;
use futures::channel::mpsc;
use smol::prelude::*;
@@ -46,52 +46,6 @@ pub struct ForegroundExecutor {
not_send: PhantomData<Rc<()>>,
}
-/// Realtime task priority
-#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
-#[repr(u8)]
-pub enum RealtimePriority {
- /// Audio task
- Audio,
- /// Other realtime task
- #[default]
- Other,
-}
-
-/// Task priority
-#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
-#[repr(u8)]
-pub enum Priority {
- /// Realtime priority
- ///
- /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
- Realtime(RealtimePriority),
- /// High priority
- ///
- /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
- High,
- /// Medium priority, probably suits most of your use cases.
- #[default]
- Medium,
- /// Low priority
- ///
- /// Prioritize this for background work that can come in large quantities
- /// to not starve the executor of resources for high priority tasks
- Low,
-}
-
-impl Priority {
- #[allow(dead_code)]
- pub(crate) const fn probability(&self) -> u32 {
- match self {
- // realtime priorities are not considered for probability scheduling
- Priority::Realtime(_) => 0,
- Priority::High => 60,
- Priority::Medium => 30,
- Priority::Low => 10,
- }
- }
-}
-
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
@@ -197,20 +151,7 @@ impl BackgroundExecutor {
where
R: Send + 'static,
{
- self.spawn_with_priority(Priority::default(), future)
- }
-
- /// Enqueues the given future to be run to completion on a background thread.
- #[track_caller]
- pub fn spawn_with_priority<R>(
- &self,
- priority: Priority,
- future: impl Future<Output = R> + Send + 'static,
- ) -> Task<R>
- where
- R: Send + 'static,
- {
- self.spawn_internal::<R>(Box::pin(future), None, priority)
+ self.spawn_internal::<R>(Box::pin(future), None)
}
/// Enqueues the given future to be run to completion on a background thread.
@@ -224,7 +165,7 @@ impl BackgroundExecutor {
where
R: Send + 'static,
{
- self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
+ self.spawn_internal::<R>(Box::pin(future), Some(label))
}
#[track_caller]
@@ -232,55 +173,15 @@ impl BackgroundExecutor {
&self,
future: AnyFuture<R>,
label: Option<TaskLabel>,
- priority: Priority,
) -> Task<R> {
let dispatcher = self.dispatcher.clone();
- let (runnable, task) = if let Priority::Realtime(realtime) = priority {
- let location = core::panic::Location::caller();
- let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
-
- dispatcher.spawn_realtime(
- realtime,
- Box::new(move || {
- while let Ok(runnable) = rx.recv() {
- let start = Instant::now();
- let location = runnable.metadata().location;
- let mut timing = TaskTiming {
- location,
- start,
- end: None,
- };
- profiler::add_task_timing(timing);
-
- runnable.run();
-
- let end = Instant::now();
- timing.end = Some(end);
- profiler::add_task_timing(timing);
- }
- }),
+ let location = core::panic::Location::caller();
+ let (runnable, task) = async_task::Builder::new()
+ .metadata(RunnableMeta { location })
+ .spawn(
+ move |_| future,
+ move |runnable| dispatcher.dispatch(RunnableVariant::Meta(runnable), label),
);
-
- async_task::Builder::new()
- .metadata(RunnableMeta { location })
- .spawn(
- move |_| future,
- move |runnable| {
- let _ = tx.send(runnable);
- },
- )
- } else {
- let location = core::panic::Location::caller();
- async_task::Builder::new()
- .metadata(RunnableMeta { location })
- .spawn(
- move |_| future,
- move |runnable| {
- dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
- },
- )
- };
-
runnable.schedule();
Task(TaskState::Spawned(task))
}
@@ -453,28 +354,11 @@ impl BackgroundExecutor {
where
F: FnOnce(&mut Scope<'scope>),
{
- let mut scope = Scope::new(self.clone(), Priority::default());
- (scheduler)(&mut scope);
- let spawned = mem::take(&mut scope.futures)
- .into_iter()
- .map(|f| self.spawn_with_priority(scope.priority, f))
- .collect::<Vec<_>>();
- for task in spawned {
- task.await;
- }
- }
-
- /// Scoped lets you start a number of tasks and waits
- /// for all of them to complete before returning.
- pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
- where
- F: FnOnce(&mut Scope<'scope>),
- {
- let mut scope = Scope::new(self.clone(), priority);
+ let mut scope = Scope::new(self.clone());
(scheduler)(&mut scope);
let spawned = mem::take(&mut scope.futures)
.into_iter()
- .map(|f| self.spawn_with_priority(scope.priority, f))
+ .map(|f| self.spawn(f))
.collect::<Vec<_>>();
for task in spawned {
task.await;
@@ -610,19 +494,6 @@ impl ForegroundExecutor {
/// Enqueues the given Task to run on the main thread at some point in the future.
#[track_caller]
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
- where
- R: 'static,
- {
- self.spawn_with_priority(Priority::default(), future)
- }
-
- /// Enqueues the given Task to run on the main thread at some point in the future.
- #[track_caller]
- pub fn spawn_with_priority<R>(
- &self,
- priority: Priority,
- future: impl Future<Output = R> + 'static,
- ) -> Task<R>
where
R: 'static,
{
@@ -634,19 +505,16 @@ impl ForegroundExecutor {
dispatcher: Arc<dyn PlatformDispatcher>,
future: AnyLocalFuture<R>,
location: &'static core::panic::Location<'static>,
- priority: Priority,
) -> Task<R> {
let (runnable, task) = spawn_local_with_source_location(
future,
- move |runnable| {
- dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
- },
+ move |runnable| dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable)),
RunnableMeta { location },
);
runnable.schedule();
Task(TaskState::Spawned(task))
}
- inner::<R>(dispatcher, Box::pin(future), location, priority)
+ inner::<R>(dispatcher, Box::pin(future), location)
}
}
@@ -722,7 +590,6 @@ where
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
executor: BackgroundExecutor,
- priority: Priority,
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
tx: Option<mpsc::Sender<()>>,
rx: mpsc::Receiver<()>,
@@ -730,11 +597,10 @@ pub struct Scope<'a> {
}
impl<'a> Scope<'a> {
- fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
+ fn new(executor: BackgroundExecutor) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
executor,
- priority,
tx: Some(tx),
rx,
futures: Default::default(),
@@ -31,8 +31,6 @@ mod path_builder;
mod platform;
pub mod prelude;
mod profiler;
-#[cfg(any(target_os = "windows", target_os = "linux"))]
-mod queue;
mod scene;
mod shared_string;
mod shared_uri;
@@ -91,20 +89,16 @@ pub use keymap::*;
pub use path_builder::*;
pub use platform::*;
pub use profiler::*;
-#[cfg(any(target_os = "windows", target_os = "linux"))]
-pub(crate) use queue::{PriorityQueueReceiver, PriorityQueueSender};
pub use refineable::*;
pub use scene::*;
pub use shared_string::*;
pub use shared_uri::*;
pub use smol::Timer;
-use std::{any::Any, future::Future};
pub use style::*;
pub use styled::*;
pub use subscription::*;
pub use svg_renderer::*;
pub(crate) use tab_stop::*;
-use taffy::TaffyLayoutEngine;
pub use taffy::{AvailableSpace, LayoutId};
#[cfg(any(test, feature = "test-support"))]
pub use test::*;
@@ -115,6 +109,9 @@ pub use util::{FutureExt, Timeout, arc_cow::ArcCow};
pub use view::*;
pub use window::*;
+use std::{any::Any, future::Future};
+use taffy::TaffyLayoutEngine;
+
/// The context trait, allows the different contexts in GPUI to be used
/// interchangeably for certain operations.
pub trait AppContext {
@@ -39,10 +39,9 @@ use crate::{
Action, AnyWindowHandle, App, AsyncWindowContext, BackgroundExecutor, Bounds,
DEFAULT_WINDOW_SIZE, DevicePixels, DispatchEventResult, Font, FontId, FontMetrics, FontRun,
ForegroundExecutor, GlyphId, GpuSpecs, ImageSource, Keymap, LineLayout, Pixels, PlatformInput,
- Point, Priority, RealtimePriority, RenderGlyphParams, RenderImage, RenderImageParams,
- RenderSvgParams, Scene, ShapedGlyph, ShapedRun, SharedString, Size, SvgRenderer,
- SystemWindowTab, Task, TaskLabel, TaskTiming, ThreadTaskTimings, Window, WindowControlArea,
- hash, point, px, size,
+ Point, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Scene, ShapedGlyph,
+ ShapedRun, SharedString, Size, SvgRenderer, SystemWindowTab, Task, TaskLabel, TaskTiming,
+ ThreadTaskTimings, Window, WindowControlArea, hash, point, px, size,
};
use anyhow::Result;
use async_task::Runnable;
@@ -588,10 +587,9 @@ pub trait PlatformDispatcher: Send + Sync {
fn get_all_timings(&self) -> Vec<ThreadTaskTimings>;
fn get_current_thread_timings(&self) -> Vec<TaskTiming>;
fn is_main_thread(&self) -> bool;
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, priority: Priority);
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority);
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>);
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant);
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant);
- fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>);
fn now(&self) -> Instant {
Instant::now()
@@ -1,10 +1,9 @@
use crate::{
- GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
- PriorityQueueSender, RealtimePriority, RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming,
- ThreadTaskTimings, profiler,
+ GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
+ TaskTiming, ThreadTaskTimings,
};
use calloop::{
- EventLoop, PostAction,
+ EventLoop,
channel::{self, Sender},
timer::TimeoutAction,
};
@@ -20,9 +19,9 @@ struct TimerAfter {
}
pub(crate) struct LinuxDispatcher {
- main_sender: PriorityQueueCalloopSender<RunnableVariant>,
+ main_sender: Sender<RunnableVariant>,
timer_sender: Sender<TimerAfter>,
- background_sender: PriorityQueueSender<RunnableVariant>,
+ background_sender: flume::Sender<RunnableVariant>,
_background_threads: Vec<thread::JoinHandle<()>>,
main_thread_id: thread::ThreadId,
}
@@ -30,20 +29,18 @@ pub(crate) struct LinuxDispatcher {
const MIN_THREADS: usize = 2;
impl LinuxDispatcher {
- pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
- let (background_sender, background_receiver) = PriorityQueueReceiver::new();
+ pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
+ let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
let thread_count =
std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
- // These thread should really be lower prio then the foreground
- // executor
let mut background_threads = (0..thread_count)
.map(|i| {
- let mut receiver = background_receiver.clone();
+ let receiver = background_receiver.clone();
std::thread::Builder::new()
.name(format!("Worker-{i}"))
.spawn(move || {
- for runnable in receiver.iter() {
+ for runnable in receiver {
let start = Instant::now();
let mut location = match runnable {
@@ -54,7 +51,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
timing
@@ -66,7 +63,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
timing
@@ -75,7 +72,7 @@ impl LinuxDispatcher {
let end = Instant::now();
location.end = Some(end);
- profiler::add_task_timing(location);
+ Self::add_task_timing(location);
log::trace!(
"background thread {}: ran runnable. took: {:?}",
@@ -116,7 +113,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
timing
@@ -127,7 +124,7 @@ impl LinuxDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
timing
@@ -136,7 +133,7 @@ impl LinuxDispatcher {
let end = Instant::now();
timing.end = Some(end);
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
}
TimeoutAction::Drop
},
@@ -160,6 +157,22 @@ impl LinuxDispatcher {
main_thread_id: thread::current().id(),
}
}
+
+ pub(crate) fn add_task_timing(timing: TaskTiming) {
+ THREAD_TIMINGS.with(|timings| {
+ let mut timings = timings.lock();
+ let timings = &mut timings.timings;
+
+ if let Some(last_timing) = timings.iter_mut().rev().next() {
+ if last_timing.location == timing.location {
+ last_timing.end = timing.end;
+ return;
+ }
+ }
+
+ timings.push_back(timing);
+ });
+ }
}
impl PlatformDispatcher for LinuxDispatcher {
@@ -186,26 +199,22 @@ impl PlatformDispatcher for LinuxDispatcher {
thread::current().id() == self.main_thread_id
}
- fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
- self.background_sender
- .send(priority, runnable)
- .unwrap_or_else(|_| panic!("blocking sender returned without value"));
+ fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
+ self.background_sender.send(runnable).unwrap();
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
- self.main_sender
- .send(priority, runnable)
- .unwrap_or_else(|runnable| {
- // NOTE: Runnable may wrap a Future that is !Send.
- //
- // This is usually safe because we only poll it on the main thread.
- // However if the send fails, we know that:
- // 1. main_receiver has been dropped (which implies the app is shutting down)
- // 2. we are on a background thread.
- // It is not safe to drop something !Send on the wrong thread, and
- // the app will exit soon anyway, so we must forget the runnable.
- std::mem::forget(runnable);
- });
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
+ self.main_sender.send(runnable).unwrap_or_else(|runnable| {
+ // NOTE: Runnable may wrap a Future that is !Send.
+ //
+ // This is usually safe because we only poll it on the main thread.
+ // However if the send fails, we know that:
+ // 1. main_receiver has been dropped (which implies the app is shutting down)
+ // 2. we are on a background thread.
+ // It is not safe to drop something !Send on the wrong thread, and
+ // the app will exit soon anyway, so we must forget the runnable.
+ std::mem::forget(runnable);
+ });
}
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
@@ -213,252 +222,4 @@ impl PlatformDispatcher for LinuxDispatcher {
.send(TimerAfter { duration, runnable })
.ok();
}
-
- fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
- std::thread::spawn(move || {
- // SAFETY: always safe to call
- let thread_id = unsafe { libc::pthread_self() };
-
- let policy = match priority {
- RealtimePriority::Audio => libc::SCHED_FIFO,
- RealtimePriority::Other => libc::SCHED_RR,
- };
- let sched_priority = match priority {
- RealtimePriority::Audio => 65,
- RealtimePriority::Other => 45,
- };
-
- let sched_param = libc::sched_param { sched_priority };
- // SAFETY: sched_param is a valid initialized structure
- let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
- if result != 0 {
- log::warn!("failed to set realtime thread priority to {:?}", priority);
- }
-
- f();
- });
- }
-}
-
-pub struct PriorityQueueCalloopSender<T> {
- sender: PriorityQueueSender<T>,
- ping: calloop::ping::Ping,
-}
-
-impl<T> PriorityQueueCalloopSender<T> {
- fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
- Self { sender: tx, ping }
- }
-
- fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
- let res = self.sender.send(priority, item);
- if res.is_ok() {
- self.ping.ping();
- }
- res
- }
-}
-
-impl<T> Drop for PriorityQueueCalloopSender<T> {
- fn drop(&mut self) {
- self.ping.ping();
- }
-}
-
-pub struct PriorityQueueCalloopReceiver<T> {
- receiver: PriorityQueueReceiver<T>,
- source: calloop::ping::PingSource,
- ping: calloop::ping::Ping,
-}
-
-impl<T> PriorityQueueCalloopReceiver<T> {
- pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
- let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
-
- let (tx, rx) = PriorityQueueReceiver::new();
-
- (
- PriorityQueueCalloopSender::new(tx, ping.clone()),
- Self {
- receiver: rx,
- source,
- ping,
- },
- )
- }
}
-
-use calloop::channel::Event;
-
-#[derive(Debug)]
-pub struct ChannelError(calloop::ping::PingError);
-
-impl std::fmt::Display for ChannelError {
- #[cfg_attr(feature = "nightly_coverage", coverage(off))]
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- std::fmt::Display::fmt(&self.0, f)
- }
-}
-
-impl std::error::Error for ChannelError {
- #[cfg_attr(feature = "nightly_coverage", coverage(off))]
- fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
- Some(&self.0)
- }
-}
-
-impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
- type Event = Event<T>;
- type Metadata = ();
- type Ret = ();
- type Error = ChannelError;
-
- fn process_events<F>(
- &mut self,
- readiness: calloop::Readiness,
- token: calloop::Token,
- mut callback: F,
- ) -> Result<calloop::PostAction, Self::Error>
- where
- F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
- {
- let mut clear_readiness = false;
- let mut disconnected = false;
-
- let action = self
- .source
- .process_events(readiness, token, |(), &mut ()| {
- let mut is_empty = true;
-
- let mut receiver = self.receiver.clone();
- for runnable in receiver.try_iter() {
- match runnable {
- Ok(r) => {
- callback(Event::Msg(r), &mut ());
- is_empty = false;
- }
- Err(_) => {
- disconnected = true;
- }
- }
- }
-
- if disconnected {
- callback(Event::Closed, &mut ());
- }
-
- if is_empty {
- clear_readiness = true;
- }
- })
- .map_err(ChannelError)?;
-
- if disconnected {
- Ok(PostAction::Remove)
- } else if clear_readiness {
- Ok(action)
- } else {
- // Re-notify the ping source so we can try again.
- self.ping.ping();
- Ok(PostAction::Continue)
- }
- }
-
- fn register(
- &mut self,
- poll: &mut calloop::Poll,
- token_factory: &mut calloop::TokenFactory,
- ) -> calloop::Result<()> {
- self.source.register(poll, token_factory)
- }
-
- fn reregister(
- &mut self,
- poll: &mut calloop::Poll,
- token_factory: &mut calloop::TokenFactory,
- ) -> calloop::Result<()> {
- self.source.reregister(poll, token_factory)
- }
-
- fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
- self.source.unregister(poll)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn calloop_works() {
- let mut event_loop = calloop::EventLoop::try_new().unwrap();
- let handle = event_loop.handle();
-
- let (tx, rx) = PriorityQueueCalloopReceiver::new();
-
- struct Data {
- got_msg: bool,
- got_closed: bool,
- }
-
- let mut data = Data {
- got_msg: false,
- got_closed: false,
- };
-
- let _channel_token = handle
- .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
- Event::Msg(()) => {
- data.got_msg = true;
- }
-
- Event::Closed => {
- data.got_closed = true;
- }
- })
- .unwrap();
-
- // nothing is sent, nothing is received
- event_loop
- .dispatch(Some(::std::time::Duration::ZERO), &mut data)
- .unwrap();
-
- assert!(!data.got_msg);
- assert!(!data.got_closed);
- // a message is send
-
- tx.send(Priority::Medium, ()).unwrap();
- event_loop
- .dispatch(Some(::std::time::Duration::ZERO), &mut data)
- .unwrap();
-
- assert!(data.got_msg);
- assert!(!data.got_closed);
-
- // the sender is dropped
- drop(tx);
- event_loop
- .dispatch(Some(::std::time::Duration::ZERO), &mut data)
- .unwrap();
-
- assert!(data.got_msg);
- assert!(data.got_closed);
- }
-}
-
-// running 1 test
-// test platform::linux::dispatcher::tests::tomato ... FAILED
-
-// failures:
-
-// ---- platform::linux::dispatcher::tests::tomato stdout ----
-// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
-// returning 1 tasks to process
-// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
-// (),
-// )
-// returning 0 tasks to process
-
-// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
-// assertion failed: data.got_closed
-// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
@@ -14,7 +14,7 @@ use std::{
};
use anyhow::{Context as _, anyhow};
-use calloop::LoopSignal;
+use calloop::{LoopSignal, channel::Channel};
use futures::channel::oneshot;
use util::ResultExt as _;
use util::command::{new_smol_command, new_std_command};
@@ -25,8 +25,8 @@ use crate::{
Action, AnyWindowHandle, BackgroundExecutor, ClipboardItem, CursorStyle, DisplayId,
ForegroundExecutor, Keymap, LinuxDispatcher, Menu, MenuItem, OwnedMenu, PathPromptOptions,
Pixels, Platform, PlatformDisplay, PlatformKeyboardLayout, PlatformKeyboardMapper,
- PlatformTextSystem, PlatformWindow, Point, PriorityQueueCalloopReceiver, Result,
- RunnableVariant, Task, WindowAppearance, WindowParams, px,
+ PlatformTextSystem, PlatformWindow, Point, Result, RunnableVariant, Task, WindowAppearance,
+ WindowParams, px,
};
#[cfg(any(feature = "wayland", feature = "x11"))]
@@ -149,8 +149,8 @@ pub(crate) struct LinuxCommon {
}
impl LinuxCommon {
- pub fn new(signal: LoopSignal) -> (Self, PriorityQueueCalloopReceiver<RunnableVariant>) {
- let (main_sender, main_receiver) = PriorityQueueCalloopReceiver::new();
+ pub fn new(signal: LoopSignal) -> (Self, Channel<RunnableVariant>) {
+ let (main_sender, main_receiver) = calloop::channel::channel::<RunnableVariant>();
#[cfg(any(feature = "wayland", feature = "x11"))]
let text_system = Arc::new(crate::CosmicTextSystem::new());
@@ -77,10 +77,10 @@ use crate::{
LinuxKeyboardLayout, Modifiers, ModifiersChangedEvent, MouseButton, MouseDownEvent,
MouseExitEvent, MouseMoveEvent, MouseUpEvent, NavigationDirection, Pixels, PlatformDisplay,
PlatformInput, PlatformKeyboardLayout, Point, ResultExt as _, SCROLL_LINES, ScrollDelta,
- ScrollWheelEvent, Size, TouchPhase, WindowParams, point, profiler, px, size,
+ ScrollWheelEvent, Size, TouchPhase, WindowParams, point, px, size,
};
use crate::{
- RunnableVariant, TaskTiming,
+ LinuxDispatcher, RunnableVariant, TaskTiming,
platform::{PlatformWindow, blade::BladeContext},
};
use crate::{
@@ -503,7 +503,7 @@ impl WaylandClient {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
runnable.run();
timing
@@ -515,7 +515,7 @@ impl WaylandClient {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
runnable.run();
timing
@@ -524,7 +524,7 @@ impl WaylandClient {
let end = Instant::now();
timing.end = Some(end);
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
});
}
}
@@ -1,4 +1,4 @@
-use crate::{Capslock, ResultExt as _, RunnableVariant, TaskTiming, profiler, xcb_flush};
+use crate::{Capslock, LinuxDispatcher, ResultExt as _, RunnableVariant, TaskTiming, xcb_flush};
use anyhow::{Context as _, anyhow};
use ashpd::WindowIdentifier;
use calloop::{
@@ -322,7 +322,7 @@ impl X11Client {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
runnable.run();
timing
@@ -334,7 +334,7 @@ impl X11Client {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
runnable.run();
timing
@@ -343,7 +343,7 @@ impl X11Client {
let end = Instant::now();
timing.end = Some(end);
- profiler::add_task_timing(timing);
+ LinuxDispatcher::add_task_timing(timing);
});
}
}
@@ -3,22 +3,11 @@
#![allow(non_snake_case)]
use crate::{
- GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RealtimePriority, RunnableMeta,
- RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming, ThreadTaskTimings,
+ GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableMeta, RunnableVariant, THREAD_TIMINGS,
+ TaskLabel, TaskTiming, ThreadTaskTimings,
};
-use anyhow::Context;
use async_task::Runnable;
-use mach2::{
- kern_return::KERN_SUCCESS,
- mach_time::mach_timebase_info_data_t,
- thread_policy::{
- THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
- THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
- THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
- thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
- },
-};
use objc::{
class, msg_send,
runtime::{BOOL, YES},
@@ -26,11 +15,9 @@ use objc::{
};
use std::{
ffi::c_void,
- mem::MaybeUninit,
ptr::{NonNull, addr_of},
time::{Duration, Instant},
};
-use util::ResultExt;
/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
/// these pub items from leaking into public API.
@@ -69,7 +56,7 @@ impl PlatformDispatcher for MacDispatcher {
is_main_thread == YES
}
- fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
+ fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
let (context, trampoline) = match runnable {
RunnableVariant::Meta(runnable) => (
runnable.into_raw().as_ptr() as *mut c_void,
@@ -80,24 +67,16 @@ impl PlatformDispatcher for MacDispatcher {
Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)),
),
};
-
- let queue_priority = match priority {
- Priority::Realtime(_) => unreachable!(),
- Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH,
- Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT,
- Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as u32,
- };
-
unsafe {
dispatch_async_f(
- dispatch_get_global_queue(queue_priority.try_into().unwrap(), 0),
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
context,
trampoline,
);
}
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
let (context, trampoline) = match runnable {
RunnableVariant::Meta(runnable) => (
runnable.into_raw().as_ptr() as *mut c_void,
@@ -131,120 +110,6 @@ impl PlatformDispatcher for MacDispatcher {
dispatch_after_f(when, queue, context, trampoline);
}
}
-
- fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
- std::thread::spawn(move || {
- match priority {
- RealtimePriority::Audio => set_audio_thread_priority(),
- RealtimePriority::Other => set_high_thread_priority(),
- }
- .context(format!("for priority {:?}", priority))
- .log_err();
-
- f();
- });
- }
-}
-
-fn set_high_thread_priority() -> anyhow::Result<()> {
- // SAFETY: always safe to call
- let thread_id = unsafe { libc::pthread_self() };
-
- // SAFETY: all sched_param members are valid when initialized to zero.
- let mut sched_param = unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
- sched_param.sched_priority = 45;
-
- let result = unsafe { libc::pthread_setschedparam(thread_id, libc::SCHED_FIFO, &sched_param) };
- if result != 0 {
- anyhow::bail!("failed to set realtime thread priority")
- }
-
- Ok(())
-}
-
-fn set_audio_thread_priority() -> anyhow::Result<()> {
- // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
-
- // SAFETY: always safe to call
- let thread_id = unsafe { libc::pthread_self() };
-
- // SAFETY: thread_id is a valid thread id
- let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
-
- // Fixed priority thread
- let mut policy = thread_extended_policy_data_t { timeshare: 0 };
-
- // SAFETY: thread_id is a valid thread id
- // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
- let result = unsafe {
- mach2::thread_policy::thread_policy_set(
- thread_id,
- THREAD_EXTENDED_POLICY,
- &mut policy as *mut _ as *mut _,
- THREAD_EXTENDED_POLICY_COUNT,
- )
- };
-
- if result != KERN_SUCCESS {
- anyhow::bail!("failed to set thread extended policy");
- }
-
- // relatively high priority
- let mut precedence = thread_precedence_policy_data_t { importance: 63 };
-
- // SAFETY: thread_id is a valid thread id
- // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
- let result = unsafe {
- mach2::thread_policy::thread_policy_set(
- thread_id,
- THREAD_PRECEDENCE_POLICY,
- &mut precedence as *mut _ as *mut _,
- THREAD_PRECEDENCE_POLICY_COUNT,
- )
- };
-
- if result != KERN_SUCCESS {
- anyhow::bail!("failed to set thread precedence policy");
- }
-
- const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
- const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
-
- // ~128 frames @ 44.1KHz
- const TIME_QUANTUM: f32 = 2.9;
-
- const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
- const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
-
- let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
- // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
- unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
-
- let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
-
- let mut time_constraints = thread_time_constraint_policy_data_t {
- period: (TIME_QUANTUM * ms_to_abs_time) as u32,
- computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
- constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
- preemptible: 0,
- };
-
- // SAFETY: thread_id is a valid thread id
- // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
- let result = unsafe {
- mach2::thread_policy::thread_policy_set(
- thread_id,
- THREAD_TIME_CONSTRAINT_POLICY,
- &mut time_constraints as *mut _ as *mut _,
- THREAD_TIME_CONSTRAINT_POLICY_COUNT,
- )
- };
-
- if result != KERN_SUCCESS {
- anyhow::bail!("failed to set thread time constraint policy");
- }
-
- Ok(())
}
extern "C" fn trampoline(runnable: *mut c_void) {
@@ -1,4 +1,4 @@
-use crate::{PlatformDispatcher, Priority, RunnableVariant, TaskLabel};
+use crate::{PlatformDispatcher, RunnableVariant, TaskLabel};
use backtrace::Backtrace;
use collections::{HashMap, HashSet, VecDeque};
use parking::Unparker;
@@ -284,7 +284,7 @@ impl PlatformDispatcher for TestDispatcher {
state.start_time + state.time
}
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, _priority: Priority) {
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>) {
{
let mut state = self.state.lock();
if label.is_some_and(|label| state.deprioritized_task_labels.contains(&label)) {
@@ -296,7 +296,7 @@ impl PlatformDispatcher for TestDispatcher {
self.unpark_all();
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
self.state
.lock()
.foreground
@@ -318,10 +318,4 @@ impl PlatformDispatcher for TestDispatcher {
fn as_test(&self) -> Option<&TestDispatcher> {
Some(self)
}
-
- fn spawn_realtime(&self, _priority: crate::RealtimePriority, f: Box<dyn FnOnce() + Send>) {
- std::thread::spawn(move || {
- f();
- });
- }
}
@@ -4,31 +4,24 @@ use std::{
time::{Duration, Instant},
};
-use anyhow::Context;
+use flume::Sender;
use util::ResultExt;
use windows::{
- System::Threading::{
- ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
- },
+ System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler},
Win32::{
Foundation::{LPARAM, WPARAM},
- System::Threading::{
- GetCurrentThread, HIGH_PRIORITY_CLASS, SetPriorityClass, SetThreadPriority,
- THREAD_PRIORITY_HIGHEST, THREAD_PRIORITY_TIME_CRITICAL,
- },
UI::WindowsAndMessaging::PostMessageW,
},
};
use crate::{
- GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, Priority, PriorityQueueSender,
- RealtimePriority, RunnableVariant, SafeHwnd, THREAD_TIMINGS, TaskLabel, TaskTiming,
- ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, profiler,
+ GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS,
+ TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
};
pub(crate) struct WindowsDispatcher {
pub(crate) wake_posted: AtomicBool,
- main_sender: PriorityQueueSender<RunnableVariant>,
+ main_sender: Sender<RunnableVariant>,
main_thread_id: ThreadId,
pub(crate) platform_window_handle: SafeHwnd,
validation_number: usize,
@@ -36,7 +29,7 @@ pub(crate) struct WindowsDispatcher {
impl WindowsDispatcher {
pub(crate) fn new(
- main_sender: PriorityQueueSender<RunnableVariant>,
+ main_sender: Sender<RunnableVariant>,
platform_window_handle: HWND,
validation_number: usize,
) -> Self {
@@ -52,7 +45,7 @@ impl WindowsDispatcher {
}
}
- fn dispatch_on_threadpool(&self, priority: WorkItemPriority, runnable: RunnableVariant) {
+ fn dispatch_on_threadpool(&self, runnable: RunnableVariant) {
let handler = {
let mut task_wrapper = Some(runnable);
WorkItemHandler::new(move |_| {
@@ -60,8 +53,7 @@ impl WindowsDispatcher {
Ok(())
})
};
-
- ThreadPool::RunWithPriorityAsync(&handler, priority).log_err();
+ ThreadPool::RunAsync(&handler).log_err();
}
fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) {
@@ -87,7 +79,7 @@ impl WindowsDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
@@ -99,7 +91,7 @@ impl WindowsDispatcher {
start,
end: None,
};
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
runnable.run();
@@ -110,7 +102,23 @@ impl WindowsDispatcher {
let end = Instant::now();
timing.end = Some(end);
- profiler::add_task_timing(timing);
+ Self::add_task_timing(timing);
+ }
+
+ pub(crate) fn add_task_timing(timing: TaskTiming) {
+ THREAD_TIMINGS.with(|timings| {
+ let mut timings = timings.lock();
+ let timings = &mut timings.timings;
+
+ if let Some(last_timing) = timings.iter_mut().rev().next() {
+ if last_timing.location == timing.location {
+ last_timing.end = timing.end;
+ return;
+ }
+ }
+
+ timings.push_back(timing);
+ });
}
}
@@ -138,22 +146,15 @@ impl PlatformDispatcher for WindowsDispatcher {
current().id() == self.main_thread_id
}
- fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>, priority: Priority) {
- let priority = match priority {
- Priority::Realtime(_) => unreachable!(),
- Priority::High => WorkItemPriority::High,
- Priority::Medium => WorkItemPriority::Normal,
- Priority::Low => WorkItemPriority::Low,
- };
- self.dispatch_on_threadpool(priority, runnable);
-
+ fn dispatch(&self, runnable: RunnableVariant, label: Option<TaskLabel>) {
+ self.dispatch_on_threadpool(runnable);
if let Some(label) = label {
log::debug!("TaskLabel: {label:?}");
}
}
- fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
- match self.main_sender.send(priority, runnable) {
+ fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
+ match self.main_sender.send(runnable) {
Ok(_) => {
if !self.wake_posted.swap(true, Ordering::AcqRel) {
unsafe {
@@ -184,28 +185,4 @@ impl PlatformDispatcher for WindowsDispatcher {
fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
self.dispatch_on_threadpool_after(runnable, duration);
}
-
- fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
- std::thread::spawn(move || {
- // SAFETY: always safe to call
- let thread_handle = unsafe { GetCurrentThread() };
-
- let thread_priority = match priority {
- RealtimePriority::Audio => THREAD_PRIORITY_TIME_CRITICAL,
- RealtimePriority::Other => THREAD_PRIORITY_HIGHEST,
- };
-
- // SAFETY: thread_handle is a valid handle to a thread
- unsafe { SetPriorityClass(thread_handle, HIGH_PRIORITY_CLASS) }
- .context("thread priority class")
- .log_err();
-
- // SAFETY: thread_handle is a valid handle to a thread
- unsafe { SetThreadPriority(thread_handle, thread_priority) }
- .context("thread priority")
- .log_err();
-
- f();
- });
- }
}
@@ -243,8 +243,7 @@ impl WindowsWindowInner {
fn handle_timer_msg(&self, handle: HWND, wparam: WPARAM) -> Option<isize> {
if wparam.0 == SIZE_MOVE_LOOP_TIMER_ID {
- let mut runnables = self.main_receiver.clone().try_iter();
- while let Some(Ok(runnable)) = runnables.next() {
+ for runnable in self.main_receiver.drain() {
WindowsDispatcher::execute_runnable(runnable);
}
self.handle_paint_msg(handle)
@@ -51,7 +51,7 @@ struct WindowsPlatformInner {
raw_window_handles: std::sync::Weak<RwLock<SmallVec<[SafeHwnd; 4]>>>,
// The below members will never change throughout the entire lifecycle of the app.
validation_number: usize,
- main_receiver: PriorityQueueReceiver<RunnableVariant>,
+ main_receiver: flume::Receiver<RunnableVariant>,
dispatcher: Arc<WindowsDispatcher>,
}
@@ -98,7 +98,7 @@ impl WindowsPlatform {
OleInitialize(None).context("unable to initialize Windows OLE")?;
}
let directx_devices = DirectXDevices::new().context("Creating DirectX devices")?;
- let (main_sender, main_receiver) = PriorityQueueReceiver::new();
+ let (main_sender, main_receiver) = flume::unbounded::<RunnableVariant>();
let validation_number = if usize::BITS == 64 {
rand::random::<u64>() as usize
} else {
@@ -857,24 +857,22 @@ impl WindowsPlatformInner {
}
break 'tasks;
}
- let mut main_receiver = self.main_receiver.clone();
- match main_receiver.try_pop() {
- Ok(Some(runnable)) => WindowsDispatcher::execute_runnable(runnable),
- _ => break 'timeout_loop,
+ match self.main_receiver.try_recv() {
+ Err(_) => break 'timeout_loop,
+ Ok(runnable) => WindowsDispatcher::execute_runnable(runnable),
}
}
// Someone could enqueue a Runnable here. The flag is still true, so they will not PostMessage.
// We need to check for those Runnables after we clear the flag.
self.dispatcher.wake_posted.store(false, Ordering::Release);
- let mut main_receiver = self.main_receiver.clone();
- match main_receiver.try_pop() {
- Ok(Some(runnable)) => {
+ match self.main_receiver.try_recv() {
+ Err(_) => break 'tasks,
+ Ok(runnable) => {
self.dispatcher.wake_posted.store(true, Ordering::Release);
WindowsDispatcher::execute_runnable(runnable);
}
- _ => break 'tasks,
}
}
@@ -936,7 +934,7 @@ pub(crate) struct WindowCreationInfo {
pub(crate) windows_version: WindowsVersion,
pub(crate) drop_target_helper: IDropTargetHelper,
pub(crate) validation_number: usize,
- pub(crate) main_receiver: PriorityQueueReceiver<RunnableVariant>,
+ pub(crate) main_receiver: flume::Receiver<RunnableVariant>,
pub(crate) platform_window_handle: HWND,
pub(crate) disable_direct_composition: bool,
pub(crate) directx_devices: DirectXDevices,
@@ -949,8 +947,8 @@ struct PlatformWindowCreateContext {
inner: Option<Result<Rc<WindowsPlatformInner>>>,
raw_window_handles: std::sync::Weak<RwLock<SmallVec<[SafeHwnd; 4]>>>,
validation_number: usize,
- main_sender: Option<PriorityQueueSender<RunnableVariant>>,
- main_receiver: Option<PriorityQueueReceiver<RunnableVariant>>,
+ main_sender: Option<flume::Sender<RunnableVariant>>,
+ main_receiver: Option<flume::Receiver<RunnableVariant>>,
directx_devices: Option<DirectXDevices>,
dispatcher: Option<Arc<WindowsDispatcher>>,
}
@@ -81,7 +81,7 @@ pub(crate) struct WindowsWindowInner {
pub(crate) executor: ForegroundExecutor,
pub(crate) windows_version: WindowsVersion,
pub(crate) validation_number: usize,
- pub(crate) main_receiver: PriorityQueueReceiver<RunnableVariant>,
+ pub(crate) main_receiver: flume::Receiver<RunnableVariant>,
pub(crate) platform_window_handle: HWND,
}
@@ -362,7 +362,7 @@ struct WindowCreateContext {
windows_version: WindowsVersion,
drop_target_helper: IDropTargetHelper,
validation_number: usize,
- main_receiver: PriorityQueueReceiver<RunnableVariant>,
+ main_receiver: flume::Receiver<RunnableVariant>,
platform_window_handle: HWND,
appearance: WindowAppearance,
disable_direct_composition: bool,
@@ -216,19 +216,3 @@ impl Drop for ThreadTimings {
thread_timings.swap_remove(index);
}
}
-
-pub(crate) fn add_task_timing(timing: TaskTiming) {
- THREAD_TIMINGS.with(|timings| {
- let mut timings = timings.lock();
- let timings = &mut timings.timings;
-
- if let Some(last_timing) = timings.iter_mut().rev().next() {
- if last_timing.location == timing.location {
- last_timing.end = timing.end;
- return;
- }
- }
-
- timings.push_back(timing);
- });
-}
@@ -1,329 +0,0 @@
-use std::{
- fmt,
- iter::FusedIterator,
- sync::{Arc, atomic::AtomicUsize},
-};
-
-use rand::{Rng, SeedableRng, rngs::SmallRng};
-
-use crate::Priority;
-
-struct PriorityQueues<T> {
- high_priority: Vec<T>,
- medium_priority: Vec<T>,
- low_priority: Vec<T>,
-}
-
-impl<T> PriorityQueues<T> {
- fn is_empty(&self) -> bool {
- self.high_priority.is_empty()
- && self.medium_priority.is_empty()
- && self.low_priority.is_empty()
- }
-}
-
-struct PriorityQueueState<T> {
- queues: parking_lot::Mutex<PriorityQueues<T>>,
- condvar: parking_lot::Condvar,
- receiver_count: AtomicUsize,
- sender_count: AtomicUsize,
-}
-
-impl<T> PriorityQueueState<T> {
- fn send(&self, priority: Priority, item: T) -> Result<(), SendError<T>> {
- if self
- .receiver_count
- .load(std::sync::atomic::Ordering::Relaxed)
- == 0
- {
- return Err(SendError(item));
- }
-
- let mut queues = self.queues.lock();
- match priority {
- Priority::Realtime(_) => unreachable!(),
- Priority::High => queues.high_priority.push(item),
- Priority::Medium => queues.medium_priority.push(item),
- Priority::Low => queues.low_priority.push(item),
- };
- self.condvar.notify_one();
- Ok(())
- }
-
- fn recv<'a>(&'a self) -> Result<parking_lot::MutexGuard<'a, PriorityQueues<T>>, RecvError> {
- let mut queues = self.queues.lock();
-
- let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed);
- if queues.is_empty() && sender_count == 0 {
- return Err(crate::queue::RecvError);
- }
-
- // parking_lot doesn't do spurious wakeups so an if is fine
- if queues.is_empty() {
- self.condvar.wait(&mut queues);
- }
-
- Ok(queues)
- }
-
- fn try_recv<'a>(
- &'a self,
- ) -> Result<Option<parking_lot::MutexGuard<'a, PriorityQueues<T>>>, RecvError> {
- let mut queues = self.queues.lock();
-
- let sender_count = self.sender_count.load(std::sync::atomic::Ordering::Relaxed);
- if queues.is_empty() && sender_count == 0 {
- return Err(crate::queue::RecvError);
- }
-
- if queues.is_empty() {
- Ok(None)
- } else {
- Ok(Some(queues))
- }
- }
-}
-
-pub(crate) struct PriorityQueueSender<T> {
- state: Arc<PriorityQueueState<T>>,
-}
-
-impl<T> PriorityQueueSender<T> {
- fn new(state: Arc<PriorityQueueState<T>>) -> Self {
- Self { state }
- }
-
- pub(crate) fn send(&self, priority: Priority, item: T) -> Result<(), SendError<T>> {
- self.state.send(priority, item)?;
- Ok(())
- }
-}
-
-impl<T> Drop for PriorityQueueSender<T> {
- fn drop(&mut self) {
- self.state
- .sender_count
- .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
- }
-}
-
-pub(crate) struct PriorityQueueReceiver<T> {
- state: Arc<PriorityQueueState<T>>,
- rand: SmallRng,
- disconnected: bool,
-}
-
-impl<T> Clone for PriorityQueueReceiver<T> {
- fn clone(&self) -> Self {
- self.state
- .receiver_count
- .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
- Self {
- state: Arc::clone(&self.state),
- rand: SmallRng::seed_from_u64(0),
- disconnected: self.disconnected,
- }
- }
-}
-
-pub(crate) struct SendError<T>(T);
-
-impl<T: fmt::Debug> fmt::Debug for SendError<T> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_tuple("SendError").field(&self.0).finish()
- }
-}
-
-#[derive(Debug)]
-pub(crate) struct RecvError;
-
-#[allow(dead_code)]
-impl<T> PriorityQueueReceiver<T> {
- pub(crate) fn new() -> (PriorityQueueSender<T>, Self) {
- let state = PriorityQueueState {
- queues: parking_lot::Mutex::new(PriorityQueues {
- high_priority: Vec::new(),
- medium_priority: Vec::new(),
- low_priority: Vec::new(),
- }),
- condvar: parking_lot::Condvar::new(),
- receiver_count: AtomicUsize::new(1),
- sender_count: AtomicUsize::new(1),
- };
- let state = Arc::new(state);
-
- let sender = PriorityQueueSender::new(Arc::clone(&state));
-
- let receiver = PriorityQueueReceiver {
- state,
- rand: SmallRng::seed_from_u64(0),
- disconnected: false,
- };
-
- (sender, receiver)
- }
-
- /// Tries to pop one element from the priority queue without blocking.
- ///
- /// This will early return if there are no elements in the queue.
- ///
- /// This method is best suited if you only intend to pop one element, for better performance
- /// on large queues see [`Self::try_iter`]
- ///
- /// # Errors
- ///
- /// If the sender was dropped
- pub(crate) fn try_pop(&mut self) -> Result<Option<T>, RecvError> {
- self.pop_inner(false)
- }
-
- /// Pops an element from the priority queue blocking if necessary.
- ///
- /// This method is best suited if you only intend to pop one element, for better performance
- /// on large queues see [`Self::iter``]
- ///
- /// # Errors
- ///
- /// If the sender was dropped
- pub(crate) fn pop(&mut self) -> Result<T, RecvError> {
- self.pop_inner(true).map(|e| e.unwrap())
- }
-
- /// Returns an iterator over the elements of the queue
- /// this iterator will end when all elements have been consumed and will not wait for new ones.
- pub(crate) fn try_iter(self) -> TryIter<T> {
- TryIter {
- receiver: self,
- ended: false,
- }
- }
-
- /// Returns an iterator over the elements of the queue
- /// this iterator will wait for new elements if the queue is empty.
- pub(crate) fn iter(self) -> Iter<T> {
- Iter(self)
- }
-
- #[inline(always)]
- // algorithm is the loaded die from biased coin from
- // https://www.keithschwarz.com/darts-dice-coins/
- fn pop_inner(&mut self, block: bool) -> Result<Option<T>, RecvError> {
- use Priority as P;
-
- let mut queues = if !block {
- let Some(queues) = self.state.try_recv()? else {
- return Ok(None);
- };
- queues
- } else {
- self.state.recv()?
- };
-
- let high = P::High.probability() * !queues.high_priority.is_empty() as u32;
- let medium = P::Medium.probability() * !queues.medium_priority.is_empty() as u32;
- let low = P::Low.probability() * !queues.low_priority.is_empty() as u32;
- let mut mass = high + medium + low; //%
-
- if !queues.high_priority.is_empty() {
- let flip = self.rand.random_ratio(P::High.probability(), mass);
- if flip {
- return Ok(queues.high_priority.pop());
- }
- mass -= P::High.probability();
- }
-
- if !queues.medium_priority.is_empty() {
- let flip = self.rand.random_ratio(P::Medium.probability(), mass);
- if flip {
- return Ok(queues.medium_priority.pop());
- }
- mass -= P::Medium.probability();
- }
-
- if !queues.low_priority.is_empty() {
- let flip = self.rand.random_ratio(P::Low.probability(), mass);
- if flip {
- return Ok(queues.low_priority.pop());
- }
- }
-
- Ok(None)
- }
-}
-
-impl<T> Drop for PriorityQueueReceiver<T> {
- fn drop(&mut self) {
- self.state
- .receiver_count
- .fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
- }
-}
-
-/// If None is returned the sender disconnected
-pub(crate) struct Iter<T>(PriorityQueueReceiver<T>);
-impl<T> Iterator for Iter<T> {
- type Item = T;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.0.pop_inner(true).ok().flatten()
- }
-}
-impl<T> FusedIterator for Iter<T> {}
-
-/// If None is returned there are no more elements in the queue
-pub(crate) struct TryIter<T> {
- receiver: PriorityQueueReceiver<T>,
- ended: bool,
-}
-impl<T> Iterator for TryIter<T> {
- type Item = Result<T, RecvError>;
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.ended {
- return None;
- }
-
- let res = self.receiver.pop_inner(false);
- self.ended = res.is_err();
-
- res.transpose()
- }
-}
-impl<T> FusedIterator for TryIter<T> {}
-
-#[cfg(test)]
-mod tests {
- use collections::HashSet;
-
- use super::*;
-
- #[test]
- fn all_tasks_get_yielded() {
- let (tx, mut rx) = PriorityQueueReceiver::new();
- tx.send(Priority::Medium, 20).unwrap();
- tx.send(Priority::High, 30).unwrap();
- tx.send(Priority::Low, 10).unwrap();
- tx.send(Priority::Medium, 21).unwrap();
- tx.send(Priority::High, 31).unwrap();
-
- drop(tx);
-
- assert_eq!(
- rx.iter().collect::<HashSet<_>>(),
- [30, 31, 20, 21, 10].into_iter().collect::<HashSet<_>>()
- )
- }
-
- #[test]
- fn new_high_prio_task_get_scheduled_quickly() {
- let (tx, mut rx) = PriorityQueueReceiver::new();
- for _ in 0..100 {
- tx.send(Priority::Low, 1).unwrap();
- }
-
- assert_eq!(rx.pop().unwrap(), 1);
- tx.send(Priority::High, 3).unwrap();
- assert_eq!(rx.pop().unwrap(), 3);
- assert_eq!(rx.pop().unwrap(), 1);
- }
-}
@@ -9,15 +9,14 @@ use crate::{
KeyBinding, KeyContext, KeyDownEvent, KeyEvent, Keystroke, KeystrokeEvent, LayoutId,
LineLayoutIndex, Modifiers, ModifiersChangedEvent, MonochromeSprite, MouseButton, MouseEvent,
MouseMoveEvent, MouseUpEvent, Path, Pixels, PlatformAtlas, PlatformDisplay, PlatformInput,
- PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, Priority, PromptButton,
- PromptLevel, Quad, Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams,
- Replay, ResizeEdge, SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y,
- ScaledPixels, Scene, Shadow, SharedString, Size, StrikethroughStyle, Style, SubscriberSet,
- Subscription, SystemWindowTab, SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task,
- TextStyle, TextStyleRefinement, TransformationMatrix, Underline, UnderlineStyle,
- WindowAppearance, WindowBackgroundAppearance, WindowBounds, WindowControls, WindowDecorations,
- WindowOptions, WindowParams, WindowTextSystem, point, prelude::*, px, rems, size,
- transparent_black,
+ PlatformInputHandler, PlatformWindow, Point, PolychromeSprite, PromptButton, PromptLevel, Quad,
+ Render, RenderGlyphParams, RenderImage, RenderImageParams, RenderSvgParams, Replay, ResizeEdge,
+ SMOOTH_SVG_SCALE_FACTOR, SUBPIXEL_VARIANTS_X, SUBPIXEL_VARIANTS_Y, ScaledPixels, Scene, Shadow,
+ SharedString, Size, StrikethroughStyle, Style, SubscriberSet, Subscription, SystemWindowTab,
+ SystemWindowTabController, TabStopMap, TaffyLayoutEngine, Task, TextStyle, TextStyleRefinement,
+ TransformationMatrix, Underline, UnderlineStyle, WindowAppearance, WindowBackgroundAppearance,
+ WindowBounds, WindowControls, WindowDecorations, WindowOptions, WindowParams, WindowTextSystem,
+ point, prelude::*, px, rems, size, transparent_black,
};
use anyhow::{Context as _, Result, anyhow};
use collections::{FxHashMap, FxHashSet};
@@ -1726,27 +1725,6 @@ impl Window {
})
}
- /// Spawn the future returned by the given closure on the application thread
- /// pool, with the given priority. The closure is provided a handle to the
- /// current window and an `AsyncWindowContext` for use within your future.
- #[track_caller]
- pub fn spawn_with_priority<AsyncFn, R>(
- &self,
- priority: Priority,
- cx: &App,
- f: AsyncFn,
- ) -> Task<R>
- where
- R: 'static,
- AsyncFn: AsyncFnOnce(&mut AsyncWindowContext) -> R + 'static,
- {
- let handle = self.handle;
- cx.spawn_with_priority(priority, async move |app| {
- let mut async_window_cx = AsyncWindowContext::new_context(app.clone(), handle);
- f(&mut async_window_cx).await
- })
- }
-
fn bounds_changed(&mut self, cx: &mut App) {
self.scale_factor = self.platform_window.scale_factor();
self.viewport_size = self.platform_window.content_size();
@@ -12,7 +12,7 @@ mod session;
use std::{sync::Arc, time::Duration};
use async_dispatcher::{Dispatcher, Runnable, set_dispatcher};
-use gpui::{App, PlatformDispatcher, Priority, RunnableVariant};
+use gpui::{App, PlatformDispatcher, RunnableVariant};
use project::Fs;
pub use runtimelib::ExecutionState;
@@ -46,7 +46,7 @@ fn zed_dispatcher(cx: &mut App) -> impl Dispatcher {
impl Dispatcher for ZedDispatcher {
fn dispatch(&self, runnable: Runnable) {
self.dispatcher
- .dispatch(RunnableVariant::Compat(runnable), None, Priority::default());
+ .dispatch(RunnableVariant::Compat(runnable), None);
}
fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
@@ -22,8 +22,7 @@ use git::{
COMMIT_MESSAGE, DOT_GIT, FSMONITOR_DAEMON, GITIGNORE, INDEX_LOCK, LFS_DIR, status::GitSummary,
};
use gpui::{
- App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Priority,
- Task,
+ App, AppContext as _, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task,
};
use ignore::IgnoreStack;
use language::DiskState;
@@ -4124,7 +4123,7 @@ impl BackgroundScanner {
let progress_update_count = AtomicUsize::new(0);
self.executor
- .scoped_priority(Priority::Low, |scope| {
+ .scoped(|scope| {
for _ in 0..self.executor.num_cpus() {
scope.spawn(async {
let mut last_progress_update_count = 0;
@@ -52,8 +52,6 @@ extend-exclude = [
"crates/project_panel/benches/linux_repo_snapshot.txt",
# Some multibuffer test cases have word fragments that register as typos
"crates/multi_buffer/src/multi_buffer_tests.rs",
- # Macos apis
- "crates/gpui/src/platform/mac/dispatcher.rs",
]
[default]