executor.rs

   1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    fmt::Debug,
   8    marker::PhantomData,
   9    mem::{self, ManuallyDrop},
  10    num::NonZeroUsize,
  11    panic::Location,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::{
  15        Arc,
  16        atomic::{AtomicUsize, Ordering},
  17    },
  18    task::{Context, Poll},
  19    thread::{self, ThreadId},
  20    time::{Duration, Instant},
  21};
  22use util::TryFutureExt;
  23use waker_fn::waker_fn;
  24
  25#[cfg(any(test, feature = "test-support"))]
  26use rand::rngs::StdRng;
  27
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// The executor only holds a shared handle to the platform dispatcher, so
/// cloning it clones that [`Arc`].
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// Platform dispatcher that spawned runnables are handed to.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}
  35
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// Platform dispatcher that spawned runnables are handed to.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    /// Zero-sized marker: `Rc<()>` is `!Send`, which makes the whole struct `!Send`.
    not_send: PhantomData<Rc<()>>,
}
  49
  50/// Realtime task priority
  51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  52#[repr(u8)]
  53pub enum RealtimePriority {
  54    /// Audio task
  55    Audio,
  56    /// Other realtime task
  57    #[default]
  58    Other,
  59}
  60
  61/// Task priority
  62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  63#[repr(u8)]
  64pub enum Priority {
  65    /// Realtime priority
  66    ///
  67    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
  68    Realtime(RealtimePriority),
  69    /// High priority
  70    ///
  71    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
  72    High,
  73    /// Medium priority, probably suits most of your use cases.
  74    #[default]
  75    Medium,
  76    /// Low priority
  77    ///
  78    /// Prioritize this for background work that can come in large quantities
  79    /// to not starve the executor of resources for high priority tasks
  80    Low,
  81}
  82
  83impl Priority {
  84    #[allow(dead_code)]
  85    pub(crate) const fn probability(&self) -> u32 {
  86        match self {
  87            // realtime priorities are not considered for probability scheduling
  88            Priority::Realtime(_) => 0,
  89            Priority::High => 60,
  90            Priority::Medium => 30,
  91            Priority::Low => 10,
  92        }
  93    }
  94}
  95
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// The wrapped state is either an already-available value or a handle to a
/// spawned `async_task` task.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);
 105
/// Internal state behind a [`Task`].
#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// The value is stored in an `Option` so it can be taken out when the task is polled.
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}
 114
 115impl<T> Task<T> {
 116    /// Creates a new task that will resolve with the value
 117    pub fn ready(val: T) -> Self {
 118        Task(TaskState::Ready(Some(val)))
 119    }
 120
 121    /// Detaching a task runs it to completion in the background
 122    pub fn detach(self) {
 123        match self {
 124            Task(TaskState::Ready(_)) => {}
 125            Task(TaskState::Spawned(task)) => task.detach(),
 126        }
 127    }
 128
 129    /// Converts this task into a fallible task that returns `Option<T>`.
 130    ///
 131    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 132    /// if the app was dropped while the task is executing.
 133    ///
 134    /// # Example
 135    ///
 136    /// ```ignore
 137    /// // Background task that gracefully handles app shutdown:
 138    /// cx.background_spawn(async move {
 139    ///     let result = foreground_task.fallible().await;
 140    ///     if let Some(value) = result {
 141    ///         // Process the value
 142    ///     }
 143    ///     // If None, app was shut down - just exit gracefully
 144    /// }).detach();
 145    /// ```
 146    pub fn fallible(self) -> FallibleTask<T> {
 147        FallibleTask(match self.0 {
 148            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 149            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 150        })
 151    }
 152}
 153
 154impl<E, T> Task<Result<T, E>>
 155where
 156    T: 'static,
 157    E: 'static + Debug,
 158{
 159    /// Run the task to completion in the background and log any
 160    /// errors that occur.
 161    #[track_caller]
 162    pub fn detach_and_log_err(self, cx: &App) {
 163        let location = core::panic::Location::caller();
 164        cx.foreground_executor()
 165            .spawn(self.log_tracked_err(*location))
 166            .detach();
 167    }
 168}
 169
 170impl<T> Future for Task<T> {
 171    type Output = T;
 172
 173    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 174        match unsafe { self.get_unchecked_mut() } {
 175            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
 176            Task(TaskState::Spawned(task)) => task.poll(cx),
 177        }
 178    }
 179}
 180
/// A task that returns `Option<T>` instead of panicking when cancelled.
///
/// Obtained from [`Task::fallible`] or built directly with [`FallibleTask::ready`].
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);
 184
/// Internal state behind a [`FallibleTask`].
enum FallibleTaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// The value is taken out of the `Option` when the task is polled.
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}
 192
 193impl<T> FallibleTask<T> {
 194    /// Creates a new fallible task that will resolve with the value.
 195    pub fn ready(val: T) -> Self {
 196        FallibleTask(FallibleTaskState::Ready(Some(val)))
 197    }
 198
 199    /// Detaching a task runs it to completion in the background.
 200    pub fn detach(self) {
 201        match self.0 {
 202            FallibleTaskState::Ready(_) => {}
 203            FallibleTaskState::Spawned(task) => task.detach(),
 204        }
 205    }
 206}
 207
 208impl<T> Future for FallibleTask<T> {
 209    type Output = Option<T>;
 210
 211    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 212        match unsafe { self.get_unchecked_mut() } {
 213            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
 214            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
 215        }
 216    }
 217}
 218
 219impl<T> std::fmt::Debug for FallibleTask<T> {
 220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 221        match &self.0 {
 222            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 223            FallibleTaskState::Spawned(task) => {
 224                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 225            }
 226        }
 227    }
 228}
 229
 230/// A task label is an opaque identifier that you can use to
 231/// refer to a task in tests.
 232#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 233pub struct TaskLabel(NonZeroUsize);
 234
 235impl Default for TaskLabel {
 236    fn default() -> Self {
 237        Self::new()
 238    }
 239}
 240
 241impl TaskLabel {
 242    /// Construct a new task label.
 243    pub fn new() -> Self {
 244        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 245        Self(
 246            NEXT_TASK_LABEL
 247                .fetch_add(1, Ordering::SeqCst)
 248                .try_into()
 249                .unwrap(),
 250        )
 251    }
 252}
 253
// Boxed, pinned, type-erased future without a `Send` bound.
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

// Boxed, pinned, type-erased future that is also `Send`.
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 257
 258/// BackgroundExecutor lets you run things on background threads.
 259/// In production this is a thread pool with no ordering guarantees.
 260/// In tests this is simulated by running tasks one by one in a deterministic
 261/// (but arbitrary) order controlled by the `SEED` environment variable.
 262impl BackgroundExecutor {
 263    #[doc(hidden)]
 264    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 265        Self { dispatcher }
 266    }
 267
 268    /// Enqueues the given future to be run to completion on a background thread.
 269    #[track_caller]
 270    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 271    where
 272        R: Send + 'static,
 273    {
 274        self.spawn_with_priority(Priority::default(), future)
 275    }
 276
 277    /// Enqueues the given future to be run to completion on a background thread.
 278    #[track_caller]
 279    pub fn spawn_with_priority<R>(
 280        &self,
 281        priority: Priority,
 282        future: impl Future<Output = R> + Send + 'static,
 283    ) -> Task<R>
 284    where
 285        R: Send + 'static,
 286    {
 287        self.spawn_internal::<R>(Box::pin(future), None, priority)
 288    }
 289
    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
    ///
    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        // We need to ensure that cancellation of the parent task does not drop the environment
        // before our own task has completed or got cancelled.

        // Dropped inside the spawned future: sets the shared flag and wakes every waiter.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        // Dropped in this function's own future: blocks the dropping thread on the
        // condvar unless the flag has already been set.
        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                if !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        // NOTE(review): this `async fn` carries no `#[track_caller]`, so this presumably
        // records a location inside this function rather than the caller's — confirm.
        let location = core::panic::Location::caller();

        let pair = &(Condvar::new(), Mutex::new(false));
        let _wait_guard = WaitOnDrop(pair);

        // `spawn_unchecked` skips the `'static` check, which is what lets `future`
        // borrow from the caller's scope; the two guards above are what is meant to keep
        // that borrowed data alive for as long as the spawned future exists.
        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| {
                        dispatcher.dispatch(
                            RunnableVariant::Meta(runnable),
                            None,
                            Priority::default(),
                        )
                    },
                )
        };
        runnable.schedule();
        task.await
    }
 349
 350    /// Enqueues the given future to be run to completion on a background thread.
 351    /// The given label can be used to control the priority of the task in tests.
 352    #[track_caller]
 353    pub fn spawn_labeled<R>(
 354        &self,
 355        label: TaskLabel,
 356        future: impl Future<Output = R> + Send + 'static,
 357    ) -> Task<R>
 358    where
 359        R: Send + 'static,
 360    {
 361        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 362    }
 363
 364    #[track_caller]
 365    fn spawn_internal<R: Send + 'static>(
 366        &self,
 367        future: AnyFuture<R>,
 368        label: Option<TaskLabel>,
 369        priority: Priority,
 370    ) -> Task<R> {
 371        let dispatcher = self.dispatcher.clone();
 372        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
 373            let location = core::panic::Location::caller();
 374            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
 375
 376            dispatcher.spawn_realtime(
 377                realtime,
 378                Box::new(move || {
 379                    while let Ok(runnable) = rx.recv() {
 380                        let start = Instant::now();
 381                        let location = runnable.metadata().location;
 382                        let mut timing = TaskTiming {
 383                            location,
 384                            start,
 385                            end: None,
 386                        };
 387                        profiler::add_task_timing(timing);
 388
 389                        runnable.run();
 390
 391                        let end = Instant::now();
 392                        timing.end = Some(end);
 393                        profiler::add_task_timing(timing);
 394                    }
 395                }),
 396            );
 397
 398            async_task::Builder::new()
 399                .metadata(RunnableMeta {
 400                    location,
 401                    app: None,
 402                })
 403                .spawn(
 404                    move |_| future,
 405                    move |runnable| {
 406                        let _ = tx.send(runnable);
 407                    },
 408                )
 409        } else {
 410            let location = core::panic::Location::caller();
 411            async_task::Builder::new()
 412                .metadata(RunnableMeta {
 413                    location,
 414                    app: None,
 415                })
 416                .spawn(
 417                    move |_| future,
 418                    move |runnable| {
 419                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
 420                    },
 421                )
 422        };
 423
 424        runnable.schedule();
 425        Task(TaskState::Spawned(task))
 426    }
 427
 428    /// Used by the test harness to run an async test in a synchronous fashion.
 429    #[cfg(any(test, feature = "test-support"))]
 430    #[track_caller]
 431    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 432        if let Ok(value) = self.block_internal(false, future, None) {
 433            value
 434        } else {
 435            unreachable!()
 436        }
 437    }
 438
 439    /// Block the current thread until the given future resolves.
 440    /// Consider using `block_with_timeout` instead.
 441    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 442        if let Ok(value) = self.block_internal(true, future, None) {
 443            value
 444        } else {
 445            unreachable!()
 446        }
 447    }
 448
 449    #[cfg(not(any(test, feature = "test-support")))]
 450    pub(crate) fn block_internal<Fut: Future>(
 451        &self,
 452        _background_only: bool,
 453        future: Fut,
 454        timeout: Option<Duration>,
 455    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 456        use std::time::Instant;
 457
 458        let mut future = Box::pin(future);
 459        if timeout == Some(Duration::ZERO) {
 460            return Err(future);
 461        }
 462        let deadline = timeout.map(|timeout| Instant::now() + timeout);
 463
 464        let parker = parking::Parker::new();
 465        let unparker = parker.unparker();
 466        let waker = waker_fn(move || {
 467            unparker.unpark();
 468        });
 469        let mut cx = std::task::Context::from_waker(&waker);
 470
 471        loop {
 472            match future.as_mut().poll(&mut cx) {
 473                Poll::Ready(result) => return Ok(result),
 474                Poll::Pending => {
 475                    let timeout =
 476                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
 477                    if let Some(timeout) = timeout {
 478                        if !parker.park_timeout(timeout)
 479                            && deadline.is_some_and(|deadline| deadline < Instant::now())
 480                        {
 481                            return Err(future);
 482                        }
 483                    } else {
 484                        parker.park();
 485                    }
 486                }
 487            }
 488        }
 489    }
 490
    /// Test implementation of blocking: polls `future` and, while it is pending,
    /// drives the test dispatcher one tick at a time.
    ///
    /// Returns `Ok(output)` once the future resolves. Returns `Err(future)` (the
    /// still-pending, boxed future) when the timeout is zero, when the dispatcher is
    /// not a test dispatcher, or when the tick budget is used up.
    ///
    /// Panics when nothing is left to run while parking is forbidden, or when parking
    /// is allowed and the wall-clock test timeout has elapsed.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;

        use parking::Parker;

        let mut future = Box::pin(future);
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let Some(dispatcher) = self.dispatcher.as_test() else {
            return Err(future);
        };

        // With a timeout the budget is a tick count obtained from the dispatcher, not
        // a wall-clock duration; without one the budget is effectively unlimited.
        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        // Records that the waker fired, so a wake-up that happened while ticking is
        // noticed before deciding to park or panic.
        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        // `option_env!` is read at compile time; the value is in seconds and falls
        // back to 180 when unset or unparsable.
        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let mut test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    // `tick` returning false is treated as "nothing was run".
                    if !dispatcher.tick(background_only) {
                        // The future was woken in the meantime: poll it again.
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            // Try moving the fake clock forward before giving up.
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        // Parking is allowed: register our unparker with the dispatcher
                        // and sleep briefly before polling again.
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }
 579
 580    /// Block the current thread until the given future resolves
 581    /// or `duration` has elapsed.
 582    pub fn block_with_timeout<Fut: Future>(
 583        &self,
 584        duration: Duration,
 585        future: Fut,
 586    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 587        self.block_internal(true, future, Some(duration))
 588    }
 589
 590    /// Scoped lets you start a number of tasks and waits
 591    /// for all of them to complete before returning.
 592    pub async fn scoped<'scope, F>(&self, scheduler: F)
 593    where
 594        F: FnOnce(&mut Scope<'scope>),
 595    {
 596        let mut scope = Scope::new(self.clone(), Priority::default());
 597        (scheduler)(&mut scope);
 598        let spawned = mem::take(&mut scope.futures)
 599            .into_iter()
 600            .map(|f| self.spawn_with_priority(scope.priority, f))
 601            .collect::<Vec<_>>();
 602        for task in spawned {
 603            task.await;
 604        }
 605    }
 606
 607    /// Scoped lets you start a number of tasks and waits
 608    /// for all of them to complete before returning.
 609    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 610    where
 611        F: FnOnce(&mut Scope<'scope>),
 612    {
 613        let mut scope = Scope::new(self.clone(), priority);
 614        (scheduler)(&mut scope);
 615        let spawned = mem::take(&mut scope.futures)
 616            .into_iter()
 617            .map(|f| self.spawn_with_priority(scope.priority, f))
 618            .collect::<Vec<_>>();
 619        for task in spawned {
 620            task.await;
 621        }
 622    }
 623
 624    /// Get the current time.
 625    ///
 626    /// Calling this instead of `std::time::Instant::now` allows the use
 627    /// of fake timers in tests.
 628    pub fn now(&self) -> Instant {
 629        self.dispatcher.now()
 630    }
 631
 632    /// Returns a task that will complete after the given duration.
 633    /// Depending on other concurrent tasks the elapsed duration may be longer
 634    /// than requested.
 635    pub fn timer(&self, duration: Duration) -> Task<()> {
 636        if duration.is_zero() {
 637            return Task::ready(());
 638        }
 639        let location = core::panic::Location::caller();
 640        let (runnable, task) = async_task::Builder::new()
 641            .metadata(RunnableMeta {
 642                location,
 643                app: None,
 644            })
 645            .spawn(move |_| async move {}, {
 646                let dispatcher = self.dispatcher.clone();
 647                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
 648            });
 649        runnable.schedule();
 650        Task(TaskState::Spawned(task))
 651    }
 652
 653    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 654    #[cfg(any(test, feature = "test-support"))]
 655    pub fn start_waiting(&self) {
 656        self.dispatcher.as_test().unwrap().start_waiting();
 657    }
 658
 659    /// in tests, removes the debugging data added by start_waiting
 660    #[cfg(any(test, feature = "test-support"))]
 661    pub fn finish_waiting(&self) {
 662        self.dispatcher.as_test().unwrap().finish_waiting();
 663    }
 664
 665    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 666    #[cfg(any(test, feature = "test-support"))]
 667    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 668        self.dispatcher.as_test().unwrap().simulate_random_delay()
 669    }
 670
 671    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 672    #[cfg(any(test, feature = "test-support"))]
 673    pub fn deprioritize(&self, task_label: TaskLabel) {
 674        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 675    }
 676
 677    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 678    #[cfg(any(test, feature = "test-support"))]
 679    pub fn advance_clock(&self, duration: Duration) {
 680        self.dispatcher.as_test().unwrap().advance_clock(duration)
 681    }
 682
 683    /// in tests, run one task.
 684    #[cfg(any(test, feature = "test-support"))]
 685    pub fn tick(&self) -> bool {
 686        self.dispatcher.as_test().unwrap().tick(false)
 687    }
 688
 689    /// in tests, run all tasks that are ready to run. If after doing so
 690    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 691    #[cfg(any(test, feature = "test-support"))]
 692    pub fn run_until_parked(&self) {
 693        self.dispatcher.as_test().unwrap().run_until_parked()
 694    }
 695
 696    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 697    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 698    /// do take real async time to run.
 699    #[cfg(any(test, feature = "test-support"))]
 700    pub fn allow_parking(&self) {
 701        self.dispatcher.as_test().unwrap().allow_parking();
 702    }
 703
 704    /// undoes the effect of [`Self::allow_parking`].
 705    #[cfg(any(test, feature = "test-support"))]
 706    pub fn forbid_parking(&self) {
 707        self.dispatcher.as_test().unwrap().forbid_parking();
 708    }
 709
 710    /// adds detail to the "parked with nothing let to run" message.
 711    #[cfg(any(test, feature = "test-support"))]
 712    pub fn set_waiting_hint(&self, msg: Option<String>) {
 713        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 714    }
 715
 716    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 717    #[cfg(any(test, feature = "test-support"))]
 718    pub fn rng(&self) -> StdRng {
 719        self.dispatcher.as_test().unwrap().rng()
 720    }
 721
 722    /// How many CPUs are available to the dispatcher.
 723    pub fn num_cpus(&self) -> usize {
 724        #[cfg(any(test, feature = "test-support"))]
 725        return 4;
 726
 727        #[cfg(not(any(test, feature = "test-support")))]
 728        return num_cpus::get();
 729    }
 730
 731    /// Whether we're on the main thread.
 732    pub fn is_main_thread(&self) -> bool {
 733        self.dispatcher.is_main_thread()
 734    }
 735
 736    #[cfg(any(test, feature = "test-support"))]
 737    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 738    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 739        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 740    }
 741}
 742
 743/// ForegroundExecutor runs things on the main thread.
 744impl ForegroundExecutor {
 745    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 746    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 747        Self {
 748            dispatcher,
 749            not_send: PhantomData,
 750        }
 751    }
 752
 753    /// Enqueues the given Task to run on the main thread at some point in the future.
 754    #[track_caller]
 755    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
 756    where
 757        R: 'static,
 758    {
 759        self.inner_spawn(None, Priority::default(), future)
 760    }
 761
 762    /// Enqueues the given Task to run on the main thread at some point in the future.
 763    #[track_caller]
 764    pub fn spawn_with_priority<R>(
 765        &self,
 766        priority: Priority,
 767        future: impl Future<Output = R> + 'static,
 768    ) -> Task<R>
 769    where
 770        R: 'static,
 771    {
 772        self.inner_spawn(None, priority, future)
 773    }
 774
 775    #[track_caller]
 776    pub(crate) fn spawn_context<R>(
 777        &self,
 778        app: std::sync::Weak<()>,
 779        future: impl Future<Output = R> + 'static,
 780    ) -> Task<R>
 781    where
 782        R: 'static,
 783    {
 784        self.inner_spawn(Some(app), Priority::default(), future)
 785    }
 786
 787    #[track_caller]
 788    pub(crate) fn inner_spawn<R>(
 789        &self,
 790        app: Option<std::sync::Weak<()>>,
 791        priority: Priority,
 792        future: impl Future<Output = R> + 'static,
 793    ) -> Task<R>
 794    where
 795        R: 'static,
 796    {
 797        let dispatcher = self.dispatcher.clone();
 798        let location = core::panic::Location::caller();
 799
 800        #[track_caller]
 801        fn inner<R: 'static>(
 802            dispatcher: Arc<dyn PlatformDispatcher>,
 803            future: AnyLocalFuture<R>,
 804            location: &'static core::panic::Location<'static>,
 805            app: Option<std::sync::Weak<()>>,
 806            priority: Priority,
 807        ) -> Task<R> {
 808            let (runnable, task) = spawn_local_with_source_location(
 809                future,
 810                move |runnable| {
 811                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
 812                },
 813                RunnableMeta { location, app },
 814            );
 815            runnable.schedule();
 816            Task(TaskState::Spawned(task))
 817        }
 818        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 819    }
 820}
 821
/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
///
/// The future is wrapped so that polling or dropping it on a thread other than the one
/// that called this function panics with the spawn location in the message.
///
/// Copy-modified from:
/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
#[track_caller]
fn spawn_local_with_source_location<Fut, S, M>(
    future: Fut,
    schedule: S,
    metadata: M,
) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
where
    Fut: Future + 'static,
    Fut::Output: 'static,
    S: async_task::Schedule<M> + Send + Sync + 'static,
    M: 'static,
{
    // Id of the current thread, cached in a thread-local. Falls back to asking the
    // runtime directly when the thread-local cannot be accessed.
    #[inline]
    fn thread_id() -> ThreadId {
        std::thread_local! {
            static ID: ThreadId = thread::current().id();
        }
        ID.try_with(|id| *id)
            .unwrap_or_else(|_| thread::current().id())
    }

    // Future wrapper that remembers the spawning thread and the spawn location.
    struct Checked<F> {
        id: ThreadId,
        // `ManuallyDrop` so the inner future is only dropped after the thread check
        // in `Drop` has passed.
        inner: ManuallyDrop<F>,
        location: &'static Location<'static>,
    }

    impl<F> Drop for Checked<F> {
        fn drop(&mut self) {
            assert!(
                self.id == thread_id(),
                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            unsafe { ManuallyDrop::drop(&mut self.inner) };
        }
    }

    impl<F: Future> Future for Checked<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.id == thread_id(),
                "local task polled by a thread that didn't spawn it. Task spawned at {}",
                self.location
            );
            // Pin projection onto the wrapped future.
            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
        }
    }

    // Wrap the future into one that checks which thread it's on.
    let future = Checked {
        id: thread_id(),
        inner: ManuallyDrop::new(future),
        location: Location::caller(),
    };

    // NOTE(review): `spawn_unchecked` skips the `Send` requirement on the future; the
    // runtime thread checks in `Checked` are presumably what justifies this — confirm.
    unsafe {
        async_task::Builder::new()
            .metadata(metadata)
            .spawn_unchecked(move |_| future, schedule)
    }
}
 890
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
///
/// Futures added through [`Scope::spawn`] may borrow data that lives for `'a`. Dropping the
/// scope blocks until every sender clone handed to those futures has been dropped.
pub struct Scope<'a> {
    /// Executor used to report the CPU count and to block inside `Drop`.
    executor: BackgroundExecutor,
    /// Priority supplied at construction time.
    // NOTE(review): not read anywhere in the code visible here; presumably consumed by
    // `BackgroundExecutor::scoped` when the futures are dispatched — confirm.
    priority: Priority,
    /// Futures queued by `spawn`, with their `'a` lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// The scope's own sender; cloned into every spawned future and taken (dropped) in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` waits on to learn that all senders are gone.
    rx: mpsc::Receiver<()>,
    /// Ties the scope to the `'a` lifetime that spawned futures are allowed to borrow from.
    lifetime: PhantomData<&'a ()>,
}
 900
 901impl<'a> Scope<'a> {
 902    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 903        let (tx, rx) = mpsc::channel(1);
 904        Self {
 905            executor,
 906            priority,
 907            tx: Some(tx),
 908            rx,
 909            futures: Default::default(),
 910            lifetime: PhantomData,
 911        }
 912    }
 913
 914    /// How many CPUs are available to the dispatcher.
 915    pub fn num_cpus(&self) -> usize {
 916        self.executor.num_cpus()
 917    }
 918
 919    /// Spawn a future into this scope.
 920    #[track_caller]
 921    pub fn spawn<F>(&mut self, f: F)
 922    where
 923        F: Future<Output = ()> + Send + 'a,
 924    {
 925        let tx = self.tx.clone().unwrap();
 926
 927        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 928        // dropping this `Scope` blocks until all of the futures have resolved.
 929        let f = unsafe {
 930            mem::transmute::<
 931                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 932                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 933            >(Box::pin(async move {
 934                f.await;
 935                drop(tx);
 936            }))
 937        };
 938        self.futures.push(f);
 939    }
 940}
 941
 942impl Drop for Scope<'_> {
 943    fn drop(&mut self) {
 944        self.tx.take().unwrap();
 945
 946        // Wait until the channel is closed, which means that all of the spawned
 947        // futures have resolved.
 948        self.executor.block(self.rx.next());
 949    }
 950}
 951
 952#[cfg(test)]
 953mod test {
 954    use super::*;
 955    use crate::{App, TestDispatcher, TestPlatform};
 956    use rand::SeedableRng;
 957    use std::cell::RefCell;
 958
 959    #[test]
 960    fn sanity_test_tasks_run() {
 961        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 962        let arc_dispatcher = Arc::new(dispatcher.clone());
 963        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 964        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
 965
 966        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
 967        let asset_source = Arc::new(());
 968        let http_client = http_client::FakeHttpClient::with_404_response();
 969
 970        let app = App::new_app(platform, asset_source, http_client);
 971        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
 972
 973        let task_ran = Rc::new(RefCell::new(false));
 974
 975        foreground_executor
 976            .spawn_context(liveness_token, {
 977                let task_ran = Rc::clone(&task_ran);
 978                async move {
 979                    *task_ran.borrow_mut() = true;
 980                }
 981            })
 982            .detach();
 983
 984        // Run dispatcher while app is still alive
 985        dispatcher.run_until_parked();
 986
 987        // Task should have run
 988        assert!(
 989            *task_ran.borrow(),
 990            "Task should run normally when app is alive"
 991        );
 992    }
 993
 994    #[test]
 995    fn test_task_cancelled_when_app_dropped() {
 996        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 997        let arc_dispatcher = Arc::new(dispatcher.clone());
 998        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 999        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1000
1001        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1002        let asset_source = Arc::new(());
1003        let http_client = http_client::FakeHttpClient::with_404_response();
1004
1005        let app = App::new_app(platform, asset_source, http_client);
1006        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1007        let app_weak = Rc::downgrade(&app);
1008
1009        let task_ran = Rc::new(RefCell::new(false));
1010        let task_ran_clone = Rc::clone(&task_ran);
1011
1012        foreground_executor
1013            .spawn_context(liveness_token, async move {
1014                *task_ran_clone.borrow_mut() = true;
1015            })
1016            .detach();
1017
1018        drop(app);
1019
1020        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1021
1022        dispatcher.run_until_parked();
1023
1024        // The task should have been cancelled, not run
1025        assert!(
1026            !*task_ran.borrow(),
1027            "Task should have been cancelled when app was dropped, but it ran!"
1028        );
1029    }
1030
1031    #[test]
1032    fn test_nested_tasks_both_cancel() {
1033        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1034        let arc_dispatcher = Arc::new(dispatcher.clone());
1035        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1036        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1037
1038        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1039        let asset_source = Arc::new(());
1040        let http_client = http_client::FakeHttpClient::with_404_response();
1041
1042        let app = App::new_app(platform, asset_source, http_client);
1043        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1044        let app_weak = Rc::downgrade(&app);
1045
1046        let outer_completed = Rc::new(RefCell::new(false));
1047        let inner_completed = Rc::new(RefCell::new(false));
1048        let reached_await = Rc::new(RefCell::new(false));
1049
1050        let outer_flag = Rc::clone(&outer_completed);
1051        let inner_flag = Rc::clone(&inner_completed);
1052        let await_flag = Rc::clone(&reached_await);
1053
1054        // Channel to block the inner task until we're ready
1055        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1056
1057        // We need clones of executor and liveness_token for the inner spawn
1058        let inner_executor = foreground_executor.clone();
1059        let inner_liveness_token = liveness_token.clone();
1060
1061        foreground_executor
1062            .spawn_context(liveness_token, async move {
1063                let inner_task = inner_executor.spawn_context(inner_liveness_token, {
1064                    let inner_flag = Rc::clone(&inner_flag);
1065                    async move {
1066                        rx.await.ok();
1067                        *inner_flag.borrow_mut() = true;
1068                    }
1069                });
1070
1071                *await_flag.borrow_mut() = true;
1072
1073                inner_task.await;
1074
1075                *outer_flag.borrow_mut() = true;
1076            })
1077            .detach();
1078
1079        // Run dispatcher until outer task reaches the await point
1080        // The inner task will be blocked on the channel
1081        dispatcher.run_until_parked();
1082
1083        // Verify we actually reached the await point before dropping the app
1084        assert!(
1085            *reached_await.borrow(),
1086            "Outer task should have reached the await point"
1087        );
1088
1089        // Neither task should have completed yet
1090        assert!(
1091            !*outer_completed.borrow(),
1092            "Outer task should not have completed yet"
1093        );
1094        assert!(
1095            !*inner_completed.borrow(),
1096            "Inner task should not have completed yet"
1097        );
1098
1099        // Drop the channel sender and app while outer is awaiting inner
1100        drop(tx);
1101        drop(app);
1102        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1103
1104        // Run dispatcher - both tasks should be cancelled
1105        dispatcher.run_until_parked();
1106
1107        // Neither task should have completed (both were cancelled)
1108        assert!(
1109            !*outer_completed.borrow(),
1110            "Outer task should have been cancelled, not completed"
1111        );
1112        assert!(
1113            !*inner_completed.borrow(),
1114            "Inner task should have been cancelled, not completed"
1115        );
1116    }
1117
1118    #[test]
1119    fn test_task_without_app_tracking_still_runs() {
1120        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1121        let arc_dispatcher = Arc::new(dispatcher.clone());
1122        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1123        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1124
1125        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1126        let asset_source = Arc::new(());
1127        let http_client = http_client::FakeHttpClient::with_404_response();
1128
1129        let app = App::new_app(platform, asset_source, http_client);
1130        let app_weak = Rc::downgrade(&app);
1131
1132        let task_ran = Rc::new(RefCell::new(false));
1133        let task_ran_clone = Rc::clone(&task_ran);
1134
1135        let _task = foreground_executor.spawn(async move {
1136            *task_ran_clone.borrow_mut() = true;
1137        });
1138
1139        drop(app);
1140
1141        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1142
1143        dispatcher.run_until_parked();
1144
1145        assert!(
1146            *task_ran.borrow(),
1147            "Task without app tracking should still run after app is dropped"
1148        );
1149    }
1150
1151    #[test]
1152    #[should_panic]
1153    fn test_polling_cancelled_task_panics() {
1154        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1155        let arc_dispatcher = Arc::new(dispatcher.clone());
1156        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1157        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1158
1159        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1160        let asset_source = Arc::new(());
1161        let http_client = http_client::FakeHttpClient::with_404_response();
1162
1163        let app = App::new_app(platform, asset_source, http_client);
1164        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1165        let app_weak = Rc::downgrade(&app);
1166
1167        let task = foreground_executor.spawn_context(liveness_token, async move { 42 });
1168
1169        drop(app);
1170
1171        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1172
1173        dispatcher.run_until_parked();
1174
1175        background_executor.block(task);
1176    }
1177
1178    #[test]
1179    fn test_polling_cancelled_task_returns_none_with_fallible() {
1180        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1181        let arc_dispatcher = Arc::new(dispatcher.clone());
1182        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1183        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1184
1185        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1186        let asset_source = Arc::new(());
1187        let http_client = http_client::FakeHttpClient::with_404_response();
1188
1189        let app = App::new_app(platform, asset_source, http_client);
1190        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1191        let app_weak = Rc::downgrade(&app);
1192
1193        let task = foreground_executor
1194            .spawn_context(liveness_token, async move { 42 })
1195            .fallible();
1196
1197        drop(app);
1198
1199        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1200
1201        dispatcher.run_until_parked();
1202
1203        let result = background_executor.block(task);
1204        assert_eq!(result, None, "Cancelled task should return None");
1205    }
1206}