dispatcher.rs

  1use crate::{PlatformDispatcher, Priority, RunnableVariant};
  2use scheduler::Instant;
  3use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig, Yield};
  4use std::{
  5    sync::{
  6        Arc,
  7        atomic::{AtomicUsize, Ordering},
  8    },
  9    time::Duration,
 10};
 11
 12/// TestDispatcher provides deterministic async execution for tests.
 13///
 14/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`.
 15/// Access the scheduler directly via `scheduler()` for clock, rng, and parking control.
 16#[doc(hidden)]
 17pub struct TestDispatcher {
 18    session_id: SessionId,
 19    scheduler: Arc<TestScheduler>,
 20    num_cpus_override: Arc<AtomicUsize>,
 21}
 22
 23impl TestDispatcher {
 24    pub fn new(seed: u64) -> Self {
 25        let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
 26            seed,
 27            randomize_order: true,
 28            allow_parking: false,
 29            capture_pending_traces: std::env::var("PENDING_TRACES")
 30                .map_or(false, |var| var == "1" || var == "true"),
 31            timeout_ticks: 0..=1000,
 32        }));
 33        Self::from_scheduler(scheduler)
 34    }
 35
 36    pub fn from_scheduler(scheduler: Arc<TestScheduler>) -> Self {
 37        TestDispatcher {
 38            session_id: scheduler.allocate_session_id(),
 39            scheduler,
 40            num_cpus_override: Arc::new(AtomicUsize::new(0)),
 41        }
 42    }
 43
 44    pub fn scheduler(&self) -> &Arc<TestScheduler> {
 45        &self.scheduler
 46    }
 47
 48    pub fn session_id(&self) -> SessionId {
 49        self.session_id
 50    }
 51
 52    pub fn drain_tasks(&self) {
 53        self.scheduler.drain_tasks();
 54    }
 55
 56    pub fn advance_clock(&self, by: Duration) {
 57        self.scheduler.advance_clock(by);
 58    }
 59
 60    pub fn advance_clock_to_next_timer(&self) -> bool {
 61        self.scheduler.advance_clock_to_next_timer()
 62    }
 63
 64    pub fn simulate_random_delay(&self) -> Yield {
 65        self.scheduler.yield_random()
 66    }
 67
 68    pub fn tick(&self, background_only: bool) -> bool {
 69        if background_only {
 70            self.scheduler.tick_background_only()
 71        } else {
 72            self.scheduler.tick()
 73        }
 74    }
 75
 76    pub fn run_until_parked(&self) {
 77        while self.tick(false) {}
 78    }
 79
 80    pub fn allow_parking(&self) {
 81        self.scheduler.allow_parking();
 82    }
 83
 84    pub fn forbid_parking(&self) {
 85        self.scheduler.forbid_parking();
 86    }
 87
 88    /// Override the value returned by `BackgroundExecutor::num_cpus()` in tests.
 89    /// A value of 0 means no override (the default of 4 is used).
 90    pub fn set_num_cpus(&self, count: usize) {
 91        self.num_cpus_override.store(count, Ordering::SeqCst);
 92    }
 93
 94    /// Returns the overridden CPU count, or `None` if no override is set.
 95    pub fn num_cpus_override(&self) -> Option<usize> {
 96        match self.num_cpus_override.load(Ordering::SeqCst) {
 97            0 => None,
 98            n => Some(n),
 99        }
100    }
101}
102
103impl Clone for TestDispatcher {
104    fn clone(&self) -> Self {
105        let session_id = self.scheduler.allocate_session_id();
106        Self {
107            session_id,
108            scheduler: self.scheduler.clone(),
109            num_cpus_override: self.num_cpus_override.clone(),
110        }
111    }
112}
113
114impl PlatformDispatcher for TestDispatcher {
115    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
116        Vec::new()
117    }
118
119    fn get_current_thread_timings(&self) -> crate::ThreadTaskTimings {
120        crate::ThreadTaskTimings {
121            thread_name: None,
122            thread_id: std::thread::current().id(),
123            timings: Vec::new(),
124            total_pushed: 0,
125        }
126    }
127
128    fn is_main_thread(&self) -> bool {
129        self.scheduler.is_main_thread()
130    }
131
132    fn now(&self) -> Instant {
133        self.scheduler.clock().now()
134    }
135
136    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
137        self.scheduler
138            .schedule_background_with_priority(runnable, priority);
139    }
140
141    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
142        self.scheduler
143            .schedule_foreground(self.session_id, runnable);
144    }
145
146    fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
147        panic!(
148            "dispatch_after should not be called in tests. \
149            Use BackgroundExecutor::timer() which uses the scheduler's native timer."
150        );
151    }
152
153    fn as_test(&self) -> Option<&TestDispatcher> {
154        Some(self)
155    }
156
157    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
158        std::thread::spawn(move || {
159            f();
160        });
161    }
162}