executor.rs

  1use crate::{App, PlatformDispatcher, PlatformScheduler};
  2use futures::channel::mpsc;
  3use futures::prelude::*;
  4use gpui_util::{TryFutureExt, TryFutureExtBacktrace};
  5use scheduler::Instant;
  6use scheduler::Scheduler;
  7use std::{
  8    fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
  9    time::Duration,
 10};
 11
 12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
 13
 14/// A pointer to the executor that is currently running,
 15/// for spawning background tasks.
 16#[derive(Clone)]
 17pub struct BackgroundExecutor {
 18    inner: scheduler::BackgroundExecutor,
 19    dispatcher: Arc<dyn PlatformDispatcher>,
 20}
 21
 22/// A pointer to the executor that is currently running,
 23/// for spawning tasks on the main thread.
 24#[derive(Clone)]
 25pub struct ForegroundExecutor {
 26    inner: scheduler::ForegroundExecutor,
 27    dispatcher: Arc<dyn PlatformDispatcher>,
 28    not_send: PhantomData<Rc<()>>,
 29}
 30
 31/// Task is a primitive that allows work to happen in the background.
 32///
 33/// It implements [`Future`] so you can `.await` on it.
 34///
 35/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 36/// the task to continue running, but with no way to return a value.
 37#[must_use]
 38#[derive(Debug)]
 39pub struct Task<T>(scheduler::Task<T>);
 40
 41impl<T> Task<T> {
 42    /// Creates a new task that will resolve with the value.
 43    pub fn ready(val: T) -> Self {
 44        Task(scheduler::Task::ready(val))
 45    }
 46
 47    /// Returns true if the task has completed or was created with `Task::ready`.
 48    pub fn is_ready(&self) -> bool {
 49        self.0.is_ready()
 50    }
 51
 52    /// Detaching a task runs it to completion in the background.
 53    pub fn detach(self) {
 54        self.0.detach()
 55    }
 56
 57    /// Wraps a scheduler::Task.
 58    pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
 59        Task(task)
 60    }
 61
 62    /// Converts this task into a fallible task that returns `Option<T>`.
 63    ///
 64    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 65    /// if the task was cancelled.
 66    ///
 67    /// # Example
 68    ///
 69    /// ```ignore
 70    /// // Background task that gracefully handles cancellation:
 71    /// cx.background_spawn(async move {
 72    ///     let result = foreground_task.fallible().await;
 73    ///     if let Some(value) = result {
 74    ///         // Process the value
 75    ///     }
 76    ///     // If None, task was cancelled - just exit gracefully
 77    /// }).detach();
 78    /// ```
 79    pub fn fallible(self) -> FallibleTask<T> {
 80        self.0.fallible()
 81    }
 82}
 83
 84impl<T, E> Task<Result<T, E>>
 85where
 86    T: 'static,
 87    E: 'static + std::fmt::Display,
 88{
 89    /// Run the task to completion in the background and log any errors that occur.
 90    #[track_caller]
 91    pub fn detach_and_log_err(self, cx: &App) {
 92        let location = core::panic::Location::caller();
 93        cx.foreground_executor()
 94            .spawn(self.log_tracked_err(*location))
 95            .detach();
 96    }
 97}
 98
 99impl<T, E> Task<Result<T, E>>
100where
101    T: 'static,
102    E: 'static + std::fmt::Debug,
103{
104    /// Like [`Self::detach_and_log_err`], but uses `{:?}` formatting on failure so `anyhow::Error`
105    /// values emit their full backtrace. Prefer `detach_and_log_err` unless a backtrace is wanted.
106    #[track_caller]
107    pub fn detach_and_log_err_with_backtrace(self, cx: &App) {
108        let location = *core::panic::Location::caller();
109        cx.foreground_executor()
110            .spawn(self.log_tracked_err_with_backtrace(location))
111            .detach();
112    }
113}
114
115impl<T> std::future::Future for Task<T> {
116    type Output = T;
117
118    fn poll(
119        self: std::pin::Pin<&mut Self>,
120        cx: &mut std::task::Context<'_>,
121    ) -> std::task::Poll<Self::Output> {
122        // SAFETY: Task is a repr(transparent) wrapper around scheduler::Task,
123        // and we're just projecting the pin through to the inner task.
124        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
125        inner.poll(cx)
126    }
127}
128
129impl BackgroundExecutor {
130    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
131    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
132        #[cfg(any(test, feature = "test-support"))]
133        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
134            test_dispatcher.scheduler().clone()
135        } else {
136            Arc::new(PlatformScheduler::new(dispatcher.clone()))
137        };
138
139        #[cfg(not(any(test, feature = "test-support")))]
140        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
141
142        Self {
143            inner: scheduler::BackgroundExecutor::new(scheduler),
144            dispatcher,
145        }
146    }
147
148    /// Returns the underlying scheduler::BackgroundExecutor.
149    ///
150    /// This is used by Ex to pass the executor to thread/worktree code.
151    pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
152        self.inner.clone()
153    }
154
155    /// Enqueues the given future to be run to completion on a background thread.
156    #[track_caller]
157    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
158    where
159        R: Send + 'static,
160    {
161        self.spawn_with_priority(Priority::default(), future.boxed())
162    }
163
164    /// Enqueues the given future to be run to completion on a background thread with the given priority.
165    ///
166    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
167    /// realtime scheduling priority, suitable for audio processing.
168    #[track_caller]
169    pub fn spawn_with_priority<R>(
170        &self,
171        priority: Priority,
172        future: impl Future<Output = R> + Send + 'static,
173    ) -> Task<R>
174    where
175        R: Send + 'static,
176    {
177        if priority == Priority::RealtimeAudio {
178            Task::from_scheduler(self.inner.spawn_realtime(future))
179        } else {
180            Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
181        }
182    }
183
184    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
185    ///
186    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
187    /// completion before the current task is resumed, even if the current task is slated for cancellation.
188    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
189    where
190        R: Send,
191    {
192        use crate::RunnableMeta;
193        use parking_lot::{Condvar, Mutex};
194
195        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
196
197        impl Drop for NotifyOnDrop<'_> {
198            fn drop(&mut self) {
199                *self.0.1.lock() = true;
200                self.0.0.notify_all();
201            }
202        }
203
204        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
205
206        impl Drop for WaitOnDrop<'_> {
207            fn drop(&mut self) {
208                let mut done = self.0.1.lock();
209                if !*done {
210                    self.0.0.wait(&mut done);
211                }
212            }
213        }
214
215        let dispatcher = self.dispatcher.clone();
216        let location = core::panic::Location::caller();
217
218        let pair = &(Condvar::new(), Mutex::new(false));
219        let _wait_guard = WaitOnDrop(pair);
220
221        let (runnable, task) = unsafe {
222            async_task::Builder::new()
223                .metadata(RunnableMeta { location })
224                .spawn_unchecked(
225                    move |_| async {
226                        let _notify_guard = NotifyOnDrop(pair);
227                        future.await
228                    },
229                    move |runnable| {
230                        dispatcher.dispatch(runnable, Priority::default());
231                    },
232                )
233        };
234        runnable.schedule();
235        task.await
236    }
237
238    /// Scoped lets you start a number of tasks and waits
239    /// for all of them to complete before returning.
240    pub async fn scoped<'scope, F>(&self, scheduler: F)
241    where
242        F: FnOnce(&mut Scope<'scope>),
243    {
244        let mut scope = Scope::new(self.clone(), Priority::default());
245        (scheduler)(&mut scope);
246        let spawned = mem::take(&mut scope.futures)
247            .into_iter()
248            .map(|f| self.spawn_with_priority(scope.priority, f))
249            .collect::<Vec<_>>();
250        for task in spawned {
251            task.await;
252        }
253    }
254
255    /// Scoped lets you start a number of tasks and waits
256    /// for all of them to complete before returning.
257    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
258    where
259        F: FnOnce(&mut Scope<'scope>),
260    {
261        let mut scope = Scope::new(self.clone(), priority);
262        (scheduler)(&mut scope);
263        let spawned = mem::take(&mut scope.futures)
264            .into_iter()
265            .map(|f| self.spawn_with_priority(scope.priority, f))
266            .collect::<Vec<_>>();
267        for task in spawned {
268            task.await;
269        }
270    }
271
272    /// Get the current time.
273    ///
274    /// Calling this instead of `std::time::Instant::now` allows the use
275    /// of fake timers in tests.
276    pub fn now(&self) -> Instant {
277        self.inner.scheduler().clock().now()
278    }
279
280    /// Returns a task that will complete after the given duration.
281    /// Depending on other concurrent tasks the elapsed duration may be longer
282    /// than requested.
283    #[track_caller]
284    pub fn timer(&self, duration: Duration) -> Task<()> {
285        if duration.is_zero() {
286            return Task::ready(());
287        }
288        self.spawn(self.inner.scheduler().timer(duration))
289    }
290
291    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
292    #[cfg(any(test, feature = "test-support"))]
293    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
294        self.dispatcher.as_test().unwrap().simulate_random_delay()
295    }
296
297    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
298    #[cfg(any(test, feature = "test-support"))]
299    pub fn advance_clock(&self, duration: Duration) {
300        self.dispatcher.as_test().unwrap().advance_clock(duration)
301    }
302
303    /// In tests, run one task.
304    #[cfg(any(test, feature = "test-support"))]
305    pub fn tick(&self) -> bool {
306        self.dispatcher.as_test().unwrap().scheduler().tick()
307    }
308
309    /// In tests, run tasks until the scheduler would park.
310    ///
311    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
312    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
313    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
314    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
315    #[cfg(any(test, feature = "test-support"))]
316    pub fn run_until_parked(&self) {
317        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
318        scheduler.run();
319    }
320
321    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
322    #[cfg(any(test, feature = "test-support"))]
323    pub fn allow_parking(&self) {
324        self.dispatcher
325            .as_test()
326            .unwrap()
327            .scheduler()
328            .allow_parking();
329
330        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
331            log::warn!("[gpui::executor] allow_parking: enabled");
332        }
333    }
334
335    /// Sets the range of ticks to run before timing out in block_on.
336    #[cfg(any(test, feature = "test-support"))]
337    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
338        self.dispatcher
339            .as_test()
340            .unwrap()
341            .scheduler()
342            .set_timeout_ticks(range);
343    }
344
345    /// Undoes the effect of [`Self::allow_parking`].
346    #[cfg(any(test, feature = "test-support"))]
347    pub fn forbid_parking(&self) {
348        self.dispatcher
349            .as_test()
350            .unwrap()
351            .scheduler()
352            .forbid_parking();
353    }
354
355    /// In tests, returns the rng used by the dispatcher.
356    #[cfg(any(test, feature = "test-support"))]
357    pub fn rng(&self) -> scheduler::SharedRng {
358        self.dispatcher.as_test().unwrap().scheduler().rng()
359    }
360
361    /// How many CPUs are available to the dispatcher.
362    pub fn num_cpus(&self) -> usize {
363        #[cfg(any(test, feature = "test-support"))]
364        if let Some(test) = self.dispatcher.as_test() {
365            return test.num_cpus_override().unwrap_or(4);
366        }
367        num_cpus::get()
368    }
369
370    /// Override the number of CPUs reported by this executor in tests.
371    /// Panics if not called on a test executor.
372    #[cfg(any(test, feature = "test-support"))]
373    pub fn set_num_cpus(&self, count: usize) {
374        self.dispatcher
375            .as_test()
376            .expect("set_num_cpus can only be called on a test executor")
377            .set_num_cpus(count);
378    }
379
380    /// Whether we're on the main thread.
381    pub fn is_main_thread(&self) -> bool {
382        self.dispatcher.is_main_thread()
383    }
384
385    #[doc(hidden)]
386    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
387        &self.dispatcher
388    }
389}
390
391impl ForegroundExecutor {
392    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
393    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
394        #[cfg(any(test, feature = "test-support"))]
395        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
396            if let Some(test_dispatcher) = dispatcher.as_test() {
397                (
398                    test_dispatcher.scheduler().clone(),
399                    test_dispatcher.session_id(),
400                )
401            } else {
402                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
403                let session_id = platform_scheduler.allocate_session_id();
404                (platform_scheduler, session_id)
405            };
406
407        #[cfg(not(any(test, feature = "test-support")))]
408        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
409            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
410            let session_id = platform_scheduler.allocate_session_id();
411            (platform_scheduler, session_id)
412        };
413
414        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
415
416        Self {
417            inner,
418            dispatcher,
419            not_send: PhantomData,
420        }
421    }
422
423    /// Enqueues the given Task to run on the main thread.
424    #[track_caller]
425    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
426    where
427        R: 'static,
428    {
429        Task::from_scheduler(self.inner.spawn(future.boxed_local()))
430    }
431
432    /// Enqueues the given Task to run on the main thread with the given priority.
433    #[track_caller]
434    pub fn spawn_with_priority<R>(
435        &self,
436        _priority: Priority,
437        future: impl Future<Output = R> + 'static,
438    ) -> Task<R>
439    where
440        R: 'static,
441    {
442        // Priority is ignored for foreground tasks - they run in order on the main thread
443        Task::from_scheduler(self.inner.spawn(future))
444    }
445
446    /// Used by the test harness to run an async test in a synchronous fashion.
447    #[cfg(any(test, feature = "test-support"))]
448    #[track_caller]
449    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
450        use std::cell::Cell;
451
452        let scheduler = self.inner.scheduler();
453
454        let output = Cell::new(None);
455        let future = async {
456            output.set(Some(future.await));
457        };
458        let mut future = std::pin::pin!(future);
459
460        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
461        // (which are associated with the test session) to make progress while we block.
462        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
463        scheduler.block(None, future.as_mut(), None);
464
465        output.take().expect("block_test future did not complete")
466    }
467
468    /// Block the current thread until the given future resolves.
469    /// Consider using `block_with_timeout` instead.
470    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
471        self.inner.block_on(future)
472    }
473
474    /// Block the current thread until the given future resolves or the timeout elapses.
475    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
476        &self,
477        duration: Duration,
478        future: Fut,
479    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
480        self.inner.block_with_timeout(duration, future)
481    }
482
483    #[doc(hidden)]
484    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
485        &self.dispatcher
486    }
487
488    #[doc(hidden)]
489    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
490        self.inner.clone()
491    }
492}
493
494/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
495pub struct Scope<'a> {
496    executor: BackgroundExecutor,
497    priority: Priority,
498    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
499    tx: Option<mpsc::Sender<()>>,
500    rx: mpsc::Receiver<()>,
501    lifetime: PhantomData<&'a ()>,
502}
503
504impl<'a> Scope<'a> {
505    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
506        let (tx, rx) = mpsc::channel(1);
507        Self {
508            executor,
509            priority,
510            tx: Some(tx),
511            rx,
512            futures: Default::default(),
513            lifetime: PhantomData,
514        }
515    }
516
517    /// How many CPUs are available to the dispatcher.
518    pub fn num_cpus(&self) -> usize {
519        self.executor.num_cpus()
520    }
521
522    /// Spawn a future into this scope.
523    #[track_caller]
524    pub fn spawn<F>(&mut self, f: F)
525    where
526        F: Future<Output = ()> + Send + 'a,
527    {
528        let tx = self.tx.clone().unwrap();
529
530        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
531        // dropping this `Scope` blocks until all of the futures have resolved.
532        let f = unsafe {
533            mem::transmute::<
534                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
535                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
536            >(Box::pin(async move {
537                f.await;
538                drop(tx);
539            }))
540        };
541        self.futures.push(f);
542    }
543}
544
545impl Drop for Scope<'_> {
546    fn drop(&mut self) {
547        self.tx.take().unwrap();
548
549        // Wait until the channel is closed, which means that all of the spawned
550        // futures have resolved.
551        let future = async {
552            self.rx.next().await;
553        };
554        let mut future = std::pin::pin!(future);
555        self.executor
556            .inner
557            .scheduler()
558            .block(None, future.as_mut(), None);
559    }
560}
561
562#[cfg(test)]
563mod test {
564    use super::*;
565    use crate::{App, TestDispatcher, TestPlatform};
566    use std::cell::RefCell;
567
568    /// Helper to create test infrastructure.
569    /// Returns (dispatcher, background_executor, app).
570    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
571        let dispatcher = TestDispatcher::new(0);
572        let arc_dispatcher = Arc::new(dispatcher.clone());
573        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
574        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
575
576        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
577        let asset_source = Arc::new(());
578        let http_client = http_client::FakeHttpClient::with_404_response();
579
580        let app = App::new_app(platform, asset_source, http_client);
581        (dispatcher, background_executor, app)
582    }
583
584    #[test]
585    fn sanity_test_tasks_run() {
586        let (dispatcher, _background_executor, app) = create_test_app();
587        let foreground_executor = app.borrow().foreground_executor.clone();
588
589        let task_ran = Rc::new(RefCell::new(false));
590
591        foreground_executor
592            .spawn({
593                let task_ran = Rc::clone(&task_ran);
594                async move {
595                    *task_ran.borrow_mut() = true;
596                }
597            })
598            .detach();
599
600        // Run dispatcher while app is still alive
601        dispatcher.run_until_parked();
602
603        // Task should have run
604        assert!(
605            *task_ran.borrow(),
606            "Task should run normally when app is alive"
607        );
608    }
609}