executor.rs

   1use anyhow::{anyhow, Result};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use smol::{channel, prelude::*, Executor};
   5use std::{
   6    any::Any,
   7    fmt::{self, Display},
   8    marker::PhantomData,
   9    mem,
  10    pin::Pin,
  11    rc::Rc,
  12    sync::Arc,
  13    task::{Context, Poll},
  14    thread,
  15    time::Duration,
  16};
  17
  18use crate::{
  19    platform::{self, Dispatcher},
  20    util, AppContext,
  21};
  22
/// Executor for futures that are spawned with `spawn_local` and therefore need not be `Send`.
pub enum Foreground {
    /// Production variant: runnables are handed to the platform dispatcher's main thread.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker opts the enum out of both.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test variant backed by the seeded `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this context's runnables are queued in the executor state.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
  34
/// Executor for `Send` futures.
pub enum Background {
    /// Test variant: tasks are queued on the shared `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Production variant: tasks run on a `smol` executor driven by worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never read; kept alive alongside the executor. The worker threads started in
        // `Background::new` run the executor until `recv` on the paired receiver resolves.
        _stop: channel::Sender<()>,
    },
}
  43
/// Boxed, type-erased future that is not required to be `Send`; its output is a boxed `Any`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, type-erased `Send` future; its output is a boxed `Any + Send`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle whose type-erased output is `Send`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle whose type-erased output is not required to be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  48
/// A value of type `T` that is either already available or will be produced by a spawned task.
#[must_use]
pub enum Task<T> {
    /// A value that is available without running anything.
    Ready(Option<T>),
    /// A task whose type-erased output is not required to be `Send`.
    Local {
        any_task: AnyLocalTask,
        // Remembers the concrete output type; `any_task` itself only yields `Box<dyn Any>`.
        result_type: PhantomData<T>,
    },
    /// A task whose type-erased output is `Send`.
    Send {
        any_task: AnyTask,
        // Remembers the concrete output type; `any_task` itself only yields a boxed `Any`.
        result_type: PhantomData<T>,
    },
}
  61
// NOTE(review): `Task::Local` wraps a task whose erased output type (`Box<dyn Any>`) is not
// `Send`, so this impl is needed to send a `Task<T>` across threads. Presumably it is sound
// because the boxed output always holds a `T`, which is `Send` here — confirm against the
// code that constructs `AnyLocalFuture` values.
unsafe impl<T: Send> Send for Task<T> {}
  63
/// Mutable state of the deterministic test executor, shared behind a mutex.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision.
    rng: rand::prelude::StdRng,
    // Seed the RNG was created from; used to re-seed it in `forbid_parking`.
    seed: u64,
    // Foreground runnables waiting to run, queued per foreground context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Background runnables waiting to run.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated current time; moved forward by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // (timer id, wake-up instant, barrier sender), kept sorted by wake-up instant.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Captured by `start_waiting`; included in the panic raised by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    // Events of an earlier run that `push_to_history` compares this run against.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    // Whether a backtrace is captured when a runnable is created.
    enable_runnable_backtraces: bool,
    // Captured backtraces, keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
  82
/// An entry in the deterministic executor's execution history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with the given id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with the given id was placed on a queue.
    EnqueuRunnable { id: usize },
}
  88
/// A queued foreground runnable together with its bookkeeping data.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    // Id reported in the execution history.
    id: usize,
    runnable: Runnable,
    // Set for the main future's runnable; `run_internal` re-polls the main task after
    // running such a runnable.
    main: bool,
}
  95
/// A queued background runnable together with the id reported in the execution history.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    id: usize,
    runnable: Runnable,
}
 101
/// Seeded executor used in tests: it queues foreground and background runnables and picks
/// which one to run next using its RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with schedule callbacks and with `DeterministicTimer`s.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Used to park the running thread; schedule callbacks and wakers hold its unparkers.
    parker: parking_lot::Mutex<parking::Parker>,
}
 107
/// A future that resolves to `()` once its duration has elapsed.
pub enum Timer {
    /// Real timer provided by `smol`.
    Production(smol::Timer),
    /// Timer driven by the deterministic executor's simulated clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
 113
/// Timer registered with a `Deterministic` executor.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    // Id of this timer's entry in `pending_timers`; used to remove it when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
 120
 121#[cfg(any(test, feature = "test-support"))]
 122impl Deterministic {
 123    pub fn new(seed: u64) -> Arc<Self> {
 124        use rand::prelude::*;
 125
 126        Arc::new(Self {
 127            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 128                rng: StdRng::seed_from_u64(seed),
 129                seed,
 130                scheduled_from_foreground: Default::default(),
 131                scheduled_from_background: Default::default(),
 132                forbid_parking: false,
 133                block_on_ticks: 0..=1000,
 134                now: std::time::Instant::now(),
 135                next_timer_id: Default::default(),
 136                pending_timers: Default::default(),
 137                waiting_backtrace: None,
 138                next_runnable_id: 0,
 139                poll_history: Default::default(),
 140                previous_poll_history: Default::default(),
 141                enable_runnable_backtraces: false,
 142                runnable_backtraces: Default::default(),
 143            })),
 144            parker: Default::default(),
 145        })
 146    }
 147
 148    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 149        self.state.lock().poll_history.clone()
 150    }
 151
 152    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 153        self.state.lock().previous_poll_history = history;
 154    }
 155
 156    pub fn enable_runnable_backtrace(&self) {
 157        self.state.lock().enable_runnable_backtraces = true;
 158    }
 159
 160    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 161        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 162        backtrace.resolve();
 163        backtrace
 164    }
 165
 166    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
 167        Arc::new(Background::Deterministic {
 168            executor: self.clone(),
 169        })
 170    }
 171
 172    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
 173        Rc::new(Foreground::Deterministic {
 174            cx_id: id,
 175            executor: self.clone(),
 176        })
 177    }
 178
 179    fn spawn_from_foreground(
 180        &self,
 181        cx_id: usize,
 182        future: AnyLocalFuture,
 183        main: bool,
 184    ) -> AnyLocalTask {
 185        let state = self.state.clone();
 186        let id;
 187        {
 188            let mut state = state.lock();
 189            id = util::post_inc(&mut state.next_runnable_id);
 190            if state.enable_runnable_backtraces {
 191                state
 192                    .runnable_backtraces
 193                    .insert(id, backtrace::Backtrace::new_unresolved());
 194            }
 195        }
 196
 197        let unparker = self.parker.lock().unparker();
 198        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 199            let mut state = state.lock();
 200            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 201            state
 202                .scheduled_from_foreground
 203                .entry(cx_id)
 204                .or_default()
 205                .push(ForegroundRunnable { id, runnable, main });
 206            unparker.unpark();
 207        });
 208        runnable.schedule();
 209        task
 210    }
 211
 212    fn spawn(&self, future: AnyFuture) -> AnyTask {
 213        let state = self.state.clone();
 214        let id;
 215        {
 216            let mut state = state.lock();
 217            id = util::post_inc(&mut state.next_runnable_id);
 218            if state.enable_runnable_backtraces {
 219                state
 220                    .runnable_backtraces
 221                    .insert(id, backtrace::Backtrace::new_unresolved());
 222            }
 223        }
 224
 225        let unparker = self.parker.lock().unparker();
 226        let (runnable, task) = async_task::spawn(future, move |runnable| {
 227            let mut state = state.lock();
 228            state
 229                .poll_history
 230                .push(ExecutorEvent::EnqueuRunnable { id });
 231            state
 232                .scheduled_from_background
 233                .push(BackgroundRunnable { id, runnable });
 234            unparker.unpark();
 235        });
 236        runnable.schedule();
 237        task
 238    }
 239
    /// Drives `main_future` to completion on the current thread and returns its output.
    ///
    /// The future is spawned as the "main" foreground task of `cx_id`. The method then
    /// alternates between running queued work (`run_internal`) and parking until something
    /// wakes the executor, and returns only once the main task has produced a result.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker built in `run_internal`; tells the loop below that a wake-up
        // arrived, in which case parking is not treated as "about to block".
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` skips the `'static`/`Send` checks on the future and
        // the schedule callback. This appears to rely on the task being driven to completion
        // inside this call (the loop below only returns with a result) — confirm.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // NOTE(review): a fresh id is allocated on every schedule, so the
                        // `id` under which the backtrace was stored above never appears in
                        // the history; no `EnqueuRunnable` event is recorded either —
                        // confirm whether this is intended.
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // No wake-up arrived while running: this park may block, which `will_park`
            // turns into a panic when parking has been forbidden.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
 292
 293    pub fn run_until_parked(&self) {
 294        use std::sync::atomic::AtomicBool;
 295        let woken = Arc::new(AtomicBool::new(false));
 296        self.run_internal(woken, None);
 297    }
 298
    /// Runs queued runnables, chosen with the state's RNG, until both queues are empty or
    /// the main task completes.
    ///
    /// Returns `Some(output)` when `main_task` is given and is found to be finished;
    /// otherwise returns `None` once there is nothing left to run. `woken` is set to `true`
    /// whenever the waker used for polling `main_task` is invoked.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Nothing queued: give the main task one last poll, then report being idle.
            // Note that the state lock is still held during this poll.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // A coin flip decides whether queued background work goes first. If the flip
            // fails and no foreground work is queued, neither branch runs and the loop
            // simply tries again.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable may schedule more work,
                // which locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then run that context's oldest runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so the emptiness checks above stay accurate.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                // After running the main future's runnable, check whether it finished.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
 370
    /// Polls `future` on the current thread for at most `max_ticks` ticks, interleaving it
    /// with queued background runnables.
    ///
    /// On each tick an index is drawn from `0..=len` of the background queue: an in-range
    /// index runs that background runnable, while the one-past-the-end index polls `future`.
    /// Returns `Some(output)` as soon as the future is ready and `None` when the tick budget
    /// is used up first. Foreground runnables are never run here.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable may schedule more work,
                // which locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                // The future is pending. With no background work queued, only an external
                // wake-up can make progress, so park (or panic if parking is forbidden).
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
 412
 413    pub fn timer(&self, duration: Duration) -> Timer {
 414        let (tx, rx) = postage::barrier::channel();
 415        let mut state = self.state.lock();
 416        let wakeup_at = state.now + duration;
 417        let id = util::post_inc(&mut state.next_timer_id);
 418        match state
 419            .pending_timers
 420            .binary_search_by_key(&wakeup_at, |e| e.1)
 421        {
 422            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 423        }
 424        let state = self.state.clone();
 425        Timer::Deterministic(DeterministicTimer { rx, id, state })
 426    }
 427
 428    pub fn now(&self) -> std::time::Instant {
 429        let state = self.state.lock();
 430        state.now
 431    }
 432
 433    pub fn advance_clock(&self, duration: Duration) {
 434        let new_now = self.state.lock().now + duration;
 435        loop {
 436            self.run_until_parked();
 437            let mut state = self.state.lock();
 438
 439            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 440                let wakeup_time = *wakeup_time;
 441                if wakeup_time <= new_now {
 442                    let timer_count = state
 443                        .pending_timers
 444                        .iter()
 445                        .take_while(|(_, t, _)| *t == wakeup_time)
 446                        .count();
 447                    state.now = wakeup_time;
 448                    let timers_to_wake = state
 449                        .pending_timers
 450                        .drain(0..timer_count)
 451                        .collect::<Vec<_>>();
 452                    drop(state);
 453                    drop(timers_to_wake);
 454                    continue;
 455                }
 456            }
 457
 458            break;
 459        }
 460
 461        self.state.lock().now = new_now;
 462    }
 463
 464    pub fn start_waiting(&self) {
 465        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 466    }
 467
 468    pub fn finish_waiting(&self) {
 469        self.state.lock().waiting_backtrace.take();
 470    }
 471
 472    pub fn forbid_parking(&self) {
 473        use rand::prelude::*;
 474
 475        let mut state = self.state.lock();
 476        state.forbid_parking = true;
 477        state.rng = StdRng::seed_from_u64(state.seed);
 478    }
 479
 480    pub async fn simulate_random_delay(&self) {
 481        use rand::prelude::*;
 482        use smol::future::yield_now;
 483        if self.state.lock().rng.gen_bool(0.2) {
 484            let yields = self.state.lock().rng.gen_range(1..=10);
 485            for _ in 0..yields {
 486                yield_now().await;
 487            }
 488        }
 489    }
 490
 491    pub fn record_backtrace(&self) {
 492        let mut state = self.state.lock();
 493        if state.enable_runnable_backtraces {
 494            let current_id = state
 495                .poll_history
 496                .iter()
 497                .rev()
 498                .find_map(|event| match event {
 499                    ExecutorEvent::PollRunnable { id } => Some(*id),
 500                    _ => None,
 501                });
 502            if let Some(id) = current_id {
 503                state
 504                    .runnable_backtraces
 505                    .insert(id, backtrace::Backtrace::new_unresolved());
 506            }
 507        }
 508    }
 509}
 510
 511impl Drop for Timer {
 512    fn drop(&mut self) {
 513        #[cfg(any(test, feature = "test-support"))]
 514        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 515            state
 516                .lock()
 517                .pending_timers
 518                .retain(|(timer_id, _, _)| timer_id != id)
 519        }
 520    }
 521}
 522
 523impl Future for Timer {
 524    type Output = ();
 525
 526    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 527        match &mut *self {
 528            #[cfg(any(test, feature = "test-support"))]
 529            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 530                use postage::stream::{PollRecv, Stream as _};
 531                smol::pin!(rx);
 532                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 533                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 534                    PollRecv::Pending => Poll::Pending,
 535                }
 536            }
 537            Self::Production(timer) => {
 538                smol::pin!(timer);
 539                match timer.poll(cx) {
 540                    Poll::Ready(_) => Poll::Ready(()),
 541                    Poll::Pending => Poll::Pending,
 542                }
 543            }
 544        }
 545    }
 546}
 547
 548#[cfg(any(test, feature = "test-support"))]
 549impl DeterministicState {
 550    fn push_to_history(&mut self, event: ExecutorEvent) {
 551        use std::fmt::Write as _;
 552
 553        self.poll_history.push(event);
 554        if let Some(prev_history) = &self.previous_poll_history {
 555            let ix = self.poll_history.len() - 1;
 556            let prev_event = prev_history[ix];
 557            if event != prev_event {
 558                let mut message = String::new();
 559                writeln!(
 560                    &mut message,
 561                    "current runnable backtrace:\n{:?}",
 562                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 563                        trace.resolve();
 564                        util::CwdBacktrace(trace)
 565                    })
 566                )
 567                .unwrap();
 568                writeln!(
 569                    &mut message,
 570                    "previous runnable backtrace:\n{:?}",
 571                    self.runnable_backtraces
 572                        .get_mut(&prev_event.id())
 573                        .map(|trace| {
 574                            trace.resolve();
 575                            util::CwdBacktrace(trace)
 576                        })
 577                )
 578                .unwrap();
 579                panic!("detected non-determinism after {ix}. {message}");
 580            }
 581        }
 582    }
 583
 584    fn will_park(&mut self) {
 585        if self.forbid_parking {
 586            let mut backtrace_message = String::new();
 587            #[cfg(any(test, feature = "test-support"))]
 588            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 589                backtrace.resolve();
 590                backtrace_message = format!(
 591                    "\nbacktrace of waiting future:\n{:?}",
 592                    util::CwdBacktrace(backtrace)
 593                );
 594            }
 595
 596            panic!(
 597                "deterministic executor parked after a call to forbid_parking{}",
 598                backtrace_message
 599            );
 600        }
 601    }
 602}
 603
 604#[cfg(any(test, feature = "test-support"))]
 605impl ExecutorEvent {
 606    pub fn id(&self) -> usize {
 607        match self {
 608            ExecutorEvent::PollRunnable { id } => *id,
 609            ExecutorEvent::EnqueuRunnable { id } => *id,
 610        }
 611    }
 612}
 613
 614impl Foreground {
 615    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 616        if dispatcher.is_main_thread() {
 617            Ok(Self::Platform {
 618                dispatcher,
 619                _not_send_or_sync: PhantomData,
 620            })
 621        } else {
 622            Err(anyhow!("must be constructed on main thread"))
 623        }
 624    }
 625
 626    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 627        let future = any_local_future(future);
 628        let any_task = match self {
 629            #[cfg(any(test, feature = "test-support"))]
 630            Self::Deterministic { cx_id, executor } => {
 631                executor.spawn_from_foreground(*cx_id, future, false)
 632            }
 633            Self::Platform { dispatcher, .. } => {
 634                fn spawn_inner(
 635                    future: AnyLocalFuture,
 636                    dispatcher: &Arc<dyn Dispatcher>,
 637                ) -> AnyLocalTask {
 638                    let dispatcher = dispatcher.clone();
 639                    let schedule =
 640                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 641                    let (runnable, task) = async_task::spawn_local(future, schedule);
 642                    runnable.schedule();
 643                    task
 644                }
 645                spawn_inner(future, dispatcher)
 646            }
 647        };
 648        Task::local(any_task)
 649    }
 650
 651    #[cfg(any(test, feature = "test-support"))]
 652    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 653        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 654        let result = match self {
 655            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 656            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 657        };
 658        *result.downcast().unwrap()
 659    }
 660
 661    #[cfg(any(test, feature = "test-support"))]
 662    pub fn run_until_parked(&self) {
 663        match self {
 664            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 665            _ => panic!("this method can only be called on a deterministic executor"),
 666        }
 667    }
 668
 669    #[cfg(any(test, feature = "test-support"))]
 670    pub fn parking_forbidden(&self) -> bool {
 671        match self {
 672            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 673            _ => panic!("this method can only be called on a deterministic executor"),
 674        }
 675    }
 676
 677    #[cfg(any(test, feature = "test-support"))]
 678    pub fn start_waiting(&self) {
 679        match self {
 680            Self::Deterministic { executor, .. } => executor.start_waiting(),
 681            _ => panic!("this method can only be called on a deterministic executor"),
 682        }
 683    }
 684
 685    #[cfg(any(test, feature = "test-support"))]
 686    pub fn finish_waiting(&self) {
 687        match self {
 688            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 689            _ => panic!("this method can only be called on a deterministic executor"),
 690        }
 691    }
 692
 693    #[cfg(any(test, feature = "test-support"))]
 694    pub fn forbid_parking(&self) {
 695        match self {
 696            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 697            _ => panic!("this method can only be called on a deterministic executor"),
 698        }
 699    }
 700
 701    #[cfg(any(test, feature = "test-support"))]
 702    pub fn advance_clock(&self, duration: Duration) {
 703        match self {
 704            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 705            _ => panic!("this method can only be called on a deterministic executor"),
 706        }
 707    }
 708
 709    #[cfg(any(test, feature = "test-support"))]
 710    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 711        match self {
 712            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 713            _ => panic!("this method can only be called on a deterministic executor"),
 714        }
 715    }
 716}
 717
 718impl Background {
 719    pub fn new() -> Self {
 720        let executor = Arc::new(Executor::new());
 721        let stop = channel::unbounded::<()>();
 722
 723        for i in 0..2 * num_cpus::get() {
 724            let executor = executor.clone();
 725            let stop = stop.1.clone();
 726            thread::Builder::new()
 727                .name(format!("background-executor-{}", i))
 728                .spawn(move || smol::block_on(executor.run(stop.recv())))
 729                .unwrap();
 730        }
 731
 732        Self::Production {
 733            executor,
 734            _stop: stop.0,
 735        }
 736    }
 737
    /// Returns the CPU count reported by the `num_cpus` crate.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
 741
 742    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 743    where
 744        T: 'static + Send,
 745        F: Send + Future<Output = T> + 'static,
 746    {
 747        let future = any_future(future);
 748        let any_task = match self {
 749            Self::Production { executor, .. } => executor.spawn(future),
 750            #[cfg(any(test, feature = "test-support"))]
 751            Self::Deterministic { executor } => executor.spawn(future),
 752        };
 753        Task::send(any_task)
 754    }
 755
 756    pub fn block<F, T>(&self, future: F) -> T
 757    where
 758        F: Future<Output = T>,
 759    {
 760        smol::pin!(future);
 761        match self {
 762            Self::Production { .. } => smol::block_on(&mut future),
 763            #[cfg(any(test, feature = "test-support"))]
 764            Self::Deterministic { executor, .. } => {
 765                executor.block(&mut future, usize::MAX).unwrap()
 766            }
 767        }
 768    }
 769
 770    pub fn block_with_timeout<F, T>(
 771        &self,
 772        timeout: Duration,
 773        future: F,
 774    ) -> Result<T, impl Future<Output = T>>
 775    where
 776        T: 'static,
 777        F: 'static + Unpin + Future<Output = T>,
 778    {
 779        let mut future = any_local_future(future);
 780        if !timeout.is_zero() {
 781            let output = match self {
 782                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 783                #[cfg(any(test, feature = "test-support"))]
 784                Self::Deterministic { executor, .. } => {
 785                    use rand::prelude::*;
 786                    let max_ticks = {
 787                        let mut state = executor.state.lock();
 788                        let range = state.block_on_ticks.clone();
 789                        state.rng.gen_range(range)
 790                    };
 791                    executor.block(&mut future, max_ticks)
 792                }
 793            };
 794            if let Some(output) = output {
 795                return Ok(*output.downcast().unwrap());
 796            }
 797        }
 798        Err(async { *future.await.downcast().unwrap() })
 799    }
 800
 801    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 802    where
 803        F: FnOnce(&mut Scope<'scope>),
 804    {
 805        let mut scope = Scope::new(self.clone());
 806        (scheduler)(&mut scope);
 807        let spawned = mem::take(&mut scope.futures)
 808            .into_iter()
 809            .map(|f| self.spawn(f))
 810            .collect::<Vec<_>>();
 811        for task in spawned {
 812            task.await;
 813        }
 814    }
 815
 816    pub fn timer(&self, duration: Duration) -> Timer {
 817        match self {
 818            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
 819            #[cfg(any(test, feature = "test-support"))]
 820            Background::Deterministic { executor } => executor.timer(duration),
 821        }
 822    }
 823
 824    pub fn now(&self) -> std::time::Instant {
 825        match self {
 826            Background::Production { .. } => std::time::Instant::now(),
 827            #[cfg(any(test, feature = "test-support"))]
 828            Background::Deterministic { executor } => executor.now(),
 829        }
 830    }
 831
 832    #[cfg(any(test, feature = "test-support"))]
 833    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 834        match self {
 835            Self::Deterministic { executor, .. } => {
 836                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 837            }
 838            _ => panic!("this method can only be called on a deterministic executor"),
 839        }
 840    }
 841
 842    #[cfg(any(test, feature = "test-support"))]
 843    pub async fn simulate_random_delay(&self) {
 844        match self {
 845            Self::Deterministic { executor, .. } => {
 846                executor.simulate_random_delay().await;
 847            }
 848            _ => {
 849                panic!("this method can only be called on a deterministic executor")
 850            }
 851        }
 852    }
 853
 854    #[cfg(any(test, feature = "test-support"))]
 855    pub fn record_backtrace(&self) {
 856        match self {
 857            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 858            _ => {
 859                panic!("this method can only be called on a deterministic executor")
 860            }
 861        }
 862    }
 863}
 864
 865impl Default for Background {
 866    fn default() -> Self {
 867        Self::new()
 868    }
 869}
 870
 871pub struct Scope<'a> {
 872    executor: Arc<Background>,
 873    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 874    tx: Option<mpsc::Sender<()>>,
 875    rx: mpsc::Receiver<()>,
 876    _phantom: PhantomData<&'a ()>,
 877}
 878
 879impl<'a> Scope<'a> {
 880    fn new(executor: Arc<Background>) -> Self {
 881        let (tx, rx) = mpsc::channel(1);
 882        Self {
 883            executor,
 884            tx: Some(tx),
 885            rx,
 886            futures: Default::default(),
 887            _phantom: PhantomData,
 888        }
 889    }
 890
 891    pub fn spawn<F>(&mut self, f: F)
 892    where
 893        F: Future<Output = ()> + Send + 'a,
 894    {
 895        let tx = self.tx.clone().unwrap();
 896
 897        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 898        // dropping this `Scope` blocks until all of the futures have resolved.
 899        let f = unsafe {
 900            mem::transmute::<
 901                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 902                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 903            >(Box::pin(async move {
 904                f.await;
 905                drop(tx);
 906            }))
 907        };
 908        self.futures.push(f);
 909    }
 910}
 911
 912impl<'a> Drop for Scope<'a> {
 913    fn drop(&mut self) {
 914        self.tx.take().unwrap();
 915
 916        // Wait until the channel is closed, which means that all of the spawned
 917        // futures have resolved.
 918        self.executor.block(self.rx.next());
 919    }
 920}
 921
 922impl<T> Task<T> {
 923    pub fn ready(value: T) -> Self {
 924        Self::Ready(Some(value))
 925    }
 926
 927    fn local(any_task: AnyLocalTask) -> Self {
 928        Self::Local {
 929            any_task,
 930            result_type: PhantomData,
 931        }
 932    }
 933
 934    pub fn detach(self) {
 935        match self {
 936            Task::Ready(_) => {}
 937            Task::Local { any_task, .. } => any_task.detach(),
 938            Task::Send { any_task, .. } => any_task.detach(),
 939        }
 940    }
 941}
 942
 943impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
 944    pub fn detach_and_log_err(self, cx: &mut AppContext) {
 945        cx.spawn(|_| async move {
 946            if let Err(err) = self.await {
 947                log::error!("{}", err);
 948            }
 949        })
 950        .detach();
 951    }
 952}
 953
 954impl<T: Send> Task<T> {
 955    fn send(any_task: AnyTask) -> Self {
 956        Self::Send {
 957            any_task,
 958            result_type: PhantomData,
 959        }
 960    }
 961}
 962
 963impl<T: fmt::Debug> fmt::Debug for Task<T> {
 964    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 965        match self {
 966            Task::Ready(value) => value.fmt(f),
 967            Task::Local { any_task, .. } => any_task.fmt(f),
 968            Task::Send { any_task, .. } => any_task.fmt(f),
 969        }
 970    }
 971}
 972
 973impl<T: 'static> Future for Task<T> {
 974    type Output = T;
 975
 976    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 977        match unsafe { self.get_unchecked_mut() } {
 978            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
 979            Task::Local { any_task, .. } => {
 980                any_task.poll(cx).map(|value| *value.downcast().unwrap())
 981            }
 982            Task::Send { any_task, .. } => {
 983                any_task.poll(cx).map(|value| *value.downcast().unwrap())
 984            }
 985        }
 986    }
 987}
 988
 989fn any_future<T, F>(future: F) -> AnyFuture
 990where
 991    T: 'static + Send,
 992    F: Future<Output = T> + Send + 'static,
 993{
 994    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
 995}
 996
 997fn any_local_future<T, F>(future: F) -> AnyLocalFuture
 998where
 999    T: 'static,
1000    F: Future<Output = T> + 'static,
1001{
1002    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1003}