@@ -327,10 +327,8 @@ impl BackgroundExecutor {
"parked with nothing left to run{waiting_message}{backtrace_message}",
)
}
- dispatcher.set_unparker(unparker.clone());
- parker.park_timeout(
- test_should_end_by.saturating_duration_since(Instant::now()),
- );
+ dispatcher.push_unparker(unparker.clone());
+ parker.park_timeout(Duration::from_millis(1));
if Instant::now() > test_should_end_by {
panic!("test timed out after {duration:?} with allow_parking")
}
@@ -38,7 +38,7 @@ struct TestDispatcherState {
waiting_backtrace: Option<Backtrace>,
deprioritized_task_labels: HashSet<TaskLabel>,
block_on_ticks: RangeInclusive<usize>,
- last_parked: Option<Unparker>,
+ unparkers: Vec<Unparker>,
}
impl TestDispatcher {
@@ -58,7 +58,7 @@ impl TestDispatcher {
waiting_backtrace: None,
deprioritized_task_labels: Default::default(),
block_on_ticks: 0..=1000,
- last_parked: None,
+ unparkers: Default::default(),
};
TestDispatcher {
@@ -245,20 +245,14 @@ impl TestDispatcher {
let block_on_ticks = lock.block_on_ticks.clone();
lock.random.random_range(block_on_ticks)
}
- pub fn unpark_last(&self) {
- self.state
- .lock()
- .last_parked
- .take()
- .as_ref()
- .map(Unparker::unpark);
+
+ pub fn unpark_all(&self) {
+ self.state.lock().unparkers.retain(|parker| parker.unpark());
}
- pub fn set_unparker(&self, unparker: Unparker) {
- let last = { self.state.lock().last_parked.replace(unparker) };
- if let Some(last) = last {
- last.unpark();
- }
+ pub fn push_unparker(&self, unparker: Unparker) {
+ let mut state = self.state.lock();
+ state.unparkers.push(unparker);
}
}
@@ -299,7 +293,7 @@ impl PlatformDispatcher for TestDispatcher {
state.background.push(runnable);
}
}
- self.unpark_last();
+ self.unpark_all();
}
fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
@@ -309,7 +303,7 @@ impl PlatformDispatcher for TestDispatcher {
.entry(self.id)
.or_default()
.push_back(runnable);
- self.unpark_last();
+ self.unpark_all();
}
fn dispatch_after(&self, duration: std::time::Duration, runnable: RunnableVariant) {