// executor.rs

use crate::{App, PlatformDispatcher, PlatformScheduler};
use futures::channel::mpsc;
use futures::prelude::*;
use gpui_util::TryFutureExt;
use scheduler::{Instant, Scheduler};
use std::{
    fmt::Debug, future::Future, marker::PhantomData, mem, pin::Pin, rc::Rc, sync::Arc,
    time::Duration,
};

pub use scheduler::{
    FallibleTask, ForegroundExecutor as SchedulerForegroundExecutor, Priority, Task,
};
 15
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// The scheduler-backed executor that actually spawns and polls tasks.
    inner: scheduler::BackgroundExecutor,
    /// Platform dispatch layer; in test builds it also exposes test hooks via `as_test()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
 23
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// Session-scoped scheduler executor that runs tasks on the main thread.
    inner: scheduler::ForegroundExecutor,
    /// Platform dispatch layer; in test builds it also exposes test hooks via `as_test()`.
    dispatcher: Arc<dyn PlatformDispatcher>,
    /// `Rc` marker makes this type `!Send`, pinning it to the thread that created it.
    not_send: PhantomData<Rc<()>>,
}
 32
/// Extension trait for `Task<Result<T, E>>` that adds `detach_and_log_err` with an `&App` context.
///
/// This trait is automatically implemented for all `Task<Result<T, E>>` types.
pub trait TaskExt<T, E> {
    /// Run the task to completion in the background and log any errors that occur.
    ///
    /// The task is detached: the caller does not await its result.
    fn detach_and_log_err(self, cx: &App);
}
 40
impl<T, E> TaskExt<T, E> for Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    #[track_caller]
    fn detach_and_log_err(self, cx: &App) {
        // Capture the caller's source location (enabled by `#[track_caller]`)
        // so that any logged error points at the call site rather than here.
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}
 54
 55impl BackgroundExecutor {
 56    /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
 57    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 58        #[cfg(any(test, feature = "test-support"))]
 59        let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
 60            test_dispatcher.scheduler().clone()
 61        } else {
 62            Arc::new(PlatformScheduler::new(dispatcher.clone()))
 63        };
 64
 65        #[cfg(not(any(test, feature = "test-support")))]
 66        let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
 67
 68        Self {
 69            inner: scheduler::BackgroundExecutor::new(scheduler),
 70            dispatcher,
 71        }
 72    }
 73
 74    /// Returns the underlying scheduler::BackgroundExecutor.
 75    ///
 76    /// This is used by Ex to pass the executor to thread/worktree code.
 77    pub fn scheduler_executor(&self) -> scheduler::BackgroundExecutor {
 78        self.inner.clone()
 79    }
 80
 81    /// Enqueues the given future to be run to completion on a background thread.
 82    #[track_caller]
 83    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 84    where
 85        R: Send + 'static,
 86    {
 87        self.spawn_with_priority(Priority::default(), future.boxed())
 88    }
 89
 90    /// Enqueues the given future to be run to completion on a background thread with the given priority.
 91    ///
 92    /// When `Priority::RealtimeAudio` is used, the task runs on a dedicated thread with
 93    /// realtime scheduling priority, suitable for audio processing.
 94    #[track_caller]
 95    pub fn spawn_with_priority<R>(
 96        &self,
 97        priority: Priority,
 98        future: impl Future<Output = R> + Send + 'static,
 99    ) -> Task<R>
100    where
101        R: Send + 'static,
102    {
103        if priority == Priority::RealtimeAudio {
104            self.inner.spawn_realtime(future)
105        } else {
106            self.inner.spawn_with_priority(priority, future)
107        }
108    }
109
110    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
111    ///
112    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
113    /// completion before the current task is resumed, even if the current task is slated for cancellation.
114    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
115    where
116        R: Send,
117    {
118        use crate::RunnableMeta;
119        use parking_lot::{Condvar, Mutex};
120
121        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
122
123        impl Drop for NotifyOnDrop<'_> {
124            fn drop(&mut self) {
125                *self.0.1.lock() = true;
126                self.0.0.notify_all();
127            }
128        }
129
130        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
131
132        impl Drop for WaitOnDrop<'_> {
133            fn drop(&mut self) {
134                let mut done = self.0.1.lock();
135                if !*done {
136                    self.0.0.wait(&mut done);
137                }
138            }
139        }
140
141        let dispatcher = self.dispatcher.clone();
142        let location = core::panic::Location::caller();
143
144        let pair = &(Condvar::new(), Mutex::new(false));
145        let _wait_guard = WaitOnDrop(pair);
146
147        let (runnable, task) = unsafe {
148            async_task::Builder::new()
149                .metadata(RunnableMeta { location })
150                .spawn_unchecked(
151                    move |_| async {
152                        let _notify_guard = NotifyOnDrop(pair);
153                        future.await
154                    },
155                    move |runnable| {
156                        dispatcher.dispatch(runnable, Priority::default());
157                    },
158                )
159        };
160        runnable.schedule();
161        task.await
162    }
163
164    /// Scoped lets you start a number of tasks and waits
165    /// for all of them to complete before returning.
166    pub async fn scoped<'scope, F>(&self, scheduler: F)
167    where
168        F: FnOnce(&mut Scope<'scope>),
169    {
170        let mut scope = Scope::new(self.clone(), Priority::default());
171        (scheduler)(&mut scope);
172        let spawned = mem::take(&mut scope.futures)
173            .into_iter()
174            .map(|f| self.spawn_with_priority(scope.priority, f))
175            .collect::<Vec<_>>();
176        for task in spawned {
177            task.await;
178        }
179    }
180
181    /// Scoped lets you start a number of tasks and waits
182    /// for all of them to complete before returning.
183    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
184    where
185        F: FnOnce(&mut Scope<'scope>),
186    {
187        let mut scope = Scope::new(self.clone(), priority);
188        (scheduler)(&mut scope);
189        let spawned = mem::take(&mut scope.futures)
190            .into_iter()
191            .map(|f| self.spawn_with_priority(scope.priority, f))
192            .collect::<Vec<_>>();
193        for task in spawned {
194            task.await;
195        }
196    }
197
198    /// Get the current time.
199    ///
200    /// Calling this instead of `std::time::Instant::now` allows the use
201    /// of fake timers in tests.
202    pub fn now(&self) -> Instant {
203        self.inner.scheduler().clock().now()
204    }
205
206    /// Returns a task that will complete after the given duration.
207    /// Depending on other concurrent tasks the elapsed duration may be longer
208    /// than requested.
209    #[track_caller]
210    pub fn timer(&self, duration: Duration) -> Task<()> {
211        if duration.is_zero() {
212            return Task::ready(());
213        }
214        self.spawn(self.inner.scheduler().timer(duration))
215    }
216
217    /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
218    #[cfg(any(test, feature = "test-support"))]
219    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
220        self.dispatcher.as_test().unwrap().simulate_random_delay()
221    }
222
223    /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
224    #[cfg(any(test, feature = "test-support"))]
225    pub fn advance_clock(&self, duration: Duration) {
226        self.dispatcher.as_test().unwrap().advance_clock(duration)
227    }
228
229    /// In tests, run one task.
230    #[cfg(any(test, feature = "test-support"))]
231    pub fn tick(&self) -> bool {
232        self.dispatcher.as_test().unwrap().scheduler().tick()
233    }
234
235    /// In tests, run tasks until the scheduler would park.
236    ///
237    /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
238    /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
239    /// drained. To preserve the historical semantics that tests relied on (drain all work that can
240    /// make progress), we advance the clock to the next timer when no runnable tasks remain.
241    #[cfg(any(test, feature = "test-support"))]
242    pub fn run_until_parked(&self) {
243        let scheduler = self.dispatcher.as_test().unwrap().scheduler();
244        scheduler.run();
245    }
246
247    /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
248    #[cfg(any(test, feature = "test-support"))]
249    pub fn allow_parking(&self) {
250        self.dispatcher
251            .as_test()
252            .unwrap()
253            .scheduler()
254            .allow_parking();
255
256        if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
257            log::warn!("[gpui::executor] allow_parking: enabled");
258        }
259    }
260
261    /// Sets the range of ticks to run before timing out in block_on.
262    #[cfg(any(test, feature = "test-support"))]
263    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
264        self.dispatcher
265            .as_test()
266            .unwrap()
267            .scheduler()
268            .set_timeout_ticks(range);
269    }
270
271    /// Undoes the effect of [`Self::allow_parking`].
272    #[cfg(any(test, feature = "test-support"))]
273    pub fn forbid_parking(&self) {
274        self.dispatcher
275            .as_test()
276            .unwrap()
277            .scheduler()
278            .forbid_parking();
279    }
280
281    /// In tests, returns the rng used by the dispatcher.
282    #[cfg(any(test, feature = "test-support"))]
283    pub fn rng(&self) -> scheduler::SharedRng {
284        self.dispatcher.as_test().unwrap().scheduler().rng()
285    }
286
287    /// How many CPUs are available to the dispatcher.
288    pub fn num_cpus(&self) -> usize {
289        #[cfg(any(test, feature = "test-support"))]
290        if let Some(test) = self.dispatcher.as_test() {
291            return test.num_cpus_override().unwrap_or(4);
292        }
293        num_cpus::get()
294    }
295
296    /// Override the number of CPUs reported by this executor in tests.
297    /// Panics if not called on a test executor.
298    #[cfg(any(test, feature = "test-support"))]
299    pub fn set_num_cpus(&self, count: usize) {
300        self.dispatcher
301            .as_test()
302            .expect("set_num_cpus can only be called on a test executor")
303            .set_num_cpus(count);
304    }
305
306    /// Whether we're on the main thread.
307    pub fn is_main_thread(&self) -> bool {
308        self.dispatcher.is_main_thread()
309    }
310
311    #[doc(hidden)]
312    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
313        &self.dispatcher
314    }
315}
316
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    ///
    /// A session id scopes this executor's foreground tasks; in builds with
    /// test support, both the scheduler and session id come from the test
    /// dispatcher when one is present, keeping test execution deterministic.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        #[cfg(any(test, feature = "test-support"))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
            if let Some(test_dispatcher) = dispatcher.as_test() {
                (
                    test_dispatcher.scheduler().clone(),
                    test_dispatcher.session_id(),
                )
            } else {
                let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
                let session_id = platform_scheduler.allocate_session_id();
                (platform_scheduler, session_id)
            };

        #[cfg(not(any(test, feature = "test-support")))]
        let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
            let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
            let session_id = platform_scheduler.allocate_session_id();
            (platform_scheduler, session_id)
        };

        let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);

        Self {
            inner,
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given Task to run on the main thread.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner.spawn(future.boxed_local())
    }

    /// Enqueues the given Task to run on the main thread with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        _priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        // Priority is ignored for foreground tasks - they run in order on the main thread
        self.inner.spawn(future)
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        use std::cell::Cell;

        let scheduler = self.inner.scheduler();

        // Capture the result through a Cell so the wrapping future can be
        // `Output = ()` as required by `Scheduler::block`.
        let output = Cell::new(None);
        let future = async {
            output.set(Some(future.await));
        };
        let mut future = std::pin::pin!(future);

        // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
        // (which are associated with the test session) to make progress while we block.
        // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
        scheduler.block(None, future.as_mut(), None);

        output.take().expect("block_test future did not complete")
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
        self.inner.block_on(future)
    }

    /// Block the current thread until the given future resolves or the timeout elapses.
    ///
    /// On timeout, returns `Err` containing the still-pending future so the
    /// caller can resume or retry it.
    pub fn block_with_timeout<R, Fut: Future<Output = R>>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
        self.inner.block_with_timeout(duration, future)
    }

    #[doc(hidden)]
    pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
        &self.dispatcher
    }

    #[doc(hidden)]
    pub fn scheduler_executor(&self) -> SchedulerForegroundExecutor {
        self.inner.clone()
    }
}
419
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    /// Executor used to block on completion when the scope is dropped.
    executor: BackgroundExecutor,
    /// Priority at which the collected futures are spawned.
    priority: Priority,
    /// Futures collected via `spawn`; lifetime-erased to `'static` (see the
    /// SAFETY comment in `Scope::spawn`).
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender cloned into each spawned future; when every clone (and this
    /// original) has dropped, the channel closes, signalling completion.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver used in `Drop` to wait for the channel to close.
    rx: mpsc::Receiver<()>,
    /// Ties the scope to the `'a` lifetime borrowed by its futures.
    lifetime: PhantomData<&'a ()>,
}
429
impl<'a> Scope<'a> {
    /// Creates an empty scope tied to the given executor and priority.
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        // The channel is only used to detect closure (all senders dropped),
        // never to transfer values, so capacity 1 suffices.
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    ///
    /// The future may borrow from the surrounding `'a` scope; it is only
    /// collected here and actually started by `scoped`/`scoped_priority`.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each future carries a sender clone; the channel closes only once
        // every spawned future has completed (or been dropped).
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
470
impl Drop for Scope<'_> {
    fn drop(&mut self) {
        // Drop our own sender so the channel can close once every spawned
        // future (each holding a sender clone) has finished.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved. This blocking wait is what makes the
        // lifetime-erasing transmute in `Scope::spawn` sound.
        let future = async {
            self.rx.next().await;
        };
        let mut future = std::pin::pin!(future);
        self.executor
            .inner
            .scheduler()
            .block(None, future.as_mut(), None);
    }
}
487
#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use std::cell::RefCell;

    /// Builds the minimal test fixture: a deterministic dispatcher, a
    /// background executor, and an app backed by a fake platform and a
    /// 404-only HTTP client.
    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
        let dispatcher = TestDispatcher::new(0);
        let shared_dispatcher = Arc::new(dispatcher.clone());

        let background = BackgroundExecutor::new(shared_dispatcher.clone());
        let foreground = ForegroundExecutor::new(shared_dispatcher);
        let platform = TestPlatform::new(background.clone(), foreground);

        let app = App::new_app(
            platform,
            Arc::new(()),
            http_client::FakeHttpClient::with_404_response(),
        );
        (dispatcher, background, app)
    }

    #[test]
    fn sanity_test_tasks_run() {
        let (dispatcher, _background, app) = create_test_app();
        let executor = app.borrow().foreground_executor.clone();

        // Shared flag the spawned task flips so we can observe that it ran.
        let task_ran = Rc::new(RefCell::new(false));
        let flag = Rc::clone(&task_ran);
        executor
            .spawn(async move {
                *flag.borrow_mut() = true;
            })
            .detach();

        // Drain the dispatcher while the app is still alive.
        dispatcher.run_until_parked();

        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }
}