1use super::*;
2use futures::{
3 FutureExt,
4 channel::{mpsc, oneshot},
5 executor::block_on,
6 future,
7 sink::SinkExt,
8 stream::{FuturesUnordered, StreamExt},
9};
10use std::{
11 cell::RefCell,
12 collections::{BTreeSet, HashSet},
13 pin::Pin,
14 rc::Rc,
15 sync::Arc,
16 task::{Context, Poll, Waker},
17};
18
#[test]
fn test_foreground_executor_spawn() {
    // A future spawned on the foreground executor yields its output through
    // the returned task handle.
    let result = TestScheduler::once(async |scheduler| {
        scheduler.foreground().spawn(async move { 42 }).await
    });
    assert_eq!(result, 42);
}
27
#[test]
fn test_background_executor_spawn() {
    // A future spawned on the background executor yields its output through
    // the returned task handle.
    TestScheduler::once(async |scheduler| {
        let value = scheduler.background().spawn(async move { 42 }).await;
        assert_eq!(value, 42);
    });
}
36
#[test]
fn test_foreground_ordering() {
    // Tasks spawned on a single foreground executor must run in FIFO order,
    // while tasks from *different* foreground sessions may interleave. Running
    // 100 seeded iterations should therefore observe more than one distinct
    // global interleaving.
    let mut traces = HashSet::new();

    TestScheduler::many(100, async |scheduler| {
        // One (session, task) record is pushed each time a spawned task runs.
        #[derive(Hash, PartialEq, Eq)]
        struct TraceEntry {
            session: usize,
            task: usize,
        }

        // Rc<RefCell<..>> suffices here because all foreground tasks run on
        // the current thread.
        let trace = Rc::new(RefCell::new(Vec::new()));

        let foreground_1 = scheduler.foreground();
        for task in 0..10 {
            foreground_1
                .spawn({
                    let trace = trace.clone();
                    async move {
                        trace.borrow_mut().push(TraceEntry { session: 0, task });
                    }
                })
                .detach();
        }

        let foreground_2 = scheduler.foreground();
        for task in 0..10 {
            foreground_2
                .spawn({
                    let trace = trace.clone();
                    async move {
                        trace.borrow_mut().push(TraceEntry { session: 1, task });
                    }
                })
                .detach();
        }

        // Drive all detached tasks to completion.
        scheduler.run();

        // Within each session, tasks must have executed in spawn order (0..10).
        assert_eq!(
            trace
                .borrow()
                .iter()
                .filter(|entry| entry.session == 0)
                .map(|entry| entry.task)
                .collect::<Vec<_>>(),
            (0..10).collect::<Vec<_>>()
        );
        assert_eq!(
            trace
                .borrow()
                .iter()
                .filter(|entry| entry.session == 1)
                .map(|entry| entry.task)
                .collect::<Vec<_>>(),
            (0..10).collect::<Vec<_>>()
        );

        // Record the full interleaving observed under this seed.
        traces.insert(trace.take());
    });

    // Different seeds should interleave the two sessions differently at least once.
    assert!(traces.len() > 1, "Expected at least two traces");
}
100
#[test]
fn test_timer_ordering() {
    // Timers must resolve in deadline order, regardless of the order in which
    // they were created. Labels are chosen so the expected completion order is
    // simply [1, 2, 3].
    TestScheduler::many(1, async |scheduler| {
        let background = scheduler.background();
        let futures = FuturesUnordered::new();
        // Deliberately push the timers out of deadline order.
        for (millis, label) in [(100u64, 2), (50, 1), (150, 3)] {
            let background = &background;
            futures.push(
                async move {
                    background.timer(Duration::from_millis(millis)).await;
                    label
                }
                .boxed(),
            );
        }
        assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
    });
}
130
#[test]
fn test_send_from_bg_to_fg() {
    // A value produced on the background executor must be observable by a
    // foreground task via a oneshot channel.
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let background = scheduler.background();

        let (sender, receiver) = oneshot::channel::<i32>();

        // Producer: runs in the background and fires the channel once.
        background
            .spawn(async move {
                sender.send(42).unwrap();
            })
            .detach();

        // Consumer: awaits the channel on the foreground executor.
        let received = foreground.spawn(async move { receiver.await.unwrap() }).await;
        assert_eq!(received, 42);
    });
}
150
#[test]
fn test_randomize_order() {
    // With randomization disabled, every seed must yield an identical schedule:
    // collecting the orders from ten different seeds should leave exactly one
    // distinct element.
    let deterministic_results: HashSet<_> = (0..10)
        .map(|seed| {
            let config = TestSchedulerConfig {
                seed,
                randomize_order: false,
                ..Default::default()
            };
            let order = block_on(capture_execution_order(config));
            assert_eq!(order.len(), 6);
            order
        })
        .collect();
    assert_eq!(
        deterministic_results.len(),
        1,
        "Deterministic mode should always produce same execution order"
    );

    // With randomization enabled, different seeds should explore different
    // schedules, so more than one distinct order must appear across 20 seeds.
    let randomized_results: HashSet<_> = (0..20)
        .map(|seed| {
            let order = block_on(capture_execution_order(TestSchedulerConfig::with_seed(seed)));
            assert_eq!(order.len(), 6);
            order
        })
        .collect();
    assert!(
        randomized_results.len() > 1,
        "Randomized mode should produce multiple different orders"
    );
}
188
/// Spawns three foreground ("fg-N") and three background ("bg-N") tasks on a
/// scheduler built from `config`, runs the scheduler to completion, and
/// returns the task labels in the order they executed.
async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
    let scheduler = Arc::new(TestScheduler::new(config));
    let foreground = scheduler.foreground();
    let background = scheduler.background();

    let (sender, receiver) = mpsc::unbounded::<String>();

    // Spawn foreground tasks
    for i in 0..3 {
        let mut sender = sender.clone();
        foreground
            .spawn(async move {
                sender.send(format!("fg-{}", i)).await.ok();
            })
            .detach();
    }

    // Spawn background tasks
    for i in 0..3 {
        let mut sender = sender.clone();
        background
            .spawn(async move {
                sender.send(format!("bg-{}", i)).await.ok();
            })
            .detach();
    }

    drop(sender); // Close sender to signal no more messages
    scheduler.run();

    // Every clone of `sender` is dropped once the tasks finish, so the stream
    // terminates and this collect completes.
    receiver.collect().await
}
221
#[test]
fn test_block() {
    // `block_on` must drive pending background work while blocking the
    // current thread on a foreground future.
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
    let (tx, rx) = oneshot::channel();

    // Spawn background task to send value. `detach()` returns `()`, so the
    // previous `let _ =` binding was redundant and has been removed.
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}
239
#[test]
#[should_panic(expected = "Parking forbidden. Pending traces:")]
fn test_parking_panics() {
    // When parking is forbidden, blocking on a future that can never resolve
    // must panic and report the captured pending traces instead of hanging.
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
        capture_pending_traces: true,
        ..Default::default()
    }));
    scheduler.foreground().block_on(async {
        // The sender half stays alive but never fires, so this await is
        // permanently pending.
        let (_tx, rx) = oneshot::channel::<()>();
        rx.await.unwrap();
    });
}
253
#[test]
fn test_block_with_parking() {
    // With `allow_parking: true`, `block_on` may park the thread and wait for
    // an external wake instead of panicking when no work is runnable.
    let config = TestSchedulerConfig {
        allow_parking: true,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));
    let (tx, rx) = oneshot::channel();

    // Spawn background task to send value. `detach()` returns `()`, so the
    // previous `let _ =` binding was redundant and has been removed.
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value (will park if needed)
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}
275
#[test]
fn test_helper_methods() {
    // `once` runs the closure a single time and returns its value.
    let single = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 42 }).await
    });
    assert_eq!(single, 42);

    // `many` runs the closure N times and collects every result.
    let repeated = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 10 }).await
    });
    assert_eq!(repeated, vec![10, 10, 10]);
}
292
#[test]
fn test_many_with_arbitrary_seed() {
    // Schedulers built from consecutive seeds must each execute exactly once,
    // in order, regardless of which seed the range starts at.
    for start in [0u64, 1, 5, 42] {
        let iterations = 3u64;
        let mut seeds_seen = Vec::new();

        for seed in start..start + iterations {
            let scheduler =
                Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed)));
            scheduler
                .foreground()
                .block_on(async { seeds_seen.push(seed) });
            scheduler.run();
        }

        assert_eq!(
            seeds_seen,
            (start..start + iterations).collect::<Vec<_>>(),
            "Expected {iterations} iterations starting at seed {start}"
        );
    }
}
317
#[test]
fn test_block_with_timeout() {
    // Exercises `block_with_timeout` in four scenarios: immediate completion,
    // guaranteed timeout, probabilistic completion across seeds, and
    // resumability of a timed-out future.

    // Test case: future completes within timeout
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let future = future::ready(42);
        // Ok(value) is returned when the future finishes before the deadline.
        let output = foreground.block_with_timeout(Duration::from_millis(100), future);
        assert_eq!(output.ok(), Some(42));
    });

    // Test case: future times out
    TestScheduler::once(async |scheduler| {
        // Make timeout behavior deterministic by forcing the timeout tick budget to be exactly 0.
        // This prevents `block_with_timeout` from making progress via extra scheduler stepping and
        // accidentally completing work that we expect to time out.
        scheduler.set_timeout_ticks(0..=0);

        let foreground = scheduler.foreground();
        let future = future::pending::<()>();
        let output = foreground.block_with_timeout(Duration::from_millis(50), future);
        assert!(output.is_err(), "future should not have finished");
    });

    // Test case: future makes progress via timer but still times out
    let mut results = BTreeSet::new();
    TestScheduler::many(100, async |scheduler| {
        // Keep the existing probabilistic behavior here (do not force 0 ticks), since this subtest
        // is explicitly checking that some seeds/timeouts can complete while others can time out.
        let task = scheduler.background().spawn(async move {
            Yield { polls: 10 }.await;
            42
        });
        let output = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), task);
        results.insert(output.ok());
    });
    // Across 100 seeds both outcomes must be observed: timeout (None) and
    // completion (Some(42)).
    assert_eq!(
        results.into_iter().collect::<Vec<_>>(),
        vec![None, Some(42)]
    );

    // Regression test:
    // A timed-out future must not be cancelled. The returned future should still be
    // pollable to completion later. We also want to ensure time only advances when we
    // explicitly advance it (not by yielding).
    TestScheduler::once(async |scheduler| {
        // Force immediate timeout: the timeout tick budget is 0 so we will not step or
        // advance timers inside `block_with_timeout`.
        scheduler.set_timeout_ticks(0..=0);

        let background = scheduler.background();

        // This task should only complete once time is explicitly advanced.
        let task = background.spawn({
            let scheduler = scheduler.clone();
            async move {
                scheduler.timer(Duration::from_millis(100)).await;
                123
            }
        });

        // This should time out before we advance time enough for the timer to fire.
        let timed_out = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), task);
        assert!(
            timed_out.is_err(),
            "expected timeout before advancing the clock enough for the timer"
        );

        // Now explicitly advance time and ensure the returned future can complete.
        // The Err variant hands the still-pending future back to the caller.
        let mut task = timed_out.err().unwrap();
        scheduler.advance_clock(Duration::from_millis(100));
        scheduler.run();

        let output = scheduler.foreground().block_on(&mut task);
        assert_eq!(output, 123);
    });
}
398
// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id.
#[test]
fn test_block_does_not_progress_same_session_foreground() {
    // Across many seeds, a task spawned on a *different* foreground session
    // should be able to progress while we are inside `block_on`, whereas a
    // task from the *same* session must never run.
    let mut task2_made_progress_once = false;
    TestScheduler::many(1000, async |scheduler| {
        let foreground1 = scheduler.foreground();
        let foreground2 = scheduler.foreground();

        let task1 = foreground1.spawn(async move {});
        let task2 = foreground2.spawn(async move {});

        foreground1.block_on(async {
            // Yield a random number of times so the scheduler has a chance to
            // poll other runnable work while this session is blocked.
            scheduler.yield_random().await;
            // Same-session task must not have executed.
            assert!(!task1.is_ready());
            // Other-session task may or may not have executed under this seed.
            task2_made_progress_once |= task2.is_ready();
        });

        task1.await;
        task2.await;
    });

    assert!(
        task2_made_progress_once,
        "Expected task from different foreground executor to make progress (at least once)"
    );
}
425
/// A test future that completes on its `polls`-th poll, waking itself after
/// every intermediate poll so the executor keeps driving it.
struct Yield {
    // Remaining poll budget; the future is ready once this reaches <= 1.
    polls: usize,
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check before decrementing. The previous implementation decremented
        // first (`self.polls -= 1`), which underflowed — panicking in debug
        // builds — when constructed with `polls: 0`. Behavior for
        // `polls >= 1` is unchanged: the future is ready on the polls-th poll.
        if self.polls <= 1 {
            self.polls = 0;
            Poll::Ready(())
        } else {
            self.polls -= 1;
            // Request an immediate re-poll so progress continues without an
            // external wake-up.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
443
#[test]
fn test_nondeterministic_wake_detection() {
    // A waker that escapes to a real OS thread and is woken there introduces
    // timing the scheduler cannot control. With parking disallowed, `end_test`
    // must report this as non-determinism.
    let config = TestSchedulerConfig {
        allow_parking: false,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));

    // A future that captures its waker and sends it to an external thread
    struct SendWakerToThread {
        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
    }

    impl Future for SendWakerToThread {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Ship the waker out exactly once, then complete immediately.
            if let Some(tx) = self.waker_tx.take() {
                tx.send(cx.waker().clone()).ok();
            }
            Poll::Ready(())
        }
    }

    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();

    // Get a waker by running a future that sends it
    scheduler.foreground().block_on(SendWakerToThread {
        waker_tx: Some(waker_tx),
    });

    // Spawn a real OS thread that will call wake() on the waker
    let handle = std::thread::spawn(move || {
        if let Ok(waker) = waker_rx.recv() {
            // This should trigger the non-determinism detection
            waker.wake();
        }
    });

    // Wait for the spawned thread to complete
    handle.join().ok();

    // The non-determinism error should be detected when end_test is called
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        scheduler.end_test();
    }));
    assert!(result.is_err(), "Expected end_test to panic");
    // Panic payloads may be either `String` or `&str`; handle both when
    // extracting the message.
    let panic_payload = result.unwrap_err();
    let panic_message = panic_payload
        .downcast_ref::<String>()
        .map(|s| s.as_str())
        .or_else(|| panic_payload.downcast_ref::<&str>().copied())
        .unwrap_or("<unknown panic>");
    assert!(
        panic_message.contains("Your test is not deterministic"),
        "Expected panic message to contain non-determinism error, got: {}",
        panic_message
    );
}
503
#[test]
fn test_nondeterministic_wake_allowed_with_parking() {
    // Counterpart to `test_nondeterministic_wake_detection`: with
    // `allow_parking: true`, a wake arriving from an external OS thread is
    // legitimate (the blocked thread simply unparks), so this test must
    // complete without panicking.
    let config = TestSchedulerConfig {
        allow_parking: true,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));

    // A future that captures its waker and sends it to an external thread
    struct WakeFromExternalThread {
        // Tracks whether we've already exported the waker; the second poll
        // completes the future.
        waker_sent: bool,
        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
    }

    impl Future for WakeFromExternalThread {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if !self.waker_sent {
                // First poll: hand the waker to the external thread and stay
                // pending until that thread wakes us.
                self.waker_sent = true;
                if let Some(tx) = self.waker_tx.take() {
                    tx.send(cx.waker().clone()).ok();
                }
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();

    // Spawn a real OS thread that will call wake() on the waker
    std::thread::spawn(move || {
        if let Ok(waker) = waker_rx.recv() {
            // With allow_parking, this should NOT panic
            waker.wake();
        }
    });

    // This should complete without panicking
    scheduler.foreground().block_on(WakeFromExternalThread {
        waker_sent: false,
        waker_tx: Some(waker_tx),
    });
}
550
#[test]
fn test_nondeterministic_waker_drop_detection() {
    // Like `test_nondeterministic_wake_detection`, but the external thread
    // *drops* the waker without ever waking it. Dropping a waker off the
    // scheduler's thread must equally be reported as non-determinism by
    // `end_test`. (Intentionally reuses the same helper-future shape as the
    // wake-detection test.)
    let config = TestSchedulerConfig {
        allow_parking: false,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));

    // A future that captures its waker and sends it to an external thread
    struct SendWakerToThread {
        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
    }

    impl Future for SendWakerToThread {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Ship the waker out exactly once, then complete immediately.
            if let Some(tx) = self.waker_tx.take() {
                tx.send(cx.waker().clone()).ok();
            }
            Poll::Ready(())
        }
    }

    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();

    // Get a waker by running a future that sends it
    scheduler.foreground().block_on(SendWakerToThread {
        waker_tx: Some(waker_tx),
    });

    // Spawn a real OS thread that will drop the waker without calling wake
    let handle = std::thread::spawn(move || {
        if let Ok(waker) = waker_rx.recv() {
            // This should trigger the non-determinism detection on drop
            drop(waker);
        }
    });

    // Wait for the spawned thread to complete
    handle.join().ok();

    // The non-determinism error should be detected when end_test is called
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        scheduler.end_test();
    }));
    assert!(result.is_err(), "Expected end_test to panic");
    // Panic payloads may be either `String` or `&str`; handle both when
    // extracting the message.
    let panic_payload = result.unwrap_err();
    let panic_message = panic_payload
        .downcast_ref::<String>()
        .map(|s| s.as_str())
        .or_else(|| panic_payload.downcast_ref::<&str>().copied())
        .unwrap_or("<unknown panic>");
    assert!(
        panic_message.contains("Your test is not deterministic"),
        "Expected panic message to contain non-determinism error, got: {}",
        panic_message
    );
}
610
#[test]
fn test_background_priority_scheduling() {
    use parking_lot::Mutex;

    // Priority scheduling is probabilistic, so rather than asserting a fixed
    // order we run many seeded iterations and check a statistical tendency.
    // Run many iterations to get statistical significance
    let mut high_before_low_count = 0;
    let iterations = 100;

    for seed in 0..iterations {
        let config = TestSchedulerConfig::with_seed(seed);
        let scheduler = Arc::new(TestScheduler::new(config));
        let background = scheduler.background();

        let execution_order = Arc::new(Mutex::new(Vec::new()));

        // Spawn low priority tasks first
        for i in 0..3 {
            let order = execution_order.clone();
            background
                .spawn_with_priority(Priority::Low, async move {
                    order.lock().push(format!("low-{}", i));
                })
                .detach();
        }

        // Spawn high priority tasks second
        for i in 0..3 {
            let order = execution_order.clone();
            background
                .spawn_with_priority(Priority::High, async move {
                    order.lock().push(format!("high-{}", i));
                })
                .detach();
        }

        scheduler.run();

        // Count how many high priority tasks ran in the first half
        // (the first 3 of the 6 completed tasks).
        let order = execution_order.lock();
        let high_in_first_half = order
            .iter()
            .take(3)
            .filter(|s| s.starts_with("high"))
            .count();

        // Count this seed as "high first" when high-priority tasks form the
        // majority of the first half.
        if high_in_first_half >= 2 {
            high_before_low_count += 1;
        }
    }

    // High priority tasks should tend to run before low priority tasks
    // With weights of 60 vs 10, high priority should dominate early execution
    assert!(
        high_before_low_count > iterations / 2,
        "Expected high priority tasks to run before low priority tasks more often. \
        Got {} out of {} iterations",
        high_before_low_count,
        iterations
    );
}