1use crate::{PlatformDispatcher, Priority, RunnableVariant};
2use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig, Yield};
3use std::{
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11/// TestDispatcher provides deterministic async execution for tests.
12///
13/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`.
14/// Access the scheduler directly via `scheduler()` for clock, rng, and parking control.
15#[doc(hidden)]
16pub struct TestDispatcher {
17 session_id: SessionId,
18 scheduler: Arc<TestScheduler>,
19 num_cpus_override: Arc<AtomicUsize>,
20}
21
22impl TestDispatcher {
23 pub fn new(seed: u64) -> Self {
24 let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
25 seed,
26 randomize_order: true,
27 allow_parking: false,
28 capture_pending_traces: std::env::var("PENDING_TRACES")
29 .map_or(false, |var| var == "1" || var == "true"),
30 timeout_ticks: 0..=1000,
31 }));
32
33 let session_id = scheduler.allocate_session_id();
34
35 TestDispatcher {
36 session_id,
37 scheduler,
38 num_cpus_override: Arc::new(AtomicUsize::new(0)),
39 }
40 }
41
42 pub fn scheduler(&self) -> &Arc<TestScheduler> {
43 &self.scheduler
44 }
45
46 pub fn session_id(&self) -> SessionId {
47 self.session_id
48 }
49
50 pub fn advance_clock(&self, by: Duration) {
51 self.scheduler.advance_clock(by);
52 }
53
54 pub fn advance_clock_to_next_timer(&self) -> bool {
55 self.scheduler.advance_clock_to_next_timer()
56 }
57
58 pub fn simulate_random_delay(&self) -> Yield {
59 self.scheduler.yield_random()
60 }
61
62 pub fn tick(&self, background_only: bool) -> bool {
63 if background_only {
64 self.scheduler.tick_background_only()
65 } else {
66 self.scheduler.tick()
67 }
68 }
69
70 pub fn run_until_parked(&self) {
71 while self.tick(false) {}
72 }
73
74 /// Override the value returned by `BackgroundExecutor::num_cpus()` in tests.
75 /// A value of 0 means no override (the default of 4 is used).
76 pub fn set_num_cpus(&self, count: usize) {
77 self.num_cpus_override.store(count, Ordering::SeqCst);
78 }
79
80 /// Returns the overridden CPU count, or `None` if no override is set.
81 pub fn num_cpus_override(&self) -> Option<usize> {
82 match self.num_cpus_override.load(Ordering::SeqCst) {
83 0 => None,
84 n => Some(n),
85 }
86 }
87}
88
89impl Clone for TestDispatcher {
90 fn clone(&self) -> Self {
91 let session_id = self.scheduler.allocate_session_id();
92 Self {
93 session_id,
94 scheduler: self.scheduler.clone(),
95 num_cpus_override: self.num_cpus_override.clone(),
96 }
97 }
98}
99
100impl PlatformDispatcher for TestDispatcher {
101 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
102 Vec::new()
103 }
104
105 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
106 Vec::new()
107 }
108
109 fn is_main_thread(&self) -> bool {
110 self.scheduler.is_main_thread()
111 }
112
113 fn now(&self) -> Instant {
114 self.scheduler.clock().now()
115 }
116
117 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
118 self.scheduler
119 .schedule_background_with_priority(runnable, priority);
120 }
121
122 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
123 self.scheduler
124 .schedule_foreground(self.session_id, runnable);
125 }
126
127 fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
128 panic!(
129 "dispatch_after should not be called in tests. \
130 Use BackgroundExecutor::timer() which uses the scheduler's native timer."
131 );
132 }
133
134 fn as_test(&self) -> Option<&TestDispatcher> {
135 Some(self)
136 }
137
138 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
139 std::thread::spawn(move || {
140 f();
141 });
142 }
143}