executor.rs

   1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    fmt::Debug,
   8    marker::PhantomData,
   9    mem::{self, ManuallyDrop},
  10    num::NonZeroUsize,
  11    panic::Location,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::{
  15        Arc,
  16        atomic::{AtomicUsize, Ordering},
  17    },
  18    task::{Context, Poll},
  19    thread::{self, ThreadId},
  20    time::{Duration, Instant},
  21};
  22use util::TryFutureExt;
  23use waker_fn::waker_fn;
  24
  25#[cfg(any(test, feature = "test-support"))]
  26use rand::rngs::StdRng;
  27
  28/// A pointer to the executor that is currently running,
  29/// for spawning background tasks.
  30#[derive(Clone)]
  31pub struct BackgroundExecutor {
  32    #[doc(hidden)]
  33    pub dispatcher: Arc<dyn PlatformDispatcher>,
  34}
  35
  36/// A pointer to the executor that is currently running,
  37/// for spawning tasks on the main thread.
  38///
  39/// This is intentionally `!Send` via the `not_send` marker field. This is because
  40/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
  41/// only polled from the same thread it was spawned from. These checks would fail when spawning
  42/// foreground tasks from background threads.
  43#[derive(Clone)]
  44pub struct ForegroundExecutor {
  45    #[doc(hidden)]
  46    pub dispatcher: Arc<dyn PlatformDispatcher>,
  47    not_send: PhantomData<Rc<()>>,
  48}
  49
  50/// Realtime task priority
  51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  52#[repr(u8)]
  53pub enum RealtimePriority {
  54    /// Audio task
  55    Audio,
  56    /// Other realtime task
  57    #[default]
  58    Other,
  59}
  60
  61/// Task priority
  62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  63#[repr(u8)]
  64pub enum Priority {
  65    /// Realtime priority
  66    ///
  67    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
  68    Realtime(RealtimePriority),
  69    /// High priority
  70    ///
  71    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
  72    High,
  73    /// Medium priority, which probably suits most use cases.
  74    #[default]
  75    Medium,
  76    /// Low priority
  77    ///
  78    /// Use this for background work that can arrive in large quantities,
  79    /// so that it does not starve the executor of resources needed by high priority tasks.
  80    Low,
  81}
  82
  83impl Priority {
  84    #[allow(dead_code)]
  85    pub(crate) const fn probability(&self) -> u32 {
  86        match self {
  87            // realtime priorities are not considered for probability scheduling
  88            Priority::Realtime(_) => 0,
  89            Priority::High => 60,
  90            Priority::Medium => 30,
  91            Priority::Low => 10,
  92        }
  93    }
  94}
  95
  96/// Task is a primitive that allows work to happen in the background.
  97///
  98/// It implements [`Future`] so you can `.await` on it.
  99///
 100/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 101/// the task to continue running, but with no way to return a value.
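///
/// # Example
///
/// A minimal sketch, assuming an `App` context `cx` is in scope:
///
/// ```ignore
/// let task = cx.background_spawn(async move { 2 + 2 });
/// // Awaiting the task yields its value:
/// let four = task.await;
/// // Detaching instead lets it run to completion, with no way to read the result:
/// cx.background_spawn(async move { /* other work */ }).detach();
/// ```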
 102#[must_use]
 103#[derive(Debug)]
 104pub struct Task<T>(TaskState<T>);
 105
 106#[derive(Debug)]
 107enum TaskState<T> {
 108    /// A task that is ready to return a value
 109    Ready(Option<T>),
 110
 111    /// A task that is currently running.
 112    Spawned(async_task::Task<T, RunnableMeta>),
 113}
 114
 115impl<T> Task<T> {
 116    /// Creates a new task that will resolve with the value
 117    pub fn ready(val: T) -> Self {
 118        Task(TaskState::Ready(Some(val)))
 119    }
 120
 121    /// Detaching a task runs it to completion in the background
 122    pub fn detach(self) {
 123        match self {
 124            Task(TaskState::Ready(_)) => {}
 125            Task(TaskState::Spawned(task)) => task.detach(),
 126        }
 127    }
 128
 129    /// Converts this task into a fallible task that returns `Option<T>`.
 130    ///
 131    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 132    /// if the app was dropped while the task is executing.
 133    ///
 134    /// # Example
 135    ///
 136    /// ```ignore
 137    /// // Background task that gracefully handles app shutdown:
 138    /// cx.background_spawn(async move {
 139    ///     let result = foreground_task.fallible().await;
 140    ///     if let Some(value) = result {
 141    ///         // Process the value
 142    ///     }
 143    ///     // If None, app was shut down - just exit gracefully
 144    /// }).detach();
 145    /// ```
 146    pub fn fallible(self) -> FallibleTask<T> {
 147        FallibleTask(match self.0 {
 148            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 149            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 150        })
 151    }
 152}
 153
 154impl<E, T> Task<Result<T, E>>
 155where
 156    T: 'static,
 157    E: 'static + Debug,
 158{
 159    /// Run the task to completion in the background and log any
 160    /// errors that occur.
 161    #[track_caller]
 162    pub fn detach_and_log_err(self, cx: &App) {
 163        let location = core::panic::Location::caller();
 164        cx.foreground_executor()
 165            .spawn(self.log_tracked_err(*location))
 166            .detach();
 167    }
 168}
 169
 170impl<T> Future for Task<T> {
 171    type Output = T;
 172
 173    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 174        match unsafe { self.get_unchecked_mut() } {
 175            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
 176            Task(TaskState::Spawned(task)) => task.poll(cx),
 177        }
 178    }
 179}
 180
 181/// A task that returns `Option<T>` instead of panicking when cancelled.
 182#[must_use]
 183pub struct FallibleTask<T>(FallibleTaskState<T>);
 184
 185enum FallibleTaskState<T> {
 186    /// A task that is ready to return a value
 187    Ready(Option<T>),
 188
 189    /// A task that is currently running (wraps async_task::FallibleTask).
 190    Spawned(async_task::FallibleTask<T, RunnableMeta>),
 191}
 192
 193impl<T> FallibleTask<T> {
 194    /// Creates a new fallible task that will resolve with the value.
 195    pub fn ready(val: T) -> Self {
 196        FallibleTask(FallibleTaskState::Ready(Some(val)))
 197    }
 198
 199    /// Detaching a task runs it to completion in the background.
 200    pub fn detach(self) {
 201        match self.0 {
 202            FallibleTaskState::Ready(_) => {}
 203            FallibleTaskState::Spawned(task) => task.detach(),
 204        }
 205    }
 206}
 207
 208impl<T> Future for FallibleTask<T> {
 209    type Output = Option<T>;
 210
 211    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 212        match unsafe { self.get_unchecked_mut() } {
 213            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
 214            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
 215        }
 216    }
 217}
 218
 219impl<T> std::fmt::Debug for FallibleTask<T> {
 220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 221        match &self.0 {
 222            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 223            FallibleTaskState::Spawned(task) => {
 224                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 225            }
 226        }
 227    }
 228}
 229
 230/// A task label is an opaque identifier that you can use to
 231/// refer to a task in tests.
 232#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 233pub struct TaskLabel(NonZeroUsize);
 234
 235impl Default for TaskLabel {
 236    fn default() -> Self {
 237        Self::new()
 238    }
 239}
 240
 241impl TaskLabel {
 242    /// Construct a new task label.
 243    pub fn new() -> Self {
 244        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 245        Self(
 246            NEXT_TASK_LABEL
 247                .fetch_add(1, Ordering::SeqCst)
 248                .try_into()
 249                .unwrap(),
 250        )
 251    }
 252}
 253
 254type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
 255
 256type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 257
 258/// BackgroundExecutor lets you run things on background threads.
 259/// In production this is a thread pool with no ordering guarantees.
 260/// In tests this is simulated by running tasks one by one in a deterministic
 261/// (but arbitrary) order controlled by the `SEED` environment variable.
 262impl BackgroundExecutor {
 263    #[doc(hidden)]
 264    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 265        Self { dispatcher }
 266    }
 267
 268    /// Enqueues the given future to be run to completion on a background thread.
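    ///
    /// A minimal usage sketch (assuming a `cx: &App` from which to reach the executor; the
    /// computation shown is arbitrary):
    ///
    /// ```ignore
    /// let task = cx.background_executor().spawn(async move {
    ///     (0..1_000u64).sum::<u64>()
    /// });
    /// ```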
 269    #[track_caller]
 270    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 271    where
 272        R: Send + 'static,
 273    {
 274        self.spawn_with_priority(Priority::default(), future)
 275    }
 276
 277    /// Enqueues the given future to be run to completion on a background thread at the given priority.
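    ///
    /// A sketch of requesting a lower priority (the work shown is arbitrary):
    ///
    /// ```ignore
    /// cx.background_executor()
    ///     .spawn_with_priority(Priority::Low, async move {
    ///         // Bulk work that should not starve higher-priority tasks.
    ///     })
    ///     .detach();
    /// ```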
 278    #[track_caller]
 279    pub fn spawn_with_priority<R>(
 280        &self,
 281        priority: Priority,
 282        future: impl Future<Output = R> + Send + 'static,
 283    ) -> Task<R>
 284    where
 285        R: Send + 'static,
 286    {
 287        self.spawn_internal::<R>(Box::pin(future), None, priority)
 288    }
 289
 290    /// Enqueues the given future to be run to completion on a background thread and blocks the current task on it.
 291    ///
 292    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
 293    /// completion before the current task is resumed, even if the current task is slated for cancellation.
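    ///
    /// A sketch of borrowing local data from the calling scope (illustrative only):
    ///
    /// ```ignore
    /// let data = vec![1, 2, 3];
    /// let sum: i32 = cx
    ///     .background_executor()
    ///     .await_on_background(async { data.iter().sum() })
    ///     .await;
    /// ```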
 294    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
 295    where
 296        R: Send,
 297    {
 298        // We need to ensure that cancellation of the parent task does not drop the environment
 299        // before our own task has completed or been cancelled.
 300        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 301
 302        impl Drop for NotifyOnDrop<'_> {
 303            fn drop(&mut self) {
 304                *self.0.1.lock() = true;
 305                self.0.0.notify_all();
 306            }
 307        }
 308
 309        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 310
 311        impl Drop for WaitOnDrop<'_> {
 312            fn drop(&mut self) {
 313                let mut done = self.0.1.lock();
 314                if !*done {
 315                    self.0.0.wait(&mut done);
 316                }
 317            }
 318        }
 319
 320        let dispatcher = self.dispatcher.clone();
 321        let location = core::panic::Location::caller();
 322
 323        let pair = &(Condvar::new(), Mutex::new(false));
 324        let _wait_guard = WaitOnDrop(pair);
 325
 326        let (runnable, task) = unsafe {
 327            async_task::Builder::new()
 328                .metadata(RunnableMeta {
 329                    location,
 330                    app: None,
 331                })
 332                .spawn_unchecked(
 333                    move |_| async {
 334                        let _notify_guard = NotifyOnDrop(pair);
 335                        future.await
 336                    },
 337                    move |runnable| {
 338                        dispatcher.dispatch(
 339                            RunnableVariant::Meta(runnable),
 340                            None,
 341                            Priority::default(),
 342                        )
 343                    },
 344                )
 345        };
 346        runnable.schedule();
 347        task.await
 348    }
 349
 350    /// Enqueues the given future to be run to completion on a background thread.
 351    /// The given label can be used to control the priority of the task in tests.
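    ///
    /// A minimal sketch (`executor` stands in for a `BackgroundExecutor`):
    ///
    /// ```ignore
    /// let label = TaskLabel::new();
    /// executor.spawn_labeled(label, async move { /* work */ }).detach();
    /// // In a test, force the labeled task to run after everything else:
    /// executor.deprioritize(label);
    /// ```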
 352    #[track_caller]
 353    pub fn spawn_labeled<R>(
 354        &self,
 355        label: TaskLabel,
 356        future: impl Future<Output = R> + Send + 'static,
 357    ) -> Task<R>
 358    where
 359        R: Send + 'static,
 360    {
 361        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 362    }
 363
 364    #[track_caller]
 365    fn spawn_internal<R: Send + 'static>(
 366        &self,
 367        future: AnyFuture<R>,
 368        label: Option<TaskLabel>,
 369        #[cfg_attr(
 370            target_os = "windows",
 371            expect(
 372                unused_variables,
 373                reason = "Multi priority scheduler is broken on windows"
 374            )
 375        )]
 376        priority: Priority,
 377    ) -> Task<R> {
 378        let dispatcher = self.dispatcher.clone();
 379        #[cfg(target_os = "windows")]
 380        let priority = Priority::Medium; // multi-prio scheduler is broken on windows
 381
 382        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
 383            let location = core::panic::Location::caller();
 384            let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
 385
 386            dispatcher.spawn_realtime(
 387                realtime,
 388                Box::new(move || {
 389                    while let Ok(runnable) = rx.recv() {
 390                        let start = Instant::now();
 391                        let location = runnable.metadata().location;
 392                        let mut timing = TaskTiming {
 393                            location,
 394                            start,
 395                            end: None,
 396                        };
 397                        profiler::add_task_timing(timing);
 398
 399                        runnable.run();
 400
 401                        let end = Instant::now();
 402                        timing.end = Some(end);
 403                        profiler::add_task_timing(timing);
 404                    }
 405                }),
 406            );
 407
 408            async_task::Builder::new()
 409                .metadata(RunnableMeta {
 410                    location,
 411                    app: None,
 412                })
 413                .spawn(
 414                    move |_| future,
 415                    move |runnable| {
 416                        let _ = tx.send(runnable);
 417                    },
 418                )
 419        } else {
 420            let location = core::panic::Location::caller();
 421            async_task::Builder::new()
 422                .metadata(RunnableMeta {
 423                    location,
 424                    app: None,
 425                })
 426                .spawn(
 427                    move |_| future,
 428                    move |runnable| {
 429                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
 430                    },
 431                )
 432        };
 433
 434        runnable.schedule();
 435        Task(TaskState::Spawned(task))
 436    }
 437
 438    /// Used by the test harness to run an async test in a synchronous fashion.
 439    #[cfg(any(test, feature = "test-support"))]
 440    #[track_caller]
 441    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 442        if let Ok(value) = self.block_internal(false, future, None) {
 443            value
 444        } else {
 445            unreachable!()
 446        }
 447    }
 448
 449    /// Block the current thread until the given future resolves.
 450    /// Consider using `block_with_timeout` instead.
 451    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 452        if let Ok(value) = self.block_internal(true, future, None) {
 453            value
 454        } else {
 455            unreachable!()
 456        }
 457    }
 458
 459    #[cfg(not(any(test, feature = "test-support")))]
 460    pub(crate) fn block_internal<Fut: Future>(
 461        &self,
 462        _background_only: bool,
 463        future: Fut,
 464        timeout: Option<Duration>,
 465    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 466        use std::time::Instant;
 467
 468        let mut future = Box::pin(future);
 469        if timeout == Some(Duration::ZERO) {
 470            return Err(future);
 471        }
 472        let deadline = timeout.map(|timeout| Instant::now() + timeout);
 473
 474        let parker = parking::Parker::new();
 475        let unparker = parker.unparker();
 476        let waker = waker_fn(move || {
 477            unparker.unpark();
 478        });
 479        let mut cx = std::task::Context::from_waker(&waker);
 480
 481        loop {
 482            match future.as_mut().poll(&mut cx) {
 483                Poll::Ready(result) => return Ok(result),
 484                Poll::Pending => {
 485                    let timeout =
 486                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
 487                    if let Some(timeout) = timeout {
 488                        if !parker.park_timeout(timeout)
 489                            && deadline.is_some_and(|deadline| deadline < Instant::now())
 490                        {
 491                            return Err(future);
 492                        }
 493                    } else {
 494                        parker.park();
 495                    }
 496                }
 497            }
 498        }
 499    }
 500
 501    #[cfg(any(test, feature = "test-support"))]
 502    #[track_caller]
 503    pub(crate) fn block_internal<Fut: Future>(
 504        &self,
 505        background_only: bool,
 506        future: Fut,
 507        timeout: Option<Duration>,
 508    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 509        use std::sync::atomic::AtomicBool;
 510
 511        use parking::Parker;
 512
 513        let mut future = Box::pin(future);
 514        if timeout == Some(Duration::ZERO) {
 515            return Err(future);
 516        }
 517        let Some(dispatcher) = self.dispatcher.as_test() else {
 518            return Err(future);
 519        };
 520
 521        let mut max_ticks = if timeout.is_some() {
 522            dispatcher.gen_block_on_ticks()
 523        } else {
 524            usize::MAX
 525        };
 526
 527        let parker = Parker::new();
 528        let unparker = parker.unparker();
 529
 530        let awoken = Arc::new(AtomicBool::new(false));
 531        let waker = waker_fn({
 532            let awoken = awoken.clone();
 533            let unparker = unparker.clone();
 534            move || {
 535                awoken.store(true, Ordering::SeqCst);
 536                unparker.unpark();
 537            }
 538        });
 539        let mut cx = std::task::Context::from_waker(&waker);
 540
 541        let duration = Duration::from_secs(
 542            option_env!("GPUI_TEST_TIMEOUT")
 543                .and_then(|s| s.parse::<u64>().ok())
 544                .unwrap_or(180),
 545        );
 546        let mut test_should_end_by = Instant::now() + duration;
 547
 548        loop {
 549            match future.as_mut().poll(&mut cx) {
 550                Poll::Ready(result) => return Ok(result),
 551                Poll::Pending => {
 552                    if max_ticks == 0 {
 553                        return Err(future);
 554                    }
 555                    max_ticks -= 1;
 556
 557                    if !dispatcher.tick(background_only) {
 558                        if awoken.swap(false, Ordering::SeqCst) {
 559                            continue;
 560                        }
 561
 562                        if !dispatcher.parking_allowed() {
 563                            if dispatcher.advance_clock_to_next_delayed() {
 564                                continue;
 565                            }
 566                            let mut backtrace_message = String::new();
 567                            let mut waiting_message = String::new();
 568                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
 569                                backtrace_message =
 570                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
 571                            }
 572                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
 573                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
 574                            }
 575                            panic!(
 576                                "parked with nothing left to run{waiting_message}{backtrace_message}",
 577                            )
 578                        }
 579                        dispatcher.push_unparker(unparker.clone());
 580                        parker.park_timeout(Duration::from_millis(1));
 581                        if Instant::now() > test_should_end_by {
 582                            panic!("test timed out after {duration:?} with allow_parking")
 583                        }
 584                    }
 585                }
 586            }
 587        }
 588    }
 589
 590    /// Block the current thread until the given future resolves
 591    /// or `duration` has elapsed.
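    ///
    /// A sketch of how the result is typically consumed (`slow_future` is a stand-in):
    ///
    /// ```ignore
    /// match executor.block_with_timeout(Duration::from_millis(100), slow_future) {
    ///     Ok(value) => { /* resolved within the timeout */ }
    ///     Err(future) => { /* timed out; the partially-polled future is handed back */ }
    /// }
    /// ```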
 592    pub fn block_with_timeout<Fut: Future>(
 593        &self,
 594        duration: Duration,
 595        future: Fut,
 596    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 597        self.block_internal(true, future, Some(duration))
 598    }
 599
 600    /// Scoped lets you start a number of tasks and waits
 601    /// for all of them to complete before returning.
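    ///
    /// A sketch of fanning borrowed work out across the pool (`process` is a hypothetical
    /// helper):
    ///
    /// ```ignore
    /// let mut data = vec![0u8; 4096];
    /// executor
    ///     .scoped(|scope| {
    ///         for chunk in data.chunks_mut(1024) {
    ///             scope.spawn(async move { process(chunk); });
    ///         }
    ///     })
    ///     .await;
    /// ```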
 602    pub async fn scoped<'scope, F>(&self, scheduler: F)
 603    where
 604        F: FnOnce(&mut Scope<'scope>),
 605    {
 606        let mut scope = Scope::new(self.clone(), Priority::default());
 607        (scheduler)(&mut scope);
 608        let spawned = mem::take(&mut scope.futures)
 609            .into_iter()
 610            .map(|f| self.spawn_with_priority(scope.priority, f))
 611            .collect::<Vec<_>>();
 612        for task in spawned {
 613            task.await;
 614        }
 615    }
 616
 617    /// Scoped lets you start a number of tasks and waits
 618    /// for all of them to complete before returning, running them at the given `priority`.
 619    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 620    where
 621        F: FnOnce(&mut Scope<'scope>),
 622    {
 623        let mut scope = Scope::new(self.clone(), priority);
 624        (scheduler)(&mut scope);
 625        let spawned = mem::take(&mut scope.futures)
 626            .into_iter()
 627            .map(|f| self.spawn_with_priority(scope.priority, f))
 628            .collect::<Vec<_>>();
 629        for task in spawned {
 630            task.await;
 631        }
 632    }
 633
 634    /// Get the current time.
 635    ///
 636    /// Calling this instead of `std::time::Instant::now` allows the use
 637    /// of fake timers in tests.
 638    pub fn now(&self) -> Instant {
 639        self.dispatcher.now()
 640    }
 641
 642    /// Returns a task that will complete after the given duration.
 643    /// Depending on other concurrent tasks the elapsed duration may be longer
 644    /// than requested.
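    ///
    /// For example, a simple debounce-style pause (duration chosen arbitrarily):
    ///
    /// ```ignore
    /// cx.background_executor().timer(Duration::from_millis(50)).await;
    /// ```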
 645    pub fn timer(&self, duration: Duration) -> Task<()> {
 646        if duration.is_zero() {
 647            return Task::ready(());
 648        }
 649        let location = core::panic::Location::caller();
 650        let (runnable, task) = async_task::Builder::new()
 651            .metadata(RunnableMeta {
 652                location,
 653                app: None,
 654            })
 655            .spawn(move |_| async move {}, {
 656                let dispatcher = self.dispatcher.clone();
 657                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
 658            });
 659        runnable.schedule();
 660        Task(TaskState::Spawned(task))
 661    }
 662
 663    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 664    #[cfg(any(test, feature = "test-support"))]
 665    pub fn start_waiting(&self) {
 666        self.dispatcher.as_test().unwrap().start_waiting();
 667    }
 668
 669    /// in tests, removes the debugging data added by start_waiting
 670    #[cfg(any(test, feature = "test-support"))]
 671    pub fn finish_waiting(&self) {
 672        self.dispatcher.as_test().unwrap().finish_waiting();
 673    }
 674
 675    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 676    #[cfg(any(test, feature = "test-support"))]
 677    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 678        self.dispatcher.as_test().unwrap().simulate_random_delay()
 679    }
 680
 681    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 682    #[cfg(any(test, feature = "test-support"))]
 683    pub fn deprioritize(&self, task_label: TaskLabel) {
 684        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 685    }
 686
 687    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 688    #[cfg(any(test, feature = "test-support"))]
 689    pub fn advance_clock(&self, duration: Duration) {
 690        self.dispatcher.as_test().unwrap().advance_clock(duration)
 691    }
 692
 693    /// in tests, run one task.
 694    #[cfg(any(test, feature = "test-support"))]
 695    pub fn tick(&self) -> bool {
 696        self.dispatcher.as_test().unwrap().tick(false)
 697    }
 698
 699    /// in tests, run all tasks that are ready to run. If after doing so
 700    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 701    #[cfg(any(test, feature = "test-support"))]
 702    pub fn run_until_parked(&self) {
 703        self.dispatcher.as_test().unwrap().run_until_parked()
 704    }
 705
 706    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 707    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 708    /// do take real async time to run.
 709    #[cfg(any(test, feature = "test-support"))]
 710    pub fn allow_parking(&self) {
 711        self.dispatcher.as_test().unwrap().allow_parking();
 712    }
 713
 714    /// undoes the effect of [`Self::allow_parking`].
 715    #[cfg(any(test, feature = "test-support"))]
 716    pub fn forbid_parking(&self) {
 717        self.dispatcher.as_test().unwrap().forbid_parking();
 718    }
 719
 720    /// adds detail to the "parked with nothing left to run" message.
 721    #[cfg(any(test, feature = "test-support"))]
 722    pub fn set_waiting_hint(&self, msg: Option<String>) {
 723        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 724    }
 725
 726    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 727    #[cfg(any(test, feature = "test-support"))]
 728    pub fn rng(&self) -> StdRng {
 729        self.dispatcher.as_test().unwrap().rng()
 730    }
 731
 732    /// How many CPUs are available to the dispatcher.
 733    pub fn num_cpus(&self) -> usize {
 734        #[cfg(any(test, feature = "test-support"))]
 735        return 4;
 736
 737        #[cfg(not(any(test, feature = "test-support")))]
 738        return num_cpus::get();
 739    }
 740
 741    /// Whether we're on the main thread.
 742    pub fn is_main_thread(&self) -> bool {
 743        self.dispatcher.is_main_thread()
 744    }
 745
 746    #[cfg(any(test, feature = "test-support"))]
 747    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 748    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 749        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 750    }
 751}
 752
 753/// ForegroundExecutor runs things on the main thread.
 754impl ForegroundExecutor {
 755    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 756    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 757        Self {
 758            dispatcher,
 759            not_send: PhantomData,
 760        }
 761    }
 762
 763    /// Enqueues the given future to run on the main thread at some point in the future.
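    ///
    /// A minimal sketch (assuming a `cx: &App`); the future may hold non-`Send` values such as
    /// `Rc`, since it never leaves the main thread:
    ///
    /// ```ignore
    /// let local = Rc::new(RefCell::new(0));
    /// cx.foreground_executor()
    ///     .spawn({
    ///         let local = local.clone();
    ///         async move { *local.borrow_mut() += 1; }
    ///     })
    ///     .detach();
    /// ```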
 764    #[track_caller]
 765    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
 766    where
 767        R: 'static,
 768    {
 769        self.spawn_with_app_and_priority(None, Priority::default(), future)
 770    }
 771
 772    /// Enqueues the given future to run on the main thread at some point in the future.
 773    #[track_caller]
 774    pub fn spawn_with_priority<R>(
 775        &self,
 776        priority: Priority,
 777        future: impl Future<Output = R> + 'static,
 778    ) -> Task<R>
 779    where
 780        R: 'static,
 781    {
 782        self.spawn_with_app_and_priority(None, priority, future)
 783    }
 784
 785    /// Enqueues the given future to run on the main thread at some point in the future,
 786    /// with a weak reference to the app for cancellation checking.
 787    ///
 788    /// When the app is dropped, pending tasks spawned with this method will be cancelled
 789    /// before they run, rather than panicking when they try to access the dropped app.
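    ///
    /// A sketch mirroring the tests at the bottom of this file (`app` is an `Rc<AppCell>`):
    ///
    /// ```ignore
    /// foreground_executor
    ///     .spawn_with_app(Rc::downgrade(&app), async move {
    ///         // Skipped (cancelled) if the app has already been dropped.
    ///     })
    ///     .detach();
    /// ```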
 790    #[track_caller]
 791    pub fn spawn_with_app<R>(
 792        &self,
 793        app: std::rc::Weak<crate::AppCell>,
 794        future: impl Future<Output = R> + 'static,
 795    ) -> Task<R>
 796    where
 797        R: 'static,
 798    {
 799        self.spawn_with_app_and_priority(Some(app), Priority::default(), future)
 800    }
 801
 802    /// Enqueues the given future to run on the main thread at some point in the future,
 803    /// with an optional weak reference to the app for cancellation checking and a specific priority.
 804    #[track_caller]
 805    pub fn spawn_with_app_and_priority<R>(
 806        &self,
 807        app: Option<std::rc::Weak<crate::AppCell>>,
 808        priority: Priority,
 809        future: impl Future<Output = R> + 'static,
 810    ) -> Task<R>
 811    where
 812        R: 'static,
 813    {
 814        let dispatcher = self.dispatcher.clone();
 815        let location = core::panic::Location::caller();
 816
 817        #[track_caller]
 818        fn inner<R: 'static>(
 819            dispatcher: Arc<dyn PlatformDispatcher>,
 820            future: AnyLocalFuture<R>,
 821            location: &'static core::panic::Location<'static>,
 822            app: Option<std::rc::Weak<crate::AppCell>>,
 823            priority: Priority,
 824        ) -> Task<R> {
 825            // SAFETY: We are on the main thread (ForegroundExecutor is !Send), and the
 826            // MainThreadWeak will only be accessed on the main thread in the trampoline.
 827            let app_weak = app.map(|weak| unsafe { crate::MainThreadWeak::new(weak) });
 828            let (runnable, task) = spawn_local_with_source_location(
 829                future,
 830                move |runnable| {
 831                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
 832                },
 833                RunnableMeta {
 834                    location,
 835                    app: app_weak,
 836                },
 837            );
 838            runnable.schedule();
 839            Task(TaskState::Spawned(task))
 840        }
 841        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 842    }
 843}
 844
 845/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
 846///
 847/// Copy-modified from:
 848/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
 849#[track_caller]
 850fn spawn_local_with_source_location<Fut, S, M>(
 851    future: Fut,
 852    schedule: S,
 853    metadata: M,
 854) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
 855where
 856    Fut: Future + 'static,
 857    Fut::Output: 'static,
 858    S: async_task::Schedule<M> + Send + Sync + 'static,
 859    M: 'static,
 860{
 861    #[inline]
 862    fn thread_id() -> ThreadId {
 863        std::thread_local! {
 864            static ID: ThreadId = thread::current().id();
 865        }
 866        ID.try_with(|id| *id)
 867            .unwrap_or_else(|_| thread::current().id())
 868    }
 869
 870    struct Checked<F> {
 871        id: ThreadId,
 872        inner: ManuallyDrop<F>,
 873        location: &'static Location<'static>,
 874    }
 875
 876    impl<F> Drop for Checked<F> {
 877        fn drop(&mut self) {
 878            assert!(
 879                self.id == thread_id(),
 880                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
 881                self.location
 882            );
 883            unsafe { ManuallyDrop::drop(&mut self.inner) };
 884        }
 885    }
 886
 887    impl<F: Future> Future for Checked<F> {
 888        type Output = F::Output;
 889
 890        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 891            assert!(
 892                self.id == thread_id(),
 893                "local task polled by a thread that didn't spawn it. Task spawned at {}",
 894                self.location
 895            );
 896            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
 897        }
 898    }
 899
 900    // Wrap the future into one that checks which thread it's on.
 901    let future = Checked {
 902        id: thread_id(),
 903        inner: ManuallyDrop::new(future),
 904        location: Location::caller(),
 905    };
 906
 907    unsafe {
 908        async_task::Builder::new()
 909            .metadata(metadata)
 910            .spawn_unchecked(move |_| future, schedule)
 911    }
 912}
 913
 914/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
 915pub struct Scope<'a> {
 916    executor: BackgroundExecutor,
 917    priority: Priority,
 918    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 919    tx: Option<mpsc::Sender<()>>,
 920    rx: mpsc::Receiver<()>,
 921    lifetime: PhantomData<&'a ()>,
 922}
 923
 924impl<'a> Scope<'a> {
 925    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 926        let (tx, rx) = mpsc::channel(1);
 927        Self {
 928            executor,
 929            priority,
 930            tx: Some(tx),
 931            rx,
 932            futures: Default::default(),
 933            lifetime: PhantomData,
 934        }
 935    }
 936
 937    /// How many CPUs are available to the dispatcher.
 938    pub fn num_cpus(&self) -> usize {
 939        self.executor.num_cpus()
 940    }
 941
 942    /// Spawn a future into this scope.
 943    #[track_caller]
 944    pub fn spawn<F>(&mut self, f: F)
 945    where
 946        F: Future<Output = ()> + Send + 'a,
 947    {
 948        let tx = self.tx.clone().unwrap();
 949
 950        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 951        // dropping this `Scope` blocks until all of the futures have resolved.
 952        let f = unsafe {
 953            mem::transmute::<
 954                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 955                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 956            >(Box::pin(async move {
 957                f.await;
 958                drop(tx);
 959            }))
 960        };
 961        self.futures.push(f);
 962    }
 963}
 964
 965impl Drop for Scope<'_> {
 966    fn drop(&mut self) {
 967        self.tx.take().unwrap();
 968
 969        // Wait until the channel is closed, which means that all of the spawned
 970        // futures have resolved.
 971        self.executor.block(self.rx.next());
 972    }
 973}
 974
 975#[cfg(test)]
 976mod test {
 977    use super::*;
 978    use crate::{App, TestDispatcher, TestPlatform};
 979    use rand::SeedableRng;
 980    use std::{cell::RefCell, rc::Weak};
 981
 982    #[test]
 983    fn sanity_test_tasks_run() {
 984        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 985        let arc_dispatcher = Arc::new(dispatcher.clone());
 986        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 987        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
 988
 989        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
 990        let asset_source = Arc::new(());
 991        let http_client = http_client::FakeHttpClient::with_404_response();
 992
 993        let app = App::new_app(platform, asset_source, http_client);
 994
 995        let task_ran = Rc::new(RefCell::new(false));
 996
 997        foreground_executor
 998            .spawn_with_app(Rc::downgrade(&app), {
 999                let task_ran = Rc::clone(&task_ran);
1000                async move {
1001                    *task_ran.borrow_mut() = true;
1002                }
1003            })
1004            .detach();
1005
1006        // Run dispatcher while app is still alive
1007        dispatcher.run_until_parked();
1008
1009        // Task should have run
1010        assert!(
1011            *task_ran.borrow(),
1012            "Task should run normally when app is alive"
1013        );
1014    }
1015
1016    #[test]
1017    fn test_task_cancelled_when_app_dropped() {
1018        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1019        let arc_dispatcher = Arc::new(dispatcher.clone());
1020        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1021        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1022
1023        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1024        let asset_source = Arc::new(());
1025        let http_client = http_client::FakeHttpClient::with_404_response();
1026
1027        let app = App::new_app(platform, asset_source, http_client);
1028        let app_weak = Rc::downgrade(&app);
1029
1030        let task_ran = Rc::new(RefCell::new(false));
1031        let task_ran_clone = Rc::clone(&task_ran);
1032
1033        foreground_executor
1034            .spawn_with_app(Weak::clone(&app_weak), async move {
1035                *task_ran_clone.borrow_mut() = true;
1036            })
1037            .detach();
1038
1039        assert!(
1040            Rc::weak_count(&app) > 0,
1041            "Task should hold a weak reference"
1042        );
1043
1044        drop(app);
1045
1046        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1047
1048        dispatcher.run_until_parked();
1049
1050        // The task should have been cancelled, not run
1051        assert!(
1052            !*task_ran.borrow(),
1053            "Task should have been cancelled when app was dropped, but it ran!"
1054        );
1055    }
1056
1057    #[test]
1058    fn test_nested_tasks_both_cancel() {
1059        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1060        let arc_dispatcher = Arc::new(dispatcher.clone());
1061        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1062        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1063
1064        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1065        let asset_source = Arc::new(());
1066        let http_client = http_client::FakeHttpClient::with_404_response();
1067
1068        let app = App::new_app(platform, asset_source, http_client);
1069        let app_weak = Rc::downgrade(&app);
1070
1071        let outer_completed = Rc::new(RefCell::new(false));
1072        let inner_completed = Rc::new(RefCell::new(false));
1073        let reached_await = Rc::new(RefCell::new(false));
1074
1075        let outer_flag = Rc::clone(&outer_completed);
1076        let inner_flag = Rc::clone(&inner_completed);
1077        let await_flag = Rc::clone(&reached_await);
1078
1079        // Channel to block the inner task until we're ready
1080        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1081
1082        // We need clones of executor and app_weak for the inner spawn
1083        let inner_executor = foreground_executor.clone();
1084        let inner_app_weak = app_weak.clone();
1085
1086        // Spawn outer task that will spawn and await an inner task
1087        foreground_executor
1088            .spawn_with_app(Weak::clone(&app_weak), async move {
1089                let inner_flag_clone = Rc::clone(&inner_flag);
1090
1091                // Spawn inner task that blocks on a channel
1092                let inner_task = inner_executor.spawn_with_app(inner_app_weak, async move {
1093                    // Wait for signal (which will never come - we'll drop the app instead)
1094                    rx.await.ok();
1095                    *inner_flag_clone.borrow_mut() = true;
1096                });
1097
1098                // Mark that we've reached the await point
1099                *await_flag.borrow_mut() = true;
1100
1101                // Await inner task - this should not panic when both are cancelled
1102                inner_task.await;
1103
1104                // Mark outer as complete (should never reach here)
1105                *outer_flag.borrow_mut() = true;
1106            })
1107            .detach();
1108
1109        // Run dispatcher until outer task reaches the await point
1110        // The inner task will be blocked on the channel
1111        dispatcher.run_until_parked();
1112
1113        // Verify we actually reached the await point before dropping the app
1114        assert!(
1115            *reached_await.borrow(),
1116            "Outer task should have reached the await point"
1117        );
1118
1119        // Neither task should have completed yet
1120        assert!(
1121            !*outer_completed.borrow(),
1122            "Outer task should not have completed yet"
1123        );
1124        assert!(
1125            !*inner_completed.borrow(),
1126            "Inner task should not have completed yet"
1127        );
1128
1129        // Drop the channel sender and app while outer is awaiting inner
1130        drop(tx);
1131        drop(app);
1132        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1133
1134        // Run dispatcher - both tasks should be cancelled
1135        dispatcher.run_until_parked();
1136
1137        // Neither task should have completed (both were cancelled)
1138        assert!(
1139            !*outer_completed.borrow(),
1140            "Outer task should have been cancelled, not completed"
1141        );
1142        assert!(
1143            !*inner_completed.borrow(),
1144            "Inner task should have been cancelled, not completed"
1145        );
1146    }
1147
1148    #[test]
1149    fn test_task_without_app_tracking_still_runs() {
1150        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1151        let arc_dispatcher = Arc::new(dispatcher.clone());
1152        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1153        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1154
1155        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1156        let asset_source = Arc::new(());
1157        let http_client = http_client::FakeHttpClient::with_404_response();
1158
1159        let app = App::new_app(platform, asset_source, http_client);
1160        let app_weak = Rc::downgrade(&app);
1161
1162        let task_ran = Rc::new(RefCell::new(false));
1163        let task_ran_clone = Rc::clone(&task_ran);
1164
1165        let _task = foreground_executor.spawn(async move {
1166            *task_ran_clone.borrow_mut() = true;
1167        });
1168
1169        drop(app);
1170
1171        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1172
1173        dispatcher.run_until_parked();
1174
1175        assert!(
1176            *task_ran.borrow(),
1177            "Task without app tracking should still run after app is dropped"
1178        );
1179    }
1180
1181    #[test]
1182    #[should_panic]
1183    fn test_polling_cancelled_task_panics() {
1184        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1185        let arc_dispatcher = Arc::new(dispatcher.clone());
1186        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1187        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1188
1189        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1190        let asset_source = Arc::new(());
1191        let http_client = http_client::FakeHttpClient::with_404_response();
1192
1193        let app = App::new_app(platform, asset_source, http_client);
1194        let app_weak = Rc::downgrade(&app);
1195
1196        let task = foreground_executor.spawn_with_app(Weak::clone(&app_weak), async move { 42 });
1197
1198        drop(app);
1199
1200        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1201
1202        dispatcher.run_until_parked();
1203
1204        background_executor.block(task);
1205    }
1206
1207    #[test]
1208    fn test_polling_cancelled_task_returns_none_with_fallible() {
1209        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1210        let arc_dispatcher = Arc::new(dispatcher.clone());
1211        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1212        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1213
1214        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1215        let asset_source = Arc::new(());
1216        let http_client = http_client::FakeHttpClient::with_404_response();
1217
1218        let app = App::new_app(platform, asset_source, http_client);
1219        let app_weak = Rc::downgrade(&app);
1220
1221        let task = foreground_executor
1222            .spawn_with_app(Weak::clone(&app_weak), async move { 42 })
1223            .fallible();
1224
1225        drop(app);
1226
1227        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1228
1229        dispatcher.run_until_parked();
1230
1231        let result = background_executor.block(task);
1232        assert_eq!(result, None, "Cancelled task should return None");
1233    }
1234}