executor.rs

   1use crate::util;
   2use anyhow::{anyhow, Result};
   3use async_task::Runnable;
   4use futures::channel::mpsc;
   5use smol::{channel, prelude::*, Executor};
   6use std::{
   7    any::Any,
   8    fmt::{self},
   9    marker::PhantomData,
  10    mem,
  11    pin::Pin,
  12    rc::Rc,
  13    sync::Arc,
  14    task::{Context, Poll},
  15    thread,
  16    time::Duration,
  17};
  18
  19use crate::PlatformDispatcher;
  20
  21pub enum ForegroundExecutor {
  22    Platform {
  23        dispatcher: Arc<dyn PlatformDispatcher>,
  24        _not_send_or_sync: PhantomData<Rc<()>>,
  25    },
  26    #[cfg(any(test, feature = "test"))]
  27    Deterministic {
  28        cx_id: usize,
  29        executor: Arc<Deterministic>,
  30    },
  31}
  32
  33pub enum BackgroundExecutor {
  34    #[cfg(any(test, feature = "test"))]
  35    Deterministic { executor: Arc<Deterministic> },
  36    Production {
  37        executor: Arc<smol::Executor<'static>>,
  38        _stop: channel::Sender<()>,
  39    },
  40}
  41
  42type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  43type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  44type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  45type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  46
  47#[must_use]
  48pub enum Task<T> {
  49    Ready(Option<T>),
  50    Local {
  51        any_task: AnyLocalTask,
  52        result_type: PhantomData<T>,
  53    },
  54    Send {
  55        any_task: AnyTask,
  56        result_type: PhantomData<T>,
  57    },
  58}
  59
  60unsafe impl<T: Send> Send for Task<T> {}
  61
  62#[cfg(any(test, feature = "test"))]
  63struct DeterministicState {
  64    rng: rand::prelude::StdRng,
  65    seed: u64,
  66    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
  67    scheduled_from_background: Vec<BackgroundRunnable>,
  68    forbid_parking: bool,
  69    block_on_ticks: std::ops::RangeInclusive<usize>,
  70    now: std::time::Instant,
  71    next_timer_id: usize,
  72    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
  73    waiting_backtrace: Option<backtrace::Backtrace>,
  74    next_runnable_id: usize,
  75    poll_history: Vec<ExecutorEvent>,
  76    previous_poll_history: Option<Vec<ExecutorEvent>>,
  77    enable_runnable_backtraces: bool,
  78    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
  79}
  80
  81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  82pub enum ExecutorEvent {
  83    PollRunnable { id: usize },
  84    EnqueuRunnable { id: usize },
  85}
  86
  87#[cfg(any(test, feature = "test"))]
  88struct ForegroundRunnable {
  89    id: usize,
  90    runnable: Runnable,
  91    main: bool,
  92}
  93
  94#[cfg(any(test, feature = "test"))]
  95struct BackgroundRunnable {
  96    id: usize,
  97    runnable: Runnable,
  98}
  99
 100#[cfg(any(test, feature = "test"))]
 101pub struct Deterministic {
 102    state: Arc<parking_lot::Mutex<DeterministicState>>,
 103    parker: parking_lot::Mutex<parking::Parker>,
 104}
 105
 106#[must_use]
 107pub enum Timer {
 108    Production(smol::Timer),
 109    #[cfg(any(test, feature = "test"))]
 110    Deterministic(DeterministicTimer),
 111}
 112
 113#[cfg(any(test, feature = "test"))]
 114pub struct DeterministicTimer {
 115    rx: postage::barrier::Receiver,
 116    id: usize,
 117    state: Arc<parking_lot::Mutex<DeterministicState>>,
 118}
 119
 120#[cfg(any(test, feature = "test"))]
 121impl Deterministic {
 122    pub fn new(seed: u64) -> Arc<Self> {
 123        use rand::prelude::*;
 124
 125        Arc::new(Self {
 126            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 127                rng: StdRng::seed_from_u64(seed),
 128                seed,
 129                scheduled_from_foreground: Default::default(),
 130                scheduled_from_background: Default::default(),
 131                forbid_parking: false,
 132                block_on_ticks: 0..=1000,
 133                now: std::time::Instant::now(),
 134                next_timer_id: Default::default(),
 135                pending_timers: Default::default(),
 136                waiting_backtrace: None,
 137                next_runnable_id: 0,
 138                poll_history: Default::default(),
 139                previous_poll_history: Default::default(),
 140                enable_runnable_backtraces: false,
 141                runnable_backtraces: Default::default(),
 142            })),
 143            parker: Default::default(),
 144        })
 145    }
 146
 147    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 148        self.state.lock().poll_history.clone()
 149    }
 150
 151    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 152        self.state.lock().previous_poll_history = history;
 153    }
 154
 155    pub fn enable_runnable_backtrace(&self) {
 156        self.state.lock().enable_runnable_backtraces = true;
 157    }
 158
 159    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 160        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 161        backtrace.resolve();
 162        backtrace
 163    }
 164
 165    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
 166        Arc::new(BackgroundExecutor::Deterministic {
 167            executor: self.clone(),
 168        })
 169    }
 170
 171    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
 172        Rc::new(ForegroundExecutor::Deterministic {
 173            cx_id: id,
 174            executor: self.clone(),
 175        })
 176    }
 177
 178    fn spawn_from_foreground(
 179        &self,
 180        cx_id: usize,
 181        future: AnyLocalFuture,
 182        main: bool,
 183    ) -> AnyLocalTask {
 184        let state = self.state.clone();
 185        let id;
 186        {
 187            let mut state = state.lock();
 188            id = util::post_inc(&mut state.next_runnable_id);
 189            if state.enable_runnable_backtraces {
 190                state
 191                    .runnable_backtraces
 192                    .insert(id, backtrace::Backtrace::new_unresolved());
 193            }
 194        }
 195
 196        let unparker = self.parker.lock().unparker();
 197        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 198            let mut state = state.lock();
 199            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 200            state
 201                .scheduled_from_foreground
 202                .entry(cx_id)
 203                .or_default()
 204                .push(ForegroundRunnable { id, runnable, main });
 205            unparker.unpark();
 206        });
 207        runnable.schedule();
 208        task
 209    }
 210
 211    fn spawn(&self, future: AnyFuture) -> AnyTask {
 212        let state = self.state.clone();
 213        let id;
 214        {
 215            let mut state = state.lock();
 216            id = util::post_inc(&mut state.next_runnable_id);
 217            if state.enable_runnable_backtraces {
 218                state
 219                    .runnable_backtraces
 220                    .insert(id, backtrace::Backtrace::new_unresolved());
 221            }
 222        }
 223
 224        let unparker = self.parker.lock().unparker();
 225        let (runnable, task) = async_task::spawn(future, move |runnable| {
 226            let mut state = state.lock();
 227            state
 228                .poll_history
 229                .push(ExecutorEvent::EnqueuRunnable { id });
 230            state
 231                .scheduled_from_background
 232                .push(BackgroundRunnable { id, runnable });
 233            unparker.unpark();
 234        });
 235        runnable.schedule();
 236        task
 237    }
 238
 239    fn run<'a>(
 240        &self,
 241        cx_id: usize,
 242        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 243    ) -> Box<dyn Any> {
 244        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 245
 246        let woken = Arc::new(AtomicBool::new(false));
 247
 248        let state = self.state.clone();
 249        let id;
 250        {
 251            let mut state = state.lock();
 252            id = util::post_inc(&mut state.next_runnable_id);
 253            if state.enable_runnable_backtraces {
 254                state
 255                    .runnable_backtraces
 256                    .insert(id, backtrace::Backtrace::new_unresolved());
 257            }
 258        }
 259
 260        let unparker = self.parker.lock().unparker();
 261        let (runnable, mut main_task) = unsafe {
 262            async_task::spawn_unchecked(main_future, move |runnable| {
 263                let state = &mut *state.lock();
 264                state
 265                    .scheduled_from_foreground
 266                    .entry(cx_id)
 267                    .or_default()
 268                    .push(ForegroundRunnable {
 269                        id: util::post_inc(&mut state.next_runnable_id),
 270                        runnable,
 271                        main: true,
 272                    });
 273                unparker.unpark();
 274            })
 275        };
 276        runnable.schedule();
 277
 278        loop {
 279            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 280                return result;
 281            }
 282
 283            if !woken.load(SeqCst) {
 284                self.state.lock().will_park();
 285            }
 286
 287            woken.store(false, SeqCst);
 288            self.parker.lock().park();
 289        }
 290    }
 291
 292    pub fn run_until_parked(&self) {
 293        use std::sync::atomic::AtomicBool;
 294        let woken = Arc::new(AtomicBool::new(false));
 295        self.run_internal(woken, None);
 296    }
 297
 298    fn run_internal(
 299        &self,
 300        woken: Arc<std::sync::atomic::AtomicBool>,
 301        mut main_task: Option<&mut AnyLocalTask>,
 302    ) -> Option<Box<dyn Any>> {
 303        use rand::prelude::*;
 304        use std::sync::atomic::Ordering::SeqCst;
 305
 306        let unparker = self.parker.lock().unparker();
 307        let waker = waker_fn::waker_fn(move || {
 308            woken.store(true, SeqCst);
 309            unparker.unpark();
 310        });
 311
 312        let mut cx = Context::from_waker(&waker);
 313        loop {
 314            let mut state = self.state.lock();
 315
 316            if state.scheduled_from_foreground.is_empty()
 317                && state.scheduled_from_background.is_empty()
 318            {
 319                if let Some(main_task) = main_task {
 320                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 321                        return Some(result);
 322                    }
 323                }
 324
 325                return None;
 326            }
 327
 328            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 329                let background_len = state.scheduled_from_background.len();
 330                let ix = state.rng.gen_range(0..background_len);
 331                let background_runnable = state.scheduled_from_background.remove(ix);
 332                state.push_to_history(ExecutorEvent::PollRunnable {
 333                    id: background_runnable.id,
 334                });
 335                drop(state);
 336                background_runnable.runnable.run();
 337            } else if !state.scheduled_from_foreground.is_empty() {
 338                let available_cx_ids = state
 339                    .scheduled_from_foreground
 340                    .keys()
 341                    .copied()
 342                    .collect::<Vec<_>>();
 343                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 344                let scheduled_from_cx = state
 345                    .scheduled_from_foreground
 346                    .get_mut(&cx_id_to_run)
 347                    .unwrap();
 348                let foreground_runnable = scheduled_from_cx.remove(0);
 349                if scheduled_from_cx.is_empty() {
 350                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 351                }
 352                state.push_to_history(ExecutorEvent::PollRunnable {
 353                    id: foreground_runnable.id,
 354                });
 355
 356                drop(state);
 357
 358                foreground_runnable.runnable.run();
 359                if let Some(main_task) = main_task.as_mut() {
 360                    if foreground_runnable.main {
 361                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 362                            return Some(result);
 363                        }
 364                    }
 365                }
 366            }
 367        }
 368    }
 369
 370    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 371    where
 372        F: Unpin + Future<Output = T>,
 373    {
 374        use rand::prelude::*;
 375
 376        let unparker = self.parker.lock().unparker();
 377        let waker = waker_fn::waker_fn(move || {
 378            unparker.unpark();
 379        });
 380
 381        let mut cx = Context::from_waker(&waker);
 382        for _ in 0..max_ticks {
 383            let mut state = self.state.lock();
 384            let runnable_count = state.scheduled_from_background.len();
 385            let ix = state.rng.gen_range(0..=runnable_count);
 386            if ix < state.scheduled_from_background.len() {
 387                let background_runnable = state.scheduled_from_background.remove(ix);
 388                state.push_to_history(ExecutorEvent::PollRunnable {
 389                    id: background_runnable.id,
 390                });
 391                drop(state);
 392                background_runnable.runnable.run();
 393            } else {
 394                drop(state);
 395                if let Poll::Ready(result) = future.poll(&mut cx) {
 396                    return Some(result);
 397                }
 398                let mut state = self.state.lock();
 399                if state.scheduled_from_background.is_empty() {
 400                    state.will_park();
 401                    drop(state);
 402                    self.parker.lock().park();
 403                }
 404
 405                continue;
 406            }
 407        }
 408
 409        None
 410    }
 411
 412    pub fn timer(&self, duration: Duration) -> Timer {
 413        let (tx, rx) = postage::barrier::channel();
 414        let mut state = self.state.lock();
 415        let wakeup_at = state.now + duration;
 416        let id = util::post_inc(&mut state.next_timer_id);
 417        match state
 418            .pending_timers
 419            .binary_search_by_key(&wakeup_at, |e| e.1)
 420        {
 421            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 422        }
 423        let state = self.state.clone();
 424        Timer::Deterministic(DeterministicTimer { rx, id, state })
 425    }
 426
 427    pub fn now(&self) -> std::time::Instant {
 428        let state = self.state.lock();
 429        state.now
 430    }
 431
 432    pub fn advance_clock(&self, duration: Duration) {
 433        let new_now = self.state.lock().now + duration;
 434        loop {
 435            self.run_until_parked();
 436            let mut state = self.state.lock();
 437
 438            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 439                let wakeup_time = *wakeup_time;
 440                if wakeup_time <= new_now {
 441                    let timer_count = state
 442                        .pending_timers
 443                        .iter()
 444                        .take_while(|(_, t, _)| *t == wakeup_time)
 445                        .count();
 446                    state.now = wakeup_time;
 447                    let timers_to_wake = state
 448                        .pending_timers
 449                        .drain(0..timer_count)
 450                        .collect::<Vec<_>>();
 451                    drop(state);
 452                    drop(timers_to_wake);
 453                    continue;
 454                }
 455            }
 456
 457            break;
 458        }
 459
 460        self.state.lock().now = new_now;
 461    }
 462
 463    pub fn start_waiting(&self) {
 464        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 465    }
 466
 467    pub fn finish_waiting(&self) {
 468        self.state.lock().waiting_backtrace.take();
 469    }
 470
 471    pub fn forbid_parking(&self) {
 472        use rand::prelude::*;
 473
 474        let mut state = self.state.lock();
 475        state.forbid_parking = true;
 476        state.rng = StdRng::seed_from_u64(state.seed);
 477    }
 478
 479    pub fn allow_parking(&self) {
 480        use rand::prelude::*;
 481
 482        let mut state = self.state.lock();
 483        state.forbid_parking = false;
 484        state.rng = StdRng::seed_from_u64(state.seed);
 485    }
 486
 487    pub async fn simulate_random_delay(&self) {
 488        use rand::prelude::*;
 489        use smol::future::yield_now;
 490        if self.state.lock().rng.gen_bool(0.2) {
 491            let yields = self.state.lock().rng.gen_range(1..=10);
 492            for _ in 0..yields {
 493                yield_now().await;
 494            }
 495        }
 496    }
 497
 498    pub fn record_backtrace(&self) {
 499        let mut state = self.state.lock();
 500        if state.enable_runnable_backtraces {
 501            let current_id = state
 502                .poll_history
 503                .iter()
 504                .rev()
 505                .find_map(|event| match event {
 506                    ExecutorEvent::PollRunnable { id } => Some(*id),
 507                    _ => None,
 508                });
 509            if let Some(id) = current_id {
 510                state
 511                    .runnable_backtraces
 512                    .insert(id, backtrace::Backtrace::new_unresolved());
 513            }
 514        }
 515    }
 516}
 517
 518impl Drop for Timer {
 519    fn drop(&mut self) {
 520        #[cfg(any(test, feature = "test"))]
 521        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 522            state
 523                .lock()
 524                .pending_timers
 525                .retain(|(timer_id, _, _)| timer_id != id)
 526        }
 527    }
 528}
 529
 530impl Future for Timer {
 531    type Output = ();
 532
 533    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 534        match &mut *self {
 535            #[cfg(any(test, feature = "test"))]
 536            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 537                use postage::stream::{PollRecv, Stream as _};
 538                smol::pin!(rx);
 539                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 540                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 541                    PollRecv::Pending => Poll::Pending,
 542                }
 543            }
 544            Self::Production(timer) => {
 545                smol::pin!(timer);
 546                match timer.poll(cx) {
 547                    Poll::Ready(_) => Poll::Ready(()),
 548                    Poll::Pending => Poll::Pending,
 549                }
 550            }
 551        }
 552    }
 553}
 554
 555#[cfg(any(test, feature = "test"))]
 556impl DeterministicState {
 557    fn push_to_history(&mut self, event: ExecutorEvent) {
 558        use std::fmt::Write as _;
 559
 560        self.poll_history.push(event);
 561        if let Some(prev_history) = &self.previous_poll_history {
 562            let ix = self.poll_history.len() - 1;
 563            let prev_event = prev_history[ix];
 564            if event != prev_event {
 565                let mut message = String::new();
 566                writeln!(
 567                    &mut message,
 568                    "current runnable backtrace:\n{:?}",
 569                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 570                        trace.resolve();
 571                        crate::util::CwdBacktrace(trace)
 572                    })
 573                )
 574                .unwrap();
 575                writeln!(
 576                    &mut message,
 577                    "previous runnable backtrace:\n{:?}",
 578                    self.runnable_backtraces
 579                        .get_mut(&prev_event.id())
 580                        .map(|trace| {
 581                            trace.resolve();
 582                            util::CwdBacktrace(trace)
 583                        })
 584                )
 585                .unwrap();
 586                panic!("detected non-determinism after {ix}. {message}");
 587            }
 588        }
 589    }
 590
 591    fn will_park(&mut self) {
 592        if self.forbid_parking {
 593            let mut backtrace_message = String::new();
 594            #[cfg(any(test, feature = "test"))]
 595            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 596                backtrace.resolve();
 597                backtrace_message = format!(
 598                    "\nbacktrace of waiting future:\n{:?}",
 599                    util::CwdBacktrace(backtrace)
 600                );
 601            }
 602
 603            panic!(
 604                "deterministic executor parked after a call to forbid_parking{}",
 605                backtrace_message
 606            );
 607        }
 608    }
 609}
 610
 611#[cfg(any(test, feature = "test"))]
 612impl ExecutorEvent {
 613    pub fn id(&self) -> usize {
 614        match self {
 615            ExecutorEvent::PollRunnable { id } => *id,
 616            ExecutorEvent::EnqueuRunnable { id } => *id,
 617        }
 618    }
 619}
 620
 621impl ForegroundExecutor {
 622    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
 623        if dispatcher.is_main_thread() {
 624            Ok(Self::Platform {
 625                dispatcher,
 626                _not_send_or_sync: PhantomData,
 627            })
 628        } else {
 629            Err(anyhow!("must be constructed on main thread"))
 630        }
 631    }
 632
 633    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 634        let future = any_local_future(future);
 635        let any_task = match self {
 636            #[cfg(any(test, feature = "test"))]
 637            Self::Deterministic { cx_id, executor } => {
 638                executor.spawn_from_foreground(*cx_id, future, false)
 639            }
 640            Self::Platform { dispatcher, .. } => {
 641                fn spawn_inner(
 642                    future: AnyLocalFuture,
 643                    dispatcher: &Arc<dyn PlatformDispatcher>,
 644                ) -> AnyLocalTask {
 645                    let dispatcher = dispatcher.clone();
 646                    let schedule =
 647                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 648                    let (runnable, task) = async_task::spawn_local(future, schedule);
 649                    runnable.schedule();
 650                    task
 651                }
 652                spawn_inner(future, dispatcher)
 653            }
 654        };
 655        Task::local(any_task)
 656    }
 657
 658    #[cfg(any(test, feature = "test"))]
 659    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 660        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 661        let result = match self {
 662            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 663            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 664        };
 665        *result.downcast().unwrap()
 666    }
 667
 668    #[cfg(any(test, feature = "test"))]
 669    pub fn run_until_parked(&self) {
 670        match self {
 671            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 672            _ => panic!("this method can only be called on a deterministic executor"),
 673        }
 674    }
 675
 676    #[cfg(any(test, feature = "test"))]
 677    pub fn parking_forbidden(&self) -> bool {
 678        match self {
 679            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 680            _ => panic!("this method can only be called on a deterministic executor"),
 681        }
 682    }
 683
 684    #[cfg(any(test, feature = "test"))]
 685    pub fn start_waiting(&self) {
 686        match self {
 687            Self::Deterministic { executor, .. } => executor.start_waiting(),
 688            _ => panic!("this method can only be called on a deterministic executor"),
 689        }
 690    }
 691
 692    #[cfg(any(test, feature = "test"))]
 693    pub fn finish_waiting(&self) {
 694        match self {
 695            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 696            _ => panic!("this method can only be called on a deterministic executor"),
 697        }
 698    }
 699
 700    #[cfg(any(test, feature = "test"))]
 701    pub fn forbid_parking(&self) {
 702        match self {
 703            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 704            _ => panic!("this method can only be called on a deterministic executor"),
 705        }
 706    }
 707
 708    #[cfg(any(test, feature = "test"))]
 709    pub fn allow_parking(&self) {
 710        match self {
 711            Self::Deterministic { executor, .. } => executor.allow_parking(),
 712            _ => panic!("this method can only be called on a deterministic executor"),
 713        }
 714    }
 715
 716    #[cfg(any(test, feature = "test"))]
 717    pub fn advance_clock(&self, duration: Duration) {
 718        match self {
 719            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 720            _ => panic!("this method can only be called on a deterministic executor"),
 721        }
 722    }
 723
 724    #[cfg(any(test, feature = "test"))]
 725    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 726        match self {
 727            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 728            _ => panic!("this method can only be called on a deterministic executor"),
 729        }
 730    }
 731}
 732
 733impl BackgroundExecutor {
 734    pub fn new() -> Self {
 735        let executor = Arc::new(Executor::new());
 736        let stop = channel::unbounded::<()>();
 737
 738        for i in 0..2 * num_cpus::get() {
 739            let executor = executor.clone();
 740            let stop = stop.1.clone();
 741            thread::Builder::new()
 742                .name(format!("background-executor-{}", i))
 743                .spawn(move || smol::block_on(executor.run(stop.recv())))
 744                .unwrap();
 745        }
 746
 747        Self::Production {
 748            executor,
 749            _stop: stop.0,
 750        }
 751    }
 752
 753    pub fn num_cpus(&self) -> usize {
 754        num_cpus::get()
 755    }
 756
 757    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 758    where
 759        T: 'static + Send,
 760        F: Send + Future<Output = T> + 'static,
 761    {
 762        let future = any_future(future);
 763        let any_task = match self {
 764            Self::Production { executor, .. } => executor.spawn(future),
 765            #[cfg(any(test, feature = "test"))]
 766            Self::Deterministic { executor } => executor.spawn(future),
 767        };
 768        Task::send(any_task)
 769    }
 770
 771    pub fn block<F, T>(&self, future: F) -> T
 772    where
 773        F: Future<Output = T>,
 774    {
 775        smol::pin!(future);
 776        match self {
 777            Self::Production { .. } => smol::block_on(&mut future),
 778            #[cfg(any(test, feature = "test"))]
 779            Self::Deterministic { executor, .. } => {
 780                executor.block(&mut future, usize::MAX).unwrap()
 781            }
 782        }
 783    }
 784
 785    pub fn block_with_timeout<F, T>(
 786        &self,
 787        timeout: Duration,
 788        future: F,
 789    ) -> Result<T, impl Future<Output = T>>
 790    where
 791        T: 'static,
 792        F: 'static + Unpin + Future<Output = T>,
 793    {
 794        let mut future = any_local_future(future);
 795        if !timeout.is_zero() {
 796            let output = match self {
 797                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 798                #[cfg(any(test, feature = "test"))]
 799                Self::Deterministic { executor, .. } => {
 800                    use rand::prelude::*;
 801                    let max_ticks = {
 802                        let mut state = executor.state.lock();
 803                        let range = state.block_on_ticks.clone();
 804                        state.rng.gen_range(range)
 805                    };
 806                    executor.block(&mut future, max_ticks)
 807                }
 808            };
 809            if let Some(output) = output {
 810                return Ok(*output.downcast().unwrap());
 811            }
 812        }
 813        Err(async { *future.await.downcast().unwrap() })
 814    }
 815
 816    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 817    where
 818        F: FnOnce(&mut Scope<'scope>),
 819    {
 820        let mut scope = Scope::new(self.clone());
 821        (scheduler)(&mut scope);
 822        let spawned = mem::take(&mut scope.futures)
 823            .into_iter()
 824            .map(|f| self.spawn(f))
 825            .collect::<Vec<_>>();
 826        for task in spawned {
 827            task.await;
 828        }
 829    }
 830
 831    pub fn timer(&self, duration: Duration) -> Timer {
 832        match self {
 833            BackgroundExecutor::Production { .. } => {
 834                Timer::Production(smol::Timer::after(duration))
 835            }
 836            #[cfg(any(test, feature = "test"))]
 837            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
 838        }
 839    }
 840
 841    pub fn now(&self) -> std::time::Instant {
 842        match self {
 843            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
 844            #[cfg(any(test, feature = "test"))]
 845            BackgroundExecutor::Deterministic { executor } => executor.now(),
 846        }
 847    }
 848
 849    #[cfg(any(test, feature = "test"))]
 850    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 851        match self {
 852            Self::Deterministic { executor, .. } => {
 853                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 854            }
 855            _ => panic!("this method can only be called on a deterministic executor"),
 856        }
 857    }
 858
 859    #[cfg(any(test, feature = "test"))]
 860    pub async fn simulate_random_delay(&self) {
 861        match self {
 862            Self::Deterministic { executor, .. } => {
 863                executor.simulate_random_delay().await;
 864            }
 865            _ => {
 866                panic!("this method can only be called on a deterministic executor")
 867            }
 868        }
 869    }
 870
 871    #[cfg(any(test, feature = "test"))]
 872    pub fn record_backtrace(&self) {
 873        match self {
 874            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 875            _ => {
 876                panic!("this method can only be called on a deterministic executor")
 877            }
 878        }
 879    }
 880
 881    #[cfg(any(test, feature = "test"))]
 882    pub fn start_waiting(&self) {
 883        match self {
 884            Self::Deterministic { executor, .. } => executor.start_waiting(),
 885            _ => panic!("this method can only be called on a deterministic executor"),
 886        }
 887    }
 888}
 889
 890impl Default for BackgroundExecutor {
 891    fn default() -> Self {
 892        Self::new()
 893    }
 894}
 895
 896pub struct Scope<'a> {
 897    executor: Arc<BackgroundExecutor>,
 898    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 899    tx: Option<mpsc::Sender<()>>,
 900    rx: mpsc::Receiver<()>,
 901    _phantom: PhantomData<&'a ()>,
 902}
 903
 904impl<'a> Scope<'a> {
 905    fn new(executor: Arc<BackgroundExecutor>) -> Self {
 906        let (tx, rx) = mpsc::channel(1);
 907        Self {
 908            executor,
 909            tx: Some(tx),
 910            rx,
 911            futures: Default::default(),
 912            _phantom: PhantomData,
 913        }
 914    }
 915
 916    pub fn spawn<F>(&mut self, f: F)
 917    where
 918        F: Future<Output = ()> + Send + 'a,
 919    {
 920        let tx = self.tx.clone().unwrap();
 921
 922        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 923        // dropping this `Scope` blocks until all of the futures have resolved.
 924        let f = unsafe {
 925            mem::transmute::<
 926                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 927                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 928            >(Box::pin(async move {
 929                f.await;
 930                drop(tx);
 931            }))
 932        };
 933        self.futures.push(f);
 934    }
 935}
 936
 937impl<'a> Drop for Scope<'a> {
 938    fn drop(&mut self) {
 939        self.tx.take().unwrap();
 940
 941        // Wait until the channel is closed, which means that all of the spawned
 942        // futures have resolved.
 943        self.executor.block(self.rx.next());
 944    }
 945}
 946
 947impl<T> Task<T> {
 948    pub fn ready(value: T) -> Self {
 949        Self::Ready(Some(value))
 950    }
 951
 952    fn local(any_task: AnyLocalTask) -> Self {
 953        Self::Local {
 954            any_task,
 955            result_type: PhantomData,
 956        }
 957    }
 958
 959    pub fn detach(self) {
 960        match self {
 961            Task::Ready(_) => {}
 962            Task::Local { any_task, .. } => any_task.detach(),
 963            Task::Send { any_task, .. } => any_task.detach(),
 964        }
 965    }
 966}
 967
 968// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
 969//     #[track_caller]
 970//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
 971//         let caller = Location::caller();
 972//         cx.spawn(|_| async move {
 973//             if let Err(err) = self.await {
 974//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
 975//             }
 976//         })
 977//         .detach();
 978//     }
 979// }
 980
 981impl<T: Send> Task<T> {
 982    fn send(any_task: AnyTask) -> Self {
 983        Self::Send {
 984            any_task,
 985            result_type: PhantomData,
 986        }
 987    }
 988}
 989
 990impl<T: fmt::Debug> fmt::Debug for Task<T> {
 991    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 992        match self {
 993            Task::Ready(value) => value.fmt(f),
 994            Task::Local { any_task, .. } => any_task.fmt(f),
 995            Task::Send { any_task, .. } => any_task.fmt(f),
 996        }
 997    }
 998}
 999
1000impl<T: 'static> Future for Task<T> {
1001    type Output = T;
1002
1003    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1004        match unsafe { self.get_unchecked_mut() } {
1005            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1006            Task::Local { any_task, .. } => {
1007                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1008            }
1009            Task::Send { any_task, .. } => {
1010                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1011            }
1012        }
1013    }
1014}
1015
1016fn any_future<T, F>(future: F) -> AnyFuture
1017where
1018    T: 'static + Send,
1019    F: Future<Output = T> + Send + 'static,
1020{
1021    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1022}
1023
1024fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1025where
1026    T: 'static,
1027    F: Future<Output = T> + 'static,
1028{
1029    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1030}