1use super::*;
2use futures::{
3 FutureExt,
4 channel::{mpsc, oneshot},
5 executor::block_on,
6 future,
7 sink::SinkExt,
8 stream::{FuturesUnordered, StreamExt},
9};
10use std::{
11 cell::RefCell,
12 collections::{BTreeSet, HashSet},
13 pin::Pin,
14 rc::Rc,
15 sync::Arc,
16 task::{Context, Poll},
17};
18
/// A task spawned on the foreground executor resolves to the future's output.
#[test]
fn test_foreground_executor_spawn() {
    let value = TestScheduler::once(async |scheduler| {
        scheduler.foreground().spawn(async move { 42 }).await
    });
    assert_eq!(value, 42);
}
27
/// A task spawned on the background executor resolves to the future's output.
#[test]
fn test_background_executor_spawn() {
    TestScheduler::once(async |scheduler| {
        let value = scheduler.background().spawn(async move { 42 }).await;
        assert_eq!(value, 42);
    });
}
36
/// Tasks spawned on a single foreground executor must run in spawn order,
/// while tasks from distinct foreground executors may interleave differently
/// across seeds.
#[test]
fn test_foreground_ordering() {
    let mut observed_traces = HashSet::new();

    TestScheduler::many(100, async |scheduler| {
        #[derive(Hash, PartialEq, Eq)]
        struct TraceEntry {
            session: usize,
            task: usize,
        }

        let trace = Rc::new(RefCell::new(Vec::new()));

        // Queue ten tasks on each of two independent foreground executors,
        // session 0 first, then session 1 (same order as before folding the
        // two spawn loops together).
        for session in 0..2 {
            let executor = scheduler.foreground();
            for task in 0..10 {
                let trace = trace.clone();
                executor
                    .spawn(async move {
                        trace.borrow_mut().push(TraceEntry { session, task });
                    })
                    .detach();
            }
        }

        scheduler.run();

        // Within each session, the tasks must have executed in spawn order.
        for session in 0..2 {
            let session_tasks: Vec<_> = trace
                .borrow()
                .iter()
                .filter(|entry| entry.session == session)
                .map(|entry| entry.task)
                .collect();
            assert_eq!(session_tasks, (0..10).collect::<Vec<_>>());
        }

        observed_traces.insert(trace.take());
    });

    assert!(observed_traces.len() > 1, "Expected at least two traces");
}
100
/// Timers must fire in order of their deadline, regardless of the order the
/// futures were created in.
#[test]
fn test_timer_ordering() {
    TestScheduler::many(1, async |scheduler| {
        let background = scheduler.background();
        let background = &background;
        let futures = FuturesUnordered::new();
        // Deliberately pushed out of deadline order (100ms, 50ms, 150ms).
        for (delay_ms, label) in [(100, 2), (50, 1), (150, 3)] {
            futures.push(
                async move {
                    background.timer(Duration::from_millis(delay_ms)).await;
                    label
                }
                .boxed(),
            );
        }
        // Completion order follows the deadlines, not the push order.
        assert_eq!(futures.collect::<Vec<_>>().await, vec![1, 2, 3]);
    });
}
130
/// A value produced on the background executor can be received by a task on
/// the foreground executor via a oneshot channel.
#[test]
fn test_send_from_bg_to_fg() {
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let background = scheduler.background();

        let (sender, receiver) = oneshot::channel::<i32>();

        // The background task produces the value...
        background
            .spawn(async move {
                sender.send(42).unwrap();
            })
            .detach();

        // ...and the foreground task consumes it.
        let received = foreground.spawn(async move { receiver.await.unwrap() }).await;
        assert_eq!(received, 42);
    });
}
150
/// With `randomize_order: false` every seed must yield the identical execution
/// order; with randomization enabled, different seeds should yield several
/// distinct orders.
#[test]
fn test_randomize_order() {
    // Deterministic mode: collect the order produced by each of ten seeds.
    let deterministic: HashSet<_> = (0..10)
        .map(|seed| {
            let config = TestSchedulerConfig {
                seed,
                randomize_order: false,
                ..Default::default()
            };
            let order = block_on(capture_execution_order(config));
            assert_eq!(order.len(), 6);
            order
        })
        .collect();

    // Every deterministic run must have produced the same order.
    assert_eq!(
        deterministic.len(),
        1,
        "Deterministic mode should always produce same execution order"
    );

    // Randomized mode: collect the order produced by each of twenty seeds.
    let randomized: HashSet<_> = (0..20)
        .map(|seed| {
            let order = block_on(capture_execution_order(TestSchedulerConfig::with_seed(seed)));
            assert_eq!(order.len(), 6);
            order
        })
        .collect();

    // At least two distinct orders must have been observed.
    assert!(
        randomized.len() > 1,
        "Randomized mode should produce multiple different orders"
    );
}
188
189async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
190 let scheduler = Arc::new(TestScheduler::new(config));
191 let foreground = scheduler.foreground();
192 let background = scheduler.background();
193
194 let (sender, receiver) = mpsc::unbounded::<String>();
195
196 // Spawn foreground tasks
197 for i in 0..3 {
198 let mut sender = sender.clone();
199 foreground
200 .spawn(async move {
201 sender.send(format!("fg-{}", i)).await.ok();
202 })
203 .detach();
204 }
205
206 // Spawn background tasks
207 for i in 0..3 {
208 let mut sender = sender.clone();
209 background
210 .spawn(async move {
211 sender.send(format!("bg-{}", i)).await.ok();
212 })
213 .detach();
214 }
215
216 drop(sender); // Close sender to signal no more messages
217 scheduler.run();
218
219 receiver.collect().await
220}
221
/// `block_on` drives the scheduler until the blocked future completes,
/// including running the background task the future depends on.
#[test]
fn test_block() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
    let (tx, rx) = oneshot::channel();

    // Spawn a background task to send the value the blocked future waits on.
    // `detach()` returns `()`, so binding its result with `let _ =` (as the
    // previous version did) was redundant.
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value.
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}
239
/// Blocking on a future that can never complete must panic, since parking is
/// forbidden by default; with `capture_pending_traces` enabled the panic
/// message includes the pending traces.
#[test]
#[should_panic(expected = "Parking forbidden. Pending traces:")]
fn test_parking_panics() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
        capture_pending_traces: true,
        ..Default::default()
    }));
    scheduler.foreground().block_on(async {
        // The sender half is kept alive but never used, so this await can
        // never be woken.
        let (_tx, rx) = oneshot::channel::<()>();
        rx.await.unwrap();
    });
}
253
/// With `allow_parking: true`, `block_on` may park the blocking thread while
/// waiting instead of panicking, and still completes once the background task
/// delivers the value.
#[test]
fn test_block_with_parking() {
    let config = TestSchedulerConfig {
        allow_parking: true,
        ..Default::default()
    };
    let scheduler = Arc::new(TestScheduler::new(config));
    let (tx, rx) = oneshot::channel();

    // Spawn a background task to send the value. `detach()` returns `()`, so
    // binding its result with `let _ =` (as the previous version did) was
    // redundant.
    scheduler
        .background()
        .spawn(async move {
            tx.send(42).unwrap();
        })
        .detach();

    // Block on receiving the value (will park if needed).
    let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
    assert_eq!(result, 42);
}
275
/// Exercises the `once` and `many` convenience constructors.
#[test]
fn test_helper_methods() {
    // `once` runs the closure a single time and returns its output.
    let single = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 42 }).await
    });
    assert_eq!(single, 42);

    // `many` runs the closure repeatedly and collects every output.
    let repeated = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 10 }).await
    });
    assert_eq!(repeated, vec![10, 10, 10]);
}
292
/// Covers the four interesting `block_with_timeout` behaviors: completion
/// within the timeout, a guaranteed timeout, a seed-dependent race, and the
/// regression case where a timed-out future must remain pollable.
#[test]
fn test_block_with_timeout() {
    // A future that is already ready completes well within the timeout.
    TestScheduler::once(async |scheduler| {
        let result = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(100), future::ready(42));
        assert_eq!(result.ok(), Some(42));
    });

    // A future that never resolves must report a timeout. Forcing the timeout
    // tick budget to exactly 0 keeps this deterministic: `block_with_timeout`
    // gets no extra scheduler steps in which pending work could accidentally
    // complete.
    TestScheduler::once(async |scheduler| {
        scheduler.set_timeout_ticks(0..=0);

        let result = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), future::pending::<()>());
        assert!(result.is_err(), "future should not have finished");
    });

    // Across many seeds, a task that needs several polls should sometimes win
    // the race against the timeout and sometimes lose it. The probabilistic
    // tick budget is deliberately left in place here, since this subtest
    // checks that both outcomes occur.
    let mut outcomes = BTreeSet::new();
    TestScheduler::many(100, async |scheduler| {
        let task = scheduler.background().spawn(async move {
            Yield { polls: 10 }.await;
            42
        });
        let result = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), task);
        outcomes.insert(result.ok());
    });
    assert_eq!(
        outcomes.into_iter().collect::<Vec<_>>(),
        vec![None, Some(42)]
    );

    // Regression test: timing out must not cancel the future. The future that
    // `block_with_timeout` hands back stays pollable to completion, and time
    // only advances when we advance it explicitly (not by yielding).
    TestScheduler::once(async |scheduler| {
        // Zero timeout ticks: no stepping or timer advancement happens inside
        // `block_with_timeout`, so the timeout fires immediately.
        scheduler.set_timeout_ticks(0..=0);

        let background = scheduler.background();

        // This task completes only after the clock moves forward by 100ms.
        let task = background.spawn({
            let scheduler = scheduler.clone();
            async move {
                scheduler.timer(Duration::from_millis(100)).await;
                123
            }
        });

        // Times out before the clock has advanced enough for the timer.
        let timed_out = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), task);
        assert!(
            timed_out.is_err(),
            "expected timeout before advancing the clock enough for the timer"
        );

        // Explicitly advance time; the returned future must now complete.
        let mut task = timed_out.err().unwrap();
        scheduler.advance_clock(Duration::from_millis(100));
        scheduler.run();

        assert_eq!(scheduler.foreground().block_on(&mut task), 123);
    });
}
373
// When calling block, we shouldn't make progress on foreground-spawned futures
// with the same session id — though futures spawned from a *different*
// foreground session may still run.
#[test]
fn test_block_does_not_progress_same_session_foreground() {
    let mut other_session_progressed = false;
    TestScheduler::many(1000, async |scheduler| {
        let foreground1 = scheduler.foreground();
        let foreground2 = scheduler.foreground();

        let task1 = foreground1.spawn(async move {});
        let task2 = foreground2.spawn(async move {});

        foreground1.block_on(async {
            scheduler.yield_random().await;
            // Same session: must not have run while we were blocking.
            assert!(!task1.is_ready());
            // Different session: allowed (but not required) to have run.
            other_session_progressed |= task2.is_ready();
        });

        task1.await;
        task2.await;
    });

    assert!(
        other_session_progressed,
        "Expected task from different foreground executor to make progress (at least once)"
    );
}
400
/// Test future that stays pending for a fixed number of polls, waking itself
/// after each one, and resolves on the final poll.
struct Yield {
    polls: usize,
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Consume one poll. (Underflows — panicking in debug builds — if
        // polled again after completion, which well-behaved callers never do.)
        self.polls -= 1;
        match self.polls {
            0 => Poll::Ready(()),
            _ => {
                // Request an immediate re-poll so the scheduler keeps driving us.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
418
/// Statistical test: across many seeds, high-priority background tasks should
/// usually be scheduled ahead of low-priority tasks that were spawned earlier.
#[test]
fn test_background_priority_scheduling() {
    use parking_lot::Mutex;

    let iterations = 100;
    let mut wins = 0;

    for seed in 0..iterations {
        let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed)));
        let background = scheduler.background();
        let execution_order = Arc::new(Mutex::new(Vec::new()));

        // Low-priority tasks are spawned first...
        for i in 0..3 {
            let order = execution_order.clone();
            background
                .spawn_with_priority(Priority::Low, async move {
                    order.lock().push(format!("low-{}", i));
                })
                .detach();
        }

        // ...then the high-priority tasks.
        for i in 0..3 {
            let order = execution_order.clone();
            background
                .spawn_with_priority(Priority::High, async move {
                    order.lock().push(format!("high-{}", i));
                })
                .detach();
        }

        scheduler.run();

        // A "win" is a run where at least two of the first three completions
        // were high-priority tasks.
        let order = execution_order.lock();
        let early_high = order
            .iter()
            .take(3)
            .filter(|s| s.starts_with("high"))
            .count();
        if early_high >= 2 {
            wins += 1;
        }
    }

    // With weights of 60 vs 10, high priority should dominate early execution.
    assert!(
        wins > iterations / 2,
        "Expected high priority tasks to run before low priority tasks more often. \
        Got {} out of {} iterations",
        wins,
        iterations
    );
}