1use crate::{PlatformDispatcher, Priority, RunnableVariant};
2use scheduler::Instant;
3use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig, Yield};
4use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9 time::Duration,
10};
11
12/// TestDispatcher provides deterministic async execution for tests.
13///
14/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`.
15/// Access the scheduler directly via `scheduler()` for clock, rng, and parking control.
16#[doc(hidden)]
17pub struct TestDispatcher {
18 session_id: SessionId,
19 scheduler: Arc<TestScheduler>,
20 num_cpus_override: Arc<AtomicUsize>,
21}
22
23impl TestDispatcher {
24 pub fn new(seed: u64) -> Self {
25 let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
26 seed,
27 randomize_order: true,
28 allow_parking: false,
29 capture_pending_traces: std::env::var("PENDING_TRACES")
30 .map_or(false, |var| var == "1" || var == "true"),
31 timeout_ticks: 0..=1000,
32 }));
33 Self::from_scheduler(scheduler)
34 }
35
36 pub fn from_scheduler(scheduler: Arc<TestScheduler>) -> Self {
37 TestDispatcher {
38 session_id: scheduler.allocate_session_id(),
39 scheduler,
40 num_cpus_override: Arc::new(AtomicUsize::new(0)),
41 }
42 }
43
44 pub fn scheduler(&self) -> &Arc<TestScheduler> {
45 &self.scheduler
46 }
47
48 pub fn session_id(&self) -> SessionId {
49 self.session_id
50 }
51
52 pub fn drain_tasks(&self) {
53 self.scheduler.drain_tasks();
54 }
55
56 pub fn advance_clock(&self, by: Duration) {
57 self.scheduler.advance_clock(by);
58 }
59
60 pub fn advance_clock_to_next_timer(&self) -> bool {
61 self.scheduler.advance_clock_to_next_timer()
62 }
63
64 pub fn simulate_random_delay(&self) -> Yield {
65 self.scheduler.yield_random()
66 }
67
68 pub fn tick(&self, background_only: bool) -> bool {
69 if background_only {
70 self.scheduler.tick_background_only()
71 } else {
72 self.scheduler.tick()
73 }
74 }
75
76 pub fn run_until_parked(&self) {
77 while self.tick(false) {}
78 }
79
80 pub fn allow_parking(&self) {
81 self.scheduler.allow_parking();
82 }
83
84 pub fn forbid_parking(&self) {
85 self.scheduler.forbid_parking();
86 }
87
88 /// Override the value returned by `BackgroundExecutor::num_cpus()` in tests.
89 /// A value of 0 means no override (the default of 4 is used).
90 pub fn set_num_cpus(&self, count: usize) {
91 self.num_cpus_override.store(count, Ordering::SeqCst);
92 }
93
94 /// Returns the overridden CPU count, or `None` if no override is set.
95 pub fn num_cpus_override(&self) -> Option<usize> {
96 match self.num_cpus_override.load(Ordering::SeqCst) {
97 0 => None,
98 n => Some(n),
99 }
100 }
101}
102
103impl Clone for TestDispatcher {
104 fn clone(&self) -> Self {
105 let session_id = self.scheduler.allocate_session_id();
106 Self {
107 session_id,
108 scheduler: self.scheduler.clone(),
109 num_cpus_override: self.num_cpus_override.clone(),
110 }
111 }
112}
113
114impl PlatformDispatcher for TestDispatcher {
115 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
116 Vec::new()
117 }
118
119 fn get_current_thread_timings(&self) -> crate::ThreadTaskTimings {
120 crate::ThreadTaskTimings {
121 thread_name: None,
122 thread_id: std::thread::current().id(),
123 timings: Vec::new(),
124 total_pushed: 0,
125 }
126 }
127
128 fn is_main_thread(&self) -> bool {
129 self.scheduler.is_main_thread()
130 }
131
132 fn now(&self) -> Instant {
133 self.scheduler.clock().now()
134 }
135
136 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
137 self.scheduler
138 .schedule_background_with_priority(runnable, priority);
139 }
140
141 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
142 self.scheduler
143 .schedule_foreground(self.session_id, runnable);
144 }
145
146 fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
147 panic!(
148 "dispatch_after should not be called in tests. \
149 Use BackgroundExecutor::timer() which uses the scheduler's native timer."
150 );
151 }
152
153 fn as_test(&self) -> Option<&TestDispatcher> {
154 Some(self)
155 }
156
157 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
158 std::thread::spawn(move || {
159 f();
160 });
161 }
162}