tests.rs

  1use super::*;
  2use futures::{
  3    FutureExt,
  4    channel::{mpsc, oneshot},
  5    executor::block_on,
  6    future,
  7    sink::SinkExt,
  8    stream::{FuturesUnordered, StreamExt},
  9};
 10use std::{
 11    cell::RefCell,
 12    collections::{BTreeSet, HashSet},
 13    pin::Pin,
 14    rc::Rc,
 15    sync::Arc,
 16    task::{Context, Poll},
 17};
 18
 19#[test]
 20fn test_foreground_executor_spawn() {
 21    let result = TestScheduler::once(async |scheduler| {
 22        let task = scheduler.foreground().spawn(async move { 42 });
 23        task.await
 24    });
 25    assert_eq!(result, 42);
 26}
 27
 28#[test]
 29fn test_background_executor_spawn() {
 30    TestScheduler::once(async |scheduler| {
 31        let task = scheduler.background().spawn(async move { 42 });
 32        let result = task.await;
 33        assert_eq!(result, 42);
 34    });
 35}
 36
 37#[test]
 38fn test_foreground_ordering() {
 39    let mut traces = HashSet::new();
 40
 41    TestScheduler::many(100, async |scheduler| {
 42        #[derive(Hash, PartialEq, Eq)]
 43        struct TraceEntry {
 44            session: usize,
 45            task: usize,
 46        }
 47
 48        let trace = Rc::new(RefCell::new(Vec::new()));
 49
 50        let foreground_1 = scheduler.foreground();
 51        for task in 0..10 {
 52            foreground_1
 53                .spawn({
 54                    let trace = trace.clone();
 55                    async move {
 56                        trace.borrow_mut().push(TraceEntry { session: 0, task });
 57                    }
 58                })
 59                .detach();
 60        }
 61
 62        let foreground_2 = scheduler.foreground();
 63        for task in 0..10 {
 64            foreground_2
 65                .spawn({
 66                    let trace = trace.clone();
 67                    async move {
 68                        trace.borrow_mut().push(TraceEntry { session: 1, task });
 69                    }
 70                })
 71                .detach();
 72        }
 73
 74        scheduler.run();
 75
 76        assert_eq!(
 77            trace
 78                .borrow()
 79                .iter()
 80                .filter(|entry| entry.session == 0)
 81                .map(|entry| entry.task)
 82                .collect::<Vec<_>>(),
 83            (0..10).collect::<Vec<_>>()
 84        );
 85        assert_eq!(
 86            trace
 87                .borrow()
 88                .iter()
 89                .filter(|entry| entry.session == 1)
 90                .map(|entry| entry.task)
 91                .collect::<Vec<_>>(),
 92            (0..10).collect::<Vec<_>>()
 93        );
 94
 95        traces.insert(trace.take());
 96    });
 97
 98    assert!(traces.len() > 1, "Expected at least two traces");
 99}
100
// Timers pushed out of duration order must complete shortest-first.
#[test]
fn test_timer_ordering() {
    TestScheduler::many(1, async |scheduler| {
        let background = scheduler.background();
        let pending = FuturesUnordered::new();

        // (timer duration in milliseconds, value produced once it elapses),
        // deliberately listed out of duration order.
        for (delay_ms, value) in [(100, 2), (50, 1), (150, 3)] {
            let background = &background;
            pending.push(
                async move {
                    background.timer(Duration::from_millis(delay_ms)).await;
                    value
                }
                .boxed(),
            );
        }

        let completion_order = pending.collect::<Vec<_>>().await;
        assert_eq!(completion_order, vec![1, 2, 3]);
    });
}
130
// A value sent from a background task is received by a foreground task.
#[test]
fn test_send_from_bg_to_fg() {
    TestScheduler::once(async |scheduler| {
        let foreground = scheduler.foreground();
        let background = scheduler.background();

        let (tx, rx) = oneshot::channel::<i32>();

        // Producer runs on the background executor.
        let producer = background.spawn(async move {
            tx.send(42).unwrap();
        });
        producer.detach();

        // Consumer runs on the foreground executor and is awaited here.
        let received = foreground.spawn(async move { rx.await.unwrap() }).await;
        assert_eq!(received, 42);
    });
}
150
// With `randomize_order` disabled every seed must give the same execution
// order; with the seed-only config, different seeds must give more than one.
#[test]
fn test_randomize_order() {
    // Runs `capture_execution_order` once per config, checks that all six
    // tasks reported in, and returns the set of distinct orders observed.
    fn distinct_orders(
        configs: impl Iterator<Item = TestSchedulerConfig>,
    ) -> HashSet<Vec<String>> {
        configs
            .map(|config| {
                let order = block_on(capture_execution_order(config));
                assert_eq!(order.len(), 6);
                order
            })
            .collect()
    }

    // Deterministic mode: ten seeds, ordering randomization switched off.
    let deterministic_results = distinct_orders((0..10).map(|seed| TestSchedulerConfig {
        seed,
        randomize_order: false,
        ..Default::default()
    }));
    assert_eq!(
        deterministic_results.len(),
        1,
        "Deterministic mode should always produce same execution order"
    );

    // Randomized mode: twenty seeds using the config built by `with_seed`.
    let randomized_results =
        distinct_orders((0..20).map(|seed| TestSchedulerConfig::with_seed(seed)));
    assert!(
        randomized_results.len() > 1,
        "Randomized mode should produce multiple different orders"
    );
}
188
189async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
190    let scheduler = Arc::new(TestScheduler::new(config));
191    let foreground = scheduler.foreground();
192    let background = scheduler.background();
193
194    let (sender, receiver) = mpsc::unbounded::<String>();
195
196    // Spawn foreground tasks
197    for i in 0..3 {
198        let mut sender = sender.clone();
199        foreground
200            .spawn(async move {
201                sender.send(format!("fg-{}", i)).await.ok();
202            })
203            .detach();
204    }
205
206    // Spawn background tasks
207    for i in 0..3 {
208        let mut sender = sender.clone();
209        background
210            .spawn(async move {
211                sender.send(format!("bg-{}", i)).await.ok();
212            })
213            .detach();
214    }
215
216    drop(sender); // Close sender to signal no more messages
217    scheduler.run();
218
219    receiver.collect().await
220}
221
// `block_on` on the foreground executor completes once a detached background
// task has sent the value it is waiting for.
#[test]
fn test_block() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
    let (sender, receiver) = oneshot::channel();

    // Detached background task that supplies the value.
    scheduler
        .background()
        .spawn(async move {
            sender.send(42).unwrap();
        })
        .detach();

    // Block the foreground until the value arrives.
    let received = scheduler
        .foreground()
        .block_on(async { receiver.await.unwrap() });
    assert_eq!(received, 42);
}
239
// Blocking on a future that can never complete must panic under the default
// parking setting. The expected message fragment names the oneshot channel's
// inner type; presumably it comes from the trace recorded because
// `capture_pending_traces` is enabled (confirm against the scheduler).
#[test]
#[should_panic(expected = "futures_channel::oneshot::Inner")]
fn test_parking_panics() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
        capture_pending_traces: true,
        ..Default::default()
    }));
    scheduler.foreground().block_on(async {
        // The sender stays alive but never sends, so the receiver stays pending.
        let (_sender, receiver) = oneshot::channel::<()>();
        receiver.await.unwrap();
    });
}
253
// Same scenario as `test_block`, but with `allow_parking` enabled in the config.
#[test]
fn test_block_with_parking() {
    let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig {
        allow_parking: true,
        ..Default::default()
    }));
    let (sender, receiver) = oneshot::channel();

    // Detached background task that supplies the value.
    scheduler
        .background()
        .spawn(async move {
            sender.send(42).unwrap();
        })
        .detach();

    // Block the foreground until the value arrives (parking is permitted here).
    let received = scheduler
        .foreground()
        .block_on(async { receiver.await.unwrap() });
    assert_eq!(received, 42);
}
275
// Exercises the `once` and `many` convenience entry points.
#[test]
fn test_helper_methods() {
    // `once` returns the closure's output from a single run.
    let single = TestScheduler::once(async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 42 }).await
    });
    assert_eq!(single, 42);

    // `many` returns one output per run.
    let repeated = TestScheduler::many(3, async |scheduler: Arc<TestScheduler>| {
        scheduler.background().spawn(async { 10 }).await
    });
    assert_eq!(repeated, vec![10; 3]);
}
292
// Covers `block_with_timeout` for a ready future, a never-ready future, and a
// background task that needs several polls to finish.
#[test]
fn test_block_with_timeout() {
    // An already-ready future finishes inside the timeout.
    TestScheduler::once(async |scheduler| {
        let ready = future::ready(42);
        let outcome = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(100), ready);
        assert_eq!(outcome.unwrap(), 42);
    });

    // A future that never resolves reports a timeout.
    TestScheduler::once(async |scheduler| {
        let never = future::pending::<()>();
        let outcome = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), never);
        let _ = outcome.expect_err("future should not have finished");
    });

    // A background task that yields through ten polls: across 100 runs both
    // outcomes (timed out, and finished with 42) must each be seen at least once.
    let mut outcomes = BTreeSet::new();
    TestScheduler::many(100, async |scheduler| {
        let background = scheduler.background();
        let slow_task = background.spawn(async move {
            Yield { polls: 10 }.await;
            42
        });
        let outcome = scheduler
            .foreground()
            .block_with_timeout(Duration::from_millis(50), slow_task);
        outcomes.insert(outcome.ok());
    });

    let observed: Vec<_> = outcomes.into_iter().collect();
    assert_eq!(observed, vec![None, Some(42)]);
}
328
// While one foreground executor is blocking, a task spawned on that same
// executor must not run; a task spawned on a different foreground executor is
// expected to run during the block in at least one of the 1000 runs.
#[test]
fn test_block_does_not_progress_same_session_foreground() {
    let mut other_session_progressed = false;

    TestScheduler::many(1000, async |scheduler| {
        let blocking_executor = scheduler.foreground();
        let other_executor = scheduler.foreground();

        let same_session_task = blocking_executor.spawn(async move {});
        let other_session_task = other_executor.spawn(async move {});

        blocking_executor.block_on(async {
            scheduler.yield_random().await;
            // The task sharing the blocking executor must still be waiting.
            assert!(!same_session_task.is_ready());
            let progressed = other_session_task.is_ready();
            if progressed {
                other_session_progressed = true;
            }
        });

        same_session_task.await;
        other_session_task.await;
    });

    assert!(
        other_session_progressed,
        "Expected task from different foreground executor to make progress (at least once)"
    );
}
355
// A future that returns `Pending` until it has been polled `polls` times and
// resolves to `()` on that final poll. Every pending poll wakes its own waker
// before returning, so it asks to be polled again straight away.
//
// `polls: 0` behaves like `polls: 1`: the first poll returns `Ready`.
struct Yield {
    // Number of polls remaining before the future resolves.
    polls: usize,
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Saturate rather than subtract: a zero count (or a poll after
        // completion) would otherwise underflow `usize`, which panics in debug
        // builds and wraps to `usize::MAX` in release builds.
        self.polls = self.polls.saturating_sub(1);
        if self.polls == 0 {
            Poll::Ready(())
        } else {
            // Request another poll before reporting `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
372}