executor.rs

   1use crate::{App, GpuiRunnable, PlatformDispatcher, RunnableMeta, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    cell::Cell,
   8    fmt::Debug,
   9    marker::PhantomData,
  10    mem::{self, ManuallyDrop},
  11    num::NonZeroUsize,
  12    panic::Location,
  13    pin::Pin,
  14    rc::Rc,
  15    sync::{
  16        Arc,
  17        atomic::{AtomicUsize, Ordering},
  18    },
  19    task::{Context, Poll},
  20    thread::{self, ThreadId},
  21    time::{Duration, Instant},
  22};
  23use util::TryFutureExt as _;
  24use waker_fn::waker_fn;
  25
  26#[cfg(any(test, feature = "test-support"))]
  27use rand::rngs::StdRng;
  28
/// A pointer to the executor that is currently running,
/// for spawning background tasks.
///
/// The handle only wraps a shared [`PlatformDispatcher`], so cloning it
/// clones the inner `Arc` rather than creating a new executor.
#[derive(Clone)]
pub struct BackgroundExecutor {
    /// The platform dispatcher that spawned runnables are handed to.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
}
  36
/// A pointer to the executor that is currently running,
/// for spawning tasks on the main thread.
///
/// This is intentionally `!Send` via the `not_send` marker field. This is because
/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
/// only polled from the same thread it was spawned from. These checks would fail when spawning
/// foreground tasks from background threads.
#[derive(Clone)]
pub struct ForegroundExecutor {
    /// The platform dispatcher used to schedule runnables on the main thread.
    #[doc(hidden)]
    pub dispatcher: Arc<dyn PlatformDispatcher>,
    /// Weak handle that is attached (as `RunnableMeta::app`) to every task spawned
    /// through `spawn` / `spawn_with_priority`.
    liveness: std::sync::Weak<()>,
    /// `Rc` is `!Send`, so this zero-sized marker makes the whole struct `!Send`.
    not_send: PhantomData<Rc<()>>,
}
  51
/// Realtime task priority.
///
/// Carried by [`Priority::Realtime`] to say which kind of realtime work a task performs.
/// Defaults to [`RealtimePriority::Other`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum RealtimePriority {
    /// Audio task
    Audio,
    /// Other realtime task
    #[default]
    Other,
}
  62
/// Task priority.
///
/// Defaults to [`Priority::Medium`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum Priority {
    /// Realtime priority
    ///
    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
    Realtime(RealtimePriority),
    /// High priority
    ///
    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
    High,
    /// Medium priority, probably suits most of your use cases.
    #[default]
    Medium,
    /// Low priority
    ///
    /// Prefer this for background work that can come in large quantities,
    /// so that it does not starve the executor of resources for high priority tasks.
    Low,
}
  84
thread_local! {
// Per-thread record of the priority of the task that is currently running.
// Starts out as `Priority::Medium`; written by `Priority::set_as_default_for_spawns`
// and read back by `Priority::inherit`.
static CURRENT_TASKS_PRIORITY: Cell<Priority> = const { Cell::new(Priority::Medium) }; }
  87
  88impl Priority {
  89    /// Sets the priority any spawn call from the runnable about
  90    /// to be run will use
  91    pub(crate) fn set_as_default_for_spawns(&self) {
  92        CURRENT_TASKS_PRIORITY.set(*self);
  93    }
  94
  95    /// Returns the priority from the currently running task
  96    pub fn inherit() -> Self {
  97        CURRENT_TASKS_PRIORITY.get()
  98    }
  99
 100    #[allow(dead_code)]
 101    pub(crate) const fn probability(&self) -> u32 {
 102        match self {
 103            // realtime priorities are not considered for probability scheduling
 104            Priority::Realtime(_) => 0,
 105            Priority::High => 60,
 106            Priority::Medium => 30,
 107            Priority::Low => 10,
 108        }
 109    }
 110}
 111
/// Task is a primitive that allows work to happen in the background.
///
/// It implements [`Future`] so you can `.await` on it.
///
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
/// the task to continue running, but with no way to return a value.
///
/// A task is either already resolved (see [`Task::ready`]) or wraps a spawned
/// `async_task::Task`.
#[must_use]
#[derive(Debug)]
pub struct Task<T>(TaskState<T>);
 121
/// Internal representation of a [`Task`].
#[derive(Debug)]
enum TaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// The value is wrapped in an `Option` so that polling can move it out.
    Ready(Option<T>),

    /// A task that is currently running.
    Spawned(async_task::Task<T, RunnableMeta>),
}
 130
 131impl<T> Task<T> {
 132    /// Creates a new task that will resolve with the value
 133    pub fn ready(val: T) -> Self {
 134        Task(TaskState::Ready(Some(val)))
 135    }
 136
 137    /// Detaching a task runs it to completion in the background
 138    pub fn detach(self) {
 139        match self {
 140            Task(TaskState::Ready(_)) => {}
 141            Task(TaskState::Spawned(task)) => task.detach(),
 142        }
 143    }
 144
 145    /// Converts this task into a fallible task that returns `Option<T>`.
 146    ///
 147    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 148    /// if the app was dropped while the task is executing.
 149    ///
 150    /// # Example
 151    ///
 152    /// ```ignore
 153    /// // Background task that gracefully handles app shutdown:
 154    /// cx.background_spawn(async move {
 155    ///     let result = foreground_task.fallible().await;
 156    ///     if let Some(value) = result {
 157    ///         // Process the value
 158    ///     }
 159    ///     // If None, app was shut down - just exit gracefully
 160    /// }).detach();
 161    /// ```
 162    pub fn fallible(self) -> FallibleTask<T> {
 163        FallibleTask(match self.0 {
 164            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 165            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 166        })
 167    }
 168}
 169
 170impl<E, T> Task<Result<T, E>>
 171where
 172    T: 'static,
 173    E: 'static + Debug,
 174{
 175    /// Run the task to completion in the background and log any
 176    /// errors that occur.
 177    #[track_caller]
 178    pub fn detach_and_log_err(self, cx: &App) {
 179        let location = core::panic::Location::caller();
 180        cx.foreground_executor()
 181            .spawn(self.log_tracked_err(*location))
 182            .detach();
 183    }
 184}
 185
/// Awaiting a [`Task`] yields the value it was created with (ready tasks) or
/// the output of the spawned future.
impl<T> Future for Task<T> {
    type Output = T;

    /// Polls the task.
    ///
    /// # Panics
    ///
    /// Panics if a ready task is polled again after it has already returned
    /// its value (the stored `Option` is empty by then).
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY (review note): the mutable reference is only used to `take` the stored
        // output value or to poll the inner `async_task::Task` through `&mut`; `self` is
        // not moved. This presumably relies on the inner task being `Unpin` — confirm.
        match unsafe { self.get_unchecked_mut() } {
            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
            Task(TaskState::Spawned(task)) => task.poll(cx),
        }
    }
}
 196
/// A task that returns `Option<T>` instead of panicking when cancelled.
///
/// Obtained from [`Task::fallible`] or created directly with [`FallibleTask::ready`].
#[must_use]
pub struct FallibleTask<T>(FallibleTaskState<T>);
 200
/// Internal representation of a [`FallibleTask`].
enum FallibleTaskState<T> {
    /// A task that is ready to return a value.
    ///
    /// Polling moves the value out, leaving `None` behind.
    Ready(Option<T>),

    /// A task that is currently running (wraps async_task::FallibleTask).
    Spawned(async_task::FallibleTask<T, RunnableMeta>),
}
 208
 209impl<T> FallibleTask<T> {
 210    /// Creates a new fallible task that will resolve with the value.
 211    pub fn ready(val: T) -> Self {
 212        FallibleTask(FallibleTaskState::Ready(Some(val)))
 213    }
 214
 215    /// Detaching a task runs it to completion in the background.
 216    pub fn detach(self) {
 217        match self.0 {
 218            FallibleTaskState::Ready(_) => {}
 219            FallibleTaskState::Spawned(task) => task.detach(),
 220        }
 221    }
 222}
 223
/// Awaiting a [`FallibleTask`] yields `Some(output)` on success, or `None`
/// when no value is available.
impl<T> Future for FallibleTask<T> {
    type Output = Option<T>;

    /// Polls the task.
    ///
    /// A ready task hands out its stored value once; polling it again
    /// returns `Poll::Ready(None)` because the value has been taken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // SAFETY (review note): the mutable reference is only used to `take` the stored
        // value or to re-pin the inner task with `Pin::new` (which requires it to be
        // `Unpin`); `self` itself is not moved.
        match unsafe { self.get_unchecked_mut() } {
            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
        }
    }
}
 234
 235impl<T> std::fmt::Debug for FallibleTask<T> {
 236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 237        match &self.0 {
 238            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 239            FallibleTaskState::Spawned(task) => {
 240                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 241            }
 242        }
 243    }
 244}
 245
 246/// A task label is an opaque identifier that you can use to
 247/// refer to a task in tests.
 248#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 249pub struct TaskLabel(NonZeroUsize);
 250
 251impl Default for TaskLabel {
 252    fn default() -> Self {
 253        Self::new()
 254    }
 255}
 256
 257impl TaskLabel {
 258    /// Construct a new task label.
 259    pub fn new() -> Self {
 260        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 261        Self(
 262            NEXT_TASK_LABEL
 263                .fetch_add(1, Ordering::SeqCst)
 264                .try_into()
 265                .unwrap(),
 266        )
 267    }
 268}
 269
/// A boxed, pinned, type-erased future that is not required to be `Send`
/// (used for foreground spawns).
type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;

/// A boxed, pinned, type-erased `Send` future (used for background spawns).
type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 273
 274/// BackgroundExecutor lets you run things on background threads.
 275/// In production this is a thread pool with no ordering guarantees.
 276/// In tests this is simulated by running tasks one by one in a deterministic
 277/// (but arbitrary) order controlled by the `SEED` environment variable.
 278impl BackgroundExecutor {
 279    #[doc(hidden)]
 280    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 281        Self { dispatcher }
 282    }
 283
 284    /// Enqueues the given future to be run to completion on a background thread.
 285    #[track_caller]
 286    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 287    where
 288        R: Send + 'static,
 289    {
 290        self.spawn_with_priority(Priority::default(), future)
 291    }
 292
 293    /// Enqueues the given future to be run to completion on a background thread.
 294    #[track_caller]
 295    pub fn spawn_with_priority<R>(
 296        &self,
 297        priority: Priority,
 298        future: impl Future<Output = R> + Send + 'static,
 299    ) -> Task<R>
 300    where
 301        R: Send + 'static,
 302    {
 303        self.spawn_internal::<R>(Box::pin(future), None, priority)
 304    }
 305
    /// Enqueues the given future to be run to completion on a background thread and blocks the current task on it.
    ///
    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
    /// completion before the current task is resumed, even if the current task is slated for cancellation.
    ///
    /// Because of that, dropping the future returned by this method blocks the dropping thread
    /// until the spawned future itself has been dropped (see `WaitOnDrop` below).
    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
    where
        R: Send,
    {
        // We need to ensure that cancellation of the parent task does not drop the environment
        // before our own task has completed or got cancelled.
        //
        // Both guards share one `(Condvar, Mutex<bool>)` pair; the flag means
        // "the spawned future has been dropped".

        // Lives inside the spawned future: sets the flag and wakes waiters when that future
        // is dropped, whether it finished or was cancelled.
        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for NotifyOnDrop<'_> {
            fn drop(&mut self) {
                *self.0.1.lock() = true;
                self.0.0.notify_all();
            }
        }

        // Lives in this function's frame: blocks on drop until the flag has been set.
        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));

        impl Drop for WaitOnDrop<'_> {
            fn drop(&mut self) {
                let mut done = self.0.1.lock();
                // NOTE(review): a single `wait` (not a loop) assumes the condvar does not
                // wake spuriously — confirm against the `Condvar` implementation in use.
                if !*done {
                    self.0.0.wait(&mut done);
                }
            }
        }

        let dispatcher = self.dispatcher.clone();
        // NOTE(review): this function is not `#[track_caller]`, so this location points
        // into this function rather than at the caller — confirm that is acceptable.
        let location = core::panic::Location::caller();

        let pair = &(Condvar::new(), Mutex::new(false));
        // Declared before `task`, so it is dropped after it.
        let _wait_guard = WaitOnDrop(pair);

        // SAFETY (review note): `spawn_unchecked` is used because `future` and `pair` are
        // not `'static`. The guards above are what keep the borrowed data alive until the
        // spawned future has been dropped.
        let (runnable, task) = unsafe {
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    app: None,
                    priority: Priority::inherit(),
                })
                .spawn_unchecked(
                    move |_| async {
                        let _notify_guard = NotifyOnDrop(pair);
                        future.await
                    },
                    move |runnable| dispatcher.dispatch(GpuiRunnable::GpuiSpawned(runnable), None),
                )
        };
        runnable.schedule();
        task.await
    }
 360
 361    /// Enqueues the given future to be run to completion on a background thread.
 362    /// The given label can be used to control the priority of the task in tests.
 363    #[track_caller]
 364    pub fn spawn_labeled<R>(
 365        &self,
 366        label: TaskLabel,
 367        future: impl Future<Output = R> + Send + 'static,
 368    ) -> Task<R>
 369    where
 370        R: Send + 'static,
 371    {
 372        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 373    }
 374
    /// Shared implementation behind the public `spawn*` methods.
    ///
    /// * For `Priority::Realtime`, a dedicated realtime thread is requested from the
    ///   dispatcher; the task's runnables are sent to it over a bounded channel and run
    ///   there. `label` is not used on this path.
    /// * For every other priority, runnables are handed to `dispatcher.dispatch`
    ///   together with `label`.
    ///
    /// In both cases the task is scheduled immediately and a [`Task`] handle is returned.
    #[track_caller]
    fn spawn_internal<R: Send + 'static>(
        &self,
        future: AnyFuture<R>,
        label: Option<TaskLabel>,
        priority: Priority,
    ) -> Task<R> {
        let dispatcher = self.dispatcher.clone();
        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
            let location = core::panic::Location::caller();
            // Channel over which the schedule callback hands runnables to the realtime thread.
            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);

            dispatcher.spawn_realtime(
                realtime,
                Box::new(move || {
                    // Runs until every sender has been dropped.
                    while let Ok(runnable) = rx.recv() {
                        let start = Instant::now();
                        let location = runnable.metadata().location;
                        let mut timing = TaskTiming {
                            location,
                            start,
                            end: None,
                        };
                        // Recorded once at start (no end time yet) ...
                        profiler::add_task_timing(timing);

                        // Spawns made from within the runnable inherit the realtime priority.
                        Priority::Realtime(realtime).set_as_default_for_spawns();
                        runnable.run();

                        // ... and once more after the run, with the end time filled in.
                        let end = Instant::now();
                        timing.end = Some(end);
                        profiler::add_task_timing(timing);
                    }
                }),
            );

            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    priority,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| {
                        // A send error (receiver gone) is deliberately ignored.
                        let _ = tx.send(runnable);
                    },
                )
        } else {
            let location = core::panic::Location::caller();
            async_task::Builder::new()
                .metadata(RunnableMeta {
                    location,
                    priority,
                    app: None,
                })
                .spawn(
                    move |_| future,
                    move |runnable| dispatcher.dispatch(GpuiRunnable::GpuiSpawned(runnable), label),
                )
        };

        runnable.schedule();
        Task(TaskState::Spawned(task))
    }
 439
 440    /// Used by the test harness to run an async test in a synchronous fashion.
 441    #[cfg(any(test, feature = "test-support"))]
 442    #[track_caller]
 443    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 444        if let Ok(value) = self.block_internal(false, future, None) {
 445            value
 446        } else {
 447            unreachable!()
 448        }
 449    }
 450
 451    /// Block the current thread until the given future resolves.
 452    /// Consider using `block_with_timeout` instead.
 453    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 454        if let Ok(value) = self.block_internal(true, future, None) {
 455            value
 456        } else {
 457            unreachable!()
 458        }
 459    }
 460
    /// Production implementation of blocking: polls `future` on the current thread,
    /// parking the thread between polls until the waker fires.
    ///
    /// Returns `Ok(output)` when the future resolves. If `timeout` is `Some` and elapses
    /// first (or is zero), the still-pending, boxed future is returned as `Err` so the
    /// caller can keep driving it. `_background_only` is unused in this configuration.
    #[cfg(not(any(test, feature = "test-support")))]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        _background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::time::Instant;

        let mut future = Box::pin(future);
        // A zero timeout gives up immediately, without polling even once.
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }
        let deadline = timeout.map(|timeout| Instant::now() + timeout);

        // The waker simply unparks this thread.
        let parker = parking::Parker::new();
        let unparker = parker.unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let mut cx = std::task::Context::from_waker(&waker);

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    // Time remaining until the deadline, if there is one.
                    let timeout =
                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
                    if let Some(timeout) = timeout {
                        // Give up only if we were not unparked AND the deadline has really passed.
                        if !parker.park_timeout(timeout)
                            && deadline.is_some_and(|deadline| deadline < Instant::now())
                        {
                            return Err(future);
                        }
                    } else {
                        parker.park();
                    }
                }
            }
        }
    }
 502
    /// Test-support implementation of blocking.
    ///
    /// Returns `Ok(output)` when the future resolves, or `Err(future)` with the
    /// still-pending boxed future when the timeout budget is exhausted (or `timeout` is zero).
    ///
    /// * Without a test dispatcher, this behaves like the production version: poll, then
    ///   park the thread until woken or until the deadline passes.
    /// * With a test dispatcher, other tasks are run via `dispatcher.tick(background_only)`
    ///   between polls. A `Some` timeout is not measured in wall-clock time here; it is
    ///   replaced by a tick budget obtained from `dispatcher.gen_block_on_ticks()`.
    ///
    /// # Panics
    ///
    /// With a test dispatcher, panics when nothing is left to run, parking is not allowed
    /// and no delayed task can be advanced to; or, with parking allowed, when the overall
    /// test timeout (`GPUI_TEST_TIMEOUT` seconds, default 180) is exceeded.
    #[cfg(any(test, feature = "test-support"))]
    #[track_caller]
    pub(crate) fn block_internal<Fut: Future>(
        &self,
        background_only: bool,
        future: Fut,
        timeout: Option<Duration>,
    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
        use std::sync::atomic::AtomicBool;
        use std::time::Instant;

        use parking::Parker;

        let mut future = Box::pin(future);
        // A zero timeout gives up immediately, without polling even once.
        if timeout == Some(Duration::ZERO) {
            return Err(future);
        }

        // When using a real platform (e.g., MacPlatform for visual tests that need actual
        // Metal rendering), there's no test dispatcher. In this case, we block the thread
        // directly by polling the future and parking until woken. This is required for
        // VisualTestAppContext which uses real platform rendering but still needs blocking
        // behavior for code paths like editor initialization that call block_with_timeout.
        let Some(dispatcher) = self.dispatcher.as_test() else {
            let deadline = timeout.map(|timeout| Instant::now() + timeout);

            let parker = Parker::new();
            let unparker = parker.unparker();
            let waker = waker_fn(move || {
                unparker.unpark();
            });
            let mut cx = std::task::Context::from_waker(&waker);

            loop {
                match future.as_mut().poll(&mut cx) {
                    Poll::Ready(result) => return Ok(result),
                    Poll::Pending => {
                        // Time remaining until the deadline, if there is one.
                        let timeout = deadline
                            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
                        if let Some(timeout) = timeout {
                            // Give up only if we were not unparked AND the deadline has passed.
                            if !parker.park_timeout(timeout)
                                && deadline.is_some_and(|deadline| deadline < Instant::now())
                            {
                                return Err(future);
                            }
                        } else {
                            parker.park();
                        }
                    }
                }
            }
        };

        // With a timeout, the budget is a number of ticks chosen by the dispatcher;
        // without one, it is effectively unlimited.
        let mut max_ticks = if timeout.is_some() {
            dispatcher.gen_block_on_ticks()
        } else {
            usize::MAX
        };

        let parker = Parker::new();
        let unparker = parker.unparker();

        // Set by the waker so the loop can tell "woken while ticking" apart from "idle".
        let awoken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn({
            let awoken = awoken.clone();
            let unparker = unparker.clone();
            move || {
                awoken.store(true, Ordering::SeqCst);
                unparker.unpark();
            }
        });
        let mut cx = std::task::Context::from_waker(&waker);

        // Overall wall-clock limit. `option_env!` reads `GPUI_TEST_TIMEOUT` at compile time.
        let duration = Duration::from_secs(
            option_env!("GPUI_TEST_TIMEOUT")
                .and_then(|s| s.parse::<u64>().ok())
                .unwrap_or(180),
        );
        let mut test_should_end_by = Instant::now() + duration;

        loop {
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(result) => return Ok(result),
                Poll::Pending => {
                    // Tick budget exhausted: hand the pending future back to the caller.
                    if max_ticks == 0 {
                        return Err(future);
                    }
                    max_ticks -= 1;

                    // `tick` returning false means the dispatcher had nothing to run.
                    if !dispatcher.tick(background_only) {
                        // The future was woken in the meantime: poll it again right away.
                        if awoken.swap(false, Ordering::SeqCst) {
                            continue;
                        }

                        if !dispatcher.parking_allowed() {
                            // Try to make progress by jumping the clock to the next delayed task.
                            if dispatcher.advance_clock_to_next_delayed() {
                                continue;
                            }
                            // Nothing can make progress: report as much context as is available.
                            let mut backtrace_message = String::new();
                            let mut waiting_message = String::new();
                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
                                backtrace_message =
                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
                            }
                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
                            }
                            panic!(
                                "parked with nothing left to run{waiting_message}{backtrace_message}",
                            )
                        }
                        // Parking is allowed: wait briefly for outside work to wake us up.
                        dispatcher.push_unparker(unparker.clone());
                        parker.park_timeout(Duration::from_millis(1));
                        if Instant::now() > test_should_end_by {
                            panic!("test timed out after {duration:?} with allow_parking")
                        }
                    }
                }
            }
        }
    }
 624
 625    /// Block the current thread until the given future resolves
 626    /// or `duration` has elapsed.
 627    pub fn block_with_timeout<Fut: Future>(
 628        &self,
 629        duration: Duration,
 630        future: Fut,
 631    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 632        self.block_internal(true, future, Some(duration))
 633    }
 634
 635    /// Scoped lets you start a number of tasks and waits
 636    /// for all of them to complete before returning.
 637    pub async fn scoped<'scope, F>(&self, scheduler: F)
 638    where
 639        F: FnOnce(&mut Scope<'scope>),
 640    {
 641        let mut scope = Scope::new(self.clone(), Priority::default());
 642        (scheduler)(&mut scope);
 643        let spawned = mem::take(&mut scope.futures)
 644            .into_iter()
 645            .map(|f| self.spawn_with_priority(scope.priority, f))
 646            .collect::<Vec<_>>();
 647        for task in spawned {
 648            task.await;
 649        }
 650    }
 651
 652    /// Scoped lets you start a number of tasks and waits
 653    /// for all of them to complete before returning.
 654    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 655    where
 656        F: FnOnce(&mut Scope<'scope>),
 657    {
 658        let mut scope = Scope::new(self.clone(), priority);
 659        (scheduler)(&mut scope);
 660        let spawned = mem::take(&mut scope.futures)
 661            .into_iter()
 662            .map(|f| self.spawn_with_priority(scope.priority, f))
 663            .collect::<Vec<_>>();
 664        for task in spawned {
 665            task.await;
 666        }
 667    }
 668
 669    /// Get the current time.
 670    ///
 671    /// Calling this instead of `std::time::Instant::now` allows the use
 672    /// of fake timers in tests.
 673    pub fn now(&self) -> Instant {
 674        self.dispatcher.now()
 675    }
 676
 677    /// Returns a task that will complete after the given duration.
 678    /// Depending on other concurrent tasks the elapsed duration may be longer
 679    /// than requested.
 680    pub fn timer(&self, duration: Duration) -> Task<()> {
 681        if duration.is_zero() {
 682            return Task::ready(());
 683        }
 684        let location = core::panic::Location::caller();
 685        let (runnable, task) = async_task::Builder::new()
 686            .metadata(RunnableMeta {
 687                location,
 688                priority: Priority::inherit(),
 689                app: None,
 690            })
 691            .spawn(move |_| async move {}, {
 692                let dispatcher = self.dispatcher.clone();
 693                move |runnable| {
 694                    dispatcher.dispatch_after(duration, GpuiRunnable::GpuiSpawned(runnable))
 695                }
 696            });
 697        runnable.schedule();
 698        Task(TaskState::Spawned(task))
 699    }
 700
 701    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 702    #[cfg(any(test, feature = "test-support"))]
 703    pub fn start_waiting(&self) {
 704        self.dispatcher.as_test().unwrap().start_waiting();
 705    }
 706
 707    /// in tests, removes the debugging data added by start_waiting
 708    #[cfg(any(test, feature = "test-support"))]
 709    pub fn finish_waiting(&self) {
 710        self.dispatcher.as_test().unwrap().finish_waiting();
 711    }
 712
 713    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 714    #[cfg(any(test, feature = "test-support"))]
 715    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 716        self.dispatcher.as_test().unwrap().simulate_random_delay()
 717    }
 718
 719    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 720    #[cfg(any(test, feature = "test-support"))]
 721    pub fn deprioritize(&self, task_label: TaskLabel) {
 722        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 723    }
 724
 725    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 726    #[cfg(any(test, feature = "test-support"))]
 727    pub fn advance_clock(&self, duration: Duration) {
 728        self.dispatcher.as_test().unwrap().advance_clock(duration)
 729    }
 730
 731    /// in tests, run one task.
 732    #[cfg(any(test, feature = "test-support"))]
 733    pub fn tick(&self) -> bool {
 734        self.dispatcher.as_test().unwrap().tick(false)
 735    }
 736
 737    /// in tests, run all tasks that are ready to run. If after doing so
 738    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 739    #[cfg(any(test, feature = "test-support"))]
 740    pub fn run_until_parked(&self) {
 741        self.dispatcher.as_test().unwrap().run_until_parked()
 742    }
 743
 744    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 745    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 746    /// do take real async time to run.
 747    #[cfg(any(test, feature = "test-support"))]
 748    pub fn allow_parking(&self) {
 749        self.dispatcher.as_test().unwrap().allow_parking();
 750    }
 751
 752    /// undoes the effect of [`Self::allow_parking`].
 753    #[cfg(any(test, feature = "test-support"))]
 754    pub fn forbid_parking(&self) {
 755        self.dispatcher.as_test().unwrap().forbid_parking();
 756    }
 757
 758    /// adds detail to the "parked with nothing let to run" message.
 759    #[cfg(any(test, feature = "test-support"))]
 760    pub fn set_waiting_hint(&self, msg: Option<String>) {
 761        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 762    }
 763
 764    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 765    #[cfg(any(test, feature = "test-support"))]
 766    pub fn rng(&self) -> StdRng {
 767        self.dispatcher.as_test().unwrap().rng()
 768    }
 769
    /// How many CPUs are available to the dispatcher.
    ///
    /// In test builds this is a fixed `4`; otherwise it is whatever
    /// `num_cpus::get()` reports.
    pub fn num_cpus(&self) -> usize {
        // Exactly one of the two `return`s below is compiled in.
        #[cfg(any(test, feature = "test-support"))]
        return 4;

        #[cfg(not(any(test, feature = "test-support")))]
        return num_cpus::get();
    }
 778
 779    /// Whether we're on the main thread.
 780    pub fn is_main_thread(&self) -> bool {
 781        self.dispatcher.is_main_thread()
 782    }
 783
 784    #[cfg(any(test, feature = "test-support"))]
 785    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 786    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 787        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 788    }
 789}
 790
 791/// ForegroundExecutor runs things on the main thread.
 792impl ForegroundExecutor {
 793    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 794    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>, liveness: std::sync::Weak<()>) -> Self {
 795        Self {
 796            dispatcher,
 797            liveness,
 798            not_send: PhantomData,
 799        }
 800    }
 801
    /// Enqueues the given Task to run on the main thread at some point in the
    /// future, using [`Priority::default`]. Use
    /// [`spawn_with_priority`](Self::spawn_with_priority) if you want a
    /// different priority.
    ///
    /// The executor's `liveness` handle is attached to the spawned task.
    // NOTE(review): `Priority::inherit()` is not used here, so the priority of the
    // currently running task is not carried over — confirm that this is intended.
    #[track_caller]
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        self.inner_spawn(self.liveness.clone(), Priority::default(), future)
    }
 813
 814    /// Enqueues the given Task to run on the main thread at some point in the future.
 815    #[track_caller]
 816    pub fn spawn_with_priority<R>(
 817        &self,
 818        priority: Priority,
 819        future: impl Future<Output = R> + 'static,
 820    ) -> Task<R>
 821    where
 822        R: 'static,
 823    {
 824        self.inner_spawn(self.liveness.clone(), priority, future)
 825    }
 826
 827    #[track_caller]
 828    pub(crate) fn inner_spawn<R>(
 829        &self,
 830        app: std::sync::Weak<()>,
 831        priority: Priority,
 832        future: impl Future<Output = R> + 'static,
 833    ) -> Task<R>
 834    where
 835        R: 'static,
 836    {
 837        let dispatcher = self.dispatcher.clone();
 838        let location = core::panic::Location::caller();
 839
 840        #[track_caller]
 841        fn inner<R: 'static>(
 842            dispatcher: Arc<dyn PlatformDispatcher>,
 843            future: AnyLocalFuture<R>,
 844            location: &'static core::panic::Location<'static>,
 845            app: std::sync::Weak<()>,
 846            priority: Priority,
 847        ) -> Task<R> {
 848            let (runnable, task) = spawn_local_with_source_location(
 849                future,
 850                move |runnable| {
 851                    dispatcher.dispatch_on_main_thread(GpuiRunnable::GpuiSpawned(runnable))
 852                },
 853                RunnableMeta {
 854                    location,
 855                    priority,
 856                    app: Some(app),
 857                },
 858            );
 859            runnable.schedule();
 860            Task(TaskState::Spawned(task))
 861        }
 862        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 863    }
 864}
 865
 866/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
 867///
 868/// Copy-modified from:
 869/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
 870#[track_caller]
 871fn spawn_local_with_source_location<Fut, S, M>(
 872    future: Fut,
 873    schedule: S,
 874    metadata: M,
 875) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
 876where
 877    Fut: Future + 'static,
 878    Fut::Output: 'static,
 879    S: async_task::Schedule<M> + Send + Sync + 'static,
 880    M: 'static,
 881{
 882    #[inline]
 883    fn thread_id() -> ThreadId {
 884        std::thread_local! {
 885            static ID: ThreadId = thread::current().id();
 886        }
 887        ID.try_with(|id| *id)
 888            .unwrap_or_else(|_| thread::current().id())
 889    }
 890
 891    struct Checked<F> {
 892        id: ThreadId,
 893        inner: ManuallyDrop<F>,
 894        location: &'static Location<'static>,
 895    }
 896
 897    impl<F> Drop for Checked<F> {
 898        fn drop(&mut self) {
 899            assert!(
 900                self.id == thread_id(),
 901                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
 902                self.location
 903            );
 904            unsafe { ManuallyDrop::drop(&mut self.inner) };
 905        }
 906    }
 907
 908    impl<F: Future> Future for Checked<F> {
 909        type Output = F::Output;
 910
 911        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 912            assert!(
 913                self.id == thread_id(),
 914                "local task polled by a thread that didn't spawn it. Task spawned at {}",
 915                self.location
 916            );
 917            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
 918        }
 919    }
 920
 921    // Wrap the future into one that checks which thread it's on.
 922    let future = Checked {
 923        id: thread_id(),
 924        inner: ManuallyDrop::new(future),
 925        location: Location::caller(),
 926    };
 927
 928    unsafe {
 929        async_task::Builder::new()
 930            .metadata(metadata)
 931            .spawn_unchecked(move |_| future, schedule)
 932    }
 933}
 934
 935/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
 936pub struct Scope<'a> {
 937    executor: BackgroundExecutor,
 938    priority: Priority,
 939    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 940    tx: Option<mpsc::Sender<()>>,
 941    rx: mpsc::Receiver<()>,
 942    lifetime: PhantomData<&'a ()>,
 943}
 944
 945impl<'a> Scope<'a> {
 946    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 947        let (tx, rx) = mpsc::channel(1);
 948        Self {
 949            executor,
 950            priority,
 951            tx: Some(tx),
 952            rx,
 953            futures: Default::default(),
 954            lifetime: PhantomData,
 955        }
 956    }
 957
 958    /// How many CPUs are available to the dispatcher.
 959    pub fn num_cpus(&self) -> usize {
 960        self.executor.num_cpus()
 961    }
 962
 963    /// Spawn a future into this scope.
 964    #[track_caller]
 965    pub fn spawn<F>(&mut self, f: F)
 966    where
 967        F: Future<Output = ()> + Send + 'a,
 968    {
 969        let tx = self.tx.clone().unwrap();
 970
 971        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 972        // dropping this `Scope` blocks until all of the futures have resolved.
 973        let f = unsafe {
 974            mem::transmute::<
 975                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 976                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 977            >(Box::pin(async move {
 978                f.await;
 979                drop(tx);
 980            }))
 981        };
 982        self.futures.push(f);
 983    }
 984}
 985
 986impl Drop for Scope<'_> {
 987    fn drop(&mut self) {
 988        self.tx.take().unwrap();
 989
 990        // Wait until the channel is closed, which means that all of the spawned
 991        // futures have resolved.
 992        self.executor.block(self.rx.next());
 993    }
 994}
 995
 996#[cfg(test)]
 997mod test {
 998    use super::*;
 999    use crate::{App, TestDispatcher, TestPlatform};
1000    use rand::SeedableRng;
1001    use std::cell::RefCell;
1002
1003    /// Helper to create test infrastructure.
1004    /// Returns (dispatcher, background_executor, app) where app's foreground_executor has liveness.
1005    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
1006        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1007        let arc_dispatcher = Arc::new(dispatcher.clone());
1008        // Create liveness for task cancellation
1009        let liveness = std::sync::Arc::new(());
1010        let liveness_weak = std::sync::Arc::downgrade(&liveness);
1011        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1012        let foreground_executor = ForegroundExecutor::new(arc_dispatcher, liveness_weak);
1013
1014        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
1015        let asset_source = Arc::new(());
1016        let http_client = http_client::FakeHttpClient::with_404_response();
1017
1018        let app = App::new_app(platform, liveness, asset_source, http_client);
1019        (dispatcher, background_executor, app)
1020    }
1021
1022    #[test]
1023    fn sanity_test_tasks_run() {
1024        let (dispatcher, _background_executor, app) = create_test_app();
1025        let foreground_executor = app.borrow().foreground_executor.clone();
1026
1027        let task_ran = Rc::new(RefCell::new(false));
1028
1029        foreground_executor
1030            .spawn({
1031                let task_ran = Rc::clone(&task_ran);
1032                async move {
1033                    *task_ran.borrow_mut() = true;
1034                }
1035            })
1036            .detach();
1037
1038        // Run dispatcher while app is still alive
1039        dispatcher.run_until_parked();
1040
1041        // Task should have run
1042        assert!(
1043            *task_ran.borrow(),
1044            "Task should run normally when app is alive"
1045        );
1046    }
1047
1048    #[test]
1049    fn test_task_cancelled_when_app_dropped() {
1050        let (dispatcher, _background_executor, app) = create_test_app();
1051        let foreground_executor = app.borrow().foreground_executor.clone();
1052        let app_weak = Rc::downgrade(&app);
1053
1054        let task_ran = Rc::new(RefCell::new(false));
1055        let task_ran_clone = Rc::clone(&task_ran);
1056
1057        foreground_executor
1058            .spawn(async move {
1059                *task_ran_clone.borrow_mut() = true;
1060            })
1061            .detach();
1062
1063        drop(app);
1064
1065        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1066
1067        dispatcher.run_until_parked();
1068
1069        // The task should have been cancelled, not run
1070        assert!(
1071            !*task_ran.borrow(),
1072            "Task should have been cancelled when app was dropped, but it ran!"
1073        );
1074    }
1075
1076    #[test]
1077    fn test_nested_tasks_both_cancel() {
1078        let (dispatcher, _background_executor, app) = create_test_app();
1079        let foreground_executor = app.borrow().foreground_executor.clone();
1080        let app_weak = Rc::downgrade(&app);
1081
1082        let outer_completed = Rc::new(RefCell::new(false));
1083        let inner_completed = Rc::new(RefCell::new(false));
1084        let reached_await = Rc::new(RefCell::new(false));
1085
1086        let outer_flag = Rc::clone(&outer_completed);
1087        let inner_flag = Rc::clone(&inner_completed);
1088        let await_flag = Rc::clone(&reached_await);
1089
1090        // Channel to block the inner task until we're ready
1091        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1092
1093        let inner_executor = foreground_executor.clone();
1094
1095        foreground_executor
1096            .spawn(async move {
1097                let inner_task = inner_executor.spawn({
1098                    let inner_flag = Rc::clone(&inner_flag);
1099                    async move {
1100                        rx.await.ok();
1101                        *inner_flag.borrow_mut() = true;
1102                    }
1103                });
1104
1105                *await_flag.borrow_mut() = true;
1106
1107                inner_task.await;
1108
1109                *outer_flag.borrow_mut() = true;
1110            })
1111            .detach();
1112
1113        // Run dispatcher until outer task reaches the await point
1114        // The inner task will be blocked on the channel
1115        dispatcher.run_until_parked();
1116
1117        // Verify we actually reached the await point before dropping the app
1118        assert!(
1119            *reached_await.borrow(),
1120            "Outer task should have reached the await point"
1121        );
1122
1123        // Neither task should have completed yet
1124        assert!(
1125            !*outer_completed.borrow(),
1126            "Outer task should not have completed yet"
1127        );
1128        assert!(
1129            !*inner_completed.borrow(),
1130            "Inner task should not have completed yet"
1131        );
1132
1133        // Drop the channel sender and app while outer is awaiting inner
1134        drop(tx);
1135        drop(app);
1136        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1137
1138        // Run dispatcher - both tasks should be cancelled
1139        dispatcher.run_until_parked();
1140
1141        // Neither task should have completed (both were cancelled)
1142        assert!(
1143            !*outer_completed.borrow(),
1144            "Outer task should have been cancelled, not completed"
1145        );
1146        assert!(
1147            !*inner_completed.borrow(),
1148            "Inner task should have been cancelled, not completed"
1149        );
1150    }
1151
1152    #[test]
1153    #[should_panic]
1154    fn test_polling_cancelled_task_panics() {
1155        let (dispatcher, background_executor, app) = create_test_app();
1156        let foreground_executor = app.borrow().foreground_executor.clone();
1157        let app_weak = Rc::downgrade(&app);
1158
1159        let task = foreground_executor.spawn(async move { 42 });
1160
1161        drop(app);
1162
1163        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1164
1165        dispatcher.run_until_parked();
1166
1167        background_executor.block(task);
1168    }
1169
1170    #[test]
1171    fn test_polling_cancelled_task_returns_none_with_fallible() {
1172        let (dispatcher, background_executor, app) = create_test_app();
1173        let foreground_executor = app.borrow().foreground_executor.clone();
1174        let app_weak = Rc::downgrade(&app);
1175
1176        let task = foreground_executor.spawn(async move { 42 }).fallible();
1177
1178        drop(app);
1179
1180        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1181
1182        dispatcher.run_until_parked();
1183
1184        let result = background_executor.block(task);
1185        assert_eq!(result, None, "Cancelled task should return None");
1186    }
1187}