executor.rs

   1use crate::util;
   2use crate::PlatformDispatcher;
   3use anyhow::{anyhow, Result};
   4use async_task::Runnable;
   5use futures::channel::{mpsc, oneshot};
   6use smol::{channel, prelude::*, Executor};
   7use std::{
   8    any::Any,
   9    fmt::{self},
  10    marker::PhantomData,
  11    mem,
  12    pin::Pin,
  13    rc::Rc,
  14    sync::Arc,
  15    task::{Context, Poll},
  16    thread,
  17    time::Duration,
  18};
  19
  20/// Enqueues the given closure to be run on the application's event loop. Can be
  21/// called on any thread.
  22pub(crate) fn spawn_on_main<R>(
  23    dispatcher: Arc<dyn PlatformDispatcher>,
  24    future: impl Future<Output = R> + Send + 'static,
  25) -> impl Future<Output = R>
  26where
  27    R: Send + 'static,
  28{
  29    let (tx, rx) = oneshot::channel();
  30    let (runnable, task) = async_task::spawn(
  31        async move {
  32            let result = future.await;
  33            let _ = tx.send(result);
  34        },
  35        move |runnable| dispatcher.run_on_main_thread(runnable),
  36    );
  37    runnable.schedule();
  38    task.detach();
  39    async move { rx.await.unwrap() }
  40}
  41
  42/// Enqueues the given closure to be run on the application's event loop. Must
  43/// be called on the main thread.
  44pub(crate) fn spawn_on_main_local<R>(
  45    dispatcher: Arc<dyn PlatformDispatcher>,
  46    future: impl Future<Output = R> + 'static,
  47) -> impl Future<Output = R>
  48where
  49    R: 'static,
  50{
  51    assert!(dispatcher.is_main_thread(), "must be called on main thread");
  52
  53    let (tx, rx) = oneshot::channel();
  54    let (runnable, task) = async_task::spawn_local(
  55        async move {
  56            let result = future.await;
  57            let _ = tx.send(result);
  58        },
  59        move |runnable| dispatcher.run_on_main_thread(runnable),
  60    );
  61    runnable.schedule();
  62    task.detach();
  63    async move { rx.await.unwrap() }
  64}
  65
  66pub enum ForegroundExecutor {
  67    Platform {
  68        dispatcher: Arc<dyn PlatformDispatcher>,
  69        _not_send_or_sync: PhantomData<Rc<()>>,
  70    },
  71    #[cfg(any(test, feature = "test"))]
  72    Deterministic {
  73        cx_id: usize,
  74        executor: Arc<Deterministic>,
  75    },
  76}
  77
  78pub enum BackgroundExecutor {
  79    #[cfg(any(test, feature = "test"))]
  80    Deterministic { executor: Arc<Deterministic> },
  81    Production {
  82        executor: Arc<smol::Executor<'static>>,
  83        _stop: channel::Sender<()>,
  84    },
  85}
  86
  87type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  88type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  89type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  90type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  91
  92#[must_use]
  93pub enum Task<T> {
  94    Ready(Option<T>),
  95    Local {
  96        any_task: AnyLocalTask,
  97        result_type: PhantomData<T>,
  98    },
  99    Send {
 100        any_task: AnyTask,
 101        result_type: PhantomData<T>,
 102    },
 103}
 104
 105unsafe impl<T: Send> Send for Task<T> {}
 106
 107#[cfg(any(test, feature = "test"))]
 108struct DeterministicState {
 109    rng: rand::prelude::StdRng,
 110    seed: u64,
 111    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 112    scheduled_from_background: Vec<BackgroundRunnable>,
 113    forbid_parking: bool,
 114    block_on_ticks: std::ops::RangeInclusive<usize>,
 115    now: std::time::Instant,
 116    next_timer_id: usize,
 117    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 118    waiting_backtrace: Option<backtrace::Backtrace>,
 119    next_runnable_id: usize,
 120    poll_history: Vec<ExecutorEvent>,
 121    previous_poll_history: Option<Vec<ExecutorEvent>>,
 122    enable_runnable_backtraces: bool,
 123    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 124}
 125
 126#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 127pub enum ExecutorEvent {
 128    PollRunnable { id: usize },
 129    EnqueuRunnable { id: usize },
 130}
 131
 132#[cfg(any(test, feature = "test"))]
 133struct ForegroundRunnable {
 134    id: usize,
 135    runnable: Runnable,
 136    main: bool,
 137}
 138
 139#[cfg(any(test, feature = "test"))]
 140struct BackgroundRunnable {
 141    id: usize,
 142    runnable: Runnable,
 143}
 144
 145#[cfg(any(test, feature = "test"))]
 146pub struct Deterministic {
 147    state: Arc<parking_lot::Mutex<DeterministicState>>,
 148    parker: parking_lot::Mutex<parking::Parker>,
 149}
 150
 151#[must_use]
 152pub enum Timer {
 153    Production(smol::Timer),
 154    #[cfg(any(test, feature = "test"))]
 155    Deterministic(DeterministicTimer),
 156}
 157
 158#[cfg(any(test, feature = "test"))]
 159pub struct DeterministicTimer {
 160    rx: postage::barrier::Receiver,
 161    id: usize,
 162    state: Arc<parking_lot::Mutex<DeterministicState>>,
 163}
 164
 165#[cfg(any(test, feature = "test"))]
 166impl Deterministic {
 167    pub fn new(seed: u64) -> Arc<Self> {
 168        use rand::prelude::*;
 169
 170        Arc::new(Self {
 171            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 172                rng: StdRng::seed_from_u64(seed),
 173                seed,
 174                scheduled_from_foreground: Default::default(),
 175                scheduled_from_background: Default::default(),
 176                forbid_parking: false,
 177                block_on_ticks: 0..=1000,
 178                now: std::time::Instant::now(),
 179                next_timer_id: Default::default(),
 180                pending_timers: Default::default(),
 181                waiting_backtrace: None,
 182                next_runnable_id: 0,
 183                poll_history: Default::default(),
 184                previous_poll_history: Default::default(),
 185                enable_runnable_backtraces: false,
 186                runnable_backtraces: Default::default(),
 187            })),
 188            parker: Default::default(),
 189        })
 190    }
 191
 192    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 193        self.state.lock().poll_history.clone()
 194    }
 195
 196    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 197        self.state.lock().previous_poll_history = history;
 198    }
 199
 200    pub fn enable_runnable_backtrace(&self) {
 201        self.state.lock().enable_runnable_backtraces = true;
 202    }
 203
 204    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 205        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 206        backtrace.resolve();
 207        backtrace
 208    }
 209
 210    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
 211        Arc::new(BackgroundExecutor::Deterministic {
 212            executor: self.clone(),
 213        })
 214    }
 215
 216    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
 217        Rc::new(ForegroundExecutor::Deterministic {
 218            cx_id: id,
 219            executor: self.clone(),
 220        })
 221    }
 222
 223    fn spawn_from_foreground(
 224        &self,
 225        cx_id: usize,
 226        future: AnyLocalFuture,
 227        main: bool,
 228    ) -> AnyLocalTask {
 229        let state = self.state.clone();
 230        let id;
 231        {
 232            let mut state = state.lock();
 233            id = util::post_inc(&mut state.next_runnable_id);
 234            if state.enable_runnable_backtraces {
 235                state
 236                    .runnable_backtraces
 237                    .insert(id, backtrace::Backtrace::new_unresolved());
 238            }
 239        }
 240
 241        let unparker = self.parker.lock().unparker();
 242        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 243            let mut state = state.lock();
 244            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
 245            state
 246                .scheduled_from_foreground
 247                .entry(cx_id)
 248                .or_default()
 249                .push(ForegroundRunnable { id, runnable, main });
 250            unparker.unpark();
 251        });
 252        runnable.schedule();
 253        task
 254    }
 255
 256    fn spawn(&self, future: AnyFuture) -> AnyTask {
 257        let state = self.state.clone();
 258        let id;
 259        {
 260            let mut state = state.lock();
 261            id = util::post_inc(&mut state.next_runnable_id);
 262            if state.enable_runnable_backtraces {
 263                state
 264                    .runnable_backtraces
 265                    .insert(id, backtrace::Backtrace::new_unresolved());
 266            }
 267        }
 268
 269        let unparker = self.parker.lock().unparker();
 270        let (runnable, task) = async_task::spawn(future, move |runnable| {
 271            let mut state = state.lock();
 272            state
 273                .poll_history
 274                .push(ExecutorEvent::EnqueuRunnable { id });
 275            state
 276                .scheduled_from_background
 277                .push(BackgroundRunnable { id, runnable });
 278            unparker.unpark();
 279        });
 280        runnable.schedule();
 281        task
 282    }
 283
 284    fn run<'a>(
 285        &self,
 286        cx_id: usize,
 287        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 288    ) -> Box<dyn Any> {
 289        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 290
 291        let woken = Arc::new(AtomicBool::new(false));
 292
 293        let state = self.state.clone();
 294        let id;
 295        {
 296            let mut state = state.lock();
 297            id = util::post_inc(&mut state.next_runnable_id);
 298            if state.enable_runnable_backtraces {
 299                state
 300                    .runnable_backtraces
 301                    .insert(id, backtrace::Backtrace::new_unresolved());
 302            }
 303        }
 304
 305        let unparker = self.parker.lock().unparker();
 306        let (runnable, mut main_task) = unsafe {
 307            async_task::spawn_unchecked(main_future, move |runnable| {
 308                let state = &mut *state.lock();
 309                state
 310                    .scheduled_from_foreground
 311                    .entry(cx_id)
 312                    .or_default()
 313                    .push(ForegroundRunnable {
 314                        id: util::post_inc(&mut state.next_runnable_id),
 315                        runnable,
 316                        main: true,
 317                    });
 318                unparker.unpark();
 319            })
 320        };
 321        runnable.schedule();
 322
 323        loop {
 324            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 325                return result;
 326            }
 327
 328            if !woken.load(SeqCst) {
 329                self.state.lock().will_park();
 330            }
 331
 332            woken.store(false, SeqCst);
 333            self.parker.lock().park();
 334        }
 335    }
 336
 337    pub fn run_until_parked(&self) {
 338        use std::sync::atomic::AtomicBool;
 339        let woken = Arc::new(AtomicBool::new(false));
 340        self.run_internal(woken, None);
 341    }
 342
 343    fn run_internal(
 344        &self,
 345        woken: Arc<std::sync::atomic::AtomicBool>,
 346        mut main_task: Option<&mut AnyLocalTask>,
 347    ) -> Option<Box<dyn Any>> {
 348        use rand::prelude::*;
 349        use std::sync::atomic::Ordering::SeqCst;
 350
 351        let unparker = self.parker.lock().unparker();
 352        let waker = waker_fn::waker_fn(move || {
 353            woken.store(true, SeqCst);
 354            unparker.unpark();
 355        });
 356
 357        let mut cx = Context::from_waker(&waker);
 358        loop {
 359            let mut state = self.state.lock();
 360
 361            if state.scheduled_from_foreground.is_empty()
 362                && state.scheduled_from_background.is_empty()
 363            {
 364                if let Some(main_task) = main_task {
 365                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 366                        return Some(result);
 367                    }
 368                }
 369
 370                return None;
 371            }
 372
 373            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 374                let background_len = state.scheduled_from_background.len();
 375                let ix = state.rng.gen_range(0..background_len);
 376                let background_runnable = state.scheduled_from_background.remove(ix);
 377                state.push_to_history(ExecutorEvent::PollRunnable {
 378                    id: background_runnable.id,
 379                });
 380                drop(state);
 381                background_runnable.runnable.run();
 382            } else if !state.scheduled_from_foreground.is_empty() {
 383                let available_cx_ids = state
 384                    .scheduled_from_foreground
 385                    .keys()
 386                    .copied()
 387                    .collect::<Vec<_>>();
 388                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 389                let scheduled_from_cx = state
 390                    .scheduled_from_foreground
 391                    .get_mut(&cx_id_to_run)
 392                    .unwrap();
 393                let foreground_runnable = scheduled_from_cx.remove(0);
 394                if scheduled_from_cx.is_empty() {
 395                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 396                }
 397                state.push_to_history(ExecutorEvent::PollRunnable {
 398                    id: foreground_runnable.id,
 399                });
 400
 401                drop(state);
 402
 403                foreground_runnable.runnable.run();
 404                if let Some(main_task) = main_task.as_mut() {
 405                    if foreground_runnable.main {
 406                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 407                            return Some(result);
 408                        }
 409                    }
 410                }
 411            }
 412        }
 413    }
 414
 415    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 416    where
 417        F: Unpin + Future<Output = T>,
 418    {
 419        use rand::prelude::*;
 420
 421        let unparker = self.parker.lock().unparker();
 422        let waker = waker_fn::waker_fn(move || {
 423            unparker.unpark();
 424        });
 425
 426        let mut cx = Context::from_waker(&waker);
 427        for _ in 0..max_ticks {
 428            let mut state = self.state.lock();
 429            let runnable_count = state.scheduled_from_background.len();
 430            let ix = state.rng.gen_range(0..=runnable_count);
 431            if ix < state.scheduled_from_background.len() {
 432                let background_runnable = state.scheduled_from_background.remove(ix);
 433                state.push_to_history(ExecutorEvent::PollRunnable {
 434                    id: background_runnable.id,
 435                });
 436                drop(state);
 437                background_runnable.runnable.run();
 438            } else {
 439                drop(state);
 440                if let Poll::Ready(result) = future.poll(&mut cx) {
 441                    return Some(result);
 442                }
 443                let mut state = self.state.lock();
 444                if state.scheduled_from_background.is_empty() {
 445                    state.will_park();
 446                    drop(state);
 447                    self.parker.lock().park();
 448                }
 449
 450                continue;
 451            }
 452        }
 453
 454        None
 455    }
 456
 457    pub fn timer(&self, duration: Duration) -> Timer {
 458        let (tx, rx) = postage::barrier::channel();
 459        let mut state = self.state.lock();
 460        let wakeup_at = state.now + duration;
 461        let id = util::post_inc(&mut state.next_timer_id);
 462        match state
 463            .pending_timers
 464            .binary_search_by_key(&wakeup_at, |e| e.1)
 465        {
 466            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 467        }
 468        let state = self.state.clone();
 469        Timer::Deterministic(DeterministicTimer { rx, id, state })
 470    }
 471
 472    pub fn now(&self) -> std::time::Instant {
 473        let state = self.state.lock();
 474        state.now
 475    }
 476
 477    pub fn advance_clock(&self, duration: Duration) {
 478        let new_now = self.state.lock().now + duration;
 479        loop {
 480            self.run_until_parked();
 481            let mut state = self.state.lock();
 482
 483            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 484                let wakeup_time = *wakeup_time;
 485                if wakeup_time <= new_now {
 486                    let timer_count = state
 487                        .pending_timers
 488                        .iter()
 489                        .take_while(|(_, t, _)| *t == wakeup_time)
 490                        .count();
 491                    state.now = wakeup_time;
 492                    let timers_to_wake = state
 493                        .pending_timers
 494                        .drain(0..timer_count)
 495                        .collect::<Vec<_>>();
 496                    drop(state);
 497                    drop(timers_to_wake);
 498                    continue;
 499                }
 500            }
 501
 502            break;
 503        }
 504
 505        self.state.lock().now = new_now;
 506    }
 507
 508    pub fn start_waiting(&self) {
 509        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 510    }
 511
 512    pub fn finish_waiting(&self) {
 513        self.state.lock().waiting_backtrace.take();
 514    }
 515
 516    pub fn forbid_parking(&self) {
 517        use rand::prelude::*;
 518
 519        let mut state = self.state.lock();
 520        state.forbid_parking = true;
 521        state.rng = StdRng::seed_from_u64(state.seed);
 522    }
 523
 524    pub fn allow_parking(&self) {
 525        use rand::prelude::*;
 526
 527        let mut state = self.state.lock();
 528        state.forbid_parking = false;
 529        state.rng = StdRng::seed_from_u64(state.seed);
 530    }
 531
 532    pub async fn simulate_random_delay(&self) {
 533        use rand::prelude::*;
 534        use smol::future::yield_now;
 535        if self.state.lock().rng.gen_bool(0.2) {
 536            let yields = self.state.lock().rng.gen_range(1..=10);
 537            for _ in 0..yields {
 538                yield_now().await;
 539            }
 540        }
 541    }
 542
 543    pub fn record_backtrace(&self) {
 544        let mut state = self.state.lock();
 545        if state.enable_runnable_backtraces {
 546            let current_id = state
 547                .poll_history
 548                .iter()
 549                .rev()
 550                .find_map(|event| match event {
 551                    ExecutorEvent::PollRunnable { id } => Some(*id),
 552                    _ => None,
 553                });
 554            if let Some(id) = current_id {
 555                state
 556                    .runnable_backtraces
 557                    .insert(id, backtrace::Backtrace::new_unresolved());
 558            }
 559        }
 560    }
 561}
 562
 563impl Drop for Timer {
 564    fn drop(&mut self) {
 565        #[cfg(any(test, feature = "test"))]
 566        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 567            state
 568                .lock()
 569                .pending_timers
 570                .retain(|(timer_id, _, _)| timer_id != id)
 571        }
 572    }
 573}
 574
 575impl Future for Timer {
 576    type Output = ();
 577
 578    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 579        match &mut *self {
 580            #[cfg(any(test, feature = "test"))]
 581            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 582                use postage::stream::{PollRecv, Stream as _};
 583                smol::pin!(rx);
 584                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 585                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 586                    PollRecv::Pending => Poll::Pending,
 587                }
 588            }
 589            Self::Production(timer) => {
 590                smol::pin!(timer);
 591                match timer.poll(cx) {
 592                    Poll::Ready(_) => Poll::Ready(()),
 593                    Poll::Pending => Poll::Pending,
 594                }
 595            }
 596        }
 597    }
 598}
 599
 600#[cfg(any(test, feature = "test"))]
 601impl DeterministicState {
 602    fn push_to_history(&mut self, event: ExecutorEvent) {
 603        use std::fmt::Write as _;
 604
 605        self.poll_history.push(event);
 606        if let Some(prev_history) = &self.previous_poll_history {
 607            let ix = self.poll_history.len() - 1;
 608            let prev_event = prev_history[ix];
 609            if event != prev_event {
 610                let mut message = String::new();
 611                writeln!(
 612                    &mut message,
 613                    "current runnable backtrace:\n{:?}",
 614                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
 615                        trace.resolve();
 616                        crate::util::CwdBacktrace(trace)
 617                    })
 618                )
 619                .unwrap();
 620                writeln!(
 621                    &mut message,
 622                    "previous runnable backtrace:\n{:?}",
 623                    self.runnable_backtraces
 624                        .get_mut(&prev_event.id())
 625                        .map(|trace| {
 626                            trace.resolve();
 627                            util::CwdBacktrace(trace)
 628                        })
 629                )
 630                .unwrap();
 631                panic!("detected non-determinism after {ix}. {message}");
 632            }
 633        }
 634    }
 635
 636    fn will_park(&mut self) {
 637        if self.forbid_parking {
 638            let mut backtrace_message = String::new();
 639            #[cfg(any(test, feature = "test"))]
 640            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
 641                backtrace.resolve();
 642                backtrace_message = format!(
 643                    "\nbacktrace of waiting future:\n{:?}",
 644                    util::CwdBacktrace(backtrace)
 645                );
 646            }
 647
 648            panic!(
 649                "deterministic executor parked after a call to forbid_parking{}",
 650                backtrace_message
 651            );
 652        }
 653    }
 654}
 655
 656#[cfg(any(test, feature = "test"))]
 657impl ExecutorEvent {
 658    pub fn id(&self) -> usize {
 659        match self {
 660            ExecutorEvent::PollRunnable { id } => *id,
 661            ExecutorEvent::EnqueuRunnable { id } => *id,
 662        }
 663    }
 664}
 665
 666impl ForegroundExecutor {
 667    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
 668        if dispatcher.is_main_thread() {
 669            Ok(Self::Platform {
 670                dispatcher,
 671                _not_send_or_sync: PhantomData,
 672            })
 673        } else {
 674            Err(anyhow!("must be constructed on main thread"))
 675        }
 676    }
 677
 678    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 679        let future = any_local_future(future);
 680        let any_task = match self {
 681            #[cfg(any(test, feature = "test"))]
 682            Self::Deterministic { cx_id, executor } => {
 683                executor.spawn_from_foreground(*cx_id, future, false)
 684            }
 685            Self::Platform { dispatcher, .. } => {
 686                fn spawn_inner(
 687                    future: AnyLocalFuture,
 688                    dispatcher: &Arc<dyn PlatformDispatcher>,
 689                ) -> AnyLocalTask {
 690                    let dispatcher = dispatcher.clone();
 691                    let schedule =
 692                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 693                    let (runnable, task) = async_task::spawn_local(future, schedule);
 694                    runnable.schedule();
 695                    task
 696                }
 697                spawn_inner(future, dispatcher)
 698            }
 699        };
 700        Task::local(any_task)
 701    }
 702
 703    #[cfg(any(test, feature = "test"))]
 704    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 705        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 706        let result = match self {
 707            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 708            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 709        };
 710        *result.downcast().unwrap()
 711    }
 712
 713    #[cfg(any(test, feature = "test"))]
 714    pub fn run_until_parked(&self) {
 715        match self {
 716            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 717            _ => panic!("this method can only be called on a deterministic executor"),
 718        }
 719    }
 720
 721    #[cfg(any(test, feature = "test"))]
 722    pub fn parking_forbidden(&self) -> bool {
 723        match self {
 724            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 725            _ => panic!("this method can only be called on a deterministic executor"),
 726        }
 727    }
 728
 729    #[cfg(any(test, feature = "test"))]
 730    pub fn start_waiting(&self) {
 731        match self {
 732            Self::Deterministic { executor, .. } => executor.start_waiting(),
 733            _ => panic!("this method can only be called on a deterministic executor"),
 734        }
 735    }
 736
 737    #[cfg(any(test, feature = "test"))]
 738    pub fn finish_waiting(&self) {
 739        match self {
 740            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 741            _ => panic!("this method can only be called on a deterministic executor"),
 742        }
 743    }
 744
 745    #[cfg(any(test, feature = "test"))]
 746    pub fn forbid_parking(&self) {
 747        match self {
 748            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 749            _ => panic!("this method can only be called on a deterministic executor"),
 750        }
 751    }
 752
 753    #[cfg(any(test, feature = "test"))]
 754    pub fn allow_parking(&self) {
 755        match self {
 756            Self::Deterministic { executor, .. } => executor.allow_parking(),
 757            _ => panic!("this method can only be called on a deterministic executor"),
 758        }
 759    }
 760
 761    #[cfg(any(test, feature = "test"))]
 762    pub fn advance_clock(&self, duration: Duration) {
 763        match self {
 764            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 765            _ => panic!("this method can only be called on a deterministic executor"),
 766        }
 767    }
 768
 769    #[cfg(any(test, feature = "test"))]
 770    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 771        match self {
 772            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 773            _ => panic!("this method can only be called on a deterministic executor"),
 774        }
 775    }
 776}
 777
 778impl BackgroundExecutor {
 779    pub fn new() -> Self {
 780        let executor = Arc::new(Executor::new());
 781        let stop = channel::unbounded::<()>();
 782
 783        for i in 0..2 * num_cpus::get() {
 784            let executor = executor.clone();
 785            let stop = stop.1.clone();
 786            thread::Builder::new()
 787                .name(format!("background-executor-{}", i))
 788                .spawn(move || smol::block_on(executor.run(stop.recv())))
 789                .unwrap();
 790        }
 791
 792        Self::Production {
 793            executor,
 794            _stop: stop.0,
 795        }
 796    }
 797
 798    pub fn num_cpus(&self) -> usize {
 799        num_cpus::get()
 800    }
 801
 802    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 803    where
 804        T: 'static + Send,
 805        F: Send + Future<Output = T> + 'static,
 806    {
 807        let future = any_future(future);
 808        let any_task = match self {
 809            Self::Production { executor, .. } => executor.spawn(future),
 810            #[cfg(any(test, feature = "test"))]
 811            Self::Deterministic { executor } => executor.spawn(future),
 812        };
 813        Task::send(any_task)
 814    }
 815
 816    pub fn block<F, T>(&self, future: F) -> T
 817    where
 818        F: Future<Output = T>,
 819    {
 820        smol::pin!(future);
 821        match self {
 822            Self::Production { .. } => smol::block_on(&mut future),
 823            #[cfg(any(test, feature = "test"))]
 824            Self::Deterministic { executor, .. } => {
 825                executor.block(&mut future, usize::MAX).unwrap()
 826            }
 827        }
 828    }
 829
 830    pub fn block_with_timeout<F, T>(
 831        &self,
 832        timeout: Duration,
 833        future: F,
 834    ) -> Result<T, impl Future<Output = T>>
 835    where
 836        T: 'static,
 837        F: 'static + Unpin + Future<Output = T>,
 838    {
 839        let mut future = any_local_future(future);
 840        if !timeout.is_zero() {
 841            let output = match self {
 842                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 843                #[cfg(any(test, feature = "test"))]
 844                Self::Deterministic { executor, .. } => {
 845                    use rand::prelude::*;
 846                    let max_ticks = {
 847                        let mut state = executor.state.lock();
 848                        let range = state.block_on_ticks.clone();
 849                        state.rng.gen_range(range)
 850                    };
 851                    executor.block(&mut future, max_ticks)
 852                }
 853            };
 854            if let Some(output) = output {
 855                return Ok(*output.downcast().unwrap());
 856            }
 857        }
 858        Err(async { *future.await.downcast().unwrap() })
 859    }
 860
 861    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 862    where
 863        F: FnOnce(&mut Scope<'scope>),
 864    {
 865        let mut scope = Scope::new(self.clone());
 866        (scheduler)(&mut scope);
 867        let spawned = mem::take(&mut scope.futures)
 868            .into_iter()
 869            .map(|f| self.spawn(f))
 870            .collect::<Vec<_>>();
 871        for task in spawned {
 872            task.await;
 873        }
 874    }
 875
 876    pub fn timer(&self, duration: Duration) -> Timer {
 877        match self {
 878            BackgroundExecutor::Production { .. } => {
 879                Timer::Production(smol::Timer::after(duration))
 880            }
 881            #[cfg(any(test, feature = "test"))]
 882            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
 883        }
 884    }
 885
 886    pub fn now(&self) -> std::time::Instant {
 887        match self {
 888            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
 889            #[cfg(any(test, feature = "test"))]
 890            BackgroundExecutor::Deterministic { executor } => executor.now(),
 891        }
 892    }
 893
 894    #[cfg(any(test, feature = "test"))]
 895    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 896        match self {
 897            Self::Deterministic { executor, .. } => {
 898                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 899            }
 900            _ => panic!("this method can only be called on a deterministic executor"),
 901        }
 902    }
 903
 904    #[cfg(any(test, feature = "test"))]
 905    pub async fn simulate_random_delay(&self) {
 906        match self {
 907            Self::Deterministic { executor, .. } => {
 908                executor.simulate_random_delay().await;
 909            }
 910            _ => {
 911                panic!("this method can only be called on a deterministic executor")
 912            }
 913        }
 914    }
 915
 916    #[cfg(any(test, feature = "test"))]
 917    pub fn record_backtrace(&self) {
 918        match self {
 919            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 920            _ => {
 921                panic!("this method can only be called on a deterministic executor")
 922            }
 923        }
 924    }
 925
 926    #[cfg(any(test, feature = "test"))]
 927    pub fn start_waiting(&self) {
 928        match self {
 929            Self::Deterministic { executor, .. } => executor.start_waiting(),
 930            _ => panic!("this method can only be called on a deterministic executor"),
 931        }
 932    }
 933}
 934
 935impl Default for BackgroundExecutor {
 936    fn default() -> Self {
 937        Self::new()
 938    }
 939}
 940
/// A collection point for futures that may borrow data living for `'a`.
///
/// Futures are registered with `spawn`, and dropping the scope blocks on the
/// executor until the completion channel reports that they are all done.
pub struct Scope<'a> {
    /// Executor used by `Drop` to block until the registered futures finish.
    executor: Arc<BackgroundExecutor>,
    /// Futures registered via `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// The scope's own sender; cloned into every registered future and
    /// released in `Drop`. `None` only after `Drop` has taken it.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` blocks on; nothing in this file sends on the
    /// channel, so it is used purely to observe all senders being dropped.
    rx: mpsc::Receiver<()>,
    /// Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}
 948
 949impl<'a> Scope<'a> {
 950    fn new(executor: Arc<BackgroundExecutor>) -> Self {
 951        let (tx, rx) = mpsc::channel(1);
 952        Self {
 953            executor,
 954            tx: Some(tx),
 955            rx,
 956            futures: Default::default(),
 957            _phantom: PhantomData,
 958        }
 959    }
 960
 961    pub fn spawn<F>(&mut self, f: F)
 962    where
 963        F: Future<Output = ()> + Send + 'a,
 964    {
 965        let tx = self.tx.clone().unwrap();
 966
 967        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 968        // dropping this `Scope` blocks until all of the futures have resolved.
 969        let f = unsafe {
 970            mem::transmute::<
 971                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 972                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 973            >(Box::pin(async move {
 974                f.await;
 975                drop(tx);
 976            }))
 977        };
 978        self.futures.push(f);
 979    }
 980}
 981
 982impl<'a> Drop for Scope<'a> {
 983    fn drop(&mut self) {
 984        self.tx.take().unwrap();
 985
 986        // Wait until the channel is closed, which means that all of the spawned
 987        // futures have resolved.
 988        self.executor.block(self.rx.next());
 989    }
 990}
 991
 992impl<T> Task<T> {
 993    pub fn ready(value: T) -> Self {
 994        Self::Ready(Some(value))
 995    }
 996
 997    fn local(any_task: AnyLocalTask) -> Self {
 998        Self::Local {
 999            any_task,
1000            result_type: PhantomData,
1001        }
1002    }
1003
1004    pub fn detach(self) {
1005        match self {
1006            Task::Ready(_) => {}
1007            Task::Local { any_task, .. } => any_task.detach(),
1008            Task::Send { any_task, .. } => any_task.detach(),
1009        }
1010    }
1011}
1012
1013// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1014//     #[track_caller]
1015//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
1016//         let caller = Location::caller();
1017//         cx.spawn(|_| async move {
1018//             if let Err(err) = self.await {
1019//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1020//             }
1021//         })
1022//         .detach();
1023//     }
1024// }
1025
1026impl<T: Send> Task<T> {
1027    fn send(any_task: AnyTask) -> Self {
1028        Self::Send {
1029            any_task,
1030            result_type: PhantomData,
1031        }
1032    }
1033}
1034
1035impl<T: fmt::Debug> fmt::Debug for Task<T> {
1036    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037        match self {
1038            Task::Ready(value) => value.fmt(f),
1039            Task::Local { any_task, .. } => any_task.fmt(f),
1040            Task::Send { any_task, .. } => any_task.fmt(f),
1041        }
1042    }
1043}
1044
1045impl<T: 'static> Future for Task<T> {
1046    type Output = T;
1047
1048    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1049        match unsafe { self.get_unchecked_mut() } {
1050            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1051            Task::Local { any_task, .. } => {
1052                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1053            }
1054            Task::Send { any_task, .. } => {
1055                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1056            }
1057        }
1058    }
1059}
1060
1061fn any_future<T, F>(future: F) -> AnyFuture
1062where
1063    T: 'static + Send,
1064    F: Future<Output = T> + Send + 'static,
1065{
1066    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1067}
1068
1069fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1070where
1071    T: 'static,
1072    F: Future<Output = T> + 'static,
1073{
1074    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1075}