use super::*;
use futures::{
    FutureExt,
    channel::{mpsc, oneshot},
    executor::block_on,
    future,
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
};
use std::{
    cell::RefCell,
    collections::{BTreeSet, HashSet},
    pin::Pin,
    rc::Rc,
    sync::Arc,
    task::{Context, Poll},
};

#[test]
fn test_foreground_executor_spawn() {
    let result = TestScheduler::once(async |scheduler| {
        let task = scheduler.foreground().spawn(async move { 42 });
        task.await
    });
    assert_eq!(result, 42);
}

#[test]
fn test_background_executor_spawn() {
    TestScheduler::once(async |scheduler| {
        let task = scheduler.background().spawn(async move { 42 });
        let result = task.await;
        assert_eq!(result, 42);
    });
}

// Tasks spawned on a single foreground executor must run in FIFO order within
// their own session, while the interleaving across sessions may differ from run to run.
#[test]
fn test_foreground_ordering() {
    let mut traces = HashSet::new();

    TestScheduler::many(100, async |scheduler| {
        #[derive(Hash, PartialEq, Eq)]
        struct TraceEntry {
            session: usize,
            task: usize,
        }

        let trace = Rc::new(RefCell::new(Vec::new()));

        let foreground_1 = scheduler.foreground();
        for task in 0..10 {
            foreground_1
                .spawn({
                    let trace = trace.clone();
                    async move {
                        trace.borrow_mut().push(TraceEntry { session: 0, task });
                    }
                })
                .detach();
        }

        let foreground_2 = scheduler.foreground();
        for task in 0..10 {
            foreground_2
                .spawn({
                    let trace = trace.clone();
                    async move {
                        trace.borrow_mut().push(TraceEntry { session: 1, task });
                    }
                })
                .detach();
        }

        scheduler.run();

        // Within each session, tasks must have run in the order they were spawned.
        assert_eq!(
            trace
                .borrow()
                .iter()
                .filter(|entry| entry.session == 0)
                .map(|entry| entry.task)
                .collect::<Vec<_>>(),
            (0..10).collect::<Vec<_>>()
        );
        assert_eq!(
            trace
                .borrow()
                .iter()
                .filter(|entry| entry.session == 1)
                .map(|entry| entry.task)
                .collect::<Vec<_>>(),
            (0..10).collect::<Vec<_>>()
        );

        traces.insert(trace.take());
    });

    // Across runs, the two sessions should have interleaved differently at least once.
    assert!(traces.len() > 1, "Expected at least two traces");
}

// Timers should complete in order of their durations, regardless of creation order.
#[test]
fn test_timer_ordering() {
    TestScheduler::many(1, async |scheduler| {
        let background = scheduler.background();
        let futures = FuturesUnordered::new();
        futures.push(
            async {
                background.timer(Duration::from_millis(100)).await;
                2
            }
            .boxed(),
        );
        futures.push(
            async {
                background.timer(Duration::from_millis(50)).await;
                1
            }
            .boxed(),
        );
        futures.push(
            async {
                background.timer(Duration::from_millis(150)).await;
                3
            }
            .boxed(),
        );
        assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
    });
}

#[test]
fn test_send_from_bg_to_fg() {
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let background = scheduler.background();

        let (sender, receiver) = oneshot::channel::<i32>();

        background
            .spawn(async move {
                sender.send(42).unwrap();
            })
            .detach();

        let task = foreground.spawn(async move { receiver.await.unwrap() });
        let result = task.await;
        assert_eq!(result, 42);
    });
}

#[test]
fn test_randomize_order() {
    // Test deterministic mode: different seeds should produce the same execution order
    let mut deterministic_results = HashSet::new();
    for seed in 0..10 {
        let config = TestSchedulerConfig {
            seed,
            randomize_order: false,
            ..Default::default()
        };
        let order = block_on(capture_execution_order(config));
        assert_eq!(order.len(), 6);
        deterministic_results.insert(order);
    }

    // All deterministic runs should produce the same result
    assert_eq!(
        deterministic_results.len(),
        1,
        "Deterministic mode should always produce same execution order"
    );

    // Test randomized mode: different seeds can produce different execution orders
    let mut randomized_results = HashSet::new();
    for seed in 0..20 {
        let config = TestSchedulerConfig::with_seed(seed);
        let order = block_on(capture_execution_order(config));
        assert_eq!(order.len(), 6);
        randomized_results.insert(order);
    }

    // Randomized mode should produce multiple different execution orders
    assert!(
        randomized_results.len() > 1,
        "Randomized mode should produce multiple different orders"
    );
}

async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
    let scheduler = Arc::new(TestScheduler::new(config));
    let foreground = scheduler.foreground();
    let background = scheduler.background();

    let (sender, receiver) = mpsc::unbounded::<String>();

    // Spawn foreground tasks
    for i in 0..3 {
        let mut sender = sender.clone();
        foreground
            .spawn(async move {
                sender.send(format!("fg-{}", i)).await.ok();
            })
            .detach();
    }

    // Spawn background tasks
    for i in 0..3 {
        let mut sender = sender.clone();
        background
            .spawn(async move {
                sender.send(format!("bg-{}", i)).await.ok();
            })
            .detach();
    }

    drop(sender); // Close the sender to signal no more messages
    scheduler.run();

    receiver.collect().await
}

#[test]
fn test_block() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
    let (tx, rx) = oneshot::channel();

    // Spawn a background task to send the value
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}

// Blocking on a future that can never complete should panic when parking is not
// allowed (the default); with `capture_pending_traces` enabled, the panic message
// names the pending future (here, the oneshot receiver's shared inner state).
#[test]
#[should_panic(expected = "futures_channel::oneshot::Inner")]
fn test_parking_panics() {
    let config = TestSchedulerConfig {
        capture_pending_traces: true,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));
    scheduler.foreground().block_on(async {
        let (_tx, rx) = oneshot::channel::<()>();
        rx.await.unwrap(); // This will never complete
    });
}

#[test]
fn test_block_with_parking() {
    let config = TestSchedulerConfig {
        allow_parking: true,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));
    let (tx, rx) = oneshot::channel();

    // Spawn a background task to send the value
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value (will park if needed)
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}

#[test]
fn test_helper_methods() {
    // Test the `once` method
    let result = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
        let background = scheduler.background();
        background.spawn(async { 42 }).await
    });
    assert_eq!(result, 42);

    // Test the `many` method
    let results = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
        let background = scheduler.background();
        background.spawn(async { 10 }).await
    });
    assert_eq!(results, vec![10, 10, 10]);

    // Test the `with_seed` method
    let result = TestScheduler::with_seed(123, async |scheduler: Arc<TestScheduler>| {
        let background = scheduler.background();

        // Spawn a background task and wait for its result
        let task = background.spawn(async { 99 });
        task.await
    });
    assert_eq!(result, 99);
}

#[test]
fn test_block_with_timeout() {
    // Test case: future completes within the timeout
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let future = future::ready(42);
        let output = foreground.block_with_timeout(Duration::from_millis(100), future);
        assert_eq!(output.unwrap(), 42);
    });

    // Test case: future times out
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let future = future::pending::<()>();
        let output = foreground.block_with_timeout(Duration::from_millis(50), future);
        let _ = output.expect_err("future should not have finished");
    });

    // Test case: future makes progress by repeatedly yielding and may or may not
    // finish before the timeout; across many runs we expect to observe both outcomes.
    let mut results = BTreeSet::new();
    TestScheduler::many(100, async |scheduler| {
        let task = scheduler.background().spawn(async move {
            Yield { polls: 10 }.await;
            42
        });
        let output = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), task);
        results.insert(output.ok());
    });
    assert_eq!(
        results.into_iter().collect::<Vec<_>>(),
        vec![None, Some(42)]
    );
}

// When blocking, we shouldn't make progress on foreground-spawned futures that belong
// to the same session; futures spawned from other sessions may still make progress.
#[test]
fn test_block_does_not_progress_same_session_foreground() {
    let mut task2_made_progress_once = false;
    TestScheduler::many(1000, async |scheduler| {
        let foreground1 = scheduler.foreground();
        let foreground2 = scheduler.foreground();

        let task1 = foreground1.spawn(async move {});
        let task2 = foreground2.spawn(async move {});

        foreground1.block_on(async {
            scheduler.yield_random().await;
            assert!(!task1.is_ready());
            task2_made_progress_once |= task2.is_ready();
        });

        task1.await;
        task2.await;
    });

    assert!(
        task2_made_progress_once,
        "Expected task from different foreground executor to make progress (at least once)"
    );
}

/// A future that completes once it has been polled `polls` times, waking itself
/// after each intermediate poll so it gets scheduled again.
struct Yield {
    polls: usize,
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls -= 1;
        if self.polls == 0 {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}