tests.rs

  1use super::*;
  2use futures::{
  3    FutureExt,
  4    channel::{mpsc, oneshot},
  5    executor::block_on,
  6    future,
  7    sink::SinkExt,
  8    stream::{FuturesUnordered, StreamExt},
  9};
 10use std::{
 11    cell::RefCell,
 12    collections::{BTreeSet, HashSet},
 13    pin::Pin,
 14    rc::Rc,
 15    sync::Arc,
 16    task::{Context, Poll, Waker},
 17};
 18
 19#[test]
 20fn test_foreground_executor_spawn() {
 21    let result = TestScheduler::once(async |scheduler| {
 22        let task = scheduler.foreground().spawn(async move { 42 });
 23        task.await
 24    });
 25    assert_eq!(result, 42);
 26}
 27
 28#[test]
 29fn test_background_executor_spawn() {
 30    TestScheduler::once(async |scheduler| {
 31        let task = scheduler.background().spawn(async move { 42 });
 32        let result = task.await;
 33        assert_eq!(result, 42);
 34    });
 35}
 36
 37#[test]
 38fn test_foreground_ordering() {
 39    let mut traces = HashSet::new();
 40
 41    TestScheduler::many(100, async |scheduler| {
 42        #[derive(Hash, PartialEq, Eq)]
 43        struct TraceEntry {
 44            session: usize,
 45            task: usize,
 46        }
 47
 48        let trace = Rc::new(RefCell::new(Vec::new()));
 49
 50        let foreground_1 = scheduler.foreground();
 51        for task in 0..10 {
 52            foreground_1
 53                .spawn({
 54                    let trace = trace.clone();
 55                    async move {
 56                        trace.borrow_mut().push(TraceEntry { session: 0, task });
 57                    }
 58                })
 59                .detach();
 60        }
 61
 62        let foreground_2 = scheduler.foreground();
 63        for task in 0..10 {
 64            foreground_2
 65                .spawn({
 66                    let trace = trace.clone();
 67                    async move {
 68                        trace.borrow_mut().push(TraceEntry { session: 1, task });
 69                    }
 70                })
 71                .detach();
 72        }
 73
 74        scheduler.run();
 75
 76        assert_eq!(
 77            trace
 78                .borrow()
 79                .iter()
 80                .filter(|entry| entry.session == 0)
 81                .map(|entry| entry.task)
 82                .collect::<Vec<_>>(),
 83            (0..10).collect::<Vec<_>>()
 84        );
 85        assert_eq!(
 86            trace
 87                .borrow()
 88                .iter()
 89                .filter(|entry| entry.session == 1)
 90                .map(|entry| entry.task)
 91                .collect::<Vec<_>>(),
 92            (0..10).collect::<Vec<_>>()
 93        );
 94
 95        traces.insert(trace.take());
 96    });
 97
 98    assert!(traces.len() > 1, "Expected at least two traces");
 99}
100
101#[test]
102fn test_timer_ordering() {
103    TestScheduler::many(1, async |scheduler| {
104        let background = scheduler.background();
105        let futures = FuturesUnordered::new();
106        futures.push(
107            async {
108                background.timer(Duration::from_millis(100)).await;
109                2
110            }
111            .boxed(),
112        );
113        futures.push(
114            async {
115                background.timer(Duration::from_millis(50)).await;
116                1
117            }
118            .boxed(),
119        );
120        futures.push(
121            async {
122                background.timer(Duration::from_millis(150)).await;
123                3
124            }
125            .boxed(),
126        );
127        assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
128    });
129}
130
131#[test]
132fn test_send_from_bg_to_fg() {
133    TestScheduler::once(async |scheduler| {
134        let foreground = scheduler.foreground();
135        let background = scheduler.background();
136
137        let (sender, receiver) = oneshot::channel::<i32>();
138
139        background
140            .spawn(async move {
141                sender.send(42).unwrap();
142            })
143            .detach();
144
145        let task = foreground.spawn(async move { receiver.await.unwrap() });
146        let result = task.await;
147        assert_eq!(result, 42);
148    });
149}
150
151#[test]
152fn test_randomize_order() {
153    // Test deterministic mode: different seeds should produce same execution order
154    let mut deterministic_results = HashSet::new();
155    for seed in 0..10 {
156        let config = TestSchedulerConfig {
157            seed,
158            randomize_order: false,
159            ..Default::default()
160        };
161        let order = block_on(capture_execution_order(config));
162        assert_eq!(order.len(), 6);
163        deterministic_results.insert(order);
164    }
165
166    // All deterministic runs should produce the same result
167    assert_eq!(
168        deterministic_results.len(),
169        1,
170        "Deterministic mode should always produce same execution order"
171    );
172
173    // Test randomized mode: different seeds can produce different execution orders
174    let mut randomized_results = HashSet::new();
175    for seed in 0..20 {
176        let config = TestSchedulerConfig::with_seed(seed);
177        let order = block_on(capture_execution_order(config));
178        assert_eq!(order.len(), 6);
179        randomized_results.insert(order);
180    }
181
182    // Randomized mode should produce multiple different execution orders
183    assert!(
184        randomized_results.len() > 1,
185        "Randomized mode should produce multiple different orders"
186    );
187}
188
189async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
190    let scheduler = Arc::new(TestScheduler::new(config));
191    let foreground = scheduler.foreground();
192    let background = scheduler.background();
193
194    let (sender, receiver) = mpsc::unbounded::<String>();
195
196    // Spawn foreground tasks
197    for i in 0..3 {
198        let mut sender = sender.clone();
199        foreground
200            .spawn(async move {
201                sender.send(format!("fg-{}", i)).await.ok();
202            })
203            .detach();
204    }
205
206    // Spawn background tasks
207    for i in 0..3 {
208        let mut sender = sender.clone();
209        background
210            .spawn(async move {
211                sender.send(format!("bg-{}", i)).await.ok();
212            })
213            .detach();
214    }
215
216    drop(sender); // Close sender to signal no more messages
217    scheduler.run();
218
219    receiver.collect().await
220}
221
222#[test]
223fn test_block() {
224    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
225    let (tx, rx) = oneshot::channel();
226
227    // Spawn background task to send value
228    let _ = scheduler
229        .background()
230        .spawn(async move {
231            tx.send(42).unwrap();
232        })
233        .detach();
234
235    // Block on receiving the value
236    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
237    assert_eq!(result, 42);
238}
239
240#[test]
241#[should_panic(expected = "Parking forbidden. Pending traces:")]
242fn test_parking_panics() {
243    let config = TestSchedulerConfig {
244        capture_pending_traces: true,
245        ..Default::default()
246    };
247    let scheduler = Arc::new(TestScheduler::new(config));
248    scheduler.foreground().block_on(async {
249        let (_tx, rx) = oneshot::channel::<()>();
250        rx.await.unwrap(); // This will never complete
251    });
252}
253
254#[test]
255fn test_block_with_parking() {
256    let config = TestSchedulerConfig {
257        allow_parking: true,
258        ..Default::default()
259    };
260    let scheduler = Arc::new(TestScheduler::new(config));
261    let (tx, rx) = oneshot::channel();
262
263    // Spawn background task to send value
264    let _ = scheduler
265        .background()
266        .spawn(async move {
267            tx.send(42).unwrap();
268        })
269        .detach();
270
271    // Block on receiving the value (will park if needed)
272    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
273    assert_eq!(result, 42);
274}
275
276#[test]
277fn test_helper_methods() {
278    // Test the once method
279    let result = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
280        let background = scheduler.background();
281        background.spawn(async { 42 }).await
282    });
283    assert_eq!(result, 42);
284
285    // Test the many method
286    let results = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
287        let background = scheduler.background();
288        background.spawn(async { 10 }).await
289    });
290    assert_eq!(results, vec![10, 10, 10]);
291}
292
293#[test]
294fn test_many_with_arbitrary_seed() {
295    for seed in [0u64, 1, 5, 42] {
296        let mut seeds_seen = Vec::new();
297        let iterations = 3usize;
298
299        for current_seed in seed..seed + iterations as u64 {
300            let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(
301                current_seed,
302            )));
303            let captured_seed = current_seed;
304            scheduler
305                .foreground()
306                .block_on(async { seeds_seen.push(captured_seed) });
307            scheduler.run();
308        }
309
310        assert_eq!(
311            seeds_seen,
312            (seed..seed + iterations as u64).collect::<Vec<_>>(),
313            "Expected {iterations} iterations starting at seed {seed}"
314        );
315    }
316}
317
318#[test]
319fn test_block_with_timeout() {
320    // Test case: future completes within timeout
321    TestScheduler::once(async |scheduler| {
322        let foreground = scheduler.foreground();
323        let future = future::ready(42);
324        let output = foreground.block_with_timeout(Duration::from_millis(100), future);
325        assert_eq!(output.ok(), Some(42));
326    });
327
328    // Test case: future times out
329    TestScheduler::once(async |scheduler| {
330        // Make timeout behavior deterministic by forcing the timeout tick budget to be exactly 0.
331        // This prevents `block_with_timeout` from making progress via extra scheduler stepping and
332        // accidentally completing work that we expect to time out.
333        scheduler.set_timeout_ticks(0..=0);
334
335        let foreground = scheduler.foreground();
336        let future = future::pending::<()>();
337        let output = foreground.block_with_timeout(Duration::from_millis(50), future);
338        assert!(output.is_err(), "future should not have finished");
339    });
340
341    // Test case: future makes progress via timer but still times out
342    let mut results = BTreeSet::new();
343    TestScheduler::many(100, async |scheduler| {
344        // Keep the existing probabilistic behavior here (do not force 0 ticks), since this subtest
345        // is explicitly checking that some seeds/timeouts can complete while others can time out.
346        let task = scheduler.background().spawn(async move {
347            Yield { polls: 10 }.await;
348            42
349        });
350        let output = scheduler
351            .foreground()
352            .block_with_timeout(Duration::from_millis(50), task);
353        results.insert(output.ok());
354    });
355    assert_eq!(
356        results.into_iter().collect::<Vec<_>>(),
357        vec![None, Some(42)]
358    );
359
360    // Regression test:
361    // A timed-out future must not be cancelled. The returned future should still be
362    // pollable to completion later. We also want to ensure time only advances when we
363    // explicitly advance it (not by yielding).
364    TestScheduler::once(async |scheduler| {
365        // Force immediate timeout: the timeout tick budget is 0 so we will not step or
366        // advance timers inside `block_with_timeout`.
367        scheduler.set_timeout_ticks(0..=0);
368
369        let background = scheduler.background();
370
371        // This task should only complete once time is explicitly advanced.
372        let task = background.spawn({
373            let scheduler = scheduler.clone();
374            async move {
375                scheduler.timer(Duration::from_millis(100)).await;
376                123
377            }
378        });
379
380        // This should time out before we advance time enough for the timer to fire.
381        let timed_out = scheduler
382            .foreground()
383            .block_with_timeout(Duration::from_millis(50), task);
384        assert!(
385            timed_out.is_err(),
386            "expected timeout before advancing the clock enough for the timer"
387        );
388
389        // Now explicitly advance time and ensure the returned future can complete.
390        let mut task = timed_out.err().unwrap();
391        scheduler.advance_clock(Duration::from_millis(100));
392        scheduler.run();
393
394        let output = scheduler.foreground().block_on(&mut task);
395        assert_eq!(output, 123);
396    });
397}
398
399// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id.
400#[test]
401fn test_block_does_not_progress_same_session_foreground() {
402    let mut task2_made_progress_once = false;
403    TestScheduler::many(1000, async |scheduler| {
404        let foreground1 = scheduler.foreground();
405        let foreground2 = scheduler.foreground();
406
407        let task1 = foreground1.spawn(async move {});
408        let task2 = foreground2.spawn(async move {});
409
410        foreground1.block_on(async {
411            scheduler.yield_random().await;
412            assert!(!task1.is_ready());
413            task2_made_progress_once |= task2.is_ready();
414        });
415
416        task1.await;
417        task2.await;
418    });
419
420    assert!(
421        task2_made_progress_once,
422        "Expected task from different foreground executor to make progress (at least once)"
423    );
424}
425
426struct Yield {
427    polls: usize,
428}
429
430impl Future for Yield {
431    type Output = ();
432
433    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434        self.polls -= 1;
435        if self.polls == 0 {
436            Poll::Ready(())
437        } else {
438            cx.waker().wake_by_ref();
439            Poll::Pending
440        }
441    }
442}
443
444#[test]
445fn test_nondeterministic_wake_detection() {
446    let config = TestSchedulerConfig {
447        allow_parking: false,
448        ..Default::default()
449    };
450    let scheduler = Arc::new(TestScheduler::new(config));
451
452    // A future that captures its waker and sends it to an external thread
453    struct SendWakerToThread {
454        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
455    }
456
457    impl Future for SendWakerToThread {
458        type Output = ();
459
460        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
461            if let Some(tx) = self.waker_tx.take() {
462                tx.send(cx.waker().clone()).ok();
463            }
464            Poll::Ready(())
465        }
466    }
467
468    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
469
470    // Get a waker by running a future that sends it
471    scheduler.foreground().block_on(SendWakerToThread {
472        waker_tx: Some(waker_tx),
473    });
474
475    // Spawn a real OS thread that will call wake() on the waker
476    let handle = std::thread::spawn(move || {
477        if let Ok(waker) = waker_rx.recv() {
478            // This should trigger the non-determinism detection
479            waker.wake();
480        }
481    });
482
483    // Wait for the spawned thread to complete
484    handle.join().ok();
485
486    // The non-determinism error should be detected when end_test is called
487    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
488        scheduler.end_test();
489    }));
490    assert!(result.is_err(), "Expected end_test to panic");
491    let panic_payload = result.unwrap_err();
492    let panic_message = panic_payload
493        .downcast_ref::<String>()
494        .map(|s| s.as_str())
495        .or_else(|| panic_payload.downcast_ref::<&str>().copied())
496        .unwrap_or("<unknown panic>");
497    assert!(
498        panic_message.contains("Your test is not deterministic"),
499        "Expected panic message to contain non-determinism error, got: {}",
500        panic_message
501    );
502}
503
504#[test]
505fn test_nondeterministic_wake_allowed_with_parking() {
506    let config = TestSchedulerConfig {
507        allow_parking: true,
508        ..Default::default()
509    };
510    let scheduler = Arc::new(TestScheduler::new(config));
511
512    // A future that captures its waker and sends it to an external thread
513    struct WakeFromExternalThread {
514        waker_sent: bool,
515        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
516    }
517
518    impl Future for WakeFromExternalThread {
519        type Output = ();
520
521        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
522            if !self.waker_sent {
523                self.waker_sent = true;
524                if let Some(tx) = self.waker_tx.take() {
525                    tx.send(cx.waker().clone()).ok();
526                }
527                Poll::Pending
528            } else {
529                Poll::Ready(())
530            }
531        }
532    }
533
534    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
535
536    // Spawn a real OS thread that will call wake() on the waker
537    std::thread::spawn(move || {
538        if let Ok(waker) = waker_rx.recv() {
539            // With allow_parking, this should NOT panic
540            waker.wake();
541        }
542    });
543
544    // This should complete without panicking
545    scheduler.foreground().block_on(WakeFromExternalThread {
546        waker_sent: false,
547        waker_tx: Some(waker_tx),
548    });
549}
550
551#[test]
552fn test_nondeterministic_waker_drop_detection() {
553    let config = TestSchedulerConfig {
554        allow_parking: false,
555        ..Default::default()
556    };
557    let scheduler = Arc::new(TestScheduler::new(config));
558
559    // A future that captures its waker and sends it to an external thread
560    struct SendWakerToThread {
561        waker_tx: Option<std::sync::mpsc::Sender<Waker>>,
562    }
563
564    impl Future for SendWakerToThread {
565        type Output = ();
566
567        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
568            if let Some(tx) = self.waker_tx.take() {
569                tx.send(cx.waker().clone()).ok();
570            }
571            Poll::Ready(())
572        }
573    }
574
575    let (waker_tx, waker_rx) = std::sync::mpsc::channel::<Waker>();
576
577    // Get a waker by running a future that sends it
578    scheduler.foreground().block_on(SendWakerToThread {
579        waker_tx: Some(waker_tx),
580    });
581
582    // Spawn a real OS thread that will drop the waker without calling wake
583    let handle = std::thread::spawn(move || {
584        if let Ok(waker) = waker_rx.recv() {
585            // This should trigger the non-determinism detection on drop
586            drop(waker);
587        }
588    });
589
590    // Wait for the spawned thread to complete
591    handle.join().ok();
592
593    // The non-determinism error should be detected when end_test is called
594    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
595        scheduler.end_test();
596    }));
597    assert!(result.is_err(), "Expected end_test to panic");
598    let panic_payload = result.unwrap_err();
599    let panic_message = panic_payload
600        .downcast_ref::<String>()
601        .map(|s| s.as_str())
602        .or_else(|| panic_payload.downcast_ref::<&str>().copied())
603        .unwrap_or("<unknown panic>");
604    assert!(
605        panic_message.contains("Your test is not deterministic"),
606        "Expected panic message to contain non-determinism error, got: {}",
607        panic_message
608    );
609}
610
611#[test]
612fn test_background_priority_scheduling() {
613    use parking_lot::Mutex;
614
615    // Run many iterations to get statistical significance
616    let mut high_before_low_count = 0;
617    let iterations = 100;
618
619    for seed in 0..iterations {
620        let config = TestSchedulerConfig::with_seed(seed);
621        let scheduler = Arc::new(TestScheduler::new(config));
622        let background = scheduler.background();
623
624        let execution_order = Arc::new(Mutex::new(Vec::new()));
625
626        // Spawn low priority tasks first
627        for i in 0..3 {
628            let order = execution_order.clone();
629            background
630                .spawn_with_priority(Priority::Low, async move {
631                    order.lock().push(format!("low-{}", i));
632                })
633                .detach();
634        }
635
636        // Spawn high priority tasks second
637        for i in 0..3 {
638            let order = execution_order.clone();
639            background
640                .spawn_with_priority(Priority::High, async move {
641                    order.lock().push(format!("high-{}", i));
642                })
643                .detach();
644        }
645
646        scheduler.run();
647
648        // Count how many high priority tasks ran in the first half
649        let order = execution_order.lock();
650        let high_in_first_half = order
651            .iter()
652            .take(3)
653            .filter(|s| s.starts_with("high"))
654            .count();
655
656        if high_in_first_half >= 2 {
657            high_before_low_count += 1;
658        }
659    }
660
661    // High priority tasks should tend to run before low priority tasks
662    // With weights of 60 vs 10, high priority should dominate early execution
663    assert!(
664        high_before_low_count > iterations / 2,
665        "Expected high priority tasks to run before low priority tasks more often. \
666         Got {} out of {} iterations",
667        high_before_low_count,
668        iterations
669    );
670}