1use crate::util;
2use anyhow::{anyhow, Result};
3use async_task::Runnable;
4use futures::channel::mpsc;
5use smol::{channel, prelude::*, Executor};
6use std::{
7 any::Any,
8 fmt::{self},
9 marker::PhantomData,
10 mem,
11 pin::Pin,
12 rc::Rc,
13 sync::Arc,
14 task::{Context, Poll},
15 thread,
16 time::Duration,
17};
18
19use crate::PlatformDispatcher;
20
21pub enum ForegroundExecutor {
22 Platform {
23 dispatcher: Arc<dyn PlatformDispatcher>,
24 _not_send_or_sync: PhantomData<Rc<()>>,
25 },
26 #[cfg(any(test, feature = "test"))]
27 Deterministic {
28 cx_id: usize,
29 executor: Arc<Deterministic>,
30 },
31}
32
33pub enum BackgroundExecutor {
34 #[cfg(any(test, feature = "test"))]
35 Deterministic { executor: Arc<Deterministic> },
36 Production {
37 executor: Arc<smol::Executor<'static>>,
38 _stop: channel::Sender<()>,
39 },
40}
41
42type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
43type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
44type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
45type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
46
47#[must_use]
48pub enum Task<T> {
49 Ready(Option<T>),
50 Local {
51 any_task: AnyLocalTask,
52 result_type: PhantomData<T>,
53 },
54 Send {
55 any_task: AnyTask,
56 result_type: PhantomData<T>,
57 },
58}
59
60unsafe impl<T: Send> Send for Task<T> {}
61
/// Mutable state shared by a `Deterministic` executor, its timers and the
/// schedule callbacks of every task it spawns.
#[cfg(any(test, feature = "test"))]
struct DeterministicState {
    /// Source of every scheduling decision; (re)seeded from `seed`.
    rng: rand::prelude::StdRng,
    /// Seed the executor was created with.
    seed: u64,
    /// Runnables scheduled by foreground tasks, queued per foreground context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables scheduled by background tasks.
    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Current time of the simulated clock.
    now: std::time::Instant,
    /// Id handed to the next timer.
    next_timer_id: usize,
    /// Pending timers as `(id, wake-up instant, barrier sender)`, kept ordered
    /// by wake-up instant.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `start_waiting`, reported if parking is forbidden.
    waiting_backtrace: Option<backtrace::Backtrace>,
    /// Id handed to the next runnable.
    next_runnable_id: usize,
    /// Events recorded during the current run.
    poll_history: Vec<ExecutorEvent>,
    /// History of an earlier run that the current run is compared against.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    /// Whether backtraces are captured for runnables.
    enable_runnable_backtraces: bool,
    /// Captured backtraces, keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
80
/// A scheduling event recorded by the deterministic executor.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with this id was put on a queue.
    EnqueuRunnable { id: usize },
}
86
/// A runnable queued by a foreground task of the deterministic executor.
#[cfg(any(test, feature = "test"))]
struct ForegroundRunnable {
    /// Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
    /// Whether running this runnable should be followed by a poll of the main
    /// task passed to `Deterministic::run`.
    main: bool,
}
93
/// A runnable queued by a background task of the deterministic executor.
#[cfg(any(test, feature = "test"))]
struct BackgroundRunnable {
    /// Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
}
99
/// Test executor that picks what to run next with a seeded random number
/// generator and records every scheduling event.
#[cfg(any(test, feature = "test"))]
pub struct Deterministic {
    /// State shared with timers and with the schedule callbacks of spawned tasks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the running thread while there is nothing to do; wakers and
    /// schedule callbacks unpark it.
    parker: parking_lot::Mutex<parking::Parker>,
}
105
106#[must_use]
107pub enum Timer {
108 Production(smol::Timer),
109 #[cfg(any(test, feature = "test"))]
110 Deterministic(DeterministicTimer),
111}
112
/// Timer registered with a `Deterministic` executor.
#[cfg(any(test, feature = "test"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Id used to remove the timer from `pending_timers` when it is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
119
#[cfg(any(test, feature = "test"))]
impl Deterministic {
    /// Creates an executor whose random number generator is seeded with `seed`.
    ///
    /// Parking is allowed initially and the tick budget used by
    /// `block_with_timeout` defaults to `0..=1000`.
    pub fn new(seed: u64) -> Arc<Self> {
        use rand::prelude::*;

        Arc::new(Self {
            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: std::time::Instant::now(),
                next_timer_id: Default::default(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
                next_runnable_id: 0,
                poll_history: Default::default(),
                previous_poll_history: Default::default(),
                enable_runnable_backtraces: false,
                runnable_backtraces: Default::default(),
            })),
            parker: Default::default(),
        })
    }

    /// Returns a copy of the events recorded so far.
    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
        self.state.lock().poll_history.clone()
    }

    /// Sets (or clears) the history of an earlier run. While a history is set,
    /// recording an event that differs from the earlier run panics.
    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
        self.state.lock().previous_poll_history = history;
    }

    /// Starts capturing a backtrace for every runnable that is created.
    pub fn enable_runnable_backtrace(&self) {
        self.state.lock().enable_runnable_backtraces = true;
    }

    /// Returns the resolved backtrace stored for `runnable_id`.
    ///
    /// # Panics
    ///
    /// Panics if no backtrace was recorded for that id.
    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
        backtrace.resolve();
        backtrace
    }

    /// Creates a background executor that schedules onto this executor.
    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
        Arc::new(BackgroundExecutor::Deterministic {
            executor: self.clone(),
        })
    }

    /// Creates a foreground executor whose runnables are queued under `id`.
    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
        Rc::new(ForegroundExecutor::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    /// Reserves the next runnable id and, when runnable backtraces are
    /// enabled, captures an unresolved backtrace for it.
    fn allocate_runnable_id(&self) -> usize {
        let mut state = self.state.lock();
        let id = util::post_inc(&mut state.next_runnable_id);
        if state.enable_runnable_backtraces {
            state
                .runnable_backtraces
                .insert(id, backtrace::Backtrace::new_unresolved());
        }
        id
    }

    /// Spawns a non-`Send` future whose runnable is queued under `cx_id`.
    ///
    /// `main` is stored with every queued runnable; `run_internal` polls the
    /// main task after running a runnable that carries the flag.
    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let id = self.allocate_runnable_id();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { id, runnable, main });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future whose runnable goes onto the background queue.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let id = self.allocate_runnable_id();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            // NOTE(review): unlike foreground enqueues, this event is appended
            // directly and therefore is not compared against the previous
            // execution history. Confirm this is intentional.
            state
                .poll_history
                .push(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_background
                .push(BackgroundRunnable { id, runnable });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Drives `main_future` to completion on the calling thread and returns
    /// its output, running other scheduled runnables along the way and parking
    /// whenever nothing is left to run.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        let woken = Arc::new(AtomicBool::new(false));

        // The id reserved here only keys the creation backtrace (when enabled);
        // the schedule callback below draws a fresh id each time the main
        // future is enqueued.
        self.allocate_runnable_id();

        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // SAFETY: NOTE(review): `spawn_unchecked` is needed because
        // `main_future` is neither `'static` nor `Send`. This presumably relies
        // on the loop below returning only after the main task has produced
        // its result, so the future does not outlive `'a`; confirm the
        // unwinding paths.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                // Reborrow through the guard so that two distinct fields can be
                // borrowed mutably in the same expression.
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Parking without a pending wake-up is reported (and panics when
            // parking is forbidden).
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs scheduled runnables until both queues are empty.
    pub fn run_until_parked(&self) {
        use std::sync::atomic::AtomicBool;
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    /// Runs scheduled runnables, picking the next one with the seeded rng,
    /// until both queues are empty.
    ///
    /// Returns `Some(output)` as soon as `main_task` is observed to be
    /// finished, and `None` once nothing is left to run. `woken` is set by the
    /// waker used to poll `main_task`.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                // Nothing left to run: give the main task one last poll.
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock first: running may schedule more runnables.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                // Within one context, runnables run in the order they were queued.
                let foreground_runnable = scheduled_from_cx.remove(0);
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

    /// Polls `future` on the calling thread for at most `max_ticks` ticks,
    /// interleaving it with background runnables.
    ///
    /// Returns `Some(output)` if the future completed within the budget and
    /// `None` otherwise.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: drawing `runnable_count` itself selects
            // the blocked future instead of a background runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }

    /// Registers a timer that fires once the simulated clock has advanced by
    /// `duration`.
    pub fn timer(&self, duration: Duration) -> Timer {
        let (tx, rx) = postage::barrier::channel();
        let id = {
            let mut state = self.state.lock();
            let wakeup_at = state.now + duration;
            let id = util::post_inc(&mut state.next_timer_id);
            // Keep `pending_timers` ordered by wake-up instant.
            let ix = match state
                .pending_timers
                .binary_search_by_key(&wakeup_at, |entry| entry.1)
            {
                Ok(ix) | Err(ix) => ix,
            };
            state.pending_timers.insert(ix, (id, wakeup_at, tx));
            id
        };
        Timer::Deterministic(DeterministicTimer {
            rx,
            id,
            state: self.state.clone(),
        })
    }

    /// Returns the current time of the simulated clock.
    pub fn now(&self) -> std::time::Instant {
        self.state.lock().now
    }

    /// Advances the simulated clock by `duration`.
    ///
    /// Timers that become due are released in wake-up order, and everything
    /// they unblock is run before the clock moves on to the next timer.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            let next_wakeup = state
                .pending_timers
                .first()
                .map(|(_, wakeup_time, _)| *wakeup_time);
            match next_wakeup {
                Some(wakeup_time) if wakeup_time <= new_now => {
                    // Release every timer that shares the earliest wake-up instant.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    drop(state);
                    // Dropping the barrier senders (with the lock released) is
                    // what lets the matching timers resolve.
                    drop(timers_to_wake);
                }
                _ => break,
            }
        }

        self.state.lock().now = new_now;
    }

    /// Captures the backtrace that is reported if the executor parks while
    /// parking is forbidden.
    pub fn start_waiting(&self) {
        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
    }

    /// Discards the backtrace captured by `start_waiting`.
    pub fn finish_waiting(&self) {
        self.state.lock().waiting_backtrace.take();
    }

    /// Makes the executor panic instead of parking, and reseeds the rng.
    pub fn forbid_parking(&self) {
        self.set_parking_forbidden(true);
    }

    /// Lets the executor park again, and reseeds the rng.
    pub fn allow_parking(&self) {
        self.set_parking_forbidden(false);
    }

    /// Updates the parking flag and resets the rng to its initial seed.
    fn set_parking_forbidden(&self, forbidden: bool) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = forbidden;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    /// With probability 0.2, yields to the executor between 1 and 10 times.
    pub async fn simulate_random_delay(&self) {
        use rand::prelude::*;
        use smol::future::yield_now;
        // Each lock guard is a temporary, so none is held across an await.
        if self.state.lock().rng.gen_bool(0.2) {
            let yields = self.state.lock().rng.gen_range(1..=10);
            for _ in 0..yields {
                yield_now().await;
            }
        }
    }

    /// Replaces the backtrace stored for the most recently polled runnable
    /// with one captured at the call site. Does nothing unless runnable
    /// backtraces are enabled.
    pub fn record_backtrace(&self) {
        let mut state = self.state.lock();
        if state.enable_runnable_backtraces {
            let current_id = state
                .poll_history
                .iter()
                .rev()
                .find_map(|event| match event {
                    ExecutorEvent::PollRunnable { id } => Some(*id),
                    _ => None,
                });
            if let Some(id) = current_id {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }
    }
}
517
518impl Drop for Timer {
519 fn drop(&mut self) {
520 #[cfg(any(test, feature = "test"))]
521 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
522 state
523 .lock()
524 .pending_timers
525 .retain(|(timer_id, _, _)| timer_id != id)
526 }
527 }
528}
529
530impl Future for Timer {
531 type Output = ();
532
533 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
534 match &mut *self {
535 #[cfg(any(test, feature = "test"))]
536 Self::Deterministic(DeterministicTimer { rx, .. }) => {
537 use postage::stream::{PollRecv, Stream as _};
538 smol::pin!(rx);
539 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
540 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
541 PollRecv::Pending => Poll::Pending,
542 }
543 }
544 Self::Production(timer) => {
545 smol::pin!(timer);
546 match timer.poll(cx) {
547 Poll::Ready(_) => Poll::Ready(()),
548 Poll::Pending => Poll::Pending,
549 }
550 }
551 }
552 }
553}
554
#[cfg(any(test, feature = "test"))]
impl DeterministicState {
    /// Records `event` and, when a previous execution history is set, checks
    /// that the same event occurred at the same position in that history.
    ///
    /// # Panics
    ///
    /// Panics with a non-determinism report if the previous history holds a
    /// different event at this position, or no event at all because the
    /// previous run recorded fewer events.
    fn push_to_history(&mut self, event: ExecutorEvent) {
        use std::fmt::Write as _;

        self.poll_history.push(event);
        let ix = self.poll_history.len() - 1;
        let prev_event = match &self.previous_poll_history {
            // A checked lookup: the current run may outgrow the previous one.
            Some(prev_history) => prev_history.get(ix).copied(),
            None => return,
        };
        if prev_event == Some(event) {
            return;
        }

        let mut message = String::new();
        writeln!(
            &mut message,
            "current runnable backtrace:\n{:?}",
            self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
                trace.resolve();
                util::CwdBacktrace(trace)
            })
        )
        .unwrap();
        match prev_event {
            Some(prev_event) => writeln!(
                &mut message,
                "previous runnable backtrace:\n{:?}",
                self.runnable_backtraces
                    .get_mut(&prev_event.id())
                    .map(|trace| {
                        trace.resolve();
                        util::CwdBacktrace(trace)
                    })
            )
            .unwrap(),
            None => writeln!(
                &mut message,
                "the previous execution recorded no event at index {ix}"
            )
            .unwrap(),
        }
        panic!("detected non-determinism after {ix}. {message}");
    }

    /// Called right before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics if parking is forbidden, appending the backtrace captured by
    /// `start_waiting` when there is one.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
610
#[cfg(any(test, feature = "test"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to.
    pub fn id(&self) -> usize {
        match *self {
            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => id,
        }
    }
}
620
621impl ForegroundExecutor {
622 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
623 if dispatcher.is_main_thread() {
624 Ok(Self::Platform {
625 dispatcher,
626 _not_send_or_sync: PhantomData,
627 })
628 } else {
629 Err(anyhow!("must be constructed on main thread"))
630 }
631 }
632
633 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
634 let future = any_local_future(future);
635 let any_task = match self {
636 #[cfg(any(test, feature = "test"))]
637 Self::Deterministic { cx_id, executor } => {
638 executor.spawn_from_foreground(*cx_id, future, false)
639 }
640 Self::Platform { dispatcher, .. } => {
641 fn spawn_inner(
642 future: AnyLocalFuture,
643 dispatcher: &Arc<dyn PlatformDispatcher>,
644 ) -> AnyLocalTask {
645 let dispatcher = dispatcher.clone();
646 let schedule =
647 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
648 let (runnable, task) = async_task::spawn_local(future, schedule);
649 runnable.schedule();
650 task
651 }
652 spawn_inner(future, dispatcher)
653 }
654 };
655 Task::local(any_task)
656 }
657
658 #[cfg(any(test, feature = "test"))]
659 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
660 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
661 let result = match self {
662 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
663 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
664 };
665 *result.downcast().unwrap()
666 }
667
668 #[cfg(any(test, feature = "test"))]
669 pub fn run_until_parked(&self) {
670 match self {
671 Self::Deterministic { executor, .. } => executor.run_until_parked(),
672 _ => panic!("this method can only be called on a deterministic executor"),
673 }
674 }
675
676 #[cfg(any(test, feature = "test"))]
677 pub fn parking_forbidden(&self) -> bool {
678 match self {
679 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
680 _ => panic!("this method can only be called on a deterministic executor"),
681 }
682 }
683
684 #[cfg(any(test, feature = "test"))]
685 pub fn start_waiting(&self) {
686 match self {
687 Self::Deterministic { executor, .. } => executor.start_waiting(),
688 _ => panic!("this method can only be called on a deterministic executor"),
689 }
690 }
691
692 #[cfg(any(test, feature = "test"))]
693 pub fn finish_waiting(&self) {
694 match self {
695 Self::Deterministic { executor, .. } => executor.finish_waiting(),
696 _ => panic!("this method can only be called on a deterministic executor"),
697 }
698 }
699
700 #[cfg(any(test, feature = "test"))]
701 pub fn forbid_parking(&self) {
702 match self {
703 Self::Deterministic { executor, .. } => executor.forbid_parking(),
704 _ => panic!("this method can only be called on a deterministic executor"),
705 }
706 }
707
708 #[cfg(any(test, feature = "test"))]
709 pub fn allow_parking(&self) {
710 match self {
711 Self::Deterministic { executor, .. } => executor.allow_parking(),
712 _ => panic!("this method can only be called on a deterministic executor"),
713 }
714 }
715
716 #[cfg(any(test, feature = "test"))]
717 pub fn advance_clock(&self, duration: Duration) {
718 match self {
719 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
720 _ => panic!("this method can only be called on a deterministic executor"),
721 }
722 }
723
724 #[cfg(any(test, feature = "test"))]
725 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
726 match self {
727 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
728 _ => panic!("this method can only be called on a deterministic executor"),
729 }
730 }
731}
732
733impl BackgroundExecutor {
734 pub fn new() -> Self {
735 let executor = Arc::new(Executor::new());
736 let stop = channel::unbounded::<()>();
737
738 for i in 0..2 * num_cpus::get() {
739 let executor = executor.clone();
740 let stop = stop.1.clone();
741 thread::Builder::new()
742 .name(format!("background-executor-{}", i))
743 .spawn(move || smol::block_on(executor.run(stop.recv())))
744 .unwrap();
745 }
746
747 Self::Production {
748 executor,
749 _stop: stop.0,
750 }
751 }
752
753 pub fn num_cpus(&self) -> usize {
754 num_cpus::get()
755 }
756
757 pub fn spawn<T, F>(&self, future: F) -> Task<T>
758 where
759 T: 'static + Send,
760 F: Send + Future<Output = T> + 'static,
761 {
762 let future = any_future(future);
763 let any_task = match self {
764 Self::Production { executor, .. } => executor.spawn(future),
765 #[cfg(any(test, feature = "test"))]
766 Self::Deterministic { executor } => executor.spawn(future),
767 };
768 Task::send(any_task)
769 }
770
771 pub fn block<F, T>(&self, future: F) -> T
772 where
773 F: Future<Output = T>,
774 {
775 smol::pin!(future);
776 match self {
777 Self::Production { .. } => smol::block_on(&mut future),
778 #[cfg(any(test, feature = "test"))]
779 Self::Deterministic { executor, .. } => {
780 executor.block(&mut future, usize::MAX).unwrap()
781 }
782 }
783 }
784
785 pub fn block_with_timeout<F, T>(
786 &self,
787 timeout: Duration,
788 future: F,
789 ) -> Result<T, impl Future<Output = T>>
790 where
791 T: 'static,
792 F: 'static + Unpin + Future<Output = T>,
793 {
794 let mut future = any_local_future(future);
795 if !timeout.is_zero() {
796 let output = match self {
797 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
798 #[cfg(any(test, feature = "test"))]
799 Self::Deterministic { executor, .. } => {
800 use rand::prelude::*;
801 let max_ticks = {
802 let mut state = executor.state.lock();
803 let range = state.block_on_ticks.clone();
804 state.rng.gen_range(range)
805 };
806 executor.block(&mut future, max_ticks)
807 }
808 };
809 if let Some(output) = output {
810 return Ok(*output.downcast().unwrap());
811 }
812 }
813 Err(async { *future.await.downcast().unwrap() })
814 }
815
816 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
817 where
818 F: FnOnce(&mut Scope<'scope>),
819 {
820 let mut scope = Scope::new(self.clone());
821 (scheduler)(&mut scope);
822 let spawned = mem::take(&mut scope.futures)
823 .into_iter()
824 .map(|f| self.spawn(f))
825 .collect::<Vec<_>>();
826 for task in spawned {
827 task.await;
828 }
829 }
830
831 pub fn timer(&self, duration: Duration) -> Timer {
832 match self {
833 BackgroundExecutor::Production { .. } => {
834 Timer::Production(smol::Timer::after(duration))
835 }
836 #[cfg(any(test, feature = "test"))]
837 BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
838 }
839 }
840
841 pub fn now(&self) -> std::time::Instant {
842 match self {
843 BackgroundExecutor::Production { .. } => std::time::Instant::now(),
844 #[cfg(any(test, feature = "test"))]
845 BackgroundExecutor::Deterministic { executor } => executor.now(),
846 }
847 }
848
849 #[cfg(any(test, feature = "test"))]
850 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
851 match self {
852 Self::Deterministic { executor, .. } => {
853 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
854 }
855 _ => panic!("this method can only be called on a deterministic executor"),
856 }
857 }
858
859 #[cfg(any(test, feature = "test"))]
860 pub async fn simulate_random_delay(&self) {
861 match self {
862 Self::Deterministic { executor, .. } => {
863 executor.simulate_random_delay().await;
864 }
865 _ => {
866 panic!("this method can only be called on a deterministic executor")
867 }
868 }
869 }
870
871 #[cfg(any(test, feature = "test"))]
872 pub fn record_backtrace(&self) {
873 match self {
874 Self::Deterministic { executor, .. } => executor.record_backtrace(),
875 _ => {
876 panic!("this method can only be called on a deterministic executor")
877 }
878 }
879 }
880
881 #[cfg(any(test, feature = "test"))]
882 pub fn start_waiting(&self) {
883 match self {
884 Self::Deterministic { executor, .. } => executor.start_waiting(),
885 _ => panic!("this method can only be called on a deterministic executor"),
886 }
887 }
888}
889
890impl Default for BackgroundExecutor {
891 fn default() -> Self {
892 Self::new()
893 }
894}
895
896pub struct Scope<'a> {
897 executor: Arc<BackgroundExecutor>,
898 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
899 tx: Option<mpsc::Sender<()>>,
900 rx: mpsc::Receiver<()>,
901 _phantom: PhantomData<&'a ()>,
902}
903
904impl<'a> Scope<'a> {
905 fn new(executor: Arc<BackgroundExecutor>) -> Self {
906 let (tx, rx) = mpsc::channel(1);
907 Self {
908 executor,
909 tx: Some(tx),
910 rx,
911 futures: Default::default(),
912 _phantom: PhantomData,
913 }
914 }
915
916 pub fn spawn<F>(&mut self, f: F)
917 where
918 F: Future<Output = ()> + Send + 'a,
919 {
920 let tx = self.tx.clone().unwrap();
921
922 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
923 // dropping this `Scope` blocks until all of the futures have resolved.
924 let f = unsafe {
925 mem::transmute::<
926 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
927 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
928 >(Box::pin(async move {
929 f.await;
930 drop(tx);
931 }))
932 };
933 self.futures.push(f);
934 }
935}
936
937impl<'a> Drop for Scope<'a> {
938 fn drop(&mut self) {
939 self.tx.take().unwrap();
940
941 // Wait until the channel is closed, which means that all of the spawned
942 // futures have resolved.
943 self.executor.block(self.rx.next());
944 }
945}
946
947impl<T> Task<T> {
948 pub fn ready(value: T) -> Self {
949 Self::Ready(Some(value))
950 }
951
952 fn local(any_task: AnyLocalTask) -> Self {
953 Self::Local {
954 any_task,
955 result_type: PhantomData,
956 }
957 }
958
959 pub fn detach(self) {
960 match self {
961 Task::Ready(_) => {}
962 Task::Local { any_task, .. } => any_task.detach(),
963 Task::Send { any_task, .. } => any_task.detach(),
964 }
965 }
966}
967
968// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
969// #[track_caller]
970// pub fn detach_and_log_err(self, cx: &mut AppContext) {
971// let caller = Location::caller();
972// cx.spawn(|_| async move {
973// if let Err(err) = self.await {
974// log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
975// }
976// })
977// .detach();
978// }
979// }
980
981impl<T: Send> Task<T> {
982 fn send(any_task: AnyTask) -> Self {
983 Self::Send {
984 any_task,
985 result_type: PhantomData,
986 }
987 }
988}
989
990impl<T: fmt::Debug> fmt::Debug for Task<T> {
991 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
992 match self {
993 Task::Ready(value) => value.fmt(f),
994 Task::Local { any_task, .. } => any_task.fmt(f),
995 Task::Send { any_task, .. } => any_task.fmt(f),
996 }
997 }
998}
999
1000impl<T: 'static> Future for Task<T> {
1001 type Output = T;
1002
1003 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1004 match unsafe { self.get_unchecked_mut() } {
1005 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1006 Task::Local { any_task, .. } => {
1007 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1008 }
1009 Task::Send { any_task, .. } => {
1010 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1011 }
1012 }
1013 }
1014}
1015
1016fn any_future<T, F>(future: F) -> AnyFuture
1017where
1018 T: 'static + Send,
1019 F: Future<Output = T> + Send + 'static,
1020{
1021 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1022}
1023
1024fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1025where
1026 T: 'static,
1027 F: Future<Output = T> + 'static,
1028{
1029 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1030}