executor.rs

   1use crate::util;
   2use crate::PlatformDispatcher;
   3use anyhow::{anyhow, Result};
   4use async_task::Runnable;
   5use futures::channel::{mpsc, oneshot};
   6use smol::{channel, prelude::*, Executor};
   7use std::{
   8    any::Any,
   9    fmt::{self},
  10    marker::PhantomData,
  11    mem,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::Arc,
  15    task::{Context, Poll},
  16    thread,
  17    time::Duration,
  18};
  19
  20/// Enqueues the given closure to run on the application's event loop.
  21/// Returns the result asynchronously.
  22pub(crate) fn run_on_main<F, R>(
  23    dispatcher: Arc<dyn PlatformDispatcher>,
  24    func: F,
  25) -> impl Future<Output = R>
  26where
  27    F: FnOnce() -> R + Send + 'static,
  28    R: Send + 'static,
  29{
  30    let (tx, rx) = oneshot::channel();
  31    if dispatcher.is_main_thread() {
  32        let _ = tx.send(func());
  33    } else {
  34        let _ = spawn_on_main(dispatcher, move || async move {
  35            let _ = tx.send(func());
  36        });
  37    }
  38    async move { rx.await.unwrap() }
  39}
  40
  41/// Enqueues the given closure to be run on the application's event loop. The
  42/// closure returns a future which will be run to completion on the main thread.
  43pub(crate) fn spawn_on_main<F, R>(
  44    dispatcher: Arc<dyn PlatformDispatcher>,
  45    func: impl FnOnce() -> F + Send + 'static,
  46) -> impl Future<Output = R>
  47where
  48    F: Future<Output = R> + 'static,
  49    R: Send + 'static,
  50{
  51    let (tx, rx) = oneshot::channel();
  52    let (runnable, task) = async_task::spawn(
  53        {
  54            let dispatcher = dispatcher.clone();
  55            async move {
  56                let future = func();
  57                let _ = spawn_on_main_local(dispatcher, async move {
  58                    let result = future.await;
  59                    let _ = tx.send(result);
  60                });
  61            }
  62        },
  63        move |runnable| dispatcher.run_on_main_thread(runnable),
  64    );
  65    runnable.schedule();
  66    task.detach();
  67    async move { rx.await.unwrap() }
  68}
  69
  70/// Enqueues the given closure to be run on the application's event loop. Must
  71/// be called on the main thread.
  72pub(crate) fn spawn_on_main_local<R>(
  73    dispatcher: Arc<dyn PlatformDispatcher>,
  74    future: impl Future<Output = R> + 'static,
  75) -> impl Future<Output = R>
  76where
  77    R: 'static,
  78{
  79    assert!(dispatcher.is_main_thread(), "must be called on main thread");
  80
  81    let (tx, rx) = oneshot::channel();
  82    let (runnable, task) = async_task::spawn_local(
  83        async move {
  84            let result = future.await;
  85            let _ = tx.send(result);
  86        },
  87        move |runnable| dispatcher.run_on_main_thread(runnable),
  88    );
  89    runnable.schedule();
  90    task.detach();
  91    async move { rx.await.unwrap() }
  92}
  93
  94pub enum ForegroundExecutor {
  95    Platform {
  96        dispatcher: Arc<dyn PlatformDispatcher>,
  97        _not_send_or_sync: PhantomData<Rc<()>>,
  98    },
  99    #[cfg(any(test, feature = "test"))]
 100    Deterministic {
 101        cx_id: usize,
 102        executor: Arc<Deterministic>,
 103    },
 104}
 105
 106pub enum BackgroundExecutor {
 107    #[cfg(any(test, feature = "test"))]
 108    Deterministic { executor: Arc<Deterministic> },
 109    Production {
 110        executor: Arc<smol::Executor<'static>>,
 111        _stop: channel::Sender<()>,
 112    },
 113}
 114
 115type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 116type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 117type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 118type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 119
 120#[must_use]
 121pub enum Task<T> {
 122    Ready(Option<T>),
 123    Local {
 124        any_task: AnyLocalTask,
 125        result_type: PhantomData<T>,
 126    },
 127    Send {
 128        any_task: AnyTask,
 129        result_type: PhantomData<T>,
 130    },
 131}
 132
 133unsafe impl<T: Send> Send for Task<T> {}
 134
 135#[cfg(any(test, feature = "test"))]
 136struct DeterministicState {
 137    rng: rand::prelude::StdRng,
 138    seed: u64,
 139    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 140    scheduled_from_background: Vec<BackgroundRunnable>,
 141    forbid_parking: bool,
 142    block_on_ticks: std::ops::RangeInclusive<usize>,
 143    now: std::time::Instant,
 144    next_timer_id: usize,
 145    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 146    waiting_backtrace: Option<backtrace::Backtrace>,
 147    next_runnable_id: usize,
 148    poll_history: Vec<ExecutorEvent>,
 149    previous_poll_history: Option<Vec<ExecutorEvent>>,
 150    enable_runnable_backtraces: bool,
 151    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 152}
 153
 154#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 155pub enum ExecutorEvent {
 156    PollRunnable { id: usize },
 157    EnqueuRunnable { id: usize },
 158}
 159
 160#[cfg(any(test, feature = "test"))]
 161struct ForegroundRunnable {
 162    id: usize,
 163    runnable: Runnable,
 164    main: bool,
 165}
 166
 167#[cfg(any(test, feature = "test"))]
 168struct BackgroundRunnable {
 169    id: usize,
 170    runnable: Runnable,
 171}
 172
 173#[cfg(any(test, feature = "test"))]
 174pub struct Deterministic {
 175    state: Arc<parking_lot::Mutex<DeterministicState>>,
 176    parker: parking_lot::Mutex<parking::Parker>,
 177}
 178
 179#[must_use]
 180pub enum Timer {
 181    Production(smol::Timer),
 182    #[cfg(any(test, feature = "test"))]
 183    Deterministic(DeterministicTimer),
 184}
 185
 186#[cfg(any(test, feature = "test"))]
 187pub struct DeterministicTimer {
 188    rx: postage::barrier::Receiver,
 189    id: usize,
 190    state: Arc<parking_lot::Mutex<DeterministicState>>,
 191}
 192
 193#[cfg(any(test, feature = "test"))]
 194impl Deterministic {
 195    pub fn new(seed: u64) -> Arc<Self> {
 196        use rand::prelude::*;
 197
 198        Arc::new(Self {
 199            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 200                rng: StdRng::seed_from_u64(seed),
 201                seed,
 202                scheduled_from_foreground: Default::default(),
 203                scheduled_from_background: Default::default(),
 204                forbid_parking: false,
 205                block_on_ticks: 0..=1000,
 206                now: std::time::Instant::now(),
 207                next_timer_id: Default::default(),
 208                pending_timers: Default::default(),
 209                waiting_backtrace: None,
 210                next_runnable_id: 0,
 211                poll_history: Default::default(),
 212                previous_poll_history: Default::default(),
 213                enable_runnable_backtraces: false,
 214                runnable_backtraces: Default::default(),
 215            })),
 216            parker: Default::default(),
 217        })
 218    }
 219
 220    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 221        self.state.lock().poll_history.clone()
 222    }
 223
 224    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 225        self.state.lock().previous_poll_history = history;
 226    }
 227
 228    pub fn enable_runnable_backtrace(&self) {
 229        self.state.lock().enable_runnable_backtraces = true;
 230    }
 231
 232    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 233        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 234        backtrace.resolve();
 235        backtrace
 236    }
 237
 238    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
 239        Arc::new(BackgroundExecutor::Deterministic {
 240            executor: self.clone(),
 241        })
 242    }
 243
 244    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
 245        Rc::new(ForegroundExecutor::Deterministic {
 246            cx_id: id,
 247            executor: self.clone(),
 248        })
 249    }
 250
 251    fn spawn_from_foreground(
 252        &self,
 253        cx_id: usize,
 254        future: AnyLocalFuture,
 255        main: bool,
 256    ) -> AnyLocalTask {
 257        let state = self.state.clone();
 258        let id;
 259        {
 260            let mut state = state.lock();
 261            id = util::post_inc(&mut state.next_runnable_id);
 262            if state.enable_runnable_backtraces {
 263                state
 264                    .runnable_backtraces
 265                    .insert(id, backtrace::Backtrace::new_unresolved());
 266            }
 267        }
 268
 269        let unparker = self.parker.lock().unparker();
 270        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 271            let mut state = state.lock();
 272            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 273            state
 274                .scheduled_from_foreground
 275                .entry(cx_id)
 276                .or_default()
 277                .push(ForegroundRunnable { id, runnable, main });
 278            unparker.unpark();
 279        });
 280        runnable.schedule();
 281        task
 282    }
 283
 284    fn spawn(&self, future: AnyFuture) -> AnyTask {
 285        let state = self.state.clone();
 286        let id;
 287        {
 288            let mut state = state.lock();
 289            id = util::post_inc(&mut state.next_runnable_id);
 290            if state.enable_runnable_backtraces {
 291                state
 292                    .runnable_backtraces
 293                    .insert(id, backtrace::Backtrace::new_unresolved());
 294            }
 295        }
 296
 297        let unparker = self.parker.lock().unparker();
 298        let (runnable, task) = async_task::spawn(future, move |runnable| {
 299            let mut state = state.lock();
 300            state
 301                .poll_history
 302                .push(ExecutorEvent::EnqueuRunnable { id });
 303            state
 304                .scheduled_from_background
 305                .push(BackgroundRunnable { id, runnable });
 306            unparker.unpark();
 307        });
 308        runnable.schedule();
 309        task
 310    }
 311
 312    fn run<'a>(
 313        &self,
 314        cx_id: usize,
 315        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 316    ) -> Box<dyn Any> {
 317        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 318
 319        let woken = Arc::new(AtomicBool::new(false));
 320
 321        let state = self.state.clone();
 322        let id;
 323        {
 324            let mut state = state.lock();
 325            id = util::post_inc(&mut state.next_runnable_id);
 326            if state.enable_runnable_backtraces {
 327                state
 328                    .runnable_backtraces
 329                    .insert(id, backtrace::Backtrace::new_unresolved());
 330            }
 331        }
 332
 333        let unparker = self.parker.lock().unparker();
 334        let (runnable, mut main_task) = unsafe {
 335            async_task::spawn_unchecked(main_future, move |runnable| {
 336                let state = &mut *state.lock();
 337                state
 338                    .scheduled_from_foreground
 339                    .entry(cx_id)
 340                    .or_default()
 341                    .push(ForegroundRunnable {
 342                        id: util::post_inc(&mut state.next_runnable_id),
 343                        runnable,
 344                        main: true,
 345                    });
 346                unparker.unpark();
 347            })
 348        };
 349        runnable.schedule();
 350
 351        loop {
 352            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 353                return result;
 354            }
 355
 356            if !woken.load(SeqCst) {
 357                self.state.lock().will_park();
 358            }
 359
 360            woken.store(false, SeqCst);
 361            self.parker.lock().park();
 362        }
 363    }
 364
 365    pub fn run_until_parked(&self) {
 366        use std::sync::atomic::AtomicBool;
 367        let woken = Arc::new(AtomicBool::new(false));
 368        self.run_internal(woken, None);
 369    }
 370
 371    fn run_internal(
 372        &self,
 373        woken: Arc<std::sync::atomic::AtomicBool>,
 374        mut main_task: Option<&mut AnyLocalTask>,
 375    ) -> Option<Box<dyn Any>> {
 376        use rand::prelude::*;
 377        use std::sync::atomic::Ordering::SeqCst;
 378
 379        let unparker = self.parker.lock().unparker();
 380        let waker = waker_fn::waker_fn(move || {
 381            woken.store(true, SeqCst);
 382            unparker.unpark();
 383        });
 384
 385        let mut cx = Context::from_waker(&waker);
 386        loop {
 387            let mut state = self.state.lock();
 388
 389            if state.scheduled_from_foreground.is_empty()
 390                && state.scheduled_from_background.is_empty()
 391            {
 392                if let Some(main_task) = main_task {
 393                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 394                        return Some(result);
 395                    }
 396                }
 397
 398                return None;
 399            }
 400
 401            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 402                let background_len = state.scheduled_from_background.len();
 403                let ix = state.rng.gen_range(0..background_len);
 404                let background_runnable = state.scheduled_from_background.remove(ix);
 405                state.push_to_history(ExecutorEvent::PollRunnable {
 406                    id: background_runnable.id,
 407                });
 408                drop(state);
 409                background_runnable.runnable.run();
 410            } else if !state.scheduled_from_foreground.is_empty() {
 411                let available_cx_ids = state
 412                    .scheduled_from_foreground
 413                    .keys()
 414                    .copied()
 415                    .collect::<Vec<_>>();
 416                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 417                let scheduled_from_cx = state
 418                    .scheduled_from_foreground
 419                    .get_mut(&cx_id_to_run)
 420                    .unwrap();
 421                let foreground_runnable = scheduled_from_cx.remove(0);
 422                if scheduled_from_cx.is_empty() {
 423                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 424                }
 425                state.push_to_history(ExecutorEvent::PollRunnable {
 426                    id: foreground_runnable.id,
 427                });
 428
 429                drop(state);
 430
 431                foreground_runnable.runnable.run();
 432                if let Some(main_task) = main_task.as_mut() {
 433                    if foreground_runnable.main {
 434                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 435                            return Some(result);
 436                        }
 437                    }
 438                }
 439            }
 440        }
 441    }
 442
 443    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 444    where
 445        F: Unpin + Future<Output = T>,
 446    {
 447        use rand::prelude::*;
 448
 449        let unparker = self.parker.lock().unparker();
 450        let waker = waker_fn::waker_fn(move || {
 451            unparker.unpark();
 452        });
 453
 454        let mut cx = Context::from_waker(&waker);
 455        for _ in 0..max_ticks {
 456            let mut state = self.state.lock();
 457            let runnable_count = state.scheduled_from_background.len();
 458            let ix = state.rng.gen_range(0..=runnable_count);
 459            if ix < state.scheduled_from_background.len() {
 460                let background_runnable = state.scheduled_from_background.remove(ix);
 461                state.push_to_history(ExecutorEvent::PollRunnable {
 462                    id: background_runnable.id,
 463                });
 464                drop(state);
 465                background_runnable.runnable.run();
 466            } else {
 467                drop(state);
 468                if let Poll::Ready(result) = future.poll(&mut cx) {
 469                    return Some(result);
 470                }
 471                let mut state = self.state.lock();
 472                if state.scheduled_from_background.is_empty() {
 473                    state.will_park();
 474                    drop(state);
 475                    self.parker.lock().park();
 476                }
 477
 478                continue;
 479            }
 480        }
 481
 482        None
 483    }
 484
 485    pub fn timer(&self, duration: Duration) -> Timer {
 486        let (tx, rx) = postage::barrier::channel();
 487        let mut state = self.state.lock();
 488        let wakeup_at = state.now + duration;
 489        let id = util::post_inc(&mut state.next_timer_id);
 490        match state
 491            .pending_timers
 492            .binary_search_by_key(&wakeup_at, |e| e.1)
 493        {
 494            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 495        }
 496        let state = self.state.clone();
 497        Timer::Deterministic(DeterministicTimer { rx, id, state })
 498    }
 499
 500    pub fn now(&self) -> std::time::Instant {
 501        let state = self.state.lock();
 502        state.now
 503    }
 504
 505    pub fn advance_clock(&self, duration: Duration) {
 506        let new_now = self.state.lock().now + duration;
 507        loop {
 508            self.run_until_parked();
 509            let mut state = self.state.lock();
 510
 511            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 512                let wakeup_time = *wakeup_time;
 513                if wakeup_time <= new_now {
 514                    let timer_count = state
 515                        .pending_timers
 516                        .iter()
 517                        .take_while(|(_, t, _)| *t == wakeup_time)
 518                        .count();
 519                    state.now = wakeup_time;
 520                    let timers_to_wake = state
 521                        .pending_timers
 522                        .drain(0..timer_count)
 523                        .collect::<Vec<_>>();
 524                    drop(state);
 525                    drop(timers_to_wake);
 526                    continue;
 527                }
 528            }
 529
 530            break;
 531        }
 532
 533        self.state.lock().now = new_now;
 534    }
 535
 536    pub fn start_waiting(&self) {
 537        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 538    }
 539
 540    pub fn finish_waiting(&self) {
 541        self.state.lock().waiting_backtrace.take();
 542    }
 543
 544    pub fn forbid_parking(&self) {
 545        use rand::prelude::*;
 546
 547        let mut state = self.state.lock();
 548        state.forbid_parking = true;
 549        state.rng = StdRng::seed_from_u64(state.seed);
 550    }
 551
 552    pub fn allow_parking(&self) {
 553        use rand::prelude::*;
 554
 555        let mut state = self.state.lock();
 556        state.forbid_parking = false;
 557        state.rng = StdRng::seed_from_u64(state.seed);
 558    }
 559
 560    pub async fn simulate_random_delay(&self) {
 561        use rand::prelude::*;
 562        use smol::future::yield_now;
 563        if self.state.lock().rng.gen_bool(0.2) {
 564            let yields = self.state.lock().rng.gen_range(1..=10);
 565            for _ in 0..yields {
 566                yield_now().await;
 567            }
 568        }
 569    }
 570
 571    pub fn record_backtrace(&self) {
 572        let mut state = self.state.lock();
 573        if state.enable_runnable_backtraces {
 574            let current_id = state
 575                .poll_history
 576                .iter()
 577                .rev()
 578                .find_map(|event| match event {
 579                    ExecutorEvent::PollRunnable { id } => Some(*id),
 580                    _ => None,
 581                });
 582            if let Some(id) = current_id {
 583                state
 584                    .runnable_backtraces
 585                    .insert(id, backtrace::Backtrace::new_unresolved());
 586            }
 587        }
 588    }
 589}
 590
 591impl Drop for Timer {
 592    fn drop(&mut self) {
 593        #[cfg(any(test, feature = "test"))]
 594        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 595            state
 596                .lock()
 597                .pending_timers
 598                .retain(|(timer_id, _, _)| timer_id != id)
 599        }
 600    }
 601}
 602
 603impl Future for Timer {
 604    type Output = ();
 605
 606    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 607        match &mut *self {
 608            #[cfg(any(test, feature = "test"))]
 609            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 610                use postage::stream::{PollRecv, Stream as _};
 611                smol::pin!(rx);
 612                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 613                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 614                    PollRecv::Pending => Poll::Pending,
 615                }
 616            }
 617            Self::Production(timer) => {
 618                smol::pin!(timer);
 619                match timer.poll(cx) {
 620                    Poll::Ready(_) => Poll::Ready(()),
 621                    Poll::Pending => Poll::Pending,
 622                }
 623            }
 624        }
 625    }
 626}
 627
 628#[cfg(any(test, feature = "test"))]
 629impl DeterministicState {
 630    fn push_to_history(&mut self, event: ExecutorEvent) {
 631        use std::fmt::Write as _;
 632
 633        self.poll_history.push(event);
 634        if let Some(prev_history) = &self.previous_poll_history {
 635            let ix = self.poll_history.len() - 1;
 636            let prev_event = prev_history[ix];
 637            if event != prev_event {
 638                let mut message = String::new();
 639                writeln!(
 640                    &mut message,
 641                    "current runnable backtrace:\n{:?}",
 642                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 643                        trace.resolve();
 644                        crate::util::CwdBacktrace(trace)
 645                    })
 646                )
 647                .unwrap();
 648                writeln!(
 649                    &mut message,
 650                    "previous runnable backtrace:\n{:?}",
 651                    self.runnable_backtraces
 652                        .get_mut(&prev_event.id())
 653                        .map(|trace| {
 654                            trace.resolve();
 655                            util::CwdBacktrace(trace)
 656                        })
 657                )
 658                .unwrap();
 659                panic!("detected non-determinism after {ix}. {message}");
 660            }
 661        }
 662    }
 663
 664    fn will_park(&mut self) {
 665        if self.forbid_parking {
 666            let mut backtrace_message = String::new();
 667            #[cfg(any(test, feature = "test"))]
 668            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 669                backtrace.resolve();
 670                backtrace_message = format!(
 671                    "\nbacktrace of waiting future:\n{:?}",
 672                    util::CwdBacktrace(backtrace)
 673                );
 674            }
 675
 676            panic!(
 677                "deterministic executor parked after a call to forbid_parking{}",
 678                backtrace_message
 679            );
 680        }
 681    }
 682}
 683
 684#[cfg(any(test, feature = "test"))]
 685impl ExecutorEvent {
 686    pub fn id(&self) -> usize {
 687        match self {
 688            ExecutorEvent::PollRunnable { id } => *id,
 689            ExecutorEvent::EnqueuRunnable { id } => *id,
 690        }
 691    }
 692}
 693
 694impl ForegroundExecutor {
 695    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
 696        if dispatcher.is_main_thread() {
 697            Ok(Self::Platform {
 698                dispatcher,
 699                _not_send_or_sync: PhantomData,
 700            })
 701        } else {
 702            Err(anyhow!("must be constructed on main thread"))
 703        }
 704    }
 705
 706    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 707        let future = any_local_future(future);
 708        let any_task = match self {
 709            #[cfg(any(test, feature = "test"))]
 710            Self::Deterministic { cx_id, executor } => {
 711                executor.spawn_from_foreground(*cx_id, future, false)
 712            }
 713            Self::Platform { dispatcher, .. } => {
 714                fn spawn_inner(
 715                    future: AnyLocalFuture,
 716                    dispatcher: &Arc<dyn PlatformDispatcher>,
 717                ) -> AnyLocalTask {
 718                    let dispatcher = dispatcher.clone();
 719                    let schedule =
 720                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 721                    let (runnable, task) = async_task::spawn_local(future, schedule);
 722                    runnable.schedule();
 723                    task
 724                }
 725                spawn_inner(future, dispatcher)
 726            }
 727        };
 728        Task::local(any_task)
 729    }
 730
 731    #[cfg(any(test, feature = "test"))]
 732    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 733        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 734        let result = match self {
 735            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 736            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 737        };
 738        *result.downcast().unwrap()
 739    }
 740
 741    #[cfg(any(test, feature = "test"))]
 742    pub fn run_until_parked(&self) {
 743        match self {
 744            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 745            _ => panic!("this method can only be called on a deterministic executor"),
 746        }
 747    }
 748
 749    #[cfg(any(test, feature = "test"))]
 750    pub fn parking_forbidden(&self) -> bool {
 751        match self {
 752            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 753            _ => panic!("this method can only be called on a deterministic executor"),
 754        }
 755    }
 756
 757    #[cfg(any(test, feature = "test"))]
 758    pub fn start_waiting(&self) {
 759        match self {
 760            Self::Deterministic { executor, .. } => executor.start_waiting(),
 761            _ => panic!("this method can only be called on a deterministic executor"),
 762        }
 763    }
 764
 765    #[cfg(any(test, feature = "test"))]
 766    pub fn finish_waiting(&self) {
 767        match self {
 768            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 769            _ => panic!("this method can only be called on a deterministic executor"),
 770        }
 771    }
 772
 773    #[cfg(any(test, feature = "test"))]
 774    pub fn forbid_parking(&self) {
 775        match self {
 776            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 777            _ => panic!("this method can only be called on a deterministic executor"),
 778        }
 779    }
 780
 781    #[cfg(any(test, feature = "test"))]
 782    pub fn allow_parking(&self) {
 783        match self {
 784            Self::Deterministic { executor, .. } => executor.allow_parking(),
 785            _ => panic!("this method can only be called on a deterministic executor"),
 786        }
 787    }
 788
 789    #[cfg(any(test, feature = "test"))]
 790    pub fn advance_clock(&self, duration: Duration) {
 791        match self {
 792            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 793            _ => panic!("this method can only be called on a deterministic executor"),
 794        }
 795    }
 796
 797    #[cfg(any(test, feature = "test"))]
 798    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 799        match self {
 800            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 801            _ => panic!("this method can only be called on a deterministic executor"),
 802        }
 803    }
 804}
 805
 806impl BackgroundExecutor {
 807    pub fn new() -> Self {
 808        let executor = Arc::new(Executor::new());
 809        let stop = channel::unbounded::<()>();
 810
 811        for i in 0..2 * num_cpus::get() {
 812            let executor = executor.clone();
 813            let stop = stop.1.clone();
 814            thread::Builder::new()
 815                .name(format!("background-executor-{}", i))
 816                .spawn(move || smol::block_on(executor.run(stop.recv())))
 817                .unwrap();
 818        }
 819
 820        Self::Production {
 821            executor,
 822            _stop: stop.0,
 823        }
 824    }
 825
 826    pub fn num_cpus(&self) -> usize {
 827        num_cpus::get()
 828    }
 829
 830    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 831    where
 832        T: 'static + Send,
 833        F: Send + Future<Output = T> + 'static,
 834    {
 835        let future = any_future(future);
 836        let any_task = match self {
 837            Self::Production { executor, .. } => executor.spawn(future),
 838            #[cfg(any(test, feature = "test"))]
 839            Self::Deterministic { executor } => executor.spawn(future),
 840        };
 841        Task::send(any_task)
 842    }
 843
 844    pub fn block<F, T>(&self, future: F) -> T
 845    where
 846        F: Future<Output = T>,
 847    {
 848        smol::pin!(future);
 849        match self {
 850            Self::Production { .. } => smol::block_on(&mut future),
 851            #[cfg(any(test, feature = "test"))]
 852            Self::Deterministic { executor, .. } => {
 853                executor.block(&mut future, usize::MAX).unwrap()
 854            }
 855        }
 856    }
 857
 858    pub fn block_with_timeout<F, T>(
 859        &self,
 860        timeout: Duration,
 861        future: F,
 862    ) -> Result<T, impl Future<Output = T>>
 863    where
 864        T: 'static,
 865        F: 'static + Unpin + Future<Output = T>,
 866    {
 867        let mut future = any_local_future(future);
 868        if !timeout.is_zero() {
 869            let output = match self {
 870                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 871                #[cfg(any(test, feature = "test"))]
 872                Self::Deterministic { executor, .. } => {
 873                    use rand::prelude::*;
 874                    let max_ticks = {
 875                        let mut state = executor.state.lock();
 876                        let range = state.block_on_ticks.clone();
 877                        state.rng.gen_range(range)
 878                    };
 879                    executor.block(&mut future, max_ticks)
 880                }
 881            };
 882            if let Some(output) = output {
 883                return Ok(*output.downcast().unwrap());
 884            }
 885        }
 886        Err(async { *future.await.downcast().unwrap() })
 887    }
 888
 889    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 890    where
 891        F: FnOnce(&mut Scope<'scope>),
 892    {
 893        let mut scope = Scope::new(self.clone());
 894        (scheduler)(&mut scope);
 895        let spawned = mem::take(&mut scope.futures)
 896            .into_iter()
 897            .map(|f| self.spawn(f))
 898            .collect::<Vec<_>>();
 899        for task in spawned {
 900            task.await;
 901        }
 902    }
 903
 904    pub fn timer(&self, duration: Duration) -> Timer {
 905        match self {
 906            BackgroundExecutor::Production { .. } => {
 907                Timer::Production(smol::Timer::after(duration))
 908            }
 909            #[cfg(any(test, feature = "test"))]
 910            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
 911        }
 912    }
 913
 914    pub fn now(&self) -> std::time::Instant {
 915        match self {
 916            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
 917            #[cfg(any(test, feature = "test"))]
 918            BackgroundExecutor::Deterministic { executor } => executor.now(),
 919        }
 920    }
 921
 922    #[cfg(any(test, feature = "test"))]
 923    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 924        match self {
 925            Self::Deterministic { executor, .. } => {
 926                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 927            }
 928            _ => panic!("this method can only be called on a deterministic executor"),
 929        }
 930    }
 931
 932    #[cfg(any(test, feature = "test"))]
 933    pub async fn simulate_random_delay(&self) {
 934        match self {
 935            Self::Deterministic { executor, .. } => {
 936                executor.simulate_random_delay().await;
 937            }
 938            _ => {
 939                panic!("this method can only be called on a deterministic executor")
 940            }
 941        }
 942    }
 943
 944    #[cfg(any(test, feature = "test"))]
 945    pub fn record_backtrace(&self) {
 946        match self {
 947            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 948            _ => {
 949                panic!("this method can only be called on a deterministic executor")
 950            }
 951        }
 952    }
 953
 954    #[cfg(any(test, feature = "test"))]
 955    pub fn start_waiting(&self) {
 956        match self {
 957            Self::Deterministic { executor, .. } => executor.start_waiting(),
 958            _ => panic!("this method can only be called on a deterministic executor"),
 959        }
 960    }
 961}
 962
 963impl Default for BackgroundExecutor {
 964    fn default() -> Self {
 965        Self::new()
 966    }
 967}
 968
 969pub struct Scope<'a> {
 970    executor: Arc<BackgroundExecutor>,
 971    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 972    tx: Option<mpsc::Sender<()>>,
 973    rx: mpsc::Receiver<()>,
 974    _phantom: PhantomData<&'a ()>,
 975}
 976
 977impl<'a> Scope<'a> {
 978    fn new(executor: Arc<BackgroundExecutor>) -> Self {
 979        let (tx, rx) = mpsc::channel(1);
 980        Self {
 981            executor,
 982            tx: Some(tx),
 983            rx,
 984            futures: Default::default(),
 985            _phantom: PhantomData,
 986        }
 987    }
 988
 989    pub fn spawn<F>(&mut self, f: F)
 990    where
 991        F: Future<Output = ()> + Send + 'a,
 992    {
 993        let tx = self.tx.clone().unwrap();
 994
 995        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 996        // dropping this `Scope` blocks until all of the futures have resolved.
 997        let f = unsafe {
 998            mem::transmute::<
 999                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
1000                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
1001            >(Box::pin(async move {
1002                f.await;
1003                drop(tx);
1004            }))
1005        };
1006        self.futures.push(f);
1007    }
1008}
1009
1010impl<'a> Drop for Scope<'a> {
1011    fn drop(&mut self) {
1012        self.tx.take().unwrap();
1013
1014        // Wait until the channel is closed, which means that all of the spawned
1015        // futures have resolved.
1016        self.executor.block(self.rx.next());
1017    }
1018}
1019
1020impl<T> Task<T> {
1021    pub fn ready(value: T) -> Self {
1022        Self::Ready(Some(value))
1023    }
1024
1025    fn local(any_task: AnyLocalTask) -> Self {
1026        Self::Local {
1027            any_task,
1028            result_type: PhantomData,
1029        }
1030    }
1031
1032    pub fn detach(self) {
1033        match self {
1034            Task::Ready(_) => {}
1035            Task::Local { any_task, .. } => any_task.detach(),
1036            Task::Send { any_task, .. } => any_task.detach(),
1037        }
1038    }
1039}
1040
1041// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1042//     #[track_caller]
1043//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
1044//         let caller = Location::caller();
1045//         cx.spawn(|_| async move {
1046//             if let Err(err) = self.await {
1047//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1048//             }
1049//         })
1050//         .detach();
1051//     }
1052// }
1053
1054impl<T: Send> Task<T> {
1055    fn send(any_task: AnyTask) -> Self {
1056        Self::Send {
1057            any_task,
1058            result_type: PhantomData,
1059        }
1060    }
1061}
1062
1063impl<T: fmt::Debug> fmt::Debug for Task<T> {
1064    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1065        match self {
1066            Task::Ready(value) => value.fmt(f),
1067            Task::Local { any_task, .. } => any_task.fmt(f),
1068            Task::Send { any_task, .. } => any_task.fmt(f),
1069        }
1070    }
1071}
1072
1073impl<T: 'static> Future for Task<T> {
1074    type Output = T;
1075
1076    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1077        match unsafe { self.get_unchecked_mut() } {
1078            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1079            Task::Local { any_task, .. } => {
1080                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1081            }
1082            Task::Send { any_task, .. } => {
1083                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1084            }
1085        }
1086    }
1087}
1088
1089fn any_future<T, F>(future: F) -> AnyFuture
1090where
1091    T: 'static + Send,
1092    F: Future<Output = T> + Send + 'static,
1093{
1094    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1095}
1096
1097fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1098where
1099    T: 'static,
1100    F: Future<Output = T> + 'static,
1101{
1102    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1103}