executor.rs

   1use crate::util;
   2use crate::PlatformDispatcher;
   3use anyhow::{anyhow, Result};
   4use async_task::Runnable;
   5use futures::channel::{mpsc, oneshot};
   6use smol::{channel, prelude::*, Executor};
   7use std::{
   8    any::Any,
   9    fmt::{self},
  10    marker::PhantomData,
  11    mem,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::Arc,
  15    task::{Context, Poll},
  16    thread,
  17    time::Duration,
  18};
  19
  20/// Enqueues the given closure to be run on the application's event loop. Can be
  21/// called on any thread.
  22pub(crate) fn spawn_on_main<F, R>(
  23    dispatcher: Arc<dyn PlatformDispatcher>,
  24    func: impl FnOnce() -> F + Send + 'static,
  25) -> impl Future<Output = R>
  26where
  27    F: Future<Output = R> + 'static,
  28    R: Send + 'static,
  29{
  30    let (tx, rx) = oneshot::channel();
  31    let (runnable, task) = async_task::spawn(
  32        {
  33            let dispatcher = dispatcher.clone();
  34            async move {
  35                let future = func();
  36                let _ = spawn_on_main_local(dispatcher, async move {
  37                    let result = future.await;
  38                    let _ = tx.send(result);
  39                });
  40            }
  41        },
  42        move |runnable| dispatcher.run_on_main_thread(runnable),
  43    );
  44    runnable.schedule();
  45    task.detach();
  46    async move { rx.await.unwrap() }
  47}
  48
  49/// Enqueues the given closure to be run on the application's event loop. Must
  50/// be called on the main thread.
  51pub(crate) fn spawn_on_main_local<R>(
  52    dispatcher: Arc<dyn PlatformDispatcher>,
  53    future: impl Future<Output = R> + 'static,
  54) -> impl Future<Output = R>
  55where
  56    R: 'static,
  57{
  58    assert!(dispatcher.is_main_thread(), "must be called on main thread");
  59
  60    let (tx, rx) = oneshot::channel();
  61    let (runnable, task) = async_task::spawn_local(
  62        async move {
  63            let result = future.await;
  64            let _ = tx.send(result);
  65        },
  66        move |runnable| dispatcher.run_on_main_thread(runnable),
  67    );
  68    runnable.schedule();
  69    task.detach();
  70    async move { rx.await.unwrap() }
  71}
  72
/// Executor for futures that are spawned with `spawn_local` semantics, i.e.
/// futures that do not have to be `Send`.
///
/// The `PhantomData<Rc<()>>` marker in `Platform` makes this type neither
/// `Send` nor `Sync`.
  73pub enum ForegroundExecutor {
    /// Schedules runnables through the platform dispatcher's
    /// `run_on_main_thread`.
  74    Platform {
  75        dispatcher: Arc<dyn PlatformDispatcher>,
  76        _not_send_or_sync: PhantomData<Rc<()>>,
  77    },
    /// Test-only variant backed by a shared `Deterministic` executor. `cx_id`
    /// selects the foreground queue that spawned runnables are pushed onto.
  78    #[cfg(any(test, feature = "test"))]
  79    Deterministic {
  80        cx_id: usize,
  81        executor: Arc<Deterministic>,
  82    },
  83}
  84
/// Executor for `Send` futures.
  85pub enum BackgroundExecutor {
    /// Test-only variant that runs background work on the shared
    /// `Deterministic` executor.
  86    #[cfg(any(test, feature = "test"))]
  87    Deterministic { executor: Arc<Deterministic> },
    /// Runs work on a `smol::Executor`. `_stop` is the sending half of the
    /// channel whose receiving half each worker thread waits on in
    /// `BackgroundExecutor::new`; it is never sent on in the code visible here.
    // NOTE(review): presumably dropping `_stop` is what lets the workers exit — confirm.
  88    Production {
  89        executor: Arc<smol::Executor<'static>>,
  90        _stop: channel::Sender<()>,
  91    },
  92}
  93
// Type-erased futures and task handles. Outputs are boxed as `dyn Any` so that
// the executors can be non-generic; callers downcast back to the concrete type.
// The "local" flavors drop the `Send` bounds.
  94type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  95type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  96type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  97type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  98
/// Handle to a computation that produces a `T`.
///
/// The wrapped task handles are type-erased (their output is `Box<dyn Any>`),
/// so `result_type` records the concrete output type `T` at the type level.
  99#[must_use]
 100pub enum Task<T> {
    /// A value held directly rather than produced by a spawned task.
    // NOTE(review): presumably the `Option` is taken when the value is handed out — confirm.
 101    Ready(Option<T>),
    /// Wraps a task whose output is not required to be `Send`.
 102    Local {
 103        any_task: AnyLocalTask,
 104        result_type: PhantomData<T>,
 105    },
    /// Wraps a task whose output is `Send`.
 106    Send {
 107        any_task: AnyTask,
 108        result_type: PhantomData<T>,
 109    },
 110}
 111
// SAFETY: NOTE(review) — no justification is recorded in this file. The
// `Local` variant holds an `AnyLocalTask`, whose output type carries no `Send`
// bound, so this impl relies on an invariant about how local tasks are used
// across threads that is not visible here. TODO: document that invariant.
 112unsafe impl<T: Send> Send for Task<T> {}
 113
/// Mutable state of a `Deterministic` executor, guarded by a mutex.
 114#[cfg(any(test, feature = "test"))]
 115struct DeterministicState {
    /// Random number generator that drives scheduling choices.
 116    rng: rand::prelude::StdRng,
    /// Seed `rng` was created from; used to re-seed it in
    /// `forbid_parking` / `allow_parking`.
 117    seed: u64,
    /// Runnables scheduled by foreground tasks, keyed by context id.
 118    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables scheduled by background tasks.
 119    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When true, `will_park` panics instead of allowing the thread to park.
 120    forbid_parking: bool,
    /// Range from which `block_with_timeout` draws its tick budget.
 121    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated current time, moved forward by `advance_clock`.
 122    now: std::time::Instant,
    /// Id handed to the next timer created by `timer`.
 123    next_timer_id: usize,
    /// Outstanding timers as `(id, wakeup instant, barrier sender)`, kept
    /// ordered by wakeup instant (entries are inserted via binary search).
 124    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `start_waiting`, reported if `will_park` panics.
 125    waiting_backtrace: Option<backtrace::Backtrace>,
    /// Id handed to the next runnable.
 126    next_runnable_id: usize,
    /// Events recorded during this run.
 127    poll_history: Vec<ExecutorEvent>,
    /// Events of an earlier run; when set, `push_to_history` compares each new
    /// event against it.
 128    previous_poll_history: Option<Vec<ExecutorEvent>>,
    /// Whether backtraces are captured into `runnable_backtraces`.
 129    enable_runnable_backtraces: bool,
    /// Unresolved backtraces keyed by runnable id.
 130    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 131}
 132
/// An entry in the deterministic executor's history; `id` is the runnable id.
 133#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 134pub enum ExecutorEvent {
    /// The runnable was taken off a queue to be run.
 135    PollRunnable { id: usize },
    /// The runnable was pushed onto a queue. (The spelling is part of the
    /// public name.)
 136    EnqueuRunnable { id: usize },
 137}
 138
/// A runnable queued from the foreground.
 139#[cfg(any(test, feature = "test"))]
 140struct ForegroundRunnable {
    /// Id recorded in history events for this runnable.
 141    id: usize,
 142    runnable: Runnable,
    /// When true, `run_internal` polls the main task right after running this
    /// runnable.
 143    main: bool,
 144}
 145
/// A runnable queued from the background.
 146#[cfg(any(test, feature = "test"))]
 147struct BackgroundRunnable {
    /// Id recorded in history events for this runnable.
 148    id: usize,
 149    runnable: Runnable,
 150}
 151
/// Test executor whose scheduling decisions come from a seeded random number
/// generator.
 152#[cfg(any(test, feature = "test"))]
 153pub struct Deterministic {
    /// Shared scheduler state; also cloned into schedule callbacks and timers.
 154    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the driving thread when there is nothing to run; schedule
    /// callbacks and wakers unpark it.
 155    parker: parking_lot::Mutex<parking::Parker>,
 156}
 157
/// A future that completes with `()` once its timer fires.
 158#[must_use]
 159pub enum Timer {
    /// Backed by a real `smol::Timer`.
 160    Production(smol::Timer),
    /// Backed by the deterministic executor's simulated clock.
 161    #[cfg(any(test, feature = "test"))]
 162    Deterministic(DeterministicTimer),
 163}
 164
/// Timer registered in a `Deterministic` executor's `pending_timers`.
 165#[cfg(any(test, feature = "test"))]
 166pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in
    /// `pending_timers`.
 167    rx: postage::barrier::Receiver,
    /// Id of the matching `pending_timers` entry; used to remove it on drop.
 168    id: usize,
 169    state: Arc<parking_lot::Mutex<DeterministicState>>,
 170}
 171
 172#[cfg(any(test, feature = "test"))]
 173impl Deterministic {
 174    pub fn new(seed: u64) -> Arc<Self> {
 175        use rand::prelude::*;
 176
 177        Arc::new(Self {
 178            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 179                rng: StdRng::seed_from_u64(seed),
 180                seed,
 181                scheduled_from_foreground: Default::default(),
 182                scheduled_from_background: Default::default(),
 183                forbid_parking: false,
 184                block_on_ticks: 0..=1000,
 185                now: std::time::Instant::now(),
 186                next_timer_id: Default::default(),
 187                pending_timers: Default::default(),
 188                waiting_backtrace: None,
 189                next_runnable_id: 0,
 190                poll_history: Default::default(),
 191                previous_poll_history: Default::default(),
 192                enable_runnable_backtraces: false,
 193                runnable_backtraces: Default::default(),
 194            })),
 195            parker: Default::default(),
 196        })
 197    }
 198
 199    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 200        self.state.lock().poll_history.clone()
 201    }
 202
 203    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 204        self.state.lock().previous_poll_history = history;
 205    }
 206
 207    pub fn enable_runnable_backtrace(&self) {
 208        self.state.lock().enable_runnable_backtraces = true;
 209    }
 210
 211    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 212        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 213        backtrace.resolve();
 214        backtrace
 215    }
 216
 217    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
 218        Arc::new(BackgroundExecutor::Deterministic {
 219            executor: self.clone(),
 220        })
 221    }
 222
 223    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
 224        Rc::new(ForegroundExecutor::Deterministic {
 225            cx_id: id,
 226            executor: self.clone(),
 227        })
 228    }
 229
 230    fn spawn_from_foreground(
 231        &self,
 232        cx_id: usize,
 233        future: AnyLocalFuture,
 234        main: bool,
 235    ) -> AnyLocalTask {
 236        let state = self.state.clone();
 237        let id;
 238        {
 239            let mut state = state.lock();
 240            id = util::post_inc(&mut state.next_runnable_id);
 241            if state.enable_runnable_backtraces {
 242                state
 243                    .runnable_backtraces
 244                    .insert(id, backtrace::Backtrace::new_unresolved());
 245            }
 246        }
 247
 248        let unparker = self.parker.lock().unparker();
 249        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 250            let mut state = state.lock();
 251            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 252            state
 253                .scheduled_from_foreground
 254                .entry(cx_id)
 255                .or_default()
 256                .push(ForegroundRunnable { id, runnable, main });
 257            unparker.unpark();
 258        });
 259        runnable.schedule();
 260        task
 261    }
 262
 263    fn spawn(&self, future: AnyFuture) -> AnyTask {
 264        let state = self.state.clone();
 265        let id;
 266        {
 267            let mut state = state.lock();
 268            id = util::post_inc(&mut state.next_runnable_id);
 269            if state.enable_runnable_backtraces {
 270                state
 271                    .runnable_backtraces
 272                    .insert(id, backtrace::Backtrace::new_unresolved());
 273            }
 274        }
 275
 276        let unparker = self.parker.lock().unparker();
 277        let (runnable, task) = async_task::spawn(future, move |runnable| {
 278            let mut state = state.lock();
 279            state
 280                .poll_history
 281                .push(ExecutorEvent::EnqueuRunnable { id });
 282            state
 283                .scheduled_from_background
 284                .push(BackgroundRunnable { id, runnable });
 285            unparker.unpark();
 286        });
 287        runnable.schedule();
 288        task
 289    }
 290
    /// Drives `main_future` to completion on the calling thread and returns
    /// its boxed output.
    ///
    /// The future is spawned as a task whose schedule callback queues it as a
    /// foreground runnable of `cx_id` with `main: true`. The loop then
    /// alternates between running scheduled work (`run_internal`) and parking
    /// the thread until something unparks it.
    ///
    /// # Panics
    ///
    /// Panics (via `will_park`) if the thread is about to park without having
    /// been woken while parking is forbidden.
 291    fn run<'a>(
 292        &self,
 293        cx_id: usize,
 294        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 295    ) -> Box<dyn Any> {
 296        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 297
        // Set by the waker that `run_internal` builds for polling the main task.
 298        let woken = Arc::new(AtomicBool::new(false));
 299
        // Reserve a runnable id; it is only used as the key of the optional
        // backtrace below. Each scheduling of the main task takes a fresh id.
 300        let state = self.state.clone();
 301        let id;
 302        {
 303            let mut state = state.lock();
 304            id = util::post_inc(&mut state.next_runnable_id);
 305            if state.enable_runnable_backtraces {
 306                state
 307                    .runnable_backtraces
 308                    .insert(id, backtrace::Backtrace::new_unresolved());
 309            }
 310        }
 311
 312        let unparker = self.parker.lock().unparker();
        // SAFETY: NOTE(review) — not documented in the original. `spawn_unchecked`
        // skips the `Send`/`'static` checks; `main_future` only lives for `'a`.
        // Presumably this relies on the task being polled only on this thread
        // and on this function not returning before the task completes — confirm.
 313        let (runnable, mut main_task) = unsafe {
 314            async_task::spawn_unchecked(main_future, move |runnable| {
 315                let state = &mut *state.lock();
 316                state
 317                    .scheduled_from_foreground
 318                    .entry(cx_id)
 319                    .or_default()
 320                    .push(ForegroundRunnable {
 321                        id: util::post_inc(&mut state.next_runnable_id),
 322                        runnable,
 323                        main: true,
 324                    });
 325                unparker.unpark();
 326            })
 327        };
 328        runnable.schedule();
 329
 330        loop {
 331            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 332                return result;
 333            }
 334
            // Only treat this as a genuine park if the waker did not fire.
 335            if !woken.load(SeqCst) {
 336                self.state.lock().will_park();
 337            }
 338
 339            woken.store(false, SeqCst);
 340            self.parker.lock().park();
 341        }
 342    }
 343
 344    pub fn run_until_parked(&self) {
 345        use std::sync::atomic::AtomicBool;
 346        let woken = Arc::new(AtomicBool::new(false));
 347        self.run_internal(woken, None);
 348    }
 349
    /// Runs scheduled runnables until both queues are empty or the main task
    /// completes.
    ///
    /// Returns `Some(output)` if `main_task` was given and finished, otherwise
    /// `None`. `woken` is set to true by the waker used to poll the main task.
    ///
    /// The order of the random draws below determines the schedule, so the
    /// statement order in this function matters.
 350    fn run_internal(
 351        &self,
 352        woken: Arc<std::sync::atomic::AtomicBool>,
 353        mut main_task: Option<&mut AnyLocalTask>,
 354    ) -> Option<Box<dyn Any>> {
 355        use rand::prelude::*;
 356        use std::sync::atomic::Ordering::SeqCst;
 357
 358        let unparker = self.parker.lock().unparker();
 359        let waker = waker_fn::waker_fn(move || {
 360            woken.store(true, SeqCst);
 361            unparker.unpark();
 362        });
 363
 364        let mut cx = Context::from_waker(&waker);
 365        loop {
 366            let mut state = self.state.lock();
 367
            // Nothing left to run: give the main task one more poll, then stop.
 368            if state.scheduled_from_foreground.is_empty()
 369                && state.scheduled_from_background.is_empty()
 370            {
 371                if let Some(main_task) = main_task {
 372                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 373                        return Some(result);
 374                    }
 375                }
 376
 377                return None;
 378            }
 379
            // If background work exists, a random bool decides whether to run it
            // (a uniformly chosen runnable) or to fall through to foreground
            // work. If the draw is false and no foreground work exists, the loop
            // simply goes around again.
 380            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 381                let background_len = state.scheduled_from_background.len();
 382                let ix = state.rng.gen_range(0..background_len);
 383                let background_runnable = state.scheduled_from_background.remove(ix);
 384                state.push_to_history(ExecutorEvent::PollRunnable {
 385                    id: background_runnable.id,
 386                });
                // Release the lock first: running may schedule more work.
 387                drop(state);
 388                background_runnable.runnable.run();
 389            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then run its oldest queued runnable.
 390                let available_cx_ids = state
 391                    .scheduled_from_foreground
 392                    .keys()
 393                    .copied()
 394                    .collect::<Vec<_>>();
 395                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 396                let scheduled_from_cx = state
 397                    .scheduled_from_foreground
 398                    .get_mut(&cx_id_to_run)
 399                    .unwrap();
 400                let foreground_runnable = scheduled_from_cx.remove(0);
                // Drop emptied queues so the emptiness check above stays accurate.
 401                if scheduled_from_cx.is_empty() {
 402                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 403                }
 404                state.push_to_history(ExecutorEvent::PollRunnable {
 405                    id: foreground_runnable.id,
 406                });
 407
 408                drop(state);
 409
 410                foreground_runnable.runnable.run();
                // After running a runnable flagged `main`, check whether the
                // main task has finished.
 411                if let Some(main_task) = main_task.as_mut() {
 412                    if foreground_runnable.main {
 413                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 414                            return Some(result);
 415                        }
 416                    }
 417                }
 418            }
 419        }
 420    }
 421
 422    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 423    where
 424        F: Unpin + Future<Output = T>,
 425    {
 426        use rand::prelude::*;
 427
 428        let unparker = self.parker.lock().unparker();
 429        let waker = waker_fn::waker_fn(move || {
 430            unparker.unpark();
 431        });
 432
 433        let mut cx = Context::from_waker(&waker);
 434        for _ in 0..max_ticks {
 435            let mut state = self.state.lock();
 436            let runnable_count = state.scheduled_from_background.len();
 437            let ix = state.rng.gen_range(0..=runnable_count);
 438            if ix < state.scheduled_from_background.len() {
 439                let background_runnable = state.scheduled_from_background.remove(ix);
 440                state.push_to_history(ExecutorEvent::PollRunnable {
 441                    id: background_runnable.id,
 442                });
 443                drop(state);
 444                background_runnable.runnable.run();
 445            } else {
 446                drop(state);
 447                if let Poll::Ready(result) = future.poll(&mut cx) {
 448                    return Some(result);
 449                }
 450                let mut state = self.state.lock();
 451                if state.scheduled_from_background.is_empty() {
 452                    state.will_park();
 453                    drop(state);
 454                    self.parker.lock().park();
 455                }
 456
 457                continue;
 458            }
 459        }
 460
 461        None
 462    }
 463
 464    pub fn timer(&self, duration: Duration) -> Timer {
 465        let (tx, rx) = postage::barrier::channel();
 466        let mut state = self.state.lock();
 467        let wakeup_at = state.now + duration;
 468        let id = util::post_inc(&mut state.next_timer_id);
 469        match state
 470            .pending_timers
 471            .binary_search_by_key(&wakeup_at, |e| e.1)
 472        {
 473            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 474        }
 475        let state = self.state.clone();
 476        Timer::Deterministic(DeterministicTimer { rx, id, state })
 477    }
 478
 479    pub fn now(&self) -> std::time::Instant {
 480        let state = self.state.lock();
 481        state.now
 482    }
 483
 484    pub fn advance_clock(&self, duration: Duration) {
 485        let new_now = self.state.lock().now + duration;
 486        loop {
 487            self.run_until_parked();
 488            let mut state = self.state.lock();
 489
 490            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 491                let wakeup_time = *wakeup_time;
 492                if wakeup_time <= new_now {
 493                    let timer_count = state
 494                        .pending_timers
 495                        .iter()
 496                        .take_while(|(_, t, _)| *t == wakeup_time)
 497                        .count();
 498                    state.now = wakeup_time;
 499                    let timers_to_wake = state
 500                        .pending_timers
 501                        .drain(0..timer_count)
 502                        .collect::<Vec<_>>();
 503                    drop(state);
 504                    drop(timers_to_wake);
 505                    continue;
 506                }
 507            }
 508
 509            break;
 510        }
 511
 512        self.state.lock().now = new_now;
 513    }
 514
 515    pub fn start_waiting(&self) {
 516        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 517    }
 518
 519    pub fn finish_waiting(&self) {
 520        self.state.lock().waiting_backtrace.take();
 521    }
 522
 523    pub fn forbid_parking(&self) {
 524        use rand::prelude::*;
 525
 526        let mut state = self.state.lock();
 527        state.forbid_parking = true;
 528        state.rng = StdRng::seed_from_u64(state.seed);
 529    }
 530
 531    pub fn allow_parking(&self) {
 532        use rand::prelude::*;
 533
 534        let mut state = self.state.lock();
 535        state.forbid_parking = false;
 536        state.rng = StdRng::seed_from_u64(state.seed);
 537    }
 538
 539    pub async fn simulate_random_delay(&self) {
 540        use rand::prelude::*;
 541        use smol::future::yield_now;
 542        if self.state.lock().rng.gen_bool(0.2) {
 543            let yields = self.state.lock().rng.gen_range(1..=10);
 544            for _ in 0..yields {
 545                yield_now().await;
 546            }
 547        }
 548    }
 549
 550    pub fn record_backtrace(&self) {
 551        let mut state = self.state.lock();
 552        if state.enable_runnable_backtraces {
 553            let current_id = state
 554                .poll_history
 555                .iter()
 556                .rev()
 557                .find_map(|event| match event {
 558                    ExecutorEvent::PollRunnable { id } => Some(*id),
 559                    _ => None,
 560                });
 561            if let Some(id) = current_id {
 562                state
 563                    .runnable_backtraces
 564                    .insert(id, backtrace::Backtrace::new_unresolved());
 565            }
 566        }
 567    }
 568}
 569
 570impl Drop for Timer {
 571    fn drop(&mut self) {
 572        #[cfg(any(test, feature = "test"))]
 573        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 574            state
 575                .lock()
 576                .pending_timers
 577                .retain(|(timer_id, _, _)| timer_id != id)
 578        }
 579    }
 580}
 581
 582impl Future for Timer {
 583    type Output = ();
 584
 585    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 586        match &mut *self {
 587            #[cfg(any(test, feature = "test"))]
 588            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 589                use postage::stream::{PollRecv, Stream as _};
 590                smol::pin!(rx);
 591                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 592                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 593                    PollRecv::Pending => Poll::Pending,
 594                }
 595            }
 596            Self::Production(timer) => {
 597                smol::pin!(timer);
 598                match timer.poll(cx) {
 599                    Poll::Ready(_) => Poll::Ready(()),
 600                    Poll::Pending => Poll::Pending,
 601                }
 602            }
 603        }
 604    }
 605}
 606
 607#[cfg(any(test, feature = "test"))]
 608impl DeterministicState {
 609    fn push_to_history(&mut self, event: ExecutorEvent) {
 610        use std::fmt::Write as _;
 611
 612        self.poll_history.push(event);
 613        if let Some(prev_history) = &self.previous_poll_history {
 614            let ix = self.poll_history.len() - 1;
 615            let prev_event = prev_history[ix];
 616            if event != prev_event {
 617                let mut message = String::new();
 618                writeln!(
 619                    &mut message,
 620                    "current runnable backtrace:\n{:?}",
 621                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 622                        trace.resolve();
 623                        crate::util::CwdBacktrace(trace)
 624                    })
 625                )
 626                .unwrap();
 627                writeln!(
 628                    &mut message,
 629                    "previous runnable backtrace:\n{:?}",
 630                    self.runnable_backtraces
 631                        .get_mut(&prev_event.id())
 632                        .map(|trace| {
 633                            trace.resolve();
 634                            util::CwdBacktrace(trace)
 635                        })
 636                )
 637                .unwrap();
 638                panic!("detected non-determinism after {ix}. {message}");
 639            }
 640        }
 641    }
 642
 643    fn will_park(&mut self) {
 644        if self.forbid_parking {
 645            let mut backtrace_message = String::new();
 646            #[cfg(any(test, feature = "test"))]
 647            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 648                backtrace.resolve();
 649                backtrace_message = format!(
 650                    "\nbacktrace of waiting future:\n{:?}",
 651                    util::CwdBacktrace(backtrace)
 652                );
 653            }
 654
 655            panic!(
 656                "deterministic executor parked after a call to forbid_parking{}",
 657                backtrace_message
 658            );
 659        }
 660    }
 661}
 662
 663#[cfg(any(test, feature = "test"))]
 664impl ExecutorEvent {
 665    pub fn id(&self) -> usize {
 666        match self {
 667            ExecutorEvent::PollRunnable { id } => *id,
 668            ExecutorEvent::EnqueuRunnable { id } => *id,
 669        }
 670    }
 671}
 672
 673impl ForegroundExecutor {
 674    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
 675        if dispatcher.is_main_thread() {
 676            Ok(Self::Platform {
 677                dispatcher,
 678                _not_send_or_sync: PhantomData,
 679            })
 680        } else {
 681            Err(anyhow!("must be constructed on main thread"))
 682        }
 683    }
 684
 685    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 686        let future = any_local_future(future);
 687        let any_task = match self {
 688            #[cfg(any(test, feature = "test"))]
 689            Self::Deterministic { cx_id, executor } => {
 690                executor.spawn_from_foreground(*cx_id, future, false)
 691            }
 692            Self::Platform { dispatcher, .. } => {
 693                fn spawn_inner(
 694                    future: AnyLocalFuture,
 695                    dispatcher: &Arc<dyn PlatformDispatcher>,
 696                ) -> AnyLocalTask {
 697                    let dispatcher = dispatcher.clone();
 698                    let schedule =
 699                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 700                    let (runnable, task) = async_task::spawn_local(future, schedule);
 701                    runnable.schedule();
 702                    task
 703                }
 704                spawn_inner(future, dispatcher)
 705            }
 706        };
 707        Task::local(any_task)
 708    }
 709
 710    #[cfg(any(test, feature = "test"))]
 711    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 712        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 713        let result = match self {
 714            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 715            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 716        };
 717        *result.downcast().unwrap()
 718    }
 719
 720    #[cfg(any(test, feature = "test"))]
 721    pub fn run_until_parked(&self) {
 722        match self {
 723            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 724            _ => panic!("this method can only be called on a deterministic executor"),
 725        }
 726    }
 727
 728    #[cfg(any(test, feature = "test"))]
 729    pub fn parking_forbidden(&self) -> bool {
 730        match self {
 731            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 732            _ => panic!("this method can only be called on a deterministic executor"),
 733        }
 734    }
 735
 736    #[cfg(any(test, feature = "test"))]
 737    pub fn start_waiting(&self) {
 738        match self {
 739            Self::Deterministic { executor, .. } => executor.start_waiting(),
 740            _ => panic!("this method can only be called on a deterministic executor"),
 741        }
 742    }
 743
 744    #[cfg(any(test, feature = "test"))]
 745    pub fn finish_waiting(&self) {
 746        match self {
 747            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 748            _ => panic!("this method can only be called on a deterministic executor"),
 749        }
 750    }
 751
 752    #[cfg(any(test, feature = "test"))]
 753    pub fn forbid_parking(&self) {
 754        match self {
 755            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 756            _ => panic!("this method can only be called on a deterministic executor"),
 757        }
 758    }
 759
 760    #[cfg(any(test, feature = "test"))]
 761    pub fn allow_parking(&self) {
 762        match self {
 763            Self::Deterministic { executor, .. } => executor.allow_parking(),
 764            _ => panic!("this method can only be called on a deterministic executor"),
 765        }
 766    }
 767
 768    #[cfg(any(test, feature = "test"))]
 769    pub fn advance_clock(&self, duration: Duration) {
 770        match self {
 771            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 772            _ => panic!("this method can only be called on a deterministic executor"),
 773        }
 774    }
 775
 776    #[cfg(any(test, feature = "test"))]
 777    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 778        match self {
 779            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 780            _ => panic!("this method can only be called on a deterministic executor"),
 781        }
 782    }
 783}
 784
 785impl BackgroundExecutor {
 786    pub fn new() -> Self {
 787        let executor = Arc::new(Executor::new());
 788        let stop = channel::unbounded::<()>();
 789
 790        for i in 0..2 * num_cpus::get() {
 791            let executor = executor.clone();
 792            let stop = stop.1.clone();
 793            thread::Builder::new()
 794                .name(format!("background-executor-{}", i))
 795                .spawn(move || smol::block_on(executor.run(stop.recv())))
 796                .unwrap();
 797        }
 798
 799        Self::Production {
 800            executor,
 801            _stop: stop.0,
 802        }
 803    }
 804
    /// Returns the CPU count reported by `num_cpus::get()`. This is the raw
    /// count, not the number of worker threads (`new` starts twice as many).
 805    pub fn num_cpus(&self) -> usize {
 806        num_cpus::get()
 807    }
 808
 809    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 810    where
 811        T: 'static + Send,
 812        F: Send + Future<Output = T> + 'static,
 813    {
 814        let future = any_future(future);
 815        let any_task = match self {
 816            Self::Production { executor, .. } => executor.spawn(future),
 817            #[cfg(any(test, feature = "test"))]
 818            Self::Deterministic { executor } => executor.spawn(future),
 819        };
 820        Task::send(any_task)
 821    }
 822
 823    pub fn block<F, T>(&self, future: F) -> T
 824    where
 825        F: Future<Output = T>,
 826    {
 827        smol::pin!(future);
 828        match self {
 829            Self::Production { .. } => smol::block_on(&mut future),
 830            #[cfg(any(test, feature = "test"))]
 831            Self::Deterministic { executor, .. } => {
 832                executor.block(&mut future, usize::MAX).unwrap()
 833            }
 834        }
 835    }
 836
 837    pub fn block_with_timeout<F, T>(
 838        &self,
 839        timeout: Duration,
 840        future: F,
 841    ) -> Result<T, impl Future<Output = T>>
 842    where
 843        T: 'static,
 844        F: 'static + Unpin + Future<Output = T>,
 845    {
 846        let mut future = any_local_future(future);
 847        if !timeout.is_zero() {
 848            let output = match self {
 849                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 850                #[cfg(any(test, feature = "test"))]
 851                Self::Deterministic { executor, .. } => {
 852                    use rand::prelude::*;
 853                    let max_ticks = {
 854                        let mut state = executor.state.lock();
 855                        let range = state.block_on_ticks.clone();
 856                        state.rng.gen_range(range)
 857                    };
 858                    executor.block(&mut future, max_ticks)
 859                }
 860            };
 861            if let Some(output) = output {
 862                return Ok(*output.downcast().unwrap());
 863            }
 864        }
 865        Err(async { *future.await.downcast().unwrap() })
 866    }
 867
 868    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 869    where
 870        F: FnOnce(&mut Scope<'scope>),
 871    {
 872        let mut scope = Scope::new(self.clone());
 873        (scheduler)(&mut scope);
 874        let spawned = mem::take(&mut scope.futures)
 875            .into_iter()
 876            .map(|f| self.spawn(f))
 877            .collect::<Vec<_>>();
 878        for task in spawned {
 879            task.await;
 880        }
 881    }
 882
 883    pub fn timer(&self, duration: Duration) -> Timer {
 884        match self {
 885            BackgroundExecutor::Production { .. } => {
 886                Timer::Production(smol::Timer::after(duration))
 887            }
 888            #[cfg(any(test, feature = "test"))]
 889            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
 890        }
 891    }
 892
 893    pub fn now(&self) -> std::time::Instant {
 894        match self {
 895            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
 896            #[cfg(any(test, feature = "test"))]
 897            BackgroundExecutor::Deterministic { executor } => executor.now(),
 898        }
 899    }
 900
 901    #[cfg(any(test, feature = "test"))]
 902    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 903        match self {
 904            Self::Deterministic { executor, .. } => {
 905                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 906            }
 907            _ => panic!("this method can only be called on a deterministic executor"),
 908        }
 909    }
 910
 911    #[cfg(any(test, feature = "test"))]
 912    pub async fn simulate_random_delay(&self) {
 913        match self {
 914            Self::Deterministic { executor, .. } => {
 915                executor.simulate_random_delay().await;
 916            }
 917            _ => {
 918                panic!("this method can only be called on a deterministic executor")
 919            }
 920        }
 921    }
 922
 923    #[cfg(any(test, feature = "test"))]
 924    pub fn record_backtrace(&self) {
 925        match self {
 926            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 927            _ => {
 928                panic!("this method can only be called on a deterministic executor")
 929            }
 930        }
 931    }
 932
 933    #[cfg(any(test, feature = "test"))]
 934    pub fn start_waiting(&self) {
 935        match self {
 936            Self::Deterministic { executor, .. } => executor.start_waiting(),
 937            _ => panic!("this method can only be called on a deterministic executor"),
 938        }
 939    }
 940}
 941
 942impl Default for BackgroundExecutor {
 943    fn default() -> Self {
 944        Self::new()
 945    }
 946}
 947
/// Collects futures that may borrow data for the lifetime `'a` so they can be
/// spawned on a [`BackgroundExecutor`].
///
/// Dropping a `Scope` blocks on its executor until the completion channel is
/// closed, i.e. until every sender clone handed to a registered future is gone.
pub struct Scope<'a> {
    // Executor used to block in `Drop` while waiting for the futures to finish.
    executor: Arc<BackgroundExecutor>,
    // Futures registered through `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Sender cloned into every registered future; taken out when the scope is dropped.
    tx: Option<mpsc::Sender<()>>,
    // Receiver that ends once every sender clone has been dropped.
    rx: mpsc::Receiver<()>,
    // Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}
 955
 956impl<'a> Scope<'a> {
 957    fn new(executor: Arc<BackgroundExecutor>) -> Self {
 958        let (tx, rx) = mpsc::channel(1);
 959        Self {
 960            executor,
 961            tx: Some(tx),
 962            rx,
 963            futures: Default::default(),
 964            _phantom: PhantomData,
 965        }
 966    }
 967
 968    pub fn spawn<F>(&mut self, f: F)
 969    where
 970        F: Future<Output = ()> + Send + 'a,
 971    {
 972        let tx = self.tx.clone().unwrap();
 973
 974        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 975        // dropping this `Scope` blocks until all of the futures have resolved.
 976        let f = unsafe {
 977            mem::transmute::<
 978                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 979                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 980            >(Box::pin(async move {
 981                f.await;
 982                drop(tx);
 983            }))
 984        };
 985        self.futures.push(f);
 986    }
 987}
 988
 989impl<'a> Drop for Scope<'a> {
 990    fn drop(&mut self) {
 991        self.tx.take().unwrap();
 992
 993        // Wait until the channel is closed, which means that all of the spawned
 994        // futures have resolved.
 995        self.executor.block(self.rx.next());
 996    }
 997}
 998
 999impl<T> Task<T> {
1000    pub fn ready(value: T) -> Self {
1001        Self::Ready(Some(value))
1002    }
1003
1004    fn local(any_task: AnyLocalTask) -> Self {
1005        Self::Local {
1006            any_task,
1007            result_type: PhantomData,
1008        }
1009    }
1010
1011    pub fn detach(self) {
1012        match self {
1013            Task::Ready(_) => {}
1014            Task::Local { any_task, .. } => any_task.detach(),
1015            Task::Send { any_task, .. } => any_task.detach(),
1016        }
1017    }
1018}
1019
1020// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1021//     #[track_caller]
1022//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
1023//         let caller = Location::caller();
1024//         cx.spawn(|_| async move {
1025//             if let Err(err) = self.await {
1026//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1027//             }
1028//         })
1029//         .detach();
1030//     }
1031// }
1032
1033impl<T: Send> Task<T> {
1034    fn send(any_task: AnyTask) -> Self {
1035        Self::Send {
1036            any_task,
1037            result_type: PhantomData,
1038        }
1039    }
1040}
1041
1042impl<T: fmt::Debug> fmt::Debug for Task<T> {
1043    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044        match self {
1045            Task::Ready(value) => value.fmt(f),
1046            Task::Local { any_task, .. } => any_task.fmt(f),
1047            Task::Send { any_task, .. } => any_task.fmt(f),
1048        }
1049    }
1050}
1051
1052impl<T: 'static> Future for Task<T> {
1053    type Output = T;
1054
1055    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1056        match unsafe { self.get_unchecked_mut() } {
1057            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1058            Task::Local { any_task, .. } => {
1059                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1060            }
1061            Task::Send { any_task, .. } => {
1062                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1063            }
1064        }
1065    }
1066}
1067
1068fn any_future<T, F>(future: F) -> AnyFuture
1069where
1070    T: 'static + Send,
1071    F: Future<Output = T> + Send + 'static,
1072{
1073    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1074}
1075
1076fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1077where
1078    T: 'static,
1079    F: Future<Output = T> + 'static,
1080{
1081    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1082}