executor.rs

   1use crate::util;
   2use crate::PlatformDispatcher;
   3use anyhow::{anyhow, Result};
   4use async_task::Runnable;
   5use futures::channel::{mpsc, oneshot};
   6use smol::{channel, prelude::*, Executor};
   7use std::{
   8    any::Any,
   9    fmt::{self},
  10    marker::PhantomData,
  11    mem,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::Arc,
  15    task::{Context, Poll},
  16    thread,
  17    time::Duration,
  18};
  19
  20/// Enqueues the given closure to run on the application's event loop.
  21/// Returns the result asynchronously.
  22pub(crate) fn run_on_main<F, R>(
  23    dispatcher: Arc<dyn PlatformDispatcher>,
  24    func: F,
  25) -> impl Future<Output = R>
  26where
  27    F: FnOnce() -> R + Send + 'static,
  28    R: Send + 'static,
  29{
  30    spawn_on_main(dispatcher, move || async move { func() })
  31}
  32
  33/// Enqueues the given closure to be run on the application's event loop. The
  34/// closure returns a future which will be run to completion on the main thread.
  35pub(crate) fn spawn_on_main<F, R>(
  36    dispatcher: Arc<dyn PlatformDispatcher>,
  37    func: impl FnOnce() -> F + Send + 'static,
  38) -> impl Future<Output = R>
  39where
  40    F: Future<Output = R> + 'static,
  41    R: Send + 'static,
  42{
  43    let (tx, rx) = oneshot::channel();
  44    let (runnable, task) = async_task::spawn(
  45        {
  46            let dispatcher = dispatcher.clone();
  47            async move {
  48                let future = func();
  49                let _ = spawn_on_main_local(dispatcher, async move {
  50                    let result = future.await;
  51                    let _ = tx.send(result);
  52                });
  53            }
  54        },
  55        move |runnable| dispatcher.run_on_main_thread(runnable),
  56    );
  57    runnable.schedule();
  58    task.detach();
  59    async move { rx.await.unwrap() }
  60}
  61
  62/// Enqueues the given closure to be run on the application's event loop. Must
  63/// be called on the main thread.
  64pub(crate) fn spawn_on_main_local<R>(
  65    dispatcher: Arc<dyn PlatformDispatcher>,
  66    future: impl Future<Output = R> + 'static,
  67) -> impl Future<Output = R>
  68where
  69    R: 'static,
  70{
  71    assert!(dispatcher.is_main_thread(), "must be called on main thread");
  72
  73    let (tx, rx) = oneshot::channel();
  74    let (runnable, task) = async_task::spawn_local(
  75        async move {
  76            let result = future.await;
  77            let _ = tx.send(result);
  78        },
  79        move |runnable| dispatcher.run_on_main_thread(runnable),
  80    );
  81    runnable.schedule();
  82    task.detach();
  83    async move { rx.await.unwrap() }
  84}
  85
  86pub enum ForegroundExecutor {
  87    Platform {
  88        dispatcher: Arc<dyn PlatformDispatcher>,
  89        _not_send_or_sync: PhantomData<Rc<()>>,
  90    },
  91    #[cfg(any(test, feature = "test"))]
  92    Deterministic {
  93        cx_id: usize,
  94        executor: Arc<Deterministic>,
  95    },
  96}
  97
  98pub enum BackgroundExecutor {
  99    #[cfg(any(test, feature = "test"))]
 100    Deterministic { executor: Arc<Deterministic> },
 101    Production {
 102        executor: Arc<smol::Executor<'static>>,
 103        _stop: channel::Sender<()>,
 104    },
 105}
 106
 107type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 108type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 109type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 110type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 111
 112#[must_use]
 113pub enum Task<T> {
 114    Ready(Option<T>),
 115    Local {
 116        any_task: AnyLocalTask,
 117        result_type: PhantomData<T>,
 118    },
 119    Send {
 120        any_task: AnyTask,
 121        result_type: PhantomData<T>,
 122    },
 123}
 124
 125unsafe impl<T: Send> Send for Task<T> {}
 126
 127#[cfg(any(test, feature = "test"))]
 128struct DeterministicState {
 129    rng: rand::prelude::StdRng,
 130    seed: u64,
 131    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 132    scheduled_from_background: Vec<BackgroundRunnable>,
 133    forbid_parking: bool,
 134    block_on_ticks: std::ops::RangeInclusive<usize>,
 135    now: std::time::Instant,
 136    next_timer_id: usize,
 137    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 138    waiting_backtrace: Option<backtrace::Backtrace>,
 139    next_runnable_id: usize,
 140    poll_history: Vec<ExecutorEvent>,
 141    previous_poll_history: Option<Vec<ExecutorEvent>>,
 142    enable_runnable_backtraces: bool,
 143    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 144}
 145
 146#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 147pub enum ExecutorEvent {
 148    PollRunnable { id: usize },
 149    EnqueuRunnable { id: usize },
 150}
 151
 152#[cfg(any(test, feature = "test"))]
 153struct ForegroundRunnable {
 154    id: usize,
 155    runnable: Runnable,
 156    main: bool,
 157}
 158
 159#[cfg(any(test, feature = "test"))]
 160struct BackgroundRunnable {
 161    id: usize,
 162    runnable: Runnable,
 163}
 164
 165#[cfg(any(test, feature = "test"))]
 166pub struct Deterministic {
 167    state: Arc<parking_lot::Mutex<DeterministicState>>,
 168    parker: parking_lot::Mutex<parking::Parker>,
 169}
 170
 171#[must_use]
 172pub enum Timer {
 173    Production(smol::Timer),
 174    #[cfg(any(test, feature = "test"))]
 175    Deterministic(DeterministicTimer),
 176}
 177
 178#[cfg(any(test, feature = "test"))]
 179pub struct DeterministicTimer {
 180    rx: postage::barrier::Receiver,
 181    id: usize,
 182    state: Arc<parking_lot::Mutex<DeterministicState>>,
 183}
 184
 185#[cfg(any(test, feature = "test"))]
 186impl Deterministic {
 187    pub fn new(seed: u64) -> Arc<Self> {
 188        use rand::prelude::*;
 189
 190        Arc::new(Self {
 191            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 192                rng: StdRng::seed_from_u64(seed),
 193                seed,
 194                scheduled_from_foreground: Default::default(),
 195                scheduled_from_background: Default::default(),
 196                forbid_parking: false,
 197                block_on_ticks: 0..=1000,
 198                now: std::time::Instant::now(),
 199                next_timer_id: Default::default(),
 200                pending_timers: Default::default(),
 201                waiting_backtrace: None,
 202                next_runnable_id: 0,
 203                poll_history: Default::default(),
 204                previous_poll_history: Default::default(),
 205                enable_runnable_backtraces: false,
 206                runnable_backtraces: Default::default(),
 207            })),
 208            parker: Default::default(),
 209        })
 210    }
 211
 212    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 213        self.state.lock().poll_history.clone()
 214    }
 215
 216    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 217        self.state.lock().previous_poll_history = history;
 218    }
 219
 220    pub fn enable_runnable_backtrace(&self) {
 221        self.state.lock().enable_runnable_backtraces = true;
 222    }
 223
 224    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 225        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 226        backtrace.resolve();
 227        backtrace
 228    }
 229
 230    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
 231        Arc::new(BackgroundExecutor::Deterministic {
 232            executor: self.clone(),
 233        })
 234    }
 235
 236    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
 237        Rc::new(ForegroundExecutor::Deterministic {
 238            cx_id: id,
 239            executor: self.clone(),
 240        })
 241    }
 242
 243    fn spawn_from_foreground(
 244        &self,
 245        cx_id: usize,
 246        future: AnyLocalFuture,
 247        main: bool,
 248    ) -> AnyLocalTask {
 249        let state = self.state.clone();
 250        let id;
 251        {
 252            let mut state = state.lock();
 253            id = util::post_inc(&mut state.next_runnable_id);
 254            if state.enable_runnable_backtraces {
 255                state
 256                    .runnable_backtraces
 257                    .insert(id, backtrace::Backtrace::new_unresolved());
 258            }
 259        }
 260
 261        let unparker = self.parker.lock().unparker();
 262        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 263            let mut state = state.lock();
 264            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 265            state
 266                .scheduled_from_foreground
 267                .entry(cx_id)
 268                .or_default()
 269                .push(ForegroundRunnable { id, runnable, main });
 270            unparker.unpark();
 271        });
 272        runnable.schedule();
 273        task
 274    }
 275
 276    fn spawn(&self, future: AnyFuture) -> AnyTask {
 277        let state = self.state.clone();
 278        let id;
 279        {
 280            let mut state = state.lock();
 281            id = util::post_inc(&mut state.next_runnable_id);
 282            if state.enable_runnable_backtraces {
 283                state
 284                    .runnable_backtraces
 285                    .insert(id, backtrace::Backtrace::new_unresolved());
 286            }
 287        }
 288
 289        let unparker = self.parker.lock().unparker();
 290        let (runnable, task) = async_task::spawn(future, move |runnable| {
 291            let mut state = state.lock();
 292            state
 293                .poll_history
 294                .push(ExecutorEvent::EnqueuRunnable { id });
 295            state
 296                .scheduled_from_background
 297                .push(BackgroundRunnable { id, runnable });
 298            unparker.unpark();
 299        });
 300        runnable.schedule();
 301        task
 302    }
 303
 304    fn run<'a>(
 305        &self,
 306        cx_id: usize,
 307        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 308    ) -> Box<dyn Any> {
 309        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 310
 311        let woken = Arc::new(AtomicBool::new(false));
 312
 313        let state = self.state.clone();
 314        let id;
 315        {
 316            let mut state = state.lock();
 317            id = util::post_inc(&mut state.next_runnable_id);
 318            if state.enable_runnable_backtraces {
 319                state
 320                    .runnable_backtraces
 321                    .insert(id, backtrace::Backtrace::new_unresolved());
 322            }
 323        }
 324
 325        let unparker = self.parker.lock().unparker();
 326        let (runnable, mut main_task) = unsafe {
 327            async_task::spawn_unchecked(main_future, move |runnable| {
 328                let state = &mut *state.lock();
 329                state
 330                    .scheduled_from_foreground
 331                    .entry(cx_id)
 332                    .or_default()
 333                    .push(ForegroundRunnable {
 334                        id: util::post_inc(&mut state.next_runnable_id),
 335                        runnable,
 336                        main: true,
 337                    });
 338                unparker.unpark();
 339            })
 340        };
 341        runnable.schedule();
 342
 343        loop {
 344            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 345                return result;
 346            }
 347
 348            if !woken.load(SeqCst) {
 349                self.state.lock().will_park();
 350            }
 351
 352            woken.store(false, SeqCst);
 353            self.parker.lock().park();
 354        }
 355    }
 356
 357    pub fn run_until_parked(&self) {
 358        use std::sync::atomic::AtomicBool;
 359        let woken = Arc::new(AtomicBool::new(false));
 360        self.run_internal(woken, None);
 361    }
 362
 363    fn run_internal(
 364        &self,
 365        woken: Arc<std::sync::atomic::AtomicBool>,
 366        mut main_task: Option<&mut AnyLocalTask>,
 367    ) -> Option<Box<dyn Any>> {
 368        use rand::prelude::*;
 369        use std::sync::atomic::Ordering::SeqCst;
 370
 371        let unparker = self.parker.lock().unparker();
 372        let waker = waker_fn::waker_fn(move || {
 373            woken.store(true, SeqCst);
 374            unparker.unpark();
 375        });
 376
 377        let mut cx = Context::from_waker(&waker);
 378        loop {
 379            let mut state = self.state.lock();
 380
 381            if state.scheduled_from_foreground.is_empty()
 382                && state.scheduled_from_background.is_empty()
 383            {
 384                if let Some(main_task) = main_task {
 385                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 386                        return Some(result);
 387                    }
 388                }
 389
 390                return None;
 391            }
 392
 393            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 394                let background_len = state.scheduled_from_background.len();
 395                let ix = state.rng.gen_range(0..background_len);
 396                let background_runnable = state.scheduled_from_background.remove(ix);
 397                state.push_to_history(ExecutorEvent::PollRunnable {
 398                    id: background_runnable.id,
 399                });
 400                drop(state);
 401                background_runnable.runnable.run();
 402            } else if !state.scheduled_from_foreground.is_empty() {
 403                let available_cx_ids = state
 404                    .scheduled_from_foreground
 405                    .keys()
 406                    .copied()
 407                    .collect::<Vec<_>>();
 408                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 409                let scheduled_from_cx = state
 410                    .scheduled_from_foreground
 411                    .get_mut(&cx_id_to_run)
 412                    .unwrap();
 413                let foreground_runnable = scheduled_from_cx.remove(0);
 414                if scheduled_from_cx.is_empty() {
 415                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 416                }
 417                state.push_to_history(ExecutorEvent::PollRunnable {
 418                    id: foreground_runnable.id,
 419                });
 420
 421                drop(state);
 422
 423                foreground_runnable.runnable.run();
 424                if let Some(main_task) = main_task.as_mut() {
 425                    if foreground_runnable.main {
 426                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 427                            return Some(result);
 428                        }
 429                    }
 430                }
 431            }
 432        }
 433    }
 434
 435    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 436    where
 437        F: Unpin + Future<Output = T>,
 438    {
 439        use rand::prelude::*;
 440
 441        let unparker = self.parker.lock().unparker();
 442        let waker = waker_fn::waker_fn(move || {
 443            unparker.unpark();
 444        });
 445
 446        let mut cx = Context::from_waker(&waker);
 447        for _ in 0..max_ticks {
 448            let mut state = self.state.lock();
 449            let runnable_count = state.scheduled_from_background.len();
 450            let ix = state.rng.gen_range(0..=runnable_count);
 451            if ix < state.scheduled_from_background.len() {
 452                let background_runnable = state.scheduled_from_background.remove(ix);
 453                state.push_to_history(ExecutorEvent::PollRunnable {
 454                    id: background_runnable.id,
 455                });
 456                drop(state);
 457                background_runnable.runnable.run();
 458            } else {
 459                drop(state);
 460                if let Poll::Ready(result) = future.poll(&mut cx) {
 461                    return Some(result);
 462                }
 463                let mut state = self.state.lock();
 464                if state.scheduled_from_background.is_empty() {
 465                    state.will_park();
 466                    drop(state);
 467                    self.parker.lock().park();
 468                }
 469
 470                continue;
 471            }
 472        }
 473
 474        None
 475    }
 476
 477    pub fn timer(&self, duration: Duration) -> Timer {
 478        let (tx, rx) = postage::barrier::channel();
 479        let mut state = self.state.lock();
 480        let wakeup_at = state.now + duration;
 481        let id = util::post_inc(&mut state.next_timer_id);
 482        match state
 483            .pending_timers
 484            .binary_search_by_key(&wakeup_at, |e| e.1)
 485        {
 486            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 487        }
 488        let state = self.state.clone();
 489        Timer::Deterministic(DeterministicTimer { rx, id, state })
 490    }
 491
 492    pub fn now(&self) -> std::time::Instant {
 493        let state = self.state.lock();
 494        state.now
 495    }
 496
 497    pub fn advance_clock(&self, duration: Duration) {
 498        let new_now = self.state.lock().now + duration;
 499        loop {
 500            self.run_until_parked();
 501            let mut state = self.state.lock();
 502
 503            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 504                let wakeup_time = *wakeup_time;
 505                if wakeup_time <= new_now {
 506                    let timer_count = state
 507                        .pending_timers
 508                        .iter()
 509                        .take_while(|(_, t, _)| *t == wakeup_time)
 510                        .count();
 511                    state.now = wakeup_time;
 512                    let timers_to_wake = state
 513                        .pending_timers
 514                        .drain(0..timer_count)
 515                        .collect::<Vec<_>>();
 516                    drop(state);
 517                    drop(timers_to_wake);
 518                    continue;
 519                }
 520            }
 521
 522            break;
 523        }
 524
 525        self.state.lock().now = new_now;
 526    }
 527
 528    pub fn start_waiting(&self) {
 529        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 530    }
 531
 532    pub fn finish_waiting(&self) {
 533        self.state.lock().waiting_backtrace.take();
 534    }
 535
 536    pub fn forbid_parking(&self) {
 537        use rand::prelude::*;
 538
 539        let mut state = self.state.lock();
 540        state.forbid_parking = true;
 541        state.rng = StdRng::seed_from_u64(state.seed);
 542    }
 543
 544    pub fn allow_parking(&self) {
 545        use rand::prelude::*;
 546
 547        let mut state = self.state.lock();
 548        state.forbid_parking = false;
 549        state.rng = StdRng::seed_from_u64(state.seed);
 550    }
 551
 552    pub async fn simulate_random_delay(&self) {
 553        use rand::prelude::*;
 554        use smol::future::yield_now;
 555        if self.state.lock().rng.gen_bool(0.2) {
 556            let yields = self.state.lock().rng.gen_range(1..=10);
 557            for _ in 0..yields {
 558                yield_now().await;
 559            }
 560        }
 561    }
 562
 563    pub fn record_backtrace(&self) {
 564        let mut state = self.state.lock();
 565        if state.enable_runnable_backtraces {
 566            let current_id = state
 567                .poll_history
 568                .iter()
 569                .rev()
 570                .find_map(|event| match event {
 571                    ExecutorEvent::PollRunnable { id } => Some(*id),
 572                    _ => None,
 573                });
 574            if let Some(id) = current_id {
 575                state
 576                    .runnable_backtraces
 577                    .insert(id, backtrace::Backtrace::new_unresolved());
 578            }
 579        }
 580    }
 581}
 582
 583impl Drop for Timer {
 584    fn drop(&mut self) {
 585        #[cfg(any(test, feature = "test"))]
 586        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 587            state
 588                .lock()
 589                .pending_timers
 590                .retain(|(timer_id, _, _)| timer_id != id)
 591        }
 592    }
 593}
 594
 595impl Future for Timer {
 596    type Output = ();
 597
 598    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 599        match &mut *self {
 600            #[cfg(any(test, feature = "test"))]
 601            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 602                use postage::stream::{PollRecv, Stream as _};
 603                smol::pin!(rx);
 604                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 605                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 606                    PollRecv::Pending => Poll::Pending,
 607                }
 608            }
 609            Self::Production(timer) => {
 610                smol::pin!(timer);
 611                match timer.poll(cx) {
 612                    Poll::Ready(_) => Poll::Ready(()),
 613                    Poll::Pending => Poll::Pending,
 614                }
 615            }
 616        }
 617    }
 618}
 619
 620#[cfg(any(test, feature = "test"))]
 621impl DeterministicState {
 622    fn push_to_history(&mut self, event: ExecutorEvent) {
 623        use std::fmt::Write as _;
 624
 625        self.poll_history.push(event);
 626        if let Some(prev_history) = &self.previous_poll_history {
 627            let ix = self.poll_history.len() - 1;
 628            let prev_event = prev_history[ix];
 629            if event != prev_event {
 630                let mut message = String::new();
 631                writeln!(
 632                    &mut message,
 633                    "current runnable backtrace:\n{:?}",
 634                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 635                        trace.resolve();
 636                        crate::util::CwdBacktrace(trace)
 637                    })
 638                )
 639                .unwrap();
 640                writeln!(
 641                    &mut message,
 642                    "previous runnable backtrace:\n{:?}",
 643                    self.runnable_backtraces
 644                        .get_mut(&prev_event.id())
 645                        .map(|trace| {
 646                            trace.resolve();
 647                            util::CwdBacktrace(trace)
 648                        })
 649                )
 650                .unwrap();
 651                panic!("detected non-determinism after {ix}. {message}");
 652            }
 653        }
 654    }
 655
 656    fn will_park(&mut self) {
 657        if self.forbid_parking {
 658            let mut backtrace_message = String::new();
 659            #[cfg(any(test, feature = "test"))]
 660            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 661                backtrace.resolve();
 662                backtrace_message = format!(
 663                    "\nbacktrace of waiting future:\n{:?}",
 664                    util::CwdBacktrace(backtrace)
 665                );
 666            }
 667
 668            panic!(
 669                "deterministic executor parked after a call to forbid_parking{}",
 670                backtrace_message
 671            );
 672        }
 673    }
 674}
 675
 676#[cfg(any(test, feature = "test"))]
 677impl ExecutorEvent {
 678    pub fn id(&self) -> usize {
 679        match self {
 680            ExecutorEvent::PollRunnable { id } => *id,
 681            ExecutorEvent::EnqueuRunnable { id } => *id,
 682        }
 683    }
 684}
 685
 686impl ForegroundExecutor {
 687    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
 688        if dispatcher.is_main_thread() {
 689            Ok(Self::Platform {
 690                dispatcher,
 691                _not_send_or_sync: PhantomData,
 692            })
 693        } else {
 694            Err(anyhow!("must be constructed on main thread"))
 695        }
 696    }
 697
 698    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 699        let future = any_local_future(future);
 700        let any_task = match self {
 701            #[cfg(any(test, feature = "test"))]
 702            Self::Deterministic { cx_id, executor } => {
 703                executor.spawn_from_foreground(*cx_id, future, false)
 704            }
 705            Self::Platform { dispatcher, .. } => {
 706                fn spawn_inner(
 707                    future: AnyLocalFuture,
 708                    dispatcher: &Arc<dyn PlatformDispatcher>,
 709                ) -> AnyLocalTask {
 710                    let dispatcher = dispatcher.clone();
 711                    let schedule =
 712                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 713                    let (runnable, task) = async_task::spawn_local(future, schedule);
 714                    runnable.schedule();
 715                    task
 716                }
 717                spawn_inner(future, dispatcher)
 718            }
 719        };
 720        Task::local(any_task)
 721    }
 722
 723    #[cfg(any(test, feature = "test"))]
 724    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 725        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 726        let result = match self {
 727            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 728            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 729        };
 730        *result.downcast().unwrap()
 731    }
 732
 733    #[cfg(any(test, feature = "test"))]
 734    pub fn run_until_parked(&self) {
 735        match self {
 736            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 737            _ => panic!("this method can only be called on a deterministic executor"),
 738        }
 739    }
 740
 741    #[cfg(any(test, feature = "test"))]
 742    pub fn parking_forbidden(&self) -> bool {
 743        match self {
 744            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 745            _ => panic!("this method can only be called on a deterministic executor"),
 746        }
 747    }
 748
 749    #[cfg(any(test, feature = "test"))]
 750    pub fn start_waiting(&self) {
 751        match self {
 752            Self::Deterministic { executor, .. } => executor.start_waiting(),
 753            _ => panic!("this method can only be called on a deterministic executor"),
 754        }
 755    }
 756
 757    #[cfg(any(test, feature = "test"))]
 758    pub fn finish_waiting(&self) {
 759        match self {
 760            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 761            _ => panic!("this method can only be called on a deterministic executor"),
 762        }
 763    }
 764
 765    #[cfg(any(test, feature = "test"))]
 766    pub fn forbid_parking(&self) {
 767        match self {
 768            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 769            _ => panic!("this method can only be called on a deterministic executor"),
 770        }
 771    }
 772
 773    #[cfg(any(test, feature = "test"))]
 774    pub fn allow_parking(&self) {
 775        match self {
 776            Self::Deterministic { executor, .. } => executor.allow_parking(),
 777            _ => panic!("this method can only be called on a deterministic executor"),
 778        }
 779    }
 780
 781    #[cfg(any(test, feature = "test"))]
 782    pub fn advance_clock(&self, duration: Duration) {
 783        match self {
 784            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 785            _ => panic!("this method can only be called on a deterministic executor"),
 786        }
 787    }
 788
 789    #[cfg(any(test, feature = "test"))]
 790    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 791        match self {
 792            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 793            _ => panic!("this method can only be called on a deterministic executor"),
 794        }
 795    }
 796}
 797
 798impl BackgroundExecutor {
 799    pub fn new() -> Self {
 800        let executor = Arc::new(Executor::new());
 801        let stop = channel::unbounded::<()>();
 802
 803        for i in 0..2 * num_cpus::get() {
 804            let executor = executor.clone();
 805            let stop = stop.1.clone();
 806            thread::Builder::new()
 807                .name(format!("background-executor-{}", i))
 808                .spawn(move || smol::block_on(executor.run(stop.recv())))
 809                .unwrap();
 810        }
 811
 812        Self::Production {
 813            executor,
 814            _stop: stop.0,
 815        }
 816    }
 817
 818    pub fn num_cpus(&self) -> usize {
 819        num_cpus::get()
 820    }
 821
 822    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 823    where
 824        T: 'static + Send,
 825        F: Send + Future<Output = T> + 'static,
 826    {
 827        let future = any_future(future);
 828        let any_task = match self {
 829            Self::Production { executor, .. } => executor.spawn(future),
 830            #[cfg(any(test, feature = "test"))]
 831            Self::Deterministic { executor } => executor.spawn(future),
 832        };
 833        Task::send(any_task)
 834    }
 835
 836    pub fn block<F, T>(&self, future: F) -> T
 837    where
 838        F: Future<Output = T>,
 839    {
 840        smol::pin!(future);
 841        match self {
 842            Self::Production { .. } => smol::block_on(&mut future),
 843            #[cfg(any(test, feature = "test"))]
 844            Self::Deterministic { executor, .. } => {
 845                executor.block(&mut future, usize::MAX).unwrap()
 846            }
 847        }
 848    }
 849
 850    pub fn block_with_timeout<F, T>(
 851        &self,
 852        timeout: Duration,
 853        future: F,
 854    ) -> Result<T, impl Future<Output = T>>
 855    where
 856        T: 'static,
 857        F: 'static + Unpin + Future<Output = T>,
 858    {
 859        let mut future = any_local_future(future);
 860        if !timeout.is_zero() {
 861            let output = match self {
 862                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 863                #[cfg(any(test, feature = "test"))]
 864                Self::Deterministic { executor, .. } => {
 865                    use rand::prelude::*;
 866                    let max_ticks = {
 867                        let mut state = executor.state.lock();
 868                        let range = state.block_on_ticks.clone();
 869                        state.rng.gen_range(range)
 870                    };
 871                    executor.block(&mut future, max_ticks)
 872                }
 873            };
 874            if let Some(output) = output {
 875                return Ok(*output.downcast().unwrap());
 876            }
 877        }
 878        Err(async { *future.await.downcast().unwrap() })
 879    }
 880
 881    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 882    where
 883        F: FnOnce(&mut Scope<'scope>),
 884    {
 885        let mut scope = Scope::new(self.clone());
 886        (scheduler)(&mut scope);
 887        let spawned = mem::take(&mut scope.futures)
 888            .into_iter()
 889            .map(|f| self.spawn(f))
 890            .collect::<Vec<_>>();
 891        for task in spawned {
 892            task.await;
 893        }
 894    }
 895
 896    pub fn timer(&self, duration: Duration) -> Timer {
 897        match self {
 898            BackgroundExecutor::Production { .. } => {
 899                Timer::Production(smol::Timer::after(duration))
 900            }
 901            #[cfg(any(test, feature = "test"))]
 902            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
 903        }
 904    }
 905
 906    pub fn now(&self) -> std::time::Instant {
 907        match self {
 908            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
 909            #[cfg(any(test, feature = "test"))]
 910            BackgroundExecutor::Deterministic { executor } => executor.now(),
 911        }
 912    }
 913
 914    #[cfg(any(test, feature = "test"))]
 915    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 916        match self {
 917            Self::Deterministic { executor, .. } => {
 918                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 919            }
 920            _ => panic!("this method can only be called on a deterministic executor"),
 921        }
 922    }
 923
 924    #[cfg(any(test, feature = "test"))]
 925    pub async fn simulate_random_delay(&self) {
 926        match self {
 927            Self::Deterministic { executor, .. } => {
 928                executor.simulate_random_delay().await;
 929            }
 930            _ => {
 931                panic!("this method can only be called on a deterministic executor")
 932            }
 933        }
 934    }
 935
 936    #[cfg(any(test, feature = "test"))]
 937    pub fn record_backtrace(&self) {
 938        match self {
 939            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 940            _ => {
 941                panic!("this method can only be called on a deterministic executor")
 942            }
 943        }
 944    }
 945
 946    #[cfg(any(test, feature = "test"))]
 947    pub fn start_waiting(&self) {
 948        match self {
 949            Self::Deterministic { executor, .. } => executor.start_waiting(),
 950            _ => panic!("this method can only be called on a deterministic executor"),
 951        }
 952    }
 953}
 954
 955impl Default for BackgroundExecutor {
 956    fn default() -> Self {
 957        Self::new()
 958    }
 959}
 960
 961pub struct Scope<'a> {
 962    executor: Arc<BackgroundExecutor>,
 963    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 964    tx: Option<mpsc::Sender<()>>,
 965    rx: mpsc::Receiver<()>,
 966    _phantom: PhantomData<&'a ()>,
 967}
 968
 969impl<'a> Scope<'a> {
 970    fn new(executor: Arc<BackgroundExecutor>) -> Self {
 971        let (tx, rx) = mpsc::channel(1);
 972        Self {
 973            executor,
 974            tx: Some(tx),
 975            rx,
 976            futures: Default::default(),
 977            _phantom: PhantomData,
 978        }
 979    }
 980
 981    pub fn spawn<F>(&mut self, f: F)
 982    where
 983        F: Future<Output = ()> + Send + 'a,
 984    {
 985        let tx = self.tx.clone().unwrap();
 986
 987        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 988        // dropping this `Scope` blocks until all of the futures have resolved.
 989        let f = unsafe {
 990            mem::transmute::<
 991                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 992                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 993            >(Box::pin(async move {
 994                f.await;
 995                drop(tx);
 996            }))
 997        };
 998        self.futures.push(f);
 999    }
1000}
1001
1002impl<'a> Drop for Scope<'a> {
1003    fn drop(&mut self) {
1004        self.tx.take().unwrap();
1005
1006        // Wait until the channel is closed, which means that all of the spawned
1007        // futures have resolved.
1008        self.executor.block(self.rx.next());
1009    }
1010}
1011
1012impl<T> Task<T> {
1013    pub fn ready(value: T) -> Self {
1014        Self::Ready(Some(value))
1015    }
1016
1017    fn local(any_task: AnyLocalTask) -> Self {
1018        Self::Local {
1019            any_task,
1020            result_type: PhantomData,
1021        }
1022    }
1023
1024    pub fn detach(self) {
1025        match self {
1026            Task::Ready(_) => {}
1027            Task::Local { any_task, .. } => any_task.detach(),
1028            Task::Send { any_task, .. } => any_task.detach(),
1029        }
1030    }
1031}
1032
1033// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1034//     #[track_caller]
1035//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
1036//         let caller = Location::caller();
1037//         cx.spawn(|_| async move {
1038//             if let Err(err) = self.await {
1039//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1040//             }
1041//         })
1042//         .detach();
1043//     }
1044// }
1045
1046impl<T: Send> Task<T> {
1047    fn send(any_task: AnyTask) -> Self {
1048        Self::Send {
1049            any_task,
1050            result_type: PhantomData,
1051        }
1052    }
1053}
1054
1055impl<T: fmt::Debug> fmt::Debug for Task<T> {
1056    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057        match self {
1058            Task::Ready(value) => value.fmt(f),
1059            Task::Local { any_task, .. } => any_task.fmt(f),
1060            Task::Send { any_task, .. } => any_task.fmt(f),
1061        }
1062    }
1063}
1064
1065impl<T: 'static> Future for Task<T> {
1066    type Output = T;
1067
1068    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1069        match unsafe { self.get_unchecked_mut() } {
1070            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1071            Task::Local { any_task, .. } => {
1072                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1073            }
1074            Task::Send { any_task, .. } => {
1075                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1076            }
1077        }
1078    }
1079}
1080
1081fn any_future<T, F>(future: F) -> AnyFuture
1082where
1083    T: 'static + Send,
1084    F: Future<Output = T> + Send + 'static,
1085{
1086    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1087}
1088
1089fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1090where
1091    T: 'static,
1092    F: Future<Output = T> + 'static,
1093{
1094    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1095}