@@ -306,6 +306,32 @@ impl Deterministic {
None
}
+ pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
+ let (tx, mut rx) = postage::barrier::channel();
+ let timer_id;
+ {
+ let mut state = self.state.lock();
+ let wakeup_at = state.now + duration;
+ timer_id = util::post_inc(&mut state.next_timer_id);
+ state.pending_timers.push((timer_id, wakeup_at, tx));
+ }
+
+ let remove_timer = util::defer({
+ let state = self.state.clone();
+ move || {
+ state
+ .lock()
+ .pending_timers
+ .retain(|(id, _, _)| *id != timer_id);
+ }
+ });
+
+ async move {
+ postage::prelude::Stream::recv(&mut rx).await;
+ drop(remove_timer);
+ }
+ }
+
pub fn advance_clock(&self, duration: Duration) {
let mut state = self.state.lock();
state.now += duration;
@@ -438,41 +464,18 @@ impl Foreground {
}
}
- pub async fn timer(&self, duration: Duration) {
- match self {
- #[cfg(any(test, feature = "test-support"))]
- Self::Deterministic { executor, .. } => {
- use postage::prelude::Stream as _;
-
- let (tx, mut rx) = postage::barrier::channel();
- let timer_id;
- {
- let mut state = executor.state.lock();
- let wakeup_at = state.now + duration;
- timer_id = util::post_inc(&mut state.next_timer_id);
- state.pending_timers.push((timer_id, wakeup_at, tx));
- }
+ pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
+ let mut timer = None;
- struct DropTimer<'a>(usize, &'a Foreground);
- impl<'a> Drop for DropTimer<'a> {
- fn drop(&mut self) {
- match self.1 {
- Foreground::Deterministic { executor, .. } => {
- executor
- .state
- .lock()
- .pending_timers
- .retain(|(timer_id, _, _)| *timer_id != self.0);
- }
- _ => unreachable!(),
- }
- }
- }
+ #[cfg(any(test, feature = "test-support"))]
+ if let Self::Deterministic { executor, .. } = self {
+ timer = Some(executor.timer(duration));
+ }
- let _guard = DropTimer(timer_id, self);
- rx.recv().await;
- }
- _ => {
+ async move {
+ if let Some(timer) = timer {
+ timer.await;
+ } else {
Timer::after(duration).await;
}
}
@@ -600,6 +603,23 @@ impl Background {
}
}
+ pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
+ let mut timer = None;
+
+ #[cfg(any(test, feature = "test-support"))]
+ if let Self::Deterministic { executor, .. } = self {
+ timer = Some(executor.timer(duration));
+ }
+
+ async move {
+ if let Some(timer) = timer {
+ timer.await;
+ } else {
+ Timer::after(duration).await;
+ }
+ }
+ }
+
#[cfg(any(test, feature = "test-support"))]
pub async fn simulate_random_delay(&self) {
use rand::prelude::*;
@@ -1,5 +1,6 @@
use smol::future::FutureExt;
use std::{future::Future, time::Duration};
+pub use util::*;
pub fn post_inc(value: &mut usize) -> usize {
let prev = *value;
@@ -123,6 +123,18 @@ where
}
}
+struct Defer<F: FnOnce()>(Option<F>);
+
+impl<F: FnOnce()> Drop for Defer<F> {
+ fn drop(&mut self) {
+ self.0.take().map(|f| f());
+ }
+}
+
+pub fn defer<F: FnOnce()>(f: F) -> impl Drop {
+ Defer(Some(f))
+}
+
#[cfg(test)]
mod tests {
use super::*;