//! executor.rs — background/foreground executor wrappers over the `scheduler` crate.

  1use crate::{App, PlatformDispatcher, PlatformScheduler};
  2use futures::channel::mpsc;
  3use futures::prelude::*;
  4use gpui_util::TryFutureExt;
  5use scheduler::Instant;
  6use scheduler::Scheduler;
  7use std::{
  8    fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
  9    time::Duration,
 10};
 11
 12pub use scheduler::{FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority};
 13
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    // Scheduler-backed executor that actually queues and runs the work.
    inner: scheduler::BackgroundExecutor,
    // Platform dispatcher retained for direct dispatch (see `await_on_background`)
    // and for test-only hooks via `as_test()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
 21
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
    // Session-scoped scheduler executor for main-thread tasks.
    inner: scheduler::ForegroundExecutor,
    // Platform dispatcher retained for `dispatcher()` access and test hooks.
    dispatcher: Arc<dyn PlatformDispatcher>,
    // `Rc<()>` marker makes this type `!Send`, preventing it from leaving
    // the main thread even though no `Rc` is actually stored.
    not_send: PhantomData<Rc<()>>,
}
 30
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// This is a thin newtype over [`scheduler::Task`]; all behavior is delegated
/// to the inner task.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(scheduler::Task<T>);
 40
 41impl<T> Task<T> {
 42    /// Creates a new task that will resolve with the value.
 43    pub fn ready(val: T) -> Self {
 44        Task(scheduler::Task::ready(val))
 45    }
 46
 47    /// Returns true if the task has completed or was created with `Task::ready`.
 48    pub fn is_ready(&self) -> bool {
 49        self.0.is_ready()
 50    }
 51
 52    /// Detaching a task runs it to completion in the background.
 53    pub fn detach(self) {
 54        self.0.detach()
 55    }
 56
 57    /// Wraps a scheduler::Task.
 58    pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
 59        Task(task)
 60    }
 61
 62    /// Converts this task into a fallible task that returns `Option<T>`.
 63    ///
 64    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 65    /// if the task was cancelled.
 66    ///
 67    /// # Example
 68    ///
 69    /// ```ignore
 70    /// // Background task that gracefully handles cancellation:
 71    /// cx.background_spawn(async move {
 72    ///     let result = foreground_task.fallible().await;
 73    ///     if let Some(value) = result {
 74    ///         // Process the value
 75    ///     }
 76    ///     // If None, task was cancelled - just exit gracefully
 77    /// }).detach();
 78    /// ```
 79    pub fn fallible(self) -> FallibleTask<T> {
 80        self.0.fallible()
 81    }
 82}
 83
 84impl<T, E> Task<Result<T, E>>
 85where
 86    T: 'static,
 87    E: 'static + Debug,
 88{
 89    /// Run the task to completion in the background and log any errors that occur.
 90    #[track_caller]
 91    pub fn detach_and_log_err(self, cx: &App) {
 92        let location = core::panic::Location::caller();
 93        cx.foreground_executor()
 94            .spawn(self.log_tracked_err(*location))
 95            .detach();
 96    }
 97}
 98
impl<T> std::future::Future for Task<T> {
    type Output = T;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: This is a pin projection onto the sole field. `Task` never
        // moves the inner `scheduler::Task` out of a pinned `Self` and has no
        // `Drop` impl that could move it, so treating the field as
        // structurally pinned is sound.
        // NOTE(review): the previous comment claimed `Task` is
        // `#[repr(transparent)]`, but no such attribute is present on the
        // struct; soundness here rests on structural pinning, not layout.
        let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
        inner.poll(cx)
    }
}
112
113impl BackgroundExecutor {
114    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
115    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
116        #[cfg(any(test, feature = "test-support"))]
117        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
118            test_dispatcher.scheduler().clone()
119        } else {
120            Arc::new(PlatformScheduler::new(dispatcher.clone()))
121        };
122
123        #[cfg(not(any(test, feature = "test-support")))]
124        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
125
126        Self {
127            inner: scheduler::BackgroundExecutor::new(scheduler),
128            dispatcher,
129        }
130    }
131
132    /// Enqueues the given future to be run to completion on a background thread.
133    #[track_caller]
134    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
135    where
136        R: Send + 'static,
137    {
138        self.spawn_with_priority(Priority::default(), future.boxed())
139    }
140
141    /// Enqueues the given future to be run to completion on a background thread with the given priority.
142    ///
143    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
144    /// realtime scheduling priority, suitable for audio processing.
145    #[track_caller]
146    pub fn spawn_with_priority<R>(
147        &self,
148        priority: Priority,
149        future: impl Future<Output = R> + Send + 'static,
150    ) -> Task<R>
151    where
152        R: Send + 'static,
153    {
154        if priority == Priority::RealtimeAudio {
155            Task::from_scheduler(self.inner.spawn_realtime(future))
156        } else {
157            Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
158        }
159    }
160
161    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
162    ///
163    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
164    /// completion before the current task is resumed, even if the current task is slated for cancellation.
165    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
166    where
167        R: Send,
168    {
169        use crate::RunnableMeta;
170        use parking_lot::{Condvar, Mutex};
171
172        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
173
174        impl Drop for NotifyOnDrop<'_> {
175            fn drop(&mut self) {
176                *self.0.1.lock() = true;
177                self.0.0.notify_all();
178            }
179        }
180
181        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
182
183        impl Drop for WaitOnDrop<'_> {
184            fn drop(&mut self) {
185                let mut done = self.0.1.lock();
186                if !*done {
187                    self.0.0.wait(&mut done);
188                }
189            }
190        }
191
192        let dispatcher = self.dispatcher.clone();
193        let location = core::panic::Location::caller();
194
195        let pair = &(Condvar::new(), Mutex::new(false));
196        let _wait_guard = WaitOnDrop(pair);
197
198        let (runnable, task) = unsafe {
199            async_task::Builder::new()
200                .metadata(RunnableMeta { location })
201                .spawn_unchecked(
202                    move |_| async {
203                        let _notify_guard = NotifyOnDrop(pair);
204                        future.await
205                    },
206                    move |runnable| {
207                        dispatcher.dispatch(runnable, Priority::default());
208                    },
209                )
210        };
211        runnable.schedule();
212        task.await
213    }
214
215    /// Scoped lets you start a number of tasks and waits
216    /// for all of them to complete before returning.
217    pub async fn scoped<'scope, F>(&self, scheduler: F)
218    where
219        F: FnOnce(&mut Scope<'scope>),
220    {
221        let mut scope = Scope::new(self.clone(), Priority::default());
222        (scheduler)(&mut scope);
223        let spawned = mem::take(&mut scope.futures)
224            .into_iter()
225            .map(|f| self.spawn_with_priority(scope.priority, f))
226            .collect::<Vec<_>>();
227        for task in spawned {
228            task.await;
229        }
230    }
231
232    /// Scoped lets you start a number of tasks and waits
233    /// for all of them to complete before returning.
234    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
235    where
236        F: FnOnce(&mut Scope<'scope>),
237    {
238        let mut scope = Scope::new(self.clone(), priority);
239        (scheduler)(&mut scope);
240        let spawned = mem::take(&mut scope.futures)
241            .into_iter()
242            .map(|f| self.spawn_with_priority(scope.priority, f))
243            .collect::<Vec<_>>();
244        for task in spawned {
245            task.await;
246        }
247    }
248
249    /// Get the current time.
250    ///
251    /// Calling this instead of `std::time::Instant::now` allows the use
252    /// of fake timers in tests.
253    pub fn now(&self) -> Instant {
254        self.inner.scheduler().clock().now()
255    }
256
257    /// Returns a task that will complete after the given duration.
258    /// Depending on other concurrent tasks the elapsed duration may be longer
259    /// than requested.
260    #[track_caller]
261    pub fn timer(&self, duration: Duration) -> Task<()> {
262        if duration.is_zero() {
263            return Task::ready(());
264        }
265        self.spawn(self.inner.scheduler().timer(duration))
266    }
267
268    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
269    #[cfg(any(test, feature = "test-support"))]
270    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
271        self.dispatcher.as_test().unwrap().simulate_random_delay()
272    }
273
274    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
275    #[cfg(any(test, feature = "test-support"))]
276    pub fn advance_clock(&self, duration: Duration) {
277        self.dispatcher.as_test().unwrap().advance_clock(duration)
278    }
279
280    /// In tests, run one task.
281    #[cfg(any(test, feature = "test-support"))]
282    pub fn tick(&self) -> bool {
283        self.dispatcher.as_test().unwrap().scheduler().tick()
284    }
285
286    /// In tests, run tasks until the scheduler would park.
287    ///
288    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
289    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
290    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
291    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
292    #[cfg(any(test, feature = "test-support"))]
293    pub fn run_until_parked(&self) {
294        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
295        scheduler.run();
296    }
297
298    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
299    #[cfg(any(test, feature = "test-support"))]
300    pub fn allow_parking(&self) {
301        self.dispatcher
302            .as_test()
303            .unwrap()
304            .scheduler()
305            .allow_parking();
306
307        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
308            log::warn!("[gpui::executor] allow_parking: enabled");
309        }
310    }
311
312    /// Sets the range of ticks to run before timing out in block_on.
313    #[cfg(any(test, feature = "test-support"))]
314    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
315        self.dispatcher
316            .as_test()
317            .unwrap()
318            .scheduler()
319            .set_timeout_ticks(range);
320    }
321
322    /// Undoes the effect of [`Self::allow_parking`].
323    #[cfg(any(test, feature = "test-support"))]
324    pub fn forbid_parking(&self) {
325        self.dispatcher
326            .as_test()
327            .unwrap()
328            .scheduler()
329            .forbid_parking();
330    }
331
332    /// In tests, returns the rng used by the dispatcher.
333    #[cfg(any(test, feature = "test-support"))]
334    pub fn rng(&self) -> scheduler::SharedRng {
335        self.dispatcher.as_test().unwrap().scheduler().rng()
336    }
337
338    /// How many CPUs are available to the dispatcher.
339    pub fn num_cpus(&self) -> usize {
340        #[cfg(any(test, feature = "test-support"))]
341        if let Some(test) = self.dispatcher.as_test() {
342            return test.num_cpus_override().unwrap_or(4);
343        }
344        num_cpus::get()
345    }
346
347    /// Override the number of CPUs reported by this executor in tests.
348    /// Panics if not called on a test executor.
349    #[cfg(any(test, feature = "test-support"))]
350    pub fn set_num_cpus(&self, count: usize) {
351        self.dispatcher
352            .as_test()
353            .expect("set_num_cpus can only be called on a test executor")
354            .set_num_cpus(count);
355    }
356
357    /// Whether we're on the main thread.
358    pub fn is_main_thread(&self) -> bool {
359        self.dispatcher.is_main_thread()
360    }
361
362    #[doc(hidden)]
363    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
364        &self.dispatcher
365    }
366}
367
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    ///
    /// In builds with test support, a test dispatcher supplies both its
    /// shared scheduler and its session id; otherwise a fresh
    /// `PlatformScheduler` is created and a new session id allocated from it.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        #[cfg(any(test, feature = "test-support"))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
            if let Some(test_dispatcher) = dispatcher.as_test() {
                (
                    test_dispatcher.scheduler().clone(),
                    test_dispatcher.session_id(),
                )
            } else {
                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
                let session_id = platform_scheduler.allocate_session_id();
                (platform_scheduler, session_id)
            };

        #[cfg(not(any(test, feature = "test-support")))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
            let session_id = platform_scheduler.allocate_session_id();
            (platform_scheduler, session_id)
        };

        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);

        Self {
            inner,
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given Task to run on the main thread.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        Task::from_scheduler(self.inner.spawn(future.boxed_local()))
    }

    /// Enqueues the given Task to run on the main thread with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        _priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        // Priority is ignored for foreground tasks - they run in order on the main thread
        Task::from_scheduler(self.inner.spawn(future))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        use std::cell::Cell;

        let scheduler = self.inner.scheduler();

        // Capture the future's output through a Cell so the wrapping future
        // can stay `Output = ()` for the scheduler's `block` API.
        let output = Cell::new(None);
        let future = async {
            output.set(Some(future.await));
        };
        let mut future = std::pin::pin!(future);

        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
        // (which are associated with the test session) to make progress while we block.
        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
        // NOTE(review): `None` here presumably means "no session restriction",
        // which is what allows same-session tasks to run — confirm against
        // `scheduler::Scheduler::block`.
        scheduler.block(None, future.as_mut(), None);

        output.take().expect("block_test future did not complete")
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
        self.inner.block_on(future)
    }

    /// Block the current thread until the given future resolves or the timeout elapses.
    /// On timeout, returns `Err` carrying the still-pending future so the
    /// caller can resume waiting on it.
    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
        self.inner.block_with_timeout(duration, future)
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }

    #[doc(hidden)]
    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
        self.inner.clone()
    }
}
470
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    // Executor whose scheduler `Drop` blocks on until all futures finish.
    executor: BackgroundExecutor,
    // Priority applied to every future spawned into this scope.
    priority: Priority,
    // Futures queued via `spawn`; their 'a lifetime is erased to 'static,
    // justified by `Drop` blocking until all of them have resolved.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Sender cloned into each spawned future; when every clone (and this
    // original, taken in `Drop`) is gone, the receiver observes closure.
    tx: Option<mpsc::Sender<()>>,
    // Receiver used by `Drop` to detect that all spawned futures completed.
    rx: mpsc::Receiver<()>,
    // Ties the scope to the 'a lifetime of the borrows its futures capture.
    lifetime: PhantomData<&'a ()>,
}
480
impl<'a> Scope<'a> {
    /// Creates an empty scope that will run its futures on `executor` at
    /// `priority`.
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        // Capacity 1 suffices: the channel is only used to observe closure
        // (all senders dropped), never to transfer values.
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each queued future holds a Sender clone and drops it on completion,
        // which is how `Drop` learns that all work has finished.
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
521
522impl Drop for Scope<'_> {
523    fn drop(&mut self) {
524        self.tx.take().unwrap();
525
526        // Wait until the channel is closed, which means that all of the spawned
527        // futures have resolved.
528        let future = async {
529            self.rx.next().await;
530        };
531        let mut future = std::pin::pin!(future);
532        self.executor
533            .inner
534            .scheduler()
535            .block(None, future.as_mut(), None);
536    }
537}
538
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Builds the minimal plumbing these tests need: a deterministic
    /// dispatcher, executors on top of it, and a full `App`.
    /// Returns (dispatcher, background_executor, app).
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared_dispatcher = Arc::new(dispatcher.clone());
        let background = BackgroundExecutor::new(shared_dispatcher.clone());
        let foreground = ForegroundExecutor::new(shared_dispatcher);

        let platform = TestPlatform::new(background.clone(), foreground);
        let assets = Arc::new(());
        let http = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, assets, http);
        (dispatcher, background, app)
    }

    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background, app) = create_test_app();
        let foreground = app.borrow().foreground_executor.clone();

        let did_run = Rc::new(RefCell::new(false));

        let flag = Rc::clone(&did_run);
        foreground
            .spawn(async move {
                *flag.borrow_mut() = true;
            })
            .detach();

        // Drain the dispatcher while the app is still alive so the spawned
        // foreground task gets a chance to execute.
        dispatcher.run_until_parked();

        assert!(
            *did_run.borrow(),
            "Task should run normally when app is alive"
        );
    }
}
586}