1use crate::util;
2use crate::PlatformDispatcher;
3use anyhow::{anyhow, Result};
4use async_task::Runnable;
5use futures::channel::{mpsc, oneshot};
6use smol::{channel, prelude::*, Executor};
7use std::{
8 any::Any,
9 fmt::{self},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::Arc,
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20/// Enqueues the given closure to be run on the application's event loop. Can be
21/// called on any thread.
22pub(crate) fn spawn_on_main<F, R>(
23 dispatcher: Arc<dyn PlatformDispatcher>,
24 func: impl FnOnce() -> F + Send + 'static,
25) -> impl Future<Output = R>
26where
27 F: Future<Output = R> + 'static,
28 R: Send + 'static,
29{
30 let (tx, rx) = oneshot::channel();
31 let (runnable, task) = async_task::spawn(
32 {
33 let dispatcher = dispatcher.clone();
34 async move {
35 let future = func();
36 let _ = spawn_on_main_local(dispatcher, async move {
37 let result = future.await;
38 let _ = tx.send(result);
39 });
40 }
41 },
42 move |runnable| dispatcher.run_on_main_thread(runnable),
43 );
44 runnable.schedule();
45 task.detach();
46 async move { rx.await.unwrap() }
47}
48
49/// Enqueues the given closure to be run on the application's event loop. Must
50/// be called on the main thread.
51pub(crate) fn spawn_on_main_local<R>(
52 dispatcher: Arc<dyn PlatformDispatcher>,
53 future: impl Future<Output = R> + 'static,
54) -> impl Future<Output = R>
55where
56 R: 'static,
57{
58 assert!(dispatcher.is_main_thread(), "must be called on main thread");
59
60 let (tx, rx) = oneshot::channel();
61 let (runnable, task) = async_task::spawn_local(
62 async move {
63 let result = future.await;
64 let _ = tx.send(result);
65 },
66 move |runnable| dispatcher.run_on_main_thread(runnable),
67 );
68 runnable.schedule();
69 task.detach();
70 async move { rx.await.unwrap() }
71}
72
/// Executor for futures that are not required to be `Send`
/// (see `ForegroundExecutor::spawn`).
pub enum ForegroundExecutor {
    /// Schedules runnables via the dispatcher's `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn PlatformDispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`; this marker propagates that
        // to the enum.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant that queues runnables on a `Deterministic` executor.
    #[cfg(any(test, feature = "test"))]
    Deterministic {
        // Key under which this executor's runnables are queued in
        // `DeterministicState::scheduled_from_foreground`.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
84
/// Executor for `Send` futures (see `BackgroundExecutor::spawn`).
pub enum BackgroundExecutor {
    /// Test-only variant that queues runnables on a `Deterministic` executor.
    #[cfg(any(test, feature = "test"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Backed by a `smol::Executor` driven by worker threads started in
    /// `BackgroundExecutor::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never sent on. The worker threads run the executor until `recv` on
        // the matching receiver completes, which presumably happens once this
        // sender is dropped — TODO confirm against the channel's close semantics.
        _stop: channel::Sender<()>,
    },
}
93
// Type-erased futures and task handles. Outputs are boxed as `dyn Any` and
// downcast back to the concrete type when a `Task<T>` is polled.

/// A boxed, non-`Send` future with a type-erased output.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// A boxed, `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Handle to a spawned task whose type-erased output is `Send`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Handle to a spawned task whose type-erased output is not required to be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
98
/// A handle to a value of type `T` that is either already available or is
/// being produced by a spawned task.
///
/// Awaiting a `Task` yields the value; `detach` lets a spawned task keep
/// running without the handle.
#[must_use]
pub enum Task<T> {
    /// An already-computed value. It is `None` once the value has been taken
    /// by a poll.
    Ready(Option<T>),
    /// A task spawned from a `ForegroundExecutor`.
    Local {
        any_task: AnyLocalTask,
        // Records the concrete output type the erased result is downcast to.
        result_type: PhantomData<T>,
    },
    /// A task spawned from a `BackgroundExecutor`.
    Send {
        any_task: AnyTask,
        // Records the concrete output type the erased result is downcast to.
        result_type: PhantomData<T>,
    },
}
111
// SAFETY: NOTE(review) — the `Local` variant wraps a task handle whose erased
// output (`Box<dyn Any>`) is not `Send`, so this impl is not derivable
// automatically. It presumably relies on the erased value always being a `T`
// (which is `Send` here) and on the task handle itself being safe to move
// between threads — confirm against `async_task`'s guarantees.
unsafe impl<T: Send> Send for Task<T> {}
113
/// Mutable state of the `Deterministic` executor, shared behind a mutex.
#[cfg(any(test, feature = "test"))]
struct DeterministicState {
    // Random source for all scheduling decisions; seeded from `seed`.
    rng: rand::prelude::StdRng,
    // Seed kept so `forbid_parking`/`allow_parking` can reseed `rng`.
    seed: u64,
    // Runnables queued by foreground tasks, grouped by `cx_id`.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables queued by background tasks.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When true, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    // Range from which `block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated current time; advanced by `advance_clock`.
    now: std::time::Instant,
    // Next id handed out by `timer`.
    next_timer_id: usize,
    // Pending timers as (id, wake-up time, barrier sender). `timer` inserts at
    // the position found by binary search on wake-up time.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Backtrace captured by `start_waiting`; included in the `will_park` panic.
    waiting_backtrace: Option<backtrace::Backtrace>,
    // Next id handed out to a runnable.
    next_runnable_id: usize,
    // Events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    // History of a previous run that `push_to_history` compares against.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    // Whether a backtrace is captured for each spawned runnable.
    enable_runnable_backtraces: bool,
    // Captured backtraces, keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
132
/// An entry in the deterministic executor's execution history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    EnqueuRunnable { id: usize },
}
138
/// A queued foreground runnable together with its bookkeeping data.
#[cfg(any(test, feature = "test"))]
struct ForegroundRunnable {
    // Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
    // When set, `run_internal` polls the main task right after running this
    // runnable.
    main: bool,
}
145
/// A queued background runnable together with its history id.
#[cfg(any(test, feature = "test"))]
struct BackgroundRunnable {
    // Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
}
151
/// Test executor that runs foreground and background runnables on the calling
/// thread, choosing what to run next with a seeded random number generator.
#[cfg(any(test, feature = "test"))]
pub struct Deterministic {
    // Shared with scheduling closures and with `DeterministicTimer`.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Used to park the running thread until a runnable is scheduled or a
    // waker fires.
    parker: parking_lot::Mutex<parking::Parker>,
}
157
/// A future that completes after a duration, created by `timer` on an executor.
#[must_use]
pub enum Timer {
    /// Backed by a real `smol::Timer`.
    Production(smol::Timer),
    /// Test-only timer driven by the deterministic executor's simulated clock.
    #[cfg(any(test, feature = "test"))]
    Deterministic(DeterministicTimer),
}
164
/// Timer registered in a `Deterministic` executor's `pending_timers` list.
#[cfg(any(test, feature = "test"))]
pub struct DeterministicTimer {
    // Completes when the matching sender stored in `pending_timers` is dropped.
    rx: postage::barrier::Receiver,
    // Identifies this timer's entry so `Drop for Timer` can remove it.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
171
172#[cfg(any(test, feature = "test"))]
173impl Deterministic {
174 pub fn new(seed: u64) -> Arc<Self> {
175 use rand::prelude::*;
176
177 Arc::new(Self {
178 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
179 rng: StdRng::seed_from_u64(seed),
180 seed,
181 scheduled_from_foreground: Default::default(),
182 scheduled_from_background: Default::default(),
183 forbid_parking: false,
184 block_on_ticks: 0..=1000,
185 now: std::time::Instant::now(),
186 next_timer_id: Default::default(),
187 pending_timers: Default::default(),
188 waiting_backtrace: None,
189 next_runnable_id: 0,
190 poll_history: Default::default(),
191 previous_poll_history: Default::default(),
192 enable_runnable_backtraces: false,
193 runnable_backtraces: Default::default(),
194 })),
195 parker: Default::default(),
196 })
197 }
198
199 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
200 self.state.lock().poll_history.clone()
201 }
202
203 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
204 self.state.lock().previous_poll_history = history;
205 }
206
207 pub fn enable_runnable_backtrace(&self) {
208 self.state.lock().enable_runnable_backtraces = true;
209 }
210
211 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
212 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
213 backtrace.resolve();
214 backtrace
215 }
216
217 pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
218 Arc::new(BackgroundExecutor::Deterministic {
219 executor: self.clone(),
220 })
221 }
222
223 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
224 Rc::new(ForegroundExecutor::Deterministic {
225 cx_id: id,
226 executor: self.clone(),
227 })
228 }
229
230 fn spawn_from_foreground(
231 &self,
232 cx_id: usize,
233 future: AnyLocalFuture,
234 main: bool,
235 ) -> AnyLocalTask {
236 let state = self.state.clone();
237 let id;
238 {
239 let mut state = state.lock();
240 id = util::post_inc(&mut state.next_runnable_id);
241 if state.enable_runnable_backtraces {
242 state
243 .runnable_backtraces
244 .insert(id, backtrace::Backtrace::new_unresolved());
245 }
246 }
247
248 let unparker = self.parker.lock().unparker();
249 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
250 let mut state = state.lock();
251 state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
252 state
253 .scheduled_from_foreground
254 .entry(cx_id)
255 .or_default()
256 .push(ForegroundRunnable { id, runnable, main });
257 unparker.unpark();
258 });
259 runnable.schedule();
260 task
261 }
262
263 fn spawn(&self, future: AnyFuture) -> AnyTask {
264 let state = self.state.clone();
265 let id;
266 {
267 let mut state = state.lock();
268 id = util::post_inc(&mut state.next_runnable_id);
269 if state.enable_runnable_backtraces {
270 state
271 .runnable_backtraces
272 .insert(id, backtrace::Backtrace::new_unresolved());
273 }
274 }
275
276 let unparker = self.parker.lock().unparker();
277 let (runnable, task) = async_task::spawn(future, move |runnable| {
278 let mut state = state.lock();
279 state
280 .poll_history
281 .push(ExecutorEvent::EnqueuRunnable { id });
282 state
283 .scheduled_from_background
284 .push(BackgroundRunnable { id, runnable });
285 unparker.unpark();
286 });
287 runnable.schedule();
288 task
289 }
290
    /// Drives `main_future` to completion on the calling thread, running
    /// queued foreground and background runnables along the way, and returns
    /// its boxed output.
    ///
    /// The main future's runnables are queued under `cx_id` with `main: true`.
    /// When nothing more can run and the main task is still pending, the
    /// thread parks until it is unparked. If that happens without the waker
    /// having fired, `will_park` is consulted first and panics when parking
    /// is forbidden.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker built in `run_internal` whenever it fires.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // SAFETY: NOTE(review) — `spawn_unchecked` is used because
        // `main_future` is neither `Send` nor `'static`. This function does not
        // return until the task has produced its result, and the runnable is
        // only run from `run_internal` on this thread; confirm that this
        // satisfies all of `spawn_unchecked`'s requirements.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // A new id is allocated each time the main runnable
                        // is scheduled.
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Nothing is runnable and the waker has not fired, so the thread
            // is about to park; this panics if parking is forbidden.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
343
344 pub fn run_until_parked(&self) {
345 use std::sync::atomic::AtomicBool;
346 let woken = Arc::new(AtomicBool::new(false));
347 self.run_internal(woken, None);
348 }
349
    /// Runs queued runnables, one per loop iteration, until both queues are
    /// empty.
    ///
    /// Returns `Some(output)` as soon as `main_task` (if given) is observed
    /// to be ready, and `None` once the queues are empty and the main task is
    /// absent or still pending. `woken` is set to true whenever the waker
    /// used to poll `main_task` fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Nothing left to run: give the main task one last poll and stop.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // When background work is queued, a random boolean decides whether
            // a background runnable goes first; otherwise foreground work runs
            // if there is any.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: running may reschedule,
                // which locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then run its oldest queued runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove emptied queues so the emptiness check above stays
                // accurate.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                // After running a `main` runnable, check whether the main
                // task has finished.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
421
    /// Polls `future` on the calling thread for at most `max_ticks` ticks,
    /// interleaving it with queued background runnables.
    ///
    /// Each tick draws a random index: an in-range index runs that background
    /// runnable, and the one extra index polls `future` instead. Returns
    /// `Some(output)` when the future completes, or `None` when the ticks run
    /// out. Foreground runnables are never run here.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: `ix == runnable_count` means "poll the
            // future".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running the runnable.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                // The future is pending and no background work is queued:
                // park until unparked (this panics if parking is forbidden).
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
463
464 pub fn timer(&self, duration: Duration) -> Timer {
465 let (tx, rx) = postage::barrier::channel();
466 let mut state = self.state.lock();
467 let wakeup_at = state.now + duration;
468 let id = util::post_inc(&mut state.next_timer_id);
469 match state
470 .pending_timers
471 .binary_search_by_key(&wakeup_at, |e| e.1)
472 {
473 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
474 }
475 let state = self.state.clone();
476 Timer::Deterministic(DeterministicTimer { rx, id, state })
477 }
478
479 pub fn now(&self) -> std::time::Instant {
480 let state = self.state.lock();
481 state.now
482 }
483
    /// Advances the simulated clock by `duration`, firing every pending timer
    /// that becomes due along the way.
    ///
    /// Timers fire in wake-up order. Before each batch, and once more at the
    /// end, queued runnables are run until the queues are empty.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    // Collect every timer sharing the earliest wake-up time.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // Release the lock first. Dropping the barrier senders
                    // then completes the matching timers (`Timer::poll`
                    // treats a closed channel as ready).
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }
514
515 pub fn start_waiting(&self) {
516 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
517 }
518
519 pub fn finish_waiting(&self) {
520 self.state.lock().waiting_backtrace.take();
521 }
522
523 pub fn forbid_parking(&self) {
524 use rand::prelude::*;
525
526 let mut state = self.state.lock();
527 state.forbid_parking = true;
528 state.rng = StdRng::seed_from_u64(state.seed);
529 }
530
531 pub fn allow_parking(&self) {
532 use rand::prelude::*;
533
534 let mut state = self.state.lock();
535 state.forbid_parking = false;
536 state.rng = StdRng::seed_from_u64(state.seed);
537 }
538
539 pub async fn simulate_random_delay(&self) {
540 use rand::prelude::*;
541 use smol::future::yield_now;
542 if self.state.lock().rng.gen_bool(0.2) {
543 let yields = self.state.lock().rng.gen_range(1..=10);
544 for _ in 0..yields {
545 yield_now().await;
546 }
547 }
548 }
549
550 pub fn record_backtrace(&self) {
551 let mut state = self.state.lock();
552 if state.enable_runnable_backtraces {
553 let current_id = state
554 .poll_history
555 .iter()
556 .rev()
557 .find_map(|event| match event {
558 ExecutorEvent::PollRunnable { id } => Some(*id),
559 _ => None,
560 });
561 if let Some(id) = current_id {
562 state
563 .runnable_backtraces
564 .insert(id, backtrace::Backtrace::new_unresolved());
565 }
566 }
567 }
568}
569
570impl Drop for Timer {
571 fn drop(&mut self) {
572 #[cfg(any(test, feature = "test"))]
573 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
574 state
575 .lock()
576 .pending_timers
577 .retain(|(timer_id, _, _)| timer_id != id)
578 }
579 }
580}
581
582impl Future for Timer {
583 type Output = ();
584
585 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
586 match &mut *self {
587 #[cfg(any(test, feature = "test"))]
588 Self::Deterministic(DeterministicTimer { rx, .. }) => {
589 use postage::stream::{PollRecv, Stream as _};
590 smol::pin!(rx);
591 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
592 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
593 PollRecv::Pending => Poll::Pending,
594 }
595 }
596 Self::Production(timer) => {
597 smol::pin!(timer);
598 match timer.poll(cx) {
599 Poll::Ready(_) => Poll::Ready(()),
600 Poll::Pending => Poll::Pending,
601 }
602 }
603 }
604 }
605}
606
#[cfg(any(test, feature = "test"))]
impl DeterministicState {
    /// Appends `event` to the execution history and, when a previous run's
    /// history has been set, checks that the new event matches the event at
    /// the same position in that history.
    ///
    /// # Panics
    ///
    /// Panics with a "detected non-determinism" message when the event
    /// differs from the previous run, or when the previous run recorded fewer
    /// events than this one. The message includes whatever runnable
    /// backtraces were captured for the two events.
    fn push_to_history(&mut self, event: ExecutorEvent) {
        use std::fmt::Write as _;

        self.poll_history.push(event);
        if let Some(prev_history) = &self.previous_poll_history {
            let ix = self.poll_history.len() - 1;
            // `get` instead of indexing: a shorter previous history is itself
            // a divergence and should be reported as one, not surface as a
            // bare index-out-of-bounds panic.
            let prev_event = prev_history.get(ix).copied();
            if prev_event != Some(event) {
                let mut message = String::new();
                writeln!(
                    &mut message,
                    "current runnable backtrace:\n{:?}",
                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
                        trace.resolve();
                        util::CwdBacktrace(trace)
                    })
                )
                .unwrap();
                let prev_trace = match prev_event {
                    Some(prev_event) => self
                        .runnable_backtraces
                        .get_mut(&prev_event.id())
                        .map(|trace| {
                            trace.resolve();
                            util::CwdBacktrace(trace)
                        }),
                    None => None,
                };
                writeln!(
                    &mut message,
                    "previous runnable backtrace:\n{:?}",
                    prev_trace
                )
                .unwrap();
                panic!("detected non-determinism after {ix}. {message}");
            }
        }
    }

    /// Called right before the executor parks its thread.
    ///
    /// # Panics
    ///
    /// Panics if parking has been forbidden. The message includes the
    /// backtrace recorded by `start_waiting`, when there is one.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
662
#[cfg(any(test, feature = "test"))]
impl ExecutorEvent {
    /// Returns the id of the runnable this event refers to.
    pub fn id(&self) -> usize {
        match *self {
            Self::PollRunnable { id } | Self::EnqueuRunnable { id } => id,
        }
    }
}
672
673impl ForegroundExecutor {
674 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
675 if dispatcher.is_main_thread() {
676 Ok(Self::Platform {
677 dispatcher,
678 _not_send_or_sync: PhantomData,
679 })
680 } else {
681 Err(anyhow!("must be constructed on main thread"))
682 }
683 }
684
685 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
686 let future = any_local_future(future);
687 let any_task = match self {
688 #[cfg(any(test, feature = "test"))]
689 Self::Deterministic { cx_id, executor } => {
690 executor.spawn_from_foreground(*cx_id, future, false)
691 }
692 Self::Platform { dispatcher, .. } => {
693 fn spawn_inner(
694 future: AnyLocalFuture,
695 dispatcher: &Arc<dyn PlatformDispatcher>,
696 ) -> AnyLocalTask {
697 let dispatcher = dispatcher.clone();
698 let schedule =
699 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
700 let (runnable, task) = async_task::spawn_local(future, schedule);
701 runnable.schedule();
702 task
703 }
704 spawn_inner(future, dispatcher)
705 }
706 };
707 Task::local(any_task)
708 }
709
710 #[cfg(any(test, feature = "test"))]
711 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
712 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
713 let result = match self {
714 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
715 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
716 };
717 *result.downcast().unwrap()
718 }
719
720 #[cfg(any(test, feature = "test"))]
721 pub fn run_until_parked(&self) {
722 match self {
723 Self::Deterministic { executor, .. } => executor.run_until_parked(),
724 _ => panic!("this method can only be called on a deterministic executor"),
725 }
726 }
727
728 #[cfg(any(test, feature = "test"))]
729 pub fn parking_forbidden(&self) -> bool {
730 match self {
731 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
732 _ => panic!("this method can only be called on a deterministic executor"),
733 }
734 }
735
736 #[cfg(any(test, feature = "test"))]
737 pub fn start_waiting(&self) {
738 match self {
739 Self::Deterministic { executor, .. } => executor.start_waiting(),
740 _ => panic!("this method can only be called on a deterministic executor"),
741 }
742 }
743
744 #[cfg(any(test, feature = "test"))]
745 pub fn finish_waiting(&self) {
746 match self {
747 Self::Deterministic { executor, .. } => executor.finish_waiting(),
748 _ => panic!("this method can only be called on a deterministic executor"),
749 }
750 }
751
752 #[cfg(any(test, feature = "test"))]
753 pub fn forbid_parking(&self) {
754 match self {
755 Self::Deterministic { executor, .. } => executor.forbid_parking(),
756 _ => panic!("this method can only be called on a deterministic executor"),
757 }
758 }
759
760 #[cfg(any(test, feature = "test"))]
761 pub fn allow_parking(&self) {
762 match self {
763 Self::Deterministic { executor, .. } => executor.allow_parking(),
764 _ => panic!("this method can only be called on a deterministic executor"),
765 }
766 }
767
768 #[cfg(any(test, feature = "test"))]
769 pub fn advance_clock(&self, duration: Duration) {
770 match self {
771 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
772 _ => panic!("this method can only be called on a deterministic executor"),
773 }
774 }
775
776 #[cfg(any(test, feature = "test"))]
777 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
778 match self {
779 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
780 _ => panic!("this method can only be called on a deterministic executor"),
781 }
782 }
783}
784
785impl BackgroundExecutor {
786 pub fn new() -> Self {
787 let executor = Arc::new(Executor::new());
788 let stop = channel::unbounded::<()>();
789
790 for i in 0..2 * num_cpus::get() {
791 let executor = executor.clone();
792 let stop = stop.1.clone();
793 thread::Builder::new()
794 .name(format!("background-executor-{}", i))
795 .spawn(move || smol::block_on(executor.run(stop.recv())))
796 .unwrap();
797 }
798
799 Self::Production {
800 executor,
801 _stop: stop.0,
802 }
803 }
804
    /// Returns the CPU count reported by `num_cpus::get()`. The value does
    /// not depend on which executor variant this is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
808
809 pub fn spawn<T, F>(&self, future: F) -> Task<T>
810 where
811 T: 'static + Send,
812 F: Send + Future<Output = T> + 'static,
813 {
814 let future = any_future(future);
815 let any_task = match self {
816 Self::Production { executor, .. } => executor.spawn(future),
817 #[cfg(any(test, feature = "test"))]
818 Self::Deterministic { executor } => executor.spawn(future),
819 };
820 Task::send(any_task)
821 }
822
823 pub fn block<F, T>(&self, future: F) -> T
824 where
825 F: Future<Output = T>,
826 {
827 smol::pin!(future);
828 match self {
829 Self::Production { .. } => smol::block_on(&mut future),
830 #[cfg(any(test, feature = "test"))]
831 Self::Deterministic { executor, .. } => {
832 executor.block(&mut future, usize::MAX).unwrap()
833 }
834 }
835 }
836
837 pub fn block_with_timeout<F, T>(
838 &self,
839 timeout: Duration,
840 future: F,
841 ) -> Result<T, impl Future<Output = T>>
842 where
843 T: 'static,
844 F: 'static + Unpin + Future<Output = T>,
845 {
846 let mut future = any_local_future(future);
847 if !timeout.is_zero() {
848 let output = match self {
849 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
850 #[cfg(any(test, feature = "test"))]
851 Self::Deterministic { executor, .. } => {
852 use rand::prelude::*;
853 let max_ticks = {
854 let mut state = executor.state.lock();
855 let range = state.block_on_ticks.clone();
856 state.rng.gen_range(range)
857 };
858 executor.block(&mut future, max_ticks)
859 }
860 };
861 if let Some(output) = output {
862 return Ok(*output.downcast().unwrap());
863 }
864 }
865 Err(async { *future.await.downcast().unwrap() })
866 }
867
868 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
869 where
870 F: FnOnce(&mut Scope<'scope>),
871 {
872 let mut scope = Scope::new(self.clone());
873 (scheduler)(&mut scope);
874 let spawned = mem::take(&mut scope.futures)
875 .into_iter()
876 .map(|f| self.spawn(f))
877 .collect::<Vec<_>>();
878 for task in spawned {
879 task.await;
880 }
881 }
882
883 pub fn timer(&self, duration: Duration) -> Timer {
884 match self {
885 BackgroundExecutor::Production { .. } => {
886 Timer::Production(smol::Timer::after(duration))
887 }
888 #[cfg(any(test, feature = "test"))]
889 BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
890 }
891 }
892
893 pub fn now(&self) -> std::time::Instant {
894 match self {
895 BackgroundExecutor::Production { .. } => std::time::Instant::now(),
896 #[cfg(any(test, feature = "test"))]
897 BackgroundExecutor::Deterministic { executor } => executor.now(),
898 }
899 }
900
901 #[cfg(any(test, feature = "test"))]
902 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
903 match self {
904 Self::Deterministic { executor, .. } => {
905 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
906 }
907 _ => panic!("this method can only be called on a deterministic executor"),
908 }
909 }
910
911 #[cfg(any(test, feature = "test"))]
912 pub async fn simulate_random_delay(&self) {
913 match self {
914 Self::Deterministic { executor, .. } => {
915 executor.simulate_random_delay().await;
916 }
917 _ => {
918 panic!("this method can only be called on a deterministic executor")
919 }
920 }
921 }
922
923 #[cfg(any(test, feature = "test"))]
924 pub fn record_backtrace(&self) {
925 match self {
926 Self::Deterministic { executor, .. } => executor.record_backtrace(),
927 _ => {
928 panic!("this method can only be called on a deterministic executor")
929 }
930 }
931 }
932
933 #[cfg(any(test, feature = "test"))]
934 pub fn start_waiting(&self) {
935 match self {
936 Self::Deterministic { executor, .. } => executor.start_waiting(),
937 _ => panic!("this method can only be called on a deterministic executor"),
938 }
939 }
940}
941
impl Default for BackgroundExecutor {
    /// Equivalent to [`BackgroundExecutor::new`]: builds the production
    /// variant and starts its worker threads.
    fn default() -> Self {
        Self::new()
    }
}
947
/// Collects futures that may borrow data with lifetime `'a`, for use with
/// `BackgroundExecutor::scoped`.
pub struct Scope<'a> {
    // Executor used to block in `Drop` until the channel is closed.
    executor: Arc<BackgroundExecutor>,
    // Futures registered via `spawn`, with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each registered future holds a clone of this sender and drops it when
    // it finishes. The original is taken in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // `Drop` waits on this receiver.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
955
956impl<'a> Scope<'a> {
957 fn new(executor: Arc<BackgroundExecutor>) -> Self {
958 let (tx, rx) = mpsc::channel(1);
959 Self {
960 executor,
961 tx: Some(tx),
962 rx,
963 futures: Default::default(),
964 _phantom: PhantomData,
965 }
966 }
967
968 pub fn spawn<F>(&mut self, f: F)
969 where
970 F: Future<Output = ()> + Send + 'a,
971 {
972 let tx = self.tx.clone().unwrap();
973
974 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
975 // dropping this `Scope` blocks until all of the futures have resolved.
976 let f = unsafe {
977 mem::transmute::<
978 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
979 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
980 >(Box::pin(async move {
981 f.await;
982 drop(tx);
983 }))
984 };
985 self.futures.push(f);
986 }
987}
988
989impl<'a> Drop for Scope<'a> {
990 fn drop(&mut self) {
991 self.tx.take().unwrap();
992
993 // Wait until the channel is closed, which means that all of the spawned
994 // futures have resolved.
995 self.executor.block(self.rx.next());
996 }
997}
998
999impl<T> Task<T> {
1000 pub fn ready(value: T) -> Self {
1001 Self::Ready(Some(value))
1002 }
1003
1004 fn local(any_task: AnyLocalTask) -> Self {
1005 Self::Local {
1006 any_task,
1007 result_type: PhantomData,
1008 }
1009 }
1010
1011 pub fn detach(self) {
1012 match self {
1013 Task::Ready(_) => {}
1014 Task::Local { any_task, .. } => any_task.detach(),
1015 Task::Send { any_task, .. } => any_task.detach(),
1016 }
1017 }
1018}
1019
1020// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1021// #[track_caller]
1022// pub fn detach_and_log_err(self, cx: &mut AppContext) {
1023// let caller = Location::caller();
1024// cx.spawn(|_| async move {
1025// if let Err(err) = self.await {
1026// log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1027// }
1028// })
1029// .detach();
1030// }
1031// }
1032
1033impl<T: Send> Task<T> {
1034 fn send(any_task: AnyTask) -> Self {
1035 Self::Send {
1036 any_task,
1037 result_type: PhantomData,
1038 }
1039 }
1040}
1041
1042impl<T: fmt::Debug> fmt::Debug for Task<T> {
1043 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044 match self {
1045 Task::Ready(value) => value.fmt(f),
1046 Task::Local { any_task, .. } => any_task.fmt(f),
1047 Task::Send { any_task, .. } => any_task.fmt(f),
1048 }
1049 }
1050}
1051
1052impl<T: 'static> Future for Task<T> {
1053 type Output = T;
1054
1055 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1056 match unsafe { self.get_unchecked_mut() } {
1057 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1058 Task::Local { any_task, .. } => {
1059 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1060 }
1061 Task::Send { any_task, .. } => {
1062 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1063 }
1064 }
1065 }
1066}
1067
1068fn any_future<T, F>(future: F) -> AnyFuture
1069where
1070 T: 'static + Send,
1071 F: Future<Output = T> + Send + 'static,
1072{
1073 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1074}
1075
1076fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1077where
1078 T: 'static,
1079 F: Future<Output = T> + 'static,
1080{
1081 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1082}