executor.rs

   1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use parking_lot::{Condvar, Mutex};
   5use smol::prelude::*;
   6use std::{
   7    fmt::Debug,
   8    marker::PhantomData,
   9    mem::{self, ManuallyDrop},
  10    num::NonZeroUsize,
  11    panic::Location,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::{
  15        Arc,
  16        atomic::{AtomicUsize, Ordering},
  17    },
  18    task::{Context, Poll},
  19    thread::{self, ThreadId},
  20    time::{Duration, Instant},
  21};
  22use util::TryFutureExt;
  23use waker_fn::waker_fn;
  24
  25#[cfg(any(test, feature = "test-support"))]
  26use rand::rngs::StdRng;
  27
  28/// A pointer to the executor that is currently running,
  29/// for spawning background tasks.
  30#[derive(Clone)]
  31pub struct BackgroundExecutor {
  32    #[doc(hidden)]
  33    pub dispatcher: Arc<dyn PlatformDispatcher>,
  34}
  35
  36/// A pointer to the executor that is currently running,
  37/// for spawning tasks on the main thread.
  38///
  39/// This is intentionally `!Send` via the `not_send` marker field. This is because
  40/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
  41/// only polled from the same thread it was spawned from. These checks would fail when spawning
  42/// foreground tasks from background threads.
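///
/// # Example
///
/// A minimal sketch, assuming a `cx: &App` is in scope; the spawned future may hold
/// `!Send` types such as `Rc` because it never leaves the main thread:
///
/// ```ignore
/// let counter = std::rc::Rc::new(std::cell::Cell::new(0));
/// cx.foreground_executor()
///     .spawn({
///         let counter = counter.clone();
///         async move { counter.set(counter.get() + 1) }
///     })
///     .detach();
/// ```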
  43#[derive(Clone)]
  44pub struct ForegroundExecutor {
  45    #[doc(hidden)]
  46    pub dispatcher: Arc<dyn PlatformDispatcher>,
  47    not_send: PhantomData<Rc<()>>,
  48}
  49
  50/// Realtime task priority
  51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  52#[repr(u8)]
  53pub enum RealtimePriority {
  54    /// Audio task
  55    Audio,
  56    /// Other realtime task
  57    #[default]
  58    Other,
  59}
  60
  61/// Task priority
  62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
  63#[repr(u8)]
  64pub enum Priority {
  65    /// Realtime priority
  66    ///
  67    /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
  68    Realtime(RealtimePriority),
  69    /// High priority
  70    ///
  71    /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
  72    High,
  73    /// Medium priority; suits most use cases.
  74    #[default]
  75    Medium,
  76    /// Low priority
  77    ///
  78    /// Use this priority for background work that can arrive in large quantities,
  79    /// so that it does not starve the executor of resources needed for high-priority tasks.
  80    Low,
  81}
  82
  83impl Priority {
  84    #[allow(dead_code)]
  85    pub(crate) const fn probability(&self) -> u32 {
  86        match self {
  87            // realtime priorities are not considered for probability scheduling
  88            Priority::Realtime(_) => 0,
  89            Priority::High => 60,
  90            Priority::Medium => 30,
  91            Priority::Low => 10,
  92        }
  93    }
  94}
  95
  96/// Task is a primitive that allows work to happen in the background.
  97///
  98/// It implements [`Future`] so you can `.await` on it.
  99///
 100/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
 101/// the task to continue running, but with no way to return a value.
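///
/// # Example
///
/// A minimal sketch, assuming a `cx: &App` is in scope:
///
/// ```ignore
/// let task: Task<usize> = cx.background_spawn(async move { 2 + 2 });
/// // Either `.await` the task where the value is needed, or release it:
/// task.detach(); // keeps running in the background, the result is discarded
/// ```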
 102#[must_use]
 103#[derive(Debug)]
 104pub struct Task<T>(TaskState<T>);
 105
 106#[derive(Debug)]
 107enum TaskState<T> {
 108    /// A task that is ready to return a value
 109    Ready(Option<T>),
 110
 111    /// A task that is currently running.
 112    Spawned(async_task::Task<T, RunnableMeta>),
 113}
 114
 115impl<T> Task<T> {
 116    /// Creates a new task that will resolve with the value
 117    pub fn ready(val: T) -> Self {
 118        Task(TaskState::Ready(Some(val)))
 119    }
 120
 121    /// Detaching a task runs it to completion in the background
 122    pub fn detach(self) {
 123        match self {
 124            Task(TaskState::Ready(_)) => {}
 125            Task(TaskState::Spawned(task)) => task.detach(),
 126        }
 127    }
 128
 129    /// Converts this task into a fallible task that returns `Option<T>`.
 130    ///
 131    /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
 132    /// if the app was dropped while the task is executing.
 133    ///
 134    /// # Example
 135    ///
 136    /// ```ignore
 137    /// // Background task that gracefully handles app shutdown:
 138    /// cx.background_spawn(async move {
 139    ///     let result = foreground_task.fallible().await;
 140    ///     if let Some(value) = result {
 141    ///         // Process the value
 142    ///     }
 143    ///     // If None, app was shut down - just exit gracefully
 144    /// }).detach();
 145    /// ```
 146    pub fn fallible(self) -> FallibleTask<T> {
 147        FallibleTask(match self.0 {
 148            TaskState::Ready(val) => FallibleTaskState::Ready(val),
 149            TaskState::Spawned(task) => FallibleTaskState::Spawned(task.fallible()),
 150        })
 151    }
 152}
 153
 154impl<E, T> Task<Result<T, E>>
 155where
 156    T: 'static,
 157    E: 'static + Debug,
 158{
 159    /// Run the task to completion in the background and log any
 160    /// errors that occur.
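    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `cx: &App` is in scope and `save_file` is a
    /// hypothetical async function returning a `Result`:
    ///
    /// ```ignore
    /// cx.background_spawn(async move { save_file().await })
    ///     .detach_and_log_err(cx);
    /// ```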
 161    #[track_caller]
 162    pub fn detach_and_log_err(self, cx: &App) {
 163        let location = core::panic::Location::caller();
 164        cx.foreground_executor()
 165            .spawn(self.log_tracked_err(*location))
 166            .detach();
 167    }
 168}
 169
 170impl<T> Future for Task<T> {
 171    type Output = T;
 172
 173    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 174        match unsafe { self.get_unchecked_mut() } {
 175            Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
 176            Task(TaskState::Spawned(task)) => task.poll(cx),
 177        }
 178    }
 179}
 180
 181/// A task that returns `Option<T>` instead of panicking when cancelled.
 182#[must_use]
 183pub struct FallibleTask<T>(FallibleTaskState<T>);
 184
 185enum FallibleTaskState<T> {
 186    /// A task that is ready to return a value
 187    Ready(Option<T>),
 188
 189    /// A task that is currently running (wraps async_task::FallibleTask).
 190    Spawned(async_task::FallibleTask<T, RunnableMeta>),
 191}
 192
 193impl<T> FallibleTask<T> {
 194    /// Creates a new fallible task that will resolve with the value.
 195    pub fn ready(val: T) -> Self {
 196        FallibleTask(FallibleTaskState::Ready(Some(val)))
 197    }
 198
 199    /// Detaching a task runs it to completion in the background.
 200    pub fn detach(self) {
 201        match self.0 {
 202            FallibleTaskState::Ready(_) => {}
 203            FallibleTaskState::Spawned(task) => task.detach(),
 204        }
 205    }
 206}
 207
 208impl<T> Future for FallibleTask<T> {
 209    type Output = Option<T>;
 210
 211    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 212        match unsafe { self.get_unchecked_mut() } {
 213            FallibleTask(FallibleTaskState::Ready(val)) => Poll::Ready(val.take()),
 214            FallibleTask(FallibleTaskState::Spawned(task)) => Pin::new(task).poll(cx),
 215        }
 216    }
 217}
 218
 219impl<T> std::fmt::Debug for FallibleTask<T> {
 220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 221        match &self.0 {
 222            FallibleTaskState::Ready(_) => f.debug_tuple("FallibleTask::Ready").finish(),
 223            FallibleTaskState::Spawned(task) => {
 224                f.debug_tuple("FallibleTask::Spawned").field(task).finish()
 225            }
 226        }
 227    }
 228}
 229
 230/// A task label is an opaque identifier that you can use to
 231/// refer to a task in tests.
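///
/// # Example
///
/// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and `refresh_index`
/// is a hypothetical async function:
///
/// ```ignore
/// let label = TaskLabel::new();
/// executor
///     .spawn_labeled(label, async move { refresh_index().await })
///     .detach();
/// // In a test, force this task to run only after all other ready tasks:
/// executor.deprioritize(label);
/// ```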
 232#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 233pub struct TaskLabel(NonZeroUsize);
 234
 235impl Default for TaskLabel {
 236    fn default() -> Self {
 237        Self::new()
 238    }
 239}
 240
 241impl TaskLabel {
 242    /// Construct a new task label.
 243    pub fn new() -> Self {
 244        static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
 245        Self(
 246            NEXT_TASK_LABEL
 247                .fetch_add(1, Ordering::SeqCst)
 248                .try_into()
 249                .unwrap(),
 250        )
 251    }
 252}
 253
 254type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
 255
 256type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
 257
 258/// BackgroundExecutor lets you run things on background threads.
 259/// In production this is a thread pool with no ordering guarantees.
 260/// In tests this is simulated by running tasks one by one in a deterministic
 261/// (but arbitrary) order controlled by the `SEED` environment variable.
 262impl BackgroundExecutor {
 263    #[doc(hidden)]
 264    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 265        Self { dispatcher }
 266    }
 267
 268    /// Enqueues the given future to be run to completion on a background thread.
 269    #[track_caller]
 270    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 271    where
 272        R: Send + 'static,
 273    {
 274        self.spawn_with_priority(Priority::default(), future)
 275    }
 276
 277    /// Enqueues the given future to be run to completion on a background thread with the given priority.
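    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and `compress_logs`
    /// is a hypothetical async function that should not compete with latency-sensitive work:
    ///
    /// ```ignore
    /// executor
    ///     .spawn_with_priority(Priority::Low, async move { compress_logs().await })
    ///     .detach();
    /// ```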
 278    #[track_caller]
 279    pub fn spawn_with_priority<R>(
 280        &self,
 281        priority: Priority,
 282        future: impl Future<Output = R> + Send + 'static,
 283    ) -> Task<R>
 284    where
 285        R: Send + 'static,
 286    {
 287        self.spawn_internal::<R>(Box::pin(future), None, priority)
 288    }
 289
 290    /// Enqueues the given future to be run to completion on a background thread, blocking the current task on it.
 291    ///
 292    /// This allows spawning background work that borrows from its scope. Note that the supplied future will run to
 293    /// completion before the current task is resumed, even if the current task is slated for cancellation.
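    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and this is awaited
    /// from an async context:
    ///
    /// ```ignore
    /// let data = vec![3, 1, 2];
    /// // The future may borrow `data` because the caller is not resumed until it completes.
    /// let sum: i32 = executor.await_on_background(async { data.iter().sum() }).await;
    /// ```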
 294    pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
 295    where
 296        R: Send,
 297    {
 298        // We need to ensure that cancellation of the parent task does not drop the environment
 299        // before our own task has completed or been cancelled.
 300        struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 301
 302        impl Drop for NotifyOnDrop<'_> {
 303            fn drop(&mut self) {
 304                *self.0.1.lock() = true;
 305                self.0.0.notify_all();
 306            }
 307        }
 308
 309        struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
 310
 311        impl Drop for WaitOnDrop<'_> {
 312            fn drop(&mut self) {
 313                let mut done = self.0.1.lock();
 314                if !*done {
 315                    self.0.0.wait(&mut done);
 316                }
 317            }
 318        }
 319
 320        let dispatcher = self.dispatcher.clone();
 321        let location = core::panic::Location::caller();
 322
 323        let pair = &(Condvar::new(), Mutex::new(false));
 324        let _wait_guard = WaitOnDrop(pair);
 325
 326        let (runnable, task) = unsafe {
 327            async_task::Builder::new()
 328                .metadata(RunnableMeta {
 329                    location,
 330                    app: None,
 331                })
 332                .spawn_unchecked(
 333                    move |_| async {
 334                        let _notify_guard = NotifyOnDrop(pair);
 335                        future.await
 336                    },
 337                    move |runnable| {
 338                        dispatcher.dispatch(
 339                            RunnableVariant::Meta(runnable),
 340                            None,
 341                            Priority::default(),
 342                        )
 343                    },
 344                )
 345        };
 346        runnable.schedule();
 347        task.await
 348    }
 349
 350    /// Enqueues the given future to be run to completion on a background thread.
 351    /// The given label can be used to control the priority of the task in tests.
 352    #[track_caller]
 353    pub fn spawn_labeled<R>(
 354        &self,
 355        label: TaskLabel,
 356        future: impl Future<Output = R> + Send + 'static,
 357    ) -> Task<R>
 358    where
 359        R: Send + 'static,
 360    {
 361        self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
 362    }
 363
 364    #[track_caller]
 365    fn spawn_internal<R: Send + 'static>(
 366        &self,
 367        future: AnyFuture<R>,
 368        label: Option<TaskLabel>,
 369        #[cfg_attr(
 370            target_os = "windows",
 371            expect(
 372                unused_variables,
 373                reason = "Multi priority scheduler is broken on windows"
 374            )
 375        )]
 376        priority: Priority,
 377    ) -> Task<R> {
 378        let dispatcher = self.dispatcher.clone();
 379        #[cfg(target_os = "windows")]
 380        let priority = Priority::Medium; // multi-prio scheduler is broken on windows
 381
 382        let (runnable, task) = if let Priority::Realtime(realtime) = priority {
 383            let location = core::panic::Location::caller();
 384            let (tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
 385
 386            dispatcher.spawn_realtime(
 387                realtime,
 388                Box::new(move || {
 389                    while let Ok(runnable) = rx.recv() {
 390                        let start = Instant::now();
 391                        let location = runnable.metadata().location;
 392                        let mut timing = TaskTiming {
 393                            location,
 394                            start,
 395                            end: None,
 396                        };
 397                        profiler::add_task_timing(timing);
 398
 399                        runnable.run();
 400
 401                        let end = Instant::now();
 402                        timing.end = Some(end);
 403                        profiler::add_task_timing(timing);
 404                    }
 405                }),
 406            );
 407
 408            async_task::Builder::new()
 409                .metadata(RunnableMeta {
 410                    location,
 411                    app: None,
 412                })
 413                .spawn(
 414                    move |_| future,
 415                    move |runnable| {
 416                        let _ = tx.send(runnable);
 417                    },
 418                )
 419        } else {
 420            let location = core::panic::Location::caller();
 421            async_task::Builder::new()
 422                .metadata(RunnableMeta {
 423                    location,
 424                    app: None,
 425                })
 426                .spawn(
 427                    move |_| future,
 428                    move |runnable| {
 429                        dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
 430                    },
 431                )
 432        };
 433
 434        runnable.schedule();
 435        Task(TaskState::Spawned(task))
 436    }
 437
 438    /// Used by the test harness to run an async test in a synchronous fashion.
 439    #[cfg(any(test, feature = "test-support"))]
 440    #[track_caller]
 441    pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
 442        if let Ok(value) = self.block_internal(false, future, None) {
 443            value
 444        } else {
 445            unreachable!()
 446        }
 447    }
 448
 449    /// Block the current thread until the given future resolves.
 450    /// Consider using `block_with_timeout` instead.
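    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and `load_settings`
    /// is a hypothetical async function; this bridges async work into a synchronous call site:
    ///
    /// ```ignore
    /// let settings = executor.block(async { load_settings().await });
    /// ```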
 451    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
 452        if let Ok(value) = self.block_internal(true, future, None) {
 453            value
 454        } else {
 455            unreachable!()
 456        }
 457    }
 458
 459    #[cfg(not(any(test, feature = "test-support")))]
 460    pub(crate) fn block_internal<Fut: Future>(
 461        &self,
 462        _background_only: bool,
 463        future: Fut,
 464        timeout: Option<Duration>,
 465    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 466        use std::time::Instant;
 467
 468        let mut future = Box::pin(future);
 469        if timeout == Some(Duration::ZERO) {
 470            return Err(future);
 471        }
 472        let deadline = timeout.map(|timeout| Instant::now() + timeout);
 473
 474        let parker = parking::Parker::new();
 475        let unparker = parker.unparker();
 476        let waker = waker_fn(move || {
 477            unparker.unpark();
 478        });
 479        let mut cx = std::task::Context::from_waker(&waker);
 480
 481        loop {
 482            match future.as_mut().poll(&mut cx) {
 483                Poll::Ready(result) => return Ok(result),
 484                Poll::Pending => {
 485                    let timeout =
 486                        deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
 487                    if let Some(timeout) = timeout {
 488                        if !parker.park_timeout(timeout)
 489                            && deadline.is_some_and(|deadline| deadline < Instant::now())
 490                        {
 491                            return Err(future);
 492                        }
 493                    } else {
 494                        parker.park();
 495                    }
 496                }
 497            }
 498        }
 499    }
 500
 501    #[cfg(any(test, feature = "test-support"))]
 502    #[track_caller]
 503    pub(crate) fn block_internal<Fut: Future>(
 504        &self,
 505        background_only: bool,
 506        future: Fut,
 507        timeout: Option<Duration>,
 508    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 509        use std::sync::atomic::AtomicBool;
 510
 511        use parking::Parker;
 512
 513        let mut future = Box::pin(future);
 514        if timeout == Some(Duration::ZERO) {
 515            return Err(future);
 516        }
 517        let Some(dispatcher) = self.dispatcher.as_test() else {
 518            return Err(future);
 519        };
 520
 521        let mut max_ticks = if timeout.is_some() {
 522            dispatcher.gen_block_on_ticks()
 523        } else {
 524            usize::MAX
 525        };
 526
 527        let parker = Parker::new();
 528        let unparker = parker.unparker();
 529
 530        let awoken = Arc::new(AtomicBool::new(false));
 531        let waker = waker_fn({
 532            let awoken = awoken.clone();
 533            let unparker = unparker.clone();
 534            move || {
 535                awoken.store(true, Ordering::SeqCst);
 536                unparker.unpark();
 537            }
 538        });
 539        let mut cx = std::task::Context::from_waker(&waker);
 540
 541        let duration = Duration::from_secs(
 542            option_env!("GPUI_TEST_TIMEOUT")
 543                .and_then(|s| s.parse::<u64>().ok())
 544                .unwrap_or(180),
 545        );
 546        let mut test_should_end_by = Instant::now() + duration;
 547
 548        loop {
 549            match future.as_mut().poll(&mut cx) {
 550                Poll::Ready(result) => return Ok(result),
 551                Poll::Pending => {
 552                    if max_ticks == 0 {
 553                        return Err(future);
 554                    }
 555                    max_ticks -= 1;
 556
 557                    if !dispatcher.tick(background_only) {
 558                        if awoken.swap(false, Ordering::SeqCst) {
 559                            continue;
 560                        }
 561
 562                        if !dispatcher.parking_allowed() {
 563                            if dispatcher.advance_clock_to_next_delayed() {
 564                                continue;
 565                            }
 566                            let mut backtrace_message = String::new();
 567                            let mut waiting_message = String::new();
 568                            if let Some(backtrace) = dispatcher.waiting_backtrace() {
 569                                backtrace_message =
 570                                    format!("\nbacktrace of waiting future:\n{:?}", backtrace);
 571                            }
 572                            if let Some(waiting_hint) = dispatcher.waiting_hint() {
 573                                waiting_message = format!("\n  waiting on: {}\n", waiting_hint);
 574                            }
 575                            panic!(
 576                                "parked with nothing left to run{waiting_message}{backtrace_message}",
 577                            )
 578                        }
 579                        dispatcher.push_unparker(unparker.clone());
 580                        parker.park_timeout(Duration::from_millis(1));
 581                        if Instant::now() > test_should_end_by {
 582                            panic!("test timed out after {duration:?} with allow_parking")
 583                        }
 584                    }
 585                }
 586            }
 587        }
 588    }
 589
 590    /// Block the current thread until the given future resolves
 591    /// or `duration` has elapsed.
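    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and `fetch_value`
    /// and `use_value` are hypothetical (`fetch_value` returning a `Send + 'static` future);
    /// on timeout the unfinished future is handed back to the caller:
    ///
    /// ```ignore
    /// match executor.block_with_timeout(Duration::from_millis(50), fetch_value()) {
    ///     Ok(value) => use_value(value),
    ///     Err(unfinished) => {
    ///         // Stop waiting synchronously and let the work finish in the background.
    ///         executor.spawn(unfinished).detach();
    ///     }
    /// }
    /// ```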
 592    pub fn block_with_timeout<Fut: Future>(
 593        &self,
 594        duration: Duration,
 595        future: Fut,
 596    ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
 597        self.block_internal(true, future, Some(duration))
 598    }
 599
 600    /// Scoped lets you start a number of tasks and waits
 601    /// for all of them to complete before returning.
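    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and this is awaited
    /// from an async context; the spawned futures may borrow from the enclosing stack frame:
    ///
    /// ```ignore
    /// let chunks = vec![vec![1_u32, 2], vec![3, 4]];
    /// let mut totals = vec![0_u32; chunks.len()];
    /// executor
    ///     .scoped(|scope| {
    ///         for (chunk, total) in chunks.iter().zip(totals.iter_mut()) {
    ///             scope.spawn(async move { *total = chunk.iter().sum() });
    ///         }
    ///     })
    ///     .await;
    /// ```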
 602    pub async fn scoped<'scope, F>(&self, scheduler: F)
 603    where
 604        F: FnOnce(&mut Scope<'scope>),
 605    {
 606        let mut scope = Scope::new(self.clone(), Priority::default());
 607        (scheduler)(&mut scope);
 608        let spawned = mem::take(&mut scope.futures)
 609            .into_iter()
 610            .map(|f| self.spawn_with_priority(scope.priority, f))
 611            .collect::<Vec<_>>();
 612        for task in spawned {
 613            task.await;
 614        }
 615    }
 616
 617    /// Like [`Self::scoped`], but spawns the scoped tasks with the
 618    /// given priority.
 619    pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
 620    where
 621        F: FnOnce(&mut Scope<'scope>),
 622    {
 623        let mut scope = Scope::new(self.clone(), priority);
 624        (scheduler)(&mut scope);
 625        let spawned = mem::take(&mut scope.futures)
 626            .into_iter()
 627            .map(|f| self.spawn_with_priority(scope.priority, f))
 628            .collect::<Vec<_>>();
 629        for task in spawned {
 630            task.await;
 631        }
 632    }
 633
 634    /// Get the current time.
 635    ///
 636    /// Calling this instead of `std::time::Instant::now` allows the use
 637    /// of fake timers in tests.
 638    pub fn now(&self) -> Instant {
 639        self.dispatcher.now()
 640    }
 641
 642    /// Returns a task that will complete after the given duration.
 643    /// Depending on other concurrent tasks the elapsed duration may be longer
 644    /// than requested.
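    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `executor` is a [`BackgroundExecutor`] and this is awaited
    /// from an async context:
    ///
    /// ```ignore
    /// executor.timer(Duration::from_millis(250)).await;
    /// // At least 250ms have elapsed (possibly more if the executor was busy).
    /// ```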
 645    pub fn timer(&self, duration: Duration) -> Task<()> {
 646        if duration.is_zero() {
 647            return Task::ready(());
 648        }
 649        let location = core::panic::Location::caller();
 650        let (runnable, task) = async_task::Builder::new()
 651            .metadata(RunnableMeta {
 652                location,
 653                app: None,
 654            })
 655            .spawn(move |_| async move {}, {
 656                let dispatcher = self.dispatcher.clone();
 657                move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
 658            });
 659        runnable.schedule();
 660        Task(TaskState::Spawned(task))
 661    }
 662
 663    /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
 664    #[cfg(any(test, feature = "test-support"))]
 665    pub fn start_waiting(&self) {
 666        self.dispatcher.as_test().unwrap().start_waiting();
 667    }
 668
 669    /// in tests, removes the debugging data added by start_waiting
 670    #[cfg(any(test, feature = "test-support"))]
 671    pub fn finish_waiting(&self) {
 672        self.dispatcher.as_test().unwrap().finish_waiting();
 673    }
 674
 675    /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
 676    #[cfg(any(test, feature = "test-support"))]
 677    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
 678        self.dispatcher.as_test().unwrap().simulate_random_delay()
 679    }
 680
 681    /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
 682    #[cfg(any(test, feature = "test-support"))]
 683    pub fn deprioritize(&self, task_label: TaskLabel) {
 684        self.dispatcher.as_test().unwrap().deprioritize(task_label)
 685    }
 686
 687    /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
 688    #[cfg(any(test, feature = "test-support"))]
 689    pub fn advance_clock(&self, duration: Duration) {
 690        self.dispatcher.as_test().unwrap().advance_clock(duration)
 691    }
 692
 693    /// in tests, run one task.
 694    #[cfg(any(test, feature = "test-support"))]
 695    pub fn tick(&self) -> bool {
 696        self.dispatcher.as_test().unwrap().tick(false)
 697    }
 698
 699    /// in tests, run all tasks that are ready to run. If after doing so
 700    /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
 701    #[cfg(any(test, feature = "test-support"))]
 702    pub fn run_until_parked(&self) {
 703        self.dispatcher.as_test().unwrap().run_until_parked()
 704    }
 705
 706    /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
 707    /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
 708    /// do take real async time to run.
 709    #[cfg(any(test, feature = "test-support"))]
 710    pub fn allow_parking(&self) {
 711        self.dispatcher.as_test().unwrap().allow_parking();
 712    }
 713
 714    /// undoes the effect of [`Self::allow_parking`].
 715    #[cfg(any(test, feature = "test-support"))]
 716    pub fn forbid_parking(&self) {
 717        self.dispatcher.as_test().unwrap().forbid_parking();
 718    }
 719
 720    /// adds detail to the "parked with nothing left to run" message.
 721    #[cfg(any(test, feature = "test-support"))]
 722    pub fn set_waiting_hint(&self, msg: Option<String>) {
 723        self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
 724    }
 725
 726    /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
 727    #[cfg(any(test, feature = "test-support"))]
 728    pub fn rng(&self) -> StdRng {
 729        self.dispatcher.as_test().unwrap().rng()
 730    }
 731
 732    /// How many CPUs are available to the dispatcher.
 733    pub fn num_cpus(&self) -> usize {
 734        #[cfg(any(test, feature = "test-support"))]
 735        return 4;
 736
 737        #[cfg(not(any(test, feature = "test-support")))]
 738        return num_cpus::get();
 739    }
 740
 741    /// Whether we're on the main thread.
 742    pub fn is_main_thread(&self) -> bool {
 743        self.dispatcher.is_main_thread()
 744    }
 745
 746    #[cfg(any(test, feature = "test-support"))]
 747    /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
 748    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 749        self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
 750    }
 751}
 752
 753/// ForegroundExecutor runs things on the main thread.
 754impl ForegroundExecutor {
 755    /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
 756    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
 757        Self {
 758            dispatcher,
 759            not_send: PhantomData,
 760        }
 761    }
 762
 763    /// Enqueues the given Task to run on the main thread at some point in the future.
 764    #[track_caller]
 765    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
 766    where
 767        R: 'static,
 768    {
 769        self.inner_spawn(None, Priority::default(), future)
 770    }
 771
 772    /// Enqueues the given Task to run on the main thread at some point in the future, with the given priority.
 773    #[track_caller]
 774    pub fn spawn_with_priority<R>(
 775        &self,
 776        priority: Priority,
 777        future: impl Future<Output = R> + 'static,
 778    ) -> Task<R>
 779    where
 780        R: 'static,
 781    {
 782        self.inner_spawn(None, priority, future)
 783    }
 784
 785    #[track_caller]
 786    pub(crate) fn spawn_context<R>(
 787        &self,
 788        app: std::sync::Weak<()>,
 789        future: impl Future<Output = R> + 'static,
 790    ) -> Task<R>
 791    where
 792        R: 'static,
 793    {
 794        self.inner_spawn(Some(app), Priority::default(), future)
 795    }
 796
 797    #[track_caller]
 798    pub(crate) fn inner_spawn<R>(
 799        &self,
 800        app: Option<std::sync::Weak<()>>,
 801        priority: Priority,
 802        future: impl Future<Output = R> + 'static,
 803    ) -> Task<R>
 804    where
 805        R: 'static,
 806    {
 807        let dispatcher = self.dispatcher.clone();
 808        let location = core::panic::Location::caller();
 809
 810        #[track_caller]
 811        fn inner<R: 'static>(
 812            dispatcher: Arc<dyn PlatformDispatcher>,
 813            future: AnyLocalFuture<R>,
 814            location: &'static core::panic::Location<'static>,
 815            app: Option<std::sync::Weak<()>>,
 816            priority: Priority,
 817        ) -> Task<R> {
 818            let (runnable, task) = spawn_local_with_source_location(
 819                future,
 820                move |runnable| {
 821                    dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
 822                },
 823                RunnableMeta { location, app },
 824            );
 825            runnable.schedule();
 826            Task(TaskState::Spawned(task))
 827        }
 828        inner::<R>(dispatcher, Box::pin(future), location, app, priority)
 829    }
 830}
 831
 832/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
 833///
 834/// Copy-modified from:
 835/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
 836#[track_caller]
 837fn spawn_local_with_source_location<Fut, S, M>(
 838    future: Fut,
 839    schedule: S,
 840    metadata: M,
 841) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
 842where
 843    Fut: Future + 'static,
 844    Fut::Output: 'static,
 845    S: async_task::Schedule<M> + Send + Sync + 'static,
 846    M: 'static,
 847{
 848    #[inline]
 849    fn thread_id() -> ThreadId {
 850        std::thread_local! {
 851            static ID: ThreadId = thread::current().id();
 852        }
 853        ID.try_with(|id| *id)
 854            .unwrap_or_else(|_| thread::current().id())
 855    }
 856
 857    struct Checked<F> {
 858        id: ThreadId,
 859        inner: ManuallyDrop<F>,
 860        location: &'static Location<'static>,
 861    }
 862
 863    impl<F> Drop for Checked<F> {
 864        fn drop(&mut self) {
 865            assert!(
 866                self.id == thread_id(),
 867                "local task dropped by a thread that didn't spawn it. Task spawned at {}",
 868                self.location
 869            );
 870            unsafe { ManuallyDrop::drop(&mut self.inner) };
 871        }
 872    }
 873
 874    impl<F: Future> Future for Checked<F> {
 875        type Output = F::Output;
 876
 877        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 878            assert!(
 879                self.id == thread_id(),
 880                "local task polled by a thread that didn't spawn it. Task spawned at {}",
 881                self.location
 882            );
 883            unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
 884        }
 885    }
 886
 887    // Wrap the future into one that checks which thread it's on.
 888    let future = Checked {
 889        id: thread_id(),
 890        inner: ManuallyDrop::new(future),
 891        location: Location::caller(),
 892    };
 893
 894    unsafe {
 895        async_task::Builder::new()
 896            .metadata(metadata)
 897            .spawn_unchecked(move |_| future, schedule)
 898    }
 899}
 900
 901/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
 902pub struct Scope<'a> {
 903    executor: BackgroundExecutor,
 904    priority: Priority,
 905    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 906    tx: Option<mpsc::Sender<()>>,
 907    rx: mpsc::Receiver<()>,
 908    lifetime: PhantomData<&'a ()>,
 909}
 910
 911impl<'a> Scope<'a> {
 912    fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
 913        let (tx, rx) = mpsc::channel(1);
 914        Self {
 915            executor,
 916            priority,
 917            tx: Some(tx),
 918            rx,
 919            futures: Default::default(),
 920            lifetime: PhantomData,
 921        }
 922    }
 923
 924    /// How many CPUs are available to the dispatcher.
 925    pub fn num_cpus(&self) -> usize {
 926        self.executor.num_cpus()
 927    }
 928
 929    /// Spawn a future into this scope.
 930    #[track_caller]
 931    pub fn spawn<F>(&mut self, f: F)
 932    where
 933        F: Future<Output = ()> + Send + 'a,
 934    {
 935        let tx = self.tx.clone().unwrap();
 936
 937        // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
 938        // dropping this `Scope` blocks until all of the futures have resolved.
 939        let f = unsafe {
 940            mem::transmute::<
 941                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 942                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 943            >(Box::pin(async move {
 944                f.await;
 945                drop(tx);
 946            }))
 947        };
 948        self.futures.push(f);
 949    }
 950}
 951
 952impl Drop for Scope<'_> {
 953    fn drop(&mut self) {
 954        self.tx.take().unwrap();
 955
 956        // Wait until the channel is closed, which means that all of the spawned
 957        // futures have resolved.
 958        self.executor.block(self.rx.next());
 959    }
 960}
 961
 962#[cfg(test)]
 963mod test {
 964    use super::*;
 965    use crate::{App, TestDispatcher, TestPlatform};
 966    use rand::SeedableRng;
 967    use std::cell::RefCell;
 968
 969    #[test]
 970    fn sanity_test_tasks_run() {
 971        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
 972        let arc_dispatcher = Arc::new(dispatcher.clone());
 973        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
 974        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
 975
 976        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
 977        let asset_source = Arc::new(());
 978        let http_client = http_client::FakeHttpClient::with_404_response();
 979
 980        let app = App::new_app(platform, asset_source, http_client);
 981        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
 982
 983        let task_ran = Rc::new(RefCell::new(false));
 984
 985        foreground_executor
 986            .spawn_context(liveness_token, {
 987                let task_ran = Rc::clone(&task_ran);
 988                async move {
 989                    *task_ran.borrow_mut() = true;
 990                }
 991            })
 992            .detach();
 993
 994        // Run dispatcher while app is still alive
 995        dispatcher.run_until_parked();
 996
 997        // Task should have run
 998        assert!(
 999            *task_ran.borrow(),
1000            "Task should run normally when app is alive"
1001        );
1002    }
1003
1004    #[test]
1005    fn test_task_cancelled_when_app_dropped() {
1006        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1007        let arc_dispatcher = Arc::new(dispatcher.clone());
1008        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1009        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1010
1011        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1012        let asset_source = Arc::new(());
1013        let http_client = http_client::FakeHttpClient::with_404_response();
1014
1015        let app = App::new_app(platform, asset_source, http_client);
1016        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1017        let app_weak = Rc::downgrade(&app);
1018
1019        let task_ran = Rc::new(RefCell::new(false));
1020        let task_ran_clone = Rc::clone(&task_ran);
1021
1022        foreground_executor
1023            .spawn_context(liveness_token, async move {
1024                *task_ran_clone.borrow_mut() = true;
1025            })
1026            .detach();
1027
1028        drop(app);
1029
1030        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1031
1032        dispatcher.run_until_parked();
1033
1034        // The task should have been cancelled, not run
1035        assert!(
1036            !*task_ran.borrow(),
1037            "Task should have been cancelled when app was dropped, but it ran!"
1038        );
1039    }
1040
1041    #[test]
1042    fn test_nested_tasks_both_cancel() {
1043        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1044        let arc_dispatcher = Arc::new(dispatcher.clone());
1045        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1046        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1047
1048        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1049        let asset_source = Arc::new(());
1050        let http_client = http_client::FakeHttpClient::with_404_response();
1051
1052        let app = App::new_app(platform, asset_source, http_client);
1053        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1054        let app_weak = Rc::downgrade(&app);
1055
1056        let outer_completed = Rc::new(RefCell::new(false));
1057        let inner_completed = Rc::new(RefCell::new(false));
1058        let reached_await = Rc::new(RefCell::new(false));
1059
1060        let outer_flag = Rc::clone(&outer_completed);
1061        let inner_flag = Rc::clone(&inner_completed);
1062        let await_flag = Rc::clone(&reached_await);
1063
1064        // Channel to block the inner task until we're ready
1065        let (tx, rx) = futures::channel::oneshot::channel::<()>();
1066
1067        // We need clones of executor and liveness_token for the inner spawn
1068        let inner_executor = foreground_executor.clone();
1069        let inner_liveness_token = liveness_token.clone();
1070
1071        foreground_executor
1072            .spawn_context(liveness_token, async move {
1073                let inner_task = inner_executor.spawn_context(inner_liveness_token, {
1074                    let inner_flag = Rc::clone(&inner_flag);
1075                    async move {
1076                        rx.await.ok();
1077                        *inner_flag.borrow_mut() = true;
1078                    }
1079                });
1080
1081                *await_flag.borrow_mut() = true;
1082
1083                inner_task.await;
1084
1085                *outer_flag.borrow_mut() = true;
1086            })
1087            .detach();
1088
1089        // Run dispatcher until outer task reaches the await point
1090        // The inner task will be blocked on the channel
1091        dispatcher.run_until_parked();
1092
1093        // Verify we actually reached the await point before dropping the app
1094        assert!(
1095            *reached_await.borrow(),
1096            "Outer task should have reached the await point"
1097        );
1098
1099        // Neither task should have completed yet
1100        assert!(
1101            !*outer_completed.borrow(),
1102            "Outer task should not have completed yet"
1103        );
1104        assert!(
1105            !*inner_completed.borrow(),
1106            "Inner task should not have completed yet"
1107        );
1108
1109        // Drop the channel sender and app while outer is awaiting inner
1110        drop(tx);
1111        drop(app);
1112        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1113
1114        // Run dispatcher - both tasks should be cancelled
1115        dispatcher.run_until_parked();
1116
1117        // Neither task should have completed (both were cancelled)
1118        assert!(
1119            !*outer_completed.borrow(),
1120            "Outer task should have been cancelled, not completed"
1121        );
1122        assert!(
1123            !*inner_completed.borrow(),
1124            "Inner task should have been cancelled, not completed"
1125        );
1126    }
1127
1128    #[test]
1129    fn test_task_without_app_tracking_still_runs() {
1130        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1131        let arc_dispatcher = Arc::new(dispatcher.clone());
1132        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1133        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1134
1135        let platform = TestPlatform::new(background_executor, foreground_executor.clone());
1136        let asset_source = Arc::new(());
1137        let http_client = http_client::FakeHttpClient::with_404_response();
1138
1139        let app = App::new_app(platform, asset_source, http_client);
1140        let app_weak = Rc::downgrade(&app);
1141
1142        let task_ran = Rc::new(RefCell::new(false));
1143        let task_ran_clone = Rc::clone(&task_ran);
1144
1145        let _task = foreground_executor.spawn(async move {
1146            *task_ran_clone.borrow_mut() = true;
1147        });
1148
1149        drop(app);
1150
1151        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1152
1153        dispatcher.run_until_parked();
1154
1155        assert!(
1156            *task_ran.borrow(),
1157            "Task without app tracking should still run after app is dropped"
1158        );
1159    }
1160
1161    #[test]
1162    #[should_panic]
1163    fn test_polling_cancelled_task_panics() {
1164        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1165        let arc_dispatcher = Arc::new(dispatcher.clone());
1166        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1167        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1168
1169        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1170        let asset_source = Arc::new(());
1171        let http_client = http_client::FakeHttpClient::with_404_response();
1172
1173        let app = App::new_app(platform, asset_source, http_client);
1174        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1175        let app_weak = Rc::downgrade(&app);
1176
1177        let task = foreground_executor.spawn_context(liveness_token, async move { 42 });
1178
1179        drop(app);
1180
1181        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1182
1183        dispatcher.run_until_parked();
1184
1185        background_executor.block(task);
1186    }
1187
1188    #[test]
1189    fn test_polling_cancelled_task_returns_none_with_fallible() {
1190        let dispatcher = TestDispatcher::new(StdRng::seed_from_u64(0));
1191        let arc_dispatcher = Arc::new(dispatcher.clone());
1192        let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
1193        let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
1194
1195        let platform = TestPlatform::new(background_executor.clone(), foreground_executor.clone());
1196        let asset_source = Arc::new(());
1197        let http_client = http_client::FakeHttpClient::with_404_response();
1198
1199        let app = App::new_app(platform, asset_source, http_client);
1200        let liveness_token = std::sync::Arc::downgrade(&app.borrow().liveness);
1201        let app_weak = Rc::downgrade(&app);
1202
1203        let task = foreground_executor
1204            .spawn_context(liveness_token, async move { 42 })
1205            .fallible();
1206
1207        drop(app);
1208
1209        assert!(app_weak.upgrade().is_none(), "App should have been dropped");
1210
1211        dispatcher.run_until_parked();
1212
1213        let result = background_executor.block(task);
1214        assert_eq!(result, None, "Cancelled task should return None");
1215    }
1216}