executor.rs

   1use anyhow::{anyhow, Result};
   2use async_task::Runnable;
   3use futures::channel::mpsc;
   4use smol::{channel, prelude::*, Executor};
   5use std::{
   6    any::Any,
   7    fmt::{self, Display},
   8    marker::PhantomData,
   9    mem,
  10    panic::Location,
  11    pin::Pin,
  12    rc::Rc,
  13    sync::Arc,
  14    task::{Context, Poll},
  15    thread,
  16    time::Duration,
  17};
  18
  19use crate::{
  20    platform::{self, Dispatcher},
  21    util, AppContext,
  22};
  23
  24pub enum Foreground {
  25    Platform {
  26        dispatcher: Arc<dyn platform::Dispatcher>,
  27        _not_send_or_sync: PhantomData<Rc<()>>,
  28    },
  29    #[cfg(any(test, feature = "test-support"))]
  30    Deterministic {
  31        cx_id: usize,
  32        executor: Arc<Deterministic>,
  33    },
  34}
  35
  36pub enum Background {
  37    #[cfg(any(test, feature = "test-support"))]
  38    Deterministic { executor: Arc<Deterministic> },
  39    Production {
  40        executor: Arc<smol::Executor<'static>>,
  41        _stop: channel::Sender<()>,
  42    },
  43}
  44
  45type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
  46type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
  47type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
  48type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
  49
  50#[must_use]
  51pub enum Task<T> {
  52    Ready(Option<T>),
  53    Local {
  54        any_task: AnyLocalTask,
  55        result_type: PhantomData<T>,
  56    },
  57    Send {
  58        any_task: AnyTask,
  59        result_type: PhantomData<T>,
  60    },
  61}
  62
  63unsafe impl<T: Send> Send for Task<T> {}
  64
  /// Mutable bookkeeping shared by every handle onto a [`Deterministic`] executor.
  #[cfg(any(test, feature = "test-support"))]
  struct DeterministicState {
      /// Random source used for scheduling choices; created from `seed`.
      rng: rand::prelude::StdRng,
      /// Seed kept so that `rng` can be re-created from scratch.
      seed: u64,
      /// Runnables queued by foreground tasks, keyed by the `cx_id` they were spawned under.
      scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
      /// Runnables queued by background tasks.
      scheduled_from_background: Vec<BackgroundRunnable>,
      /// When set, `will_park` panics instead of returning.
      forbid_parking: bool,
      /// Range from which `Background::block_with_timeout` draws its tick budget.
      block_on_ticks: std::ops::RangeInclusive<usize>,
      /// Simulated current time; moved forward by `advance_clock`.
      now: std::time::Instant,
      /// Id handed to the next timer created by `timer`.
      next_timer_id: usize,
      /// `(timer id, wake-up time, barrier sender)` entries, inserted in wake-up-time order.
      pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
      /// Backtrace captured by `start_waiting`; included in the `will_park` panic message.
      waiting_backtrace: Option<backtrace::Backtrace>,
      /// Id handed to the next runnable.
      next_runnable_id: usize,
      /// Enqueue/poll events recorded so far.
      poll_history: Vec<ExecutorEvent>,
      /// Event history to compare `poll_history` against, if one was supplied.
      previous_poll_history: Option<Vec<ExecutorEvent>>,
      /// Whether backtraces are captured for runnables.
      enable_runnable_backtraces: bool,
      /// Captured backtraces keyed by runnable id.
      runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
  }
  83
  /// A scheduling event recorded by the deterministic executor.
  #[derive(Clone, Copy, Debug, Eq, PartialEq)]
  pub enum ExecutorEvent {
      /// The runnable with this id was taken off a queue to be run.
      PollRunnable { id: usize },
      /// The runnable with this id was placed on a queue.
      EnqueueRunnable { id: usize },
  }
  89
  /// A queued foreground runnable together with its bookkeeping.
  #[cfg(any(test, feature = "test-support"))]
  struct ForegroundRunnable {
      /// Id reported in the executor's event history.
      id: usize,
      runnable: Runnable,
      /// `true` for the future driven by `Deterministic::run`; after such a runnable runs, the
      /// main task is polled for completion.
      main: bool,
  }
  96
  /// A queued background runnable together with its id.
  #[cfg(any(test, feature = "test-support"))]
  struct BackgroundRunnable {
      /// Id reported in the executor's event history.
      id: usize,
      runnable: Runnable,
  }
 102
 /// Test-support executor whose scheduling choices are drawn from a seeded random generator.
 #[cfg(any(test, feature = "test-support"))]
 pub struct Deterministic {
     /// Queues, clock, timers and history; also shared with scheduling closures and timers.
     state: Arc<parking_lot::Mutex<DeterministicState>>,
     /// Parker the executor waits on when it has nothing to run.
     parker: parking_lot::Mutex<parking::Parker>,
 }
 108
 109#[must_use]
 110pub enum Timer {
 111    Production(smol::Timer),
 112    #[cfg(any(test, feature = "test-support"))]
 113    Deterministic(DeterministicTimer),
 114}
 115
 /// Receiving side of a timer registered with a [`Deterministic`] executor.
 #[cfg(any(test, feature = "test-support"))]
 pub struct DeterministicTimer {
     /// Barrier receiver polled by `Timer::poll`.
     rx: postage::barrier::Receiver,
     /// Id of the matching entry in `pending_timers`; used to remove it when the timer drops.
     id: usize,
     state: Arc<parking_lot::Mutex<DeterministicState>>,
 }
 122
 123#[cfg(any(test, feature = "test-support"))]
 124impl Deterministic {
 125    pub fn new(seed: u64) -> Arc<Self> {
 126        use rand::prelude::*;
 127
 128        Arc::new(Self {
 129            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 130                rng: StdRng::seed_from_u64(seed),
 131                seed,
 132                scheduled_from_foreground: Default::default(),
 133                scheduled_from_background: Default::default(),
 134                forbid_parking: false,
 135                block_on_ticks: 0..=1000,
 136                now: std::time::Instant::now(),
 137                next_timer_id: Default::default(),
 138                pending_timers: Default::default(),
 139                waiting_backtrace: None,
 140                next_runnable_id: 0,
 141                poll_history: Default::default(),
 142                previous_poll_history: Default::default(),
 143                enable_runnable_backtraces: false,
 144                runnable_backtraces: Default::default(),
 145            })),
 146            parker: Default::default(),
 147        })
 148    }
 149
 150    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
 151        self.state.lock().poll_history.clone()
 152    }
 153
 154    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
 155        self.state.lock().previous_poll_history = history;
 156    }
 157
 158    pub fn enable_runnable_backtrace(&self) {
 159        self.state.lock().enable_runnable_backtraces = true;
 160    }
 161
 162    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
 163        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
 164        backtrace.resolve();
 165        backtrace
 166    }
 167
 168    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
 169        Arc::new(Background::Deterministic {
 170            executor: self.clone(),
 171        })
 172    }
 173
 174    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
 175        Rc::new(Foreground::Deterministic {
 176            cx_id: id,
 177            executor: self.clone(),
 178        })
 179    }
 180
 181    fn spawn_from_foreground(
 182        &self,
 183        cx_id: usize,
 184        future: AnyLocalFuture,
 185        main: bool,
 186    ) -> AnyLocalTask {
 187        let state = self.state.clone();
 188        let id;
 189        {
 190            let mut state = state.lock();
 191            id = util::post_inc(&mut state.next_runnable_id);
 192            if state.enable_runnable_backtraces {
 193                state
 194                    .runnable_backtraces
 195                    .insert(id, backtrace::Backtrace::new_unresolved());
 196            }
 197        }
 198
 199        let unparker = self.parker.lock().unparker();
 200        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
 201            let mut state = state.lock();
 202            state.push_to_history(ExecutorEvent::EnqueueRunnable { id });
 203            state
 204                .scheduled_from_foreground
 205                .entry(cx_id)
 206                .or_default()
 207                .push(ForegroundRunnable { id, runnable, main });
 208            unparker.unpark();
 209        });
 210        runnable.schedule();
 211        task
 212    }
 213
 214    fn spawn(&self, future: AnyFuture) -> AnyTask {
 215        let state = self.state.clone();
 216        let id;
 217        {
 218            let mut state = state.lock();
 219            id = util::post_inc(&mut state.next_runnable_id);
 220            if state.enable_runnable_backtraces {
 221                state
 222                    .runnable_backtraces
 223                    .insert(id, backtrace::Backtrace::new_unresolved());
 224            }
 225        }
 226
 227        let unparker = self.parker.lock().unparker();
 228        let (runnable, task) = async_task::spawn(future, move |runnable| {
 229            let mut state = state.lock();
 230            state
 231                .poll_history
 232                .push(ExecutorEvent::EnqueueRunnable { id });
 233            state
 234                .scheduled_from_background
 235                .push(BackgroundRunnable { id, runnable });
 236            unparker.unpark();
 237        });
 238        runnable.schedule();
 239        task
 240    }
 241
 242    fn run<'a>(
 243        &self,
 244        cx_id: usize,
 245        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
 246    ) -> Box<dyn Any> {
 247        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 248
 249        let woken = Arc::new(AtomicBool::new(false));
 250
 251        let state = self.state.clone();
 252        let id;
 253        {
 254            let mut state = state.lock();
 255            id = util::post_inc(&mut state.next_runnable_id);
 256            if state.enable_runnable_backtraces {
 257                state
 258                    .runnable_backtraces
 259                    .insert(id, backtrace::Backtrace::new_unresolved());
 260            }
 261        }
 262
 263        let unparker = self.parker.lock().unparker();
 264        let (runnable, mut main_task) = unsafe {
 265            async_task::spawn_unchecked(main_future, move |runnable| {
 266                let state = &mut *state.lock();
 267                state
 268                    .scheduled_from_foreground
 269                    .entry(cx_id)
 270                    .or_default()
 271                    .push(ForegroundRunnable {
 272                        id: util::post_inc(&mut state.next_runnable_id),
 273                        runnable,
 274                        main: true,
 275                    });
 276                unparker.unpark();
 277            })
 278        };
 279        runnable.schedule();
 280
 281        loop {
 282            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
 283                return result;
 284            }
 285
 286            if !woken.load(SeqCst) {
 287                self.state.lock().will_park();
 288            }
 289
 290            woken.store(false, SeqCst);
 291            self.parker.lock().park();
 292        }
 293    }
 294
 295    pub fn run_until_parked(&self) {
 296        use std::sync::atomic::AtomicBool;
 297        let woken = Arc::new(AtomicBool::new(false));
 298        self.run_internal(woken, None);
 299    }
 300
 301    fn run_internal(
 302        &self,
 303        woken: Arc<std::sync::atomic::AtomicBool>,
 304        mut main_task: Option<&mut AnyLocalTask>,
 305    ) -> Option<Box<dyn Any>> {
 306        use rand::prelude::*;
 307        use std::sync::atomic::Ordering::SeqCst;
 308
 309        let unparker = self.parker.lock().unparker();
 310        let waker = waker_fn::waker_fn(move || {
 311            woken.store(true, SeqCst);
 312            unparker.unpark();
 313        });
 314
 315        let mut cx = Context::from_waker(&waker);
 316        loop {
 317            let mut state = self.state.lock();
 318
 319            if state.scheduled_from_foreground.is_empty()
 320                && state.scheduled_from_background.is_empty()
 321            {
 322                if let Some(main_task) = main_task {
 323                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
 324                        return Some(result);
 325                    }
 326                }
 327
 328                return None;
 329            }
 330
 331            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
 332                let background_len = state.scheduled_from_background.len();
 333                let ix = state.rng.gen_range(0..background_len);
 334                let background_runnable = state.scheduled_from_background.remove(ix);
 335                state.push_to_history(ExecutorEvent::PollRunnable {
 336                    id: background_runnable.id,
 337                });
 338                drop(state);
 339                background_runnable.runnable.run();
 340            } else if !state.scheduled_from_foreground.is_empty() {
 341                let available_cx_ids = state
 342                    .scheduled_from_foreground
 343                    .keys()
 344                    .copied()
 345                    .collect::<Vec<_>>();
 346                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
 347                let scheduled_from_cx = state
 348                    .scheduled_from_foreground
 349                    .get_mut(&cx_id_to_run)
 350                    .unwrap();
 351                let foreground_runnable = scheduled_from_cx.remove(0);
 352                if scheduled_from_cx.is_empty() {
 353                    state.scheduled_from_foreground.remove(&cx_id_to_run);
 354                }
 355                state.push_to_history(ExecutorEvent::PollRunnable {
 356                    id: foreground_runnable.id,
 357                });
 358
 359                drop(state);
 360
 361                foreground_runnable.runnable.run();
 362                if let Some(main_task) = main_task.as_mut() {
 363                    if foreground_runnable.main {
 364                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
 365                            return Some(result);
 366                        }
 367                    }
 368                }
 369            }
 370        }
 371    }
 372
 373    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
 374    where
 375        F: Unpin + Future<Output = T>,
 376    {
 377        use rand::prelude::*;
 378
 379        let unparker = self.parker.lock().unparker();
 380        let waker = waker_fn::waker_fn(move || {
 381            unparker.unpark();
 382        });
 383
 384        let mut cx = Context::from_waker(&waker);
 385        for _ in 0..max_ticks {
 386            let mut state = self.state.lock();
 387            let runnable_count = state.scheduled_from_background.len();
 388            let ix = state.rng.gen_range(0..=runnable_count);
 389            if ix < state.scheduled_from_background.len() {
 390                let background_runnable = state.scheduled_from_background.remove(ix);
 391                state.push_to_history(ExecutorEvent::PollRunnable {
 392                    id: background_runnable.id,
 393                });
 394                drop(state);
 395                background_runnable.runnable.run();
 396            } else {
 397                drop(state);
 398                if let Poll::Ready(result) = future.poll(&mut cx) {
 399                    return Some(result);
 400                }
 401                let mut state = self.state.lock();
 402                if state.scheduled_from_background.is_empty() {
 403                    state.will_park();
 404                    drop(state);
 405                    self.parker.lock().park();
 406                }
 407
 408                continue;
 409            }
 410        }
 411
 412        None
 413    }
 414
 415    pub fn timer(&self, duration: Duration) -> Timer {
 416        let (tx, rx) = postage::barrier::channel();
 417        let mut state = self.state.lock();
 418        let wakeup_at = state.now + duration;
 419        let id = util::post_inc(&mut state.next_timer_id);
 420        match state
 421            .pending_timers
 422            .binary_search_by_key(&wakeup_at, |e| e.1)
 423        {
 424            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
 425        }
 426        let state = self.state.clone();
 427        Timer::Deterministic(DeterministicTimer { rx, id, state })
 428    }
 429
 430    pub fn now(&self) -> std::time::Instant {
 431        let state = self.state.lock();
 432        state.now
 433    }
 434
 435    pub fn advance_clock(&self, duration: Duration) {
 436        let new_now = self.state.lock().now + duration;
 437        loop {
 438            self.run_until_parked();
 439            let mut state = self.state.lock();
 440
 441            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
 442                let wakeup_time = *wakeup_time;
 443                if wakeup_time <= new_now {
 444                    let timer_count = state
 445                        .pending_timers
 446                        .iter()
 447                        .take_while(|(_, t, _)| *t == wakeup_time)
 448                        .count();
 449                    state.now = wakeup_time;
 450                    let timers_to_wake = state
 451                        .pending_timers
 452                        .drain(0..timer_count)
 453                        .collect::<Vec<_>>();
 454                    drop(state);
 455                    drop(timers_to_wake);
 456                    continue;
 457                }
 458            }
 459
 460            break;
 461        }
 462
 463        self.state.lock().now = new_now;
 464    }
 465
 466    pub fn start_waiting(&self) {
 467        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
 468    }
 469
 470    pub fn finish_waiting(&self) {
 471        self.state.lock().waiting_backtrace.take();
 472    }
 473
 474    pub fn forbid_parking(&self) {
 475        use rand::prelude::*;
 476
 477        let mut state = self.state.lock();
 478        state.forbid_parking = true;
 479        state.rng = StdRng::seed_from_u64(state.seed);
 480    }
 481
 482    pub fn allow_parking(&self) {
 483        use rand::prelude::*;
 484
 485        let mut state = self.state.lock();
 486        state.forbid_parking = false;
 487        state.rng = StdRng::seed_from_u64(state.seed);
 488    }
 489
 490    pub async fn simulate_random_delay(&self) {
 491        use rand::prelude::*;
 492        use smol::future::yield_now;
 493        if self.state.lock().rng.gen_bool(0.2) {
 494            let yields = self.state.lock().rng.gen_range(1..=10);
 495            for _ in 0..yields {
 496                yield_now().await;
 497            }
 498        }
 499    }
 500
 501    pub fn record_backtrace(&self) {
 502        let mut state = self.state.lock();
 503        if state.enable_runnable_backtraces {
 504            let current_id = state
 505                .poll_history
 506                .iter()
 507                .rev()
 508                .find_map(|event| match event {
 509                    ExecutorEvent::PollRunnable { id } => Some(*id),
 510                    _ => None,
 511                });
 512            if let Some(id) = current_id {
 513                state
 514                    .runnable_backtraces
 515                    .insert(id, backtrace::Backtrace::new_unresolved());
 516            }
 517        }
 518    }
 519}
 520
 521impl Drop for Timer {
 522    fn drop(&mut self) {
 523        #[cfg(any(test, feature = "test-support"))]
 524        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
 525            state
 526                .lock()
 527                .pending_timers
 528                .retain(|(timer_id, _, _)| timer_id != id)
 529        }
 530    }
 531}
 532
 533impl Future for Timer {
 534    type Output = ();
 535
 536    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 537        match &mut *self {
 538            #[cfg(any(test, feature = "test-support"))]
 539            Self::Deterministic(DeterministicTimer { rx, .. }) => {
 540                use postage::stream::{PollRecv, Stream as _};
 541                smol::pin!(rx);
 542                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
 543                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
 544                    PollRecv::Pending => Poll::Pending,
 545                }
 546            }
 547            Self::Production(timer) => {
 548                smol::pin!(timer);
 549                match timer.poll(cx) {
 550                    Poll::Ready(_) => Poll::Ready(()),
 551                    Poll::Pending => Poll::Pending,
 552                }
 553            }
 554        }
 555    }
 556}
 557
 #[cfg(any(test, feature = "test-support"))]
 impl DeterministicState {
     /// Appends `event` to the history and, when a previous history is installed, checks that
     /// the event at the same position in that history is identical.
     ///
     /// # Panics
     ///
     /// Panics with a "detected non-determinism" message when the previous history holds a
     /// different event at this position, or holds no event at this position at all. The
     /// message includes the stored backtraces of the runnables involved, when available.
     fn push_to_history(&mut self, event: ExecutorEvent) {
         use std::fmt::Write as _;

         self.poll_history.push(event);
         if let Some(prev_history) = &self.previous_poll_history {
             let ix = self.poll_history.len() - 1;
             let prev_len = prev_history.len();
             // `get` rather than indexing: a run that records more events than the previous
             // one is itself non-determinism and should be reported as such, not surface as
             // an index-out-of-bounds panic.
             let prev_event = prev_history.get(ix).copied();
             if prev_event != Some(event) {
                 let mut message = String::new();
                 writeln!(
                     &mut message,
                     "current runnable backtrace:\n{:?}",
                     self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
                         trace.resolve();
                         util::CwdBacktrace(trace)
                     })
                 )
                 .unwrap();
                 match prev_event {
                     Some(prev_event) => writeln!(
                         &mut message,
                         "previous runnable backtrace:\n{:?}",
                         self.runnable_backtraces
                             .get_mut(&prev_event.id())
                             .map(|trace| {
                                 trace.resolve();
                                 util::CwdBacktrace(trace)
                             })
                     )
                     .unwrap(),
                     None => writeln!(
                         &mut message,
                         "previous history ended after {prev_len} events"
                     )
                     .unwrap(),
                 }
                 panic!("detected non-determinism after {ix}. {message}");
             }
         }
     }

     /// Called right before the executor parks.
     ///
     /// # Panics
     ///
     /// Panics when parking is forbidden; the message includes the resolved waiting
     /// backtrace if one was stored.
     fn will_park(&mut self) {
         if !self.forbid_parking {
             return;
         }

         let mut backtrace_message = String::new();
         if let Some(backtrace) = self.waiting_backtrace.as_mut() {
             backtrace.resolve();
             backtrace_message = format!(
                 "\nbacktrace of waiting future:\n{:?}",
                 util::CwdBacktrace(backtrace)
             );
         }

         panic!(
             "deterministic executor parked after a call to forbid_parking{}",
             backtrace_message
         );
     }
 }
 613
 #[cfg(any(test, feature = "test-support"))]
 impl ExecutorEvent {
     /// Returns the runnable id carried by the event, whichever variant it is.
     pub fn id(&self) -> usize {
         match *self {
             Self::PollRunnable { id } | Self::EnqueueRunnable { id } => id,
         }
     }
 }
 623
 624impl Foreground {
 625    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
 626        if dispatcher.is_main_thread() {
 627            Ok(Self::Platform {
 628                dispatcher,
 629                _not_send_or_sync: PhantomData,
 630            })
 631        } else {
 632            Err(anyhow!("must be constructed on main thread"))
 633        }
 634    }
 635
 636    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
 637        let future = any_local_future(future);
 638        let any_task = match self {
 639            #[cfg(any(test, feature = "test-support"))]
 640            Self::Deterministic { cx_id, executor } => {
 641                executor.spawn_from_foreground(*cx_id, future, false)
 642            }
 643            Self::Platform { dispatcher, .. } => {
 644                fn spawn_inner(
 645                    future: AnyLocalFuture,
 646                    dispatcher: &Arc<dyn Dispatcher>,
 647                ) -> AnyLocalTask {
 648                    let dispatcher = dispatcher.clone();
 649                    let schedule =
 650                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
 651                    let (runnable, task) = async_task::spawn_local(future, schedule);
 652                    runnable.schedule();
 653                    task
 654                }
 655                spawn_inner(future, dispatcher)
 656            }
 657        };
 658        Task::local(any_task)
 659    }
 660
 661    #[cfg(any(test, feature = "test-support"))]
 662    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
 663        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
 664        let result = match self {
 665            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
 666            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
 667        };
 668        *result.downcast().unwrap()
 669    }
 670
 671    #[cfg(any(test, feature = "test-support"))]
 672    pub fn run_until_parked(&self) {
 673        match self {
 674            Self::Deterministic { executor, .. } => executor.run_until_parked(),
 675            _ => panic!("this method can only be called on a deterministic executor"),
 676        }
 677    }
 678
 679    #[cfg(any(test, feature = "test-support"))]
 680    pub fn parking_forbidden(&self) -> bool {
 681        match self {
 682            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
 683            _ => panic!("this method can only be called on a deterministic executor"),
 684        }
 685    }
 686
 687    #[cfg(any(test, feature = "test-support"))]
 688    pub fn start_waiting(&self) {
 689        match self {
 690            Self::Deterministic { executor, .. } => executor.start_waiting(),
 691            _ => panic!("this method can only be called on a deterministic executor"),
 692        }
 693    }
 694
 695    #[cfg(any(test, feature = "test-support"))]
 696    pub fn finish_waiting(&self) {
 697        match self {
 698            Self::Deterministic { executor, .. } => executor.finish_waiting(),
 699            _ => panic!("this method can only be called on a deterministic executor"),
 700        }
 701    }
 702
 703    #[cfg(any(test, feature = "test-support"))]
 704    pub fn forbid_parking(&self) {
 705        match self {
 706            Self::Deterministic { executor, .. } => executor.forbid_parking(),
 707            _ => panic!("this method can only be called on a deterministic executor"),
 708        }
 709    }
 710
 711    #[cfg(any(test, feature = "test-support"))]
 712    pub fn allow_parking(&self) {
 713        match self {
 714            Self::Deterministic { executor, .. } => executor.allow_parking(),
 715            _ => panic!("this method can only be called on a deterministic executor"),
 716        }
 717    }
 718
 719    #[cfg(any(test, feature = "test-support"))]
 720    pub fn advance_clock(&self, duration: Duration) {
 721        match self {
 722            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
 723            _ => panic!("this method can only be called on a deterministic executor"),
 724        }
 725    }
 726
 727    #[cfg(any(test, feature = "test-support"))]
 728    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
 729        match self {
 730            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
 731            _ => panic!("this method can only be called on a deterministic executor"),
 732        }
 733    }
 734}
 735
 736impl Background {
 737    pub fn new() -> Self {
 738        let executor = Arc::new(Executor::new());
 739        let stop = channel::unbounded::<()>();
 740
 741        for i in 0..2 * num_cpus::get() {
 742            let executor = executor.clone();
 743            let stop = stop.1.clone();
 744            thread::Builder::new()
 745                .name(format!("background-executor-{}", i))
 746                .spawn(move || smol::block_on(executor.run(stop.recv())))
 747                .unwrap();
 748        }
 749
 750        Self::Production {
 751            executor,
 752            _stop: stop.0,
 753        }
 754    }
 755
 756    pub fn num_cpus(&self) -> usize {
 757        num_cpus::get()
 758    }
 759
 760    pub fn spawn<T, F>(&self, future: F) -> Task<T>
 761    where
 762        T: 'static + Send,
 763        F: Send + Future<Output = T> + 'static,
 764    {
 765        let future = any_future(future);
 766        let any_task = match self {
 767            Self::Production { executor, .. } => executor.spawn(future),
 768            #[cfg(any(test, feature = "test-support"))]
 769            Self::Deterministic { executor } => executor.spawn(future),
 770        };
 771        Task::send(any_task)
 772    }
 773
 774    pub fn block<F, T>(&self, future: F) -> T
 775    where
 776        F: Future<Output = T>,
 777    {
 778        smol::pin!(future);
 779        match self {
 780            Self::Production { .. } => smol::block_on(&mut future),
 781            #[cfg(any(test, feature = "test-support"))]
 782            Self::Deterministic { executor, .. } => {
 783                executor.block(&mut future, usize::MAX).unwrap()
 784            }
 785        }
 786    }
 787
 788    pub fn block_with_timeout<F, T>(
 789        &self,
 790        timeout: Duration,
 791        future: F,
 792    ) -> Result<T, impl Future<Output = T>>
 793    where
 794        T: 'static,
 795        F: 'static + Unpin + Future<Output = T>,
 796    {
 797        let mut future = any_local_future(future);
 798        if !timeout.is_zero() {
 799            let output = match self {
 800                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
 801                #[cfg(any(test, feature = "test-support"))]
 802                Self::Deterministic { executor, .. } => {
 803                    use rand::prelude::*;
 804                    let max_ticks = {
 805                        let mut state = executor.state.lock();
 806                        let range = state.block_on_ticks.clone();
 807                        state.rng.gen_range(range)
 808                    };
 809                    executor.block(&mut future, max_ticks)
 810                }
 811            };
 812            if let Some(output) = output {
 813                return Ok(*output.downcast().unwrap());
 814            }
 815        }
 816        Err(async { *future.await.downcast().unwrap() })
 817    }
 818
 819    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
 820    where
 821        F: FnOnce(&mut Scope<'scope>),
 822    {
 823        let mut scope = Scope::new(self.clone());
 824        (scheduler)(&mut scope);
 825        let spawned = mem::take(&mut scope.futures)
 826            .into_iter()
 827            .map(|f| self.spawn(f))
 828            .collect::<Vec<_>>();
 829        for task in spawned {
 830            task.await;
 831        }
 832    }
 833
 834    pub fn timer(&self, duration: Duration) -> Timer {
 835        match self {
 836            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
 837            #[cfg(any(test, feature = "test-support"))]
 838            Background::Deterministic { executor } => executor.timer(duration),
 839        }
 840    }
 841
 842    pub fn now(&self) -> std::time::Instant {
 843        match self {
 844            Background::Production { .. } => std::time::Instant::now(),
 845            #[cfg(any(test, feature = "test-support"))]
 846            Background::Deterministic { executor } => executor.now(),
 847        }
 848    }
 849
 850    #[cfg(any(test, feature = "test-support"))]
 851    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
 852        match self {
 853            Self::Deterministic { executor, .. } => {
 854                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
 855            }
 856            _ => panic!("this method can only be called on a deterministic executor"),
 857        }
 858    }
 859
 860    #[cfg(any(test, feature = "test-support"))]
 861    pub async fn simulate_random_delay(&self) {
 862        match self {
 863            Self::Deterministic { executor, .. } => {
 864                executor.simulate_random_delay().await;
 865            }
 866            _ => {
 867                panic!("this method can only be called on a deterministic executor")
 868            }
 869        }
 870    }
 871
 872    #[cfg(any(test, feature = "test-support"))]
 873    pub fn record_backtrace(&self) {
 874        match self {
 875            Self::Deterministic { executor, .. } => executor.record_backtrace(),
 876            _ => {
 877                panic!("this method can only be called on a deterministic executor")
 878            }
 879        }
 880    }
 881
 882    #[cfg(any(test, feature = "test-support"))]
 883    pub fn start_waiting(&self) {
 884        match self {
 885            Self::Deterministic { executor, .. } => executor.start_waiting(),
 886            _ => panic!("this method can only be called on a deterministic executor"),
 887        }
 888    }
 889}
 890
 891impl Default for Background {
 892    fn default() -> Self {
 893        Self::new()
 894    }
 895}
 896
 897pub struct Scope<'a> {
 898    executor: Arc<Background>,
 899    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
 900    tx: Option<mpsc::Sender<()>>,
 901    rx: mpsc::Receiver<()>,
 902    _phantom: PhantomData<&'a ()>,
 903}
 904
 905impl<'a> Scope<'a> {
 906    fn new(executor: Arc<Background>) -> Self {
 907        let (tx, rx) = mpsc::channel(1);
 908        Self {
 909            executor,
 910            tx: Some(tx),
 911            rx,
 912            futures: Default::default(),
 913            _phantom: PhantomData,
 914        }
 915    }
 916
 917    pub fn spawn<F>(&mut self, f: F)
 918    where
 919        F: Future<Output = ()> + Send + 'a,
 920    {
 921        let tx = self.tx.clone().unwrap();
 922
 923        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
 924        // dropping this `Scope` blocks until all of the futures have resolved.
 925        let f = unsafe {
 926            mem::transmute::<
 927                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
 928                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 929            >(Box::pin(async move {
 930                f.await;
 931                drop(tx);
 932            }))
 933        };
 934        self.futures.push(f);
 935    }
 936}
 937
 938impl<'a> Drop for Scope<'a> {
 939    fn drop(&mut self) {
 940        self.tx.take().unwrap();
 941
 942        // Wait until the channel is closed, which means that all of the spawned
 943        // futures have resolved.
 944        self.executor.block(self.rx.next());
 945    }
 946}
 947
 948impl<T> Task<T> {
 949    pub fn ready(value: T) -> Self {
 950        Self::Ready(Some(value))
 951    }
 952
 953    fn local(any_task: AnyLocalTask) -> Self {
 954        Self::Local {
 955            any_task,
 956            result_type: PhantomData,
 957        }
 958    }
 959
 960    pub fn detach(self) {
 961        match self {
 962            Task::Ready(_) => {}
 963            Task::Local { any_task, .. } => any_task.detach(),
 964            Task::Send { any_task, .. } => any_task.detach(),
 965        }
 966    }
 967}
 968
 969impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
 970    #[track_caller]
 971    pub fn detach_and_log_err(self, cx: &mut AppContext) {
 972        let caller = Location::caller();
 973        cx.spawn(|_| async move {
 974            if let Err(err) = self.await {
 975                log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
 976            }
 977        })
 978        .detach();
 979    }
 980}
 981
 982impl<T: Send> Task<T> {
 983    fn send(any_task: AnyTask) -> Self {
 984        Self::Send {
 985            any_task,
 986            result_type: PhantomData,
 987        }
 988    }
 989}
 990
 991impl<T: fmt::Debug> fmt::Debug for Task<T> {
 992    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 993        match self {
 994            Task::Ready(value) => value.fmt(f),
 995            Task::Local { any_task, .. } => any_task.fmt(f),
 996            Task::Send { any_task, .. } => any_task.fmt(f),
 997        }
 998    }
 999}
1000
1001impl<T: 'static> Future for Task<T> {
1002    type Output = T;
1003
1004    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1005        match unsafe { self.get_unchecked_mut() } {
1006            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1007            Task::Local { any_task, .. } => {
1008                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1009            }
1010            Task::Send { any_task, .. } => {
1011                any_task.poll(cx).map(|value| *value.downcast().unwrap())
1012            }
1013        }
1014    }
1015}
1016
1017fn any_future<T, F>(future: F) -> AnyFuture
1018where
1019    T: 'static + Send,
1020    F: Future<Output = T> + Send + 'static,
1021{
1022    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1023}
1024
1025fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1026where
1027    T: 'static,
1028    F: Future<Output = T> + 'static,
1029{
1030    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1031}