executor.rs

   1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    fmt::Debug,
   8    marker::PhantomData,
   9    mem::{self, ManuallyDrop},
  10    num::NonZeroUsize,
  11    panic::Location,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::{
  15        Arc,
  16        atomic::{AtomicUsize, Ordering},
  17    },
  18    task::{Context, Poll},
  19    thread::{self, ThreadId},
  20    time::{Duration, Instant},
  21};
  22use util::TryFutureExt;
  23use waker_fn::waker_fn;
  24
  25#[cfg(any(test, feature = "test-support"))]
  26use rand::rngs::StdRng;
  27
  28/// A pointer to the executor that is currently running,
  29/// for spawning background tasks.
  30#[derive(Clone)]
  31pub struct BackgroundExecutor {
  32    #[doc(hidden)]
  33    pub dispatcher: Arc<dyn PlatformDispatcher>,
  34}
  35
  36/// A pointer to the executor that is currently running,
  37/// for spawning tasks on the main thread.
  38///
  39/// This is intentionally `!Send` via the `not_send` marker field. This is because
  40/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
  41/// only polled from the same thread it was spawned from. These checks would fail when spawning
  42/// foreground tasks from background threads.
  43#[derive(Clone)]
  44pub struct ForegroundExecutor {
  45    #[doc(hidden)]
  46    pub dispatcher: Arc<dyn PlatformDispatcher>,
  47    not_send: PhantomData<Rc<()>>,
  48}
  49
  50/// Realtime task priority
  51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  52#[repr(u8)]
  53pub enum RealtimePriority {
  54    /// Audio task
  55    Audio,
  56    /// Other realtime task
  57    #[default]
  58    Other,
  59}
  60
  61/// Task priority
  62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  63#[repr(u8)]
  64pub enum Priority {
  65    /// Realtime priority
  66    ///
  67    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
  68    Realtime(RealtimePriority),
  69    /// High priority
  70    ///
  71    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
  72    High,
  73    /// Medium priority, probably suits most of your use cases.
  74    #[default]
  75    Medium,
  76    /// Low priority
  77    ///
  78    /// Prioritize this for background work that can come in large quantities
  79    /// to not starve the executor of resources for high priority tasks
  80    Low,
  81}
  82
  83impl Priority {
  84    #[allow(dead_code)]
  85    pub(crate) const fn probability(&self) -> u32 {
  86        match self {
  87            // realtime priorities are not considered for probability scheduling
  88            Priority::Realtime(_) => 0,
  89            Priority::High => 60,
  90            Priority::Medium => 30,
  91            Priority::Low => 10,
  92        }
  93    }
  94}
  95
  96/// Task is a primitive that allows work to happen in the background.
  97///
  98/// It implements [`Future`] so you can `.await` on it.
  99///
 100/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 101/// the task to continue running, but with no way to return a value.
 102#[must_use]
 103#[derive(Debug)]
 104pub struct Task<T>(TaskState<T>);
 105
 106#[derive(Debug)]
 107enum TaskState<T> {
 108    /// A task that is ready to return a value
 109    Ready(Option<T>),
 110
 111    /// A task that is currently running.
 112    Spawned(async_task::Task<T, RunnableMeta>),
 113}
 114
 115impl<T> Task<T> {
 116    /// Creates a new task that will resolve with the value
 117    pub fn ready(val: T) -> Self {
 118        Task(TaskState::Ready(Some(val)))
 119    }
 120
 121    /// Detaching a task runs it to completion in the background
 122    pub fn detach(self) {
 123        match self {
 124            Task(TaskState::Ready(_)) => {}
 125            Task(TaskState::Spawned(task)) => task.detach(),
 126        }
 127    }
 128
 129    /// Converts this task into a fallible task that returns `Option<T>`.
 130    ///
 131    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 132    /// if the app was dropped while the task is executing.
 133    ///
 134    /// # Example
 135    ///
 136    /// ```ignore
 137    /// // Background task that gracefully handles app shutdown:
 138    /// cx.background_spawn(async move {
 139    ///     let result = foreground_task.fallible().await;
 140    ///     if let Some(value) = result {
 141    ///         // Process the value
 142    ///     }
 143    ///     // If None, app was shut down - just exit gracefully
 144    /// }).detach();
 145    /// ```
 146    pub fn fallible(self) -> FallibleTask<T> {
 147        FallibleTask(match self.0 {
 148            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 149            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 150        })
 151    }
 152}
 153
 154impl<E, T> Task<Result<T, E>>
 155where
 156    T: 'static,
 157    E: 'static + Debug,
 158{
 159    /// Run the task to completion in the background and log any
 160    /// errors that occur.
 161    #[track_caller]
 162    pub fn detach_and_log_err(self, cx: &App) {
 163        let location = core::panic::Location::caller();
 164        cx.foreground_executor()
 165            .spawn(self.log_tracked_err(*location))
 166            .detach();
 167    }
 168}
 169
 170impl<T> Future for Task<T> {
 171    type Output = T;
 172
 173    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 174        match unsafe { self.get_unchecked_mut() } {
 175            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
 176            Task(TaskState::Spawned(task)) => task.poll(cx),
 177        }
 178    }
 179}
 180
 181/// A task that returns `Option<T>` instead of panicking when cancelled.
 182#[must_use]
 183pub struct FallibleTask<T>(FallibleTaskState<T>);
 184
 185enum FallibleTaskState<T> {
 186    /// A task that is ready to return a value
 187    Ready(Option<T>),
 188
 189    /// A task that is currently running (wraps async_task::FallibleTask).
 190    Spawned(async_task::FallibleTask<T, RunnableMeta>),
 191}
 192
 193impl<T> FallibleTask<T> {
 194    /// Creates a new fallible task that will resolve with the value.
 195    pub fn ready(val: T) -> Self {
 196        FallibleTask(FallibleTaskState::Ready(Some(val)))
 197    }
 198
 199    /// Detaching a task runs it to completion in the background.
 200    pub fn detach(self) {
 201        match self.0 {
 202            FallibleTaskState::Ready(_) => {}
 203            FallibleTaskState::Spawned(task) => task.detach(),
 204        }
 205    }
 206}
 207
 208impl<T> Future for FallibleTask<T> {
 209    type Output = Option<T>;
 210
 211    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 212        match unsafe { self.get_unchecked_mut() } {
 213            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
 214            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
 215        }
 216    }
 217}
 218
 219impl<T> std::fmt::Debug for FallibleTask<T> {
 220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 221        match &self.0 {
 222            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 223            FallibleTaskState::Spawned(task) => {
 224                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 225            }
 226        }
 227    }
 228}
 229
 230/// A task label is an opaque identifier that you can use to
 231/// refer to a task in tests.
 232#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 233pub struct TaskLabel(NonZeroUsize);
 234
 235impl Default for TaskLabel {
 236    fn default() -> Self {
 237        Self::new()
 238    }
 239}
 240
 241impl TaskLabel {
 242    /// Construct a new task label.
 243    pub fn new() -> Self {
 244        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 245        Self(
 246            NEXT_TASK_LABEL
 247                .fetch_add(1, Ordering::SeqCst)
 248                .try_into()
 249                .unwrap(),
 250        )
 251    }
 252}
 253
 254type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
 255
 256type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 257
 258/// BackgroundExecutor lets you run things on background threads.
 259/// In production this is a thread pool with no ordering guarantees.
 260/// In tests this is simulated by running tasks one by one in a deterministic
 261/// (but arbitrary) order controlled by the `SEED` environment variable.
 262impl BackgroundExecutor {
 263    #[doc(hidden)]
 264    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 265        Self { dispatcher }
 266    }
 267
 268    /// Enqueues the given future to be run to completion on a background thread.
 269    #[track_caller]
 270    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 271    where
 272        R: Send + 'static,
 273    {
 274        self.spawn_with_priority(Priority::default(), future)
 275    }
 276
 277    /// Enqueues the given future to be run to completion on a background thread.
 278    #[track_caller]
 279    pub fn spawn_with_priority<R>(
 280        &self,
 281        priority: Priority,
 282        future: impl Future<Output = R> + Send + 'static,
 283    ) -> Task<R>
 284    where
 285        R: Send + 'static,
 286    {
 287        self.spawn_internal::<R>(Box::pin(future), None, priority)
 288    }
 289
 290    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
 291    ///
 292    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
 293    /// completion before the current task is resumed, even if the current task is slated for cancellation.
 294    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
 295    where
 296        R: Send,
 297    {
 298        // We need to ensure that cancellation of the parent task does not drop the environment
 299        // before the our own task has completed or got cancelled.
 300        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 301
 302        impl Drop for NotifyOnDrop<'_> {
 303            fn drop(&mut self) {
 304                *self.0.1.lock() = true;
 305                self.0.0.notify_all();
 306            }
 307        }
 308
 309        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 310
 311        impl Drop for WaitOnDrop<'_> {
 312            fn drop(&mut self) {
 313                let mut done = self.0.1.lock();
 314                if !*done {
 315                    self.0.0.wait(&mut done);
 316                }
 317            }
 318        }
 319
 320        let dispatcher = self.dispatcher.clone();
 321        let location = core::panic::Location::caller();
 322
 323        let pair = &(Condvar::new(), Mutex::new(false));
 324        let _wait_guard = WaitOnDrop(pair);
 325
 326        let (runnable, task) = unsafe {
 327            async_task::Builder::new()
 328                .metadata(RunnableMeta {
 329                    location,
 330                    app: None,
 331                })
 332                .spawn_unchecked(
 333                    move |_| async {
 334                        let _notify_guard = NotifyOnDrop(pair);
 335                        future.await
 336                    },
 337                    move |runnable| {
 338                        dispatcher.dispatch(
 339                            RunnableVariant::Meta(runnable),
 340                            None,
 341                            Priority::default(),
 342                        )
 343                    },
 344                )
 345        };
 346        runnable.schedule();
 347        task.await
 348    }
 349
 350    /// Enqueues the given future to be run to completion on a background thread.
 351    /// The given label can be used to control the priority of the task in tests.
 352    #[track_caller]
 353    pub fn spawn_labeled<R>(
 354        &self,
 355        label: TaskLabel,
 356        future: impl Future<Output = R> + Send + 'static,
 357    ) -> Task<R>
 358    where
 359        R: Send + 'static,
 360    {
 361        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 362    }
 363
 364    #[track_caller]
 365    fn spawn_internal<R: Send + 'static>(
 366        &self,
 367        future: AnyFuture<R>,
 368        label: Option<TaskLabel>,
 369        priority: Priority,
 370    ) -> Task<R> {
 371        let dispatcher = self.dispatcher.clone();
 372        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
 373            let location = core::panic::Location::caller();
 374            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
 375
 376            dispatcher.spawn_realtime(
 377                realtime,
 378                Box::new(move || {
 379                    while let Ok(runnable) = rx.recv() {
 380                        let start = Instant::now();
 381                        let location = runnable.metadata().location;
 382                        let mut timing = TaskTiming {
 383                            location,
 384                            start,
 385                            end: None,
 386                        };
 387                        profiler::add_task_timing(timing);
 388
 389                        runnable.run();
 390
 391                        let end = Instant::now();
 392                        timing.end = Some(end);
 393                        profiler::add_task_timing(timing);
 394                    }
 395                }),
 396            );
 397
 398            async_task::Builder::new()
 399                .metadata(RunnableMeta {
 400                    location,
 401                    app: None,
 402                })
 403                .spawn(
 404                    move |_| future,
 405                    move |runnable| {
 406                        let _ = tx.send(runnable);
 407                    },
 408                )
 409        } else {
 410            let location = core::panic::Location::caller();
 411            async_task::Builder::new()
 412                .metadata(RunnableMeta {
 413                    location,
 414                    app: None,
 415                })
 416                .spawn(
 417                    move |_| future,
 418                    move |runnable| {
 419                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
 420                    },
 421                )
 422        };
 423
 424        runnable.schedule();
 425        Task(TaskState::Spawned(task))
 426    }
 427
 428    /// Used by the test harness to run an async test in a synchronous fashion.
 429    #[cfg(any(test, feature = "test-support"))]
 430    #[track_caller]
 431    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 432        if let Ok(value) = self.block_internal(false, future, None) {
 433            value
 434        } else {
 435            unreachable!()
 436        }
 437    }
 438
 439    /// Block the current thread until the given future resolves.
 440    /// Consider using `block_with_timeout` instead.
 441    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 442        if let Ok(value) = self.block_internal(true, future, None) {
 443            value
 444        } else {
 445            unreachable!()
 446        }
 447    }
 448
 449    #[cfg(not(any(test, feature = "test-support")))]
 450    pub(crate) fn block_internal<Fut: Future>(
 451        &self,
 452        _background_only: bool,
 453        future: Fut,
 454        timeout: Option<Duration>,
 455    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 456        use std::time::Instant;
 457
 458        let mut future = Box::pin(future);
 459        if timeout == Some(Duration::ZERO) {
 460            return Err(future);
 461        }
 462        let deadline = timeout.map(|timeout| Instant::now() + timeout);
 463
 464        let parker = parking::Parker::new();
 465        let unparker = parker.unparker();
 466        let waker = waker_fn(move || {
 467            unparker.unpark();
 468        });
 469        let mut cx = std::task::Context::from_waker(&waker);
 470
 471        loop {
 472            match future.as_mut().poll(&mut cx) {
 473                Poll::Ready(result) => return Ok(result),
 474                Poll::Pending => {
 475                    let timeout =
 476                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
 477                    if let Some(timeout) = timeout {
 478                        if !parker.park_timeout(timeout)
 479                            && deadline.is_some_and(|deadline| deadline < Instant::now())
 480                        {
 481                            return Err(future);
 482                        }
 483                    } else {
 484                        parker.park();
 485                    }
 486                }
 487            }
 488        }
 489    }
 490
 491    #[cfg(any(test, feature = "test-support"))]
 492    #[track_caller]
 493    pub(crate) fn block_internal<Fut: Future>(
 494        &self,
 495        background_only: bool,
 496        future: Fut,
 497        timeout: Option<Duration>,
 498    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 499        use std::sync::atomic::AtomicBool;
 500        use std::time::Instant;
 501
 502        use parking::Parker;
 503
 504        let mut future = Box::pin(future);
 505        if timeout == Some(Duration::ZERO) {
 506            return Err(future);
 507        }
 508
 509        // When using a real platform (e.g., MacPlatform for visual tests that need actual
 510        // Metal rendering), there's no test dispatcher. In this case, we block the thread
 511        // directly by polling the future and parking until woken. This is required for
 512        // VisualTestAppContext which uses real platform rendering but still needs blocking
 513        // behavior for code paths like editor initialization that call block_with_timeout.
 514        let Some(dispatcher) = self.dispatcher.as_test() else {
 515            let deadline = timeout.map(|timeout| Instant::now() + timeout);
 516
 517            let parker = Parker::new();
 518            let unparker = parker.unparker();
 519            let waker = waker_fn(move || {
 520                unparker.unpark();
 521            });
 522            let mut cx = std::task::Context::from_waker(&waker);
 523
 524            loop {
 525                match future.as_mut().poll(&mut cx) {
 526                    Poll::Ready(result) => return Ok(result),
 527                    Poll::Pending => {
 528                        let timeout = deadline
 529                            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
 530                        if let Some(timeout) = timeout {
 531                            if !parker.park_timeout(timeout)
 532                                && deadline.is_some_and(|deadline| deadline < Instant::now())
 533                            {
 534                                return Err(future);
 535                            }
 536                        } else {
 537                            parker.park();
 538                        }
 539                    }
 540                }
 541            }
 542        };
 543
 544        let mut max_ticks = if timeout.is_some() {
 545            dispatcher.gen_block_on_ticks()
 546        } else {
 547            usize::MAX
 548        };
 549
 550        let parker = Parker::new();
 551        let unparker = parker.unparker();
 552
 553        let awoken = Arc::new(AtomicBool::new(false));
 554        let waker = waker_fn({
 555            let awoken = awoken.clone();
 556            let unparker = unparker.clone();
 557            move || {
 558                awoken.store(true, Ordering::SeqCst);
 559                unparker.unpark();
 560            }
 561        });
 562        let mut cx = std::task::Context::from_waker(&waker);
 563
 564        let duration = Duration::from_secs(
 565            option_env!("GPUI_TEST_TIMEOUT")
 566                .and_then(|s| s.parse::<u64>().ok())
 567                .unwrap_or(180),
 568        );
 569        let mut test_should_end_by = Instant::now() + duration;
 570
 571        loop {
 572            match future.as_mut().poll(&mut cx) {
 573                Poll::Ready(result) => return Ok(result),
 574                Poll::Pending => {
 575                    if max_ticks == 0 {
 576                        return Err(future);
 577                    }
 578                    max_ticks -= 1;
 579
 580                    if !dispatcher.tick(background_only) {
 581                        if awoken.swap(false, Ordering::SeqCst) {
 582                            continue;
 583                        }
 584
 585                        if !dispatcher.parking_allowed() {
 586                            if dispatcher.advance_clock_to_next_delayed() {
 587                                continue;
 588                            }
 589                            let mut backtrace_message = String::new();
 590                            let mut waiting_message = String::new();
 591                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
 592                                backtrace_message =
 593                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
 594                            }
 595                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
 596                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
 597                            }
 598                            panic!(
 599                                "parked with nothing left to run{waiting_message}{backtrace_message}",
 600                            )
 601                        }
 602                        dispatcher.push_unparker(unparker.clone());
 603                        parker.park_timeout(Duration::from_millis(1));
 604                        if Instant::now() > test_should_end_by {
 605                            panic!("test timed out after {duration:?} with allow_parking")
 606                        }
 607                    }
 608                }
 609            }
 610        }
 611    }
 612
 613    /// Block the current thread until the given future resolves
 614    /// or `duration` has elapsed.
 615    pub fn block_with_timeout<Fut: Future>(
 616        &self,
 617        duration: Duration,
 618        future: Fut,
 619    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 620        self.block_internal(true, future, Some(duration))
 621    }
 622
 623    /// Scoped lets you start a number of tasks and waits
 624    /// for all of them to complete before returning.
 625    pub async fn scoped<'scope, F>(&self, scheduler: F)
 626    where
 627        F: FnOnce(&mut Scope<'scope>),
 628    {
 629        let mut scope = Scope::new(self.clone(), Priority::default());
 630        (scheduler)(&mut scope);
 631        let spawned = mem::take(&mut scope.futures)
 632            .into_iter()
 633            .map(|f| self.spawn_with_priority(scope.priority, f))
 634            .collect::<Vec<_>>();
 635        for task in spawned {
 636            task.await;
 637        }
 638    }
 639
 640    /// Scoped lets you start a number of tasks and waits
 641    /// for all of them to complete before returning.
 642    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 643    where
 644        F: FnOnce(&mut Scope<'scope>),
 645    {
 646        let mut scope = Scope::new(self.clone(), priority);
 647        (scheduler)(&mut scope);
 648        let spawned = mem::take(&mut scope.futures)
 649            .into_iter()
 650            .map(|f| self.spawn_with_priority(scope.priority, f))
 651            .collect::<Vec<_>>();
 652        for task in spawned {
 653            task.await;
 654        }
 655    }
 656
 657    /// Get the current time.
 658    ///
 659    /// Calling this instead of `std::time::Instant::now` allows the use
 660    /// of fake timers in tests.
 661    pub fn now(&self) -> Instant {
 662        self.dispatcher.now()
 663    }
 664
 665    /// Returns a task that will complete after the given duration.
 666    /// Depending on other concurrent tasks the elapsed duration may be longer
 667    /// than requested.
 668    pub fn timer(&self, duration: Duration) -> Task<()> {
 669        if duration.is_zero() {
 670            return Task::ready(());
 671        }
 672        let location = core::panic::Location::caller();
 673        let (runnable, task) = async_task::Builder::new()
 674            .metadata(RunnableMeta {
 675                location,
 676                app: None,
 677            })
 678            .spawn(move |_| async move {}, {
 679                let dispatcher = self.dispatcher.clone();
 680                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
 681            });
 682        runnable.schedule();
 683        Task(TaskState::Spawned(task))
 684    }
 685
 686    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 687    #[cfg(any(test, feature = "test-support"))]
 688    pub fn start_waiting(&self) {
 689        self.dispatcher.as_test().unwrap().start_waiting();
 690    }
 691
 692    /// in tests, removes the debugging data added by start_waiting
 693    #[cfg(any(test, feature = "test-support"))]
 694    pub fn finish_waiting(&self) {
 695        self.dispatcher.as_test().unwrap().finish_waiting();
 696    }
 697
 698    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 699    #[cfg(any(test, feature = "test-support"))]
 700    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 701        self.dispatcher.as_test().unwrap().simulate_random_delay()
 702    }
 703
 704    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 705    #[cfg(any(test, feature = "test-support"))]
 706    pub fn deprioritize(&self, task_label: TaskLabel) {
 707        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 708    }
 709
 710    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 711    #[cfg(any(test, feature = "test-support"))]
 712    pub fn advance_clock(&self, duration: Duration) {
 713        self.dispatcher.as_test().unwrap().advance_clock(duration)
 714    }
 715
 716    /// in tests, run one task.
 717    #[cfg(any(test, feature = "test-support"))]
 718    pub fn tick(&self) -> bool {
 719        self.dispatcher.as_test().unwrap().tick(false)
 720    }
 721
 722    /// in tests, run all tasks that are ready to run. If after doing so
 723    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 724    #[cfg(any(test, feature = "test-support"))]
 725    pub fn run_until_parked(&self) {
 726        self.dispatcher.as_test().unwrap().run_until_parked()
 727    }
 728
 729    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 730    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 731    /// do take real async time to run.
 732    #[cfg(any(test, feature = "test-support"))]
 733    pub fn allow_parking(&self) {
 734        self.dispatcher.as_test().unwrap().allow_parking();
 735    }
 736
 737    /// undoes the effect of [`Self::allow_parking`].
 738    #[cfg(any(test, feature = "test-support"))]
 739    pub fn forbid_parking(&self) {
 740        self.dispatcher.as_test().unwrap().forbid_parking();
 741    }
 742
 743    /// adds detail to the "parked with nothing let to run" message.
 744    #[cfg(any(test, feature = "test-support"))]
 745    pub fn set_waiting_hint(&self, msg: Option<String>) {
 746        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 747    }
 748
 749    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 750    #[cfg(any(test, feature = "test-support"))]
 751    pub fn rng(&self) -> StdRng {
 752        self.dispatcher.as_test().unwrap().rng()
 753    }
 754
 755    /// How many CPUs are available to the dispatcher.
 756    pub fn num_cpus(&self) -> usize {
 757        #[cfg(any(test, feature = "test-support"))]
 758        return 4;
 759
 760        #[cfg(not(any(test, feature = "test-support")))]
 761        return num_cpus::get();
 762    }
 763
 764    /// Whether we're on the main thread.
 765    pub fn is_main_thread(&self) -> bool {
 766        self.dispatcher.is_main_thread()
 767    }
 768
 769    #[cfg(any(test, feature = "test-support"))]
 770    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 771    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 772        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 773    }
 774}
 775
 776/// ForegroundExecutor runs things on the main thread.
 777impl ForegroundExecutor {
 778    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 779    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 780        Self {
 781            dispatcher,
 782            not_send: PhantomData,
 783        }
 784    }
 785
 786    /// Enqueues the given Task to run on the main thread at some point in the future.
 787    #[track_caller]
 788    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
 789    where
 790        R: 'static,
 791    {
 792        self.inner_spawn(None, Priority::default(), future)
 793    }
 794
 795    /// Enqueues the given Task to run on the main thread at some point in the future.
 796    #[track_caller]
 797    pub fn spawn_with_priority<R>(
 798        &self,
 799        priority: Priority,
 800        future: impl Future<Output = R> + 'static,
 801    ) -> Task<R>
 802    where
 803        R: 'static,
 804    {
 805        self.inner_spawn(None, priority, future)
 806    }
 807
 808    #[track_caller]
 809    pub(crate) fn spawn_context<R>(
 810        &self,
 811        app: std::sync::Weak<()>,
 812        future: impl Future<Output = R> + 'static,
 813    ) -> Task<R>
 814    where
 815        R: 'static,
 816    {
 817        self.inner_spawn(Some(app), Priority::default(), future)
 818    }
 819
 820    #[track_caller]
 821    pub(crate) fn inner_spawn<R>(
 822        &self,
 823        app: Option<std::sync::Weak<()>>,
 824        priority: Priority,
 825        future: impl Future<Output = R> + 'static,
 826    ) -> Task<R>
 827    where
 828        R: 'static,
 829    {
 830        let dispatcher = self.dispatcher.clone();
 831        let location = core::panic::Location::caller();
 832
 833        #[track_caller]
 834        fn inner<R: 'static>(
 835            dispatcher: Arc<dyn PlatformDispatcher>,
 836            future: AnyLocalFuture<R>,
 837            location: &'static core::panic::Location<'static>,
 838            app: Option<std::sync::Weak<()>>,
 839            priority: Priority,
 840        ) -> Task<R> {
 841            let (runnable, task) = spawn_local_with_source_location(
 842                future,
 843                move |runnable| {
 844                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
 845                },
 846                RunnableMeta { location, app },
 847            );
 848            runnable.schedule();
 849            Task(TaskState::Spawned(task))
 850        }
 851        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 852    }
 853}
 854
 855/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
 856///
 857/// Copy-modified from:
 858/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
 859#[track_caller]
 860fn spawn_local_with_source_location<Fut, S, M>(
 861    future: Fut,
 862    schedule: S,
 863    metadata: M,
 864) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
 865where
 866    Fut: Future + 'static,
 867    Fut::Output: 'static,
 868    S: async_task::Schedule<M> + Send + Sync + 'static,
 869    M: 'static,
 870{
 871    #[inline]
 872    fn thread_id() -> ThreadId {
 873        std::thread_local! {
 874            static ID: ThreadId = thread::current().id();
 875        }
 876        ID.try_with(|id| *id)
 877            .unwrap_or_else(|_| thread::current().id())
 878    }
 879
 880    struct Checked<F> {
 881        id: ThreadId,
 882        inner: ManuallyDrop<F>,
 883        location: &'static Location<'static>,
 884    }
 885
 886    impl<F> Drop for Checked<F> {
 887        fn drop(&mut self) {
 888            assert!(
 889                self.id == thread_id(),
 890                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
 891                self.location
 892            );
 893            unsafe { ManuallyDrop::drop(&mut self.inner) };
 894        }
 895    }
 896
 897    impl<F: Future> Future for Checked<F> {
 898        type Output = F::Output;
 899
 900        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 901            assert!(
 902                self.id == thread_id(),
 903                "local task polled by a thread that didn't spawn it. Task spawned at {}",
 904                self.location
 905            );
 906            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
 907        }
 908    }
 909
 910    // Wrap the future into one that checks which thread it's on.
 911    let future = Checked {
 912        id: thread_id(),
 913        inner: ManuallyDrop::new(future),
 914        location: Location::caller(),
 915    };
 916
 917    unsafe {
 918        async_task::Builder::new()
 919            .metadata(metadata)
 920            .spawn_unchecked(move |_| future, schedule)
 921    }
 922}
 923
 924/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
 925pub struct Scope<'a> {
 926    executor: BackgroundExecutor,
 927    priority: Priority,
 928    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 929    tx: Option<mpsc::Sender<()>>,
 930    rx: mpsc::Receiver<()>,
 931    lifetime: PhantomData<&'a ()>,
 932}
 933
 934impl<'a> Scope<'a> {
 935    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 936        let (tx, rx) = mpsc::channel(1);
 937        Self {
 938            executor,
 939            priority,
 940            tx: Some(tx),
 941            rx,
 942            futures: Default::default(),
 943            lifetime: PhantomData,
 944        }
 945    }
 946
 947    /// How many CPUs are available to the dispatcher.
 948    pub fn num_cpus(&self) -> usize {
 949        self.executor.num_cpus()
 950    }
 951
 952    /// Spawn a future into this scope.
 953    #[track_caller]
 954    pub fn spawn<F>(&mut self, f: F)
 955    where
 956        F: Future<Output = ()> + Send + 'a,
 957    {
 958        let tx = self.tx.clone().unwrap();
 959
 960        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 961        // dropping this `Scope` blocks until all of the futures have resolved.
 962        let f = unsafe {
 963            mem::transmute::<
 964                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 965                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 966            >(Box::pin(async move {
 967                f.await;
 968                drop(tx);
 969            }))
 970        };
 971        self.futures.push(f);
 972    }
 973}
 974
 975impl Drop for Scope<'_> {
 976    fn drop(&mut self) {
 977        self.tx.take().unwrap();
 978
 979        // Wait until the channel is closed, which means that all of the spawned
 980        // futures have resolved.
 981        self.executor.block(self.rx.next());
 982    }
 983}
 984
 985#[cfg(test)]
 986mod test {
 987    use super::*;
 988    use crate::{App, TestDispatcher, TestPlatform};
 989    use rand::SeedableRng;
 990    use std::cell::RefCell;
 991
 992    #[test]
 993    fn sanity_test_tasks_run() {
 994        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 995        let arc_dispatcher = Arc::new(dispatcher.clone());
 996        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 997        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
 998
 999        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1000        let asset_source = Arc::new(());
1001        let http_client = http_client::FakeHttpClient::with_404_response();
1002
1003        let app = App::new_app(platform, asset_source, http_client);
1004        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1005
1006        let task_ran = Rc::new(RefCell::new(false));
1007
1008        foreground_executor
1009            .spawn_context(liveness_token, {
1010                let task_ran = Rc::clone(&task_ran);
1011                async move {
1012                    *task_ran.borrow_mut() = true;
1013                }
1014            })
1015            .detach();
1016
1017        // Run dispatcher while app is still alive
1018        dispatcher.run_until_parked();
1019
1020        // Task should have run
1021        assert!(
1022            *task_ran.borrow(),
1023            "Task should run normally when app is alive"
1024        );
1025    }
1026
1027    #[test]
1028    fn test_task_cancelled_when_app_dropped() {
1029        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1030        let arc_dispatcher = Arc::new(dispatcher.clone());
1031        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1032        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1033
1034        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1035        let asset_source = Arc::new(());
1036        let http_client = http_client::FakeHttpClient::with_404_response();
1037
1038        let app = App::new_app(platform, asset_source, http_client);
1039        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1040        let app_weak = Rc::downgrade(&app);
1041
1042        let task_ran = Rc::new(RefCell::new(false));
1043        let task_ran_clone = Rc::clone(&task_ran);
1044
1045        foreground_executor
1046            .spawn_context(liveness_token, async move {
1047                *task_ran_clone.borrow_mut() = true;
1048            })
1049            .detach();
1050
1051        drop(app);
1052
1053        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1054
1055        dispatcher.run_until_parked();
1056
1057        // The task should have been cancelled, not run
1058        assert!(
1059            !*task_ran.borrow(),
1060            "Task should have been cancelled when app was dropped, but it ran!"
1061        );
1062    }
1063
1064    #[test]
1065    fn test_nested_tasks_both_cancel() {
1066        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1067        let arc_dispatcher = Arc::new(dispatcher.clone());
1068        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1069        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1070
1071        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1072        let asset_source = Arc::new(());
1073        let http_client = http_client::FakeHttpClient::with_404_response();
1074
1075        let app = App::new_app(platform, asset_source, http_client);
1076        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1077        let app_weak = Rc::downgrade(&app);
1078
1079        let outer_completed = Rc::new(RefCell::new(false));
1080        let inner_completed = Rc::new(RefCell::new(false));
1081        let reached_await = Rc::new(RefCell::new(false));
1082
1083        let outer_flag = Rc::clone(&outer_completed);
1084        let inner_flag = Rc::clone(&inner_completed);
1085        let await_flag = Rc::clone(&reached_await);
1086
1087        // Channel to block the inner task until we're ready
1088        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1089
1090        // We need clones of executor and liveness_token for the inner spawn
1091        let inner_executor = foreground_executor.clone();
1092        let inner_liveness_token = liveness_token.clone();
1093
1094        foreground_executor
1095            .spawn_context(liveness_token, async move {
1096                let inner_task = inner_executor.spawn_context(inner_liveness_token, {
1097                    let inner_flag = Rc::clone(&inner_flag);
1098                    async move {
1099                        rx.await.ok();
1100                        *inner_flag.borrow_mut() = true;
1101                    }
1102                });
1103
1104                *await_flag.borrow_mut() = true;
1105
1106                inner_task.await;
1107
1108                *outer_flag.borrow_mut() = true;
1109            })
1110            .detach();
1111
1112        // Run dispatcher until outer task reaches the await point
1113        // The inner task will be blocked on the channel
1114        dispatcher.run_until_parked();
1115
1116        // Verify we actually reached the await point before dropping the app
1117        assert!(
1118            *reached_await.borrow(),
1119            "Outer task should have reached the await point"
1120        );
1121
1122        // Neither task should have completed yet
1123        assert!(
1124            !*outer_completed.borrow(),
1125            "Outer task should not have completed yet"
1126        );
1127        assert!(
1128            !*inner_completed.borrow(),
1129            "Inner task should not have completed yet"
1130        );
1131
1132        // Drop the channel sender and app while outer is awaiting inner
1133        drop(tx);
1134        drop(app);
1135        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1136
1137        // Run dispatcher - both tasks should be cancelled
1138        dispatcher.run_until_parked();
1139
1140        // Neither task should have completed (both were cancelled)
1141        assert!(
1142            !*outer_completed.borrow(),
1143            "Outer task should have been cancelled, not completed"
1144        );
1145        assert!(
1146            !*inner_completed.borrow(),
1147            "Inner task should have been cancelled, not completed"
1148        );
1149    }
1150
1151    #[test]
1152    fn test_task_without_app_tracking_still_runs() {
1153        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1154        let arc_dispatcher = Arc::new(dispatcher.clone());
1155        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1156        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1157
1158        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1159        let asset_source = Arc::new(());
1160        let http_client = http_client::FakeHttpClient::with_404_response();
1161
1162        let app = App::new_app(platform, asset_source, http_client);
1163        let app_weak = Rc::downgrade(&app);
1164
1165        let task_ran = Rc::new(RefCell::new(false));
1166        let task_ran_clone = Rc::clone(&task_ran);
1167
1168        let _task = foreground_executor.spawn(async move {
1169            *task_ran_clone.borrow_mut() = true;
1170        });
1171
1172        drop(app);
1173
1174        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1175
1176        dispatcher.run_until_parked();
1177
1178        assert!(
1179            *task_ran.borrow(),
1180            "Task without app tracking should still run after app is dropped"
1181        );
1182    }
1183
1184    #[test]
1185    #[should_panic]
1186    fn test_polling_cancelled_task_panics() {
1187        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1188        let arc_dispatcher = Arc::new(dispatcher.clone());
1189        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1190        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1191
1192        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1193        let asset_source = Arc::new(());
1194        let http_client = http_client::FakeHttpClient::with_404_response();
1195
1196        let app = App::new_app(platform, asset_source, http_client);
1197        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1198        let app_weak = Rc::downgrade(&app);
1199
1200        let task = foreground_executor.spawn_context(liveness_token, async move { 42 });
1201
1202        drop(app);
1203
1204        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1205
1206        dispatcher.run_until_parked();
1207
1208        background_executor.block(task);
1209    }
1210
1211    #[test]
1212    fn test_polling_cancelled_task_returns_none_with_fallible() {
1213        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1214        let arc_dispatcher = Arc::new(dispatcher.clone());
1215        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1216        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1217
1218        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1219        let asset_source = Arc::new(());
1220        let http_client = http_client::FakeHttpClient::with_404_response();
1221
1222        let app = App::new_app(platform, asset_source, http_client);
1223        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1224        let app_weak = Rc::downgrade(&app);
1225
1226        let task = foreground_executor
1227            .spawn_context(liveness_token, async move { 42 })
1228            .fallible();
1229
1230        drop(app);
1231
1232        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1233
1234        dispatcher.run_until_parked();
1235
1236        let result = background_executor.block(task);
1237        assert_eq!(result, None, "Cancelled task should return None");
1238    }
1239}