executor.rs

use crate::{
    App, AppLivenessToken, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler,
};
use async_task::Runnable;
use futures::channel::mpsc;
use parking_lot::{Condvar, Mutex};
use smol::prelude::*;
use std::{
    fmt::Debug,
    marker::PhantomData,
    mem::{self, ManuallyDrop},
    num::NonZeroUsize,
    panic::Location,
    pin::Pin,
    rc::Rc,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    task::{Context, Poll},
    thread::{self, ThreadId},
    time::{Duration, Instant},
};
use util::TryFutureExt;
use waker_fn::waker_fn;

#[cfg(any(test, feature = "test-support"))]
use rand::rngs::StdRng;

/// A pointer to the executor that is currently running,
/// for spawning background tasks.
#[derive(Clone)]
pub struct BackgroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}

/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    not_send: PhantomData<Rc<()>>,
}

/// Realtime task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// Audio task
    Audio,
    /// Other realtime task
    #[default]
    Other,
}

/// Task priority
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority
    ///
    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
    Realtime(RealtimePriority),
    /// High priority
    ///
    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
    High,
    /// Medium priority, probably suits most of your use cases.
    #[default]
    Medium,
    /// Low priority
    ///
    /// Use this for background work that can come in large quantities, so that it
    /// does not starve the executor of resources needed for high-priority tasks.
    Low,
}

impl Priority {
    #[allow(dead_code)]
    pub(crate) const fn probability(&self) -> u32 {
        match self {
            // realtime priorities are not considered for probability scheduling
            Priority::Realtime(_) => 0,
            Priority::High => 60,
            Priority::Medium => 30,
            Priority::Low => 10,
        }
    }
}

/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
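///
/// A minimal usage sketch (illustrative only; `executor` is assumed to be a
/// [`BackgroundExecutor`] obtained from the application context):
///
/// ```ignore
/// // Await the result of background work...
/// let sum = executor.spawn(async { 2 + 2 });
/// assert_eq!(sum.await, 4);
///
/// // ...or let it run to completion without keeping a handle.
/// executor.spawn(async { /* fire-and-forget work */ }).detach();
/// ```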
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);

#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}

impl<T> Task<T> {
    /// Creates a new task that will resolve with the value
    pub fn ready(val: T) -> Self {
        Task(TaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background
    pub fn detach(self) {
        match self {
            Task(TaskState::Ready(_)) => {}
            Task(TaskState::Spawned(task)) => task.detach(),
        }
    }

    /// Converts this task into a fallible task that returns `Option<T>`.
    ///
    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
    /// if the app was dropped while the task is executing.
    ///
    /// # Example
    ///
    /// ```ignore
    /// // Background task that gracefully handles app shutdown:
    /// cx.background_spawn(async move {
    ///     let result = foreground_task.fallible().await;
    ///     if let Some(value) = result {
    ///         // Process the value
    ///     }
    ///     // If None, app was shut down - just exit gracefully
    /// }).detach();
    /// ```
    pub fn fallible(self) -> FallibleTask<T> {
        FallibleTask(match self.0 {
            TaskState::Ready(val) => FallibleTaskState::Ready(val),
            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
        })
    }
}

impl<E, T> Task<Result<T, E>>
where
    T: 'static,
    E: 'static + Debug,
{
    /// Run the task to completion in the background and log any
    /// errors that occur.
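    ///
    /// A hedged sketch (not from this crate's tests): `save_file` is a hypothetical
    /// function returning `Task<anyhow::Result<()>>`, and `cx` is an `&App`.
    ///
    /// ```ignore
    /// // Fire and forget; failures are logged with the caller's source location.
    /// save_file(cx).detach_and_log_err(cx);
    /// ```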
    #[track_caller]
    pub fn detach_and_log_err(self, cx: &App) {
        let location = core::panic::Location::caller();
        cx.foreground_executor()
            .spawn(self.log_tracked_err(*location))
            .detach();
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
            Task(TaskState::Spawned(task)) => task.poll(cx),
        }
    }
}

/// A task that returns `Option<T>` instead of panicking when cancelled.
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);

enum FallibleTaskState<T> {
    /// A task that is ready to return a value
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}

impl<T> FallibleTask<T> {
    /// Creates a new fallible task that will resolve with the value.
    pub fn ready(val: T) -> Self {
        FallibleTask(FallibleTaskState::Ready(Some(val)))
    }

    /// Detaching a task runs it to completion in the background.
    pub fn detach(self) {
        match self.0 {
            FallibleTaskState::Ready(_) => {}
            FallibleTaskState::Spawned(task) => task.detach(),
        }
    }
}

impl<T> Future for FallibleTask<T> {
    type Output = Option<T>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
        }
    }
}

impl<T> std::fmt::Debug for FallibleTask<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
            FallibleTaskState::Spawned(task) => {
                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
            }
        }
    }
}

/// A task label is an opaque identifier that you can use to
/// refer to a task in tests.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TaskLabel(NonZeroUsize);

impl Default for TaskLabel {
    fn default() -> Self {
        Self::new()
    }
}

impl TaskLabel {
    /// Construct a new task label.
    pub fn new() -> Self {
        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
        Self(
            NEXT_TASK_LABEL
                .fetch_add(1, Ordering::SeqCst)
                .try_into()
                .unwrap(),
        )
    }
}

type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;

/// BackgroundExecutor lets you run things on background threads.
/// In production this is a thread pool with no ordering guarantees.
/// In tests this is simulated by running tasks one by one in a deterministic
/// (but arbitrary) order controlled by the `SEED` environment variable.
impl BackgroundExecutor {
    #[doc(hidden)]
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self { dispatcher }
    }

    /// Enqueues the given future to be run to completion on a background thread.
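    ///
    /// A minimal sketch, assuming `executor` is a `BackgroundExecutor`
    /// (`expensive_hash` and `bytes` are placeholders):
    ///
    /// ```ignore
    /// let checksum = executor.spawn(async move {
    ///     // Runs off the main thread on the dispatcher's thread pool.
    ///     expensive_hash(&bytes)
    /// });
    /// // `checksum` is a `Task` that can be awaited or detached.
    /// ```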
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_with_priority(Priority::default(), future)
    }

    /// Enqueues the given future to be run to completion on a background thread with the given priority.
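    ///
    /// Illustrative sketch; `executor` is assumed to be a `BackgroundExecutor` and
    /// `index_directory` / `path` are placeholders:
    ///
    /// ```ignore
    /// // Bulk work that should not starve higher-priority tasks.
    /// executor
    ///     .spawn_with_priority(Priority::Low, async move { index_directory(path).await })
    ///     .detach();
    /// ```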
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), None, priority)
    }

    /// Enqueues the given future to be run to completion on a background thread, blocking the current task on it.
    ///
    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
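    ///
    /// A hedged sketch of borrowing local data from a background thread, assuming
    /// `executor` is a `BackgroundExecutor` and `items` lives on the caller's stack:
    ///
    /// ```ignore
    /// let items: Vec<u64> = vec![1, 2, 3];
    /// let total = executor
    ///     .await_on_background(async { items.iter().sum::<u64>() })
    ///     .await;
    /// ```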
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        // We need to ensure that cancellation of the parent task does not drop the environment
        // before our own task has completed or been cancelled.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                // Loop to guard against spurious wakeups of the condvar.
                while !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        let pair = &(Condvar::new(), Mutex::new(false));
        let _wait_guard = WaitOnDrop(pair);

        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(
                            RunnableVariant::Meta(runnable),
                            None,
                            Priority::default(),
                        )
                    },
                )
        };
        runnable.schedule();
        task.await
    }

    /// Enqueues the given future to be run to completion on a background thread.
    /// The given label can be used to control the priority of the task in tests.
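    ///
    /// Illustrative sketch of pairing a label with [`Self::deprioritize`] in a test
    /// (`executor` is assumed to be a test `BackgroundExecutor`):
    ///
    /// ```ignore
    /// let label = TaskLabel::new();
    /// executor.deprioritize(label); // run this task after everything else
    /// executor.spawn_labeled(label, async move { /* work */ }).detach();
    /// executor.run_until_parked();
    /// ```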
    #[track_caller]
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
    }

    #[track_caller]
    fn spawn_internal<R: Send + 'static>(
        &self,
        future: AnyFuture<R>,
        label: Option<TaskLabel>,
        #[cfg_attr(
            target_os = "windows",
            expect(
                unused_variables,
                reason = "Multi priority scheduler is broken on windows"
            )
        )]
        priority: Priority,
    ) -> Task<R> {
        let dispatcher = self.dispatcher.clone();
        #[cfg(target_os = "windows")]
        let priority = Priority::Medium; // multi-prio scheduler is broken on windows

        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
            let location = core::panic::Location::caller();
            let (tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);

            dispatcher.spawn_realtime(
                realtime,
                Box::new(move || {
                    while let Ok(runnable) = rx.recv() {
                        let start = Instant::now();
                        let location = runnable.metadata().location;
                        let mut timing = TaskTiming {
                            location,
                            start,
                            end: None,
                        };
                        profiler::add_task_timing(timing);

                        runnable.run();

                        let end = Instant::now();
                        timing.end = Some(end);
                        profiler::add_task_timing(timing);
                    }
                }),
            );

            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        let _ = tx.send(runnable);
                    },
                )
        } else {
            let location = core::panic::Location::caller();
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
                    },
                )
        };

        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// Used by the test harness to run an async test in a synchronous fashion.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(false, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    /// Block the current thread until the given future resolves.
    /// Consider using `block_with_timeout` instead.
    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
        if let Ok(value) = self.block_internal(true, future, None) {
            value
        } else {
            unreachable!()
        }
    }

    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);

        let parker = parking::Parker::new();
        let unparker = parker.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    if let Some(timeout) = timeout {
                        if !parker.park_timeout(timeout)
                            && deadline.is_some_and(|deadline| deadline < Instant::now())
                        {
                            return Err(future);
                        }
                    } else {
                        parker.park();
                    }
                }
            }
        }
    }

    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;

        use parking::Parker;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let Some(dispatcher) = self.dispatcher.as_test() else {
            return Err(future);
        };

        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    if !dispatcher.tick(background_only) {
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }

    /// Block the current thread until the given future resolves
    /// or `duration` has elapsed.
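    ///
    /// Returns `Err` with the original future if the timeout elapses first. A minimal
    /// sketch, assuming `executor` is a `BackgroundExecutor` and `request` is some future:
    ///
    /// ```ignore
    /// match executor.block_with_timeout(Duration::from_millis(100), request) {
    ///     Ok(value) => println!("finished in time: {value:?}"),
    ///     Err(_unfinished) => {
    ///         // Timed out; the still-pending future is handed back to the caller.
    ///     }
    /// }
    /// ```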
    pub fn block_with_timeout<Fut: Future>(
        &self,
        duration: Duration,
        future: Fut,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        self.block_internal(true, future, Some(duration))
    }

    /// Scoped lets you start a number of tasks and waits
    /// for all of them to complete before returning.
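    ///
    /// A hedged sketch of processing chunks of a borrowed vector in parallel
    /// (`executor` is assumed to be a `BackgroundExecutor`; `process` is a placeholder):
    ///
    /// ```ignore
    /// let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    /// executor
    ///     .scoped(|scope| {
    ///         for chunk in data.chunks(2) {
    ///             // The spawned futures may borrow from the enclosing scope.
    ///             scope.spawn(async move { process(chunk); });
    ///         }
    ///     })
    ///     .await;
    /// ```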
    pub async fn scoped<'scope, F>(&self, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), Priority::default());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Scoped lets you start a number of tasks with the given priority and waits
    /// for all of them to complete before returning.
    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone(), priority);
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn_with_priority(scope.priority, f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    /// Get the current time.
    ///
    /// Calling this instead of `std::time::Instant::now` allows the use
    /// of fake timers in tests.
    pub fn now(&self) -> Instant {
        self.dispatcher.now()
    }

    /// Returns a task that will complete after the given duration.
    /// Depending on other concurrent tasks the elapsed duration may be longer
    /// than requested.
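    ///
    /// A minimal sketch, assuming `executor` is a `BackgroundExecutor`:
    ///
    /// ```ignore
    /// let timer_executor = executor.clone();
    /// executor
    ///     .spawn(async move {
    ///         timer_executor.timer(Duration::from_millis(50)).await;
    ///         // ...continues roughly 50ms later.
    ///     })
    ///     .detach();
    /// ```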
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }
        let location = core::panic::Location::caller();
        let (runnable, task) = async_task::Builder::new()
            .metadata(RunnableMeta {
                location,
                app: None,
            })
            .spawn(move |_| async move {}, {
                let dispatcher = self.dispatcher.clone();
                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
            });
        runnable.schedule();
        Task(TaskState::Spawned(task))
    }

    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
    #[cfg(any(test, feature = "test-support"))]
    pub fn start_waiting(&self) {
        self.dispatcher.as_test().unwrap().start_waiting();
    }

    /// in tests, removes the debugging data added by start_waiting
    #[cfg(any(test, feature = "test-support"))]
    pub fn finish_waiting(&self) {
        self.dispatcher.as_test().unwrap().finish_waiting();
    }

    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
        self.dispatcher.as_test().unwrap().simulate_random_delay()
    }

    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
    #[cfg(any(test, feature = "test-support"))]
    pub fn deprioritize(&self, task_label: TaskLabel) {
        self.dispatcher.as_test().unwrap().deprioritize(task_label)
    }

    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
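    ///
    /// A hedged test sketch (`executor` is assumed to be a test `BackgroundExecutor`):
    ///
    /// ```ignore
    /// let timer = executor.timer(Duration::from_secs(1));
    /// executor.advance_clock(Duration::from_secs(1)); // makes the timer ready
    /// executor.run_until_parked();                    // actually runs it
    /// ```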
    #[cfg(any(test, feature = "test-support"))]
    pub fn advance_clock(&self, duration: Duration) {
        self.dispatcher.as_test().unwrap().advance_clock(duration)
    }

    /// in tests, run one task.
    #[cfg(any(test, feature = "test-support"))]
    pub fn tick(&self) -> bool {
        self.dispatcher.as_test().unwrap().tick(false)
    }

    /// in tests, run all tasks that are ready to run. If after doing so
    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
    #[cfg(any(test, feature = "test-support"))]
    pub fn run_until_parked(&self) {
        self.dispatcher.as_test().unwrap().run_until_parked()
    }

    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
    /// do take real async time to run.
    #[cfg(any(test, feature = "test-support"))]
    pub fn allow_parking(&self) {
        self.dispatcher.as_test().unwrap().allow_parking();
    }

    /// undoes the effect of [`Self::allow_parking`].
    #[cfg(any(test, feature = "test-support"))]
    pub fn forbid_parking(&self) {
        self.dispatcher.as_test().unwrap().forbid_parking();
    }

    /// adds detail to the "parked with nothing left to run" message.
    #[cfg(any(test, feature = "test-support"))]
    pub fn set_waiting_hint(&self, msg: Option<String>) {
        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
    }

    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
    #[cfg(any(test, feature = "test-support"))]
    pub fn rng(&self) -> StdRng {
        self.dispatcher.as_test().unwrap().rng()
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        #[cfg(any(test, feature = "test-support"))]
        return 4;

        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }

    /// Whether we're on the main thread.
    pub fn is_main_thread(&self) -> bool {
        self.dispatcher.is_main_thread()
    }

    #[cfg(any(test, feature = "test-support"))]
    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
    }
}

/// ForegroundExecutor runs things on the main thread.
impl ForegroundExecutor {
    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
        Self {
            dispatcher,
            not_send: PhantomData,
        }
    }

    /// Enqueues the given Task to run on the main thread at some point in the future.
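    ///
    /// Unlike [`BackgroundExecutor::spawn`], the future does not need to be `Send`,
    /// but it must only be polled on the thread that spawned it. A minimal sketch,
    /// assuming `foreground` is a `ForegroundExecutor`:
    ///
    /// ```ignore
    /// let local_value = Rc::new(42);
    /// foreground
    ///     .spawn({
    ///         let local_value = Rc::clone(&local_value);
    ///         async move { println!("{local_value}") }
    ///     })
    ///     .detach();
    /// ```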
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, Priority::default(), future)
    }

    /// Enqueues the given Task to run on the main thread at some point in the future, with the given priority.
    #[track_caller]
    pub fn spawn_with_priority<R>(
        &self,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(None, priority, future)
    }

    #[track_caller]
    pub(crate) fn spawn_context<R>(
        &self,
        app: AppLivenessToken,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(Some(app), Priority::default(), future)
    }

    #[track_caller]
    pub(crate) fn inner_spawn<R>(
        &self,
        app: Option<AppLivenessToken>,
        priority: Priority,
        future: impl Future<Output = R> + 'static,
    ) -> Task<R>
    where
        R: 'static,
    {
        let dispatcher = self.dispatcher.clone();
        let location = core::panic::Location::caller();

        #[track_caller]
        fn inner<R: 'static>(
            dispatcher: Arc<dyn PlatformDispatcher>,
            future: AnyLocalFuture<R>,
            location: &'static core::panic::Location<'static>,
            app: Option<AppLivenessToken>,
            priority: Priority,
        ) -> Task<R> {
            let (runnable, task) = spawn_local_with_source_location(
                future,
                move |runnable| {
                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
                },
                RunnableMeta { location, app },
            );
            runnable.schedule();
            Task(TaskState::Spawned(task))
        }
        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
    }
}

/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
    future: Fut,
    schedule: S,
    metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<M> + Send + Sync + 'static,
    M: 'static,
{
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    struct Checked<F> {
        id: ThreadId,
        inner: ManuallyDrop<F>,
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { ManuallyDrop::drop(&mut self.inner) };
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    unsafe {
        async_task::Builder::new()
            .metadata(metadata)
            .spawn_unchecked(move |_| future, schedule)
    }
}

/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
pub struct Scope<'a> {
    executor: BackgroundExecutor,
    priority: Priority,
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    lifetime: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            priority,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            lifetime: PhantomData,
        }
    }

    /// How many CPUs are available to the dispatcher.
    pub fn num_cpus(&self) -> usize {
        self.executor.num_cpus()
    }

    /// Spawn a future into this scope.
    #[track_caller]
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}

impl Drop for Scope<'_> {
    fn drop(&mut self) {
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{App, TestDispatcher, TestPlatform};
    use rand::SeedableRng;
    use std::cell::RefCell;

    #[test]
    fn sanity_test_tasks_run() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();

        let task_ran = Rc::new(RefCell::new(false));

        foreground_executor
            .spawn_context(liveness_token, {
                let task_ran = Rc::clone(&task_ran);
                async move {
                    *task_ran.borrow_mut() = true;
                }
            })
            .detach();

        // Run dispatcher while app is still alive
        dispatcher.run_until_parked();

        // Task should have run
        assert!(
            *task_ran.borrow(),
            "Task should run normally when app is alive"
        );
    }

    #[test]
    fn test_task_cancelled_when_app_dropped() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        foreground_executor
            .spawn_context(liveness_token, async move {
                *task_ran_clone.borrow_mut() = true;
            })
            .detach();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        // The task should have been cancelled, not run
        assert!(
            !*task_ran.borrow(),
            "Task should have been cancelled when app was dropped, but it ran!"
        );
    }

    #[test]
    fn test_nested_tasks_both_cancel() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();
        let app_weak = Rc::downgrade(&app);

        let outer_completed = Rc::new(RefCell::new(false));
        let inner_completed = Rc::new(RefCell::new(false));
        let reached_await = Rc::new(RefCell::new(false));

        let outer_flag = Rc::clone(&outer_completed);
        let inner_flag = Rc::clone(&inner_completed);
        let await_flag = Rc::clone(&reached_await);

        // Channel to block the inner task until we're ready
        let (tx, rx) = futures::channel::oneshot::channel::<()>();

        // We need clones of executor and liveness_token for the inner spawn
        let inner_executor = foreground_executor.clone();
        let inner_liveness_token = liveness_token.clone();

        // Spawn outer task that will spawn and await an inner task
        foreground_executor
            .spawn_context(liveness_token, async move {
                let inner_flag_clone = Rc::clone(&inner_flag);

                // Spawn inner task that blocks on a channel
                let inner_task = inner_executor.spawn_context(inner_liveness_token, async move {
                    // Wait for signal (which will never come - we'll drop the app instead)
                    rx.await.ok();
                    *inner_flag_clone.borrow_mut() = true;
                });

                // Mark that we've reached the await point
                *await_flag.borrow_mut() = true;

                // Await inner task - this should not panic when both are cancelled
                inner_task.await;

                // Mark outer as complete (should never reach here)
                *outer_flag.borrow_mut() = true;
            })
            .detach();

        // Run dispatcher until outer task reaches the await point
        // The inner task will be blocked on the channel
        dispatcher.run_until_parked();

        // Verify we actually reached the await point before dropping the app
        assert!(
            *reached_await.borrow(),
            "Outer task should have reached the await point"
        );

        // Neither task should have completed yet
        assert!(
            !*outer_completed.borrow(),
            "Outer task should not have completed yet"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should not have completed yet"
        );

        // Drop the channel sender and app while outer is awaiting inner
        drop(tx);
        drop(app);
        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        // Run dispatcher - both tasks should be cancelled
        dispatcher.run_until_parked();

        // Neither task should have completed (both were cancelled)
        assert!(
            !*outer_completed.borrow(),
            "Outer task should have been cancelled, not completed"
        );
        assert!(
            !*inner_completed.borrow(),
            "Inner task should have been cancelled, not completed"
        );
    }

    #[test]
    fn test_task_without_app_tracking_still_runs() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let app_weak = Rc::downgrade(&app);

        let task_ran = Rc::new(RefCell::new(false));
        let task_ran_clone = Rc::clone(&task_ran);

        let _task = foreground_executor.spawn(async move {
            *task_ran_clone.borrow_mut() = true;
        });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        assert!(
            *task_ran.borrow(),
            "Task without app tracking should still run after app is dropped"
        );
    }

    #[test]
    #[should_panic]
    fn test_polling_cancelled_task_panics() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor.spawn_context(liveness_token, async move { 42 });

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        background_executor.block(task);
    }

    #[test]
    fn test_polling_cancelled_task_returns_none_with_fallible() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();
        let app_weak = Rc::downgrade(&app);

        let task = foreground_executor
            .spawn_context(liveness_token, async move { 42 })
            .fallible();

        drop(app);

        assert!(app_weak.upgrade().is_none(), "App should have been dropped");

        dispatcher.run_until_parked();

        let result = background_executor.block(task);
        assert_eq!(result, None, "Cancelled task should return None");
    }

    #[test]
    fn test_app_liveness_token_can_be_dropped_on_background_thread() {
        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
        let arc_dispatcher = Arc::new(dispatcher.clone());
        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);

        let platform = TestPlatform::new(background_executor, foreground_executor);
        let asset_source = Arc::new(());
        let http_client = http_client::FakeHttpClient::with_404_response();

        let app = App::new_app(platform, asset_source, http_client);
        let liveness_token = app.borrow().liveness.token();

        // Drop the token on a real background thread.
        std::thread::spawn(move || {
            drop(liveness_token);
        })
        .join()
        .expect("Dropping AppLivenessToken on background thread should not panic");
    }
}