1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display},
8 marker::PhantomData,
9 mem,
10 panic::Location,
11 pin::Pin,
12 rc::Rc,
13 sync::Arc,
14 task::{Context, Poll},
15 thread,
16 time::Duration,
17};
18
19use crate::{
20 platform::{self, Dispatcher},
21 util, AppContext,
22};
23
/// Executor handle for futures that are not required to be `Send`.
///
/// `Platform` schedules work through a `platform::Dispatcher`; `Deterministic`
/// (test builds only) routes work through a shared `Deterministic` executor.
pub enum Foreground {
    /// Backed by the platform dispatcher.
    Platform {
        /// Dispatcher used to schedule runnables on the main thread.
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker removes both auto
        // traits from the enum.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by the deterministic test executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        /// Key under which this handle's runnables are queued.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
35
/// Executor handle for `Send` futures.
pub enum Background {
    /// Backed by the deterministic test executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Backed by a `smol` executor driven by worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sender half of the stop channel; `Background::new` gives each worker
        // thread a clone of the receiving half to wait on.
        _stop: channel::Sender<()>,
    },
}
44
/// Boxed, type-erased future that need not be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, type-erased future that is `Send`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle yielding a type-erased `Send` output.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle yielding a type-erased output that need not be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
49
/// Handle to a value of type `T` that is either already available or is being
/// produced by a spawned task.
///
/// The spawned variants store a type-erased task; `result_type` records the
/// concrete output type that the `Future` impl downcasts to.
#[must_use]
pub enum Task<T> {
    /// An already-computed value; taken out the first time the task is polled.
    Ready(Option<T>),
    /// A task whose output need not be `Send`.
    Local {
        any_task: AnyLocalTask,
        result_type: PhantomData<T>,
    },
    /// A task whose output is `Send`.
    Send {
        any_task: AnyTask,
        result_type: PhantomData<T>,
    },
}
62
// SAFETY: NOTE(review): this asserts that a `Task<T>` may move between threads
// whenever `T: Send`, including the `Local` variant, whose inner task output is
// only `Box<dyn Any>`. The justification is not visible in this file — confirm
// against `async_task`'s guarantees for tasks created with `spawn_local`.
unsafe impl<T: Send> Send for Task<T> {}
64
/// Mutable state shared by a `Deterministic` executor and the timers and
/// schedule callbacks it hands out.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Source of all scheduling randomness; seeded from `seed`.
    rng: rand::prelude::StdRng,
    /// Seed used to (re)create `rng`.
    seed: u64,
    /// Runnables queued by foreground tasks, keyed by context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables queued by background tasks.
    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When set, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    /// Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated current time, advanced by `Deterministic::advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    /// `(timer id, wake-up time, barrier sender)` entries, inserted at the
    /// position found by binary search on the wake-up time.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Captured by `start_waiting` and included in the `will_park` panic message.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    /// Events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    /// Events of an earlier run; when set, each new event is compared against it.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    /// Whether to capture a backtrace for each runnable id.
    enable_runnable_backtraces: bool,
    /// Backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
83
/// A scheduling event recorded in the deterministic executor's history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue to be run.
    PollRunnable { id: usize },
    /// The runnable with this id was put on a queue.
    EnqueueRunnable { id: usize },
}
89
/// A queued foreground runnable together with its bookkeeping data.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    /// Id reported in `ExecutorEvent`s for this runnable.
    id: usize,
    runnable: Runnable,
    /// Whether the main task should be polled right after this runnable runs
    /// (see `Deterministic::run_internal`).
    main: bool,
}
96
/// A queued background runnable together with its id.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    /// Id reported in `ExecutorEvent`s for this runnable.
    id: usize,
    runnable: Runnable,
}
102
/// Test executor that picks the next runnable using a seeded random number
/// generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// State shared with timers and schedule callbacks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parker used to block the calling thread until a runnable is scheduled
    /// or a waker fires.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
/// Future that resolves to `()` once its duration has elapsed.
#[must_use]
pub enum Timer {
    /// Backed by a real `smol::Timer`.
    Production(smol::Timer),
    /// Backed by the deterministic executor's simulated clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
115
/// Timer registered with a `Deterministic` executor.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender sits in `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Id used to remove this timer from `pending_timers` on drop.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
122
123#[cfg(any(test, feature = "test-support"))]
124impl Deterministic {
125 pub fn new(seed: u64) -> Arc<Self> {
126 use rand::prelude::*;
127
128 Arc::new(Self {
129 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
130 rng: StdRng::seed_from_u64(seed),
131 seed,
132 scheduled_from_foreground: Default::default(),
133 scheduled_from_background: Default::default(),
134 forbid_parking: false,
135 block_on_ticks: 0..=1000,
136 now: std::time::Instant::now(),
137 next_timer_id: Default::default(),
138 pending_timers: Default::default(),
139 waiting_backtrace: None,
140 next_runnable_id: 0,
141 poll_history: Default::default(),
142 previous_poll_history: Default::default(),
143 enable_runnable_backtraces: false,
144 runnable_backtraces: Default::default(),
145 })),
146 parker: Default::default(),
147 })
148 }
149
150 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
151 self.state.lock().poll_history.clone()
152 }
153
154 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
155 self.state.lock().previous_poll_history = history;
156 }
157
158 pub fn enable_runnable_backtrace(&self) {
159 self.state.lock().enable_runnable_backtraces = true;
160 }
161
162 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
163 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
164 backtrace.resolve();
165 backtrace
166 }
167
168 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
169 Arc::new(Background::Deterministic {
170 executor: self.clone(),
171 })
172 }
173
174 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
175 Rc::new(Foreground::Deterministic {
176 cx_id: id,
177 executor: self.clone(),
178 })
179 }
180
181 fn spawn_from_foreground(
182 &self,
183 cx_id: usize,
184 future: AnyLocalFuture,
185 main: bool,
186 ) -> AnyLocalTask {
187 let state = self.state.clone();
188 let id;
189 {
190 let mut state = state.lock();
191 id = util::post_inc(&mut state.next_runnable_id);
192 if state.enable_runnable_backtraces {
193 state
194 .runnable_backtraces
195 .insert(id, backtrace::Backtrace::new_unresolved());
196 }
197 }
198
199 let unparker = self.parker.lock().unparker();
200 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
201 let mut state = state.lock();
202 state.push_to_history(ExecutorEvent::EnqueueRunnable { id });
203 state
204 .scheduled_from_foreground
205 .entry(cx_id)
206 .or_default()
207 .push(ForegroundRunnable { id, runnable, main });
208 unparker.unpark();
209 });
210 runnable.schedule();
211 task
212 }
213
214 fn spawn(&self, future: AnyFuture) -> AnyTask {
215 let state = self.state.clone();
216 let id;
217 {
218 let mut state = state.lock();
219 id = util::post_inc(&mut state.next_runnable_id);
220 if state.enable_runnable_backtraces {
221 state
222 .runnable_backtraces
223 .insert(id, backtrace::Backtrace::new_unresolved());
224 }
225 }
226
227 let unparker = self.parker.lock().unparker();
228 let (runnable, task) = async_task::spawn(future, move |runnable| {
229 let mut state = state.lock();
230 state
231 .poll_history
232 .push(ExecutorEvent::EnqueueRunnable { id });
233 state
234 .scheduled_from_background
235 .push(BackgroundRunnable { id, runnable });
236 unparker.unpark();
237 });
238 runnable.schedule();
239 task
240 }
241
    /// Drives `main_future` to completion on the calling thread and returns its
    /// boxed output.
    ///
    /// The future is wrapped in a task whose schedule callback queues it under
    /// `cx_id` with `main: true`. The loop then alternates between
    /// `run_internal` and parking until the main task produces a result.
    ///
    /// # Panics
    ///
    /// Via `DeterministicState::will_park`, when the executor is about to park
    /// without having been woken and parking is forbidden.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set to `true` by the waker that `run_internal` builds from this flag.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        // This id is only used as the key for the optional spawn backtrace; the
        // schedule callback below allocates a fresh id for every enqueue.
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // SAFETY: NOTE(review): `spawn_unchecked` is used because `main_future`
        // is neither `'static` nor `Send`. The soundness argument is not stated
        // here; presumably it relies on the task being driven and dropped on
        // this thread before `run` returns — confirm.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Nothing is queued and the main task is still pending. Only treat
            // this as a park if the main task's waker has not fired.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
294
295 pub fn run_until_parked(&self) {
296 use std::sync::atomic::AtomicBool;
297 let woken = Arc::new(AtomicBool::new(false));
298 self.run_internal(woken, None);
299 }
300
    /// Runs queued runnables, chosen at random, until both queues are empty.
    ///
    /// When `main_task` is given, it is polled once the queues are empty and
    /// after every foreground runnable flagged `main`. Returns `Some(output)` as
    /// soon as it is ready, otherwise `None`.
    ///
    /// `woken` is set to `true` by the waker passed to `main_task`.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                // Nothing left to run: give the main task one last poll.
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // A random bool decides whether a non-empty background queue is
            // served on this iteration; otherwise the foreground queue is
            // tried. If neither branch runs, the loop draws again.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable's schedule
                // callback locks the same state.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a context at random, then run that context's oldest
                // queued runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so that `is_empty` on the map stays
                // meaningful.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
372
    /// Polls `future` on the calling thread for at most `max_ticks` iterations,
    /// randomly interleaving it with queued background runnables.
    ///
    /// Returns `Some(output)` if the future completes within the budget and
    /// `None` otherwise. Foreground runnables are not run here.
    ///
    /// # Panics
    ///
    /// Via `DeterministicState::will_park`, when the future is pending, no
    /// background work is queued and parking is forbidden.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: drawing `runnable_count` itself means
            // "poll the blocked-on future" rather than a background runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable's schedule
                // callback locks the same state.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                // Only park when there is no background work left to make
                // progress with.
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
414
415 pub fn timer(&self, duration: Duration) -> Timer {
416 let (tx, rx) = postage::barrier::channel();
417 let mut state = self.state.lock();
418 let wakeup_at = state.now + duration;
419 let id = util::post_inc(&mut state.next_timer_id);
420 match state
421 .pending_timers
422 .binary_search_by_key(&wakeup_at, |e| e.1)
423 {
424 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
425 }
426 let state = self.state.clone();
427 Timer::Deterministic(DeterministicTimer { rx, id, state })
428 }
429
430 pub fn now(&self) -> std::time::Instant {
431 let state = self.state.lock();
432 state.now
433 }
434
    /// Advances the simulated clock by `duration`.
    ///
    /// Timers due within the window are released in order of wake-up time. The
    /// clock is moved to each wake-up time in turn, and queued work is run
    /// until parked before each step.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    // Release every timer that shares the earliest wake-up
                    // time.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // Release the lock first, then drop the barrier senders.
                    // Dropping them is presumably what resolves the matching
                    // timers (`Timer::poll` treats a closed receiver as ready).
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }
465
466 pub fn start_waiting(&self) {
467 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
468 }
469
470 pub fn finish_waiting(&self) {
471 self.state.lock().waiting_backtrace.take();
472 }
473
474 pub fn forbid_parking(&self) {
475 use rand::prelude::*;
476
477 let mut state = self.state.lock();
478 state.forbid_parking = true;
479 state.rng = StdRng::seed_from_u64(state.seed);
480 }
481
482 pub fn allow_parking(&self) {
483 use rand::prelude::*;
484
485 let mut state = self.state.lock();
486 state.forbid_parking = false;
487 state.rng = StdRng::seed_from_u64(state.seed);
488 }
489
490 pub async fn simulate_random_delay(&self) {
491 use rand::prelude::*;
492 use smol::future::yield_now;
493 if self.state.lock().rng.gen_bool(0.2) {
494 let yields = self.state.lock().rng.gen_range(1..=10);
495 for _ in 0..yields {
496 yield_now().await;
497 }
498 }
499 }
500
501 pub fn record_backtrace(&self) {
502 let mut state = self.state.lock();
503 if state.enable_runnable_backtraces {
504 let current_id = state
505 .poll_history
506 .iter()
507 .rev()
508 .find_map(|event| match event {
509 ExecutorEvent::PollRunnable { id } => Some(*id),
510 _ => None,
511 });
512 if let Some(id) = current_id {
513 state
514 .runnable_backtraces
515 .insert(id, backtrace::Backtrace::new_unresolved());
516 }
517 }
518 }
519}
520
521impl Drop for Timer {
522 fn drop(&mut self) {
523 #[cfg(any(test, feature = "test-support"))]
524 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
525 state
526 .lock()
527 .pending_timers
528 .retain(|(timer_id, _, _)| timer_id != id)
529 }
530 }
531}
532
533impl Future for Timer {
534 type Output = ();
535
536 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
537 match &mut *self {
538 #[cfg(any(test, feature = "test-support"))]
539 Self::Deterministic(DeterministicTimer { rx, .. }) => {
540 use postage::stream::{PollRecv, Stream as _};
541 smol::pin!(rx);
542 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
543 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
544 PollRecv::Pending => Poll::Pending,
545 }
546 }
547 Self::Production(timer) => {
548 smol::pin!(timer);
549 match timer.poll(cx) {
550 Poll::Ready(_) => Poll::Ready(()),
551 Poll::Pending => Poll::Pending,
552 }
553 }
554 }
555 }
556}
557
558#[cfg(any(test, feature = "test-support"))]
559impl DeterministicState {
560 fn push_to_history(&mut self, event: ExecutorEvent) {
561 use std::fmt::Write as _;
562
563 self.poll_history.push(event);
564 if let Some(prev_history) = &self.previous_poll_history {
565 let ix = self.poll_history.len() - 1;
566 let prev_event = prev_history[ix];
567 if event != prev_event {
568 let mut message = String::new();
569 writeln!(
570 &mut message,
571 "current runnable backtrace:\n{:?}",
572 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
573 trace.resolve();
574 util::CwdBacktrace(trace)
575 })
576 )
577 .unwrap();
578 writeln!(
579 &mut message,
580 "previous runnable backtrace:\n{:?}",
581 self.runnable_backtraces
582 .get_mut(&prev_event.id())
583 .map(|trace| {
584 trace.resolve();
585 util::CwdBacktrace(trace)
586 })
587 )
588 .unwrap();
589 panic!("detected non-determinism after {ix}. {message}");
590 }
591 }
592 }
593
594 fn will_park(&mut self) {
595 if self.forbid_parking {
596 let mut backtrace_message = String::new();
597 #[cfg(any(test, feature = "test-support"))]
598 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
599 backtrace.resolve();
600 backtrace_message = format!(
601 "\nbacktrace of waiting future:\n{:?}",
602 util::CwdBacktrace(backtrace)
603 );
604 }
605
606 panic!(
607 "deterministic executor parked after a call to forbid_parking{}",
608 backtrace_message
609 );
610 }
611 }
612}
613
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to.
    pub fn id(&self) -> usize {
        match self {
            Self::PollRunnable { id } | Self::EnqueueRunnable { id } => *id,
        }
    }
}
623
624impl Foreground {
625 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
626 if dispatcher.is_main_thread() {
627 Ok(Self::Platform {
628 dispatcher,
629 _not_send_or_sync: PhantomData,
630 })
631 } else {
632 Err(anyhow!("must be constructed on main thread"))
633 }
634 }
635
636 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
637 let future = any_local_future(future);
638 let any_task = match self {
639 #[cfg(any(test, feature = "test-support"))]
640 Self::Deterministic { cx_id, executor } => {
641 executor.spawn_from_foreground(*cx_id, future, false)
642 }
643 Self::Platform { dispatcher, .. } => {
644 fn spawn_inner(
645 future: AnyLocalFuture,
646 dispatcher: &Arc<dyn Dispatcher>,
647 ) -> AnyLocalTask {
648 let dispatcher = dispatcher.clone();
649 let schedule =
650 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
651 let (runnable, task) = async_task::spawn_local(future, schedule);
652 runnable.schedule();
653 task
654 }
655 spawn_inner(future, dispatcher)
656 }
657 };
658 Task::local(any_task)
659 }
660
661 #[cfg(any(test, feature = "test-support"))]
662 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
663 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
664 let result = match self {
665 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
666 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
667 };
668 *result.downcast().unwrap()
669 }
670
671 #[cfg(any(test, feature = "test-support"))]
672 pub fn run_until_parked(&self) {
673 match self {
674 Self::Deterministic { executor, .. } => executor.run_until_parked(),
675 _ => panic!("this method can only be called on a deterministic executor"),
676 }
677 }
678
679 #[cfg(any(test, feature = "test-support"))]
680 pub fn parking_forbidden(&self) -> bool {
681 match self {
682 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
683 _ => panic!("this method can only be called on a deterministic executor"),
684 }
685 }
686
687 #[cfg(any(test, feature = "test-support"))]
688 pub fn start_waiting(&self) {
689 match self {
690 Self::Deterministic { executor, .. } => executor.start_waiting(),
691 _ => panic!("this method can only be called on a deterministic executor"),
692 }
693 }
694
695 #[cfg(any(test, feature = "test-support"))]
696 pub fn finish_waiting(&self) {
697 match self {
698 Self::Deterministic { executor, .. } => executor.finish_waiting(),
699 _ => panic!("this method can only be called on a deterministic executor"),
700 }
701 }
702
703 #[cfg(any(test, feature = "test-support"))]
704 pub fn forbid_parking(&self) {
705 match self {
706 Self::Deterministic { executor, .. } => executor.forbid_parking(),
707 _ => panic!("this method can only be called on a deterministic executor"),
708 }
709 }
710
711 #[cfg(any(test, feature = "test-support"))]
712 pub fn allow_parking(&self) {
713 match self {
714 Self::Deterministic { executor, .. } => executor.allow_parking(),
715 _ => panic!("this method can only be called on a deterministic executor"),
716 }
717 }
718
719 #[cfg(any(test, feature = "test-support"))]
720 pub fn advance_clock(&self, duration: Duration) {
721 match self {
722 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
723 _ => panic!("this method can only be called on a deterministic executor"),
724 }
725 }
726
727 #[cfg(any(test, feature = "test-support"))]
728 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
729 match self {
730 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
731 _ => panic!("this method can only be called on a deterministic executor"),
732 }
733 }
734}
735
736impl Background {
737 pub fn new() -> Self {
738 let executor = Arc::new(Executor::new());
739 let stop = channel::unbounded::<()>();
740
741 for i in 0..2 * num_cpus::get() {
742 let executor = executor.clone();
743 let stop = stop.1.clone();
744 thread::Builder::new()
745 .name(format!("background-executor-{}", i))
746 .spawn(move || smol::block_on(executor.run(stop.recv())))
747 .unwrap();
748 }
749
750 Self::Production {
751 executor,
752 _stop: stop.0,
753 }
754 }
755
    /// Returns the CPU count reported by `num_cpus::get()`.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
759
760 pub fn spawn<T, F>(&self, future: F) -> Task<T>
761 where
762 T: 'static + Send,
763 F: Send + Future<Output = T> + 'static,
764 {
765 let future = any_future(future);
766 let any_task = match self {
767 Self::Production { executor, .. } => executor.spawn(future),
768 #[cfg(any(test, feature = "test-support"))]
769 Self::Deterministic { executor } => executor.spawn(future),
770 };
771 Task::send(any_task)
772 }
773
774 pub fn block<F, T>(&self, future: F) -> T
775 where
776 F: Future<Output = T>,
777 {
778 smol::pin!(future);
779 match self {
780 Self::Production { .. } => smol::block_on(&mut future),
781 #[cfg(any(test, feature = "test-support"))]
782 Self::Deterministic { executor, .. } => {
783 executor.block(&mut future, usize::MAX).unwrap()
784 }
785 }
786 }
787
788 pub fn block_with_timeout<F, T>(
789 &self,
790 timeout: Duration,
791 future: F,
792 ) -> Result<T, impl Future<Output = T>>
793 where
794 T: 'static,
795 F: 'static + Unpin + Future<Output = T>,
796 {
797 let mut future = any_local_future(future);
798 if !timeout.is_zero() {
799 let output = match self {
800 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
801 #[cfg(any(test, feature = "test-support"))]
802 Self::Deterministic { executor, .. } => {
803 use rand::prelude::*;
804 let max_ticks = {
805 let mut state = executor.state.lock();
806 let range = state.block_on_ticks.clone();
807 state.rng.gen_range(range)
808 };
809 executor.block(&mut future, max_ticks)
810 }
811 };
812 if let Some(output) = output {
813 return Ok(*output.downcast().unwrap());
814 }
815 }
816 Err(async { *future.await.downcast().unwrap() })
817 }
818
    /// Runs futures that may borrow from the caller's `'scope` and waits for
    /// all of them to finish.
    ///
    /// `scheduler` registers futures via `Scope::spawn`. They are then spawned
    /// on this executor and awaited in registration order.
    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone());
        (scheduler)(&mut scope);
        // Spawn everything first so the tasks can run concurrently, then await
        // each handle.
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn(f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
        // `scope` is dropped here; its `Drop` impl blocks until every spawned
        // future has released its channel sender.
    }
833
834 pub fn timer(&self, duration: Duration) -> Timer {
835 match self {
836 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
837 #[cfg(any(test, feature = "test-support"))]
838 Background::Deterministic { executor } => executor.timer(duration),
839 }
840 }
841
842 pub fn now(&self) -> std::time::Instant {
843 match self {
844 Background::Production { .. } => std::time::Instant::now(),
845 #[cfg(any(test, feature = "test-support"))]
846 Background::Deterministic { executor } => executor.now(),
847 }
848 }
849
850 #[cfg(any(test, feature = "test-support"))]
851 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
852 match self {
853 Self::Deterministic { executor, .. } => {
854 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
855 }
856 _ => panic!("this method can only be called on a deterministic executor"),
857 }
858 }
859
860 #[cfg(any(test, feature = "test-support"))]
861 pub async fn simulate_random_delay(&self) {
862 match self {
863 Self::Deterministic { executor, .. } => {
864 executor.simulate_random_delay().await;
865 }
866 _ => {
867 panic!("this method can only be called on a deterministic executor")
868 }
869 }
870 }
871
872 #[cfg(any(test, feature = "test-support"))]
873 pub fn record_backtrace(&self) {
874 match self {
875 Self::Deterministic { executor, .. } => executor.record_backtrace(),
876 _ => {
877 panic!("this method can only be called on a deterministic executor")
878 }
879 }
880 }
881
882 #[cfg(any(test, feature = "test-support"))]
883 pub fn start_waiting(&self) {
884 match self {
885 Self::Deterministic { executor, .. } => executor.start_waiting(),
886 _ => panic!("this method can only be called on a deterministic executor"),
887 }
888 }
889}
890
impl Default for Background {
    /// Equivalent to `Background::new()`, which spawns the worker threads.
    fn default() -> Self {
        Self::new()
    }
}
896
/// Collects futures that borrow data for `'a` so that `Background::scoped` can
/// run them on the background executor.
pub struct Scope<'a> {
    /// Executor used by `Drop` to block until all futures have finished.
    executor: Arc<Background>,
    /// Registered futures, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender cloned into each registered future; taken (and dropped) in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    /// Receiver that `Drop` waits on.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
904
impl<'a> Scope<'a> {
    /// Creates an empty scope bound to `executor`.
    fn new(executor: Arc<Background>) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            _phantom: PhantomData,
        }
    }

    /// Registers `f` to be run by `Background::scoped`.
    ///
    /// The future is wrapped so that it drops a clone of the scope's sender
    /// when it finishes.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                // Dropping this sender clone signals completion to `Drop`.
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}
937
938impl<'a> Drop for Scope<'a> {
939 fn drop(&mut self) {
940 self.tx.take().unwrap();
941
942 // Wait until the channel is closed, which means that all of the spawned
943 // futures have resolved.
944 self.executor.block(self.rx.next());
945 }
946}
947
948impl<T> Task<T> {
949 pub fn ready(value: T) -> Self {
950 Self::Ready(Some(value))
951 }
952
953 fn local(any_task: AnyLocalTask) -> Self {
954 Self::Local {
955 any_task,
956 result_type: PhantomData,
957 }
958 }
959
960 pub fn detach(self) {
961 match self {
962 Task::Ready(_) => {}
963 Task::Local { any_task, .. } => any_task.detach(),
964 Task::Send { any_task, .. } => any_task.detach(),
965 }
966 }
967}
968
969impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
970 #[track_caller]
971 pub fn detach_and_log_err(self, cx: &mut AppContext) {
972 let caller = Location::caller();
973 cx.spawn(|_| async move {
974 if let Err(err) = self.await {
975 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
976 }
977 })
978 .detach();
979 }
980}
981
982impl<T: Send> Task<T> {
983 fn send(any_task: AnyTask) -> Self {
984 Self::Send {
985 any_task,
986 result_type: PhantomData,
987 }
988 }
989}
990
991impl<T: fmt::Debug> fmt::Debug for Task<T> {
992 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
993 match self {
994 Task::Ready(value) => value.fmt(f),
995 Task::Local { any_task, .. } => any_task.fmt(f),
996 Task::Send { any_task, .. } => any_task.fmt(f),
997 }
998 }
999}
1000
impl<T: 'static> Future for Task<T> {
    type Output = T;

    /// Yields the ready value, or polls the underlying task and downcasts its
    /// type-erased output to `T`.
    ///
    /// # Panics
    ///
    /// Panics if a `Ready` task is polled again after its value has been
    /// taken, or if the task's output is not a `T`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: NOTE(review): no justification is recorded here. The arms
        // below only take the `Ready` value in place or poll the inner task
        // through a `&mut` reference — confirm that this satisfies the pinning
        // contract for every `T`.
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}
1016
1017fn any_future<T, F>(future: F) -> AnyFuture
1018where
1019 T: 'static + Send,
1020 F: Future<Output = T> + Send + 'static,
1021{
1022 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1023}
1024
1025fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1026where
1027 T: 'static,
1028 F: Future<Output = T> + 'static,
1029{
1030 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1031}