1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display},
8 marker::PhantomData,
9 mem,
10 panic::Location,
11 pin::Pin,
12 rc::Rc,
13 sync::Arc,
14 task::{Context, Poll},
15 thread,
16 time::Duration,
17};
18
19use crate::{
20 platform::{self, Dispatcher},
21 util, AppContext,
22};
23
/// Executor for futures that are not required to be `Send`.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's
    /// `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker keeps the variant
        // (and therefore the whole enum) from being `Send` or `Sync`.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant that forwards to a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this foreground's runnables are queued in the
        // executor's `scheduled_from_foreground` map.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
35
/// Executor for `Send` futures.
pub enum Background {
    /// Test-only variant that forwards to a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Backed by a `smol` executor that is driven by the worker threads
    /// started in `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never sent on. The worker threads run the executor until
        // `stop.recv()` resolves; presumably dropping this sender is what
        // lets them exit — confirm against the channel's close semantics.
        _stop: channel::Sender<()>,
    },
}
44
// Type-erased futures and task handles. Outputs are boxed as `dyn Any` and
// downcast back to the concrete type when a `Task<T>` is polled.

/// Boxed, non-`Send` future with a type-erased output.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle for an [`AnyFuture`].
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle for an [`AnyLocalFuture`].
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
49
/// Handle to a spawned future (or an already-available value) that resolves
/// to a `T` when awaited.
#[must_use]
pub enum Task<T> {
    /// A value that is available immediately. It is taken out on the first
    /// poll, so polling again afterwards panics.
    Ready(Option<T>),
    /// A task created by a [`Foreground`] executor.
    Local {
        any_task: AnyLocalTask,
        // Remembers the concrete output type that the erased task is
        // downcast to when polled.
        result_type: PhantomData<T>,
    },
    /// A task created by a [`Background`] executor.
    Send {
        any_task: AnyTask,
        // Remembers the concrete output type that the erased task is
        // downcast to when polled.
        result_type: PhantomData<T>,
    },
}
62
// NOTE(review): this impl also covers the `Local` variant, whose erased output
// (`Box<dyn Any>`) is not `Send` by itself. Soundness therefore depends on how
// local tasks are actually moved between threads — confirm against callers.
unsafe impl<T: Send> Send for Task<T> {}
64
/// Mutable state of a [`Deterministic`] executor, shared behind a mutex.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision.
    rng: rand::prelude::StdRng,
    // Seed used to build `rng`; also used to re-seed it in
    // `forbid_parking` / `allow_parking`.
    seed: u64,
    // Pending foreground runnables, keyed by the `cx_id` that queued them.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Pending background runnables.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When set, `will_park` panics instead of allowing the thread to park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` samples its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Current time as seen by deterministic timers; moved by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // (timer id, wake-up time, barrier sender); inserted in wake-up-time order.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Captured by `start_waiting`; included in the `will_park` panic message.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    // Events of an earlier run that `push_to_history` compares against.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    // Whether backtraces are captured when runnables are created.
    enable_runnable_backtraces: bool,
    // Backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
83
/// A single entry in the deterministic executor's execution history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    // The spelling of this variant is part of the public API.
    EnqueuRunnable { id: usize },
}
89
/// A queued foreground runnable together with its history id.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    id: usize,
    runnable: Runnable,
    // When true, `run_internal` polls the main task right after running
    // this runnable.
    main: bool,
}
96
/// A queued background runnable together with its history id.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    id: usize,
    runnable: Runnable,
}
102
/// Test executor whose scheduling decisions are drawn from a seeded RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with the schedule closures of spawned tasks and with timers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Parks the driving thread when there is nothing to run; schedule
    // closures and wakers hold unparkers obtained from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
/// A future that resolves once a duration has elapsed.
pub enum Timer {
    /// Backed by a `smol::Timer`.
    Production(smol::Timer),
    /// Resolved by the deterministic executor's clock (test only).
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
114
/// Timer registered in a [`Deterministic`] executor's `pending_timers`.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    // Identifies this timer's entry so it can be removed when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
121
122#[cfg(any(test, feature = "test-support"))]
123impl Deterministic {
124 pub fn new(seed: u64) -> Arc<Self> {
125 use rand::prelude::*;
126
127 Arc::new(Self {
128 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
129 rng: StdRng::seed_from_u64(seed),
130 seed,
131 scheduled_from_foreground: Default::default(),
132 scheduled_from_background: Default::default(),
133 forbid_parking: false,
134 block_on_ticks: 0..=1000,
135 now: std::time::Instant::now(),
136 next_timer_id: Default::default(),
137 pending_timers: Default::default(),
138 waiting_backtrace: None,
139 next_runnable_id: 0,
140 poll_history: Default::default(),
141 previous_poll_history: Default::default(),
142 enable_runnable_backtraces: false,
143 runnable_backtraces: Default::default(),
144 })),
145 parker: Default::default(),
146 })
147 }
148
149 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
150 self.state.lock().poll_history.clone()
151 }
152
153 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
154 self.state.lock().previous_poll_history = history;
155 }
156
157 pub fn enable_runnable_backtrace(&self) {
158 self.state.lock().enable_runnable_backtraces = true;
159 }
160
161 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
162 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
163 backtrace.resolve();
164 backtrace
165 }
166
167 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
168 Arc::new(Background::Deterministic {
169 executor: self.clone(),
170 })
171 }
172
173 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
174 Rc::new(Foreground::Deterministic {
175 cx_id: id,
176 executor: self.clone(),
177 })
178 }
179
180 fn spawn_from_foreground(
181 &self,
182 cx_id: usize,
183 future: AnyLocalFuture,
184 main: bool,
185 ) -> AnyLocalTask {
186 let state = self.state.clone();
187 let id;
188 {
189 let mut state = state.lock();
190 id = util::post_inc(&mut state.next_runnable_id);
191 if state.enable_runnable_backtraces {
192 state
193 .runnable_backtraces
194 .insert(id, backtrace::Backtrace::new_unresolved());
195 }
196 }
197
198 let unparker = self.parker.lock().unparker();
199 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
200 let mut state = state.lock();
201 state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
202 state
203 .scheduled_from_foreground
204 .entry(cx_id)
205 .or_default()
206 .push(ForegroundRunnable { id, runnable, main });
207 unparker.unpark();
208 });
209 runnable.schedule();
210 task
211 }
212
213 fn spawn(&self, future: AnyFuture) -> AnyTask {
214 let state = self.state.clone();
215 let id;
216 {
217 let mut state = state.lock();
218 id = util::post_inc(&mut state.next_runnable_id);
219 if state.enable_runnable_backtraces {
220 state
221 .runnable_backtraces
222 .insert(id, backtrace::Backtrace::new_unresolved());
223 }
224 }
225
226 let unparker = self.parker.lock().unparker();
227 let (runnable, task) = async_task::spawn(future, move |runnable| {
228 let mut state = state.lock();
229 state
230 .poll_history
231 .push(ExecutorEvent::EnqueuRunnable { id });
232 state
233 .scheduled_from_background
234 .push(BackgroundRunnable { id, runnable });
235 unparker.unpark();
236 });
237 runnable.schedule();
238 task
239 }
240
    /// Drives `main_future` to completion on the calling thread and returns
    /// its type-erased output.
    ///
    /// The future is wrapped in a task whose runnable is queued under `cx_id`
    /// and flagged as `main`. The method then alternates between draining the
    /// queues via `run_internal` and parking until something unparks it.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker that `run_internal` builds; lets the loop below
        // distinguish "the main task was woken" from "nothing left to do".
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is used because `main_future` is only
        // valid for `'a`. This appears to rely on this function not returning
        // before the task has produced its result — confirm.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // NOTE(review): a fresh id is drawn on every schedule,
                        // so it differs from the `id` the backtrace above was
                        // stored under, and no `EnqueuRunnable` event is
                        // recorded here — confirm this is intended.
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Parking without having been woken is what `forbid_parking`
            // guards against; `will_park` panics in that case.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
293
294 pub fn run_until_parked(&self) {
295 use std::sync::atomic::AtomicBool;
296 let woken = Arc::new(AtomicBool::new(false));
297 self.run_internal(woken, None);
298 }
299
    /// Runs queued runnables, picking randomly between the queues, until both
    /// are empty.
    ///
    /// When `main_task` is given it is polled after every `main` foreground
    /// runnable and once more when the queues are empty; its output is returned
    /// as soon as it is ready. Returns `None` if the queues drained without the
    /// main task finishing (or if no main task was given).
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        // Waker handed to `main_task.poll`: records the wake-up in `woken` and
        // unparks the driving thread.
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Nothing queued: give the main task one last poll and stop.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // Coin flip: when background work exists, run a random background
            // runnable half of the time.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock first: running the runnable may re-enter
                // the schedule closures, which lock `state`.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then run its oldest queued runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so the emptiness check above stays accurate.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
371
    /// Polls `future` on the calling thread, interleaving it randomly with
    /// queued background runnables, for at most `max_ticks` iterations.
    ///
    /// Returns `Some(output)` if the future completed within the budget and
    /// `None` otherwise. Foreground runnables are not run here.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // Draw from `0..=len`: an index equal to `len` means "poll
            // `future`" rather than "run a background runnable".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running; the runnable may reschedule
                // itself, which locks `state`.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                // Only park when there is no background work left to make
                // progress with; `will_park` panics if parking is forbidden.
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
413
414 pub fn timer(&self, duration: Duration) -> Timer {
415 let (tx, rx) = postage::barrier::channel();
416 let mut state = self.state.lock();
417 let wakeup_at = state.now + duration;
418 let id = util::post_inc(&mut state.next_timer_id);
419 match state
420 .pending_timers
421 .binary_search_by_key(&wakeup_at, |e| e.1)
422 {
423 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
424 }
425 let state = self.state.clone();
426 Timer::Deterministic(DeterministicTimer { rx, id, state })
427 }
428
429 pub fn now(&self) -> std::time::Instant {
430 let state = self.state.lock();
431 state.now
432 }
433
    /// Advances the executor's clock by `duration`, releasing every pending
    /// timer whose wake-up time falls within the new range.
    ///
    /// Timers are released in wake-up order, one group of equal wake-up times
    /// at a time, and the queues are drained with `run_until_parked` before
    /// each step.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            // `pending_timers` is ordered by wake-up time, so the first entry
            // is the earliest one.
            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // The lock is released before the barrier senders are
                    // dropped; `Timer::poll` treats a closed barrier as ready.
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }
464
465 pub fn start_waiting(&self) {
466 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
467 }
468
469 pub fn finish_waiting(&self) {
470 self.state.lock().waiting_backtrace.take();
471 }
472
473 pub fn forbid_parking(&self) {
474 use rand::prelude::*;
475
476 let mut state = self.state.lock();
477 state.forbid_parking = true;
478 state.rng = StdRng::seed_from_u64(state.seed);
479 }
480
481 pub fn allow_parking(&self) {
482 use rand::prelude::*;
483
484 let mut state = self.state.lock();
485 state.forbid_parking = false;
486 state.rng = StdRng::seed_from_u64(state.seed);
487 }
488
489 pub async fn simulate_random_delay(&self) {
490 use rand::prelude::*;
491 use smol::future::yield_now;
492 if self.state.lock().rng.gen_bool(0.2) {
493 let yields = self.state.lock().rng.gen_range(1..=10);
494 for _ in 0..yields {
495 yield_now().await;
496 }
497 }
498 }
499
500 pub fn record_backtrace(&self) {
501 let mut state = self.state.lock();
502 if state.enable_runnable_backtraces {
503 let current_id = state
504 .poll_history
505 .iter()
506 .rev()
507 .find_map(|event| match event {
508 ExecutorEvent::PollRunnable { id } => Some(*id),
509 _ => None,
510 });
511 if let Some(id) = current_id {
512 state
513 .runnable_backtraces
514 .insert(id, backtrace::Backtrace::new_unresolved());
515 }
516 }
517 }
518}
519
520impl Drop for Timer {
521 fn drop(&mut self) {
522 #[cfg(any(test, feature = "test-support"))]
523 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
524 state
525 .lock()
526 .pending_timers
527 .retain(|(timer_id, _, _)| timer_id != id)
528 }
529 }
530}
531
532impl Future for Timer {
533 type Output = ();
534
535 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
536 match &mut *self {
537 #[cfg(any(test, feature = "test-support"))]
538 Self::Deterministic(DeterministicTimer { rx, .. }) => {
539 use postage::stream::{PollRecv, Stream as _};
540 smol::pin!(rx);
541 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
542 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
543 PollRecv::Pending => Poll::Pending,
544 }
545 }
546 Self::Production(timer) => {
547 smol::pin!(timer);
548 match timer.poll(cx) {
549 Poll::Ready(_) => Poll::Ready(()),
550 Poll::Pending => Poll::Pending,
551 }
552 }
553 }
554 }
555}
556
557#[cfg(any(test, feature = "test-support"))]
558impl DeterministicState {
559 fn push_to_history(&mut self, event: ExecutorEvent) {
560 use std::fmt::Write as _;
561
562 self.poll_history.push(event);
563 if let Some(prev_history) = &self.previous_poll_history {
564 let ix = self.poll_history.len() - 1;
565 let prev_event = prev_history[ix];
566 if event != prev_event {
567 let mut message = String::new();
568 writeln!(
569 &mut message,
570 "current runnable backtrace:\n{:?}",
571 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
572 trace.resolve();
573 util::CwdBacktrace(trace)
574 })
575 )
576 .unwrap();
577 writeln!(
578 &mut message,
579 "previous runnable backtrace:\n{:?}",
580 self.runnable_backtraces
581 .get_mut(&prev_event.id())
582 .map(|trace| {
583 trace.resolve();
584 util::CwdBacktrace(trace)
585 })
586 )
587 .unwrap();
588 panic!("detected non-determinism after {ix}. {message}");
589 }
590 }
591 }
592
593 fn will_park(&mut self) {
594 if self.forbid_parking {
595 let mut backtrace_message = String::new();
596 #[cfg(any(test, feature = "test-support"))]
597 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
598 backtrace.resolve();
599 backtrace_message = format!(
600 "\nbacktrace of waiting future:\n{:?}",
601 util::CwdBacktrace(backtrace)
602 );
603 }
604
605 panic!(
606 "deterministic executor parked after a call to forbid_parking{}",
607 backtrace_message
608 );
609 }
610 }
611}
612
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to.
    pub fn id(&self) -> usize {
        match *self {
            Self::PollRunnable { id } | Self::EnqueuRunnable { id } => id,
        }
    }
}
622
623impl Foreground {
624 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
625 if dispatcher.is_main_thread() {
626 Ok(Self::Platform {
627 dispatcher,
628 _not_send_or_sync: PhantomData,
629 })
630 } else {
631 Err(anyhow!("must be constructed on main thread"))
632 }
633 }
634
635 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
636 let future = any_local_future(future);
637 let any_task = match self {
638 #[cfg(any(test, feature = "test-support"))]
639 Self::Deterministic { cx_id, executor } => {
640 executor.spawn_from_foreground(*cx_id, future, false)
641 }
642 Self::Platform { dispatcher, .. } => {
643 fn spawn_inner(
644 future: AnyLocalFuture,
645 dispatcher: &Arc<dyn Dispatcher>,
646 ) -> AnyLocalTask {
647 let dispatcher = dispatcher.clone();
648 let schedule =
649 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
650 let (runnable, task) = async_task::spawn_local(future, schedule);
651 runnable.schedule();
652 task
653 }
654 spawn_inner(future, dispatcher)
655 }
656 };
657 Task::local(any_task)
658 }
659
660 #[cfg(any(test, feature = "test-support"))]
661 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
662 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
663 let result = match self {
664 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
665 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
666 };
667 *result.downcast().unwrap()
668 }
669
670 #[cfg(any(test, feature = "test-support"))]
671 pub fn run_until_parked(&self) {
672 match self {
673 Self::Deterministic { executor, .. } => executor.run_until_parked(),
674 _ => panic!("this method can only be called on a deterministic executor"),
675 }
676 }
677
678 #[cfg(any(test, feature = "test-support"))]
679 pub fn parking_forbidden(&self) -> bool {
680 match self {
681 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
682 _ => panic!("this method can only be called on a deterministic executor"),
683 }
684 }
685
686 #[cfg(any(test, feature = "test-support"))]
687 pub fn start_waiting(&self) {
688 match self {
689 Self::Deterministic { executor, .. } => executor.start_waiting(),
690 _ => panic!("this method can only be called on a deterministic executor"),
691 }
692 }
693
694 #[cfg(any(test, feature = "test-support"))]
695 pub fn finish_waiting(&self) {
696 match self {
697 Self::Deterministic { executor, .. } => executor.finish_waiting(),
698 _ => panic!("this method can only be called on a deterministic executor"),
699 }
700 }
701
702 #[cfg(any(test, feature = "test-support"))]
703 pub fn forbid_parking(&self) {
704 match self {
705 Self::Deterministic { executor, .. } => executor.forbid_parking(),
706 _ => panic!("this method can only be called on a deterministic executor"),
707 }
708 }
709
710 #[cfg(any(test, feature = "test-support"))]
711 pub fn allow_parking(&self) {
712 match self {
713 Self::Deterministic { executor, .. } => executor.allow_parking(),
714 _ => panic!("this method can only be called on a deterministic executor"),
715 }
716 }
717
718 #[cfg(any(test, feature = "test-support"))]
719 pub fn advance_clock(&self, duration: Duration) {
720 match self {
721 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
722 _ => panic!("this method can only be called on a deterministic executor"),
723 }
724 }
725
726 #[cfg(any(test, feature = "test-support"))]
727 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
728 match self {
729 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
730 _ => panic!("this method can only be called on a deterministic executor"),
731 }
732 }
733}
734
735impl Background {
736 pub fn new() -> Self {
737 let executor = Arc::new(Executor::new());
738 let stop = channel::unbounded::<()>();
739
740 for i in 0..2 * num_cpus::get() {
741 let executor = executor.clone();
742 let stop = stop.1.clone();
743 thread::Builder::new()
744 .name(format!("background-executor-{}", i))
745 .spawn(move || smol::block_on(executor.run(stop.recv())))
746 .unwrap();
747 }
748
749 Self::Production {
750 executor,
751 _stop: stop.0,
752 }
753 }
754
    /// Returns the CPU count reported by `num_cpus::get()`, independent of
    /// which variant `self` is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
758
759 pub fn spawn<T, F>(&self, future: F) -> Task<T>
760 where
761 T: 'static + Send,
762 F: Send + Future<Output = T> + 'static,
763 {
764 let future = any_future(future);
765 let any_task = match self {
766 Self::Production { executor, .. } => executor.spawn(future),
767 #[cfg(any(test, feature = "test-support"))]
768 Self::Deterministic { executor } => executor.spawn(future),
769 };
770 Task::send(any_task)
771 }
772
773 pub fn block<F, T>(&self, future: F) -> T
774 where
775 F: Future<Output = T>,
776 {
777 smol::pin!(future);
778 match self {
779 Self::Production { .. } => smol::block_on(&mut future),
780 #[cfg(any(test, feature = "test-support"))]
781 Self::Deterministic { executor, .. } => {
782 executor.block(&mut future, usize::MAX).unwrap()
783 }
784 }
785 }
786
787 pub fn block_with_timeout<F, T>(
788 &self,
789 timeout: Duration,
790 future: F,
791 ) -> Result<T, impl Future<Output = T>>
792 where
793 T: 'static,
794 F: 'static + Unpin + Future<Output = T>,
795 {
796 let mut future = any_local_future(future);
797 if !timeout.is_zero() {
798 let output = match self {
799 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
800 #[cfg(any(test, feature = "test-support"))]
801 Self::Deterministic { executor, .. } => {
802 use rand::prelude::*;
803 let max_ticks = {
804 let mut state = executor.state.lock();
805 let range = state.block_on_ticks.clone();
806 state.rng.gen_range(range)
807 };
808 executor.block(&mut future, max_ticks)
809 }
810 };
811 if let Some(output) = output {
812 return Ok(*output.downcast().unwrap());
813 }
814 }
815 Err(async { *future.await.downcast().unwrap() })
816 }
817
818 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
819 where
820 F: FnOnce(&mut Scope<'scope>),
821 {
822 let mut scope = Scope::new(self.clone());
823 (scheduler)(&mut scope);
824 let spawned = mem::take(&mut scope.futures)
825 .into_iter()
826 .map(|f| self.spawn(f))
827 .collect::<Vec<_>>();
828 for task in spawned {
829 task.await;
830 }
831 }
832
833 pub fn timer(&self, duration: Duration) -> Timer {
834 match self {
835 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
836 #[cfg(any(test, feature = "test-support"))]
837 Background::Deterministic { executor } => executor.timer(duration),
838 }
839 }
840
841 pub fn now(&self) -> std::time::Instant {
842 match self {
843 Background::Production { .. } => std::time::Instant::now(),
844 #[cfg(any(test, feature = "test-support"))]
845 Background::Deterministic { executor } => executor.now(),
846 }
847 }
848
849 #[cfg(any(test, feature = "test-support"))]
850 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
851 match self {
852 Self::Deterministic { executor, .. } => {
853 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
854 }
855 _ => panic!("this method can only be called on a deterministic executor"),
856 }
857 }
858
859 #[cfg(any(test, feature = "test-support"))]
860 pub async fn simulate_random_delay(&self) {
861 match self {
862 Self::Deterministic { executor, .. } => {
863 executor.simulate_random_delay().await;
864 }
865 _ => {
866 panic!("this method can only be called on a deterministic executor")
867 }
868 }
869 }
870
871 #[cfg(any(test, feature = "test-support"))]
872 pub fn record_backtrace(&self) {
873 match self {
874 Self::Deterministic { executor, .. } => executor.record_backtrace(),
875 _ => {
876 panic!("this method can only be called on a deterministic executor")
877 }
878 }
879 }
880
881 #[cfg(any(test, feature = "test-support"))]
882 pub fn start_waiting(&self) {
883 match self {
884 Self::Deterministic { executor, .. } => executor.start_waiting(),
885 _ => panic!("this method can only be called on a deterministic executor"),
886 }
887 }
888}
889
890impl Default for Background {
891 fn default() -> Self {
892 Self::new()
893 }
894}
895
/// Collects futures that may borrow data for `'a`; see `Background::scoped`.
pub struct Scope<'a> {
    // Used by `Drop` to block until every collected future has finished.
    executor: Arc<Background>,
    // Futures registered through `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each spawned future holds a clone of this sender and drops it on
    // completion; the original is taken in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // `Drop` blocks on this receiver.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
903
904impl<'a> Scope<'a> {
905 fn new(executor: Arc<Background>) -> Self {
906 let (tx, rx) = mpsc::channel(1);
907 Self {
908 executor,
909 tx: Some(tx),
910 rx,
911 futures: Default::default(),
912 _phantom: PhantomData,
913 }
914 }
915
    /// Registers `f` to be run as part of this scope.
    ///
    /// The future is only stored here; it is `Background::scoped` that takes
    /// the stored futures and spawns them.
    ///
    /// # Panics
    ///
    /// Panics if the scope's sender has already been taken.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        // Each future carries its own sender clone and drops it when done,
        // which is how `Drop` learns that it has finished.
        let tx = self.tx.clone().unwrap();

        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
935}
936
937impl<'a> Drop for Scope<'a> {
938 fn drop(&mut self) {
939 self.tx.take().unwrap();
940
941 // Wait until the channel is closed, which means that all of the spawned
942 // futures have resolved.
943 self.executor.block(self.rx.next());
944 }
945}
946
947impl<T> Task<T> {
948 pub fn ready(value: T) -> Self {
949 Self::Ready(Some(value))
950 }
951
952 fn local(any_task: AnyLocalTask) -> Self {
953 Self::Local {
954 any_task,
955 result_type: PhantomData,
956 }
957 }
958
959 pub fn detach(self) {
960 match self {
961 Task::Ready(_) => {}
962 Task::Local { any_task, .. } => any_task.detach(),
963 Task::Send { any_task, .. } => any_task.detach(),
964 }
965 }
966}
967
968impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
969 #[track_caller]
970 pub fn detach_and_log_err(self, cx: &mut AppContext) {
971 let caller = Location::caller();
972 cx.spawn(|_| async move {
973 if let Err(err) = self.await {
974 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
975 }
976 })
977 .detach();
978 }
979}
980
981impl<T: Send> Task<T> {
982 fn send(any_task: AnyTask) -> Self {
983 Self::Send {
984 any_task,
985 result_type: PhantomData,
986 }
987 }
988}
989
990impl<T: fmt::Debug> fmt::Debug for Task<T> {
991 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
992 match self {
993 Task::Ready(value) => value.fmt(f),
994 Task::Local { any_task, .. } => any_task.fmt(f),
995 Task::Send { any_task, .. } => any_task.fmt(f),
996 }
997 }
998}
999
1000impl<T: 'static> Future for Task<T> {
1001 type Output = T;
1002
1003 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1004 match unsafe { self.get_unchecked_mut() } {
1005 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1006 Task::Local { any_task, .. } => {
1007 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1008 }
1009 Task::Send { any_task, .. } => {
1010 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1011 }
1012 }
1013 }
1014}
1015
1016fn any_future<T, F>(future: F) -> AnyFuture
1017where
1018 T: 'static + Send,
1019 F: Future<Output = T> + Send + 'static,
1020{
1021 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1022}
1023
1024fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1025where
1026 T: 'static,
1027 F: Future<Output = T> + 'static,
1028{
1029 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1030}