1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display, Write as _},
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 task::{Context, Poll},
14 thread,
15 time::Duration,
16};
17
18use crate::{
19 platform::{self, Dispatcher},
20 util::{self, CwdBacktrace},
21 MutableAppContext,
22};
23
/// Executor handle for futures that do not need to be `Send`.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`; this marker passes that on to the variant.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant backed by a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this foreground's runnables are queued in the executor.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
35
/// Executor handle for `Send` futures.
pub enum Background {
    /// Test-only variant backed by a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Runs futures on a `smol` executor driven by worker threads (see `Background::new`).
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Worker threads run until a `recv` on the paired receiver resolves.
        // NOTE(review): presumably dropping this sender is what lets them exit — confirm.
        _stop: channel::Sender<()>,
    },
}
44
// Type-erased futures and tasks: outputs are boxed as `dyn Any` so that the executors can
// handle futures with differing result types; `Task<T>` downcasts the output back to `T`.

/// A boxed, non-`Send` future with a type-erased output.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// A boxed, `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle for an [`AnyFuture`].
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle for an [`AnyLocalFuture`].
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
49
/// A handle to a spawned computation that resolves to a `T` when awaited.
///
/// Marked `#[must_use]`: a task should be awaited or explicitly detached via `detach`.
#[must_use]
pub enum Task<T> {
    /// An already-computed value; the `Option` is emptied when the task is polled.
    Ready(Option<T>),
    /// A task spawned from a non-`Send` future.
    Local {
        any_task: AnyLocalTask,
        // Remembers the concrete output type so the erased result can be downcast.
        result_type: PhantomData<T>,
    },
    /// A task spawned from a `Send` future.
    Send {
        any_task: AnyTask,
        // Remembers the concrete output type so the erased result can be downcast.
        result_type: PhantomData<T>,
    },
}
62
// NOTE(review): `Task::Local` wraps a task whose future is not required to be `Send`, yet this
// impl only demands `T: Send`. Presumably sound because only the handle (and eventually the
// `T` output) crosses threads, never the future itself — confirm against `async_task`.
unsafe impl<T: Send> Send for Task<T> {}
64
/// Mutable state of the [`Deterministic`] executor, shared behind a mutex.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Source of every random scheduling decision.
    rng: rand::prelude::StdRng,
    /// Seed the executor was created with; `forbid_parking` re-seeds `rng` from it.
    seed: u64,
    /// Foreground runnables waiting to be polled, keyed by foreground context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Background runnables waiting to be polled.
    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When set, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    /// Range from which `block_with_timeout` draws its maximum tick count.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated current time; only moved forward by `advance_clock`.
    now: std::time::Instant,
    /// Id handed to the next timer.
    next_timer_id: usize,
    /// Pending timers as `(id, wake-up instant, barrier sender)`, inserted in wake-up order.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `start_waiting`, reported if parking is forbidden.
    waiting_backtrace: Option<backtrace::Backtrace>,
    /// Id handed to the next runnable.
    next_runnable_id: usize,
    /// Enqueue/poll events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    /// History of an earlier run that polls are compared against, if provided.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    /// Whether backtraces are captured for runnables.
    enable_runnable_backtraces: bool,
    /// Captured backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
83
/// An event recorded in the deterministic executor's history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue to be run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    EnqueuRunnable { id: usize },
}
89
/// A queued foreground runnable together with its bookkeeping id.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    id: usize,
    runnable: Runnable,
    // True for runnables scheduled by `Deterministic::run`'s main future; after running
    // such a runnable the scheduler polls the main task for completion.
    main: bool,
}
96
/// A queued background runnable together with its bookkeeping id.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    id: usize,
    runnable: Runnable,
}
102
/// Test-only executor that makes its scheduling choices with a seeded random number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Shared scheduler state; also cloned into schedule callbacks and timers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parker used to block the running thread until a runnable or waker unparks it.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
/// A future that resolves to `()` once its timer fires.
pub enum Timer {
    /// Backed by a real `smol::Timer`.
    Production(smol::Timer),
    /// Backed by the deterministic executor's simulated clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
114
/// Timer registered with a [`Deterministic`] executor.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Identifies this timer's entry in `pending_timers` (used for removal on drop).
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
121
122#[cfg(any(test, feature = "test-support"))]
123impl Deterministic {
124 pub fn new(seed: u64) -> Arc<Self> {
125 use rand::prelude::*;
126
127 Arc::new(Self {
128 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
129 rng: StdRng::seed_from_u64(seed),
130 seed,
131 scheduled_from_foreground: Default::default(),
132 scheduled_from_background: Default::default(),
133 forbid_parking: false,
134 block_on_ticks: 0..=1000,
135 now: std::time::Instant::now(),
136 next_timer_id: Default::default(),
137 pending_timers: Default::default(),
138 waiting_backtrace: None,
139 next_runnable_id: 0,
140 poll_history: Default::default(),
141 previous_poll_history: Default::default(),
142 enable_runnable_backtraces: false,
143 runnable_backtraces: Default::default(),
144 })),
145 parker: Default::default(),
146 })
147 }
148
149 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
150 self.state.lock().poll_history.clone()
151 }
152
153 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
154 self.state.lock().previous_poll_history = history;
155 }
156
157 pub fn enable_runnable_backtrace(&self) {
158 self.state.lock().enable_runnable_backtraces = true;
159 }
160
161 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
162 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
163 backtrace.resolve();
164 backtrace
165 }
166
167 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
168 Arc::new(Background::Deterministic {
169 executor: self.clone(),
170 })
171 }
172
173 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
174 Rc::new(Foreground::Deterministic {
175 cx_id: id,
176 executor: self.clone(),
177 })
178 }
179
180 fn spawn_from_foreground(
181 &self,
182 cx_id: usize,
183 future: AnyLocalFuture,
184 main: bool,
185 ) -> AnyLocalTask {
186 let state = self.state.clone();
187 let id;
188 {
189 let mut state = state.lock();
190 id = util::post_inc(&mut state.next_runnable_id);
191 if state.enable_runnable_backtraces {
192 state
193 .runnable_backtraces
194 .insert(id, backtrace::Backtrace::new_unresolved());
195 }
196 }
197
198 let unparker = self.parker.lock().unparker();
199 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
200 let mut state = state.lock();
201 state
202 .poll_history
203 .push(ExecutorEvent::EnqueuRunnable { id });
204 state
205 .scheduled_from_foreground
206 .entry(cx_id)
207 .or_default()
208 .push(ForegroundRunnable { id, runnable, main });
209 unparker.unpark();
210 });
211 runnable.schedule();
212 task
213 }
214
215 fn spawn(&self, future: AnyFuture) -> AnyTask {
216 let state = self.state.clone();
217 let id;
218 {
219 let mut state = state.lock();
220 id = util::post_inc(&mut state.next_runnable_id);
221 if state.enable_runnable_backtraces {
222 state
223 .runnable_backtraces
224 .insert(id, backtrace::Backtrace::new_unresolved());
225 }
226 }
227
228 let unparker = self.parker.lock().unparker();
229 let (runnable, task) = async_task::spawn(future, move |runnable| {
230 let mut state = state.lock();
231 state
232 .poll_history
233 .push(ExecutorEvent::EnqueuRunnable { id });
234 state
235 .scheduled_from_background
236 .push(BackgroundRunnable { id, runnable });
237 unparker.unpark();
238 });
239 runnable.schedule();
240 task
241 }
242
    /// Drives `main_future` to completion on the calling thread, running queued foreground
    /// and background runnables along the way, and returns the future's boxed output.
    ///
    /// Whenever no progress can be made and no wake-up was observed, `will_park` is
    /// consulted (it panics if parking is forbidden) before the thread parks.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker created in `run_internal` when the main task is woken.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is needed because `main_future` is only `'a`, not
        // `'static`. Presumably sound because this function does not return until the main
        // task has resolved — confirm against the `async_task::spawn_unchecked` contract.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // NOTE(review): a new id is allocated on every schedule, so the
                        // backtrace stored under `id` above is never associated with the
                        // queued runnable, and no `EnqueuRunnable` event is recorded here
                        // (unlike `spawn_from_foreground`). Confirm whether that is intended.
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Nothing is runnable and nobody woke the main task: about to block.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
295
296 pub fn run_until_parked(&self) {
297 use std::sync::atomic::AtomicBool;
298 let woken = Arc::new(AtomicBool::new(false));
299 self.run_internal(woken, None);
300 }
301
    /// Runs queued runnables, choosing among them with the seeded RNG, until both queues
    /// are empty or the main task (if any) completes.
    ///
    /// Returns `Some(output)` when `main_task` resolved, otherwise `None` once nothing is
    /// left to run. `woken` is set to `true` whenever the waker passed to `main_task` fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Both queues drained: give the main task one last poll, then report back.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // Coin flip between a random background runnable and a foreground one. If the
            // flip picks "foreground" while only background work exists, the loop retries.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable may schedule more work.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random foreground context, then run its oldest queued runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove emptied queues so the map's emptiness reflects pending work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                // After running a main runnable, check whether the main task finished.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
373
    /// Polls `future` on the calling thread for at most `max_ticks` ticks, interleaving it
    /// randomly with queued background runnables.
    ///
    /// Each tick either runs one randomly chosen background runnable or polls `future`.
    /// Returns `Some(output)` if the future completes, or `None` if the ticks run out.
    /// If the future is pending and no background work is queued, `will_park` is consulted
    /// (it panics when parking is forbidden) and the thread parks until unparked.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The inclusive upper bound is the extra slot that means "poll the future".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable may schedule more work.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
415
416 pub fn timer(&self, duration: Duration) -> Timer {
417 let (tx, rx) = postage::barrier::channel();
418 let mut state = self.state.lock();
419 let wakeup_at = state.now + duration;
420 let id = util::post_inc(&mut state.next_timer_id);
421 match state
422 .pending_timers
423 .binary_search_by_key(&wakeup_at, |e| e.1)
424 {
425 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
426 }
427 let state = self.state.clone();
428 Timer::Deterministic(DeterministicTimer { rx, id, state })
429 }
430
431 pub fn now(&self) -> std::time::Instant {
432 let state = self.state.lock();
433 state.now
434 }
435
    /// Advances the simulated clock by `duration`, firing pending timers in wake-up order.
    ///
    /// Before each step all queued work is run. The clock then jumps to the earliest pending
    /// wake-up time that is not past the target, the timers due at exactly that instant are
    /// released, and the process repeats. Finally the clock is set to the target time.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    // All timers sharing the earliest wake-up time fire together.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // Release the lock first; dropping the barrier senders is what lets the
                    // corresponding `Timer` futures resolve (they treat `Closed` as ready).
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }
466
467 pub fn start_waiting(&self) {
468 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
469 }
470
471 pub fn finish_waiting(&self) {
472 self.state.lock().waiting_backtrace.take();
473 }
474
475 pub fn forbid_parking(&self) {
476 use rand::prelude::*;
477
478 let mut state = self.state.lock();
479 state.forbid_parking = true;
480 state.rng = StdRng::seed_from_u64(state.seed);
481 }
482
483 pub async fn simulate_random_delay(&self) {
484 use rand::prelude::*;
485 use smol::future::yield_now;
486 if self.state.lock().rng.gen_bool(0.2) {
487 let yields = self.state.lock().rng.gen_range(1..=10);
488 for _ in 0..yields {
489 yield_now().await;
490 }
491 }
492 }
493
494 pub fn record_backtrace(&self) {
495 let mut state = self.state.lock();
496 if state.enable_runnable_backtraces {
497 let current_id = state
498 .poll_history
499 .iter()
500 .rev()
501 .find_map(|event| match event {
502 ExecutorEvent::PollRunnable { id } => Some(*id),
503 _ => None,
504 });
505 if let Some(id) = current_id {
506 state
507 .runnable_backtraces
508 .insert(id, backtrace::Backtrace::new_unresolved());
509 }
510 }
511 }
512}
513
impl Drop for Timer {
    /// Removes a deterministic timer's entry from the executor's pending timers so that a
    /// dropped timer is no longer tracked. Production timers need no extra cleanup here.
    fn drop(&mut self) {
        #[cfg(any(test, feature = "test-support"))]
        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
            state
                .lock()
                .pending_timers
                .retain(|(timer_id, _, _)| timer_id != id)
        }
    }
}
525
526impl Future for Timer {
527 type Output = ();
528
529 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
530 match &mut *self {
531 #[cfg(any(test, feature = "test-support"))]
532 Self::Deterministic(DeterministicTimer { rx, .. }) => {
533 use postage::stream::{PollRecv, Stream as _};
534 smol::pin!(rx);
535 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
536 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
537 PollRecv::Pending => Poll::Pending,
538 }
539 }
540 Self::Production(timer) => {
541 smol::pin!(timer);
542 match timer.poll(cx) {
543 Poll::Ready(_) => Poll::Ready(()),
544 Poll::Pending => Poll::Pending,
545 }
546 }
547 }
548 }
549}
550
551#[cfg(any(test, feature = "test-support"))]
552impl DeterministicState {
553 fn push_to_history(&mut self, event: ExecutorEvent) {
554 self.poll_history.push(event);
555 if let Some(prev_history) = &self.previous_poll_history {
556 let ix = self.poll_history.len() - 1;
557 let prev_event = prev_history[ix];
558 if event != prev_event {
559 let mut message = String::new();
560 writeln!(
561 &mut message,
562 "current runnable backtrace:\n{:?}",
563 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
564 trace.resolve();
565 CwdBacktrace(trace)
566 })
567 )
568 .unwrap();
569 writeln!(
570 &mut message,
571 "previous runnable backtrace:\n{:?}",
572 self.runnable_backtraces
573 .get_mut(&prev_event.id())
574 .map(|trace| {
575 trace.resolve();
576 CwdBacktrace(trace)
577 })
578 )
579 .unwrap();
580 panic!("detected non-determinism after {ix}. {message}");
581 }
582 }
583 }
584
585 fn will_park(&mut self) {
586 if self.forbid_parking {
587 let mut backtrace_message = String::new();
588 #[cfg(any(test, feature = "test-support"))]
589 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
590 backtrace.resolve();
591 backtrace_message = format!(
592 "\nbacktrace of waiting future:\n{:?}",
593 util::CwdBacktrace(backtrace)
594 );
595 }
596
597 panic!(
598 "deterministic executor parked after a call to forbid_parking{}",
599 backtrace_message
600 );
601 }
602 }
603}
604
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to, whatever the event kind.
    pub fn id(&self) -> usize {
        match self {
            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => *id,
        }
    }
}
614
615impl Foreground {
616 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
617 if dispatcher.is_main_thread() {
618 Ok(Self::Platform {
619 dispatcher,
620 _not_send_or_sync: PhantomData,
621 })
622 } else {
623 Err(anyhow!("must be constructed on main thread"))
624 }
625 }
626
627 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
628 let future = any_local_future(future);
629 let any_task = match self {
630 #[cfg(any(test, feature = "test-support"))]
631 Self::Deterministic { cx_id, executor } => {
632 executor.spawn_from_foreground(*cx_id, future, false)
633 }
634 Self::Platform { dispatcher, .. } => {
635 fn spawn_inner(
636 future: AnyLocalFuture,
637 dispatcher: &Arc<dyn Dispatcher>,
638 ) -> AnyLocalTask {
639 let dispatcher = dispatcher.clone();
640 let schedule =
641 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
642 let (runnable, task) = async_task::spawn_local(future, schedule);
643 runnable.schedule();
644 task
645 }
646 spawn_inner(future, dispatcher)
647 }
648 };
649 Task::local(any_task)
650 }
651
652 #[cfg(any(test, feature = "test-support"))]
653 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
654 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
655 let result = match self {
656 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
657 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
658 };
659 *result.downcast().unwrap()
660 }
661
662 #[cfg(any(test, feature = "test-support"))]
663 pub fn run_until_parked(&self) {
664 match self {
665 Self::Deterministic { executor, .. } => executor.run_until_parked(),
666 _ => panic!("this method can only be called on a deterministic executor"),
667 }
668 }
669
670 #[cfg(any(test, feature = "test-support"))]
671 pub fn parking_forbidden(&self) -> bool {
672 match self {
673 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
674 _ => panic!("this method can only be called on a deterministic executor"),
675 }
676 }
677
678 #[cfg(any(test, feature = "test-support"))]
679 pub fn start_waiting(&self) {
680 match self {
681 Self::Deterministic { executor, .. } => executor.start_waiting(),
682 _ => panic!("this method can only be called on a deterministic executor"),
683 }
684 }
685
686 #[cfg(any(test, feature = "test-support"))]
687 pub fn finish_waiting(&self) {
688 match self {
689 Self::Deterministic { executor, .. } => executor.finish_waiting(),
690 _ => panic!("this method can only be called on a deterministic executor"),
691 }
692 }
693
694 #[cfg(any(test, feature = "test-support"))]
695 pub fn forbid_parking(&self) {
696 match self {
697 Self::Deterministic { executor, .. } => executor.forbid_parking(),
698 _ => panic!("this method can only be called on a deterministic executor"),
699 }
700 }
701
702 #[cfg(any(test, feature = "test-support"))]
703 pub fn advance_clock(&self, duration: Duration) {
704 match self {
705 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
706 _ => panic!("this method can only be called on a deterministic executor"),
707 }
708 }
709
710 #[cfg(any(test, feature = "test-support"))]
711 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
712 match self {
713 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
714 _ => panic!("this method can only be called on a deterministic executor"),
715 }
716 }
717}
718
719impl Background {
720 pub fn new() -> Self {
721 let executor = Arc::new(Executor::new());
722 let stop = channel::unbounded::<()>();
723
724 for i in 0..2 * num_cpus::get() {
725 let executor = executor.clone();
726 let stop = stop.1.clone();
727 thread::Builder::new()
728 .name(format!("background-executor-{}", i))
729 .spawn(move || smol::block_on(executor.run(stop.recv())))
730 .unwrap();
731 }
732
733 Self::Production {
734 executor,
735 _stop: stop.0,
736 }
737 }
738
    /// Returns the CPU count reported by `num_cpus::get()`.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
742
743 pub fn spawn<T, F>(&self, future: F) -> Task<T>
744 where
745 T: 'static + Send,
746 F: Send + Future<Output = T> + 'static,
747 {
748 let future = any_future(future);
749 let any_task = match self {
750 Self::Production { executor, .. } => executor.spawn(future),
751 #[cfg(any(test, feature = "test-support"))]
752 Self::Deterministic { executor } => executor.spawn(future),
753 };
754 Task::send(any_task)
755 }
756
    /// Blocks the calling thread until `future` completes and returns its output.
    ///
    /// On the deterministic executor the future is driven by `Deterministic::block` with an
    /// effectively unlimited tick budget (`usize::MAX`).
    pub fn block<F, T>(&self, future: F) -> T
    where
        F: Future<Output = T>,
    {
        // Pin the future on the stack so it can be polled through `&mut`.
        smol::pin!(future);
        match self {
            Self::Production { .. } => smol::block_on(&mut future),
            #[cfg(any(test, feature = "test-support"))]
            Self::Deterministic { executor, .. } => {
                executor.block(&mut future, usize::MAX).unwrap()
            }
        }
    }
770
771 pub fn block_with_timeout<F, T>(
772 &self,
773 timeout: Duration,
774 future: F,
775 ) -> Result<T, impl Future<Output = T>>
776 where
777 T: 'static,
778 F: 'static + Unpin + Future<Output = T>,
779 {
780 let mut future = any_local_future(future);
781 if !timeout.is_zero() {
782 let output = match self {
783 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
784 #[cfg(any(test, feature = "test-support"))]
785 Self::Deterministic { executor, .. } => {
786 use rand::prelude::*;
787 let max_ticks = {
788 let mut state = executor.state.lock();
789 let range = state.block_on_ticks.clone();
790 state.rng.gen_range(range)
791 };
792 executor.block(&mut future, max_ticks)
793 }
794 };
795 if let Some(output) = output {
796 return Ok(*output.downcast().unwrap());
797 }
798 }
799 Err(async { *future.await.downcast().unwrap() })
800 }
801
    /// Runs futures that may borrow from the caller's `'scope` on this executor and waits
    /// for all of them.
    ///
    /// `scheduler` registers futures through [`Scope::spawn`]; they are then spawned and
    /// awaited in order. The `Scope` is dropped when this function ends, and its `Drop`
    /// additionally blocks until every registered future has finished.
    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone());
        (scheduler)(&mut scope);
        // Move the collected futures out of the scope and start them all before awaiting any.
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn(f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }
816
817 pub fn timer(&self, duration: Duration) -> Timer {
818 match self {
819 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
820 #[cfg(any(test, feature = "test-support"))]
821 Background::Deterministic { executor } => executor.timer(duration),
822 }
823 }
824
825 pub fn now(&self) -> std::time::Instant {
826 match self {
827 Background::Production { .. } => std::time::Instant::now(),
828 #[cfg(any(test, feature = "test-support"))]
829 Background::Deterministic { executor } => executor.now(),
830 }
831 }
832
833 #[cfg(any(test, feature = "test-support"))]
834 pub async fn simulate_random_delay(&self) {
835 match self {
836 Self::Deterministic { executor, .. } => {
837 executor.simulate_random_delay().await;
838 }
839 _ => {
840 panic!("this method can only be called on a deterministic executor")
841 }
842 }
843 }
844
845 #[cfg(any(test, feature = "test-support"))]
846 pub fn record_backtrace(&self) {
847 match self {
848 Self::Deterministic { executor, .. } => executor.record_backtrace(),
849 _ => {
850 panic!("this method can only be called on a deterministic executor")
851 }
852 }
853 }
854}
855
impl Default for Background {
    /// Equivalent to [`Background::new`]; note that this spawns the worker threads.
    fn default() -> Self {
        Self::new()
    }
}
861
/// Collects futures that borrow data for `'a` so that `Background::scoped` can run them.
pub struct Scope<'a> {
    executor: Arc<Background>,
    /// Futures registered via `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender cloned into every registered future; taken (dropped) when the scope is dropped.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` blocks on; nothing is ever sent on the channel.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
869
impl<'a> Scope<'a> {
    /// Creates an empty scope that will run its futures on `executor`.
    fn new(executor: Arc<Background>) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            _phantom: PhantomData,
        }
    }

    /// Registers `f` to be run by the enclosing `Background::scoped` call.
    ///
    /// The future is wrapped so that it drops a clone of the scope's sender when it
    /// finishes; the scope's `Drop` waits on the matching receiver.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        // NOTE(review): this relies on `Drop` actually running (e.g. the scope is not leaked
        // via `mem::forget`) — confirm that callers cannot violate this.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
902
impl<'a> Drop for Scope<'a> {
    /// Blocks the current thread until every future registered on this scope has finished.
    fn drop(&mut self) {
        // Drop the scope's own sender so that only the futures' clones keep the channel open.
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}
912
913impl<T> Task<T> {
914 pub fn ready(value: T) -> Self {
915 Self::Ready(Some(value))
916 }
917
918 fn local(any_task: AnyLocalTask) -> Self {
919 Self::Local {
920 any_task,
921 result_type: PhantomData,
922 }
923 }
924
925 pub fn detach(self) {
926 match self {
927 Task::Ready(_) => {}
928 Task::Local { any_task, .. } => any_task.detach(),
929 Task::Send { any_task, .. } => any_task.detach(),
930 }
931 }
932}
933
934impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
935 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
936 cx.spawn(|_| async move {
937 if let Err(err) = self.await {
938 log::error!("{}", err);
939 }
940 })
941 .detach();
942 }
943}
944
impl<T: Send> Task<T> {
    /// Wraps a type-erased `Send` task whose output is expected to be a `T`.
    fn send(any_task: AnyTask) -> Self {
        Self::Send {
            any_task,
            result_type: PhantomData,
        }
    }
}
953
954impl<T: fmt::Debug> fmt::Debug for Task<T> {
955 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956 match self {
957 Task::Ready(value) => value.fmt(f),
958 Task::Local { any_task, .. } => any_task.fmt(f),
959 Task::Send { any_task, .. } => any_task.fmt(f),
960 }
961 }
962}
963
impl<T: 'static> Future for Task<T> {
    type Output = T;

    /// Polls the underlying task and downcasts its type-erased output back to `T`.
    ///
    /// # Panics
    ///
    /// Panics if a `Ready` task is polled again after it has yielded its value, or if the
    /// erased output is not actually a `T`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY (NOTE(review)): nothing below moves `self` as a whole; the inner tasks are
        // polled through `&mut` and the `Ready` value is moved out of its `Option`.
        // Presumably no field relies on being structurally pinned — confirm.
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}
979
980fn any_future<T, F>(future: F) -> AnyFuture
981where
982 T: 'static + Send,
983 F: Future<Output = T> + Send + 'static,
984{
985 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
986}
987
988fn any_local_future<T, F>(future: F) -> AnyLocalFuture
989where
990 T: 'static,
991 F: Future<Output = T> + 'static,
992{
993 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
994}