diff --git a/Cargo.lock b/Cargo.lock index 1c8f49103f476c5f04887ccf2dbd9bd6b560a6f3..cfb2ba816a4b314454236f2b8bf72638e34a4923 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14348,9 +14348,9 @@ name = "scheduler" version = "0.1.0" dependencies = [ "async-task", + "backtrace", "chrono", "futures 0.3.31", - "parking", "parking_lot", "rand 0.9.1", "workspace-hack", diff --git a/Cargo.toml b/Cargo.toml index b5fd14087bcdd4675c630d5bb1137b8e8c3743c7..f44a1061f4c35575b132f14039edab547f828415 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -460,6 +460,7 @@ aws-sdk-bedrockruntime = { version = "1.80.0", features = [ ] } aws-smithy-runtime-api = { version = "1.7.4", features = ["http-1x", "client"] } aws-smithy-types = { version = "1.3.0", features = ["http-body-1-x"] } +backtrace = "0.3" base64 = "0.22" bincode = "1.2.1" bitflags = "2.6.0" @@ -567,7 +568,6 @@ objc2-foundation = { version = "0.3", default-features = false, features = [ open = "5.0.0" ordered-float = "2.1.1" palette = { version = "0.7.5", default-features = false, features = ["std"] } -parking = "2.0" parking_lot = "0.12.1" partial-json-fixer = "0.5.3" parse_int = "0.9" diff --git a/crates/scheduler/Cargo.toml b/crates/scheduler/Cargo.toml index 0446c67914541964f01514865ddc363c60f837c8..44436b34d490b94588af54b79abfbf3d60974a93 100644 --- a/crates/scheduler/Cargo.toml +++ b/crates/scheduler/Cargo.toml @@ -17,9 +17,9 @@ test-support = [] [dependencies] async-task.workspace = true +backtrace.workspace = true chrono.workspace = true futures.workspace = true -parking.workspace = true parking_lot.workspace = true rand.workspace = true workspace-hack.workspace = true diff --git a/crates/scheduler/src/clock.rs b/crates/scheduler/src/clock.rs index c035c6b7dbcbabeaeeb2a952974cc4bf777c1f92..447d76b584a6b55e5fb977661e1aed068486f864 100644 --- a/crates/scheduler/src/clock.rs +++ b/crates/scheduler/src/clock.rs @@ -1,34 +1,42 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::{DateTime, Utc}; use parking_lot::Mutex; +use std::time::{Duration, Instant}; pub trait Clock { - fn now(&self) -> DateTime; + fn utc_now(&self) -> DateTime; + fn now(&self) -> Instant; } -pub struct TestClock { - now: Mutex>, +pub struct TestClock(Mutex); + +struct TestClockState { + now: Instant, + utc_now: DateTime, } impl TestClock { pub fn new() -> Self { const START_TIME: &str = "2025-07-01T23:59:58-00:00"; - let now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc(); - Self { - now: Mutex::new(now), - } - } - - pub fn set_now(&self, now: DateTime) { - *self.now.lock() = now; + let utc_now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc(); + Self(Mutex::new(TestClockState { + now: Instant::now(), + utc_now, + })) } pub fn advance(&self, duration: Duration) { - *self.now.lock() += duration; + let mut state = self.0.lock(); + state.now += duration; + state.utc_now += duration; } } impl Clock for TestClock { - fn now(&self) -> DateTime { - *self.now.lock() + fn utc_now(&self) -> DateTime { + self.0.lock().utc_now + } + + fn now(&self) -> Instant { + self.0.lock().now } } diff --git a/crates/scheduler/src/executor.rs b/crates/scheduler/src/executor.rs index 03f91ae551ff086f56e089bd53d690a2c5345949..a5a0126860f4e685f5f442a201920015bda2fd37 100644 --- a/crates/scheduler/src/executor.rs +++ b/crates/scheduler/src/executor.rs @@ -1,11 +1,15 @@ use crate::{Scheduler, SessionId, Timer}; +use futures::FutureExt as _; use std::{ future::Future, marker::PhantomData, + mem::ManuallyDrop, + panic::Location, pin::Pin, rc::Rc, sync::Arc, task::{Context, Poll}, + thread::{self, ThreadId}, time::Duration, }; @@ -17,6 +21,15 @@ pub struct ForegroundExecutor { } impl ForegroundExecutor { + pub fn new(session_id: SessionId, scheduler: Arc) -> Self { + Self { + session_id, + scheduler, + not_send: PhantomData, + } + } + + #[track_caller] pub fn spawn(&self, future: F) -> Task where F: Future + 'static, @@ -24,43 +37,52 @@ impl ForegroundExecutor { { let session_id = self.session_id; let scheduler = Arc::clone(&self.scheduler); - let (runnable, task) = async_task::spawn_local(future, move |runnable| { + let (runnable, task) = spawn_local_with_source_location(future, move |runnable| { scheduler.schedule_foreground(session_id, runnable); }); runnable.schedule(); Task(TaskState::Spawned(task)) } - pub fn timer(&self, duration: Duration) -> Timer { - self.scheduler.timer(duration) + pub fn block_on(&self, future: Fut) -> Fut::Output { + let mut output = None; + self.scheduler.block( + Some(self.session_id), + async { output = Some(future.await) }.boxed_local(), + None, + ); + output.unwrap() } -} -impl ForegroundExecutor { - pub fn new(session_id: SessionId, scheduler: Arc) -> Self { - assert!( - scheduler.is_main_thread(), - "ForegroundExecutor must be created on the same thread as the Scheduler" + pub fn block_with_timeout( + &self, + timeout: Duration, + mut future: Fut, + ) -> Result { + let mut output = None; + self.scheduler.block( + Some(self.session_id), + async { output = Some((&mut future).await) }.boxed_local(), + Some(timeout), ); - Self { - session_id, - scheduler, - not_send: PhantomData, - } + output.ok_or(future) } -} -impl BackgroundExecutor { - pub fn new(scheduler: Arc) -> Self { - Self { scheduler } + pub fn timer(&self, duration: Duration) -> Timer { + self.scheduler.timer(duration) } } +#[derive(Clone)] pub struct BackgroundExecutor { scheduler: Arc, } impl BackgroundExecutor { + pub fn new(scheduler: Arc) -> Self { + Self { scheduler } + } + pub fn spawn(&self, future: F) -> Task where F: Future + Send + 'static, @@ -74,21 +96,13 @@ impl BackgroundExecutor { Task(TaskState::Spawned(task)) } - pub fn block_on(&self, future: Fut) -> Fut::Output { - self.scheduler.block_on(future) - } - - pub fn block_with_timeout( - &self, - future: &mut Fut, - timeout: Duration, - ) -> Option { - self.scheduler.block_with_timeout(future, timeout) - } - pub fn timer(&self, duration: Duration) -> Timer { self.scheduler.timer(duration) } + + pub fn scheduler(&self) -> &Arc { + &self.scheduler + } } /// Task is a primitive that allows work to happen in the background. @@ -116,6 +130,13 @@ impl Task { Task(TaskState::Ready(Some(val))) } + pub fn is_ready(&self) -> bool { + match &self.0 { + TaskState::Ready(_) => true, + TaskState::Spawned(task) => task.is_finished(), + } + } + /// Detaching a task runs it to completion in the background pub fn detach(self) { match self { @@ -135,3 +156,68 @@ impl Future for Task { } } } + +/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics. +/// +/// Copy-modified from: +/// +#[track_caller] +fn spawn_local_with_source_location( + future: Fut, + schedule: S, +) -> (async_task::Runnable, async_task::Task) +where + Fut: Future + 'static, + Fut::Output: 'static, + S: async_task::Schedule + Send + Sync + 'static, +{ + #[inline] + fn thread_id() -> ThreadId { + std::thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked { + id: ThreadId, + inner: ManuallyDrop, + location: &'static Location<'static>, + } + + impl Drop for Checked { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it. Task spawned at {}", + self.location + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl Future for Checked { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it. Task spawned at {}", + self.location + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + location: Location::caller(), + }; + + unsafe { async_task::spawn_unchecked(future, schedule) } +} diff --git a/crates/scheduler/src/scheduler.rs b/crates/scheduler/src/scheduler.rs index ee1964784565266aba2fcc1efd1cd8de0a7fd5e7..8d485e009cc68d0a2da784ee2f91e9fbf4f2b3b9 100644 --- a/crates/scheduler/src/scheduler.rs +++ b/crates/scheduler/src/scheduler.rs @@ -13,44 +13,44 @@ use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture}; use std::{ future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, time::Duration, }; pub trait Scheduler: Send + Sync { - fn block(&self, future: LocalBoxFuture<()>, timeout: Option); + fn block( + &self, + session_id: Option, + future: LocalBoxFuture<()>, + timeout: Option, + ); fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable); fn schedule_background(&self, runnable: Runnable); fn timer(&self, timeout: Duration) -> Timer; - fn is_main_thread(&self) -> bool; -} - -impl dyn Scheduler { - pub fn block_on(&self, future: Fut) -> Fut::Output { - let mut output = None; - self.block(async { output = Some(future.await) }.boxed_local(), None); - output.unwrap() - } - - pub fn block_with_timeout( - &self, - future: &mut Fut, - timeout: Duration, - ) -> Option { - let mut output = None; - self.block( - async { output = Some(future.await) }.boxed_local(), - Some(timeout), - ); - output + fn clock(&self) -> Arc; + fn as_test(&self) -> &TestScheduler { + panic!("this is not a test scheduler") } } #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct SessionId(u16); +impl SessionId { + pub fn new(id: u16) -> Self { + SessionId(id) + } +} + pub struct Timer(oneshot::Receiver<()>); +impl Timer { + pub fn new(rx: oneshot::Receiver<()>) -> Self { + Timer(rx) + } +} + impl Future for Timer { type Output = (); diff --git a/crates/scheduler/src/test_scheduler.rs b/crates/scheduler/src/test_scheduler.rs index 479759d9bdb775a3d2a71bae586fba9d658e71ce..243f6f756df15f9b05b87ec70b37c1392f60c57d 100644 --- a/crates/scheduler/src/test_scheduler.rs +++ b/crates/scheduler/src/test_scheduler.rs @@ -1,31 +1,37 @@ use crate::{ - BackgroundExecutor, Clock as _, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer, + BackgroundExecutor, Clock, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer, }; use async_task::Runnable; -use chrono::{DateTime, Duration as ChronoDuration, Utc}; +use backtrace::{Backtrace, BacktraceFrame}; use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture}; use parking_lot::Mutex; use rand::prelude::*; use std::{ - collections::VecDeque, + any::type_name_of_val, + collections::{BTreeMap, VecDeque}, + env, + fmt::Write, future::Future, + mem, + ops::RangeInclusive, panic::{self, AssertUnwindSafe}, pin::Pin, sync::{ Arc, atomic::{AtomicBool, Ordering::SeqCst}, }, - task::{Context, Poll, Wake, Waker}, - thread, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + thread::{self, Thread}, time::{Duration, Instant}, }; +const PENDING_TRACES_VAR_NAME: &str = "PENDING_TRACES"; + pub struct TestScheduler { clock: Arc, rng: Arc>, - state: Mutex, - pub thread_id: thread::ThreadId, - pub config: SchedulerConfig, + state: Arc>, + thread: Thread, } impl TestScheduler { @@ -52,26 +58,30 @@ impl TestScheduler { /// Run a test once with a specific seed pub fn with_seed(seed: u64, f: impl AsyncFnOnce(Arc) -> R) -> R { - let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::with_seed(seed))); + let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed))); let future = f(scheduler.clone()); - let result = scheduler.block_on(future); - scheduler.run(); + let result = scheduler.foreground().block_on(future); + scheduler.run(); // Ensure spawned tasks finish up before returning in tests result } - pub fn new(config: SchedulerConfig) -> Self { + pub fn new(config: TestSchedulerConfig) -> Self { Self { rng: Arc::new(Mutex::new(StdRng::seed_from_u64(config.seed))), - state: Mutex::new(SchedulerState { + state: Arc::new(Mutex::new(SchedulerState { runnables: VecDeque::new(), timers: Vec::new(), + blocked_sessions: Vec::new(), randomize_order: config.randomize_order, allow_parking: config.allow_parking, + timeout_ticks: config.timeout_ticks, next_session_id: SessionId(0), - }), - thread_id: thread::current().id(), + capture_pending_traces: config.capture_pending_traces, + pending_traces: BTreeMap::new(), + next_trace_id: TraceId(0), + })), clock: Arc::new(TestClock::new()), - config, + thread: thread::current(), } } @@ -83,6 +93,18 @@ impl TestScheduler { self.rng.clone() } + pub fn set_timeout_ticks(&self, timeout_ticks: RangeInclusive) { + self.state.lock().timeout_ticks = timeout_ticks; + } + + pub fn allow_parking(&self) { + self.state.lock().allow_parking = true; + } + + pub fn forbid_parking(&self) { + self.state.lock().allow_parking = false; + } + /// Create a foreground executor for this scheduler pub fn foreground(self: &Arc) -> ForegroundExecutor { let session_id = { @@ -98,16 +120,17 @@ impl TestScheduler { BackgroundExecutor::new(self.clone()) } - pub fn block_on(&self, future: Fut) -> Fut::Output { - (self as &dyn Scheduler).block_on(future) - } - pub fn yield_random(&self) -> Yield { - Yield(self.rng.lock().random_range(0..20)) + let rng = &mut *self.rng.lock(); + if rng.random_bool(0.1) { + Yield(rng.random_range(10..20)) + } else { + Yield(rng.random_range(0..2)) + } } pub fn run(&self) { - while self.step() || self.advance_clock() { + while self.step() { // Continue until no work remains } } @@ -125,7 +148,16 @@ impl TestScheduler { return true; } - let runnable = self.state.lock().runnables.pop_front(); + let runnable = { + let state = &mut *self.state.lock(); + let ix = state.runnables.iter().position(|runnable| { + runnable + .session_id + .is_none_or(|session_id| !state.blocked_sessions.contains(&session_id)) + }); + ix.and_then(|ix| state.runnables.remove(ix)) + }; + if let Some(runnable) = runnable { runnable.run(); return true; @@ -134,19 +166,120 @@ impl TestScheduler { false } - fn advance_clock(&self) -> bool { + fn advance_clock_to_next_timer(&self) -> bool { if let Some(timer) = self.state.lock().timers.first() { - self.clock.set_now(timer.expiration); + self.clock.advance(timer.expiration - self.clock.now()); true } else { false } } + + pub fn advance_clock(&self, duration: Duration) { + let next_now = self.clock.now() + duration; + loop { + self.run(); + if let Some(timer) = self.state.lock().timers.first() + && timer.expiration <= next_now + { + self.clock.advance(timer.expiration - self.clock.now()); + } else { + break; + } + } + self.clock.advance(next_now - self.clock.now()); + } + + fn park(&self, deadline: Option) -> bool { + if self.state.lock().allow_parking { + if let Some(deadline) = deadline { + let now = Instant::now(); + let timeout = deadline.saturating_duration_since(now); + thread::park_timeout(timeout); + now.elapsed() < timeout + } else { + thread::park(); + true + } + } else if deadline.is_some() { + false + } else if self.state.lock().capture_pending_traces { + let mut pending_traces = String::new(); + for (_, trace) in mem::take(&mut self.state.lock().pending_traces) { + writeln!(pending_traces, "{:?}", exclude_wakers_from_trace(trace)).unwrap(); + } + panic!("Parking forbidden. Pending traces:\n{}", pending_traces); + } else { + panic!( + "Parking forbidden. Re-run with {PENDING_TRACES_VAR_NAME}=1 to show pending traces" + ); + } + } } impl Scheduler for TestScheduler { - fn is_main_thread(&self) -> bool { - thread::current().id() == self.thread_id + /// Block until the given future completes, with an optional timeout. If the + /// future is unable to make progress at any moment before the timeout and + /// no other tasks or timers remain, we panic unless parking is allowed. If + /// parking is allowed, we block up to the timeout or indefinitely if none + /// is provided. This is to allow testing a mix of deterministic and + /// non-deterministic async behavior, such as when interacting with I/O in + /// an otherwise deterministic test. + fn block( + &self, + session_id: Option, + mut future: LocalBoxFuture<()>, + timeout: Option, + ) { + if let Some(session_id) = session_id { + self.state.lock().blocked_sessions.push(session_id); + } + + let deadline = timeout.map(|timeout| Instant::now() + timeout); + let awoken = Arc::new(AtomicBool::new(false)); + let waker = Box::new(TracingWaker { + id: None, + awoken: awoken.clone(), + thread: self.thread.clone(), + state: self.state.clone(), + }); + let waker = unsafe { Waker::new(Box::into_raw(waker) as *const (), &WAKER_VTABLE) }; + let max_ticks = if timeout.is_some() { + self.rng + .lock() + .random_range(self.state.lock().timeout_ticks.clone()) + } else { + usize::MAX + }; + let mut cx = Context::from_waker(&waker); + + for _ in 0..max_ticks { + let Poll::Pending = future.poll_unpin(&mut cx) else { + break; + }; + + let mut stepped = None; + while self.rng.lock().random() { + let stepped = stepped.get_or_insert(false); + if self.step() { + *stepped = true; + } else { + break; + } + } + + let stepped = stepped.unwrap_or(true); + let awoken = awoken.swap(false, SeqCst); + if !stepped && !awoken && !self.advance_clock_to_next_timer() { + if !self.park(deadline) { + break; + } + } + } + + if session_id.is_some() { + self.state.lock().blocked_sessions.pop(); + } } fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable) { @@ -170,6 +303,8 @@ impl Scheduler for TestScheduler { runnable, }, ); + drop(state); + self.thread.unpark(); } fn schedule_background(&self, runnable: Runnable) { @@ -186,83 +321,40 @@ impl Scheduler for TestScheduler { runnable, }, ); + drop(state); + self.thread.unpark(); } fn timer(&self, duration: Duration) -> Timer { let (tx, rx) = oneshot::channel(); - let expiration = self.clock.now() + ChronoDuration::from_std(duration).unwrap(); let state = &mut *self.state.lock(); state.timers.push(ScheduledTimer { - expiration, + expiration: self.clock.now() + duration, _notify: tx, }); state.timers.sort_by_key(|timer| timer.expiration); Timer(rx) } - /// Block until the given future completes, with an optional timeout. If the - /// future is unable to make progress at any moment before the timeout and - /// no other tasks or timers remain, we panic unless parking is allowed. If - /// parking is allowed, we block up to the timeout or indefinitely if none - /// is provided. This is to allow testing a mix of deterministic and - /// non-deterministic async behavior, such as when interacting with I/O in - /// an otherwise deterministic test. - fn block(&self, mut future: LocalBoxFuture<()>, timeout: Option) { - let (parker, unparker) = parking::pair(); - let deadline = timeout.map(|timeout| Instant::now() + timeout); - let awoken = Arc::new(AtomicBool::new(false)); - let waker = Waker::from(Arc::new(WakerFn::new({ - let awoken = awoken.clone(); - move || { - awoken.store(true, SeqCst); - unparker.unpark(); - } - }))); - let max_ticks = if timeout.is_some() { - self.rng - .lock() - .random_range(0..=self.config.max_timeout_ticks) - } else { - usize::MAX - }; - let mut cx = Context::from_waker(&waker); - - for _ in 0..max_ticks { - let Poll::Pending = future.poll_unpin(&mut cx) else { - break; - }; - - let mut stepped = None; - while self.rng.lock().random() && stepped.unwrap_or(true) { - *stepped.get_or_insert(false) |= self.step(); - } + fn clock(&self) -> Arc { + self.clock.clone() + } - let stepped = stepped.unwrap_or(true); - let awoken = awoken.swap(false, SeqCst); - if !stepped && !awoken && !self.advance_clock() { - if self.state.lock().allow_parking { - if !park(&parker, deadline) { - break; - } - } else if deadline.is_some() { - break; - } else { - panic!("Parking forbidden"); - } - } - } + fn as_test(&self) -> &TestScheduler { + self } } #[derive(Clone, Debug)] -pub struct SchedulerConfig { +pub struct TestSchedulerConfig { pub seed: u64, pub randomize_order: bool, pub allow_parking: bool, - pub max_timeout_ticks: usize, + pub capture_pending_traces: bool, + pub timeout_ticks: RangeInclusive, } -impl SchedulerConfig { +impl TestSchedulerConfig { pub fn with_seed(seed: u64) -> Self { Self { seed, @@ -271,13 +363,15 @@ impl SchedulerConfig { } } -impl Default for SchedulerConfig { +impl Default for TestSchedulerConfig { fn default() -> Self { Self { seed: 0, randomize_order: true, allow_parking: false, - max_timeout_ticks: 1000, + capture_pending_traces: env::var(PENDING_TRACES_VAR_NAME) + .map_or(false, |var| var == "1" || var == "true"), + timeout_ticks: 0..=1000, } } } @@ -294,35 +388,104 @@ impl ScheduledRunnable { } struct ScheduledTimer { - expiration: DateTime, + expiration: Instant, _notify: oneshot::Sender<()>, } struct SchedulerState { runnables: VecDeque, timers: Vec, + blocked_sessions: Vec, randomize_order: bool, allow_parking: bool, + timeout_ticks: RangeInclusive, next_session_id: SessionId, + capture_pending_traces: bool, + next_trace_id: TraceId, + pending_traces: BTreeMap, } -struct WakerFn { - f: F, +const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + TracingWaker::clone_raw, + TracingWaker::wake_raw, + TracingWaker::wake_by_ref_raw, + TracingWaker::drop_raw, +); + +#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)] +struct TraceId(usize); + +struct TracingWaker { + id: Option, + awoken: Arc, + thread: Thread, + state: Arc>, } -impl WakerFn { - fn new(f: F) -> Self { - Self { f } +impl Clone for TracingWaker { + fn clone(&self) -> Self { + let mut state = self.state.lock(); + let id = if state.capture_pending_traces { + let id = state.next_trace_id; + state.next_trace_id.0 += 1; + state.pending_traces.insert(id, Backtrace::new_unresolved()); + Some(id) + } else { + None + }; + Self { + id, + awoken: self.awoken.clone(), + thread: self.thread.clone(), + state: self.state.clone(), + } } } -impl Wake for WakerFn { - fn wake(self: Arc) { - (self.f)(); +impl Drop for TracingWaker { + fn drop(&mut self) { + if let Some(id) = self.id { + self.state.lock().pending_traces.remove(&id); + } + } +} + +impl TracingWaker { + fn wake(self) { + self.wake_by_ref(); + } + + fn wake_by_ref(&self) { + if let Some(id) = self.id { + self.state.lock().pending_traces.remove(&id); + } + self.awoken.store(true, SeqCst); + self.thread.unpark(); } - fn wake_by_ref(self: &Arc) { - (self.f)(); + fn clone_raw(waker: *const ()) -> RawWaker { + let waker = waker as *const TracingWaker; + let waker = unsafe { &*waker }; + RawWaker::new( + Box::into_raw(Box::new(waker.clone())) as *const (), + &WAKER_VTABLE, + ) + } + + fn wake_raw(waker: *const ()) { + let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) }; + waker.wake(); + } + + fn wake_by_ref_raw(waker: *const ()) { + let waker = waker as *const TracingWaker; + let waker = unsafe { &*waker }; + waker.wake_by_ref(); + } + + fn drop_raw(waker: *const ()) { + let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) }; + drop(waker); } } @@ -342,11 +505,20 @@ impl Future for Yield { } } -fn park(parker: &parking::Parker, deadline: Option) -> bool { - if let Some(deadline) = deadline { - parker.park_deadline(deadline) - } else { - parker.park(); - true +fn exclude_wakers_from_trace(mut trace: Backtrace) -> Backtrace { + trace.resolve(); + let mut frames: Vec = trace.into(); + let waker_clone_frame_ix = frames.iter().position(|frame| { + frame.symbols().iter().any(|symbol| { + symbol + .name() + .is_some_and(|name| format!("{name:#?}") == type_name_of_val(&Waker::clone)) + }) + }); + + if let Some(waker_clone_frame_ix) = waker_clone_frame_ix { + frames.drain(..waker_clone_frame_ix + 1); } + + Backtrace::from(frames) } diff --git a/crates/scheduler/src/tests.rs b/crates/scheduler/src/tests.rs index 19eb354e979083b1ec070bd5d09e5871001a8c4f..54ce84e8841c738316232d9e8fec16f62810385e 100644 --- a/crates/scheduler/src/tests.rs +++ b/crates/scheduler/src/tests.rs @@ -153,7 +153,7 @@ fn test_randomize_order() { // Test deterministic mode: different seeds should produce same execution order let mut deterministic_results = HashSet::new(); for seed in 0..10 { - let config = SchedulerConfig { + let config = TestSchedulerConfig { seed, randomize_order: false, ..Default::default() @@ -173,7 +173,7 @@ fn test_randomize_order() { // Test randomized mode: different seeds can produce different execution orders let mut randomized_results = HashSet::new(); for seed in 0..20 { - let config = SchedulerConfig::with_seed(seed); + let config = TestSchedulerConfig::with_seed(seed); let order = block_on(capture_execution_order(config)); assert_eq!(order.len(), 6); randomized_results.insert(order); @@ -186,7 +186,7 @@ fn test_randomize_order() { ); } -async fn capture_execution_order(config: SchedulerConfig) -> Vec { +async fn capture_execution_order(config: TestSchedulerConfig) -> Vec { let scheduler = Arc::new(TestScheduler::new(config)); let foreground = scheduler.foreground(); let background = scheduler.background(); @@ -221,49 +221,55 @@ async fn capture_execution_order(config: SchedulerConfig) -> Vec { #[test] fn test_block() { - let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default())); - let executor = BackgroundExecutor::new(scheduler); + let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default())); let (tx, rx) = oneshot::channel(); // Spawn background task to send value - let _ = executor + let _ = scheduler + .background() .spawn(async move { tx.send(42).unwrap(); }) .detach(); // Block on receiving the value - let result = executor.block_on(async { rx.await.unwrap() }); + let result = scheduler.foreground().block_on(async { rx.await.unwrap() }); assert_eq!(result, 42); } #[test] -#[should_panic(expected = "Parking forbidden")] +#[should_panic(expected = "futures_channel::oneshot::Inner")] fn test_parking_panics() { - let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default())); - let executor = BackgroundExecutor::new(scheduler); - executor.block_on(future::pending::<()>()); + let config = TestSchedulerConfig { + capture_pending_traces: true, + ..Default::default() + }; + let scheduler = Arc::new(TestScheduler::new(config)); + scheduler.foreground().block_on(async { + let (_tx, rx) = oneshot::channel::<()>(); + rx.await.unwrap(); // This will never complete + }); } #[test] fn test_block_with_parking() { - let config = SchedulerConfig { + let config = TestSchedulerConfig { allow_parking: true, ..Default::default() }; let scheduler = Arc::new(TestScheduler::new(config)); - let executor = BackgroundExecutor::new(scheduler); let (tx, rx) = oneshot::channel(); // Spawn background task to send value - let _ = executor + let _ = scheduler + .background() .spawn(async move { tx.send(42).unwrap(); }) .detach(); // Block on receiving the value (will park if needed) - let result = executor.block_on(async { rx.await.unwrap() }); + let result = scheduler.foreground().block_on(async { rx.await.unwrap() }); assert_eq!(result, 42); } @@ -298,30 +304,31 @@ fn test_helper_methods() { fn test_block_with_timeout() { // Test case: future completes within timeout TestScheduler::once(async |scheduler| { - let background = scheduler.background(); - let mut future = future::ready(42); - let output = background.block_with_timeout(&mut future, Duration::from_millis(100)); - assert_eq!(output, Some(42)); + let foreground = scheduler.foreground(); + let future = future::ready(42); + let output = foreground.block_with_timeout(Duration::from_millis(100), future); + assert_eq!(output.unwrap(), 42); }); // Test case: future times out TestScheduler::once(async |scheduler| { - let background = scheduler.background(); - let mut future = future::pending::<()>(); - let output = background.block_with_timeout(&mut future, Duration::from_millis(50)); - assert_eq!(output, None); + let foreground = scheduler.foreground(); + let future = future::pending::<()>(); + let output = foreground.block_with_timeout(Duration::from_millis(50), future); + let _ = output.expect_err("future should not have finished"); }); // Test case: future makes progress via timer but still times out let mut results = BTreeSet::new(); TestScheduler::many(100, async |scheduler| { - let background = scheduler.background(); - let mut task = background.spawn(async move { + let task = scheduler.background().spawn(async move { Yield { polls: 10 }.await; 42 }); - let output = background.block_with_timeout(&mut task, Duration::from_millis(50)); - results.insert(output); + let output = scheduler + .foreground() + .block_with_timeout(Duration::from_millis(50), task); + results.insert(output.ok()); }); assert_eq!( results.into_iter().collect::>(), @@ -329,6 +336,33 @@ fn test_block_with_timeout() { ); } +// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id. +#[test] +fn test_block_does_not_progress_same_session_foreground() { + let mut task2_made_progress_once = false; + TestScheduler::many(1000, async |scheduler| { + let foreground1 = scheduler.foreground(); + let foreground2 = scheduler.foreground(); + + let task1 = foreground1.spawn(async move {}); + let task2 = foreground2.spawn(async move {}); + + foreground1.block_on(async { + scheduler.yield_random().await; + assert!(!task1.is_ready()); + task2_made_progress_once |= task2.is_ready(); + }); + + task1.await; + task2.await; + }); + + assert!( + task2_made_progress_once, + "Expected task from different foreground executor to make progress (at least once)" + ); +} + struct Yield { polls: usize, }