diff --git a/Cargo.lock b/Cargo.lock index bcd278fd439ab0fa0fa9e3d2a5e676cbb200a189..0e4b6c0eeb711b70c669ebf715fad409f24d6000 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14365,6 +14365,7 @@ dependencies = [ "async-task", "backtrace", "chrono", + "flume", "futures 0.3.31", "parking_lot", "rand 0.9.2", diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index ca6be865d49ceb2f58927fc64167d297872498c8..9ff4a9e196d7fda25fa709dc628674099a38a758 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -150,13 +150,8 @@ impl BackgroundExecutor { /// Enqueues the given future to be run to completion on a background thread with the given priority. /// - /// `Priority::Realtime` is currently treated as `Priority::High`. - /// - /// This is intentionally *not* a "downgrade" feature: realtime execution is effectively - /// disabled until we have an in-tree use case and are confident about the semantics and - /// failure modes (especially around channel backpressure and the risk of blocking - /// latency-sensitive threads). It should be straightforward to add a true realtime - /// implementation back once those constraints are well-defined. + /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with + /// realtime scheduling priority, suitable for audio processing. #[track_caller] pub fn spawn_with_priority( &self, @@ -166,7 +161,11 @@ impl BackgroundExecutor { where R: Send + 'static, { - Task::from_scheduler(self.inner.spawn_with_priority(priority, future)) + if priority == Priority::RealtimeAudio { + Task::from_scheduler(self.inner.spawn_realtime(future)) + } else { + Task::from_scheduler(self.inner.spawn_with_priority(priority, future)) + } } /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it. diff --git a/crates/gpui/src/platform_scheduler.rs b/crates/gpui/src/platform_scheduler.rs index 7dce57d830834e8f529abf608a39af9836c54454..4306a730d2ef7b6f19ff071041cb5ca672de8a12 100644 --- a/crates/gpui/src/platform_scheduler.rs +++ b/crates/gpui/src/platform_scheduler.rs @@ -85,6 +85,10 @@ impl Scheduler for PlatformScheduler { self.dispatcher.dispatch(runnable, priority); } + fn spawn_realtime(&self, f: Box) { + self.dispatcher.spawn_realtime(f); + } + fn timer(&self, duration: Duration) -> Timer { use std::sync::{Arc, atomic::AtomicBool}; diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index bbab41dcdb04bad70f390aac3625dcf73e68baa6..c1c791c06736297a284dcc16396f5e5d040c7bad 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -19,6 +19,7 @@ test-support = [] async-task.workspace = true backtrace.workspace = true chrono.workspace = true +flume = "0.11" futures.workspace = true parking_lot.workspace = true rand.workspace = true diff --git a/crates/scheduler/src/executor.rs b/crates/scheduler/src/executor.rs index 17ab30fac63b0c4d19e2624618e77f4980801ccd..1bad3a9c3bbfde2a60e1425b7d336cef062ffb49 100644 --- a/crates/scheduler/src/executor.rs +++ b/crates/scheduler/src/executor.rs @@ -179,6 +179,38 @@ impl BackgroundExecutor { Task(TaskState::Spawned(task)) } + /// Spawns a future on a dedicated realtime thread for audio processing. + #[track_caller] + pub fn spawn_realtime(&self, future: F) -> Task + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let location = Location::caller(); + let closed = self.closed.clone(); + let (tx, rx) = flume::bounded::>(1); + + self.scheduler.spawn_realtime(Box::new(move || { + while let Ok(runnable) = rx.recv() { + if runnable.metadata().is_closed() { + continue; + } + runnable.run(); + } + })); + + let (runnable, task) = async_task::Builder::new() + .metadata(RunnableMeta { location, closed }) + .spawn( + move |_| future, + move |runnable| { + let _ = tx.send(runnable); + }, + ); + runnable.schedule(); + Task(TaskState::Spawned(task)) + } + pub fn timer(&self, duration: Duration) -> Timer { self.scheduler.timer(duration) } diff --git a/crates/scheduler/src/scheduler.rs b/crates/scheduler/src/scheduler.rs index 7b3207f8790a856654efc642698404ba8c65a535..349492aaa3d970eb44c8c80101415ce2ad7da3a9 100644 --- a/crates/scheduler/src/scheduler.rs +++ b/crates/scheduler/src/scheduler.rs @@ -105,6 +105,9 @@ pub trait Scheduler: Send + Sync { priority: Priority, ); + /// Spawn a closure on a dedicated realtime thread for audio processing. + fn spawn_realtime(&self, f: Box); + /// Schedule a background task with default (medium) priority. fn schedule_background(&self, runnable: Runnable) { self.schedule_background_with_priority(runnable, Priority::default()); diff --git a/crates/scheduler/src/test_scheduler.rs b/crates/scheduler/src/test_scheduler.rs index 54a06a0cca3b45157489c3cb509943d035df9621..54a098dceed7f64b53e941203f6ccf7433f474c9 100644 --- a/crates/scheduler/src/test_scheduler.rs +++ b/crates/scheduler/src/test_scheduler.rs @@ -516,6 +516,12 @@ impl Scheduler for TestScheduler { self.thread.unpark(); } + fn spawn_realtime(&self, f: Box) { + std::thread::spawn(move || { + f(); + }); + } + fn timer(&self, duration: Duration) -> Timer { let (tx, rx) = oneshot::channel(); let state = &mut *self.state.lock();