Cargo.lock 🔗
@@ -14348,9 +14348,9 @@ name = "scheduler"
version = "0.1.0"
dependencies = [
"async-task",
+ "backtrace",
"chrono",
"futures 0.3.31",
- "parking",
"parking_lot",
"rand 0.9.1",
"workspace-hack",
Antonio Scandurra and Nathan created
Added features that weren't needed in our cloud code.
Release Notes:
- N/A
---------
Co-authored-by: Nathan <nathan@zed.dev>
Cargo.lock | 2
Cargo.toml | 2
crates/scheduler/Cargo.toml | 2
crates/scheduler/src/clock.rs | 38 +-
crates/scheduler/src/executor.rs | 146 ++++++++--
crates/scheduler/src/scheduler.rs | 44 +-
crates/scheduler/src/test_scheduler.rs | 374 ++++++++++++++++++++-------
crates/scheduler/src/tests.rs | 88 ++++--
8 files changed, 498 insertions(+), 198 deletions(-)
@@ -14348,9 +14348,9 @@ name = "scheduler"
version = "0.1.0"
dependencies = [
"async-task",
+ "backtrace",
"chrono",
"futures 0.3.31",
- "parking",
"parking_lot",
"rand 0.9.1",
"workspace-hack",
@@ -460,6 +460,7 @@ aws-sdk-bedrockruntime = { version = "1.80.0", features = [
] }
aws-smithy-runtime-api = { version = "1.7.4", features = ["http-1x", "client"] }
aws-smithy-types = { version = "1.3.0", features = ["http-body-1-x"] }
+backtrace = "0.3"
base64 = "0.22"
bincode = "1.2.1"
bitflags = "2.6.0"
@@ -567,7 +568,6 @@ objc2-foundation = { version = "0.3", default-features = false, features = [
open = "5.0.0"
ordered-float = "2.1.1"
palette = { version = "0.7.5", default-features = false, features = ["std"] }
-parking = "2.0"
parking_lot = "0.12.1"
partial-json-fixer = "0.5.3"
parse_int = "0.9"
@@ -17,9 +17,9 @@ test-support = []
[dependencies]
async-task.workspace = true
+backtrace.workspace = true
chrono.workspace = true
futures.workspace = true
-parking.workspace = true
parking_lot.workspace = true
rand.workspace = true
workspace-hack.workspace = true
@@ -1,34 +1,42 @@
-use chrono::{DateTime, Duration, Utc};
+use chrono::{DateTime, Utc};
use parking_lot::Mutex;
+use std::time::{Duration, Instant};
pub trait Clock {
- fn now(&self) -> DateTime<Utc>;
+ fn utc_now(&self) -> DateTime<Utc>;
+ fn now(&self) -> Instant;
}
-pub struct TestClock {
- now: Mutex<DateTime<Utc>>,
+pub struct TestClock(Mutex<TestClockState>);
+
+struct TestClockState {
+ now: Instant,
+ utc_now: DateTime<Utc>,
}
impl TestClock {
pub fn new() -> Self {
const START_TIME: &str = "2025-07-01T23:59:58-00:00";
- let now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc();
- Self {
- now: Mutex::new(now),
- }
- }
-
- pub fn set_now(&self, now: DateTime<Utc>) {
- *self.now.lock() = now;
+ let utc_now = DateTime::parse_from_rfc3339(START_TIME).unwrap().to_utc();
+ Self(Mutex::new(TestClockState {
+ now: Instant::now(),
+ utc_now,
+ }))
}
pub fn advance(&self, duration: Duration) {
- *self.now.lock() += duration;
+ let mut state = self.0.lock();
+ state.now += duration;
+ state.utc_now += duration;
}
}
impl Clock for TestClock {
- fn now(&self) -> DateTime<Utc> {
- *self.now.lock()
+ fn utc_now(&self) -> DateTime<Utc> {
+ self.0.lock().utc_now
+ }
+
+ fn now(&self) -> Instant {
+ self.0.lock().now
}
}
@@ -1,11 +1,15 @@
use crate::{Scheduler, SessionId, Timer};
+use futures::FutureExt as _;
use std::{
future::Future,
marker::PhantomData,
+ mem::ManuallyDrop,
+ panic::Location,
pin::Pin,
rc::Rc,
sync::Arc,
task::{Context, Poll},
+ thread::{self, ThreadId},
time::Duration,
};
@@ -17,6 +21,15 @@ pub struct ForegroundExecutor {
}
impl ForegroundExecutor {
+ pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
+ Self {
+ session_id,
+ scheduler,
+ not_send: PhantomData,
+ }
+ }
+
+ #[track_caller]
pub fn spawn<F>(&self, future: F) -> Task<F::Output>
where
F: Future + 'static,
@@ -24,43 +37,52 @@ impl ForegroundExecutor {
{
let session_id = self.session_id;
let scheduler = Arc::clone(&self.scheduler);
- let (runnable, task) = async_task::spawn_local(future, move |runnable| {
+ let (runnable, task) = spawn_local_with_source_location(future, move |runnable| {
scheduler.schedule_foreground(session_id, runnable);
});
runnable.schedule();
Task(TaskState::Spawned(task))
}
- pub fn timer(&self, duration: Duration) -> Timer {
- self.scheduler.timer(duration)
+ pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
+ let mut output = None;
+ self.scheduler.block(
+ Some(self.session_id),
+ async { output = Some(future.await) }.boxed_local(),
+ None,
+ );
+ output.unwrap()
}
-}
-impl ForegroundExecutor {
- pub fn new(session_id: SessionId, scheduler: Arc<dyn Scheduler>) -> Self {
- assert!(
- scheduler.is_main_thread(),
- "ForegroundExecutor must be created on the same thread as the Scheduler"
+ pub fn block_with_timeout<Fut: Unpin + Future>(
+ &self,
+ timeout: Duration,
+ mut future: Fut,
+ ) -> Result<Fut::Output, Fut> {
+ let mut output = None;
+ self.scheduler.block(
+ Some(self.session_id),
+ async { output = Some((&mut future).await) }.boxed_local(),
+ Some(timeout),
);
- Self {
- session_id,
- scheduler,
- not_send: PhantomData,
- }
+ output.ok_or(future)
}
-}
-impl BackgroundExecutor {
- pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
- Self { scheduler }
+ pub fn timer(&self, duration: Duration) -> Timer {
+ self.scheduler.timer(duration)
}
}
+#[derive(Clone)]
pub struct BackgroundExecutor {
scheduler: Arc<dyn Scheduler>,
}
impl BackgroundExecutor {
+ pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
+ Self { scheduler }
+ }
+
pub fn spawn<F>(&self, future: F) -> Task<F::Output>
where
F: Future + Send + 'static,
@@ -74,21 +96,13 @@ impl BackgroundExecutor {
Task(TaskState::Spawned(task))
}
- pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
- self.scheduler.block_on(future)
- }
-
- pub fn block_with_timeout<Fut: Unpin + Future>(
- &self,
- future: &mut Fut,
- timeout: Duration,
- ) -> Option<Fut::Output> {
- self.scheduler.block_with_timeout(future, timeout)
- }
-
pub fn timer(&self, duration: Duration) -> Timer {
self.scheduler.timer(duration)
}
+
+ pub fn scheduler(&self) -> &Arc<dyn Scheduler> {
+ &self.scheduler
+ }
}
/// Task is a primitive that allows work to happen in the background.
@@ -116,6 +130,13 @@ impl<T> Task<T> {
Task(TaskState::Ready(Some(val)))
}
+ pub fn is_ready(&self) -> bool {
+ match &self.0 {
+ TaskState::Ready(_) => true,
+ TaskState::Spawned(task) => task.is_finished(),
+ }
+ }
+
/// Detaching a task runs it to completion in the background
pub fn detach(self) {
match self {
@@ -135,3 +156,68 @@ impl<T> Future for Task<T> {
}
}
}
+
+/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
+///
+/// Copy-modified from:
+/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
+#[track_caller]
+fn spawn_local_with_source_location<Fut, S>(
+ future: Fut,
+ schedule: S,
+) -> (async_task::Runnable, async_task::Task<Fut::Output, ()>)
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+ S: async_task::Schedule + Send + Sync + 'static,
+{
+ #[inline]
+ fn thread_id() -> ThreadId {
+ std::thread_local! {
+ static ID: ThreadId = thread::current().id();
+ }
+ ID.try_with(|id| *id)
+ .unwrap_or_else(|_| thread::current().id())
+ }
+
+ struct Checked<F> {
+ id: ThreadId,
+ inner: ManuallyDrop<F>,
+ location: &'static Location<'static>,
+ }
+
+ impl<F> Drop for Checked<F> {
+ fn drop(&mut self) {
+ assert!(
+ self.id == thread_id(),
+ "local task dropped by a thread that didn't spawn it. Task spawned at {}",
+ self.location
+ );
+ unsafe {
+ ManuallyDrop::drop(&mut self.inner);
+ }
+ }
+ }
+
+ impl<F: Future> Future for Checked<F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ assert!(
+ self.id == thread_id(),
+ "local task polled by a thread that didn't spawn it. Task spawned at {}",
+ self.location
+ );
+ unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
+ }
+ }
+
+ // Wrap the future into one that checks which thread it's on.
+ let future = Checked {
+ id: thread_id(),
+ inner: ManuallyDrop::new(future),
+ location: Location::caller(),
+ };
+
+ unsafe { async_task::spawn_unchecked(future, schedule) }
+}
@@ -13,44 +13,44 @@ use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture};
use std::{
future::Future,
pin::Pin,
+ sync::Arc,
task::{Context, Poll},
time::Duration,
};
pub trait Scheduler: Send + Sync {
- fn block(&self, future: LocalBoxFuture<()>, timeout: Option<Duration>);
+ fn block(
+ &self,
+ session_id: Option<SessionId>,
+ future: LocalBoxFuture<()>,
+ timeout: Option<Duration>,
+ );
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable);
fn schedule_background(&self, runnable: Runnable);
fn timer(&self, timeout: Duration) -> Timer;
- fn is_main_thread(&self) -> bool;
-}
-
-impl dyn Scheduler {
- pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
- let mut output = None;
- self.block(async { output = Some(future.await) }.boxed_local(), None);
- output.unwrap()
- }
-
- pub fn block_with_timeout<Fut: Unpin + Future>(
- &self,
- future: &mut Fut,
- timeout: Duration,
- ) -> Option<Fut::Output> {
- let mut output = None;
- self.block(
- async { output = Some(future.await) }.boxed_local(),
- Some(timeout),
- );
- output
+ fn clock(&self) -> Arc<dyn Clock>;
+ fn as_test(&self) -> &TestScheduler {
+ panic!("this is not a test scheduler")
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct SessionId(u16);
+impl SessionId {
+ pub fn new(id: u16) -> Self {
+ SessionId(id)
+ }
+}
+
pub struct Timer(oneshot::Receiver<()>);
+impl Timer {
+ pub fn new(rx: oneshot::Receiver<()>) -> Self {
+ Timer(rx)
+ }
+}
+
impl Future for Timer {
type Output = ();
@@ -1,31 +1,37 @@
use crate::{
- BackgroundExecutor, Clock as _, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer,
+ BackgroundExecutor, Clock, ForegroundExecutor, Scheduler, SessionId, TestClock, Timer,
};
use async_task::Runnable;
-use chrono::{DateTime, Duration as ChronoDuration, Utc};
+use backtrace::{Backtrace, BacktraceFrame};
use futures::{FutureExt as _, channel::oneshot, future::LocalBoxFuture};
use parking_lot::Mutex;
use rand::prelude::*;
use std::{
- collections::VecDeque,
+ any::type_name_of_val,
+ collections::{BTreeMap, VecDeque},
+ env,
+ fmt::Write,
future::Future,
+ mem,
+ ops::RangeInclusive,
panic::{self, AssertUnwindSafe},
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
},
- task::{Context, Poll, Wake, Waker},
- thread,
+ task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
+ thread::{self, Thread},
time::{Duration, Instant},
};
+const PENDING_TRACES_VAR_NAME: &str = "PENDING_TRACES";
+
pub struct TestScheduler {
clock: Arc<TestClock>,
rng: Arc<Mutex<StdRng>>,
- state: Mutex<SchedulerState>,
- pub thread_id: thread::ThreadId,
- pub config: SchedulerConfig,
+ state: Arc<Mutex<SchedulerState>>,
+ thread: Thread,
}
impl TestScheduler {
@@ -52,26 +58,30 @@ impl TestScheduler {
/// Run a test once with a specific seed
pub fn with_seed<R>(seed: u64, f: impl AsyncFnOnce(Arc<TestScheduler>) -> R) -> R {
- let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::with_seed(seed)));
+ let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::with_seed(seed)));
let future = f(scheduler.clone());
- let result = scheduler.block_on(future);
- scheduler.run();
+ let result = scheduler.foreground().block_on(future);
+ scheduler.run(); // Ensure spawned tasks finish up before returning in tests
result
}
- pub fn new(config: SchedulerConfig) -> Self {
+ pub fn new(config: TestSchedulerConfig) -> Self {
Self {
rng: Arc::new(Mutex::new(StdRng::seed_from_u64(config.seed))),
- state: Mutex::new(SchedulerState {
+ state: Arc::new(Mutex::new(SchedulerState {
runnables: VecDeque::new(),
timers: Vec::new(),
+ blocked_sessions: Vec::new(),
randomize_order: config.randomize_order,
allow_parking: config.allow_parking,
+ timeout_ticks: config.timeout_ticks,
next_session_id: SessionId(0),
- }),
- thread_id: thread::current().id(),
+ capture_pending_traces: config.capture_pending_traces,
+ pending_traces: BTreeMap::new(),
+ next_trace_id: TraceId(0),
+ })),
clock: Arc::new(TestClock::new()),
- config,
+ thread: thread::current(),
}
}
@@ -83,6 +93,18 @@ impl TestScheduler {
self.rng.clone()
}
+ pub fn set_timeout_ticks(&self, timeout_ticks: RangeInclusive<usize>) {
+ self.state.lock().timeout_ticks = timeout_ticks;
+ }
+
+ pub fn allow_parking(&self) {
+ self.state.lock().allow_parking = true;
+ }
+
+ pub fn forbid_parking(&self) {
+ self.state.lock().allow_parking = false;
+ }
+
/// Create a foreground executor for this scheduler
pub fn foreground(self: &Arc<Self>) -> ForegroundExecutor {
let session_id = {
@@ -98,16 +120,17 @@ impl TestScheduler {
BackgroundExecutor::new(self.clone())
}
- pub fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output {
- (self as &dyn Scheduler).block_on(future)
- }
-
pub fn yield_random(&self) -> Yield {
- Yield(self.rng.lock().random_range(0..20))
+ let rng = &mut *self.rng.lock();
+ if rng.random_bool(0.1) {
+ Yield(rng.random_range(10..20))
+ } else {
+ Yield(rng.random_range(0..2))
+ }
}
pub fn run(&self) {
- while self.step() || self.advance_clock() {
+ while self.step() {
// Continue until no work remains
}
}
@@ -125,7 +148,16 @@ impl TestScheduler {
return true;
}
- let runnable = self.state.lock().runnables.pop_front();
+ let runnable = {
+ let state = &mut *self.state.lock();
+ let ix = state.runnables.iter().position(|runnable| {
+ runnable
+ .session_id
+ .is_none_or(|session_id| !state.blocked_sessions.contains(&session_id))
+ });
+ ix.and_then(|ix| state.runnables.remove(ix))
+ };
+
if let Some(runnable) = runnable {
runnable.run();
return true;
@@ -134,19 +166,120 @@ impl TestScheduler {
false
}
- fn advance_clock(&self) -> bool {
+ fn advance_clock_to_next_timer(&self) -> bool {
if let Some(timer) = self.state.lock().timers.first() {
- self.clock.set_now(timer.expiration);
+ self.clock.advance(timer.expiration - self.clock.now());
true
} else {
false
}
}
+
+ pub fn advance_clock(&self, duration: Duration) {
+ let next_now = self.clock.now() + duration;
+ loop {
+ self.run();
+ if let Some(timer) = self.state.lock().timers.first()
+ && timer.expiration <= next_now
+ {
+ self.clock.advance(timer.expiration - self.clock.now());
+ } else {
+ break;
+ }
+ }
+ self.clock.advance(next_now - self.clock.now());
+ }
+
+ fn park(&self, deadline: Option<Instant>) -> bool {
+ if self.state.lock().allow_parking {
+ if let Some(deadline) = deadline {
+ let now = Instant::now();
+ let timeout = deadline.saturating_duration_since(now);
+ thread::park_timeout(timeout);
+ now.elapsed() < timeout
+ } else {
+ thread::park();
+ true
+ }
+ } else if deadline.is_some() {
+ false
+ } else if self.state.lock().capture_pending_traces {
+ let mut pending_traces = String::new();
+ for (_, trace) in mem::take(&mut self.state.lock().pending_traces) {
+ writeln!(pending_traces, "{:?}", exclude_wakers_from_trace(trace)).unwrap();
+ }
+ panic!("Parking forbidden. Pending traces:\n{}", pending_traces);
+ } else {
+ panic!(
+ "Parking forbidden. Re-run with {PENDING_TRACES_VAR_NAME}=1 to show pending traces"
+ );
+ }
+ }
}
impl Scheduler for TestScheduler {
- fn is_main_thread(&self) -> bool {
- thread::current().id() == self.thread_id
+ /// Block until the given future completes, with an optional timeout. If the
+ /// future is unable to make progress at any moment before the timeout and
+ /// no other tasks or timers remain, we panic unless parking is allowed. If
+ /// parking is allowed, we block up to the timeout or indefinitely if none
+ /// is provided. This is to allow testing a mix of deterministic and
+ /// non-deterministic async behavior, such as when interacting with I/O in
+ /// an otherwise deterministic test.
+ fn block(
+ &self,
+ session_id: Option<SessionId>,
+ mut future: LocalBoxFuture<()>,
+ timeout: Option<Duration>,
+ ) {
+ if let Some(session_id) = session_id {
+ self.state.lock().blocked_sessions.push(session_id);
+ }
+
+ let deadline = timeout.map(|timeout| Instant::now() + timeout);
+ let awoken = Arc::new(AtomicBool::new(false));
+ let waker = Box::new(TracingWaker {
+ id: None,
+ awoken: awoken.clone(),
+ thread: self.thread.clone(),
+ state: self.state.clone(),
+ });
+ let waker = unsafe { Waker::new(Box::into_raw(waker) as *const (), &WAKER_VTABLE) };
+ let max_ticks = if timeout.is_some() {
+ self.rng
+ .lock()
+ .random_range(self.state.lock().timeout_ticks.clone())
+ } else {
+ usize::MAX
+ };
+ let mut cx = Context::from_waker(&waker);
+
+ for _ in 0..max_ticks {
+ let Poll::Pending = future.poll_unpin(&mut cx) else {
+ break;
+ };
+
+ let mut stepped = None;
+ while self.rng.lock().random() {
+ let stepped = stepped.get_or_insert(false);
+ if self.step() {
+ *stepped = true;
+ } else {
+ break;
+ }
+ }
+
+ let stepped = stepped.unwrap_or(true);
+ let awoken = awoken.swap(false, SeqCst);
+ if !stepped && !awoken && !self.advance_clock_to_next_timer() {
+ if !self.park(deadline) {
+ break;
+ }
+ }
+ }
+
+ if session_id.is_some() {
+ self.state.lock().blocked_sessions.pop();
+ }
}
fn schedule_foreground(&self, session_id: SessionId, runnable: Runnable) {
@@ -170,6 +303,8 @@ impl Scheduler for TestScheduler {
runnable,
},
);
+ drop(state);
+ self.thread.unpark();
}
fn schedule_background(&self, runnable: Runnable) {
@@ -186,83 +321,40 @@ impl Scheduler for TestScheduler {
runnable,
},
);
+ drop(state);
+ self.thread.unpark();
}
fn timer(&self, duration: Duration) -> Timer {
let (tx, rx) = oneshot::channel();
- let expiration = self.clock.now() + ChronoDuration::from_std(duration).unwrap();
let state = &mut *self.state.lock();
state.timers.push(ScheduledTimer {
- expiration,
+ expiration: self.clock.now() + duration,
_notify: tx,
});
state.timers.sort_by_key(|timer| timer.expiration);
Timer(rx)
}
- /// Block until the given future completes, with an optional timeout. If the
- /// future is unable to make progress at any moment before the timeout and
- /// no other tasks or timers remain, we panic unless parking is allowed. If
- /// parking is allowed, we block up to the timeout or indefinitely if none
- /// is provided. This is to allow testing a mix of deterministic and
- /// non-deterministic async behavior, such as when interacting with I/O in
- /// an otherwise deterministic test.
- fn block(&self, mut future: LocalBoxFuture<()>, timeout: Option<Duration>) {
- let (parker, unparker) = parking::pair();
- let deadline = timeout.map(|timeout| Instant::now() + timeout);
- let awoken = Arc::new(AtomicBool::new(false));
- let waker = Waker::from(Arc::new(WakerFn::new({
- let awoken = awoken.clone();
- move || {
- awoken.store(true, SeqCst);
- unparker.unpark();
- }
- })));
- let max_ticks = if timeout.is_some() {
- self.rng
- .lock()
- .random_range(0..=self.config.max_timeout_ticks)
- } else {
- usize::MAX
- };
- let mut cx = Context::from_waker(&waker);
-
- for _ in 0..max_ticks {
- let Poll::Pending = future.poll_unpin(&mut cx) else {
- break;
- };
-
- let mut stepped = None;
- while self.rng.lock().random() && stepped.unwrap_or(true) {
- *stepped.get_or_insert(false) |= self.step();
- }
+ fn clock(&self) -> Arc<dyn Clock> {
+ self.clock.clone()
+ }
- let stepped = stepped.unwrap_or(true);
- let awoken = awoken.swap(false, SeqCst);
- if !stepped && !awoken && !self.advance_clock() {
- if self.state.lock().allow_parking {
- if !park(&parker, deadline) {
- break;
- }
- } else if deadline.is_some() {
- break;
- } else {
- panic!("Parking forbidden");
- }
- }
- }
+ fn as_test(&self) -> &TestScheduler {
+ self
}
}
#[derive(Clone, Debug)]
-pub struct SchedulerConfig {
+pub struct TestSchedulerConfig {
pub seed: u64,
pub randomize_order: bool,
pub allow_parking: bool,
- pub max_timeout_ticks: usize,
+ pub capture_pending_traces: bool,
+ pub timeout_ticks: RangeInclusive<usize>,
}
-impl SchedulerConfig {
+impl TestSchedulerConfig {
pub fn with_seed(seed: u64) -> Self {
Self {
seed,
@@ -271,13 +363,15 @@ impl SchedulerConfig {
}
}
-impl Default for SchedulerConfig {
+impl Default for TestSchedulerConfig {
fn default() -> Self {
Self {
seed: 0,
randomize_order: true,
allow_parking: false,
- max_timeout_ticks: 1000,
+ capture_pending_traces: env::var(PENDING_TRACES_VAR_NAME)
+ .map_or(false, |var| var == "1" || var == "true"),
+ timeout_ticks: 0..=1000,
}
}
}
@@ -294,35 +388,104 @@ impl ScheduledRunnable {
}
struct ScheduledTimer {
- expiration: DateTime<Utc>,
+ expiration: Instant,
_notify: oneshot::Sender<()>,
}
struct SchedulerState {
runnables: VecDeque<ScheduledRunnable>,
timers: Vec<ScheduledTimer>,
+ blocked_sessions: Vec<SessionId>,
randomize_order: bool,
allow_parking: bool,
+ timeout_ticks: RangeInclusive<usize>,
next_session_id: SessionId,
+ capture_pending_traces: bool,
+ next_trace_id: TraceId,
+ pending_traces: BTreeMap<TraceId, Backtrace>,
}
-struct WakerFn<F> {
- f: F,
+const WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ TracingWaker::clone_raw,
+ TracingWaker::wake_raw,
+ TracingWaker::wake_by_ref_raw,
+ TracingWaker::drop_raw,
+);
+
+#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
+struct TraceId(usize);
+
+struct TracingWaker {
+ id: Option<TraceId>,
+ awoken: Arc<AtomicBool>,
+ thread: Thread,
+ state: Arc<Mutex<SchedulerState>>,
}
-impl<F: Fn()> WakerFn<F> {
- fn new(f: F) -> Self {
- Self { f }
+impl Clone for TracingWaker {
+ fn clone(&self) -> Self {
+ let mut state = self.state.lock();
+ let id = if state.capture_pending_traces {
+ let id = state.next_trace_id;
+ state.next_trace_id.0 += 1;
+ state.pending_traces.insert(id, Backtrace::new_unresolved());
+ Some(id)
+ } else {
+ None
+ };
+ Self {
+ id,
+ awoken: self.awoken.clone(),
+ thread: self.thread.clone(),
+ state: self.state.clone(),
+ }
}
}
-impl<F: Fn()> Wake for WakerFn<F> {
- fn wake(self: Arc<Self>) {
- (self.f)();
+impl Drop for TracingWaker {
+ fn drop(&mut self) {
+ if let Some(id) = self.id {
+ self.state.lock().pending_traces.remove(&id);
+ }
+ }
+}
+
+impl TracingWaker {
+ fn wake(self) {
+ self.wake_by_ref();
+ }
+
+ fn wake_by_ref(&self) {
+ if let Some(id) = self.id {
+ self.state.lock().pending_traces.remove(&id);
+ }
+ self.awoken.store(true, SeqCst);
+ self.thread.unpark();
}
- fn wake_by_ref(self: &Arc<Self>) {
- (self.f)();
+ fn clone_raw(waker: *const ()) -> RawWaker {
+ let waker = waker as *const TracingWaker;
+ let waker = unsafe { &*waker };
+ RawWaker::new(
+ Box::into_raw(Box::new(waker.clone())) as *const (),
+ &WAKER_VTABLE,
+ )
+ }
+
+ fn wake_raw(waker: *const ()) {
+ let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) };
+ waker.wake();
+ }
+
+ fn wake_by_ref_raw(waker: *const ()) {
+ let waker = waker as *const TracingWaker;
+ let waker = unsafe { &*waker };
+ waker.wake_by_ref();
+ }
+
+ fn drop_raw(waker: *const ()) {
+ let waker = unsafe { Box::from_raw(waker as *mut TracingWaker) };
+ drop(waker);
}
}
@@ -342,11 +505,20 @@ impl Future for Yield {
}
}
-fn park(parker: &parking::Parker, deadline: Option<Instant>) -> bool {
- if let Some(deadline) = deadline {
- parker.park_deadline(deadline)
- } else {
- parker.park();
- true
+fn exclude_wakers_from_trace(mut trace: Backtrace) -> Backtrace {
+ trace.resolve();
+ let mut frames: Vec<BacktraceFrame> = trace.into();
+ let waker_clone_frame_ix = frames.iter().position(|frame| {
+ frame.symbols().iter().any(|symbol| {
+ symbol
+ .name()
+ .is_some_and(|name| format!("{name:#?}") == type_name_of_val(&Waker::clone))
+ })
+ });
+
+ if let Some(waker_clone_frame_ix) = waker_clone_frame_ix {
+ frames.drain(..waker_clone_frame_ix + 1);
}
+
+ Backtrace::from(frames)
}
@@ -153,7 +153,7 @@ fn test_randomize_order() {
// Test deterministic mode: different seeds should produce same execution order
let mut deterministic_results = HashSet::new();
for seed in 0..10 {
- let config = SchedulerConfig {
+ let config = TestSchedulerConfig {
seed,
randomize_order: false,
..Default::default()
@@ -173,7 +173,7 @@ fn test_randomize_order() {
// Test randomized mode: different seeds can produce different execution orders
let mut randomized_results = HashSet::new();
for seed in 0..20 {
- let config = SchedulerConfig::with_seed(seed);
+ let config = TestSchedulerConfig::with_seed(seed);
let order = block_on(capture_execution_order(config));
assert_eq!(order.len(), 6);
randomized_results.insert(order);
@@ -186,7 +186,7 @@ fn test_randomize_order() {
);
}
-async fn capture_execution_order(config: SchedulerConfig) -> Vec<String> {
+async fn capture_execution_order(config: TestSchedulerConfig) -> Vec<String> {
let scheduler = Arc::new(TestScheduler::new(config));
let foreground = scheduler.foreground();
let background = scheduler.background();
@@ -221,49 +221,55 @@ async fn capture_execution_order(config: SchedulerConfig) -> Vec<String> {
#[test]
fn test_block() {
- let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
- let executor = BackgroundExecutor::new(scheduler);
+ let scheduler = Arc::new(TestScheduler::new(TestSchedulerConfig::default()));
let (tx, rx) = oneshot::channel();
// Spawn background task to send value
- let _ = executor
+ let _ = scheduler
+ .background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
// Block on receiving the value
- let result = executor.block_on(async { rx.await.unwrap() });
+ let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
#[test]
-#[should_panic(expected = "Parking forbidden")]
+#[should_panic(expected = "futures_channel::oneshot::Inner")]
fn test_parking_panics() {
- let scheduler = Arc::new(TestScheduler::new(SchedulerConfig::default()));
- let executor = BackgroundExecutor::new(scheduler);
- executor.block_on(future::pending::<()>());
+ let config = TestSchedulerConfig {
+ capture_pending_traces: true,
+ ..Default::default()
+ };
+ let scheduler = Arc::new(TestScheduler::new(config));
+ scheduler.foreground().block_on(async {
+ let (_tx, rx) = oneshot::channel::<()>();
+ rx.await.unwrap(); // This will never complete
+ });
}
#[test]
fn test_block_with_parking() {
- let config = SchedulerConfig {
+ let config = TestSchedulerConfig {
allow_parking: true,
..Default::default()
};
let scheduler = Arc::new(TestScheduler::new(config));
- let executor = BackgroundExecutor::new(scheduler);
let (tx, rx) = oneshot::channel();
// Spawn background task to send value
- let _ = executor
+ let _ = scheduler
+ .background()
.spawn(async move {
tx.send(42).unwrap();
})
.detach();
// Block on receiving the value (will park if needed)
- let result = executor.block_on(async { rx.await.unwrap() });
+ let result = scheduler.foreground().block_on(async { rx.await.unwrap() });
assert_eq!(result, 42);
}
@@ -298,30 +304,31 @@ fn test_helper_methods() {
fn test_block_with_timeout() {
// Test case: future completes within timeout
TestScheduler::once(async |scheduler| {
- let background = scheduler.background();
- let mut future = future::ready(42);
- let output = background.block_with_timeout(&mut future, Duration::from_millis(100));
- assert_eq!(output, Some(42));
+ let foreground = scheduler.foreground();
+ let future = future::ready(42);
+ let output = foreground.block_with_timeout(Duration::from_millis(100), future);
+ assert_eq!(output.unwrap(), 42);
});
// Test case: future times out
TestScheduler::once(async |scheduler| {
- let background = scheduler.background();
- let mut future = future::pending::<()>();
- let output = background.block_with_timeout(&mut future, Duration::from_millis(50));
- assert_eq!(output, None);
+ let foreground = scheduler.foreground();
+ let future = future::pending::<()>();
+ let output = foreground.block_with_timeout(Duration::from_millis(50), future);
+ let _ = output.expect_err("future should not have finished");
});
// Test case: future makes progress via timer but still times out
let mut results = BTreeSet::new();
TestScheduler::many(100, async |scheduler| {
- let background = scheduler.background();
- let mut task = background.spawn(async move {
+ let task = scheduler.background().spawn(async move {
Yield { polls: 10 }.await;
42
});
- let output = background.block_with_timeout(&mut task, Duration::from_millis(50));
- results.insert(output);
+ let output = scheduler
+ .foreground()
+ .block_with_timeout(Duration::from_millis(50), task);
+ results.insert(output.ok());
});
assert_eq!(
results.into_iter().collect::<Vec<_>>(),
@@ -329,6 +336,33 @@ fn test_block_with_timeout() {
);
}
+// When calling block, we shouldn't make progress on foreground-spawned futures with the same session id.
+#[test]
+fn test_block_does_not_progress_same_session_foreground() {
+ let mut task2_made_progress_once = false;
+ TestScheduler::many(1000, async |scheduler| {
+ let foreground1 = scheduler.foreground();
+ let foreground2 = scheduler.foreground();
+
+ let task1 = foreground1.spawn(async move {});
+ let task2 = foreground2.spawn(async move {});
+
+ foreground1.block_on(async {
+ scheduler.yield_random().await;
+ assert!(!task1.is_ready());
+ task2_made_progress_once |= task2.is_ready();
+ });
+
+ task1.await;
+ task2.await;
+ });
+
+ assert!(
+ task2_made_progress_once,
+ "Expected task from different foreground executor to make progress (at least once)"
+ );
+}
+
struct Yield {
polls: usize,
}