tests.rs

  1use super::*;
  2use futures::{
  3    FutureExt,
  4    channel::{mpsc, oneshot},
  5    executor::block_on,
  6    future,
  7    sink::SinkExt,
  8    stream::{FuturesUnordered, StreamExt},
  9};
 10use std::{
 11    cell::RefCell,
 12    collections::{BTreeSet, HashSet},
 13    pin::Pin,
 14    rc::Rc,
 15    sync::Arc,
 16    task::{Context, Poll},
 17};
 18
 19#[test]
 20fn test_foreground_executor_spawn() {
 21    let result = TestScheduler::once(async |scheduler| {
 22        let task = scheduler.foreground().spawn(async move { 42 });
 23        task.await
 24    });
 25    assert_eq!(result, 42);
 26}
 27
 28#[test]
 29fn test_background_executor_spawn() {
 30    TestScheduler::once(async |scheduler| {
 31        let task = scheduler.background().spawn(async move { 42 });
 32        let result = task.await;
 33        assert_eq!(result, 42);
 34    });
 35}
 36
 37#[test]
 38fn test_foreground_ordering() {
 39    let mut traces = HashSet::new();
 40
 41    TestScheduler::many(100, async |scheduler| {
 42        #[derive(Hash, PartialEq, Eq)]
 43        struct TraceEntry {
 44            session: usize,
 45            task: usize,
 46        }
 47
 48        let trace = Rc::new(RefCell::new(Vec::new()));
 49
 50        let foreground_1 = scheduler.foreground();
 51        for task in 0..10 {
 52            foreground_1
 53                .spawn({
 54                    let trace = trace.clone();
 55                    async move {
 56                        trace.borrow_mut().push(TraceEntry { session: 0, task });
 57                    }
 58                })
 59                .detach();
 60        }
 61
 62        let foreground_2 = scheduler.foreground();
 63        for task in 0..10 {
 64            foreground_2
 65                .spawn({
 66                    let trace = trace.clone();
 67                    async move {
 68                        trace.borrow_mut().push(TraceEntry { session: 1, task });
 69                    }
 70                })
 71                .detach();
 72        }
 73
 74        scheduler.run();
 75
 76        assert_eq!(
 77            trace
 78                .borrow()
 79                .iter()
 80                .filter(|entry| entry.session == 0)
 81                .map(|entry| entry.task)
 82                .collect::<Vec<_>>(),
 83            (0..10).collect::<Vec<_>>()
 84        );
 85        assert_eq!(
 86            trace
 87                .borrow()
 88                .iter()
 89                .filter(|entry| entry.session == 1)
 90                .map(|entry| entry.task)
 91                .collect::<Vec<_>>(),
 92            (0..10).collect::<Vec<_>>()
 93        );
 94
 95        traces.insert(trace.take());
 96    });
 97
 98    assert!(traces.len() > 1, "Expected at least two traces");
 99}
100
101#[test]
102fn test_timer_ordering() {
103    TestScheduler::many(1, async |scheduler| {
104        let background = scheduler.background();
105        let futures = FuturesUnordered::new();
106        futures.push(
107            async {
108                background.timer(Duration::from_millis(100)).await;
109                2
110            }
111            .boxed(),
112        );
113        futures.push(
114            async {
115                background.timer(Duration::from_millis(50)).await;
116                1
117            }
118            .boxed(),
119        );
120        futures.push(
121            async {
122                background.timer(Duration::from_millis(150)).await;
123                3
124            }
125            .boxed(),
126        );
127        assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
128    });
129}
130
131#[test]
132fn test_send_from_bg_to_fg() {
133    TestScheduler::once(async |scheduler| {
134        let foreground = scheduler.foreground();
135        let background = scheduler.background();
136
137        let (sender, receiver) = oneshot::channel::<i32>();
138
139        background
140            .spawn(async move {
141                sender.send(42).unwrap();
142            })
143            .detach();
144
145        let task = foreground.spawn(async move { receiver.await.unwrap() });
146        let result = task.await;
147        assert_eq!(result, 42);
148    });
149}
150
151#[test]
152fn test_randomize_order() {
153    // Test deterministic mode: different seeds should produce same execution order
154    let mut deterministic_results = HashSet::new();
155    for seed in 0..10 {
156        let config = TestSchedulerConfig {
157            seed,
158            randomize_order: false,
159            ..Default::default()
160        };
161        let order = block_on(capture_execution_order(config));
162        assert_eq!(order.len(), 6);
163        deterministic_results.insert(order);
164    }
165
166    // All deterministic runs should produce the same result
167    assert_eq!(
168        deterministic_results.len(),
169        1,
170        "Deterministic mode should always produce same execution order"
171    );
172
173    // Test randomized mode: different seeds can produce different execution orders
174    let mut randomized_results = HashSet::new();
175    for seed in 0..20 {
176        let config = TestSchedulerConfig::with_seed(seed);
177        let order = block_on(capture_execution_order(config));
178        assert_eq!(order.len(), 6);
179        randomized_results.insert(order);
180    }
181
182    // Randomized mode should produce multiple different execution orders
183    assert!(
184        randomized_results.len() > 1,
185        "Randomized mode should produce multiple different orders"
186    );
187}
188
189async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
190    let scheduler = Arc::new(TestScheduler::new(config));
191    let foreground = scheduler.foreground();
192    let background = scheduler.background();
193
194    let (sender, receiver) = mpsc::unbounded::<String>();
195
196    // Spawn foreground tasks
197    for i in 0..3 {
198        let mut sender = sender.clone();
199        foreground
200            .spawn(async move {
201                sender.send(format!("fg-{}", i)).await.ok();
202            })
203            .detach();
204    }
205
206    // Spawn background tasks
207    for i in 0..3 {
208        let mut sender = sender.clone();
209        background
210            .spawn(async move {
211                sender.send(format!("bg-{}", i)).await.ok();
212            })
213            .detach();
214    }
215
216    drop(sender); // Close sender to signal no more messages
217    scheduler.run();
218
219    receiver.collect().await
220}
221
222#[test]
223fn test_block() {
224    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
225    let (tx, rx) = oneshot::channel();
226
227    // Spawn background task to send value
228    let _ = scheduler
229        .background()
230        .spawn(async move {
231            tx.send(42).unwrap();
232        })
233        .detach();
234
235    // Block on receiving the value
236    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
237    assert_eq!(result, 42);
238}
239
240#[test]
241#[should_panic(expected = "Parking forbidden. Pending traces:")]
242fn test_parking_panics() {
243    let config = TestSchedulerConfig {
244        capture_pending_traces: true,
245        ..Default::default()
246    };
247    let scheduler = Arc::new(TestScheduler::new(config));
248    scheduler.foreground().block_on(async {
249        let (_tx, rx) = oneshot::channel::<()>();
250        rx.await.unwrap(); // This will never complete
251    });
252}
253
254#[test]
255fn test_block_with_parking() {
256    let config = TestSchedulerConfig {
257        allow_parking: true,
258        ..Default::default()
259    };
260    let scheduler = Arc::new(TestScheduler::new(config));
261    let (tx, rx) = oneshot::channel();
262
263    // Spawn background task to send value
264    let _ = scheduler
265        .background()
266        .spawn(async move {
267            tx.send(42).unwrap();
268        })
269        .detach();
270
271    // Block on receiving the value (will park if needed)
272    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
273    assert_eq!(result, 42);
274}
275
276#[test]
277fn test_helper_methods() {
278    // Test the once method
279    let result = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
280        let background = scheduler.background();
281        background.spawn(async { 42 }).await
282    });
283    assert_eq!(result, 42);
284
285    // Test the many method
286    let results = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
287        let background = scheduler.background();
288        background.spawn(async { 10 }).await
289    });
290    assert_eq!(results, vec![10, 10, 10]);
291}
292
293#[test]
294fn test_block_with_timeout() {
295    // Test case: future completes within timeout
296    TestScheduler::once(async |scheduler| {
297        let foreground = scheduler.foreground();
298        let future = future::ready(42);
299        let output = foreground.block_with_timeout(Duration::from_millis(100), future);
300        assert_eq!(output.ok(), Some(42));
301    });
302
303    // Test case: future times out
304    TestScheduler::once(async |scheduler| {
305        // Make timeout behavior deterministic by forcing the timeout tick budget to be exactly 0.
306        // This prevents `block_with_timeout` from making progress via extra scheduler stepping and
307        // accidentally completing work that we expect to time out.
308        scheduler.set_timeout_ticks(0..=0);
309
310        let foreground = scheduler.foreground();
311        let future = future::pending::<()>();
312        let output = foreground.block_with_timeout(Duration::from_millis(50), future);
313        assert!(output.is_err(), "future should not have finished");
314    });
315
316    // Test case: future makes progress via timer but still times out
317    let mut results = BTreeSet::new();
318    TestScheduler::many(100, async |scheduler| {
319        // Keep the existing probabilistic behavior here (do not force 0 ticks), since this subtest
320        // is explicitly checking that some seeds/timeouts can complete while others can time out.
321        let task = scheduler.background().spawn(async move {
322            Yield { polls: 10 }.await;
323            42
324        });
325        let output = scheduler
326            .foreground()
327            .block_with_timeout(Duration::from_millis(50), task);
328        results.insert(output.ok());
329    });
330    assert_eq!(
331        results.into_iter().collect::<Vec<_>>(),
332        vec![None, Some(42)]
333    );
334
335    // Regression test:
336    // A timed-out future must not be cancelled. The returned future should still be
337    // pollable to completion later. We also want to ensure time only advances when we
338    // explicitly advance it (not by yielding).
339    TestScheduler::once(async |scheduler| {
340        // Force immediate timeout: the timeout tick budget is 0 so we will not step or
341        // advance timers inside `block_with_timeout`.
342        scheduler.set_timeout_ticks(0..=0);
343
344        let background = scheduler.background();
345
346        // This task should only complete once time is explicitly advanced.
347        let task = background.spawn({
348            let scheduler = scheduler.clone();
349            async move {
350                scheduler.timer(Duration::from_millis(100)).await;
351                123
352            }
353        });
354
355        // This should time out before we advance time enough for the timer to fire.
356        let timed_out = scheduler
357            .foreground()
358            .block_with_timeout(Duration::from_millis(50), task);
359        assert!(
360            timed_out.is_err(),
361            "expected timeout before advancing the clock enough for the timer"
362        );
363
364        // Now explicitly advance time and ensure the returned future can complete.
365        let mut task = timed_out.err().unwrap();
366        scheduler.advance_clock(Duration::from_millis(100));
367        scheduler.run();
368
369        let output = scheduler.foreground().block_on(&mut task);
370        assert_eq!(output, 123);
371    });
372}
373
374// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id.
375#[test]
376fn test_block_does_not_progress_same_session_foreground() {
377    let mut task2_made_progress_once = false;
378    TestScheduler::many(1000, async |scheduler| {
379        let foreground1 = scheduler.foreground();
380        let foreground2 = scheduler.foreground();
381
382        let task1 = foreground1.spawn(async move {});
383        let task2 = foreground2.spawn(async move {});
384
385        foreground1.block_on(async {
386            scheduler.yield_random().await;
387            assert!(!task1.is_ready());
388            task2_made_progress_once |= task2.is_ready();
389        });
390
391        task1.await;
392        task2.await;
393    });
394
395    assert!(
396        task2_made_progress_once,
397        "Expected task from different foreground executor to make progress (at least once)"
398    );
399}
400
401struct Yield {
402    polls: usize,
403}
404
405impl Future for Yield {
406    type Output = ();
407
408    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
409        self.polls -= 1;
410        if self.polls == 0 {
411            Poll::Ready(())
412        } else {
413            cx.waker().wake_by_ref();
414            Poll::Pending
415        }
416    }
417}
418
419#[test]
420fn test_background_priority_scheduling() {
421    use parking_lot::Mutex;
422
423    // Run many iterations to get statistical significance
424    let mut high_before_low_count = 0;
425    let iterations = 100;
426
427    for seed in 0..iterations {
428        let config = TestSchedulerConfig::with_seed(seed);
429        let scheduler = Arc::new(TestScheduler::new(config));
430        let background = scheduler.background();
431
432        let execution_order = Arc::new(Mutex::new(Vec::new()));
433
434        // Spawn low priority tasks first
435        for i in 0..3 {
436            let order = execution_order.clone();
437            background
438                .spawn_with_priority(Priority::Low, async move {
439                    order.lock().push(format!("low-{}", i));
440                })
441                .detach();
442        }
443
444        // Spawn high priority tasks second
445        for i in 0..3 {
446            let order = execution_order.clone();
447            background
448                .spawn_with_priority(Priority::High, async move {
449                    order.lock().push(format!("high-{}", i));
450                })
451                .detach();
452        }
453
454        scheduler.run();
455
456        // Count how many high priority tasks ran in the first half
457        let order = execution_order.lock();
458        let high_in_first_half = order
459            .iter()
460            .take(3)
461            .filter(|s| s.starts_with("high"))
462            .count();
463
464        if high_in_first_half >= 2 {
465            high_before_low_count += 1;
466        }
467    }
468
469    // High priority tasks should tend to run before low priority tasks
470    // With weights of 60 vs 10, high priority should dominate early execution
471    assert!(
472        high_before_low_count > iterations / 2,
473        "Expected high priority tasks to run before low priority tasks more often. \
474         Got {} out of {} iterations",
475        high_before_low_count,
476        iterations
477    );
478}