1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display},
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 task::{Context, Poll},
14 thread,
15 time::Duration,
16};
17
18use crate::{
19 platform::{self, Dispatcher},
20 util, AppContext,
21};
22
23pub enum Foreground {
24 Platform {
25 dispatcher: Arc<dyn platform::Dispatcher>,
26 _not_send_or_sync: PhantomData<Rc<()>>,
27 },
28 #[cfg(any(test, feature = "test-support"))]
29 Deterministic {
30 cx_id: usize,
31 executor: Arc<Deterministic>,
32 },
33}
34
35pub enum Background {
36 #[cfg(any(test, feature = "test-support"))]
37 Deterministic { executor: Arc<Deterministic> },
38 Production {
39 executor: Arc<smol::Executor<'static>>,
40 _stop: channel::Sender<()>,
41 },
42}
43
44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
48
49#[must_use]
50pub enum Task<T> {
51 Ready(Option<T>),
52 Local {
53 any_task: AnyLocalTask,
54 result_type: PhantomData<T>,
55 },
56 Send {
57 any_task: AnyTask,
58 result_type: PhantomData<T>,
59 },
60}
61
62unsafe impl<T: Send> Send for Task<T> {}
63
64#[cfg(any(test, feature = "test-support"))]
65struct DeterministicState {
66 rng: rand::prelude::StdRng,
67 seed: u64,
68 scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
69 scheduled_from_background: Vec<BackgroundRunnable>,
70 forbid_parking: bool,
71 block_on_ticks: std::ops::RangeInclusive<usize>,
72 now: std::time::Instant,
73 next_timer_id: usize,
74 pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
75 waiting_backtrace: Option<backtrace::Backtrace>,
76 next_runnable_id: usize,
77 poll_history: Vec<ExecutorEvent>,
78 previous_poll_history: Option<Vec<ExecutorEvent>>,
79 enable_runnable_backtraces: bool,
80 runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
81}
82
83#[derive(Copy, Clone, Debug, PartialEq, Eq)]
84pub enum ExecutorEvent {
85 PollRunnable { id: usize },
86 EnqueuRunnable { id: usize },
87}
88
89#[cfg(any(test, feature = "test-support"))]
90struct ForegroundRunnable {
91 id: usize,
92 runnable: Runnable,
93 main: bool,
94}
95
96#[cfg(any(test, feature = "test-support"))]
97struct BackgroundRunnable {
98 id: usize,
99 runnable: Runnable,
100}
101
102#[cfg(any(test, feature = "test-support"))]
103pub struct Deterministic {
104 state: Arc<parking_lot::Mutex<DeterministicState>>,
105 parker: parking_lot::Mutex<parking::Parker>,
106}
107
108pub enum Timer {
109 Production(smol::Timer),
110 #[cfg(any(test, feature = "test-support"))]
111 Deterministic(DeterministicTimer),
112}
113
114#[cfg(any(test, feature = "test-support"))]
115pub struct DeterministicTimer {
116 rx: postage::barrier::Receiver,
117 id: usize,
118 state: Arc<parking_lot::Mutex<DeterministicState>>,
119}
120
121#[cfg(any(test, feature = "test-support"))]
122impl Deterministic {
123 pub fn new(seed: u64) -> Arc<Self> {
124 use rand::prelude::*;
125
126 Arc::new(Self {
127 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
128 rng: StdRng::seed_from_u64(seed),
129 seed,
130 scheduled_from_foreground: Default::default(),
131 scheduled_from_background: Default::default(),
132 forbid_parking: false,
133 block_on_ticks: 0..=1000,
134 now: std::time::Instant::now(),
135 next_timer_id: Default::default(),
136 pending_timers: Default::default(),
137 waiting_backtrace: None,
138 next_runnable_id: 0,
139 poll_history: Default::default(),
140 previous_poll_history: Default::default(),
141 enable_runnable_backtraces: false,
142 runnable_backtraces: Default::default(),
143 })),
144 parker: Default::default(),
145 })
146 }
147
148 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
149 self.state.lock().poll_history.clone()
150 }
151
152 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
153 self.state.lock().previous_poll_history = history;
154 }
155
156 pub fn enable_runnable_backtrace(&self) {
157 self.state.lock().enable_runnable_backtraces = true;
158 }
159
160 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
161 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
162 backtrace.resolve();
163 backtrace
164 }
165
166 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
167 Arc::new(Background::Deterministic {
168 executor: self.clone(),
169 })
170 }
171
172 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
173 Rc::new(Foreground::Deterministic {
174 cx_id: id,
175 executor: self.clone(),
176 })
177 }
178
179 fn spawn_from_foreground(
180 &self,
181 cx_id: usize,
182 future: AnyLocalFuture,
183 main: bool,
184 ) -> AnyLocalTask {
185 let state = self.state.clone();
186 let id;
187 {
188 let mut state = state.lock();
189 id = util::post_inc(&mut state.next_runnable_id);
190 if state.enable_runnable_backtraces {
191 state
192 .runnable_backtraces
193 .insert(id, backtrace::Backtrace::new_unresolved());
194 }
195 }
196
197 let unparker = self.parker.lock().unparker();
198 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
199 let mut state = state.lock();
200 state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
201 state
202 .scheduled_from_foreground
203 .entry(cx_id)
204 .or_default()
205 .push(ForegroundRunnable { id, runnable, main });
206 unparker.unpark();
207 });
208 runnable.schedule();
209 task
210 }
211
212 fn spawn(&self, future: AnyFuture) -> AnyTask {
213 let state = self.state.clone();
214 let id;
215 {
216 let mut state = state.lock();
217 id = util::post_inc(&mut state.next_runnable_id);
218 if state.enable_runnable_backtraces {
219 state
220 .runnable_backtraces
221 .insert(id, backtrace::Backtrace::new_unresolved());
222 }
223 }
224
225 let unparker = self.parker.lock().unparker();
226 let (runnable, task) = async_task::spawn(future, move |runnable| {
227 let mut state = state.lock();
228 state
229 .poll_history
230 .push(ExecutorEvent::EnqueuRunnable { id });
231 state
232 .scheduled_from_background
233 .push(BackgroundRunnable { id, runnable });
234 unparker.unpark();
235 });
236 runnable.schedule();
237 task
238 }
239
240 fn run<'a>(
241 &self,
242 cx_id: usize,
243 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
244 ) -> Box<dyn Any> {
245 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
246
247 let woken = Arc::new(AtomicBool::new(false));
248
249 let state = self.state.clone();
250 let id;
251 {
252 let mut state = state.lock();
253 id = util::post_inc(&mut state.next_runnable_id);
254 if state.enable_runnable_backtraces {
255 state
256 .runnable_backtraces
257 .insert(id, backtrace::Backtrace::new_unresolved());
258 }
259 }
260
261 let unparker = self.parker.lock().unparker();
262 let (runnable, mut main_task) = unsafe {
263 async_task::spawn_unchecked(main_future, move |runnable| {
264 let state = &mut *state.lock();
265 state
266 .scheduled_from_foreground
267 .entry(cx_id)
268 .or_default()
269 .push(ForegroundRunnable {
270 id: util::post_inc(&mut state.next_runnable_id),
271 runnable,
272 main: true,
273 });
274 unparker.unpark();
275 })
276 };
277 runnable.schedule();
278
279 loop {
280 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
281 return result;
282 }
283
284 if !woken.load(SeqCst) {
285 self.state.lock().will_park();
286 }
287
288 woken.store(false, SeqCst);
289 self.parker.lock().park();
290 }
291 }
292
293 pub fn run_until_parked(&self) {
294 use std::sync::atomic::AtomicBool;
295 let woken = Arc::new(AtomicBool::new(false));
296 self.run_internal(woken, None);
297 }
298
299 fn run_internal(
300 &self,
301 woken: Arc<std::sync::atomic::AtomicBool>,
302 mut main_task: Option<&mut AnyLocalTask>,
303 ) -> Option<Box<dyn Any>> {
304 use rand::prelude::*;
305 use std::sync::atomic::Ordering::SeqCst;
306
307 let unparker = self.parker.lock().unparker();
308 let waker = waker_fn::waker_fn(move || {
309 woken.store(true, SeqCst);
310 unparker.unpark();
311 });
312
313 let mut cx = Context::from_waker(&waker);
314 loop {
315 let mut state = self.state.lock();
316
317 if state.scheduled_from_foreground.is_empty()
318 && state.scheduled_from_background.is_empty()
319 {
320 if let Some(main_task) = main_task {
321 if let Poll::Ready(result) = main_task.poll(&mut cx) {
322 return Some(result);
323 }
324 }
325
326 return None;
327 }
328
329 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
330 let background_len = state.scheduled_from_background.len();
331 let ix = state.rng.gen_range(0..background_len);
332 let background_runnable = state.scheduled_from_background.remove(ix);
333 state.push_to_history(ExecutorEvent::PollRunnable {
334 id: background_runnable.id,
335 });
336 drop(state);
337 background_runnable.runnable.run();
338 } else if !state.scheduled_from_foreground.is_empty() {
339 let available_cx_ids = state
340 .scheduled_from_foreground
341 .keys()
342 .copied()
343 .collect::<Vec<_>>();
344 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
345 let scheduled_from_cx = state
346 .scheduled_from_foreground
347 .get_mut(&cx_id_to_run)
348 .unwrap();
349 let foreground_runnable = scheduled_from_cx.remove(0);
350 if scheduled_from_cx.is_empty() {
351 state.scheduled_from_foreground.remove(&cx_id_to_run);
352 }
353 state.push_to_history(ExecutorEvent::PollRunnable {
354 id: foreground_runnable.id,
355 });
356
357 drop(state);
358
359 foreground_runnable.runnable.run();
360 if let Some(main_task) = main_task.as_mut() {
361 if foreground_runnable.main {
362 if let Poll::Ready(result) = main_task.poll(&mut cx) {
363 return Some(result);
364 }
365 }
366 }
367 }
368 }
369 }
370
371 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
372 where
373 F: Unpin + Future<Output = T>,
374 {
375 use rand::prelude::*;
376
377 let unparker = self.parker.lock().unparker();
378 let waker = waker_fn::waker_fn(move || {
379 unparker.unpark();
380 });
381
382 let mut cx = Context::from_waker(&waker);
383 for _ in 0..max_ticks {
384 let mut state = self.state.lock();
385 let runnable_count = state.scheduled_from_background.len();
386 let ix = state.rng.gen_range(0..=runnable_count);
387 if ix < state.scheduled_from_background.len() {
388 let background_runnable = state.scheduled_from_background.remove(ix);
389 state.push_to_history(ExecutorEvent::PollRunnable {
390 id: background_runnable.id,
391 });
392 drop(state);
393 background_runnable.runnable.run();
394 } else {
395 drop(state);
396 if let Poll::Ready(result) = future.poll(&mut cx) {
397 return Some(result);
398 }
399 let mut state = self.state.lock();
400 if state.scheduled_from_background.is_empty() {
401 state.will_park();
402 drop(state);
403 self.parker.lock().park();
404 }
405
406 continue;
407 }
408 }
409
410 None
411 }
412
413 pub fn timer(&self, duration: Duration) -> Timer {
414 let (tx, rx) = postage::barrier::channel();
415 let mut state = self.state.lock();
416 let wakeup_at = state.now + duration;
417 let id = util::post_inc(&mut state.next_timer_id);
418 match state
419 .pending_timers
420 .binary_search_by_key(&wakeup_at, |e| e.1)
421 {
422 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
423 }
424 let state = self.state.clone();
425 Timer::Deterministic(DeterministicTimer { rx, id, state })
426 }
427
428 pub fn now(&self) -> std::time::Instant {
429 let state = self.state.lock();
430 state.now
431 }
432
433 pub fn advance_clock(&self, duration: Duration) {
434 let new_now = self.state.lock().now + duration;
435 loop {
436 self.run_until_parked();
437 let mut state = self.state.lock();
438
439 if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
440 let wakeup_time = *wakeup_time;
441 if wakeup_time <= new_now {
442 let timer_count = state
443 .pending_timers
444 .iter()
445 .take_while(|(_, t, _)| *t == wakeup_time)
446 .count();
447 state.now = wakeup_time;
448 let timers_to_wake = state
449 .pending_timers
450 .drain(0..timer_count)
451 .collect::<Vec<_>>();
452 drop(state);
453 drop(timers_to_wake);
454 continue;
455 }
456 }
457
458 break;
459 }
460
461 self.state.lock().now = new_now;
462 }
463
464 pub fn start_waiting(&self) {
465 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
466 }
467
468 pub fn finish_waiting(&self) {
469 self.state.lock().waiting_backtrace.take();
470 }
471
472 pub fn forbid_parking(&self) {
473 use rand::prelude::*;
474
475 let mut state = self.state.lock();
476 state.forbid_parking = true;
477 state.rng = StdRng::seed_from_u64(state.seed);
478 }
479
480 pub fn allow_parking(&self) {
481 use rand::prelude::*;
482
483 let mut state = self.state.lock();
484 state.forbid_parking = false;
485 state.rng = StdRng::seed_from_u64(state.seed);
486 }
487
488 pub async fn simulate_random_delay(&self) {
489 use rand::prelude::*;
490 use smol::future::yield_now;
491 if self.state.lock().rng.gen_bool(0.2) {
492 let yields = self.state.lock().rng.gen_range(1..=10);
493 for _ in 0..yields {
494 yield_now().await;
495 }
496 }
497 }
498
499 pub fn record_backtrace(&self) {
500 let mut state = self.state.lock();
501 if state.enable_runnable_backtraces {
502 let current_id = state
503 .poll_history
504 .iter()
505 .rev()
506 .find_map(|event| match event {
507 ExecutorEvent::PollRunnable { id } => Some(*id),
508 _ => None,
509 });
510 if let Some(id) = current_id {
511 state
512 .runnable_backtraces
513 .insert(id, backtrace::Backtrace::new_unresolved());
514 }
515 }
516 }
517}
518
519impl Drop for Timer {
520 fn drop(&mut self) {
521 #[cfg(any(test, feature = "test-support"))]
522 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
523 state
524 .lock()
525 .pending_timers
526 .retain(|(timer_id, _, _)| timer_id != id)
527 }
528 }
529}
530
531impl Future for Timer {
532 type Output = ();
533
534 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
535 match &mut *self {
536 #[cfg(any(test, feature = "test-support"))]
537 Self::Deterministic(DeterministicTimer { rx, .. }) => {
538 use postage::stream::{PollRecv, Stream as _};
539 smol::pin!(rx);
540 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
541 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
542 PollRecv::Pending => Poll::Pending,
543 }
544 }
545 Self::Production(timer) => {
546 smol::pin!(timer);
547 match timer.poll(cx) {
548 Poll::Ready(_) => Poll::Ready(()),
549 Poll::Pending => Poll::Pending,
550 }
551 }
552 }
553 }
554}
555
556#[cfg(any(test, feature = "test-support"))]
557impl DeterministicState {
558 fn push_to_history(&mut self, event: ExecutorEvent) {
559 use std::fmt::Write as _;
560
561 self.poll_history.push(event);
562 if let Some(prev_history) = &self.previous_poll_history {
563 let ix = self.poll_history.len() - 1;
564 let prev_event = prev_history[ix];
565 if event != prev_event {
566 let mut message = String::new();
567 writeln!(
568 &mut message,
569 "current runnable backtrace:\n{:?}",
570 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
571 trace.resolve();
572 util::CwdBacktrace(trace)
573 })
574 )
575 .unwrap();
576 writeln!(
577 &mut message,
578 "previous runnable backtrace:\n{:?}",
579 self.runnable_backtraces
580 .get_mut(&prev_event.id())
581 .map(|trace| {
582 trace.resolve();
583 util::CwdBacktrace(trace)
584 })
585 )
586 .unwrap();
587 panic!("detected non-determinism after {ix}. {message}");
588 }
589 }
590 }
591
592 fn will_park(&mut self) {
593 if self.forbid_parking {
594 let mut backtrace_message = String::new();
595 #[cfg(any(test, feature = "test-support"))]
596 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
597 backtrace.resolve();
598 backtrace_message = format!(
599 "\nbacktrace of waiting future:\n{:?}",
600 util::CwdBacktrace(backtrace)
601 );
602 }
603
604 panic!(
605 "deterministic executor parked after a call to forbid_parking{}",
606 backtrace_message
607 );
608 }
609 }
610}
611
612#[cfg(any(test, feature = "test-support"))]
613impl ExecutorEvent {
614 pub fn id(&self) -> usize {
615 match self {
616 ExecutorEvent::PollRunnable { id } => *id,
617 ExecutorEvent::EnqueuRunnable { id } => *id,
618 }
619 }
620}
621
622impl Foreground {
623 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
624 if dispatcher.is_main_thread() {
625 Ok(Self::Platform {
626 dispatcher,
627 _not_send_or_sync: PhantomData,
628 })
629 } else {
630 Err(anyhow!("must be constructed on main thread"))
631 }
632 }
633
634 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
635 let future = any_local_future(future);
636 let any_task = match self {
637 #[cfg(any(test, feature = "test-support"))]
638 Self::Deterministic { cx_id, executor } => {
639 executor.spawn_from_foreground(*cx_id, future, false)
640 }
641 Self::Platform { dispatcher, .. } => {
642 fn spawn_inner(
643 future: AnyLocalFuture,
644 dispatcher: &Arc<dyn Dispatcher>,
645 ) -> AnyLocalTask {
646 let dispatcher = dispatcher.clone();
647 let schedule =
648 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
649 let (runnable, task) = async_task::spawn_local(future, schedule);
650 runnable.schedule();
651 task
652 }
653 spawn_inner(future, dispatcher)
654 }
655 };
656 Task::local(any_task)
657 }
658
659 #[cfg(any(test, feature = "test-support"))]
660 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
661 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
662 let result = match self {
663 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
664 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
665 };
666 *result.downcast().unwrap()
667 }
668
669 #[cfg(any(test, feature = "test-support"))]
670 pub fn run_until_parked(&self) {
671 match self {
672 Self::Deterministic { executor, .. } => executor.run_until_parked(),
673 _ => panic!("this method can only be called on a deterministic executor"),
674 }
675 }
676
677 #[cfg(any(test, feature = "test-support"))]
678 pub fn parking_forbidden(&self) -> bool {
679 match self {
680 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
681 _ => panic!("this method can only be called on a deterministic executor"),
682 }
683 }
684
685 #[cfg(any(test, feature = "test-support"))]
686 pub fn start_waiting(&self) {
687 match self {
688 Self::Deterministic { executor, .. } => executor.start_waiting(),
689 _ => panic!("this method can only be called on a deterministic executor"),
690 }
691 }
692
693 #[cfg(any(test, feature = "test-support"))]
694 pub fn finish_waiting(&self) {
695 match self {
696 Self::Deterministic { executor, .. } => executor.finish_waiting(),
697 _ => panic!("this method can only be called on a deterministic executor"),
698 }
699 }
700
701 #[cfg(any(test, feature = "test-support"))]
702 pub fn forbid_parking(&self) {
703 match self {
704 Self::Deterministic { executor, .. } => executor.forbid_parking(),
705 _ => panic!("this method can only be called on a deterministic executor"),
706 }
707 }
708
709 #[cfg(any(test, feature = "test-support"))]
710 pub fn allow_parking(&self) {
711 match self {
712 Self::Deterministic { executor, .. } => executor.allow_parking(),
713 _ => panic!("this method can only be called on a deterministic executor"),
714 }
715 }
716
717 #[cfg(any(test, feature = "test-support"))]
718 pub fn advance_clock(&self, duration: Duration) {
719 match self {
720 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
721 _ => panic!("this method can only be called on a deterministic executor"),
722 }
723 }
724
725 #[cfg(any(test, feature = "test-support"))]
726 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
727 match self {
728 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
729 _ => panic!("this method can only be called on a deterministic executor"),
730 }
731 }
732}
733
734impl Background {
735 pub fn new() -> Self {
736 let executor = Arc::new(Executor::new());
737 let stop = channel::unbounded::<()>();
738
739 for i in 0..2 * num_cpus::get() {
740 let executor = executor.clone();
741 let stop = stop.1.clone();
742 thread::Builder::new()
743 .name(format!("background-executor-{}", i))
744 .spawn(move || smol::block_on(executor.run(stop.recv())))
745 .unwrap();
746 }
747
748 Self::Production {
749 executor,
750 _stop: stop.0,
751 }
752 }
753
754 pub fn num_cpus(&self) -> usize {
755 num_cpus::get()
756 }
757
758 pub fn spawn<T, F>(&self, future: F) -> Task<T>
759 where
760 T: 'static + Send,
761 F: Send + Future<Output = T> + 'static,
762 {
763 let future = any_future(future);
764 let any_task = match self {
765 Self::Production { executor, .. } => executor.spawn(future),
766 #[cfg(any(test, feature = "test-support"))]
767 Self::Deterministic { executor } => executor.spawn(future),
768 };
769 Task::send(any_task)
770 }
771
772 pub fn block<F, T>(&self, future: F) -> T
773 where
774 F: Future<Output = T>,
775 {
776 smol::pin!(future);
777 match self {
778 Self::Production { .. } => smol::block_on(&mut future),
779 #[cfg(any(test, feature = "test-support"))]
780 Self::Deterministic { executor, .. } => {
781 executor.block(&mut future, usize::MAX).unwrap()
782 }
783 }
784 }
785
786 pub fn block_with_timeout<F, T>(
787 &self,
788 timeout: Duration,
789 future: F,
790 ) -> Result<T, impl Future<Output = T>>
791 where
792 T: 'static,
793 F: 'static + Unpin + Future<Output = T>,
794 {
795 let mut future = any_local_future(future);
796 if !timeout.is_zero() {
797 let output = match self {
798 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
799 #[cfg(any(test, feature = "test-support"))]
800 Self::Deterministic { executor, .. } => {
801 use rand::prelude::*;
802 let max_ticks = {
803 let mut state = executor.state.lock();
804 let range = state.block_on_ticks.clone();
805 state.rng.gen_range(range)
806 };
807 executor.block(&mut future, max_ticks)
808 }
809 };
810 if let Some(output) = output {
811 return Ok(*output.downcast().unwrap());
812 }
813 }
814 Err(async { *future.await.downcast().unwrap() })
815 }
816
817 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
818 where
819 F: FnOnce(&mut Scope<'scope>),
820 {
821 let mut scope = Scope::new(self.clone());
822 (scheduler)(&mut scope);
823 let spawned = mem::take(&mut scope.futures)
824 .into_iter()
825 .map(|f| self.spawn(f))
826 .collect::<Vec<_>>();
827 for task in spawned {
828 task.await;
829 }
830 }
831
832 pub fn timer(&self, duration: Duration) -> Timer {
833 match self {
834 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
835 #[cfg(any(test, feature = "test-support"))]
836 Background::Deterministic { executor } => executor.timer(duration),
837 }
838 }
839
840 pub fn now(&self) -> std::time::Instant {
841 match self {
842 Background::Production { .. } => std::time::Instant::now(),
843 #[cfg(any(test, feature = "test-support"))]
844 Background::Deterministic { executor } => executor.now(),
845 }
846 }
847
848 #[cfg(any(test, feature = "test-support"))]
849 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
850 match self {
851 Self::Deterministic { executor, .. } => {
852 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
853 }
854 _ => panic!("this method can only be called on a deterministic executor"),
855 }
856 }
857
858 #[cfg(any(test, feature = "test-support"))]
859 pub async fn simulate_random_delay(&self) {
860 match self {
861 Self::Deterministic { executor, .. } => {
862 executor.simulate_random_delay().await;
863 }
864 _ => {
865 panic!("this method can only be called on a deterministic executor")
866 }
867 }
868 }
869
870 #[cfg(any(test, feature = "test-support"))]
871 pub fn record_backtrace(&self) {
872 match self {
873 Self::Deterministic { executor, .. } => executor.record_backtrace(),
874 _ => {
875 panic!("this method can only be called on a deterministic executor")
876 }
877 }
878 }
879}
880
881impl Default for Background {
882 fn default() -> Self {
883 Self::new()
884 }
885}
886
887pub struct Scope<'a> {
888 executor: Arc<Background>,
889 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
890 tx: Option<mpsc::Sender<()>>,
891 rx: mpsc::Receiver<()>,
892 _phantom: PhantomData<&'a ()>,
893}
894
895impl<'a> Scope<'a> {
896 fn new(executor: Arc<Background>) -> Self {
897 let (tx, rx) = mpsc::channel(1);
898 Self {
899 executor,
900 tx: Some(tx),
901 rx,
902 futures: Default::default(),
903 _phantom: PhantomData,
904 }
905 }
906
907 pub fn spawn<F>(&mut self, f: F)
908 where
909 F: Future<Output = ()> + Send + 'a,
910 {
911 let tx = self.tx.clone().unwrap();
912
913 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
914 // dropping this `Scope` blocks until all of the futures have resolved.
915 let f = unsafe {
916 mem::transmute::<
917 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
918 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
919 >(Box::pin(async move {
920 f.await;
921 drop(tx);
922 }))
923 };
924 self.futures.push(f);
925 }
926}
927
928impl<'a> Drop for Scope<'a> {
929 fn drop(&mut self) {
930 self.tx.take().unwrap();
931
932 // Wait until the channel is closed, which means that all of the spawned
933 // futures have resolved.
934 self.executor.block(self.rx.next());
935 }
936}
937
938impl<T> Task<T> {
939 pub fn ready(value: T) -> Self {
940 Self::Ready(Some(value))
941 }
942
943 fn local(any_task: AnyLocalTask) -> Self {
944 Self::Local {
945 any_task,
946 result_type: PhantomData,
947 }
948 }
949
950 pub fn detach(self) {
951 match self {
952 Task::Ready(_) => {}
953 Task::Local { any_task, .. } => any_task.detach(),
954 Task::Send { any_task, .. } => any_task.detach(),
955 }
956 }
957}
958
959impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
960 pub fn detach_and_log_err(self, cx: &mut AppContext) {
961 cx.spawn(|_| async move {
962 if let Err(err) = self.await {
963 log::error!("{:#}", err);
964 }
965 })
966 .detach();
967 }
968}
969
970impl<T: Send> Task<T> {
971 fn send(any_task: AnyTask) -> Self {
972 Self::Send {
973 any_task,
974 result_type: PhantomData,
975 }
976 }
977}
978
979impl<T: fmt::Debug> fmt::Debug for Task<T> {
980 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
981 match self {
982 Task::Ready(value) => value.fmt(f),
983 Task::Local { any_task, .. } => any_task.fmt(f),
984 Task::Send { any_task, .. } => any_task.fmt(f),
985 }
986 }
987}
988
989impl<T: 'static> Future for Task<T> {
990 type Output = T;
991
992 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
993 match unsafe { self.get_unchecked_mut() } {
994 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
995 Task::Local { any_task, .. } => {
996 any_task.poll(cx).map(|value| *value.downcast().unwrap())
997 }
998 Task::Send { any_task, .. } => {
999 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1000 }
1001 }
1002 }
1003}
1004
1005fn any_future<T, F>(future: F) -> AnyFuture
1006where
1007 T: 'static + Send,
1008 F: Future<Output = T> + Send + 'static,
1009{
1010 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1011}
1012
1013fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1014where
1015 T: 'static,
1016 F: Future<Output = T> + 'static,
1017{
1018 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1019}