Fix panic when using RealtimeAudio priority (#46635)

Antonio Scandurra created

The scheduler integration (#44810) removed the special handling for
realtime audio tasks that spawn them on a dedicated thread. This caused
a panic in the Mac dispatcher when RealtimeAudio priority was passed to
the regular dispatch path.

This restores the original behavior: RealtimeAudio tasks are spawned on
a dedicated thread via dispatcher.spawn_realtime(), using a bounded
channel to send runnables to it.

Release Notes:

- N/A

Change summary

Cargo.lock                             |  1 
crates/gpui/src/executor.rs            | 15 ++++++-------
crates/gpui/src/platform_scheduler.rs  |  4 +++
crates/scheduler/Cargo.toml            |  1 
crates/scheduler/src/executor.rs       | 32 ++++++++++++++++++++++++++++
crates/scheduler/src/scheduler.rs      |  3 ++
crates/scheduler/src/test_scheduler.rs |  6 +++++
7 files changed, 54 insertions(+), 8 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -14365,6 +14365,7 @@ dependencies = [
  "async-task",
  "backtrace",
  "chrono",
+ "flume",
  "futures 0.3.31",
  "parking_lot",
  "rand 0.9.2",

crates/gpui/src/executor.rs 🔗

@@ -150,13 +150,8 @@ impl BackgroundExecutor {
 
     /// Enqueues the given future to be run to completion on a background thread with the given priority.
     ///
-    /// `Priority::Realtime` is currently treated as `Priority::High`.
-    ///
-    /// This is intentionally *not* a "downgrade" feature: realtime execution is effectively
-    /// disabled until we have an in-tree use case and are confident about the semantics and
-    /// failure modes (especially around channel backpressure and the risk of blocking
-    /// latency-sensitive threads). It should be straightforward to add a true realtime
-    /// implementation back once those constraints are well-defined.
+    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
+    /// realtime scheduling priority, suitable for audio processing.
     #[track_caller]
     pub fn spawn_with_priority<R>(
         &self,
@@ -166,7 +161,11 @@ impl BackgroundExecutor {
     where
         R: Send + 'static,
     {
-        Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
+        if priority == Priority::RealtimeAudio {
+            Task::from_scheduler(self.inner.spawn_realtime(future))
+        } else {
+            Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
+        }
     }
 
     /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.

crates/gpui/src/platform_scheduler.rs 🔗

@@ -85,6 +85,10 @@ impl Scheduler for PlatformScheduler {
         self.dispatcher.dispatch(runnable, priority);
     }
 
+    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
+        self.dispatcher.spawn_realtime(f);
+    }
+
     fn timer(&self, duration: Duration) -> Timer {
         use std::sync::{Arc, atomic::AtomicBool};
 

crates/scheduler/Cargo.toml 🔗

@@ -19,6 +19,7 @@ test-support = []
 async-task.workspace = true
 backtrace.workspace = true
 chrono.workspace = true
+flume = "0.11"
 futures.workspace = true
 parking_lot.workspace = true
 rand.workspace = true

crates/scheduler/src/executor.rs 🔗

@@ -179,6 +179,38 @@ impl BackgroundExecutor {
         Task(TaskState::Spawned(task))
     }
 
+    /// Spawns a future on a dedicated realtime thread for audio processing.
+    #[track_caller]
+    pub fn spawn_realtime<F>(&self, future: F) -> Task<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        let location = Location::caller();
+        let closed = self.closed.clone();
+        let (tx, rx) = flume::bounded::<async_task::Runnable<RunnableMeta>>(1);
+
+        self.scheduler.spawn_realtime(Box::new(move || {
+            while let Ok(runnable) = rx.recv() {
+                if runnable.metadata().is_closed() {
+                    continue;
+                }
+                runnable.run();
+            }
+        }));
+
+        let (runnable, task) = async_task::Builder::new()
+            .metadata(RunnableMeta { location, closed })
+            .spawn(
+                move |_| future,
+                move |runnable| {
+                    let _ = tx.send(runnable);
+                },
+            );
+        runnable.schedule();
+        Task(TaskState::Spawned(task))
+    }
+
     pub fn timer(&self, duration: Duration) -> Timer {
         self.scheduler.timer(duration)
     }

crates/scheduler/src/scheduler.rs 🔗

@@ -105,6 +105,9 @@ pub trait Scheduler: Send + Sync {
         priority: Priority,
     );
 
+    /// Spawn a closure on a dedicated realtime thread for audio processing.
+    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>);
+
     /// Schedule a background task with default (medium) priority.
     fn schedule_background(&self, runnable: Runnable<RunnableMeta>) {
         self.schedule_background_with_priority(runnable, Priority::default());

crates/scheduler/src/test_scheduler.rs 🔗

@@ -516,6 +516,12 @@ impl Scheduler for TestScheduler {
         self.thread.unpark();
     }
 
+    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
+        std::thread::spawn(move || {
+            f();
+        });
+    }
+
     fn timer(&self, duration: Duration) -> Timer {
         let (tx, rx) = oneshot::channel();
         let state = &mut *self.state.lock();