executor.rs

   1use anyhow::{anyhow, Result};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use smol::{channel, prelude::*, Executor};
   5use std::{
   6    any::Any,
   7    fmt::{self, Display},
   8    marker::PhantomData,
   9    mem,
  10    pin::Pin,
  11    rc::Rc,
  12    sync::Arc,
  13    task::{Context, Poll},
  14    thread,
  15    time::Duration, panic::Location,
  16};
  17
  18use crate::{
  19    platform::{self, Dispatcher},
  20    util, AppContext,
  21};
  22
  23pub enum Foreground {
  24    Platform {
  25        dispatcher: Arc<dyn platform::Dispatcher>,
  26        _not_send_or_sync: PhantomData<Rc<()>>,
  27    },
  28    #[cfg(any(test, feature = "test-support"))]
  29    Deterministic {
  30        cx_id: usize,
  31        executor: Arc<Deterministic>,
  32    },
  33}
  34
  35pub enum Background {
  36    #[cfg(any(test, feature = "test-support"))]
  37    Deterministic { executor: Arc<Deterministic> },
  38    Production {
  39        executor: Arc<smol::Executor<'static>>,
  40        _stop: channel::Sender<()>,
  41    },
  42}
  43
  44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  48
  49#[must_use]
  50pub enum Task<T> {
  51    Ready(Option<T>),
  52    Local {
  53        any_task: AnyLocalTask,
  54        result_type: PhantomData<T>,
  55    },
  56    Send {
  57        any_task: AnyTask,
  58        result_type: PhantomData<T>,
  59    },
  60}
  61
  62unsafe impl<T: Send> Send for Task<T> {}
  63
  64#[cfg(any(test, feature = "test-support"))]
  65struct DeterministicState {
  66    rng: rand::prelude::StdRng,
  67    seed: u64,
  68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
  69    scheduled_from_background: Vec<BackgroundRunnable>,
  70    forbid_parking: bool,
  71    block_on_ticks: std::ops::RangeInclusive<usize>,
  72    now: std::time::Instant,
  73    next_timer_id: usize,
  74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
  75    waiting_backtrace: Option<backtrace::Backtrace>,
  76    next_runnable_id: usize,
  77    poll_history: Vec<ExecutorEvent>,
  78    previous_poll_history: Option<Vec<ExecutorEvent>>,
  79    enable_runnable_backtraces: bool,
  80    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
  81}
  82
  83#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  84pub enum ExecutorEvent {
  85    PollRunnable { id: usize },
  86    EnqueuRunnable { id: usize },
  87}
  88
  89#[cfg(any(test, feature = "test-support"))]
  90struct ForegroundRunnable {
  91    id: usize,
  92    runnable: Runnable,
  93    main: bool,
  94}
  95
  96#[cfg(any(test, feature = "test-support"))]
  97struct BackgroundRunnable {
  98    id: usize,
  99    runnable: Runnable,
 100}
 101
 102#[cfg(any(test, feature = "test-support"))]
 103pub struct Deterministic {
 104    state: Arc<parking_lot::Mutex<DeterministicState>>,
 105    parker: parking_lot::Mutex<parking::Parker>,
 106}
 107
 108pub enum Timer {
 109    Production(smol::Timer),
 110    #[cfg(any(test, feature = "test-support"))]
 111    Deterministic(DeterministicTimer),
 112}
 113
 114#[cfg(any(test, feature = "test-support"))]
 115pub struct DeterministicTimer {
 116    rx: postage::barrier::Receiver,
 117    id: usize,
 118    state: Arc<parking_lot::Mutex<DeterministicState>>,
 119}
 120
 121#[cfg(any(test, feature = "test-support"))]
 122impl Deterministic {
 123    pub fn new(seed: u64) -> Arc<Self> {
 124        use rand::prelude::*;
 125
 126        Arc::new(Self {
 127            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 128                rng: StdRng::seed_from_u64(seed),
 129                seed,
 130                scheduled_from_foreground: Default::default(),
 131                scheduled_from_background: Default::default(),
 132                forbid_parking: false,
 133                block_on_ticks: 0..=1000,
 134                now: std::time::Instant::now(),
 135                next_timer_id: Default::default(),
 136                pending_timers: Default::default(),
 137                waiting_backtrace: None,
 138                next_runnable_id: 0,
 139                poll_history: Default::default(),
 140                previous_poll_history: Default::default(),
 141                enable_runnable_backtraces: false,
 142                runnable_backtraces: Default::default(),
 143            })),
 144            parker: Default::default(),
 145        })
 146    }
 147
 148    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 149        self.state.lock().poll_history.clone()
 150    }
 151
 152    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 153        self.state.lock().previous_poll_history = history;
 154    }
 155
 156    pub fn enable_runnable_backtrace(&self) {
 157        self.state.lock().enable_runnable_backtraces = true;
 158    }
 159
 160    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 161        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 162        backtrace.resolve();
 163        backtrace
 164    }
 165
 166    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
 167        Arc::new(Background::Deterministic {
 168            executor: self.clone(),
 169        })
 170    }
 171
 172    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
 173        Rc::new(Foreground::Deterministic {
 174            cx_id: id,
 175            executor: self.clone(),
 176        })
 177    }
 178
 179    fn spawn_from_foreground(
 180        &self,
 181        cx_id: usize,
 182        future: AnyLocalFuture,
 183        main: bool,
 184    ) -> AnyLocalTask {
 185        let state = self.state.clone();
 186        let id;
 187        {
 188            let mut state = state.lock();
 189            id = util::post_inc(&mut state.next_runnable_id);
 190            if state.enable_runnable_backtraces {
 191                state
 192                    .runnable_backtraces
 193                    .insert(id, backtrace::Backtrace::new_unresolved());
 194            }
 195        }
 196
 197        let unparker = self.parker.lock().unparker();
 198        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 199            let mut state = state.lock();
 200            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 201            state
 202                .scheduled_from_foreground
 203                .entry(cx_id)
 204                .or_default()
 205                .push(ForegroundRunnable { id, runnable, main });
 206            unparker.unpark();
 207        });
 208        runnable.schedule();
 209        task
 210    }
 211
 212    fn spawn(&self, future: AnyFuture) -> AnyTask {
 213        let state = self.state.clone();
 214        let id;
 215        {
 216            let mut state = state.lock();
 217            id = util::post_inc(&mut state.next_runnable_id);
 218            if state.enable_runnable_backtraces {
 219                state
 220                    .runnable_backtraces
 221                    .insert(id, backtrace::Backtrace::new_unresolved());
 222            }
 223        }
 224
 225        let unparker = self.parker.lock().unparker();
 226        let (runnable, task) = async_task::spawn(future, move |runnable| {
 227            let mut state = state.lock();
 228            state
 229                .poll_history
 230                .push(ExecutorEvent::EnqueuRunnable { id });
 231            state
 232                .scheduled_from_background
 233                .push(BackgroundRunnable { id, runnable });
 234            unparker.unpark();
 235        });
 236        runnable.schedule();
 237        task
 238    }
 239
 240    fn run<'a>(
 241        &self,
 242        cx_id: usize,
 243        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 244    ) -> Box<dyn Any> {
 245        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 246
 247        let woken = Arc::new(AtomicBool::new(false));
 248
 249        let state = self.state.clone();
 250        let id;
 251        {
 252            let mut state = state.lock();
 253            id = util::post_inc(&mut state.next_runnable_id);
 254            if state.enable_runnable_backtraces {
 255                state
 256                    .runnable_backtraces
 257                    .insert(id, backtrace::Backtrace::new_unresolved());
 258            }
 259        }
 260
 261        let unparker = self.parker.lock().unparker();
 262        let (runnable, mut main_task) = unsafe {
 263            async_task::spawn_unchecked(main_future, move |runnable| {
 264                let state = &mut *state.lock();
 265                state
 266                    .scheduled_from_foreground
 267                    .entry(cx_id)
 268                    .or_default()
 269                    .push(ForegroundRunnable {
 270                        id: util::post_inc(&mut state.next_runnable_id),
 271                        runnable,
 272                        main: true,
 273                    });
 274                unparker.unpark();
 275            })
 276        };
 277        runnable.schedule();
 278
 279        loop {
 280            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 281                return result;
 282            }
 283
 284            if !woken.load(SeqCst) {
 285                self.state.lock().will_park();
 286            }
 287
 288            woken.store(false, SeqCst);
 289            self.parker.lock().park();
 290        }
 291    }
 292
 293    pub fn run_until_parked(&self) {
 294        use std::sync::atomic::AtomicBool;
 295        let woken = Arc::new(AtomicBool::new(false));
 296        self.run_internal(woken, None);
 297    }
 298
 299    fn run_internal(
 300        &self,
 301        woken: Arc<std::sync::atomic::AtomicBool>,
 302        mut main_task: Option<&mut AnyLocalTask>,
 303    ) -> Option<Box<dyn Any>> {
 304        use rand::prelude::*;
 305        use std::sync::atomic::Ordering::SeqCst;
 306
 307        let unparker = self.parker.lock().unparker();
 308        let waker = waker_fn::waker_fn(move || {
 309            woken.store(true, SeqCst);
 310            unparker.unpark();
 311        });
 312
 313        let mut cx = Context::from_waker(&waker);
 314        loop {
 315            let mut state = self.state.lock();
 316
 317            if state.scheduled_from_foreground.is_empty()
 318                && state.scheduled_from_background.is_empty()
 319            {
 320                if let Some(main_task) = main_task {
 321                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 322                        return Some(result);
 323                    }
 324                }
 325
 326                return None;
 327            }
 328
 329            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 330                let background_len = state.scheduled_from_background.len();
 331                let ix = state.rng.gen_range(0..background_len);
 332                let background_runnable = state.scheduled_from_background.remove(ix);
 333                state.push_to_history(ExecutorEvent::PollRunnable {
 334                    id: background_runnable.id,
 335                });
 336                drop(state);
 337                background_runnable.runnable.run();
 338            } else if !state.scheduled_from_foreground.is_empty() {
 339                let available_cx_ids = state
 340                    .scheduled_from_foreground
 341                    .keys()
 342                    .copied()
 343                    .collect::<Vec<_>>();
 344                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 345                let scheduled_from_cx = state
 346                    .scheduled_from_foreground
 347                    .get_mut(&cx_id_to_run)
 348                    .unwrap();
 349                let foreground_runnable = scheduled_from_cx.remove(0);
 350                if scheduled_from_cx.is_empty() {
 351                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 352                }
 353                state.push_to_history(ExecutorEvent::PollRunnable {
 354                    id: foreground_runnable.id,
 355                });
 356
 357                drop(state);
 358
 359                foreground_runnable.runnable.run();
 360                if let Some(main_task) = main_task.as_mut() {
 361                    if foreground_runnable.main {
 362                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 363                            return Some(result);
 364                        }
 365                    }
 366                }
 367            }
 368        }
 369    }
 370
 371    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 372    where
 373        F: Unpin + Future<Output = T>,
 374    {
 375        use rand::prelude::*;
 376
 377        let unparker = self.parker.lock().unparker();
 378        let waker = waker_fn::waker_fn(move || {
 379            unparker.unpark();
 380        });
 381
 382        let mut cx = Context::from_waker(&waker);
 383        for _ in 0..max_ticks {
 384            let mut state = self.state.lock();
 385            let runnable_count = state.scheduled_from_background.len();
 386            let ix = state.rng.gen_range(0..=runnable_count);
 387            if ix < state.scheduled_from_background.len() {
 388                let background_runnable = state.scheduled_from_background.remove(ix);
 389                state.push_to_history(ExecutorEvent::PollRunnable {
 390                    id: background_runnable.id,
 391                });
 392                drop(state);
 393                background_runnable.runnable.run();
 394            } else {
 395                drop(state);
 396                if let Poll::Ready(result) = future.poll(&mut cx) {
 397                    return Some(result);
 398                }
 399                let mut state = self.state.lock();
 400                if state.scheduled_from_background.is_empty() {
 401                    state.will_park();
 402                    drop(state);
 403                    self.parker.lock().park();
 404                }
 405
 406                continue;
 407            }
 408        }
 409
 410        None
 411    }
 412
 413    pub fn timer(&self, duration: Duration) -> Timer {
 414        let (tx, rx) = postage::barrier::channel();
 415        let mut state = self.state.lock();
 416        let wakeup_at = state.now + duration;
 417        let id = util::post_inc(&mut state.next_timer_id);
 418        match state
 419            .pending_timers
 420            .binary_search_by_key(&wakeup_at, |e| e.1)
 421        {
 422            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 423        }
 424        let state = self.state.clone();
 425        Timer::Deterministic(DeterministicTimer { rx, id, state })
 426    }
 427
 428    pub fn now(&self) -> std::time::Instant {
 429        let state = self.state.lock();
 430        state.now
 431    }
 432
 433    pub fn advance_clock(&self, duration: Duration) {
 434        let new_now = self.state.lock().now + duration;
 435        loop {
 436            self.run_until_parked();
 437            let mut state = self.state.lock();
 438
 439            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 440                let wakeup_time = *wakeup_time;
 441                if wakeup_time <= new_now {
 442                    let timer_count = state
 443                        .pending_timers
 444                        .iter()
 445                        .take_while(|(_, t, _)| *t == wakeup_time)
 446                        .count();
 447                    state.now = wakeup_time;
 448                    let timers_to_wake = state
 449                        .pending_timers
 450                        .drain(0..timer_count)
 451                        .collect::<Vec<_>>();
 452                    drop(state);
 453                    drop(timers_to_wake);
 454                    continue;
 455                }
 456            }
 457
 458            break;
 459        }
 460
 461        self.state.lock().now = new_now;
 462    }
 463
 464    pub fn start_waiting(&self) {
 465        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 466    }
 467
 468    pub fn finish_waiting(&self) {
 469        self.state.lock().waiting_backtrace.take();
 470    }
 471
 472    pub fn forbid_parking(&self) {
 473        use rand::prelude::*;
 474
 475        let mut state = self.state.lock();
 476        state.forbid_parking = true;
 477        state.rng = StdRng::seed_from_u64(state.seed);
 478    }
 479
 480    pub fn allow_parking(&self) {
 481        use rand::prelude::*;
 482
 483        let mut state = self.state.lock();
 484        state.forbid_parking = false;
 485        state.rng = StdRng::seed_from_u64(state.seed);
 486    }
 487
 488    pub async fn simulate_random_delay(&self) {
 489        use rand::prelude::*;
 490        use smol::future::yield_now;
 491        if self.state.lock().rng.gen_bool(0.2) {
 492            let yields = self.state.lock().rng.gen_range(1..=10);
 493            for _ in 0..yields {
 494                yield_now().await;
 495            }
 496        }
 497    }
 498
 499    pub fn record_backtrace(&self) {
 500        let mut state = self.state.lock();
 501        if state.enable_runnable_backtraces {
 502            let current_id = state
 503                .poll_history
 504                .iter()
 505                .rev()
 506                .find_map(|event| match event {
 507                    ExecutorEvent::PollRunnable { id } => Some(*id),
 508                    _ => None,
 509                });
 510            if let Some(id) = current_id {
 511                state
 512                    .runnable_backtraces
 513                    .insert(id, backtrace::Backtrace::new_unresolved());
 514            }
 515        }
 516    }
 517}
 518
 519impl Drop for Timer {
 520    fn drop(&mut self) {
 521        #[cfg(any(test, feature = "test-support"))]
 522        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 523            state
 524                .lock()
 525                .pending_timers
 526                .retain(|(timer_id, _, _)| timer_id != id)
 527        }
 528    }
 529}
 530
 531impl Future for Timer {
 532    type Output = ();
 533
 534    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 535        match &mut *self {
 536            #[cfg(any(test, feature = "test-support"))]
 537            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 538                use postage::stream::{PollRecv, Stream as _};
 539                smol::pin!(rx);
 540                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 541                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 542                    PollRecv::Pending => Poll::Pending,
 543                }
 544            }
 545            Self::Production(timer) => {
 546                smol::pin!(timer);
 547                match timer.poll(cx) {
 548                    Poll::Ready(_) => Poll::Ready(()),
 549                    Poll::Pending => Poll::Pending,
 550                }
 551            }
 552        }
 553    }
 554}
 555
 556#[cfg(any(test, feature = "test-support"))]
 557impl DeterministicState {
 558    fn push_to_history(&mut self, event: ExecutorEvent) {
 559        use std::fmt::Write as _;
 560
 561        self.poll_history.push(event);
 562        if let Some(prev_history) = &self.previous_poll_history {
 563            let ix = self.poll_history.len() - 1;
 564            let prev_event = prev_history[ix];
 565            if event != prev_event {
 566                let mut message = String::new();
 567                writeln!(
 568                    &mut message,
 569                    "current runnable backtrace:\n{:?}",
 570                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 571                        trace.resolve();
 572                        util::CwdBacktrace(trace)
 573                    })
 574                )
 575                .unwrap();
 576                writeln!(
 577                    &mut message,
 578                    "previous runnable backtrace:\n{:?}",
 579                    self.runnable_backtraces
 580                        .get_mut(&prev_event.id())
 581                        .map(|trace| {
 582                            trace.resolve();
 583                            util::CwdBacktrace(trace)
 584                        })
 585                )
 586                .unwrap();
 587                panic!("detected non-determinism after {ix}. {message}");
 588            }
 589        }
 590    }
 591
 592    fn will_park(&mut self) {
 593        if self.forbid_parking {
 594            let mut backtrace_message = String::new();
 595            #[cfg(any(test, feature = "test-support"))]
 596            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 597                backtrace.resolve();
 598                backtrace_message = format!(
 599                    "\nbacktrace of waiting future:\n{:?}",
 600                    util::CwdBacktrace(backtrace)
 601                );
 602            }
 603
 604            panic!(
 605                "deterministic executor parked after a call to forbid_parking{}",
 606                backtrace_message
 607            );
 608        }
 609    }
 610}
 611
 612#[cfg(any(test, feature = "test-support"))]
 613impl ExecutorEvent {
 614    pub fn id(&self) -> usize {
 615        match self {
 616            ExecutorEvent::PollRunnable { id } => *id,
 617            ExecutorEvent::EnqueuRunnable { id } => *id,
 618        }
 619    }
 620}
 621
 622impl Foreground {
 623    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 624        if dispatcher.is_main_thread() {
 625            Ok(Self::Platform {
 626                dispatcher,
 627                _not_send_or_sync: PhantomData,
 628            })
 629        } else {
 630            Err(anyhow!("must be constructed on main thread"))
 631        }
 632    }
 633
 634    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 635        let future = any_local_future(future);
 636        let any_task = match self {
 637            #[cfg(any(test, feature = "test-support"))]
 638            Self::Deterministic { cx_id, executor } => {
 639                executor.spawn_from_foreground(*cx_id, future, false)
 640            }
 641            Self::Platform { dispatcher, .. } => {
 642                fn spawn_inner(
 643                    future: AnyLocalFuture,
 644                    dispatcher: &Arc<dyn Dispatcher>,
 645                ) -> AnyLocalTask {
 646                    let dispatcher = dispatcher.clone();
 647                    let schedule =
 648                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 649                    let (runnable, task) = async_task::spawn_local(future, schedule);
 650                    runnable.schedule();
 651                    task
 652                }
 653                spawn_inner(future, dispatcher)
 654            }
 655        };
 656        Task::local(any_task)
 657    }
 658
 659    #[cfg(any(test, feature = "test-support"))]
 660    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 661        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 662        let result = match self {
 663            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 664            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 665        };
 666        *result.downcast().unwrap()
 667    }
 668
 669    #[cfg(any(test, feature = "test-support"))]
 670    pub fn run_until_parked(&self) {
 671        match self {
 672            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 673            _ => panic!("this method can only be called on a deterministic executor"),
 674        }
 675    }
 676
 677    #[cfg(any(test, feature = "test-support"))]
 678    pub fn parking_forbidden(&self) -> bool {
 679        match self {
 680            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 681            _ => panic!("this method can only be called on a deterministic executor"),
 682        }
 683    }
 684
 685    #[cfg(any(test, feature = "test-support"))]
 686    pub fn start_waiting(&self) {
 687        match self {
 688            Self::Deterministic { executor, .. } => executor.start_waiting(),
 689            _ => panic!("this method can only be called on a deterministic executor"),
 690        }
 691    }
 692
 693    #[cfg(any(test, feature = "test-support"))]
 694    pub fn finish_waiting(&self) {
 695        match self {
 696            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 697            _ => panic!("this method can only be called on a deterministic executor"),
 698        }
 699    }
 700
 701    #[cfg(any(test, feature = "test-support"))]
 702    pub fn forbid_parking(&self) {
 703        match self {
 704            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 705            _ => panic!("this method can only be called on a deterministic executor"),
 706        }
 707    }
 708
 709    #[cfg(any(test, feature = "test-support"))]
 710    pub fn allow_parking(&self) {
 711        match self {
 712            Self::Deterministic { executor, .. } => executor.allow_parking(),
 713            _ => panic!("this method can only be called on a deterministic executor"),
 714        }
 715    }
 716
 717    #[cfg(any(test, feature = "test-support"))]
 718    pub fn advance_clock(&self, duration: Duration) {
 719        match self {
 720            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 721            _ => panic!("this method can only be called on a deterministic executor"),
 722        }
 723    }
 724
 725    #[cfg(any(test, feature = "test-support"))]
 726    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 727        match self {
 728            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 729            _ => panic!("this method can only be called on a deterministic executor"),
 730        }
 731    }
 732}
 733
 734impl Background {
 735    pub fn new() -> Self {
 736        let executor = Arc::new(Executor::new());
 737        let stop = channel::unbounded::<()>();
 738
 739        for i in 0..2 * num_cpus::get() {
 740            let executor = executor.clone();
 741            let stop = stop.1.clone();
 742            thread::Builder::new()
 743                .name(format!("background-executor-{}", i))
 744                .spawn(move || smol::block_on(executor.run(stop.recv())))
 745                .unwrap();
 746        }
 747
 748        Self::Production {
 749            executor,
 750            _stop: stop.0,
 751        }
 752    }
 753
 754    pub fn num_cpus(&self) -> usize {
 755        num_cpus::get()
 756    }
 757
 758    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 759    where
 760        T: 'static + Send,
 761        F: Send + Future<Output = T> + 'static,
 762    {
 763        let future = any_future(future);
 764        let any_task = match self {
 765            Self::Production { executor, .. } => executor.spawn(future),
 766            #[cfg(any(test, feature = "test-support"))]
 767            Self::Deterministic { executor } => executor.spawn(future),
 768        };
 769        Task::send(any_task)
 770    }
 771
 772    pub fn block<F, T>(&self, future: F) -> T
 773    where
 774        F: Future<Output = T>,
 775    {
 776        smol::pin!(future);
 777        match self {
 778            Self::Production { .. } => smol::block_on(&mut future),
 779            #[cfg(any(test, feature = "test-support"))]
 780            Self::Deterministic { executor, .. } => {
 781                executor.block(&mut future, usize::MAX).unwrap()
 782            }
 783        }
 784    }
 785
 786    pub fn block_with_timeout<F, T>(
 787        &self,
 788        timeout: Duration,
 789        future: F,
 790    ) -> Result<T, impl Future<Output = T>>
 791    where
 792        T: 'static,
 793        F: 'static + Unpin + Future<Output = T>,
 794    {
 795        let mut future = any_local_future(future);
 796        if !timeout.is_zero() {
 797            let output = match self {
 798                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 799                #[cfg(any(test, feature = "test-support"))]
 800                Self::Deterministic { executor, .. } => {
 801                    use rand::prelude::*;
 802                    let max_ticks = {
 803                        let mut state = executor.state.lock();
 804                        let range = state.block_on_ticks.clone();
 805                        state.rng.gen_range(range)
 806                    };
 807                    executor.block(&mut future, max_ticks)
 808                }
 809            };
 810            if let Some(output) = output {
 811                return Ok(*output.downcast().unwrap());
 812            }
 813        }
 814        Err(async { *future.await.downcast().unwrap() })
 815    }
 816
 817    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 818    where
 819        F: FnOnce(&mut Scope<'scope>),
 820    {
 821        let mut scope = Scope::new(self.clone());
 822        (scheduler)(&mut scope);
 823        let spawned = mem::take(&mut scope.futures)
 824            .into_iter()
 825            .map(|f| self.spawn(f))
 826            .collect::<Vec<_>>();
 827        for task in spawned {
 828            task.await;
 829        }
 830    }
 831
 832    pub fn timer(&self, duration: Duration) -> Timer {
 833        match self {
 834            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
 835            #[cfg(any(test, feature = "test-support"))]
 836            Background::Deterministic { executor } => executor.timer(duration),
 837        }
 838    }
 839
 840    pub fn now(&self) -> std::time::Instant {
 841        match self {
 842            Background::Production { .. } => std::time::Instant::now(),
 843            #[cfg(any(test, feature = "test-support"))]
 844            Background::Deterministic { executor } => executor.now(),
 845        }
 846    }
 847
 848    #[cfg(any(test, feature = "test-support"))]
 849    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 850        match self {
 851            Self::Deterministic { executor, .. } => {
 852                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 853            }
 854            _ => panic!("this method can only be called on a deterministic executor"),
 855        }
 856    }
 857
 858    #[cfg(any(test, feature = "test-support"))]
 859    pub async fn simulate_random_delay(&self) {
 860        match self {
 861            Self::Deterministic { executor, .. } => {
 862                executor.simulate_random_delay().await;
 863            }
 864            _ => {
 865                panic!("this method can only be called on a deterministic executor")
 866            }
 867        }
 868    }
 869
 870    #[cfg(any(test, feature = "test-support"))]
 871    pub fn record_backtrace(&self) {
 872        match self {
 873            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 874            _ => {
 875                panic!("this method can only be called on a deterministic executor")
 876            }
 877        }
 878    }
 879
 880    #[cfg(any(test, feature = "test-support"))]
 881    pub fn start_waiting(&self) {
 882        match self {
 883            Self::Deterministic { executor, .. } => executor.start_waiting(),
 884            _ => panic!("this method can only be called on a deterministic executor"),
 885        }
 886    }
 887}
 888
 889impl Default for Background {
 890    fn default() -> Self {
 891        Self::new()
 892    }
 893}
 894
 895pub struct Scope<'a> {
 896    executor: Arc<Background>,
 897    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 898    tx: Option<mpsc::Sender<()>>,
 899    rx: mpsc::Receiver<()>,
 900    _phantom: PhantomData<&'a ()>,
 901}
 902
 903impl<'a> Scope<'a> {
 904    fn new(executor: Arc<Background>) -> Self {
 905        let (tx, rx) = mpsc::channel(1);
 906        Self {
 907            executor,
 908            tx: Some(tx),
 909            rx,
 910            futures: Default::default(),
 911            _phantom: PhantomData,
 912        }
 913    }
 914
 915    pub fn spawn<F>(&mut self, f: F)
 916    where
 917        F: Future<Output = ()> + Send + 'a,
 918    {
 919        let tx = self.tx.clone().unwrap();
 920
 921        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 922        // dropping this `Scope` blocks until all of the futures have resolved.
 923        let f = unsafe {
 924            mem::transmute::<
 925                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 926                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 927            >(Box::pin(async move {
 928                f.await;
 929                drop(tx);
 930            }))
 931        };
 932        self.futures.push(f);
 933    }
 934}
 935
 936impl<'a> Drop for Scope<'a> {
 937    fn drop(&mut self) {
 938        self.tx.take().unwrap();
 939
 940        // Wait until the channel is closed, which means that all of the spawned
 941        // futures have resolved.
 942        self.executor.block(self.rx.next());
 943    }
 944}
 945
 946impl<T> Task<T> {
 947    pub fn ready(value: T) -> Self {
 948        Self::Ready(Some(value))
 949    }
 950
 951    fn local(any_task: AnyLocalTask) -> Self {
 952        Self::Local {
 953            any_task,
 954            result_type: PhantomData,
 955        }
 956    }
 957
 958    pub fn detach(self) {
 959        match self {
 960            Task::Ready(_) => {}
 961            Task::Local { any_task, .. } => any_task.detach(),
 962            Task::Send { any_task, .. } => any_task.detach(),
 963        }
 964    }
 965}
 966
 967impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
 968    #[track_caller]
 969    pub fn detach_and_log_err(self, cx: &mut AppContext) {
 970        cx.spawn(|_| async move {
 971            if let Err(err) = self.await {
 972                let caller = Location::caller();
 973                log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
 974            }
 975        })
 976        .detach();
 977    }
 978}
 979
 980impl<T: Send> Task<T> {
 981    fn send(any_task: AnyTask) -> Self {
 982        Self::Send {
 983            any_task,
 984            result_type: PhantomData,
 985        }
 986    }
 987}
 988
 989impl<T: fmt::Debug> fmt::Debug for Task<T> {
 990    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 991        match self {
 992            Task::Ready(value) => value.fmt(f),
 993            Task::Local { any_task, .. } => any_task.fmt(f),
 994            Task::Send { any_task, .. } => any_task.fmt(f),
 995        }
 996    }
 997}
 998
 999impl<T: 'static> Future for Task<T> {
1000    type Output = T;
1001
1002    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1003        match unsafe { self.get_unchecked_mut() } {
1004            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1005            Task::Local { any_task, .. } => {
1006                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1007            }
1008            Task::Send { any_task, .. } => {
1009                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1010            }
1011        }
1012    }
1013}
1014
1015fn any_future<T, F>(future: F) -> AnyFuture
1016where
1017    T: 'static + Send,
1018    F: Future<Output = T> + Send + 'static,
1019{
1020    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1021}
1022
1023fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1024where
1025    T: 'static,
1026    F: Future<Output = T> + 'static,
1027{
1028    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1029}