executor.rs

   1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    fmt::Debug,
   8    marker::PhantomData,
   9    mem::{self, ManuallyDrop},
  10    num::NonZeroUsize,
  11    panic::Location,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::{
  15        Arc,
  16        atomic::{AtomicUsize, Ordering},
  17    },
  18    task::{Context, Poll},
  19    thread::{self, ThreadId},
  20    time::{Duration, Instant},
  21};
  22use util::TryFutureExt as _;
  23use waker_fn::waker_fn;
  24
  25#[cfg(any(test, feature = "test-support"))]
  26use rand::rngs::StdRng;
  27
  28/// A pointer to the executor that is currently running,
  29/// for spawning background tasks.
  30#[derive(Clone)]
  31pub struct BackgroundExecutor {
  32    #[doc(hidden)]
  33    pub dispatcher: Arc<dyn PlatformDispatcher>,
  34}
  35
  36/// A pointer to the executor that is currently running,
  37/// for spawning tasks on the main thread.
  38///
  39/// This is intentionally `!Send` via the `not_send` marker field. This is because
  40/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
  41/// only polled from the same thread it was spawned from. These checks would fail when spawning
  42/// foreground tasks from background threads.
  43#[derive(Clone)]
  44pub struct ForegroundExecutor {
  45    #[doc(hidden)]
  46    pub dispatcher: Arc<dyn PlatformDispatcher>,
  47    liveness: std::sync::Weak<()>,
  48    not_send: PhantomData<Rc<()>>,
  49}
  50
  51/// Realtime task priority
  52#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  53#[repr(u8)]
  54pub enum RealtimePriority {
  55    /// Audio task
  56    Audio,
  57    /// Other realtime task
  58    #[default]
  59    Other,
  60}
  61
  62/// Task priority
  63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  64#[repr(u8)]
  65pub enum Priority {
  66    /// Realtime priority
  67    ///
  68    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
  69    Realtime(RealtimePriority),
  70    /// High priority
  71    ///
  72    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
  73    High,
  74    /// Medium priority, probably suits most of your use cases.
  75    #[default]
  76    Medium,
  77    /// Low priority
  78    ///
  79    /// Prioritize this for background work that can come in large quantities
  80    /// to not starve the executor of resources for high priority tasks
  81    Low,
  82}
  83
  84impl Priority {
  85    #[allow(dead_code)]
  86    pub(crate) const fn probability(&self) -> u32 {
  87        match self {
  88            // realtime priorities are not considered for probability scheduling
  89            Priority::Realtime(_) => 0,
  90            Priority::High => 60,
  91            Priority::Medium => 30,
  92            Priority::Low => 10,
  93        }
  94    }
  95}
  96
  97/// Task is a primitive that allows work to happen in the background.
  98///
  99/// It implements [`Future`] so you can `.await` on it.
 100///
 101/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 102/// the task to continue running, but with no way to return a value.
 103#[must_use]
 104#[derive(Debug)]
 105pub struct Task<T>(TaskState<T>);
 106
 107#[derive(Debug)]
 108enum TaskState<T> {
 109    /// A task that is ready to return a value
 110    Ready(Option<T>),
 111
 112    /// A task that is currently running.
 113    Spawned(async_task::Task<T, RunnableMeta>),
 114}
 115
 116impl<T> Task<T> {
 117    /// Creates a new task that will resolve with the value
 118    pub fn ready(val: T) -> Self {
 119        Task(TaskState::Ready(Some(val)))
 120    }
 121
 122    /// Detaching a task runs it to completion in the background
 123    pub fn detach(self) {
 124        match self {
 125            Task(TaskState::Ready(_)) => {}
 126            Task(TaskState::Spawned(task)) => task.detach(),
 127        }
 128    }
 129
 130    /// Converts this task into a fallible task that returns `Option<T>`.
 131    ///
 132    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 133    /// if the app was dropped while the task is executing.
 134    ///
 135    /// # Example
 136    ///
 137    /// ```ignore
 138    /// // Background task that gracefully handles app shutdown:
 139    /// cx.background_spawn(async move {
 140    ///     let result = foreground_task.fallible().await;
 141    ///     if let Some(value) = result {
 142    ///         // Process the value
 143    ///     }
 144    ///     // If None, app was shut down - just exit gracefully
 145    /// }).detach();
 146    /// ```
 147    pub fn fallible(self) -> FallibleTask<T> {
 148        FallibleTask(match self.0 {
 149            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 150            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 151        })
 152    }
 153}
 154
 155impl<E, T> Task<Result<T, E>>
 156where
 157    T: 'static,
 158    E: 'static + Debug,
 159{
 160    /// Run the task to completion in the background and log any
 161    /// errors that occur.
 162    #[track_caller]
 163    pub fn detach_and_log_err(self, cx: &App) {
 164        let location = core::panic::Location::caller();
 165        cx.foreground_executor()
 166            .spawn(self.log_tracked_err(*location))
 167            .detach();
 168    }
 169}
 170
 171impl<T> Future for Task<T> {
 172    type Output = T;
 173
 174    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 175        match unsafe { self.get_unchecked_mut() } {
 176            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
 177            Task(TaskState::Spawned(task)) => task.poll(cx),
 178        }
 179    }
 180}
 181
 182/// A task that returns `Option<T>` instead of panicking when cancelled.
 183#[must_use]
 184pub struct FallibleTask<T>(FallibleTaskState<T>);
 185
 186enum FallibleTaskState<T> {
 187    /// A task that is ready to return a value
 188    Ready(Option<T>),
 189
 190    /// A task that is currently running (wraps async_task::FallibleTask).
 191    Spawned(async_task::FallibleTask<T, RunnableMeta>),
 192}
 193
 194impl<T> FallibleTask<T> {
 195    /// Creates a new fallible task that will resolve with the value.
 196    pub fn ready(val: T) -> Self {
 197        FallibleTask(FallibleTaskState::Ready(Some(val)))
 198    }
 199
 200    /// Detaching a task runs it to completion in the background.
 201    pub fn detach(self) {
 202        match self.0 {
 203            FallibleTaskState::Ready(_) => {}
 204            FallibleTaskState::Spawned(task) => task.detach(),
 205        }
 206    }
 207}
 208
 209impl<T> Future for FallibleTask<T> {
 210    type Output = Option<T>;
 211
 212    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 213        match unsafe { self.get_unchecked_mut() } {
 214            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
 215            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
 216        }
 217    }
 218}
 219
 220impl<T> std::fmt::Debug for FallibleTask<T> {
 221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 222        match &self.0 {
 223            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 224            FallibleTaskState::Spawned(task) => {
 225                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 226            }
 227        }
 228    }
 229}
 230
 231/// A task label is an opaque identifier that you can use to
 232/// refer to a task in tests.
 233#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 234pub struct TaskLabel(NonZeroUsize);
 235
 236impl Default for TaskLabel {
 237    fn default() -> Self {
 238        Self::new()
 239    }
 240}
 241
 242impl TaskLabel {
 243    /// Construct a new task label.
 244    pub fn new() -> Self {
 245        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 246        Self(
 247            NEXT_TASK_LABEL
 248                .fetch_add(1, Ordering::SeqCst)
 249                .try_into()
 250                .unwrap(),
 251        )
 252    }
 253}
 254
 255type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
 256
 257type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 258
 259/// BackgroundExecutor lets you run things on background threads.
 260/// In production this is a thread pool with no ordering guarantees.
 261/// In tests this is simulated by running tasks one by one in a deterministic
 262/// (but arbitrary) order controlled by the `SEED` environment variable.
 263impl BackgroundExecutor {
 264    #[doc(hidden)]
 265    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 266        Self { dispatcher }
 267    }
 268
 269    /// Enqueues the given future to be run to completion on a background thread.
 270    #[track_caller]
 271    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 272    where
 273        R: Send + 'static,
 274    {
 275        self.spawn_with_priority(Priority::default(), future)
 276    }
 277
 278    /// Enqueues the given future to be run to completion on a background thread.
 279    #[track_caller]
 280    pub fn spawn_with_priority<R>(
 281        &self,
 282        priority: Priority,
 283        future: impl Future<Output = R> + Send + 'static,
 284    ) -> Task<R>
 285    where
 286        R: Send + 'static,
 287    {
 288        self.spawn_internal::<R>(Box::pin(future), None, priority)
 289    }
 290
 291    /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
 292    ///
 293    /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
 294    /// completion before the current task is resumed, even if the current task is slated for cancellation.
 295    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
 296    where
 297        R: Send,
 298    {
 299        // We need to ensure that cancellation of the parent task does not drop the environment
 300        // before the our own task has completed or got cancelled.
 301        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 302
 303        impl Drop for NotifyOnDrop<'_> {
 304            fn drop(&mut self) {
 305                *self.0.1.lock() = true;
 306                self.0.0.notify_all();
 307            }
 308        }
 309
 310        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 311
 312        impl Drop for WaitOnDrop<'_> {
 313            fn drop(&mut self) {
 314                let mut done = self.0.1.lock();
 315                if !*done {
 316                    self.0.0.wait(&mut done);
 317                }
 318            }
 319        }
 320
 321        let dispatcher = self.dispatcher.clone();
 322        let location = core::panic::Location::caller();
 323
 324        let pair = &(Condvar::new(), Mutex::new(false));
 325        let _wait_guard = WaitOnDrop(pair);
 326
 327        let (runnable, task) = unsafe {
 328            async_task::Builder::new()
 329                .metadata(RunnableMeta {
 330                    location,
 331                    app: None,
 332                })
 333                .spawn_unchecked(
 334                    move |_| async {
 335                        let _notify_guard = NotifyOnDrop(pair);
 336                        future.await
 337                    },
 338                    move |runnable| {
 339                        dispatcher.dispatch(
 340                            RunnableVariant::Meta(runnable),
 341                            None,
 342                            Priority::default(),
 343                        )
 344                    },
 345                )
 346        };
 347        runnable.schedule();
 348        task.await
 349    }
 350
 351    /// Enqueues the given future to be run to completion on a background thread.
 352    /// The given label can be used to control the priority of the task in tests.
 353    #[track_caller]
 354    pub fn spawn_labeled<R>(
 355        &self,
 356        label: TaskLabel,
 357        future: impl Future<Output = R> + Send + 'static,
 358    ) -> Task<R>
 359    where
 360        R: Send + 'static,
 361    {
 362        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 363    }
 364
 365    #[track_caller]
 366    fn spawn_internal<R: Send + 'static>(
 367        &self,
 368        future: AnyFuture<R>,
 369        label: Option<TaskLabel>,
 370        priority: Priority,
 371    ) -> Task<R> {
 372        let dispatcher = self.dispatcher.clone();
 373        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
 374            let location = core::panic::Location::caller();
 375            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
 376
 377            dispatcher.spawn_realtime(
 378                realtime,
 379                Box::new(move || {
 380                    while let Ok(runnable) = rx.recv() {
 381                        let start = Instant::now();
 382                        let location = runnable.metadata().location;
 383                        let mut timing = TaskTiming {
 384                            location,
 385                            start,
 386                            end: None,
 387                        };
 388                        profiler::add_task_timing(timing);
 389
 390                        runnable.run();
 391
 392                        let end = Instant::now();
 393                        timing.end = Some(end);
 394                        profiler::add_task_timing(timing);
 395                    }
 396                }),
 397            );
 398
 399            async_task::Builder::new()
 400                .metadata(RunnableMeta {
 401                    location,
 402                    app: None,
 403                })
 404                .spawn(
 405                    move |_| future,
 406                    move |runnable| {
 407                        let _ = tx.send(runnable);
 408                    },
 409                )
 410        } else {
 411            let location = core::panic::Location::caller();
 412            async_task::Builder::new()
 413                .metadata(RunnableMeta {
 414                    location,
 415                    app: None,
 416                })
 417                .spawn(
 418                    move |_| future,
 419                    move |runnable| {
 420                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
 421                    },
 422                )
 423        };
 424
 425        runnable.schedule();
 426        Task(TaskState::Spawned(task))
 427    }
 428
 429    /// Used by the test harness to run an async test in a synchronous fashion.
 430    #[cfg(any(test, feature = "test-support"))]
 431    #[track_caller]
 432    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 433        if let Ok(value) = self.block_internal(false, future, None) {
 434            value
 435        } else {
 436            unreachable!()
 437        }
 438    }
 439
 440    /// Block the current thread until the given future resolves.
 441    /// Consider using `block_with_timeout` instead.
 442    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 443        if let Ok(value) = self.block_internal(true, future, None) {
 444            value
 445        } else {
 446            unreachable!()
 447        }
 448    }
 449
 450    #[cfg(not(any(test, feature = "test-support")))]
 451    pub(crate) fn block_internal<Fut: Future>(
 452        &self,
 453        _background_only: bool,
 454        future: Fut,
 455        timeout: Option<Duration>,
 456    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 457        use std::time::Instant;
 458
 459        let mut future = Box::pin(future);
 460        if timeout == Some(Duration::ZERO) {
 461            return Err(future);
 462        }
 463        let deadline = timeout.map(|timeout| Instant::now() + timeout);
 464
 465        let parker = parking::Parker::new();
 466        let unparker = parker.unparker();
 467        let waker = waker_fn(move || {
 468            unparker.unpark();
 469        });
 470        let mut cx = std::task::Context::from_waker(&waker);
 471
 472        loop {
 473            match future.as_mut().poll(&mut cx) {
 474                Poll::Ready(result) => return Ok(result),
 475                Poll::Pending => {
 476                    let timeout =
 477                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
 478                    if let Some(timeout) = timeout {
 479                        if !parker.park_timeout(timeout)
 480                            && deadline.is_some_and(|deadline| deadline < Instant::now())
 481                        {
 482                            return Err(future);
 483                        }
 484                    } else {
 485                        parker.park();
 486                    }
 487                }
 488            }
 489        }
 490    }
 491
 492    #[cfg(any(test, feature = "test-support"))]
 493    #[track_caller]
 494    pub(crate) fn block_internal<Fut: Future>(
 495        &self,
 496        background_only: bool,
 497        future: Fut,
 498        timeout: Option<Duration>,
 499    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 500        use std::sync::atomic::AtomicBool;
 501        use std::time::Instant;
 502
 503        use parking::Parker;
 504
 505        let mut future = Box::pin(future);
 506        if timeout == Some(Duration::ZERO) {
 507            return Err(future);
 508        }
 509
 510        // When using a real platform (e.g., MacPlatform for visual tests that need actual
 511        // Metal rendering), there's no test dispatcher. In this case, we block the thread
 512        // directly by polling the future and parking until woken. This is required for
 513        // VisualTestAppContext which uses real platform rendering but still needs blocking
 514        // behavior for code paths like editor initialization that call block_with_timeout.
 515        let Some(dispatcher) = self.dispatcher.as_test() else {
 516            let deadline = timeout.map(|timeout| Instant::now() + timeout);
 517
 518            let parker = Parker::new();
 519            let unparker = parker.unparker();
 520            let waker = waker_fn(move || {
 521                unparker.unpark();
 522            });
 523            let mut cx = std::task::Context::from_waker(&waker);
 524
 525            loop {
 526                match future.as_mut().poll(&mut cx) {
 527                    Poll::Ready(result) => return Ok(result),
 528                    Poll::Pending => {
 529                        let timeout = deadline
 530                            .map(|deadline| deadline.saturating_duration_since(Instant::now()));
 531                        if let Some(timeout) = timeout {
 532                            if !parker.park_timeout(timeout)
 533                                && deadline.is_some_and(|deadline| deadline < Instant::now())
 534                            {
 535                                return Err(future);
 536                            }
 537                        } else {
 538                            parker.park();
 539                        }
 540                    }
 541                }
 542            }
 543        };
 544
 545        let mut max_ticks = if timeout.is_some() {
 546            dispatcher.gen_block_on_ticks()
 547        } else {
 548            usize::MAX
 549        };
 550
 551        let parker = Parker::new();
 552        let unparker = parker.unparker();
 553
 554        let awoken = Arc::new(AtomicBool::new(false));
 555        let waker = waker_fn({
 556            let awoken = awoken.clone();
 557            let unparker = unparker.clone();
 558            move || {
 559                awoken.store(true, Ordering::SeqCst);
 560                unparker.unpark();
 561            }
 562        });
 563        let mut cx = std::task::Context::from_waker(&waker);
 564
 565        let duration = Duration::from_secs(
 566            option_env!("GPUI_TEST_TIMEOUT")
 567                .and_then(|s| s.parse::<u64>().ok())
 568                .unwrap_or(180),
 569        );
 570        let mut test_should_end_by = Instant::now() + duration;
 571
 572        loop {
 573            match future.as_mut().poll(&mut cx) {
 574                Poll::Ready(result) => return Ok(result),
 575                Poll::Pending => {
 576                    if max_ticks == 0 {
 577                        return Err(future);
 578                    }
 579                    max_ticks -= 1;
 580
 581                    if !dispatcher.tick(background_only) {
 582                        if awoken.swap(false, Ordering::SeqCst) {
 583                            continue;
 584                        }
 585
 586                        if !dispatcher.parking_allowed() {
 587                            if dispatcher.advance_clock_to_next_delayed() {
 588                                continue;
 589                            }
 590                            let mut backtrace_message = String::new();
 591                            let mut waiting_message = String::new();
 592                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
 593                                backtrace_message =
 594                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
 595                            }
 596                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
 597                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
 598                            }
 599                            panic!(
 600                                "parked with nothing left to run{waiting_message}{backtrace_message}",
 601                            )
 602                        }
 603                        dispatcher.push_unparker(unparker.clone());
 604                        parker.park_timeout(Duration::from_millis(1));
 605                        if Instant::now() > test_should_end_by {
 606                            panic!("test timed out after {duration:?} with allow_parking")
 607                        }
 608                    }
 609                }
 610            }
 611        }
 612    }
 613
 614    /// Block the current thread until the given future resolves
 615    /// or `duration` has elapsed.
 616    pub fn block_with_timeout<Fut: Future>(
 617        &self,
 618        duration: Duration,
 619        future: Fut,
 620    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 621        self.block_internal(true, future, Some(duration))
 622    }
 623
 624    /// Scoped lets you start a number of tasks and waits
 625    /// for all of them to complete before returning.
 626    pub async fn scoped<'scope, F>(&self, scheduler: F)
 627    where
 628        F: FnOnce(&mut Scope<'scope>),
 629    {
 630        let mut scope = Scope::new(self.clone(), Priority::default());
 631        (scheduler)(&mut scope);
 632        let spawned = mem::take(&mut scope.futures)
 633            .into_iter()
 634            .map(|f| self.spawn_with_priority(scope.priority, f))
 635            .collect::<Vec<_>>();
 636        for task in spawned {
 637            task.await;
 638        }
 639    }
 640
 641    /// Scoped lets you start a number of tasks and waits
 642    /// for all of them to complete before returning.
 643    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 644    where
 645        F: FnOnce(&mut Scope<'scope>),
 646    {
 647        let mut scope = Scope::new(self.clone(), priority);
 648        (scheduler)(&mut scope);
 649        let spawned = mem::take(&mut scope.futures)
 650            .into_iter()
 651            .map(|f| self.spawn_with_priority(scope.priority, f))
 652            .collect::<Vec<_>>();
 653        for task in spawned {
 654            task.await;
 655        }
 656    }
 657
 658    /// Get the current time.
 659    ///
 660    /// Calling this instead of `std::time::Instant::now` allows the use
 661    /// of fake timers in tests.
 662    pub fn now(&self) -> Instant {
 663        self.dispatcher.now()
 664    }
 665
 666    /// Returns a task that will complete after the given duration.
 667    /// Depending on other concurrent tasks the elapsed duration may be longer
 668    /// than requested.
 669    pub fn timer(&self, duration: Duration) -> Task<()> {
 670        if duration.is_zero() {
 671            return Task::ready(());
 672        }
 673        let location = core::panic::Location::caller();
 674        let (runnable, task) = async_task::Builder::new()
 675            .metadata(RunnableMeta {
 676                location,
 677                app: None,
 678            })
 679            .spawn(move |_| async move {}, {
 680                let dispatcher = self.dispatcher.clone();
 681                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
 682            });
 683        runnable.schedule();
 684        Task(TaskState::Spawned(task))
 685    }
 686
 687    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 688    #[cfg(any(test, feature = "test-support"))]
 689    pub fn start_waiting(&self) {
 690        self.dispatcher.as_test().unwrap().start_waiting();
 691    }
 692
 693    /// in tests, removes the debugging data added by start_waiting
 694    #[cfg(any(test, feature = "test-support"))]
 695    pub fn finish_waiting(&self) {
 696        self.dispatcher.as_test().unwrap().finish_waiting();
 697    }
 698
 699    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 700    #[cfg(any(test, feature = "test-support"))]
 701    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 702        self.dispatcher.as_test().unwrap().simulate_random_delay()
 703    }
 704
 705    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 706    #[cfg(any(test, feature = "test-support"))]
 707    pub fn deprioritize(&self, task_label: TaskLabel) {
 708        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 709    }
 710
 711    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 712    #[cfg(any(test, feature = "test-support"))]
 713    pub fn advance_clock(&self, duration: Duration) {
 714        self.dispatcher.as_test().unwrap().advance_clock(duration)
 715    }
 716
 717    /// in tests, run one task.
 718    #[cfg(any(test, feature = "test-support"))]
 719    pub fn tick(&self) -> bool {
 720        self.dispatcher.as_test().unwrap().tick(false)
 721    }
 722
 723    /// in tests, run all tasks that are ready to run. If after doing so
 724    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 725    #[cfg(any(test, feature = "test-support"))]
 726    pub fn run_until_parked(&self) {
 727        self.dispatcher.as_test().unwrap().run_until_parked()
 728    }
 729
 730    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 731    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 732    /// do take real async time to run.
 733    #[cfg(any(test, feature = "test-support"))]
 734    pub fn allow_parking(&self) {
 735        self.dispatcher.as_test().unwrap().allow_parking();
 736    }
 737
 738    /// undoes the effect of [`Self::allow_parking`].
 739    #[cfg(any(test, feature = "test-support"))]
 740    pub fn forbid_parking(&self) {
 741        self.dispatcher.as_test().unwrap().forbid_parking();
 742    }
 743
 744    /// adds detail to the "parked with nothing let to run" message.
 745    #[cfg(any(test, feature = "test-support"))]
 746    pub fn set_waiting_hint(&self, msg: Option<String>) {
 747        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 748    }
 749
 750    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 751    #[cfg(any(test, feature = "test-support"))]
 752    pub fn rng(&self) -> StdRng {
 753        self.dispatcher.as_test().unwrap().rng()
 754    }
 755
 756    /// How many CPUs are available to the dispatcher.
 757    pub fn num_cpus(&self) -> usize {
 758        #[cfg(any(test, feature = "test-support"))]
 759        return 4;
 760
 761        #[cfg(not(any(test, feature = "test-support")))]
 762        return num_cpus::get();
 763    }
 764
 765    /// Whether we're on the main thread.
 766    pub fn is_main_thread(&self) -> bool {
 767        self.dispatcher.is_main_thread()
 768    }
 769
 770    #[cfg(any(test, feature = "test-support"))]
 771    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 772    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 773        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 774    }
 775}
 776
 777/// ForegroundExecutor runs things on the main thread.
 778impl ForegroundExecutor {
 779    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 780    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>, liveness: std::sync::Weak<()>) -> Self {
 781        Self {
 782            dispatcher,
 783            liveness,
 784            not_send: PhantomData,
 785        }
 786    }
 787
 788    /// Enqueues the given Task to run on the main thread at some point in the future.
 789    #[track_caller]
 790    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
 791    where
 792        R: 'static,
 793    {
 794        self.inner_spawn(self.liveness.clone(), Priority::default(), future)
 795    }
 796
 797    /// Enqueues the given Task to run on the main thread at some point in the future.
 798    #[track_caller]
 799    pub fn spawn_with_priority<R>(
 800        &self,
 801        priority: Priority,
 802        future: impl Future<Output = R> + 'static,
 803    ) -> Task<R>
 804    where
 805        R: 'static,
 806    {
 807        self.inner_spawn(self.liveness.clone(), priority, future)
 808    }
 809
 810    #[track_caller]
 811    pub(crate) fn inner_spawn<R>(
 812        &self,
 813        app: std::sync::Weak<()>,
 814        priority: Priority,
 815        future: impl Future<Output = R> + 'static,
 816    ) -> Task<R>
 817    where
 818        R: 'static,
 819    {
 820        let dispatcher = self.dispatcher.clone();
 821        let location = core::panic::Location::caller();
 822
 823        #[track_caller]
 824        fn inner<R: 'static>(
 825            dispatcher: Arc<dyn PlatformDispatcher>,
 826            future: AnyLocalFuture<R>,
 827            location: &'static core::panic::Location<'static>,
 828            app: std::sync::Weak<()>,
 829            priority: Priority,
 830        ) -> Task<R> {
 831            let (runnable, task) = spawn_local_with_source_location(
 832                future,
 833                move |runnable| {
 834                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
 835                },
 836                RunnableMeta {
 837                    location,
 838                    app: Some(app),
 839                },
 840            );
 841            runnable.schedule();
 842            Task(TaskState::Spawned(task))
 843        }
 844        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 845    }
 846}
 847
 848/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
 849///
 850/// Copy-modified from:
 851/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
 852#[track_caller]
 853fn spawn_local_with_source_location<Fut, S, M>(
 854    future: Fut,
 855    schedule: S,
 856    metadata: M,
 857) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
 858where
 859    Fut: Future + 'static,
 860    Fut::Output: 'static,
 861    S: async_task::Schedule<M> + Send + Sync + 'static,
 862    M: 'static,
 863{
 864    #[inline]
 865    fn thread_id() -> ThreadId {
 866        std::thread_local! {
 867            static ID: ThreadId = thread::current().id();
 868        }
 869        ID.try_with(|id| *id)
 870            .unwrap_or_else(|_| thread::current().id())
 871    }
 872
 873    struct Checked<F> {
 874        id: ThreadId,
 875        inner: ManuallyDrop<F>,
 876        location: &'static Location<'static>,
 877    }
 878
 879    impl<F> Drop for Checked<F> {
 880        fn drop(&mut self) {
 881            assert!(
 882                self.id == thread_id(),
 883                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
 884                self.location
 885            );
 886            unsafe { ManuallyDrop::drop(&mut self.inner) };
 887        }
 888    }
 889
 890    impl<F: Future> Future for Checked<F> {
 891        type Output = F::Output;
 892
 893        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 894            assert!(
 895                self.id == thread_id(),
 896                "local task polled by a thread that didn't spawn it. Task spawned at {}",
 897                self.location
 898            );
 899            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
 900        }
 901    }
 902
 903    // Wrap the future into one that checks which thread it's on.
 904    let future = Checked {
 905        id: thread_id(),
 906        inner: ManuallyDrop::new(future),
 907        location: Location::caller(),
 908    };
 909
 910    unsafe {
 911        async_task::Builder::new()
 912            .metadata(metadata)
 913            .spawn_unchecked(move |_| future, schedule)
 914    }
 915}
 916
 917/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
 918pub struct Scope<'a> {
 919    executor: BackgroundExecutor,
 920    priority: Priority,
 921    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 922    tx: Option<mpsc::Sender<()>>,
 923    rx: mpsc::Receiver<()>,
 924    lifetime: PhantomData<&'a ()>,
 925}
 926
 927impl<'a> Scope<'a> {
 928    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 929        let (tx, rx) = mpsc::channel(1);
 930        Self {
 931            executor,
 932            priority,
 933            tx: Some(tx),
 934            rx,
 935            futures: Default::default(),
 936            lifetime: PhantomData,
 937        }
 938    }
 939
 940    /// How many CPUs are available to the dispatcher.
 941    pub fn num_cpus(&self) -> usize {
 942        self.executor.num_cpus()
 943    }
 944
 945    /// Spawn a future into this scope.
 946    #[track_caller]
 947    pub fn spawn<F>(&mut self, f: F)
 948    where
 949        F: Future<Output = ()> + Send + 'a,
 950    {
 951        let tx = self.tx.clone().unwrap();
 952
 953        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 954        // dropping this `Scope` blocks until all of the futures have resolved.
 955        let f = unsafe {
 956            mem::transmute::<
 957                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 958                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 959            >(Box::pin(async move {
 960                f.await;
 961                drop(tx);
 962            }))
 963        };
 964        self.futures.push(f);
 965    }
 966}
 967
 968impl Drop for Scope<'_> {
 969    fn drop(&mut self) {
 970        self.tx.take().unwrap();
 971
 972        // Wait until the channel is closed, which means that all of the spawned
 973        // futures have resolved.
 974        self.executor.block(self.rx.next());
 975    }
 976}
 977
 978#[cfg(test)]
 979mod test {
 980    use super::*;
 981    use crate::{App, TestDispatcher, TestPlatform};
 982    use rand::SeedableRng;
 983    use std::cell::RefCell;
 984
 985    /// Helper to create test infrastructure.
 986    /// Returns (dispatcher, background_executor, app) where app's foreground_executor has liveness.
 987    fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
 988        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 989        let arc_dispatcher = Arc::new(dispatcher.clone());
 990        // Create liveness for task cancellation
 991        let liveness = std::sync::Arc::new(());
 992        let liveness_weak = std::sync::Arc::downgrade(&liveness);
 993        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 994        let foreground_executor = ForegroundExecutor::new(arc_dispatcher, liveness_weak);
 995
 996        let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
 997        let asset_source = Arc::new(());
 998        let http_client = http_client::FakeHttpClient::with_404_response();
 999
1000        let app = App::new_app(platform, liveness, asset_source, http_client);
1001        (dispatcher, background_executor, app)
1002    }
1003
1004    #[test]
1005    fn sanity_test_tasks_run() {
1006        let (dispatcher, _background_executor, app) = create_test_app();
1007        let foreground_executor = app.borrow().foreground_executor.clone();
1008
1009        let task_ran = Rc::new(RefCell::new(false));
1010
1011        foreground_executor
1012            .spawn({
1013                let task_ran = Rc::clone(&task_ran);
1014                async move {
1015                    *task_ran.borrow_mut() = true;
1016                }
1017            })
1018            .detach();
1019
1020        // Run dispatcher while app is still alive
1021        dispatcher.run_until_parked();
1022
1023        // Task should have run
1024        assert!(
1025            *task_ran.borrow(),
1026            "Task should run normally when app is alive"
1027        );
1028    }
1029
1030    #[test]
1031    fn test_task_cancelled_when_app_dropped() {
1032        let (dispatcher, _background_executor, app) = create_test_app();
1033        let foreground_executor = app.borrow().foreground_executor.clone();
1034        let app_weak = Rc::downgrade(&app);
1035
1036        let task_ran = Rc::new(RefCell::new(false));
1037        let task_ran_clone = Rc::clone(&task_ran);
1038
1039        foreground_executor
1040            .spawn(async move {
1041                *task_ran_clone.borrow_mut() = true;
1042            })
1043            .detach();
1044
1045        drop(app);
1046
1047        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1048
1049        dispatcher.run_until_parked();
1050
1051        // The task should have been cancelled, not run
1052        assert!(
1053            !*task_ran.borrow(),
1054            "Task should have been cancelled when app was dropped, but it ran!"
1055        );
1056    }
1057
1058    #[test]
1059    fn test_nested_tasks_both_cancel() {
1060        let (dispatcher, _background_executor, app) = create_test_app();
1061        let foreground_executor = app.borrow().foreground_executor.clone();
1062        let app_weak = Rc::downgrade(&app);
1063
1064        let outer_completed = Rc::new(RefCell::new(false));
1065        let inner_completed = Rc::new(RefCell::new(false));
1066        let reached_await = Rc::new(RefCell::new(false));
1067
1068        let outer_flag = Rc::clone(&outer_completed);
1069        let inner_flag = Rc::clone(&inner_completed);
1070        let await_flag = Rc::clone(&reached_await);
1071
1072        // Channel to block the inner task until we're ready
1073        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1074
1075        let inner_executor = foreground_executor.clone();
1076
1077        foreground_executor
1078            .spawn(async move {
1079                let inner_task = inner_executor.spawn({
1080                    let inner_flag = Rc::clone(&inner_flag);
1081                    async move {
1082                        rx.await.ok();
1083                        *inner_flag.borrow_mut() = true;
1084                    }
1085                });
1086
1087                *await_flag.borrow_mut() = true;
1088
1089                inner_task.await;
1090
1091                *outer_flag.borrow_mut() = true;
1092            })
1093            .detach();
1094
1095        // Run dispatcher until outer task reaches the await point
1096        // The inner task will be blocked on the channel
1097        dispatcher.run_until_parked();
1098
1099        // Verify we actually reached the await point before dropping the app
1100        assert!(
1101            *reached_await.borrow(),
1102            "Outer task should have reached the await point"
1103        );
1104
1105        // Neither task should have completed yet
1106        assert!(
1107            !*outer_completed.borrow(),
1108            "Outer task should not have completed yet"
1109        );
1110        assert!(
1111            !*inner_completed.borrow(),
1112            "Inner task should not have completed yet"
1113        );
1114
1115        // Drop the channel sender and app while outer is awaiting inner
1116        drop(tx);
1117        drop(app);
1118        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1119
1120        // Run dispatcher - both tasks should be cancelled
1121        dispatcher.run_until_parked();
1122
1123        // Neither task should have completed (both were cancelled)
1124        assert!(
1125            !*outer_completed.borrow(),
1126            "Outer task should have been cancelled, not completed"
1127        );
1128        assert!(
1129            !*inner_completed.borrow(),
1130            "Inner task should have been cancelled, not completed"
1131        );
1132    }
1133
1134    #[test]
1135    #[should_panic]
1136    fn test_polling_cancelled_task_panics() {
1137        let (dispatcher, background_executor, app) = create_test_app();
1138        let foreground_executor = app.borrow().foreground_executor.clone();
1139        let app_weak = Rc::downgrade(&app);
1140
1141        let task = foreground_executor.spawn(async move { 42 });
1142
1143        drop(app);
1144
1145        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1146
1147        dispatcher.run_until_parked();
1148
1149        background_executor.block(task);
1150    }
1151
1152    #[test]
1153    fn test_polling_cancelled_task_returns_none_with_fallible() {
1154        let (dispatcher, background_executor, app) = create_test_app();
1155        let foreground_executor = app.borrow().foreground_executor.clone();
1156        let app_weak = Rc::downgrade(&app);
1157
1158        let task = foreground_executor.spawn(async move { 42 }).fallible();
1159
1160        drop(app);
1161
1162        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1163
1164        dispatcher.run_until_parked();
1165
1166        let result = background_executor.block(task);
1167        assert_eq!(result, None, "Cancelled task should return None");
1168    }
1169}