1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display, Write as _},
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 task::{Context, Poll},
14 thread,
15 time::Duration,
16};
17
18use crate::{
19 platform::{self, Dispatcher},
20 util::{self, CwdBacktrace},
21 MutableAppContext,
22};
23
/// Executor for futures that are not required to be `Send`.
///
/// `Platform` hands runnables to a `platform::Dispatcher`; `Deterministic`
/// (test builds only) queues them on a shared `Deterministic` scheduler.
pub enum Foreground {
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this marker strips both
        // auto traits from the enum.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this executor's runnables are queued in
        // `DeterministicState::scheduled_from_foreground`.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
35
/// Executor for `Send` futures.
///
/// `Production` runs them on a `smol::Executor`; `Deterministic` (test builds
/// only) queues them on a shared `Deterministic` scheduler.
pub enum Background {
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sending half of the channel whose receiver every worker thread
        // passes to `executor.run(..)` in `Background::new`; it is never sent
        // on in this file, only kept alive.
        _stop: channel::Sender<()>,
    },
}
44
// Type-erased futures and tasks: the output is boxed as `dyn Any` so the
// schedulers can be non-generic; `Task<T>` downcasts it back to `T`.

/// A boxed, non-`Send` future producing a type-erased value.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// A boxed, `Send` future producing a type-erased `Send` value.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Handle to a spawned `AnyFuture`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Handle to a spawned `AnyLocalFuture`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
49
/// A handle to a value of type `T` that is either already available or is
/// being produced by a spawned, type-erased task.
///
/// Awaiting the handle yields the `T`; `detach` lets the underlying task keep
/// running without the handle.
#[must_use]
pub enum Task<T> {
    /// An already-computed value; the `Option` is emptied by the first poll.
    Ready(Option<T>),
    /// A task wrapping a non-`Send` future (see `Foreground::spawn`).
    Local {
        any_task: AnyLocalTask,
        // Remembers `T` so the erased output can be downcast when polled.
        result_type: PhantomData<T>,
    },
    /// A task wrapping a `Send` future (see `Background::spawn`).
    Send {
        any_task: AnyTask,
        // Remembers `T` so the erased output can be downcast when polled.
        result_type: PhantomData<T>,
    },
}
62
// NOTE(review): this asserts `Send` for every variant, including `Task::Local`,
// whose inner task is created with `async_task::spawn_local` in this file. No
// safety argument is stated here — confirm that moving a `Local` handle to
// another thread (and polling or dropping it there) is actually sound.
unsafe impl<T: Send> Send for Task<T> {}
64
/// Mutable scheduler state shared (behind a mutex) between a `Deterministic`
/// executor, the schedule callbacks of its runnables, and its timers.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision; seeded from `seed`.
    rng: rand::prelude::StdRng,
    // Kept so `forbid_parking` can re-seed `rng`.
    seed: u64,
    // Runnables woken from foreground executors, queued per `cx_id`.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables woken from the background executor.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated clock; moved forward only by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // `(timer id, wake-up instant, barrier sender)`; `timer` inserts entries
    // at a position found by binary search on the instant.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Captured by `start_waiting`, reported by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Events recorded by this run.
    poll_history: Vec<ExecutorEvent>,
    // Events of an earlier run that `push_to_history` compares against.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    enable_runnable_backtraces: bool,
    // Backtraces keyed by runnable id; filled only while
    // `enable_runnable_backtraces` is set.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
83
/// One entry of the deterministic executor's poll history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    // The misspelling ("Enqueu") is part of the public name.
    EnqueuRunnable { id: usize },
}
89
/// A queued foreground runnable together with its bookkeeping.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    // Id reported in `ExecutorEvent`s for this runnable.
    id: usize,
    runnable: Runnable,
    // True for runnables whose completion `run_internal` follows by polling
    // the main task.
    main: bool,
}
96
/// A queued background runnable together with the id used in the poll history.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    id: usize,
    runnable: Runnable,
}
102
/// Scheduler whose choices all come from a seeded RNG, for use in tests.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with schedule callbacks and `DeterministicTimer`s.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Used to park the running thread; wakers and schedule callbacks hold
    // unparkers obtained from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
/// A future that completes after a delay: a real `smol::Timer` in production,
/// or a timer on the simulated clock of a `Deterministic` executor in tests.
pub enum Timer {
    Production(smol::Timer),
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
114
/// Timer registered in `DeterministicState::pending_timers`.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    // Identifies this timer's entry so `Drop for Timer` can remove it.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
121
122#[cfg(any(test, feature = "test-support"))]
123impl Deterministic {
124 pub fn new(seed: u64) -> Arc<Self> {
125 use rand::prelude::*;
126
127 Arc::new(Self {
128 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
129 rng: StdRng::seed_from_u64(seed),
130 seed,
131 scheduled_from_foreground: Default::default(),
132 scheduled_from_background: Default::default(),
133 forbid_parking: false,
134 block_on_ticks: 0..=1000,
135 now: std::time::Instant::now(),
136 next_timer_id: Default::default(),
137 pending_timers: Default::default(),
138 waiting_backtrace: None,
139 next_runnable_id: 0,
140 poll_history: Default::default(),
141 previous_poll_history: Default::default(),
142 enable_runnable_backtraces: false,
143 runnable_backtraces: Default::default(),
144 })),
145 parker: Default::default(),
146 })
147 }
148
149 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
150 self.state.lock().poll_history.clone()
151 }
152
153 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
154 self.state.lock().previous_poll_history = history;
155 }
156
157 pub fn enable_runnable_backtrace(&self) {
158 self.state.lock().enable_runnable_backtraces = true;
159 }
160
161 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
162 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
163 backtrace.resolve();
164 backtrace
165 }
166
167 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
168 Arc::new(Background::Deterministic {
169 executor: self.clone(),
170 })
171 }
172
173 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
174 Rc::new(Foreground::Deterministic {
175 cx_id: id,
176 executor: self.clone(),
177 })
178 }
179
180 fn spawn_from_foreground(
181 &self,
182 cx_id: usize,
183 future: AnyLocalFuture,
184 main: bool,
185 ) -> AnyLocalTask {
186 let state = self.state.clone();
187 let id;
188 {
189 let mut state = state.lock();
190 id = util::post_inc(&mut state.next_runnable_id);
191 if state.enable_runnable_backtraces {
192 state
193 .runnable_backtraces
194 .insert(id, backtrace::Backtrace::new_unresolved());
195 }
196 }
197
198 let unparker = self.parker.lock().unparker();
199 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
200 let mut state = state.lock();
201 state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
202 state
203 .scheduled_from_foreground
204 .entry(cx_id)
205 .or_default()
206 .push(ForegroundRunnable { id, runnable, main });
207 unparker.unpark();
208 });
209 runnable.schedule();
210 task
211 }
212
/// Spawns a type-erased `Send` future on the background queue and returns its
/// task handle.
fn spawn(&self, future: AnyFuture) -> AnyTask {
    let state = self.state.clone();
    let id;
    {
        // Allocate the runnable id and, if enabled, record the spawn site.
        let mut state = state.lock();
        id = util::post_inc(&mut state.next_runnable_id);
        if state.enable_runnable_backtraces {
            state
                .runnable_backtraces
                .insert(id, backtrace::Backtrace::new_unresolved());
        }
    }

    let unparker = self.parker.lock().unparker();
    // The schedule callback runs every time the task is woken.
    let (runnable, task) = async_task::spawn(future, move |runnable| {
        let mut state = state.lock();
        // NOTE(review): unlike `spawn_from_foreground`, this pushes to
        // `poll_history` directly rather than through `push_to_history`, so
        // this enqueue is not compared with the previous run's history —
        // confirm that this is intentional.
        state
            .poll_history
            .push(ExecutorEvent::EnqueuRunnable { id });
        state
            .scheduled_from_background
            .push(BackgroundRunnable { id, runnable });
        unparker.unpark();
    });
    // Queue the first poll right away.
    runnable.schedule();
    task
}
240
/// Runs `main_future` to completion on the calling thread, interleaving it
/// with every other runnable queued on this scheduler, and returns its
/// type-erased output.
///
/// When nothing is runnable and no wake-up was observed, `will_park` is
/// consulted (it panics if parking is forbidden) before the thread parks.
fn run<'a>(
    &self,
    cx_id: usize,
    main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
) -> Box<dyn Any> {
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

    // Set by the waker built in `run_internal`.
    let woken = Arc::new(AtomicBool::new(false));

    let state = self.state.clone();
    let id;
    {
        let mut state = state.lock();
        id = util::post_inc(&mut state.next_runnable_id);
        if state.enable_runnable_backtraces {
            state
                .runnable_backtraces
                .insert(id, backtrace::Backtrace::new_unresolved());
        }
    }

    let unparker = self.parker.lock().unparker();
    // NOTE(review): `spawn_unchecked` is needed because `main_future` is only
    // `'a`, not `'static`, and not `Send`. This function only returns normally
    // after `main_task` has produced its result — presumably that is what
    // upholds the contract; confirm, especially for the unwinding path.
    let (runnable, mut main_task) = unsafe {
        async_task::spawn_unchecked(main_future, move |runnable| {
            let state = &mut *state.lock();
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable {
                    // A fresh id is allocated on every wake-up; the `id`
                    // computed above is only used as the backtrace key.
                    id: util::post_inc(&mut state.next_runnable_id),
                    runnable,
                    main: true,
                });
            unparker.unpark();
        })
    };
    runnable.schedule();

    loop {
        if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
            return result;
        }

        // Nothing left to run and the main task is still pending.
        if !woken.load(SeqCst) {
            self.state.lock().will_park();
        }

        woken.store(false, SeqCst);
        self.parker.lock().park();
    }
}
293
294 pub fn run_until_parked(&self) {
295 use std::sync::atomic::AtomicBool;
296 let woken = Arc::new(AtomicBool::new(false));
297 self.run_internal(woken, None);
298 }
299
/// Runs queued runnables, chosen by the seeded RNG, until both queues are
/// empty.
///
/// Returns `Some(output)` as soon as `main_task` is observed to be finished,
/// and `None` once nothing is left to run while `main_task` is absent or
/// still pending. `woken` is set by the waker handed to `main_task`.
fn run_internal(
    &self,
    woken: Arc<std::sync::atomic::AtomicBool>,
    mut main_task: Option<&mut AnyLocalTask>,
) -> Option<Box<dyn Any>> {
    use rand::prelude::*;
    use std::sync::atomic::Ordering::SeqCst;

    let unparker = self.parker.lock().unparker();
    let waker = waker_fn::waker_fn(move || {
        woken.store(true, SeqCst);
        unparker.unpark();
    });

    let mut cx = Context::from_waker(&waker);
    loop {
        let mut state = self.state.lock();

        if state.scheduled_from_foreground.is_empty()
            && state.scheduled_from_background.is_empty()
        {
            // Both queues are drained: give the main task one last poll (the
            // state lock is still held here) and then stop.
            if let Some(main_task) = main_task {
                if let Poll::Ready(result) = main_task.poll(&mut cx) {
                    return Some(result);
                }
            }

            return None;
        }

        // Random boolean: when background work exists, run it only if the
        // draw is true. A false draw with an empty foreground queue runs
        // nothing in this iteration and simply loops again.
        if !state.scheduled_from_background.is_empty() && state.rng.gen() {
            let background_len = state.scheduled_from_background.len();
            let ix = state.rng.gen_range(0..background_len);
            let background_runnable = state.scheduled_from_background.remove(ix);
            state.push_to_history(ExecutorEvent::PollRunnable {
                id: background_runnable.id,
            });
            // Release the lock first: running may re-enter the schedule
            // callbacks, which lock the state themselves.
            drop(state);
            background_runnable.runnable.run();
        } else if !state.scheduled_from_foreground.is_empty() {
            // Pick a random context, then take the oldest runnable queued
            // for it.
            let available_cx_ids = state
                .scheduled_from_foreground
                .keys()
                .copied()
                .collect::<Vec<_>>();
            let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
            let scheduled_from_cx = state
                .scheduled_from_foreground
                .get_mut(&cx_id_to_run)
                .unwrap();
            let foreground_runnable = scheduled_from_cx.remove(0);
            // Remove emptied queues so the emptiness test above stays
            // accurate.
            if scheduled_from_cx.is_empty() {
                state.scheduled_from_foreground.remove(&cx_id_to_run);
            }
            state.push_to_history(ExecutorEvent::PollRunnable {
                id: foreground_runnable.id,
            });

            drop(state);

            foreground_runnable.runnable.run();
            // After running a runnable flagged `main`, check whether the
            // main task has produced its result.
            if let Some(main_task) = main_task.as_mut() {
                if foreground_runnable.main {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }
            }
        }
    }
}
371
/// Polls `future` on the calling thread for at most `max_ticks` iterations,
/// randomly interleaving it with queued background runnables.
///
/// Returns `Some(output)` if the future completes within the budget and
/// `None` otherwise. Foreground runnables are never run here.
fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
where
    F: Unpin + Future<Output = T>,
{
    use rand::prelude::*;

    let unparker = self.parker.lock().unparker();
    let waker = waker_fn::waker_fn(move || {
        unparker.unpark();
    });

    let mut cx = Context::from_waker(&waker);
    for _ in 0..max_ticks {
        let mut state = self.state.lock();
        let runnable_count = state.scheduled_from_background.len();
        // The range is inclusive: drawing `runnable_count` itself means
        // "poll the blocked future" instead of a background runnable.
        let ix = state.rng.gen_range(0..=runnable_count);
        if ix < state.scheduled_from_background.len() {
            let background_runnable = state.scheduled_from_background.remove(ix);
            state.push_to_history(ExecutorEvent::PollRunnable {
                id: background_runnable.id,
            });
            // Release the lock before running; schedule callbacks lock it.
            drop(state);
            background_runnable.runnable.run();
        } else {
            drop(state);
            if let Poll::Ready(result) = future.poll(&mut cx) {
                return Some(result);
            }
            // The future is pending; park only if nothing else can make
            // progress (`will_park` panics when parking is forbidden).
            let mut state = self.state.lock();
            if state.scheduled_from_background.is_empty() {
                state.will_park();
                drop(state);
                self.parker.lock().park();
            }

            continue;
        }
    }

    None
}
413
414 pub fn timer(&self, duration: Duration) -> Timer {
415 let (tx, rx) = postage::barrier::channel();
416 let mut state = self.state.lock();
417 let wakeup_at = state.now + duration;
418 let id = util::post_inc(&mut state.next_timer_id);
419 match state
420 .pending_timers
421 .binary_search_by_key(&wakeup_at, |e| e.1)
422 {
423 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
424 }
425 let state = self.state.clone();
426 Timer::Deterministic(DeterministicTimer { rx, id, state })
427 }
428
429 pub fn now(&self) -> std::time::Instant {
430 let state = self.state.lock();
431 state.now
432 }
433
/// Advances the simulated clock by `duration`, releasing every pending timer
/// whose wake-up instant is reached along the way.
///
/// Queued work is run (`run_until_parked`) before each batch of timers is
/// released and once more before the clock is set to its final value.
pub fn advance_clock(&self, duration: Duration) {
    let new_now = self.state.lock().now + duration;
    loop {
        self.run_until_parked();
        let mut state = self.state.lock();

        if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
            let wakeup_time = *wakeup_time;
            if wakeup_time <= new_now {
                // Release every leading timer that shares this exact instant.
                let timer_count = state
                    .pending_timers
                    .iter()
                    .take_while(|(_, t, _)| *t == wakeup_time)
                    .count();
                state.now = wakeup_time;
                let timers_to_wake = state
                    .pending_timers
                    .drain(0..timer_count)
                    .collect::<Vec<_>>();
                // The state lock is released before the barrier senders are
                // dropped. Presumably dropping a sender is what completes the
                // matching `Timer` (its `poll` treats a closed channel as
                // ready) — confirm against postage's barrier semantics.
                drop(state);
                drop(timers_to_wake);
                continue;
            }
        }

        break;
    }

    self.state.lock().now = new_now;
}
464
465 pub fn start_waiting(&self) {
466 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
467 }
468
469 pub fn finish_waiting(&self) {
470 self.state.lock().waiting_backtrace.take();
471 }
472
473 pub fn forbid_parking(&self) {
474 use rand::prelude::*;
475
476 let mut state = self.state.lock();
477 state.forbid_parking = true;
478 state.rng = StdRng::seed_from_u64(state.seed);
479 }
480
481 pub async fn simulate_random_delay(&self) {
482 use rand::prelude::*;
483 use smol::future::yield_now;
484 if self.state.lock().rng.gen_bool(0.2) {
485 let yields = self.state.lock().rng.gen_range(1..=10);
486 for _ in 0..yields {
487 yield_now().await;
488 }
489 }
490 }
491
492 pub fn record_backtrace(&self) {
493 let mut state = self.state.lock();
494 if state.enable_runnable_backtraces {
495 let current_id = state
496 .poll_history
497 .iter()
498 .rev()
499 .find_map(|event| match event {
500 ExecutorEvent::PollRunnable { id } => Some(*id),
501 _ => None,
502 });
503 if let Some(id) = current_id {
504 state
505 .runnable_backtraces
506 .insert(id, backtrace::Backtrace::new_unresolved());
507 }
508 }
509 }
510}
511
512impl Drop for Timer {
513 fn drop(&mut self) {
514 #[cfg(any(test, feature = "test-support"))]
515 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
516 state
517 .lock()
518 .pending_timers
519 .retain(|(timer_id, _, _)| timer_id != id)
520 }
521 }
522}
523
impl Future for Timer {
    type Output = ();

    /// Completes with `()` once the underlying timer has fired.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut *self {
            #[cfg(any(test, feature = "test-support"))]
            Self::Deterministic(DeterministicTimer { rx, .. }) => {
                use postage::stream::{PollRecv, Stream as _};
                // Pins the `&mut` receiver on the stack for `poll_recv`.
                smol::pin!(rx);
                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
                    // A closed channel counts as "fired" as well.
                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
                    PollRecv::Pending => Poll::Pending,
                }
            }
            Self::Production(timer) => {
                smol::pin!(timer);
                // The value produced by `smol::Timer` is discarded.
                match timer.poll(cx) {
                    Poll::Ready(_) => Poll::Ready(()),
                    Poll::Pending => Poll::Pending,
                }
            }
        }
    }
}
548
549#[cfg(any(test, feature = "test-support"))]
550impl DeterministicState {
551 fn push_to_history(&mut self, event: ExecutorEvent) {
552 self.poll_history.push(event);
553 if let Some(prev_history) = &self.previous_poll_history {
554 let ix = self.poll_history.len() - 1;
555 let prev_event = prev_history[ix];
556 if event != prev_event {
557 let mut message = String::new();
558 writeln!(
559 &mut message,
560 "current runnable backtrace:\n{:?}",
561 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
562 trace.resolve();
563 CwdBacktrace(trace)
564 })
565 )
566 .unwrap();
567 writeln!(
568 &mut message,
569 "previous runnable backtrace:\n{:?}",
570 self.runnable_backtraces
571 .get_mut(&prev_event.id())
572 .map(|trace| {
573 trace.resolve();
574 CwdBacktrace(trace)
575 })
576 )
577 .unwrap();
578 panic!("detected non-determinism after {ix}. {message}");
579 }
580 }
581 }
582
583 fn will_park(&mut self) {
584 if self.forbid_parking {
585 let mut backtrace_message = String::new();
586 #[cfg(any(test, feature = "test-support"))]
587 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
588 backtrace.resolve();
589 backtrace_message = format!(
590 "\nbacktrace of waiting future:\n{:?}",
591 util::CwdBacktrace(backtrace)
592 );
593 }
594
595 panic!(
596 "deterministic executor parked after a call to forbid_parking{}",
597 backtrace_message
598 );
599 }
600 }
601}
602
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to, whatever the
    /// kind of event.
    pub fn id(&self) -> usize {
        match self {
            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => *id,
        }
    }
}
612
613impl Foreground {
614 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
615 if dispatcher.is_main_thread() {
616 Ok(Self::Platform {
617 dispatcher,
618 _not_send_or_sync: PhantomData,
619 })
620 } else {
621 Err(anyhow!("must be constructed on main thread"))
622 }
623 }
624
/// Spawns a (possibly non-`Send`) future on this executor and returns a
/// `Task` handle for its output.
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
    // Erase the output type so the spawning code below is not generic.
    let future = any_local_future(future);
    let any_task = match self {
        #[cfg(any(test, feature = "test-support"))]
        Self::Deterministic { cx_id, executor } => {
            executor.spawn_from_foreground(*cx_id, future, false)
        }
        Self::Platform { dispatcher, .. } => {
            // A non-generic inner function; presumably kept separate so
            // this code is not duplicated for every `T` — confirm.
            fn spawn_inner(
                future: AnyLocalFuture,
                dispatcher: &Arc<dyn Dispatcher>,
            ) -> AnyLocalTask {
                let dispatcher = dispatcher.clone();
                // Every wake-up hands the runnable to the dispatcher.
                let schedule =
                    move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
                let (runnable, task) = async_task::spawn_local(future, schedule);
                runnable.schedule();
                task
            }
            spawn_inner(future, dispatcher)
        }
    };
    Task::local(any_task)
}
649
/// Runs `future` to completion on a deterministic executor and returns its
/// output.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
    let erased = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
    let output: Box<dyn Any> = if let Self::Deterministic { cx_id, executor } = self {
        executor.run(*cx_id, erased)
    } else {
        panic!("you can't call run on a platform foreground executor")
    };
    // The box was created from a `T` just above, so the downcast succeeds.
    *output.downcast::<T>().unwrap()
}
659
/// Runs queued work on the deterministic executor until nothing is left.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn run_until_parked(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.run_until_parked()
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
667
/// Reports whether parking has been forbidden on the deterministic executor.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn parking_forbidden(&self) -> bool {
    if let Self::Deterministic { executor, .. } = self {
        let forbidden = executor.state.lock().forbid_parking;
        forbidden
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
675
/// Forwards to `Deterministic::start_waiting`.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn start_waiting(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.start_waiting()
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
683
/// Forwards to `Deterministic::finish_waiting`.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn finish_waiting(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.finish_waiting()
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
691
/// Forwards to `Deterministic::forbid_parking`.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn forbid_parking(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.forbid_parking()
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
699
/// Advances the deterministic executor's simulated clock by `duration`.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn advance_clock(&self, duration: Duration) {
    if let Self::Deterministic { executor, .. } = self {
        executor.advance_clock(duration)
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
707
/// Sets the range from which `Background::block_with_timeout` draws its tick
/// budget on the deterministic executor.
///
/// # Panics
///
/// Panics when called on a platform executor.
#[cfg(any(test, feature = "test-support"))]
pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
    if let Self::Deterministic { executor, .. } = self {
        let mut state = executor.state.lock();
        state.block_on_ticks = range;
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
715}
716
717impl Background {
718 pub fn new() -> Self {
719 let executor = Arc::new(Executor::new());
720 let stop = channel::unbounded::<()>();
721
722 for i in 0..2 * num_cpus::get() {
723 let executor = executor.clone();
724 let stop = stop.1.clone();
725 thread::Builder::new()
726 .name(format!("background-executor-{}", i))
727 .spawn(move || smol::block_on(executor.run(stop.recv())))
728 .unwrap();
729 }
730
731 Self::Production {
732 executor,
733 _stop: stop.0,
734 }
735 }
736
/// Returns `num_cpus::get()`; the value does not depend on which variant
/// `self` is.
pub fn num_cpus(&self) -> usize {
    num_cpus::get()
}
740
741 pub fn spawn<T, F>(&self, future: F) -> Task<T>
742 where
743 T: 'static + Send,
744 F: Send + Future<Output = T> + 'static,
745 {
746 let future = any_future(future);
747 let any_task = match self {
748 Self::Production { executor, .. } => executor.spawn(future),
749 #[cfg(any(test, feature = "test-support"))]
750 Self::Deterministic { executor } => executor.spawn(future),
751 };
752 Task::send(any_task)
753 }
754
/// Blocks the calling thread until `future` completes and returns its output.
pub fn block<F, T>(&self, future: F) -> T
where
    F: Future<Output = T>,
{
    // Pin on the stack so a `&mut` to the future can be polled.
    smol::pin!(future);
    match self {
        Self::Production { .. } => smol::block_on(&mut future),
        #[cfg(any(test, feature = "test-support"))]
        Self::Deterministic { executor, .. } => {
            // `Deterministic::block` returns `None` only when the tick budget
            // runs out; with `usize::MAX` ticks the `unwrap` is effectively
            // unconditional.
            executor.block(&mut future, usize::MAX).unwrap()
        }
    }
}
768
/// Blocks on `future` for a limited time.
///
/// Returns `Ok(output)` if the future completes within the limit; otherwise
/// returns `Err` with a future that resumes the same computation. A zero
/// `timeout` skips blocking entirely.
///
/// On a deterministic executor the limit is not `timeout` but a tick budget
/// drawn from `block_on_ticks` with the seeded RNG.
pub fn block_with_timeout<F, T>(
    &self,
    timeout: Duration,
    future: F,
) -> Result<T, impl Future<Output = T>>
where
    T: 'static,
    F: 'static + Unpin + Future<Output = T>,
{
    let mut future = any_local_future(future);
    if !timeout.is_zero() {
        let output = match self {
            Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
            #[cfg(any(test, feature = "test-support"))]
            Self::Deterministic { executor, .. } => {
                use rand::prelude::*;
                // Draw the budget in its own scope so the state lock is
                // released before `block` locks it again.
                let max_ticks = {
                    let mut state = executor.state.lock();
                    let range = state.block_on_ticks.clone();
                    state.rng.gen_range(range)
                };
                executor.block(&mut future, max_ticks)
            }
        };
        if let Some(output) = output {
            return Ok(*output.downcast().unwrap());
        }
    }
    // Not finished: hand back the partially-driven future, restoring `T`.
    Err(async { *future.await.downcast().unwrap() })
}
799
/// Runs futures that may borrow data for `'scope`: `scheduler` registers them
/// on a `Scope`, then all of them are spawned on this executor and awaited in
/// registration order.
pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
where
    F: FnOnce(&mut Scope<'scope>),
{
    let mut scope = Scope::new(self.clone());
    (scheduler)(&mut scope);
    // Spawn everything first so the futures can make progress concurrently,
    // then wait for each one.
    let spawned = mem::take(&mut scope.futures)
        .into_iter()
        .map(|f| self.spawn(f))
        .collect::<Vec<_>>();
    for task in spawned {
        task.await;
    }
    // `scope` is dropped here; its `Drop` impl blocks until every registered
    // future has released its channel sender.
}
814
815 pub fn timer(&self, duration: Duration) -> Timer {
816 match self {
817 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
818 #[cfg(any(test, feature = "test-support"))]
819 Background::Deterministic { executor } => executor.timer(duration),
820 }
821 }
822
823 pub fn now(&self) -> std::time::Instant {
824 match self {
825 Background::Production { .. } => std::time::Instant::now(),
826 #[cfg(any(test, feature = "test-support"))]
827 Background::Deterministic { executor } => executor.now(),
828 }
829 }
830
/// Forwards to `Deterministic::simulate_random_delay`.
///
/// # Panics
///
/// Panics when called on a production executor.
#[cfg(any(test, feature = "test-support"))]
pub async fn simulate_random_delay(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.simulate_random_delay().await;
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
842
/// Forwards to `Deterministic::record_backtrace`.
///
/// # Panics
///
/// Panics when called on a production executor.
#[cfg(any(test, feature = "test-support"))]
pub fn record_backtrace(&self) {
    if let Self::Deterministic { executor, .. } = self {
        executor.record_backtrace()
    } else {
        panic!("this method can only be called on a deterministic executor")
    }
}
852}
853
impl Default for Background {
    /// Equivalent to `Background::new()`, i.e. a production executor with its
    /// own worker threads.
    fn default() -> Self {
        Self::new()
    }
}
859
/// Collects futures that borrow data for `'a` so `Background::scoped` can
/// spawn them; dropping the scope blocks until all of them have finished.
pub struct Scope<'a> {
    // Executor that `Drop` blocks on.
    executor: Arc<Background>,
    // Registered futures, with their lifetime erased to `'static` by `spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Cloned into every registered future; the original is released in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // Closes once every sender clone is gone.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
867
impl<'a> Scope<'a> {
    /// Creates an empty scope tied to `executor`.
    fn new(executor: Arc<Background>) -> Self {
        // Nothing is ever sent on this channel in this file; only its
        // closing matters.
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            _phantom: PhantomData,
        }
    }

    /// Registers `f` to be run by the enclosing `Background::scoped` call.
    ///
    /// The future is only queued here; it is spawned after the scheduler
    /// closure passed to `scoped` returns.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each future owns a sender clone and releases it when it finishes,
        // which is how `Drop` learns that all futures are done.
        let tx = self.tx.clone().unwrap();

        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
900
901impl<'a> Drop for Scope<'a> {
902 fn drop(&mut self) {
903 self.tx.take().unwrap();
904
905 // Wait until the channel is closed, which means that all of the spawned
906 // futures have resolved.
907 self.executor.block(self.rx.next());
908 }
909}
910
911impl<T> Task<T> {
912 pub fn ready(value: T) -> Self {
913 Self::Ready(Some(value))
914 }
915
916 fn local(any_task: AnyLocalTask) -> Self {
917 Self::Local {
918 any_task,
919 result_type: PhantomData,
920 }
921 }
922
923 pub fn detach(self) {
924 match self {
925 Task::Ready(_) => {}
926 Task::Local { any_task, .. } => any_task.detach(),
927 Task::Send { any_task, .. } => any_task.detach(),
928 }
929 }
930}
931
932impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
933 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
934 cx.spawn(|_| async move {
935 if let Err(err) = self.await {
936 log::error!("{}", err);
937 }
938 })
939 .detach();
940 }
941}
942
impl<T: Send> Task<T> {
    /// Wraps a type-erased `Send` task.
    fn send(any_task: AnyTask) -> Self {
        Self::Send {
            any_task,
            result_type: PhantomData,
        }
    }
}
951
952impl<T: fmt::Debug> fmt::Debug for Task<T> {
953 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
954 match self {
955 Task::Ready(value) => value.fmt(f),
956 Task::Local { any_task, .. } => any_task.fmt(f),
957 Task::Send { any_task, .. } => any_task.fmt(f),
958 }
959 }
960}
961
impl<T: 'static> Future for Task<T> {
    type Output = T;

    /// Resolves to the task's value.
    ///
    /// # Panics
    ///
    /// Panics if a `Ready` task is polled again after it has yielded its
    /// value, or if the erased output is not a `T`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // NOTE(review): `get_unchecked_mut` is used to reach the variants
        // without requiring `T: Unpin`; nothing is moved out of the pinned
        // fields other than the `Option` contents — confirm that no field
        // relies on pinning.
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                // Restore the concrete type that was erased at spawn time.
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}
977
978fn any_future<T, F>(future: F) -> AnyFuture
979where
980 T: 'static + Send,
981 F: Future<Output = T> + Send + 'static,
982{
983 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
984}
985
986fn any_local_future<T, F>(future: F) -> AnyLocalFuture
987where
988 T: 'static,
989 F: Future<Output = T> + 'static,
990{
991 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
992}