1use crate::util;
2use crate::PlatformDispatcher;
3use anyhow::{anyhow, Result};
4use async_task::Runnable;
5use futures::channel::{mpsc, oneshot};
6use smol::{channel, prelude::*, Executor};
7use std::{
8 any::Any,
9 fmt::{self},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::Arc,
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20/// Enqueues the given closure to run on the application's event loop.
21/// Returns the result asynchronously.
22pub(crate) fn run_on_main<F, R>(
23 dispatcher: Arc<dyn PlatformDispatcher>,
24 func: F,
25) -> impl Future<Output = R>
26where
27 F: FnOnce() -> R + Send + 'static,
28 R: Send + 'static,
29{
30 spawn_on_main(dispatcher, move || async move { func() })
31}
32
33/// Enqueues the given closure to be run on the application's event loop. The
34/// closure returns a future which will be run to completion on the main thread.
35pub(crate) fn spawn_on_main<F, R>(
36 dispatcher: Arc<dyn PlatformDispatcher>,
37 func: impl FnOnce() -> F + Send + 'static,
38) -> impl Future<Output = R>
39where
40 F: Future<Output = R> + 'static,
41 R: Send + 'static,
42{
43 let (tx, rx) = oneshot::channel();
44 let (runnable, task) = async_task::spawn(
45 {
46 let dispatcher = dispatcher.clone();
47 async move {
48 let future = func();
49 let _ = spawn_on_main_local(dispatcher, async move {
50 let result = future.await;
51 let _ = tx.send(result);
52 });
53 }
54 },
55 move |runnable| dispatcher.run_on_main_thread(runnable),
56 );
57 runnable.schedule();
58 task.detach();
59 async move { rx.await.unwrap() }
60}
61
62/// Enqueues the given closure to be run on the application's event loop. Must
63/// be called on the main thread.
64pub(crate) fn spawn_on_main_local<R>(
65 dispatcher: Arc<dyn PlatformDispatcher>,
66 future: impl Future<Output = R> + 'static,
67) -> impl Future<Output = R>
68where
69 R: 'static,
70{
71 assert!(dispatcher.is_main_thread(), "must be called on main thread");
72
73 let (tx, rx) = oneshot::channel();
74 let (runnable, task) = async_task::spawn_local(
75 async move {
76 let result = future.await;
77 let _ = tx.send(result);
78 },
79 move |runnable| dispatcher.run_on_main_thread(runnable),
80 );
81 runnable.schedule();
82 task.detach();
83 async move { rx.await.unwrap() }
84}
85
/// Executor for futures that are not required to be `Send`.
pub enum ForegroundExecutor {
    /// Schedules runnables through the platform dispatcher's main-thread queue.
    Platform {
        dispatcher: Arc<dyn PlatformDispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker removes both
        // auto traits from the variant.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant backed by the shared [`Deterministic`] scheduler.
    #[cfg(any(test, feature = "test"))]
    Deterministic {
        /// Key of this executor's queue in the scheduler's per-context
        /// foreground map.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
97
/// Executor for `Send` futures.
pub enum BackgroundExecutor {
    /// Test-only variant backed by the shared [`Deterministic`] scheduler.
    #[cfg(any(test, feature = "test"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Runs tasks on a `smol::Executor` driven by worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never sent on. The worker threads run the executor until
        // `recv()` on the paired receiver completes — presumably when this
        // sender is dropped; confirm against the channel's semantics.
        _stop: channel::Sender<()>,
    },
}
106
/// A boxed, non-`Send` future whose output has been type-erased to `Box<dyn Any>`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// A boxed, `Send` future whose output has been type-erased to `Box<dyn Any + Send>`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Handle to a spawned task producing a type-erased `Send` value.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Handle to a spawned task producing a type-erased value with no `Send` bound.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
111
/// A handle to a value of type `T` that is either already available or is
/// being produced by a spawned task.
///
/// Awaiting the handle yields the value; the spawned variants store a
/// type-erased task and downcast its output back to `T` when polled.
#[must_use]
pub enum Task<T> {
    /// An already-computed value. It is taken out on the first poll, so the
    /// option is `None` afterwards.
    Ready(Option<T>),
    /// A task whose type-erased output has no `Send` bound.
    Local {
        any_task: AnyLocalTask,
        // Remembers the concrete output type for the downcast in `poll`.
        result_type: PhantomData<T>,
    },
    /// A task whose type-erased output is `Send`.
    Send {
        any_task: AnyTask,
        // Remembers the concrete output type for the downcast in `poll`.
        result_type: PhantomData<T>,
    },
}
124
// NOTE(review): `Task::Local` wraps an `AnyLocalTask`, whose output is a
// `Box<dyn Any>` with no `Send` bound, so this impl asserts a thread-safety
// property the compiler cannot check. Presumably it relies on the value
// really being a `T` (which is `Send` here) — TODO confirm and record the
// invariant that makes this sound.
unsafe impl<T: Send> Send for Task<T> {}
126
/// Mutable state shared by the deterministic test scheduler and the timers
/// and schedule callbacks it hands out.
#[cfg(any(test, feature = "test"))]
struct DeterministicState {
    /// Source of all scheduling randomness; seeded from `seed`.
    rng: rand::prelude::StdRng,
    /// Seed the scheduler was created with; used to reseed `rng`.
    seed: u64,
    /// Runnables waiting to run, grouped by the foreground context id that
    /// scheduled them. Within a context they are run front-to-back.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Background runnables waiting to run; picked at a random index.
    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When true, `will_park` panics instead of letting the thread park.
    forbid_parking: bool,
    /// Range from which `block_with_timeout` samples its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// The scheduler's simulated current time.
    now: std::time::Instant,
    /// Id that will be assigned to the next timer.
    next_timer_id: usize,
    /// Outstanding timers as `(id, wakeup time, barrier sender)`, inserted
    /// at the position found by a binary search on wakeup time.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `start_waiting`; included in the panic message
    /// when parking is forbidden.
    waiting_backtrace: Option<backtrace::Backtrace>,
    /// Id that will be assigned to the next runnable.
    next_runnable_id: usize,
    /// Events recorded so far during this run.
    poll_history: Vec<ExecutorEvent>,
    /// History of an earlier run; when set, each new event is compared
    /// against the entry at the same index.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    /// Whether to capture a backtrace for each runnable that is created.
    enable_runnable_backtraces: bool,
    /// Captured backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
145
/// An entry in the deterministic scheduler's execution history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue to be run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    EnqueuRunnable { id: usize },
}
151
/// A queued runnable that was scheduled from a foreground context.
#[cfg(any(test, feature = "test"))]
struct ForegroundRunnable {
    /// Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
    /// True when this runnable drives the main future passed to
    /// `Deterministic::run`; after running it the scheduler polls the main
    /// task for its result.
    main: bool,
}
158
/// A queued runnable that was spawned through `Deterministic::spawn`.
#[cfg(any(test, feature = "test"))]
struct BackgroundRunnable {
    /// Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
}
164
/// Test-only scheduler that runs foreground and background runnables on the
/// calling thread, choosing what to run next from a seeded RNG.
#[cfg(any(test, feature = "test"))]
pub struct Deterministic {
    /// Shared with the schedule callbacks and with `DeterministicTimer`s.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the running thread when there is nothing to do; wakers and
    /// schedule callbacks unpark it.
    parker: parking_lot::Mutex<parking::Parker>,
}
170
/// A future that completes once a timer fires.
#[must_use]
pub enum Timer {
    /// Backed by a real `smol::Timer`.
    Production(smol::Timer),
    /// Backed by the deterministic scheduler's simulated clock.
    #[cfg(any(test, feature = "test"))]
    Deterministic(DeterministicTimer),
}
177
/// Timer registered with the deterministic scheduler.
#[cfg(any(test, feature = "test"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in the
    /// scheduler's `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Id used to remove this timer from `pending_timers` when it is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
184
#[cfg(any(test, feature = "test"))]
impl Deterministic {
    /// Creates a scheduler whose RNG is seeded with `seed`.
    ///
    /// Parking starts out allowed, the `block_on_ticks` range starts at
    /// `0..=1000`, and the simulated clock starts at the real current instant.
    pub fn new(seed: u64) -> Arc<Self> {
        use rand::prelude::*;

        Arc::new(Self {
            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: std::time::Instant::now(),
                next_timer_id: Default::default(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
                next_runnable_id: 0,
                poll_history: Default::default(),
                previous_poll_history: Default::default(),
                enable_runnable_backtraces: false,
                runnable_backtraces: Default::default(),
            })),
            parker: Default::default(),
        })
    }

    /// Returns a copy of the events recorded so far.
    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
        self.state.lock().poll_history.clone()
    }

    /// Sets (or clears) the history that newly recorded events are compared
    /// against.
    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
        self.state.lock().previous_poll_history = history;
    }

    /// Turns on backtrace capture for runnables created from now on.
    pub fn enable_runnable_backtrace(&self) {
        self.state.lock().enable_runnable_backtraces = true;
    }

    /// Returns the resolved backtrace stored for `runnable_id`.
    ///
    /// Indexes the map directly, so it panics if no backtrace was stored
    /// for that id.
    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
        backtrace.resolve();
        backtrace
    }

    /// Builds a background executor backed by this scheduler.
    pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
        Arc::new(BackgroundExecutor::Deterministic {
            executor: self.clone(),
        })
    }

    /// Builds a foreground executor backed by this scheduler, using `id` as
    /// its context id.
    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
        Rc::new(ForegroundExecutor::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    /// Spawns a local future whose runnables are queued under `cx_id`.
    ///
    /// Each time the task is scheduled, an enqueue event is pushed through
    /// `push_to_history` and the parked thread is unparked.
    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let state = self.state.clone();
        let id;
        {
            // Allocate the runnable id (and optionally a creation backtrace)
            // in a short scope so the lock is released before spawning.
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { id, runnable, main });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future whose runnables go on the background queue.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            // NOTE(review): this pushes onto `poll_history` directly, so the
            // comparison against a previous history done by
            // `push_to_history` is skipped for background enqueues —
            // confirm that is intended.
            state
                .poll_history
                .push(ExecutorEvent::EnqueuRunnable { id });
            state
                .scheduled_from_background
                .push(BackgroundRunnable { id, runnable });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Runs `main_future` to completion on the calling thread, executing
    /// other queued runnables along the way, and returns its boxed output.
    ///
    /// Whenever no progress can be made the thread parks; if no waker fired
    /// during that pass, `will_park` is consulted first and panics when
    /// parking is forbidden.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker built in `run_internal`.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is needed because `main_future`
        // is neither `Send` nor `'static`. Presumably this is sound because
        // this function only returns once the task has produced its result
        // — TODO confirm against async_task's safety requirements.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // A fresh id is allocated on every enqueue of the
                        // main runnable.
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs queued runnables until both queues are empty.
    pub fn run_until_parked(&self) {
        use std::sync::atomic::AtomicBool;
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    /// Core scheduling loop.
    ///
    /// Repeatedly picks a runnable at random and runs it until both queues
    /// are empty. Returns `Some(output)` as soon as `main_task` (if given)
    /// is observed to be ready, and `None` once the queues are drained
    /// without the main task completing.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        // Waker used when polling the main task: records that a wake-up
        // happened and unparks the thread.
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                // Nothing left to run: give the main task one last poll.
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // Coin flip between background and foreground work. When the
            // flip picks foreground but that queue is empty, nothing runs
            // this iteration and the loop simply flips again.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable may
                // schedule further work, which locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Choose a context at random, then take the runnable at the
                // front of that context's queue.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Drop empty queues so the map's emptiness reflects whether
                // any foreground work remains.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

    /// Drives `future` on the calling thread for at most `max_ticks`
    /// iterations, interleaving it with background runnables.
    ///
    /// Returns `Some(output)` if the future completes within the budget and
    /// `None` otherwise. Foreground runnables are not run here.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: drawing `runnable_count` itself means
            // "poll the blocked-on future" rather than a background task.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                // Only park when no background work is left to run.
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }

    /// Registers a timer that fires `duration` after the current simulated
    /// time.
    pub fn timer(&self, duration: Duration) -> Timer {
        let (tx, rx) = postage::barrier::channel();
        let mut state = self.state.lock();
        let wakeup_at = state.now + duration;
        let id = util::post_inc(&mut state.next_timer_id);
        // Insert at the position reported by the binary search, whether or
        // not an entry with the same wakeup time already exists.
        match state
            .pending_timers
            .binary_search_by_key(&wakeup_at, |e| e.1)
        {
            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
        }
        let state = self.state.clone();
        Timer::Deterministic(DeterministicTimer { rx, id, state })
    }

    /// Returns the current simulated time.
    pub fn now(&self) -> std::time::Instant {
        let state = self.state.lock();
        state.now
    }

    /// Advances the simulated clock by `duration`.
    ///
    /// Timers due within the window are released in order of wakeup time:
    /// the clock is moved to each wakeup time in turn and queued work is run
    /// before the next batch is released.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    // Release every timer that shares the earliest wakeup
                    // time in one batch.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // Release the lock first, then drop the barrier senders.
                    // Presumably dropping a sender is what completes the
                    // matching receiver — confirm against postage's barrier
                    // documentation.
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }

    /// Captures a backtrace to be reported if the scheduler tries to park
    /// while parking is forbidden.
    pub fn start_waiting(&self) {
        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
    }

    /// Clears the backtrace captured by `start_waiting`.
    pub fn finish_waiting(&self) {
        self.state.lock().waiting_backtrace.take();
    }

    /// Makes any attempt to park panic, and reseeds the RNG from the
    /// original seed.
    pub fn forbid_parking(&self) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = true;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    /// Allows parking again, and reseeds the RNG from the original seed.
    pub fn allow_parking(&self) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = false;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    /// With probability 0.2, yields to the scheduler between 1 and 10 times.
    pub async fn simulate_random_delay(&self) {
        use rand::prelude::*;
        use smol::future::yield_now;
        if self.state.lock().rng.gen_bool(0.2) {
            let yields = self.state.lock().rng.gen_range(1..=10);
            for _ in 0..yields {
                yield_now().await;
            }
        }
    }

    /// When runnable backtraces are enabled, replaces the backtrace stored
    /// for the most recently polled runnable with one captured here.
    pub fn record_backtrace(&self) {
        let mut state = self.state.lock();
        if state.enable_runnable_backtraces {
            // The latest `PollRunnable` event identifies the runnable that
            // was polled most recently.
            let current_id = state
                .poll_history
                .iter()
                .rev()
                .find_map(|event| match event {
                    ExecutorEvent::PollRunnable { id } => Some(*id),
                    _ => None,
                });
            if let Some(id) = current_id {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }
    }
}
582
583impl Drop for Timer {
584 fn drop(&mut self) {
585 #[cfg(any(test, feature = "test"))]
586 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
587 state
588 .lock()
589 .pending_timers
590 .retain(|(timer_id, _, _)| timer_id != id)
591 }
592 }
593}
594
595impl Future for Timer {
596 type Output = ();
597
598 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
599 match &mut *self {
600 #[cfg(any(test, feature = "test"))]
601 Self::Deterministic(DeterministicTimer { rx, .. }) => {
602 use postage::stream::{PollRecv, Stream as _};
603 smol::pin!(rx);
604 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
605 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
606 PollRecv::Pending => Poll::Pending,
607 }
608 }
609 Self::Production(timer) => {
610 smol::pin!(timer);
611 match timer.poll(cx) {
612 Poll::Ready(_) => Poll::Ready(()),
613 Poll::Pending => Poll::Pending,
614 }
615 }
616 }
617 }
618}
619
#[cfg(any(test, feature = "test"))]
impl DeterministicState {
    /// Records `event` and, when a previous history is set, checks it
    /// against the event at the same position in that history.
    ///
    /// # Panics
    ///
    /// Panics with both runnables' backtraces (where available) when the
    /// two events differ. Also panics on the index itself if the previous
    /// history is shorter than the current one.
    fn push_to_history(&mut self, event: ExecutorEvent) {
        use std::fmt::Write as _;

        self.poll_history.push(event);
        let ix = self.poll_history.len() - 1;

        let prev_event = match &self.previous_poll_history {
            None => return,
            Some(prev_history) => prev_history[ix],
        };
        if prev_event == event {
            return;
        }

        let mut message = String::new();

        let current_trace = self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
            trace.resolve();
            util::CwdBacktrace(trace)
        });
        writeln!(
            &mut message,
            "current runnable backtrace:\n{:?}",
            current_trace
        )
        .unwrap();

        let previous_trace = self
            .runnable_backtraces
            .get_mut(&prev_event.id())
            .map(|trace| {
                trace.resolve();
                util::CwdBacktrace(trace)
            });
        writeln!(
            &mut message,
            "previous runnable backtrace:\n{:?}",
            previous_trace
        )
        .unwrap();

        panic!("detected non-determinism after {ix}. {message}");
    }

    /// Called right before the scheduler would park the thread.
    ///
    /// # Panics
    ///
    /// Panics when parking has been forbidden, appending the backtrace
    /// captured by `start_waiting` if there is one.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
675
#[cfg(any(test, feature = "test"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to, whichever kind
    /// of event it is.
    pub fn id(&self) -> usize {
        match *self {
            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => id,
        }
    }
}
685
686impl ForegroundExecutor {
687 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
688 if dispatcher.is_main_thread() {
689 Ok(Self::Platform {
690 dispatcher,
691 _not_send_or_sync: PhantomData,
692 })
693 } else {
694 Err(anyhow!("must be constructed on main thread"))
695 }
696 }
697
698 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
699 let future = any_local_future(future);
700 let any_task = match self {
701 #[cfg(any(test, feature = "test"))]
702 Self::Deterministic { cx_id, executor } => {
703 executor.spawn_from_foreground(*cx_id, future, false)
704 }
705 Self::Platform { dispatcher, .. } => {
706 fn spawn_inner(
707 future: AnyLocalFuture,
708 dispatcher: &Arc<dyn PlatformDispatcher>,
709 ) -> AnyLocalTask {
710 let dispatcher = dispatcher.clone();
711 let schedule =
712 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
713 let (runnable, task) = async_task::spawn_local(future, schedule);
714 runnable.schedule();
715 task
716 }
717 spawn_inner(future, dispatcher)
718 }
719 };
720 Task::local(any_task)
721 }
722
723 #[cfg(any(test, feature = "test"))]
724 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
725 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
726 let result = match self {
727 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
728 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
729 };
730 *result.downcast().unwrap()
731 }
732
733 #[cfg(any(test, feature = "test"))]
734 pub fn run_until_parked(&self) {
735 match self {
736 Self::Deterministic { executor, .. } => executor.run_until_parked(),
737 _ => panic!("this method can only be called on a deterministic executor"),
738 }
739 }
740
741 #[cfg(any(test, feature = "test"))]
742 pub fn parking_forbidden(&self) -> bool {
743 match self {
744 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
745 _ => panic!("this method can only be called on a deterministic executor"),
746 }
747 }
748
749 #[cfg(any(test, feature = "test"))]
750 pub fn start_waiting(&self) {
751 match self {
752 Self::Deterministic { executor, .. } => executor.start_waiting(),
753 _ => panic!("this method can only be called on a deterministic executor"),
754 }
755 }
756
757 #[cfg(any(test, feature = "test"))]
758 pub fn finish_waiting(&self) {
759 match self {
760 Self::Deterministic { executor, .. } => executor.finish_waiting(),
761 _ => panic!("this method can only be called on a deterministic executor"),
762 }
763 }
764
765 #[cfg(any(test, feature = "test"))]
766 pub fn forbid_parking(&self) {
767 match self {
768 Self::Deterministic { executor, .. } => executor.forbid_parking(),
769 _ => panic!("this method can only be called on a deterministic executor"),
770 }
771 }
772
773 #[cfg(any(test, feature = "test"))]
774 pub fn allow_parking(&self) {
775 match self {
776 Self::Deterministic { executor, .. } => executor.allow_parking(),
777 _ => panic!("this method can only be called on a deterministic executor"),
778 }
779 }
780
781 #[cfg(any(test, feature = "test"))]
782 pub fn advance_clock(&self, duration: Duration) {
783 match self {
784 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
785 _ => panic!("this method can only be called on a deterministic executor"),
786 }
787 }
788
789 #[cfg(any(test, feature = "test"))]
790 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
791 match self {
792 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
793 _ => panic!("this method can only be called on a deterministic executor"),
794 }
795 }
796}
797
798impl BackgroundExecutor {
799 pub fn new() -> Self {
800 let executor = Arc::new(Executor::new());
801 let stop = channel::unbounded::<()>();
802
803 for i in 0..2 * num_cpus::get() {
804 let executor = executor.clone();
805 let stop = stop.1.clone();
806 thread::Builder::new()
807 .name(format!("background-executor-{}", i))
808 .spawn(move || smol::block_on(executor.run(stop.recv())))
809 .unwrap();
810 }
811
812 Self::Production {
813 executor,
814 _stop: stop.0,
815 }
816 }
817
818 pub fn num_cpus(&self) -> usize {
819 num_cpus::get()
820 }
821
822 pub fn spawn<T, F>(&self, future: F) -> Task<T>
823 where
824 T: 'static + Send,
825 F: Send + Future<Output = T> + 'static,
826 {
827 let future = any_future(future);
828 let any_task = match self {
829 Self::Production { executor, .. } => executor.spawn(future),
830 #[cfg(any(test, feature = "test"))]
831 Self::Deterministic { executor } => executor.spawn(future),
832 };
833 Task::send(any_task)
834 }
835
836 pub fn block<F, T>(&self, future: F) -> T
837 where
838 F: Future<Output = T>,
839 {
840 smol::pin!(future);
841 match self {
842 Self::Production { .. } => smol::block_on(&mut future),
843 #[cfg(any(test, feature = "test"))]
844 Self::Deterministic { executor, .. } => {
845 executor.block(&mut future, usize::MAX).unwrap()
846 }
847 }
848 }
849
850 pub fn block_with_timeout<F, T>(
851 &self,
852 timeout: Duration,
853 future: F,
854 ) -> Result<T, impl Future<Output = T>>
855 where
856 T: 'static,
857 F: 'static + Unpin + Future<Output = T>,
858 {
859 let mut future = any_local_future(future);
860 if !timeout.is_zero() {
861 let output = match self {
862 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
863 #[cfg(any(test, feature = "test"))]
864 Self::Deterministic { executor, .. } => {
865 use rand::prelude::*;
866 let max_ticks = {
867 let mut state = executor.state.lock();
868 let range = state.block_on_ticks.clone();
869 state.rng.gen_range(range)
870 };
871 executor.block(&mut future, max_ticks)
872 }
873 };
874 if let Some(output) = output {
875 return Ok(*output.downcast().unwrap());
876 }
877 }
878 Err(async { *future.await.downcast().unwrap() })
879 }
880
881 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
882 where
883 F: FnOnce(&mut Scope<'scope>),
884 {
885 let mut scope = Scope::new(self.clone());
886 (scheduler)(&mut scope);
887 let spawned = mem::take(&mut scope.futures)
888 .into_iter()
889 .map(|f| self.spawn(f))
890 .collect::<Vec<_>>();
891 for task in spawned {
892 task.await;
893 }
894 }
895
896 pub fn timer(&self, duration: Duration) -> Timer {
897 match self {
898 BackgroundExecutor::Production { .. } => {
899 Timer::Production(smol::Timer::after(duration))
900 }
901 #[cfg(any(test, feature = "test"))]
902 BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
903 }
904 }
905
906 pub fn now(&self) -> std::time::Instant {
907 match self {
908 BackgroundExecutor::Production { .. } => std::time::Instant::now(),
909 #[cfg(any(test, feature = "test"))]
910 BackgroundExecutor::Deterministic { executor } => executor.now(),
911 }
912 }
913
914 #[cfg(any(test, feature = "test"))]
915 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
916 match self {
917 Self::Deterministic { executor, .. } => {
918 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
919 }
920 _ => panic!("this method can only be called on a deterministic executor"),
921 }
922 }
923
924 #[cfg(any(test, feature = "test"))]
925 pub async fn simulate_random_delay(&self) {
926 match self {
927 Self::Deterministic { executor, .. } => {
928 executor.simulate_random_delay().await;
929 }
930 _ => {
931 panic!("this method can only be called on a deterministic executor")
932 }
933 }
934 }
935
936 #[cfg(any(test, feature = "test"))]
937 pub fn record_backtrace(&self) {
938 match self {
939 Self::Deterministic { executor, .. } => executor.record_backtrace(),
940 _ => {
941 panic!("this method can only be called on a deterministic executor")
942 }
943 }
944 }
945
946 #[cfg(any(test, feature = "test"))]
947 pub fn start_waiting(&self) {
948 match self {
949 Self::Deterministic { executor, .. } => executor.start_waiting(),
950 _ => panic!("this method can only be called on a deterministic executor"),
951 }
952 }
953}
954
955impl Default for BackgroundExecutor {
956 fn default() -> Self {
957 Self::new()
958 }
959}
960
/// Collects futures that borrow data for `'a` so they can be spawned on a
/// background executor; dropping the scope blocks until they have finished.
pub struct Scope<'a> {
    /// Executor used to block in `Drop`.
    executor: Arc<BackgroundExecutor>,
    /// Registered futures, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Sender cloned into every registered future; the scope's own copy is
    /// dropped in `Drop` so the channel can close.
    tx: Option<mpsc::Sender<()>>,
    /// Polled in `Drop` to detect when every sender clone is gone.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
968
969impl<'a> Scope<'a> {
970 fn new(executor: Arc<BackgroundExecutor>) -> Self {
971 let (tx, rx) = mpsc::channel(1);
972 Self {
973 executor,
974 tx: Some(tx),
975 rx,
976 futures: Default::default(),
977 _phantom: PhantomData,
978 }
979 }
980
981 pub fn spawn<F>(&mut self, f: F)
982 where
983 F: Future<Output = ()> + Send + 'a,
984 {
985 let tx = self.tx.clone().unwrap();
986
987 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
988 // dropping this `Scope` blocks until all of the futures have resolved.
989 let f = unsafe {
990 mem::transmute::<
991 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
992 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
993 >(Box::pin(async move {
994 f.await;
995 drop(tx);
996 }))
997 };
998 self.futures.push(f);
999 }
1000}
1001
1002impl<'a> Drop for Scope<'a> {
1003 fn drop(&mut self) {
1004 self.tx.take().unwrap();
1005
1006 // Wait until the channel is closed, which means that all of the spawned
1007 // futures have resolved.
1008 self.executor.block(self.rx.next());
1009 }
1010}
1011
1012impl<T> Task<T> {
1013 pub fn ready(value: T) -> Self {
1014 Self::Ready(Some(value))
1015 }
1016
1017 fn local(any_task: AnyLocalTask) -> Self {
1018 Self::Local {
1019 any_task,
1020 result_type: PhantomData,
1021 }
1022 }
1023
1024 pub fn detach(self) {
1025 match self {
1026 Task::Ready(_) => {}
1027 Task::Local { any_task, .. } => any_task.detach(),
1028 Task::Send { any_task, .. } => any_task.detach(),
1029 }
1030 }
1031}
1032
1033// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1034// #[track_caller]
1035// pub fn detach_and_log_err(self, cx: &mut AppContext) {
1036// let caller = Location::caller();
1037// cx.spawn(|_| async move {
1038// if let Err(err) = self.await {
1039// log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1040// }
1041// })
1042// .detach();
1043// }
1044// }
1045
1046impl<T: Send> Task<T> {
1047 fn send(any_task: AnyTask) -> Self {
1048 Self::Send {
1049 any_task,
1050 result_type: PhantomData,
1051 }
1052 }
1053}
1054
1055impl<T: fmt::Debug> fmt::Debug for Task<T> {
1056 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1057 match self {
1058 Task::Ready(value) => value.fmt(f),
1059 Task::Local { any_task, .. } => any_task.fmt(f),
1060 Task::Send { any_task, .. } => any_task.fmt(f),
1061 }
1062 }
1063}
1064
1065impl<T: 'static> Future for Task<T> {
1066 type Output = T;
1067
1068 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1069 match unsafe { self.get_unchecked_mut() } {
1070 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1071 Task::Local { any_task, .. } => {
1072 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1073 }
1074 Task::Send { any_task, .. } => {
1075 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1076 }
1077 }
1078 }
1079}
1080
1081fn any_future<T, F>(future: F) -> AnyFuture
1082where
1083 T: 'static + Send,
1084 F: Future<Output = T> + Send + 'static,
1085{
1086 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1087}
1088
1089fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1090where
1091 T: 'static,
1092 F: Future<Output = T> + 'static,
1093{
1094 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1095}