dispatcher.rs

  1use crate::{PlatformDispatcher, Priority, RunnableVariant};
  2use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig, Yield};
  3use std::{
  4    sync::{
  5        Arc,
  6        atomic::{AtomicUsize, Ordering},
  7    },
  8    time::{Duration, Instant},
  9};
 10
 11/// TestDispatcher provides deterministic async execution for tests.
 12///
 13/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`.
 14/// Access the scheduler directly via `scheduler()` for clock, rng, and parking control.
 15#[doc(hidden)]
 16pub struct TestDispatcher {
 17    session_id: SessionId,
 18    scheduler: Arc<TestScheduler>,
 19    num_cpus_override: Arc<AtomicUsize>,
 20}
 21
 22impl TestDispatcher {
 23    pub fn new(seed: u64) -> Self {
 24        let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
 25            seed,
 26            randomize_order: true,
 27            allow_parking: false,
 28            capture_pending_traces: std::env::var("PENDING_TRACES")
 29                .map_or(false, |var| var == "1" || var == "true"),
 30            timeout_ticks: 0..=1000,
 31        }));
 32
 33        let session_id = scheduler.allocate_session_id();
 34
 35        TestDispatcher {
 36            session_id,
 37            scheduler,
 38            num_cpus_override: Arc::new(AtomicUsize::new(0)),
 39        }
 40    }
 41
 42    pub fn scheduler(&self) -> &Arc<TestScheduler> {
 43        &self.scheduler
 44    }
 45
 46    pub fn session_id(&self) -> SessionId {
 47        self.session_id
 48    }
 49
 50    pub fn advance_clock(&self, by: Duration) {
 51        self.scheduler.advance_clock(by);
 52    }
 53
 54    pub fn advance_clock_to_next_timer(&self) -> bool {
 55        self.scheduler.advance_clock_to_next_timer()
 56    }
 57
 58    pub fn simulate_random_delay(&self) -> Yield {
 59        self.scheduler.yield_random()
 60    }
 61
 62    pub fn tick(&self, background_only: bool) -> bool {
 63        if background_only {
 64            self.scheduler.tick_background_only()
 65        } else {
 66            self.scheduler.tick()
 67        }
 68    }
 69
 70    pub fn run_until_parked(&self) {
 71        while self.tick(false) {}
 72    }
 73
 74    /// Override the value returned by `BackgroundExecutor::num_cpus()` in tests.
 75    /// A value of 0 means no override (the default of 4 is used).
 76    pub fn set_num_cpus(&self, count: usize) {
 77        self.num_cpus_override.store(count, Ordering::SeqCst);
 78    }
 79
 80    /// Returns the overridden CPU count, or `None` if no override is set.
 81    pub fn num_cpus_override(&self) -> Option<usize> {
 82        match self.num_cpus_override.load(Ordering::SeqCst) {
 83            0 => None,
 84            n => Some(n),
 85        }
 86    }
 87}
 88
 89impl Clone for TestDispatcher {
 90    fn clone(&self) -> Self {
 91        let session_id = self.scheduler.allocate_session_id();
 92        Self {
 93            session_id,
 94            scheduler: self.scheduler.clone(),
 95            num_cpus_override: self.num_cpus_override.clone(),
 96        }
 97    }
 98}
 99
100impl PlatformDispatcher for TestDispatcher {
101    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
102        Vec::new()
103    }
104
105    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
106        Vec::new()
107    }
108
109    fn is_main_thread(&self) -> bool {
110        self.scheduler.is_main_thread()
111    }
112
113    fn now(&self) -> Instant {
114        self.scheduler.clock().now()
115    }
116
117    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
118        self.scheduler
119            .schedule_background_with_priority(runnable, priority);
120    }
121
122    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
123        self.scheduler
124            .schedule_foreground(self.session_id, runnable);
125    }
126
127    fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
128        panic!(
129            "dispatch_after should not be called in tests. \
130            Use BackgroundExecutor::timer() which uses the scheduler's native timer."
131        );
132    }
133
134    fn as_test(&self) -> Option<&TestDispatcher> {
135        Some(self)
136    }
137
138    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
139        std::thread::spawn(move || {
140            f();
141        });
142    }
143}