1use crate::{PlatformDispatcher, Priority, RunnableVariant};
2use scheduler::Instant;
3use scheduler::{Clock, Scheduler, SessionId, TestScheduler, TestSchedulerConfig, Yield};
4use std::{
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9 time::Duration,
10};
11
12/// TestDispatcher provides deterministic async execution for tests.
13///
14/// This implementation delegates task scheduling to the scheduler crate's `TestScheduler`.
15/// Access the scheduler directly via `scheduler()` for clock, rng, and parking control.
16#[doc(hidden)]
17pub struct TestDispatcher {
18 session_id: SessionId,
19 scheduler: Arc<TestScheduler>,
20 num_cpus_override: Arc<AtomicUsize>,
21}
22
23impl TestDispatcher {
24 pub fn new(seed: u64) -> Self {
25 let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
26 seed,
27 randomize_order: true,
28 allow_parking: false,
29 capture_pending_traces: std::env::var("PENDING_TRACES")
30 .map_or(false, |var| var == "1" || var == "true"),
31 timeout_ticks: 0..=1000,
32 }));
33
34 let session_id = scheduler.allocate_session_id();
35
36 TestDispatcher {
37 session_id,
38 scheduler,
39 num_cpus_override: Arc::new(AtomicUsize::new(0)),
40 }
41 }
42
43 pub fn scheduler(&self) -> &Arc<TestScheduler> {
44 &self.scheduler
45 }
46
47 pub fn session_id(&self) -> SessionId {
48 self.session_id
49 }
50
51 pub fn drain_tasks(&self) {
52 self.scheduler.drain_tasks();
53 }
54
55 pub fn advance_clock(&self, by: Duration) {
56 self.scheduler.advance_clock(by);
57 }
58
59 pub fn advance_clock_to_next_timer(&self) -> bool {
60 self.scheduler.advance_clock_to_next_timer()
61 }
62
63 pub fn simulate_random_delay(&self) -> Yield {
64 self.scheduler.yield_random()
65 }
66
67 pub fn tick(&self, background_only: bool) -> bool {
68 if background_only {
69 self.scheduler.tick_background_only()
70 } else {
71 self.scheduler.tick()
72 }
73 }
74
75 pub fn run_until_parked(&self) {
76 while self.tick(false) {}
77 }
78
79 /// Override the value returned by `BackgroundExecutor::num_cpus()` in tests.
80 /// A value of 0 means no override (the default of 4 is used).
81 pub fn set_num_cpus(&self, count: usize) {
82 self.num_cpus_override.store(count, Ordering::SeqCst);
83 }
84
85 /// Returns the overridden CPU count, or `None` if no override is set.
86 pub fn num_cpus_override(&self) -> Option<usize> {
87 match self.num_cpus_override.load(Ordering::SeqCst) {
88 0 => None,
89 n => Some(n),
90 }
91 }
92}
93
94impl Clone for TestDispatcher {
95 fn clone(&self) -> Self {
96 let session_id = self.scheduler.allocate_session_id();
97 Self {
98 session_id,
99 scheduler: self.scheduler.clone(),
100 num_cpus_override: self.num_cpus_override.clone(),
101 }
102 }
103}
104
105impl PlatformDispatcher for TestDispatcher {
106 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
107 Vec::new()
108 }
109
110 fn get_current_thread_timings(&self) -> crate::ThreadTaskTimings {
111 crate::ThreadTaskTimings {
112 thread_name: None,
113 thread_id: std::thread::current().id(),
114 timings: Vec::new(),
115 total_pushed: 0,
116 }
117 }
118
119 fn is_main_thread(&self) -> bool {
120 self.scheduler.is_main_thread()
121 }
122
123 fn now(&self) -> Instant {
124 self.scheduler.clock().now()
125 }
126
127 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
128 self.scheduler
129 .schedule_background_with_priority(runnable, priority);
130 }
131
132 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
133 self.scheduler
134 .schedule_foreground(self.session_id, runnable);
135 }
136
137 fn dispatch_after(&self, _duration: Duration, _runnable: RunnableVariant) {
138 panic!(
139 "dispatch_after should not be called in tests. \
140 Use BackgroundExecutor::timer() which uses the scheduler's native timer."
141 );
142 }
143
144 fn as_test(&self) -> Option<&TestDispatcher> {
145 Some(self)
146 }
147
148 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
149 std::thread::spawn(move || {
150 f();
151 });
152 }
153}