executor.rs

   1use anyhow::{anyhow, Result};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use smol::{channel, prelude::*, Executor};
   5use std::{
   6    any::Any,
   7    fmt::{self, Display},
   8    marker::PhantomData,
   9    mem,
  10    panic::Location,
  11    pin::Pin,
  12    rc::Rc,
  13    sync::Arc,
  14    task::{Context, Poll},
  15    thread,
  16    time::Duration,
  17};
  18
  19use crate::{
  20    platform::{self, Dispatcher},
  21    util, AppContext,
  22};
  23
  24pub enum Foreground {
  25    Platform {
  26        dispatcher: Arc<dyn platform::Dispatcher>,
  27        _not_send_or_sync: PhantomData<Rc<()>>,
  28    },
  29    #[cfg(any(test, feature = "test-support"))]
  30    Deterministic {
  31        cx_id: usize,
  32        executor: Arc<Deterministic>,
  33    },
  34}
  35
  36pub enum Background {
  37    #[cfg(any(test, feature = "test-support"))]
  38    Deterministic { executor: Arc<Deterministic> },
  39    Production {
  40        executor: Arc<smol::Executor<'static>>,
  41        _stop: channel::Sender<()>,
  42    },
  43}
  44
  45type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  46type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  47type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  48type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  49
  50#[must_use]
  51pub enum Task<T> {
  52    Ready(Option<T>),
  53    Local {
  54        any_task: AnyLocalTask,
  55        result_type: PhantomData<T>,
  56    },
  57    Send {
  58        any_task: AnyTask,
  59        result_type: PhantomData<T>,
  60    },
  61}
  62
  63unsafe impl<T: Send> Send for Task<T> {}
  64
  65#[cfg(any(test, feature = "test-support"))]
  66struct DeterministicState {
  67    rng: rand::prelude::StdRng,
  68    seed: u64,
  69    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
  70    scheduled_from_background: Vec<BackgroundRunnable>,
  71    forbid_parking: bool,
  72    block_on_ticks: std::ops::RangeInclusive<usize>,
  73    now: std::time::Instant,
  74    next_timer_id: usize,
  75    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
  76    waiting_backtrace: Option<backtrace::Backtrace>,
  77    next_runnable_id: usize,
  78    poll_history: Vec<ExecutorEvent>,
  79    previous_poll_history: Option<Vec<ExecutorEvent>>,
  80    enable_runnable_backtraces: bool,
  81    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
  82}
  83
  84#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  85pub enum ExecutorEvent {
  86    PollRunnable { id: usize },
  87    EnqueuRunnable { id: usize },
  88}
  89
  90#[cfg(any(test, feature = "test-support"))]
  91struct ForegroundRunnable {
  92    id: usize,
  93    runnable: Runnable,
  94    main: bool,
  95}
  96
  97#[cfg(any(test, feature = "test-support"))]
  98struct BackgroundRunnable {
  99    id: usize,
 100    runnable: Runnable,
 101}
 102
 103#[cfg(any(test, feature = "test-support"))]
 104pub struct Deterministic {
 105    state: Arc<parking_lot::Mutex<DeterministicState>>,
 106    parker: parking_lot::Mutex<parking::Parker>,
 107}
 108
 109pub enum Timer {
 110    Production(smol::Timer),
 111    #[cfg(any(test, feature = "test-support"))]
 112    Deterministic(DeterministicTimer),
 113}
 114
 115#[cfg(any(test, feature = "test-support"))]
 116pub struct DeterministicTimer {
 117    rx: postage::barrier::Receiver,
 118    id: usize,
 119    state: Arc<parking_lot::Mutex<DeterministicState>>,
 120}
 121
 122#[cfg(any(test, feature = "test-support"))]
 123impl Deterministic {
 124    pub fn new(seed: u64) -> Arc<Self> {
 125        use rand::prelude::*;
 126
 127        Arc::new(Self {
 128            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 129                rng: StdRng::seed_from_u64(seed),
 130                seed,
 131                scheduled_from_foreground: Default::default(),
 132                scheduled_from_background: Default::default(),
 133                forbid_parking: false,
 134                block_on_ticks: 0..=1000,
 135                now: std::time::Instant::now(),
 136                next_timer_id: Default::default(),
 137                pending_timers: Default::default(),
 138                waiting_backtrace: None,
 139                next_runnable_id: 0,
 140                poll_history: Default::default(),
 141                previous_poll_history: Default::default(),
 142                enable_runnable_backtraces: false,
 143                runnable_backtraces: Default::default(),
 144            })),
 145            parker: Default::default(),
 146        })
 147    }
 148
 149    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 150        self.state.lock().poll_history.clone()
 151    }
 152
 153    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 154        self.state.lock().previous_poll_history = history;
 155    }
 156
 157    pub fn enable_runnable_backtrace(&self) {
 158        self.state.lock().enable_runnable_backtraces = true;
 159    }
 160
 161    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 162        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 163        backtrace.resolve();
 164        backtrace
 165    }
 166
 167    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
 168        Arc::new(Background::Deterministic {
 169            executor: self.clone(),
 170        })
 171    }
 172
 173    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
 174        Rc::new(Foreground::Deterministic {
 175            cx_id: id,
 176            executor: self.clone(),
 177        })
 178    }
 179
 180    fn spawn_from_foreground(
 181        &self,
 182        cx_id: usize,
 183        future: AnyLocalFuture,
 184        main: bool,
 185    ) -> AnyLocalTask {
 186        let state = self.state.clone();
 187        let id;
 188        {
 189            let mut state = state.lock();
 190            id = util::post_inc(&mut state.next_runnable_id);
 191            if state.enable_runnable_backtraces {
 192                state
 193                    .runnable_backtraces
 194                    .insert(id, backtrace::Backtrace::new_unresolved());
 195            }
 196        }
 197
 198        let unparker = self.parker.lock().unparker();
 199        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 200            let mut state = state.lock();
 201            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 202            state
 203                .scheduled_from_foreground
 204                .entry(cx_id)
 205                .or_default()
 206                .push(ForegroundRunnable { id, runnable, main });
 207            unparker.unpark();
 208        });
 209        runnable.schedule();
 210        task
 211    }
 212
 213    fn spawn(&self, future: AnyFuture) -> AnyTask {
 214        let state = self.state.clone();
 215        let id;
 216        {
 217            let mut state = state.lock();
 218            id = util::post_inc(&mut state.next_runnable_id);
 219            if state.enable_runnable_backtraces {
 220                state
 221                    .runnable_backtraces
 222                    .insert(id, backtrace::Backtrace::new_unresolved());
 223            }
 224        }
 225
 226        let unparker = self.parker.lock().unparker();
 227        let (runnable, task) = async_task::spawn(future, move |runnable| {
 228            let mut state = state.lock();
 229            state
 230                .poll_history
 231                .push(ExecutorEvent::EnqueuRunnable { id });
 232            state
 233                .scheduled_from_background
 234                .push(BackgroundRunnable { id, runnable });
 235            unparker.unpark();
 236        });
 237        runnable.schedule();
 238        task
 239    }
 240
 241    fn run<'a>(
 242        &self,
 243        cx_id: usize,
 244        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 245    ) -> Box<dyn Any> {
 246        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 247
 248        let woken = Arc::new(AtomicBool::new(false));
 249
 250        let state = self.state.clone();
 251        let id;
 252        {
 253            let mut state = state.lock();
 254            id = util::post_inc(&mut state.next_runnable_id);
 255            if state.enable_runnable_backtraces {
 256                state
 257                    .runnable_backtraces
 258                    .insert(id, backtrace::Backtrace::new_unresolved());
 259            }
 260        }
 261
 262        let unparker = self.parker.lock().unparker();
 263        let (runnable, mut main_task) = unsafe {
 264            async_task::spawn_unchecked(main_future, move |runnable| {
 265                let state = &mut *state.lock();
 266                state
 267                    .scheduled_from_foreground
 268                    .entry(cx_id)
 269                    .or_default()
 270                    .push(ForegroundRunnable {
 271                        id: util::post_inc(&mut state.next_runnable_id),
 272                        runnable,
 273                        main: true,
 274                    });
 275                unparker.unpark();
 276            })
 277        };
 278        runnable.schedule();
 279
 280        loop {
 281            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 282                return result;
 283            }
 284
 285            if !woken.load(SeqCst) {
 286                self.state.lock().will_park();
 287            }
 288
 289            woken.store(false, SeqCst);
 290            self.parker.lock().park();
 291        }
 292    }
 293
 294    pub fn run_until_parked(&self) {
 295        use std::sync::atomic::AtomicBool;
 296        let woken = Arc::new(AtomicBool::new(false));
 297        self.run_internal(woken, None);
 298    }
 299
 300    fn run_internal(
 301        &self,
 302        woken: Arc<std::sync::atomic::AtomicBool>,
 303        mut main_task: Option<&mut AnyLocalTask>,
 304    ) -> Option<Box<dyn Any>> {
 305        use rand::prelude::*;
 306        use std::sync::atomic::Ordering::SeqCst;
 307
 308        let unparker = self.parker.lock().unparker();
 309        let waker = waker_fn::waker_fn(move || {
 310            woken.store(true, SeqCst);
 311            unparker.unpark();
 312        });
 313
 314        let mut cx = Context::from_waker(&waker);
 315        loop {
 316            let mut state = self.state.lock();
 317
 318            if state.scheduled_from_foreground.is_empty()
 319                && state.scheduled_from_background.is_empty()
 320            {
 321                if let Some(main_task) = main_task {
 322                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 323                        return Some(result);
 324                    }
 325                }
 326
 327                return None;
 328            }
 329
 330            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 331                let background_len = state.scheduled_from_background.len();
 332                let ix = state.rng.gen_range(0..background_len);
 333                let background_runnable = state.scheduled_from_background.remove(ix);
 334                state.push_to_history(ExecutorEvent::PollRunnable {
 335                    id: background_runnable.id,
 336                });
 337                drop(state);
 338                background_runnable.runnable.run();
 339            } else if !state.scheduled_from_foreground.is_empty() {
 340                let available_cx_ids = state
 341                    .scheduled_from_foreground
 342                    .keys()
 343                    .copied()
 344                    .collect::<Vec<_>>();
 345                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 346                let scheduled_from_cx = state
 347                    .scheduled_from_foreground
 348                    .get_mut(&cx_id_to_run)
 349                    .unwrap();
 350                let foreground_runnable = scheduled_from_cx.remove(0);
 351                if scheduled_from_cx.is_empty() {
 352                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 353                }
 354                state.push_to_history(ExecutorEvent::PollRunnable {
 355                    id: foreground_runnable.id,
 356                });
 357
 358                drop(state);
 359
 360                foreground_runnable.runnable.run();
 361                if let Some(main_task) = main_task.as_mut() {
 362                    if foreground_runnable.main {
 363                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 364                            return Some(result);
 365                        }
 366                    }
 367                }
 368            }
 369        }
 370    }
 371
 372    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 373    where
 374        F: Unpin + Future<Output = T>,
 375    {
 376        use rand::prelude::*;
 377
 378        let unparker = self.parker.lock().unparker();
 379        let waker = waker_fn::waker_fn(move || {
 380            unparker.unpark();
 381        });
 382
 383        let mut cx = Context::from_waker(&waker);
 384        for _ in 0..max_ticks {
 385            let mut state = self.state.lock();
 386            let runnable_count = state.scheduled_from_background.len();
 387            let ix = state.rng.gen_range(0..=runnable_count);
 388            if ix < state.scheduled_from_background.len() {
 389                let background_runnable = state.scheduled_from_background.remove(ix);
 390                state.push_to_history(ExecutorEvent::PollRunnable {
 391                    id: background_runnable.id,
 392                });
 393                drop(state);
 394                background_runnable.runnable.run();
 395            } else {
 396                drop(state);
 397                if let Poll::Ready(result) = future.poll(&mut cx) {
 398                    return Some(result);
 399                }
 400                let mut state = self.state.lock();
 401                if state.scheduled_from_background.is_empty() {
 402                    state.will_park();
 403                    drop(state);
 404                    self.parker.lock().park();
 405                }
 406
 407                continue;
 408            }
 409        }
 410
 411        None
 412    }
 413
 414    pub fn timer(&self, duration: Duration) -> Timer {
 415        let (tx, rx) = postage::barrier::channel();
 416        let mut state = self.state.lock();
 417        let wakeup_at = state.now + duration;
 418        let id = util::post_inc(&mut state.next_timer_id);
 419        match state
 420            .pending_timers
 421            .binary_search_by_key(&wakeup_at, |e| e.1)
 422        {
 423            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 424        }
 425        let state = self.state.clone();
 426        Timer::Deterministic(DeterministicTimer { rx, id, state })
 427    }
 428
 429    pub fn now(&self) -> std::time::Instant {
 430        let state = self.state.lock();
 431        state.now
 432    }
 433
 434    pub fn advance_clock(&self, duration: Duration) {
 435        let new_now = self.state.lock().now + duration;
 436        loop {
 437            self.run_until_parked();
 438            let mut state = self.state.lock();
 439
 440            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 441                let wakeup_time = *wakeup_time;
 442                if wakeup_time <= new_now {
 443                    let timer_count = state
 444                        .pending_timers
 445                        .iter()
 446                        .take_while(|(_, t, _)| *t == wakeup_time)
 447                        .count();
 448                    state.now = wakeup_time;
 449                    let timers_to_wake = state
 450                        .pending_timers
 451                        .drain(0..timer_count)
 452                        .collect::<Vec<_>>();
 453                    drop(state);
 454                    drop(timers_to_wake);
 455                    continue;
 456                }
 457            }
 458
 459            break;
 460        }
 461
 462        self.state.lock().now = new_now;
 463    }
 464
 465    pub fn start_waiting(&self) {
 466        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 467    }
 468
 469    pub fn finish_waiting(&self) {
 470        self.state.lock().waiting_backtrace.take();
 471    }
 472
 473    pub fn forbid_parking(&self) {
 474        use rand::prelude::*;
 475
 476        let mut state = self.state.lock();
 477        state.forbid_parking = true;
 478        state.rng = StdRng::seed_from_u64(state.seed);
 479    }
 480
 481    pub fn allow_parking(&self) {
 482        use rand::prelude::*;
 483
 484        let mut state = self.state.lock();
 485        state.forbid_parking = false;
 486        state.rng = StdRng::seed_from_u64(state.seed);
 487    }
 488
 489    pub async fn simulate_random_delay(&self) {
 490        use rand::prelude::*;
 491        use smol::future::yield_now;
 492        if self.state.lock().rng.gen_bool(0.2) {
 493            let yields = self.state.lock().rng.gen_range(1..=10);
 494            for _ in 0..yields {
 495                yield_now().await;
 496            }
 497        }
 498    }
 499
 500    pub fn record_backtrace(&self) {
 501        let mut state = self.state.lock();
 502        if state.enable_runnable_backtraces {
 503            let current_id = state
 504                .poll_history
 505                .iter()
 506                .rev()
 507                .find_map(|event| match event {
 508                    ExecutorEvent::PollRunnable { id } => Some(*id),
 509                    _ => None,
 510                });
 511            if let Some(id) = current_id {
 512                state
 513                    .runnable_backtraces
 514                    .insert(id, backtrace::Backtrace::new_unresolved());
 515            }
 516        }
 517    }
 518}
 519
 520impl Drop for Timer {
 521    fn drop(&mut self) {
 522        #[cfg(any(test, feature = "test-support"))]
 523        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 524            state
 525                .lock()
 526                .pending_timers
 527                .retain(|(timer_id, _, _)| timer_id != id)
 528        }
 529    }
 530}
 531
 532impl Future for Timer {
 533    type Output = ();
 534
 535    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 536        match &mut *self {
 537            #[cfg(any(test, feature = "test-support"))]
 538            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 539                use postage::stream::{PollRecv, Stream as _};
 540                smol::pin!(rx);
 541                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 542                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 543                    PollRecv::Pending => Poll::Pending,
 544                }
 545            }
 546            Self::Production(timer) => {
 547                smol::pin!(timer);
 548                match timer.poll(cx) {
 549                    Poll::Ready(_) => Poll::Ready(()),
 550                    Poll::Pending => Poll::Pending,
 551                }
 552            }
 553        }
 554    }
 555}
 556
 557#[cfg(any(test, feature = "test-support"))]
 558impl DeterministicState {
 559    fn push_to_history(&mut self, event: ExecutorEvent) {
 560        use std::fmt::Write as _;
 561
 562        self.poll_history.push(event);
 563        if let Some(prev_history) = &self.previous_poll_history {
 564            let ix = self.poll_history.len() - 1;
 565            let prev_event = prev_history[ix];
 566            if event != prev_event {
 567                let mut message = String::new();
 568                writeln!(
 569                    &mut message,
 570                    "current runnable backtrace:\n{:?}",
 571                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 572                        trace.resolve();
 573                        util::CwdBacktrace(trace)
 574                    })
 575                )
 576                .unwrap();
 577                writeln!(
 578                    &mut message,
 579                    "previous runnable backtrace:\n{:?}",
 580                    self.runnable_backtraces
 581                        .get_mut(&prev_event.id())
 582                        .map(|trace| {
 583                            trace.resolve();
 584                            util::CwdBacktrace(trace)
 585                        })
 586                )
 587                .unwrap();
 588                panic!("detected non-determinism after {ix}. {message}");
 589            }
 590        }
 591    }
 592
 593    fn will_park(&mut self) {
 594        if self.forbid_parking {
 595            let mut backtrace_message = String::new();
 596            #[cfg(any(test, feature = "test-support"))]
 597            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 598                backtrace.resolve();
 599                backtrace_message = format!(
 600                    "\nbacktrace of waiting future:\n{:?}",
 601                    util::CwdBacktrace(backtrace)
 602                );
 603            }
 604
 605            panic!(
 606                "deterministic executor parked after a call to forbid_parking{}",
 607                backtrace_message
 608            );
 609        }
 610    }
 611}
 612
 613#[cfg(any(test, feature = "test-support"))]
 614impl ExecutorEvent {
 615    pub fn id(&self) -> usize {
 616        match self {
 617            ExecutorEvent::PollRunnable { id } => *id,
 618            ExecutorEvent::EnqueuRunnable { id } => *id,
 619        }
 620    }
 621}
 622
 623impl Foreground {
 624    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 625        if dispatcher.is_main_thread() {
 626            Ok(Self::Platform {
 627                dispatcher,
 628                _not_send_or_sync: PhantomData,
 629            })
 630        } else {
 631            Err(anyhow!("must be constructed on main thread"))
 632        }
 633    }
 634
 635    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 636        let future = any_local_future(future);
 637        let any_task = match self {
 638            #[cfg(any(test, feature = "test-support"))]
 639            Self::Deterministic { cx_id, executor } => {
 640                executor.spawn_from_foreground(*cx_id, future, false)
 641            }
 642            Self::Platform { dispatcher, .. } => {
 643                fn spawn_inner(
 644                    future: AnyLocalFuture,
 645                    dispatcher: &Arc<dyn Dispatcher>,
 646                ) -> AnyLocalTask {
 647                    let dispatcher = dispatcher.clone();
 648                    let schedule =
 649                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 650                    let (runnable, task) = async_task::spawn_local(future, schedule);
 651                    runnable.schedule();
 652                    task
 653                }
 654                spawn_inner(future, dispatcher)
 655            }
 656        };
 657        Task::local(any_task)
 658    }
 659
 660    #[cfg(any(test, feature = "test-support"))]
 661    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 662        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 663        let result = match self {
 664            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 665            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 666        };
 667        *result.downcast().unwrap()
 668    }
 669
 670    #[cfg(any(test, feature = "test-support"))]
 671    pub fn run_until_parked(&self) {
 672        match self {
 673            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 674            _ => panic!("this method can only be called on a deterministic executor"),
 675        }
 676    }
 677
 678    #[cfg(any(test, feature = "test-support"))]
 679    pub fn parking_forbidden(&self) -> bool {
 680        match self {
 681            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 682            _ => panic!("this method can only be called on a deterministic executor"),
 683        }
 684    }
 685
 686    #[cfg(any(test, feature = "test-support"))]
 687    pub fn start_waiting(&self) {
 688        match self {
 689            Self::Deterministic { executor, .. } => executor.start_waiting(),
 690            _ => panic!("this method can only be called on a deterministic executor"),
 691        }
 692    }
 693
 694    #[cfg(any(test, feature = "test-support"))]
 695    pub fn finish_waiting(&self) {
 696        match self {
 697            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 698            _ => panic!("this method can only be called on a deterministic executor"),
 699        }
 700    }
 701
 702    #[cfg(any(test, feature = "test-support"))]
 703    pub fn forbid_parking(&self) {
 704        match self {
 705            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 706            _ => panic!("this method can only be called on a deterministic executor"),
 707        }
 708    }
 709
 710    #[cfg(any(test, feature = "test-support"))]
 711    pub fn allow_parking(&self) {
 712        match self {
 713            Self::Deterministic { executor, .. } => executor.allow_parking(),
 714            _ => panic!("this method can only be called on a deterministic executor"),
 715        }
 716    }
 717
 718    #[cfg(any(test, feature = "test-support"))]
 719    pub fn advance_clock(&self, duration: Duration) {
 720        match self {
 721            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 722            _ => panic!("this method can only be called on a deterministic executor"),
 723        }
 724    }
 725
 726    #[cfg(any(test, feature = "test-support"))]
 727    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 728        match self {
 729            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 730            _ => panic!("this method can only be called on a deterministic executor"),
 731        }
 732    }
 733}
 734
 735impl Background {
 736    pub fn new() -> Self {
 737        let executor = Arc::new(Executor::new());
 738        let stop = channel::unbounded::<()>();
 739
 740        for i in 0..2 * num_cpus::get() {
 741            let executor = executor.clone();
 742            let stop = stop.1.clone();
 743            thread::Builder::new()
 744                .name(format!("background-executor-{}", i))
 745                .spawn(move || smol::block_on(executor.run(stop.recv())))
 746                .unwrap();
 747        }
 748
 749        Self::Production {
 750            executor,
 751            _stop: stop.0,
 752        }
 753    }
 754
 755    pub fn num_cpus(&self) -> usize {
 756        num_cpus::get()
 757    }
 758
 759    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 760    where
 761        T: 'static + Send,
 762        F: Send + Future<Output = T> + 'static,
 763    {
 764        let future = any_future(future);
 765        let any_task = match self {
 766            Self::Production { executor, .. } => executor.spawn(future),
 767            #[cfg(any(test, feature = "test-support"))]
 768            Self::Deterministic { executor } => executor.spawn(future),
 769        };
 770        Task::send(any_task)
 771    }
 772
 773    pub fn block<F, T>(&self, future: F) -> T
 774    where
 775        F: Future<Output = T>,
 776    {
 777        smol::pin!(future);
 778        match self {
 779            Self::Production { .. } => smol::block_on(&mut future),
 780            #[cfg(any(test, feature = "test-support"))]
 781            Self::Deterministic { executor, .. } => {
 782                executor.block(&mut future, usize::MAX).unwrap()
 783            }
 784        }
 785    }
 786
 787    pub fn block_with_timeout<F, T>(
 788        &self,
 789        timeout: Duration,
 790        future: F,
 791    ) -> Result<T, impl Future<Output = T>>
 792    where
 793        T: 'static,
 794        F: 'static + Unpin + Future<Output = T>,
 795    {
 796        let mut future = any_local_future(future);
 797        if !timeout.is_zero() {
 798            let output = match self {
 799                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 800                #[cfg(any(test, feature = "test-support"))]
 801                Self::Deterministic { executor, .. } => {
 802                    use rand::prelude::*;
 803                    let max_ticks = {
 804                        let mut state = executor.state.lock();
 805                        let range = state.block_on_ticks.clone();
 806                        state.rng.gen_range(range)
 807                    };
 808                    executor.block(&mut future, max_ticks)
 809                }
 810            };
 811            if let Some(output) = output {
 812                return Ok(*output.downcast().unwrap());
 813            }
 814        }
 815        Err(async { *future.await.downcast().unwrap() })
 816    }
 817
 818    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 819    where
 820        F: FnOnce(&mut Scope<'scope>),
 821    {
 822        let mut scope = Scope::new(self.clone());
 823        (scheduler)(&mut scope);
 824        let spawned = mem::take(&mut scope.futures)
 825            .into_iter()
 826            .map(|f| self.spawn(f))
 827            .collect::<Vec<_>>();
 828        for task in spawned {
 829            task.await;
 830        }
 831    }
 832
 833    pub fn timer(&self, duration: Duration) -> Timer {
 834        match self {
 835            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
 836            #[cfg(any(test, feature = "test-support"))]
 837            Background::Deterministic { executor } => executor.timer(duration),
 838        }
 839    }
 840
 841    pub fn now(&self) -> std::time::Instant {
 842        match self {
 843            Background::Production { .. } => std::time::Instant::now(),
 844            #[cfg(any(test, feature = "test-support"))]
 845            Background::Deterministic { executor } => executor.now(),
 846        }
 847    }
 848
 849    #[cfg(any(test, feature = "test-support"))]
 850    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 851        match self {
 852            Self::Deterministic { executor, .. } => {
 853                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 854            }
 855            _ => panic!("this method can only be called on a deterministic executor"),
 856        }
 857    }
 858
 859    #[cfg(any(test, feature = "test-support"))]
 860    pub async fn simulate_random_delay(&self) {
 861        match self {
 862            Self::Deterministic { executor, .. } => {
 863                executor.simulate_random_delay().await;
 864            }
 865            _ => {
 866                panic!("this method can only be called on a deterministic executor")
 867            }
 868        }
 869    }
 870
 871    #[cfg(any(test, feature = "test-support"))]
 872    pub fn record_backtrace(&self) {
 873        match self {
 874            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 875            _ => {
 876                panic!("this method can only be called on a deterministic executor")
 877            }
 878        }
 879    }
 880
 881    #[cfg(any(test, feature = "test-support"))]
 882    pub fn start_waiting(&self) {
 883        match self {
 884            Self::Deterministic { executor, .. } => executor.start_waiting(),
 885            _ => panic!("this method can only be called on a deterministic executor"),
 886        }
 887    }
 888}
 889
 890impl Default for Background {
 891    fn default() -> Self {
 892        Self::new()
 893    }
 894}
 895
 896pub struct Scope<'a> {
 897    executor: Arc<Background>,
 898    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 899    tx: Option<mpsc::Sender<()>>,
 900    rx: mpsc::Receiver<()>,
 901    _phantom: PhantomData<&'a ()>,
 902}
 903
 904impl<'a> Scope<'a> {
 905    fn new(executor: Arc<Background>) -> Self {
 906        let (tx, rx) = mpsc::channel(1);
 907        Self {
 908            executor,
 909            tx: Some(tx),
 910            rx,
 911            futures: Default::default(),
 912            _phantom: PhantomData,
 913        }
 914    }
 915
 916    pub fn spawn<F>(&mut self, f: F)
 917    where
 918        F: Future<Output = ()> + Send + 'a,
 919    {
 920        let tx = self.tx.clone().unwrap();
 921
 922        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 923        // dropping this `Scope` blocks until all of the futures have resolved.
 924        let f = unsafe {
 925            mem::transmute::<
 926                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 927                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 928            >(Box::pin(async move {
 929                f.await;
 930                drop(tx);
 931            }))
 932        };
 933        self.futures.push(f);
 934    }
 935}
 936
 937impl<'a> Drop for Scope<'a> {
 938    fn drop(&mut self) {
 939        self.tx.take().unwrap();
 940
 941        // Wait until the channel is closed, which means that all of the spawned
 942        // futures have resolved.
 943        self.executor.block(self.rx.next());
 944    }
 945}
 946
 947impl<T> Task<T> {
 948    pub fn ready(value: T) -> Self {
 949        Self::Ready(Some(value))
 950    }
 951
 952    fn local(any_task: AnyLocalTask) -> Self {
 953        Self::Local {
 954            any_task,
 955            result_type: PhantomData,
 956        }
 957    }
 958
 959    pub fn detach(self) {
 960        match self {
 961            Task::Ready(_) => {}
 962            Task::Local { any_task, .. } => any_task.detach(),
 963            Task::Send { any_task, .. } => any_task.detach(),
 964        }
 965    }
 966}
 967
 968impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
 969    #[track_caller]
 970    pub fn detach_and_log_err(self, cx: &mut AppContext) {
 971        let caller = Location::caller();
 972        cx.spawn(|_| async move {
 973            if let Err(err) = self.await {
 974                log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
 975            }
 976        })
 977        .detach();
 978    }
 979}
 980
 981impl<T: Send> Task<T> {
 982    fn send(any_task: AnyTask) -> Self {
 983        Self::Send {
 984            any_task,
 985            result_type: PhantomData,
 986        }
 987    }
 988}
 989
 990impl<T: fmt::Debug> fmt::Debug for Task<T> {
 991    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 992        match self {
 993            Task::Ready(value) => value.fmt(f),
 994            Task::Local { any_task, .. } => any_task.fmt(f),
 995            Task::Send { any_task, .. } => any_task.fmt(f),
 996        }
 997    }
 998}
 999
1000impl<T: 'static> Future for Task<T> {
1001    type Output = T;
1002
1003    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1004        match unsafe { self.get_unchecked_mut() } {
1005            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1006            Task::Local { any_task, .. } => {
1007                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1008            }
1009            Task::Send { any_task, .. } => {
1010                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1011            }
1012        }
1013    }
1014}
1015
1016fn any_future<T, F>(future: F) -> AnyFuture
1017where
1018    T: 'static + Send,
1019    F: Future<Output = T> + Send + 'static,
1020{
1021    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1022}
1023
1024fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1025where
1026    T: 'static,
1027    F: Future<Output = T> + 'static,
1028{
1029    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1030}