use crate::util;
use crate::PlatformDispatcher;
use anyhow::{anyhow, Result};
use async_task::Runnable;
use futures::channel::{mpsc, oneshot};
use smol::{channel, prelude::*, Executor};
use std::{
    any::Any,
    fmt::{self},
    marker::PhantomData,
    mem,
    pin::Pin,
    rc::Rc,
    sync::Arc,
    task::{Context, Poll},
    thread,
    time::Duration,
};

/// Runs the given closure on the application's event loop, or immediately if
/// this is already the main thread. Returns the result asynchronously.
pub(crate) fn run_on_main<F, R>(
    dispatcher: Arc<dyn PlatformDispatcher>,
    func: F,
) -> impl Future<Output = R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    if dispatcher.is_main_thread() {
        let _ = tx.send(func());
    } else {
        let _ = spawn_on_main(dispatcher, move || async move {
            let _ = tx.send(func());
        });
    }
    async move { rx.await.unwrap() }
}

/// Enqueues the given closure to be run on the application's event loop. The
/// closure returns a future which will be run to completion on the main thread.
pub(crate) fn spawn_on_main<F, R>(
    dispatcher: Arc<dyn PlatformDispatcher>,
    func: impl FnOnce() -> F + Send + 'static,
) -> impl Future<Output = R>
where
    F: Future<Output = R> + 'static,
    R: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let (runnable, task) = async_task::spawn(
        {
            let dispatcher = dispatcher.clone();
            async move {
                let future = func();
                let _ = spawn_on_main_local(dispatcher, async move {
                    let result = future.await;
                    let _ = tx.send(result);
                });
            }
        },
        move |runnable| dispatcher.run_on_main_thread(runnable),
    );
    runnable.schedule();
    task.detach();
    async move { rx.await.unwrap() }
}

/// Enqueues the given future to be run to completion on the application's
/// event loop. Must be called on the main thread.
pub(crate) fn spawn_on_main_local<R>(
    dispatcher: Arc<dyn PlatformDispatcher>,
    future: impl Future<Output = R> + 'static,
) -> impl Future<Output = R>
where
    R: 'static,
{
    assert!(dispatcher.is_main_thread(), "must be called on main thread");

    let (tx, rx) = oneshot::channel();
    let (runnable, task) = async_task::spawn_local(
        async move {
            let result = future.await;
            let _ = tx.send(result);
        },
        move |runnable| dispatcher.run_on_main_thread(runnable),
    );
    runnable.schedule();
    task.detach();
    async move { rx.await.unwrap() }
}

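/// An executor for futures that must run on the main thread. In production
/// this forwards work to the platform dispatcher; in tests a shared
/// `Deterministic` scheduler drives it. The `PhantomData<Rc<()>>` marker keeps
/// the platform variant from being `Send` or `Sync`.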
pub enum ForegroundExecutor {
    Platform {
        dispatcher: Arc<dyn PlatformDispatcher>,
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    #[cfg(any(test, feature = "test"))]
    Deterministic {
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}

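/// An executor for `Send` futures. In production, work is spawned onto a
/// `smol` executor driven by a pool of worker threads; in tests the shared
/// `Deterministic` scheduler is used so the interleaving stays reproducible.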
pub enum BackgroundExecutor {
    #[cfg(any(test, feature = "test"))]
    Deterministic { executor: Arc<Deterministic> },
    Production {
        executor: Arc<smol::Executor<'static>>,
        _stop: channel::Sender<()>,
    },
}

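// Type-erased futures and tasks: outputs are boxed as `dyn Any` so a single
// task type can carry any result, which `Task<T>` downcasts when polled.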
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;

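/// A handle to a spawned future. Dropping a `Task` cancels the underlying
/// work; call `detach` to let it keep running in the background.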
#[must_use]
pub enum Task<T> {
    Ready(Option<T>),
    Local {
        any_task: AnyLocalTask,
        result_type: PhantomData<T>,
    },
    Send {
        any_task: AnyTask,
        result_type: PhantomData<T>,
    },
}

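// SAFETY: `Task::Local` holds a non-`Send` task, so this impl appears to rely
// on the unchecked invariant that `Local` tasks are created and polled only on
// the main thread; marking `Task<T: Send>` as `Send` is sound only under that
// assumption.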
unsafe impl<T: Send> Send for Task<T> {}

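/// Shared state behind the deterministic test scheduler: the seeded RNG that
/// picks the next runnable, queues of foreground and background work, the
/// simulated clock and pending timers, and the poll history used to detect
/// non-deterministic interleavings across runs.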
#[cfg(any(test, feature = "test"))]
struct DeterministicState {
    rng: rand::prelude::StdRng,
    seed: u64,
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    scheduled_from_background: Vec<BackgroundRunnable>,
    forbid_parking: bool,
    block_on_ticks: std::ops::RangeInclusive<usize>,
    now: std::time::Instant,
    next_timer_id: usize,
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    poll_history: Vec<ExecutorEvent>,
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    enable_runnable_backtraces: bool,
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}

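/// A single entry in the scheduler's poll history: a runnable being enqueued
/// or polled, identified by its runnable id.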
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    PollRunnable { id: usize },
    EnqueuRunnable { id: usize },
}

#[cfg(any(test, feature = "test"))]
struct ForegroundRunnable {
    id: usize,
    runnable: Runnable,
    main: bool,
}

#[cfg(any(test, feature = "test"))]
struct BackgroundRunnable {
    id: usize,
    runnable: Runnable,
}

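/// A deterministic, seeded scheduler used in tests. Runnables from both the
/// foreground and background executors are queued here, and the seeded RNG
/// decides which one runs next, so a given seed reproduces the same
/// interleaving.
///
/// A minimal usage sketch, assuming a test context that constructs the
/// executors directly (the local variable names are illustrative only):
///
/// ```ignore
/// let deterministic = Deterministic::new(seed);
/// let foreground = deterministic.build_foreground(1);
/// let background = deterministic.build_background();
/// let task = background.spawn(async { 2 + 2 });
/// assert_eq!(foreground.run(task), 4);
/// ```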
#[cfg(any(test, feature = "test"))]
pub struct Deterministic {
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    parker: parking_lot::Mutex<parking::Parker>,
}

#[must_use]
pub enum Timer {
    Production(smol::Timer),
    #[cfg(any(test, feature = "test"))]
    Deterministic(DeterministicTimer),
}

#[cfg(any(test, feature = "test"))]
pub struct DeterministicTimer {
    rx: postage::barrier::Receiver,
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}

#[cfg(any(test, feature = "test"))]
impl Deterministic {
    pub fn new(seed: u64) -> Arc<Self> {
        use rand::prelude::*;

        Arc::new(Self {
            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: std::time::Instant::now(),
                next_timer_id: Default::default(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
                next_runnable_id: 0,
                poll_history: Default::default(),
                previous_poll_history: Default::default(),
                enable_runnable_backtraces: false,
                runnable_backtraces: Default::default(),
            })),
            parker: Default::default(),
        })
    }

    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
        self.state.lock().poll_history.clone()
    }

    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
        self.state.lock().previous_poll_history = history;
    }

    pub fn enable_runnable_backtrace(&self) {
        self.state.lock().enable_runnable_backtraces = true;
    }

    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
        backtrace.resolve();
        backtrace
    }

    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
        Arc::new(BackgroundExecutor::Deterministic {
            executor: self.clone(),
        })
    }

    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
        Rc::new(ForegroundExecutor::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { id, runnable, main });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            state
                .poll_history
                .push(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_background
                .push(BackgroundRunnable { id, runnable });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

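    /// Runs `main_future` to completion on the calling thread, interleaving it
    /// with all scheduled foreground and background runnables in an order
    /// chosen by the seeded RNG. Parks when no work is ready and no wakeups
    /// are pending.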
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

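    /// Polls scheduled runnables until every queue is empty, without blocking.
    /// Useful in tests to flush all pending work before making assertions.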
    pub fn run_until_parked(&self) {
        use std::sync::atomic::AtomicBool;
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

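    /// Polls `future` for at most `max_ticks` iterations, randomly interleaving
    /// it with scheduled background runnables and parking when nothing else can
    /// make progress. Returns `None` if the tick budget is exhausted before the
    /// future resolves.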
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }

    pub fn timer(&self, duration: Duration) -> Timer {
        let (tx, rx) = postage::barrier::channel();
        let mut state = self.state.lock();
        let wakeup_at = state.now + duration;
        let id = util::post_inc(&mut state.next_timer_id);
        match state
            .pending_timers
            .binary_search_by_key(&wakeup_at, |e| e.1)
        {
            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
        }
        let state = self.state.clone();
        Timer::Deterministic(DeterministicTimer { rx, id, state })
    }

    pub fn now(&self) -> std::time::Instant {
        let state = self.state.lock();
        state.now
    }

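    /// Advances the simulated clock by `duration`, firing any pending timers
    /// whose deadlines fall within the interval and draining the work they
    /// schedule along the way.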
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }

    pub fn start_waiting(&self) {
        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
    }

    pub fn finish_waiting(&self) {
        self.state.lock().waiting_backtrace.take();
    }

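    /// Makes any subsequent attempt to park panic, which usually indicates a
    /// deadlocked or starved test. This also reseeds the RNG from the stored
    /// seed so the remainder of the run stays reproducible.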
    pub fn forbid_parking(&self) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = true;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    pub fn allow_parking(&self) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = false;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    pub async fn simulate_random_delay(&self) {
        use rand::prelude::*;
        use smol::future::yield_now;
        if self.state.lock().rng.gen_bool(0.2) {
            let yields = self.state.lock().rng.gen_range(1..=10);
            for _ in 0..yields {
                yield_now().await;
            }
        }
    }

    pub fn record_backtrace(&self) {
        let mut state = self.state.lock();
        if state.enable_runnable_backtraces {
            let current_id = state
                .poll_history
                .iter()
                .rev()
                .find_map(|event| match event {
                    ExecutorEvent::PollRunnable { id } => Some(*id),
                    _ => None,
                });
            if let Some(id) = current_id {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        #[cfg(any(test, feature = "test"))]
        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
            state
                .lock()
                .pending_timers
                .retain(|(timer_id, _, _)| timer_id != id)
        }
    }
}

impl Future for Timer {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut *self {
            #[cfg(any(test, feature = "test"))]
            Self::Deterministic(DeterministicTimer { rx, .. }) => {
                use postage::stream::{PollRecv, Stream as _};
                smol::pin!(rx);
                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
                    PollRecv::Pending => Poll::Pending,
                }
            }
            Self::Production(timer) => {
                smol::pin!(timer);
                match timer.poll(cx) {
                    Poll::Ready(_) => Poll::Ready(()),
                    Poll::Pending => Poll::Pending,
                }
            }
        }
    }
}

#[cfg(any(test, feature = "test"))]
impl DeterministicState {
    fn push_to_history(&mut self, event: ExecutorEvent) {
        use std::fmt::Write as _;

        self.poll_history.push(event);
        if let Some(prev_history) = &self.previous_poll_history {
            let ix = self.poll_history.len() - 1;
            let prev_event = prev_history[ix];
            if event != prev_event {
                let mut message = String::new();
                writeln!(
                    &mut message,
                    "current runnable backtrace:\n{:?}",
                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
                        trace.resolve();
                        crate::util::CwdBacktrace(trace)
                    })
                )
                .unwrap();
                writeln!(
                    &mut message,
                    "previous runnable backtrace:\n{:?}",
                    self.runnable_backtraces
                        .get_mut(&prev_event.id())
                        .map(|trace| {
                            trace.resolve();
                            util::CwdBacktrace(trace)
                        })
                )
                .unwrap();
                panic!("detected non-determinism after {ix}. {message}");
            }
        }
    }

    fn will_park(&mut self) {
        if self.forbid_parking {
            let mut backtrace_message = String::new();
            #[cfg(any(test, feature = "test"))]
            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
                backtrace.resolve();
                backtrace_message = format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                );
            }

            panic!(
                "deterministic executor parked after a call to forbid_parking{}",
                backtrace_message
            );
        }
    }
}

#[cfg(any(test, feature = "test"))]
impl ExecutorEvent {
    pub fn id(&self) -> usize {
        match self {
            ExecutorEvent::PollRunnable { id } => *id,
            ExecutorEvent::EnqueuRunnable { id } => *id,
        }
    }
}

impl ForegroundExecutor {
    pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
        if dispatcher.is_main_thread() {
            Ok(Self::Platform {
                dispatcher,
                _not_send_or_sync: PhantomData,
            })
        } else {
            Err(anyhow!("must be constructed on main thread"))
        }
    }

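    /// Spawns a (possibly non-`Send`) future onto the main thread and returns
    /// a `Task` handle for its result. In production the future is scheduled
    /// via the platform dispatcher; under the deterministic executor it joins
    /// the foreground queue for this context.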
    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
        let future = any_local_future(future);
        let any_task = match self {
            #[cfg(any(test, feature = "test"))]
            Self::Deterministic { cx_id, executor } => {
                executor.spawn_from_foreground(*cx_id, future, false)
            }
            Self::Platform { dispatcher, .. } => {
                fn spawn_inner(
                    future: AnyLocalFuture,
                    dispatcher: &Arc<dyn PlatformDispatcher>,
                ) -> AnyLocalTask {
                    let dispatcher = dispatcher.clone();
                    let schedule =
                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
                    let (runnable, task) = async_task::spawn_local(future, schedule);
                    runnable.schedule();
                    task
                }
                spawn_inner(future, dispatcher)
            }
        };
        Task::local(any_task)
    }

    #[cfg(any(test, feature = "test"))]
    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
        let result = match self {
            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
        };
        *result.downcast().unwrap()
    }

    #[cfg(any(test, feature = "test"))]
    pub fn run_until_parked(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.run_until_parked(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn parking_forbidden(&self) -> bool {
        match self {
            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn start_waiting(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.start_waiting(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn finish_waiting(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.finish_waiting(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn forbid_parking(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.forbid_parking(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn allow_parking(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.allow_parking(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn advance_clock(&self, duration: Duration) {
        match self {
            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
        match self {
            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }
}

impl BackgroundExecutor {
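    /// Creates a production executor backed by `2 * num_cpus` worker threads.
    /// The `_stop` sender is held by the executor; when it is dropped the
    /// channel closes, each worker's `recv` future resolves, and the threads
    /// shut down.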
    pub fn new() -> Self {
        let executor = Arc::new(Executor::new());
        let stop = channel::unbounded::<()>();

        for i in 0..2 * num_cpus::get() {
            let executor = executor.clone();
            let stop = stop.1.clone();
            thread::Builder::new()
                .name(format!("background-executor-{}", i))
                .spawn(move || smol::block_on(executor.run(stop.recv())))
                .unwrap();
        }

        Self::Production {
            executor,
            _stop: stop.0,
        }
    }

    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }

    pub fn spawn<T, F>(&self, future: F) -> Task<T>
    where
        T: 'static + Send,
        F: Send + Future<Output = T> + 'static,
    {
        let future = any_future(future);
        let any_task = match self {
            Self::Production { executor, .. } => executor.spawn(future),
            #[cfg(any(test, feature = "test"))]
            Self::Deterministic { executor } => executor.spawn(future),
        };
        Task::send(any_task)
    }

    pub fn block<F, T>(&self, future: F) -> T
    where
        F: Future<Output = T>,
    {
        smol::pin!(future);
        match self {
            Self::Production { .. } => smol::block_on(&mut future),
            #[cfg(any(test, feature = "test"))]
            Self::Deterministic { executor, .. } => {
                executor.block(&mut future, usize::MAX).unwrap()
            }
        }
    }

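    /// Blocks on `future` for at most `timeout` (in production) or a random
    /// number of scheduler ticks drawn from `block_on_ticks` (deterministic).
    /// Returns `Ok` with the output if it finishes in time, otherwise returns
    /// the still-pending future so the caller can keep awaiting it.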
    pub fn block_with_timeout<F, T>(
        &self,
        timeout: Duration,
        future: F,
    ) -> Result<T, impl Future<Output = T>>
    where
        T: 'static,
        F: 'static + Unpin + Future<Output = T>,
    {
        let mut future = any_local_future(future);
        if !timeout.is_zero() {
            let output = match self {
                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
                #[cfg(any(test, feature = "test"))]
                Self::Deterministic { executor, .. } => {
                    use rand::prelude::*;
                    let max_ticks = {
                        let mut state = executor.state.lock();
                        let range = state.block_on_ticks.clone();
                        state.rng.gen_range(range)
                    };
                    executor.block(&mut future, max_ticks)
                }
            };
            if let Some(output) = output {
                return Ok(*output.downcast().unwrap());
            }
        }
        Err(async { *future.await.downcast().unwrap() })
    }

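    /// Spawns the futures registered through the given `Scope` and waits for
    /// all of them to finish before returning, which is what lets them safely
    /// borrow data from the caller's stack frame.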
    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
    where
        F: FnOnce(&mut Scope<'scope>),
    {
        let mut scope = Scope::new(self.clone());
        (scheduler)(&mut scope);
        let spawned = mem::take(&mut scope.futures)
            .into_iter()
            .map(|f| self.spawn(f))
            .collect::<Vec<_>>();
        for task in spawned {
            task.await;
        }
    }

    pub fn timer(&self, duration: Duration) -> Timer {
        match self {
            BackgroundExecutor::Production { .. } => {
                Timer::Production(smol::Timer::after(duration))
            }
            #[cfg(any(test, feature = "test"))]
            BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
        }
    }

    pub fn now(&self) -> std::time::Instant {
        match self {
            BackgroundExecutor::Production { .. } => std::time::Instant::now(),
            #[cfg(any(test, feature = "test"))]
            BackgroundExecutor::Deterministic { executor } => executor.now(),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
        match self {
            Self::Deterministic { executor, .. } => {
                parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
            }
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn simulate_random_delay(&self) {
        match self {
            Self::Deterministic { executor, .. } => {
                executor.simulate_random_delay().await;
            }
            _ => {
                panic!("this method can only be called on a deterministic executor")
            }
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn record_backtrace(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.record_backtrace(),
            _ => {
                panic!("this method can only be called on a deterministic executor")
            }
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn start_waiting(&self) {
        match self {
            Self::Deterministic { executor, .. } => executor.start_waiting(),
            _ => panic!("this method can only be called on a deterministic executor"),
        }
    }
}

impl Default for BackgroundExecutor {
    fn default() -> Self {
        Self::new()
    }
}

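/// Collects futures that borrow from the enclosing stack frame so they can be
/// run concurrently on a `BackgroundExecutor` via `scoped`. Dropping the
/// `Scope` blocks until every spawned future has resolved, which is what makes
/// the lifetime transmute in `spawn` sound.
///
/// A minimal usage sketch (variable names are illustrative, and this assumes
/// an async context with an `Arc<BackgroundExecutor>` named `executor` in
/// scope):
///
/// ```ignore
/// let mut counts = vec![0; 4];
/// executor
///     .scoped(|scope| {
///         for count in &mut counts {
///             scope.spawn(async move { *count += 1 });
///         }
///     })
///     .await;
/// assert!(counts.iter().all(|count| *count == 1));
/// ```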
pub struct Scope<'a> {
    executor: Arc<BackgroundExecutor>,
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    tx: Option<mpsc::Sender<()>>,
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    fn new(executor: Arc<BackgroundExecutor>) -> Self {
        let (tx, rx) = mpsc::channel(1);
        Self {
            executor,
            tx: Some(tx),
            rx,
            futures: Default::default(),
            _phantom: PhantomData,
        }
    }

    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let tx = self.tx.clone().unwrap();

        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
        // dropping this `Scope` blocks until all of the futures have resolved.
        let f = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(Box::pin(async move {
                f.await;
                drop(tx);
            }))
        };
        self.futures.push(f);
    }
}

impl<'a> Drop for Scope<'a> {
    fn drop(&mut self) {
        self.tx.take().unwrap();

        // Wait until the channel is closed, which means that all of the spawned
        // futures have resolved.
        self.executor.block(self.rx.next());
    }
}

impl<T> Task<T> {
    pub fn ready(value: T) -> Self {
        Self::Ready(Some(value))
    }

    fn local(any_task: AnyLocalTask) -> Self {
        Self::Local {
            any_task,
            result_type: PhantomData,
        }
    }

    pub fn detach(self) {
        match self {
            Task::Ready(_) => {}
            Task::Local { any_task, .. } => any_task.detach(),
            Task::Send { any_task, .. } => any_task.detach(),
        }
    }
}

// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
//     #[track_caller]
//     pub fn detach_and_log_err(self, cx: &mut AppContext) {
//         let caller = Location::caller();
//         cx.spawn(|_| async move {
//             if let Err(err) = self.await {
//                 log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
//             }
//         })
//         .detach();
//     }
// }

impl<T: Send> Task<T> {
    fn send(any_task: AnyTask) -> Self {
        Self::Send {
            any_task,
            result_type: PhantomData,
        }
    }
}

impl<T: fmt::Debug> fmt::Debug for Task<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Task::Ready(value) => value.fmt(f),
            Task::Local { any_task, .. } => any_task.fmt(f),
            Task::Send { any_task, .. } => any_task.fmt(f),
        }
    }
}

impl<T: 'static> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}

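// These helpers box a future's output as `dyn Any` so it can flow through the
// type-erased `AnyTask`/`AnyLocalTask` aliases; `Task::poll` downcasts the
// value back to its concrete type.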
fn any_future<T, F>(future: F) -> AnyFuture
where
    T: 'static + Send,
    F: Future<Output = T> + Send + 'static,
{
    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
}

fn any_local_future<T, F>(future: F) -> AnyLocalFuture
where
    T: 'static,
    F: Future<Output = T> + 'static,
{
    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
}