1use crate::util;
2use crate::PlatformDispatcher;
3use anyhow::{anyhow, Result};
4use async_task::Runnable;
5use futures::channel::{mpsc, oneshot};
6use smol::{channel, prelude::*, Executor};
7use std::{
8 any::Any,
9 fmt::{self},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::Arc,
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20/// Enqueues the given closure to be run on the application's event loop. Can be
21/// called on any thread.
22pub(crate) fn spawn_on_main<R>(
23 dispatcher: Arc<dyn PlatformDispatcher>,
24 future: impl Future<Output = R> + Send + 'static,
25) -> impl Future<Output = R>
26where
27 R: Send + 'static,
28{
29 let (tx, rx) = oneshot::channel();
30 let (runnable, task) = async_task::spawn(
31 async move {
32 let result = future.await;
33 let _ = tx.send(result);
34 },
35 move |runnable| dispatcher.run_on_main_thread(runnable),
36 );
37 runnable.schedule();
38 task.detach();
39 async move { rx.await.unwrap() }
40}
41
42/// Enqueues the given closure to be run on the application's event loop. Must
43/// be called on the main thread.
44pub(crate) fn spawn_on_main_local<R>(
45 dispatcher: Arc<dyn PlatformDispatcher>,
46 future: impl Future<Output = R> + 'static,
47) -> impl Future<Output = R>
48where
49 R: 'static,
50{
51 assert!(dispatcher.is_main_thread(), "must be called on main thread");
52
53 let (tx, rx) = oneshot::channel();
54 let (runnable, task) = async_task::spawn_local(
55 async move {
56 let result = future.await;
57 let _ = tx.send(result);
58 },
59 move |runnable| dispatcher.run_on_main_thread(runnable),
60 );
61 runnable.schedule();
62 task.detach();
63 async move { rx.await.unwrap() }
64}
65
66pub enum ForegroundExecutor {
67 Platform {
68 dispatcher: Arc<dyn PlatformDispatcher>,
69 _not_send_or_sync: PhantomData<Rc<()>>,
70 },
71 #[cfg(any(test, feature = "test"))]
72 Deterministic {
73 cx_id: usize,
74 executor: Arc<Deterministic>,
75 },
76}
77
78pub enum BackgroundExecutor {
79 #[cfg(any(test, feature = "test"))]
80 Deterministic { executor: Arc<Deterministic> },
81 Production {
82 executor: Arc<smol::Executor<'static>>,
83 _stop: channel::Sender<()>,
84 },
85}
86
87type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
88type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
89type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
90type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
91
92#[must_use]
93pub enum Task<T> {
94 Ready(Option<T>),
95 Local {
96 any_task: AnyLocalTask,
97 result_type: PhantomData<T>,
98 },
99 Send {
100 any_task: AnyTask,
101 result_type: PhantomData<T>,
102 },
103}
104
105unsafe impl<T: Send> Send for Task<T> {}
106
107#[cfg(any(test, feature = "test"))]
108struct DeterministicState {
109 rng: rand::prelude::StdRng,
110 seed: u64,
111 scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
112 scheduled_from_background: Vec<BackgroundRunnable>,
113 forbid_parking: bool,
114 block_on_ticks: std::ops::RangeInclusive<usize>,
115 now: std::time::Instant,
116 next_timer_id: usize,
117 pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
118 waiting_backtrace: Option<backtrace::Backtrace>,
119 next_runnable_id: usize,
120 poll_history: Vec<ExecutorEvent>,
121 previous_poll_history: Option<Vec<ExecutorEvent>>,
122 enable_runnable_backtraces: bool,
123 runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
124}
125
126#[derive(Copy, Clone, Debug, PartialEq, Eq)]
127pub enum ExecutorEvent {
128 PollRunnable { id: usize },
129 EnqueuRunnable { id: usize },
130}
131
132#[cfg(any(test, feature = "test"))]
133struct ForegroundRunnable {
134 id: usize,
135 runnable: Runnable,
136 main: bool,
137}
138
139#[cfg(any(test, feature = "test"))]
140struct BackgroundRunnable {
141 id: usize,
142 runnable: Runnable,
143}
144
145#[cfg(any(test, feature = "test"))]
146pub struct Deterministic {
147 state: Arc<parking_lot::Mutex<DeterministicState>>,
148 parker: parking_lot::Mutex<parking::Parker>,
149}
150
151#[must_use]
152pub enum Timer {
153 Production(smol::Timer),
154 #[cfg(any(test, feature = "test"))]
155 Deterministic(DeterministicTimer),
156}
157
158#[cfg(any(test, feature = "test"))]
159pub struct DeterministicTimer {
160 rx: postage::barrier::Receiver,
161 id: usize,
162 state: Arc<parking_lot::Mutex<DeterministicState>>,
163}
164
165#[cfg(any(test, feature = "test"))]
166impl Deterministic {
167 pub fn new(seed: u64) -> Arc<Self> {
168 use rand::prelude::*;
169
170 Arc::new(Self {
171 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
172 rng: StdRng::seed_from_u64(seed),
173 seed,
174 scheduled_from_foreground: Default::default(),
175 scheduled_from_background: Default::default(),
176 forbid_parking: false,
177 block_on_ticks: 0..=1000,
178 now: std::time::Instant::now(),
179 next_timer_id: Default::default(),
180 pending_timers: Default::default(),
181 waiting_backtrace: None,
182 next_runnable_id: 0,
183 poll_history: Default::default(),
184 previous_poll_history: Default::default(),
185 enable_runnable_backtraces: false,
186 runnable_backtraces: Default::default(),
187 })),
188 parker: Default::default(),
189 })
190 }
191
192 pub fn execution_history(&self) -> Vec<ExecutorEvent> {
193 self.state.lock().poll_history.clone()
194 }
195
196 pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
197 self.state.lock().previous_poll_history = history;
198 }
199
200 pub fn enable_runnable_backtrace(&self) {
201 self.state.lock().enable_runnable_backtraces = true;
202 }
203
204 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
205 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
206 backtrace.resolve();
207 backtrace
208 }
209
210 pub fn build_background(self: &Arc<Self>) -> Arc<BackgroundExecutor> {
211 Arc::new(BackgroundExecutor::Deterministic {
212 executor: self.clone(),
213 })
214 }
215
216 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<ForegroundExecutor> {
217 Rc::new(ForegroundExecutor::Deterministic {
218 cx_id: id,
219 executor: self.clone(),
220 })
221 }
222
223 fn spawn_from_foreground(
224 &self,
225 cx_id: usize,
226 future: AnyLocalFuture,
227 main: bool,
228 ) -> AnyLocalTask {
229 let state = self.state.clone();
230 let id;
231 {
232 let mut state = state.lock();
233 id = util::post_inc(&mut state.next_runnable_id);
234 if state.enable_runnable_backtraces {
235 state
236 .runnable_backtraces
237 .insert(id, backtrace::Backtrace::new_unresolved());
238 }
239 }
240
241 let unparker = self.parker.lock().unparker();
242 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
243 let mut state = state.lock();
244 state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
245 state
246 .scheduled_from_foreground
247 .entry(cx_id)
248 .or_default()
249 .push(ForegroundRunnable { id, runnable, main });
250 unparker.unpark();
251 });
252 runnable.schedule();
253 task
254 }
255
256 fn spawn(&self, future: AnyFuture) -> AnyTask {
257 let state = self.state.clone();
258 let id;
259 {
260 let mut state = state.lock();
261 id = util::post_inc(&mut state.next_runnable_id);
262 if state.enable_runnable_backtraces {
263 state
264 .runnable_backtraces
265 .insert(id, backtrace::Backtrace::new_unresolved());
266 }
267 }
268
269 let unparker = self.parker.lock().unparker();
270 let (runnable, task) = async_task::spawn(future, move |runnable| {
271 let mut state = state.lock();
272 state
273 .poll_history
274 .push(ExecutorEvent::EnqueuRunnable { id });
275 state
276 .scheduled_from_background
277 .push(BackgroundRunnable { id, runnable });
278 unparker.unpark();
279 });
280 runnable.schedule();
281 task
282 }
283
284 fn run<'a>(
285 &self,
286 cx_id: usize,
287 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
288 ) -> Box<dyn Any> {
289 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
290
291 let woken = Arc::new(AtomicBool::new(false));
292
293 let state = self.state.clone();
294 let id;
295 {
296 let mut state = state.lock();
297 id = util::post_inc(&mut state.next_runnable_id);
298 if state.enable_runnable_backtraces {
299 state
300 .runnable_backtraces
301 .insert(id, backtrace::Backtrace::new_unresolved());
302 }
303 }
304
305 let unparker = self.parker.lock().unparker();
306 let (runnable, mut main_task) = unsafe {
307 async_task::spawn_unchecked(main_future, move |runnable| {
308 let state = &mut *state.lock();
309 state
310 .scheduled_from_foreground
311 .entry(cx_id)
312 .or_default()
313 .push(ForegroundRunnable {
314 id: util::post_inc(&mut state.next_runnable_id),
315 runnable,
316 main: true,
317 });
318 unparker.unpark();
319 })
320 };
321 runnable.schedule();
322
323 loop {
324 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
325 return result;
326 }
327
328 if !woken.load(SeqCst) {
329 self.state.lock().will_park();
330 }
331
332 woken.store(false, SeqCst);
333 self.parker.lock().park();
334 }
335 }
336
337 pub fn run_until_parked(&self) {
338 use std::sync::atomic::AtomicBool;
339 let woken = Arc::new(AtomicBool::new(false));
340 self.run_internal(woken, None);
341 }
342
343 fn run_internal(
344 &self,
345 woken: Arc<std::sync::atomic::AtomicBool>,
346 mut main_task: Option<&mut AnyLocalTask>,
347 ) -> Option<Box<dyn Any>> {
348 use rand::prelude::*;
349 use std::sync::atomic::Ordering::SeqCst;
350
351 let unparker = self.parker.lock().unparker();
352 let waker = waker_fn::waker_fn(move || {
353 woken.store(true, SeqCst);
354 unparker.unpark();
355 });
356
357 let mut cx = Context::from_waker(&waker);
358 loop {
359 let mut state = self.state.lock();
360
361 if state.scheduled_from_foreground.is_empty()
362 && state.scheduled_from_background.is_empty()
363 {
364 if let Some(main_task) = main_task {
365 if let Poll::Ready(result) = main_task.poll(&mut cx) {
366 return Some(result);
367 }
368 }
369
370 return None;
371 }
372
373 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
374 let background_len = state.scheduled_from_background.len();
375 let ix = state.rng.gen_range(0..background_len);
376 let background_runnable = state.scheduled_from_background.remove(ix);
377 state.push_to_history(ExecutorEvent::PollRunnable {
378 id: background_runnable.id,
379 });
380 drop(state);
381 background_runnable.runnable.run();
382 } else if !state.scheduled_from_foreground.is_empty() {
383 let available_cx_ids = state
384 .scheduled_from_foreground
385 .keys()
386 .copied()
387 .collect::<Vec<_>>();
388 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
389 let scheduled_from_cx = state
390 .scheduled_from_foreground
391 .get_mut(&cx_id_to_run)
392 .unwrap();
393 let foreground_runnable = scheduled_from_cx.remove(0);
394 if scheduled_from_cx.is_empty() {
395 state.scheduled_from_foreground.remove(&cx_id_to_run);
396 }
397 state.push_to_history(ExecutorEvent::PollRunnable {
398 id: foreground_runnable.id,
399 });
400
401 drop(state);
402
403 foreground_runnable.runnable.run();
404 if let Some(main_task) = main_task.as_mut() {
405 if foreground_runnable.main {
406 if let Poll::Ready(result) = main_task.poll(&mut cx) {
407 return Some(result);
408 }
409 }
410 }
411 }
412 }
413 }
414
415 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
416 where
417 F: Unpin + Future<Output = T>,
418 {
419 use rand::prelude::*;
420
421 let unparker = self.parker.lock().unparker();
422 let waker = waker_fn::waker_fn(move || {
423 unparker.unpark();
424 });
425
426 let mut cx = Context::from_waker(&waker);
427 for _ in 0..max_ticks {
428 let mut state = self.state.lock();
429 let runnable_count = state.scheduled_from_background.len();
430 let ix = state.rng.gen_range(0..=runnable_count);
431 if ix < state.scheduled_from_background.len() {
432 let background_runnable = state.scheduled_from_background.remove(ix);
433 state.push_to_history(ExecutorEvent::PollRunnable {
434 id: background_runnable.id,
435 });
436 drop(state);
437 background_runnable.runnable.run();
438 } else {
439 drop(state);
440 if let Poll::Ready(result) = future.poll(&mut cx) {
441 return Some(result);
442 }
443 let mut state = self.state.lock();
444 if state.scheduled_from_background.is_empty() {
445 state.will_park();
446 drop(state);
447 self.parker.lock().park();
448 }
449
450 continue;
451 }
452 }
453
454 None
455 }
456
457 pub fn timer(&self, duration: Duration) -> Timer {
458 let (tx, rx) = postage::barrier::channel();
459 let mut state = self.state.lock();
460 let wakeup_at = state.now + duration;
461 let id = util::post_inc(&mut state.next_timer_id);
462 match state
463 .pending_timers
464 .binary_search_by_key(&wakeup_at, |e| e.1)
465 {
466 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
467 }
468 let state = self.state.clone();
469 Timer::Deterministic(DeterministicTimer { rx, id, state })
470 }
471
472 pub fn now(&self) -> std::time::Instant {
473 let state = self.state.lock();
474 state.now
475 }
476
477 pub fn advance_clock(&self, duration: Duration) {
478 let new_now = self.state.lock().now + duration;
479 loop {
480 self.run_until_parked();
481 let mut state = self.state.lock();
482
483 if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
484 let wakeup_time = *wakeup_time;
485 if wakeup_time <= new_now {
486 let timer_count = state
487 .pending_timers
488 .iter()
489 .take_while(|(_, t, _)| *t == wakeup_time)
490 .count();
491 state.now = wakeup_time;
492 let timers_to_wake = state
493 .pending_timers
494 .drain(0..timer_count)
495 .collect::<Vec<_>>();
496 drop(state);
497 drop(timers_to_wake);
498 continue;
499 }
500 }
501
502 break;
503 }
504
505 self.state.lock().now = new_now;
506 }
507
508 pub fn start_waiting(&self) {
509 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
510 }
511
512 pub fn finish_waiting(&self) {
513 self.state.lock().waiting_backtrace.take();
514 }
515
516 pub fn forbid_parking(&self) {
517 use rand::prelude::*;
518
519 let mut state = self.state.lock();
520 state.forbid_parking = true;
521 state.rng = StdRng::seed_from_u64(state.seed);
522 }
523
524 pub fn allow_parking(&self) {
525 use rand::prelude::*;
526
527 let mut state = self.state.lock();
528 state.forbid_parking = false;
529 state.rng = StdRng::seed_from_u64(state.seed);
530 }
531
532 pub async fn simulate_random_delay(&self) {
533 use rand::prelude::*;
534 use smol::future::yield_now;
535 if self.state.lock().rng.gen_bool(0.2) {
536 let yields = self.state.lock().rng.gen_range(1..=10);
537 for _ in 0..yields {
538 yield_now().await;
539 }
540 }
541 }
542
543 pub fn record_backtrace(&self) {
544 let mut state = self.state.lock();
545 if state.enable_runnable_backtraces {
546 let current_id = state
547 .poll_history
548 .iter()
549 .rev()
550 .find_map(|event| match event {
551 ExecutorEvent::PollRunnable { id } => Some(*id),
552 _ => None,
553 });
554 if let Some(id) = current_id {
555 state
556 .runnable_backtraces
557 .insert(id, backtrace::Backtrace::new_unresolved());
558 }
559 }
560 }
561}
562
563impl Drop for Timer {
564 fn drop(&mut self) {
565 #[cfg(any(test, feature = "test"))]
566 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
567 state
568 .lock()
569 .pending_timers
570 .retain(|(timer_id, _, _)| timer_id != id)
571 }
572 }
573}
574
575impl Future for Timer {
576 type Output = ();
577
578 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
579 match &mut *self {
580 #[cfg(any(test, feature = "test"))]
581 Self::Deterministic(DeterministicTimer { rx, .. }) => {
582 use postage::stream::{PollRecv, Stream as _};
583 smol::pin!(rx);
584 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
585 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
586 PollRecv::Pending => Poll::Pending,
587 }
588 }
589 Self::Production(timer) => {
590 smol::pin!(timer);
591 match timer.poll(cx) {
592 Poll::Ready(_) => Poll::Ready(()),
593 Poll::Pending => Poll::Pending,
594 }
595 }
596 }
597 }
598}
599
600#[cfg(any(test, feature = "test"))]
601impl DeterministicState {
602 fn push_to_history(&mut self, event: ExecutorEvent) {
603 use std::fmt::Write as _;
604
605 self.poll_history.push(event);
606 if let Some(prev_history) = &self.previous_poll_history {
607 let ix = self.poll_history.len() - 1;
608 let prev_event = prev_history[ix];
609 if event != prev_event {
610 let mut message = String::new();
611 writeln!(
612 &mut message,
613 "current runnable backtrace:\n{:?}",
614 self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
615 trace.resolve();
616 crate::util::CwdBacktrace(trace)
617 })
618 )
619 .unwrap();
620 writeln!(
621 &mut message,
622 "previous runnable backtrace:\n{:?}",
623 self.runnable_backtraces
624 .get_mut(&prev_event.id())
625 .map(|trace| {
626 trace.resolve();
627 util::CwdBacktrace(trace)
628 })
629 )
630 .unwrap();
631 panic!("detected non-determinism after {ix}. {message}");
632 }
633 }
634 }
635
636 fn will_park(&mut self) {
637 if self.forbid_parking {
638 let mut backtrace_message = String::new();
639 #[cfg(any(test, feature = "test"))]
640 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
641 backtrace.resolve();
642 backtrace_message = format!(
643 "\nbacktrace of waiting future:\n{:?}",
644 util::CwdBacktrace(backtrace)
645 );
646 }
647
648 panic!(
649 "deterministic executor parked after a call to forbid_parking{}",
650 backtrace_message
651 );
652 }
653 }
654}
655
656#[cfg(any(test, feature = "test"))]
657impl ExecutorEvent {
658 pub fn id(&self) -> usize {
659 match self {
660 ExecutorEvent::PollRunnable { id } => *id,
661 ExecutorEvent::EnqueuRunnable { id } => *id,
662 }
663 }
664}
665
666impl ForegroundExecutor {
667 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Result<Self> {
668 if dispatcher.is_main_thread() {
669 Ok(Self::Platform {
670 dispatcher,
671 _not_send_or_sync: PhantomData,
672 })
673 } else {
674 Err(anyhow!("must be constructed on main thread"))
675 }
676 }
677
678 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
679 let future = any_local_future(future);
680 let any_task = match self {
681 #[cfg(any(test, feature = "test"))]
682 Self::Deterministic { cx_id, executor } => {
683 executor.spawn_from_foreground(*cx_id, future, false)
684 }
685 Self::Platform { dispatcher, .. } => {
686 fn spawn_inner(
687 future: AnyLocalFuture,
688 dispatcher: &Arc<dyn PlatformDispatcher>,
689 ) -> AnyLocalTask {
690 let dispatcher = dispatcher.clone();
691 let schedule =
692 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
693 let (runnable, task) = async_task::spawn_local(future, schedule);
694 runnable.schedule();
695 task
696 }
697 spawn_inner(future, dispatcher)
698 }
699 };
700 Task::local(any_task)
701 }
702
703 #[cfg(any(test, feature = "test"))]
704 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
705 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
706 let result = match self {
707 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
708 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
709 };
710 *result.downcast().unwrap()
711 }
712
713 #[cfg(any(test, feature = "test"))]
714 pub fn run_until_parked(&self) {
715 match self {
716 Self::Deterministic { executor, .. } => executor.run_until_parked(),
717 _ => panic!("this method can only be called on a deterministic executor"),
718 }
719 }
720
721 #[cfg(any(test, feature = "test"))]
722 pub fn parking_forbidden(&self) -> bool {
723 match self {
724 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
725 _ => panic!("this method can only be called on a deterministic executor"),
726 }
727 }
728
729 #[cfg(any(test, feature = "test"))]
730 pub fn start_waiting(&self) {
731 match self {
732 Self::Deterministic { executor, .. } => executor.start_waiting(),
733 _ => panic!("this method can only be called on a deterministic executor"),
734 }
735 }
736
737 #[cfg(any(test, feature = "test"))]
738 pub fn finish_waiting(&self) {
739 match self {
740 Self::Deterministic { executor, .. } => executor.finish_waiting(),
741 _ => panic!("this method can only be called on a deterministic executor"),
742 }
743 }
744
745 #[cfg(any(test, feature = "test"))]
746 pub fn forbid_parking(&self) {
747 match self {
748 Self::Deterministic { executor, .. } => executor.forbid_parking(),
749 _ => panic!("this method can only be called on a deterministic executor"),
750 }
751 }
752
753 #[cfg(any(test, feature = "test"))]
754 pub fn allow_parking(&self) {
755 match self {
756 Self::Deterministic { executor, .. } => executor.allow_parking(),
757 _ => panic!("this method can only be called on a deterministic executor"),
758 }
759 }
760
761 #[cfg(any(test, feature = "test"))]
762 pub fn advance_clock(&self, duration: Duration) {
763 match self {
764 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
765 _ => panic!("this method can only be called on a deterministic executor"),
766 }
767 }
768
769 #[cfg(any(test, feature = "test"))]
770 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
771 match self {
772 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
773 _ => panic!("this method can only be called on a deterministic executor"),
774 }
775 }
776}
777
778impl BackgroundExecutor {
779 pub fn new() -> Self {
780 let executor = Arc::new(Executor::new());
781 let stop = channel::unbounded::<()>();
782
783 for i in 0..2 * num_cpus::get() {
784 let executor = executor.clone();
785 let stop = stop.1.clone();
786 thread::Builder::new()
787 .name(format!("background-executor-{}", i))
788 .spawn(move || smol::block_on(executor.run(stop.recv())))
789 .unwrap();
790 }
791
792 Self::Production {
793 executor,
794 _stop: stop.0,
795 }
796 }
797
798 pub fn num_cpus(&self) -> usize {
799 num_cpus::get()
800 }
801
802 pub fn spawn<T, F>(&self, future: F) -> Task<T>
803 where
804 T: 'static + Send,
805 F: Send + Future<Output = T> + 'static,
806 {
807 let future = any_future(future);
808 let any_task = match self {
809 Self::Production { executor, .. } => executor.spawn(future),
810 #[cfg(any(test, feature = "test"))]
811 Self::Deterministic { executor } => executor.spawn(future),
812 };
813 Task::send(any_task)
814 }
815
816 pub fn block<F, T>(&self, future: F) -> T
817 where
818 F: Future<Output = T>,
819 {
820 smol::pin!(future);
821 match self {
822 Self::Production { .. } => smol::block_on(&mut future),
823 #[cfg(any(test, feature = "test"))]
824 Self::Deterministic { executor, .. } => {
825 executor.block(&mut future, usize::MAX).unwrap()
826 }
827 }
828 }
829
830 pub fn block_with_timeout<F, T>(
831 &self,
832 timeout: Duration,
833 future: F,
834 ) -> Result<T, impl Future<Output = T>>
835 where
836 T: 'static,
837 F: 'static + Unpin + Future<Output = T>,
838 {
839 let mut future = any_local_future(future);
840 if !timeout.is_zero() {
841 let output = match self {
842 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
843 #[cfg(any(test, feature = "test"))]
844 Self::Deterministic { executor, .. } => {
845 use rand::prelude::*;
846 let max_ticks = {
847 let mut state = executor.state.lock();
848 let range = state.block_on_ticks.clone();
849 state.rng.gen_range(range)
850 };
851 executor.block(&mut future, max_ticks)
852 }
853 };
854 if let Some(output) = output {
855 return Ok(*output.downcast().unwrap());
856 }
857 }
858 Err(async { *future.await.downcast().unwrap() })
859 }
860
861 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
862 where
863 F: FnOnce(&mut Scope<'scope>),
864 {
865 let mut scope = Scope::new(self.clone());
866 (scheduler)(&mut scope);
867 let spawned = mem::take(&mut scope.futures)
868 .into_iter()
869 .map(|f| self.spawn(f))
870 .collect::<Vec<_>>();
871 for task in spawned {
872 task.await;
873 }
874 }
875
876 pub fn timer(&self, duration: Duration) -> Timer {
877 match self {
878 BackgroundExecutor::Production { .. } => {
879 Timer::Production(smol::Timer::after(duration))
880 }
881 #[cfg(any(test, feature = "test"))]
882 BackgroundExecutor::Deterministic { executor } => executor.timer(duration),
883 }
884 }
885
886 pub fn now(&self) -> std::time::Instant {
887 match self {
888 BackgroundExecutor::Production { .. } => std::time::Instant::now(),
889 #[cfg(any(test, feature = "test"))]
890 BackgroundExecutor::Deterministic { executor } => executor.now(),
891 }
892 }
893
894 #[cfg(any(test, feature = "test"))]
895 pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut<Target = rand::prelude::StdRng> {
896 match self {
897 Self::Deterministic { executor, .. } => {
898 parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng)
899 }
900 _ => panic!("this method can only be called on a deterministic executor"),
901 }
902 }
903
904 #[cfg(any(test, feature = "test"))]
905 pub async fn simulate_random_delay(&self) {
906 match self {
907 Self::Deterministic { executor, .. } => {
908 executor.simulate_random_delay().await;
909 }
910 _ => {
911 panic!("this method can only be called on a deterministic executor")
912 }
913 }
914 }
915
916 #[cfg(any(test, feature = "test"))]
917 pub fn record_backtrace(&self) {
918 match self {
919 Self::Deterministic { executor, .. } => executor.record_backtrace(),
920 _ => {
921 panic!("this method can only be called on a deterministic executor")
922 }
923 }
924 }
925
926 #[cfg(any(test, feature = "test"))]
927 pub fn start_waiting(&self) {
928 match self {
929 Self::Deterministic { executor, .. } => executor.start_waiting(),
930 _ => panic!("this method can only be called on a deterministic executor"),
931 }
932 }
933}
934
935impl Default for BackgroundExecutor {
936 fn default() -> Self {
937 Self::new()
938 }
939}
940
941pub struct Scope<'a> {
942 executor: Arc<BackgroundExecutor>,
943 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
944 tx: Option<mpsc::Sender<()>>,
945 rx: mpsc::Receiver<()>,
946 _phantom: PhantomData<&'a ()>,
947}
948
949impl<'a> Scope<'a> {
950 fn new(executor: Arc<BackgroundExecutor>) -> Self {
951 let (tx, rx) = mpsc::channel(1);
952 Self {
953 executor,
954 tx: Some(tx),
955 rx,
956 futures: Default::default(),
957 _phantom: PhantomData,
958 }
959 }
960
961 pub fn spawn<F>(&mut self, f: F)
962 where
963 F: Future<Output = ()> + Send + 'a,
964 {
965 let tx = self.tx.clone().unwrap();
966
967 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
968 // dropping this `Scope` blocks until all of the futures have resolved.
969 let f = unsafe {
970 mem::transmute::<
971 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
972 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
973 >(Box::pin(async move {
974 f.await;
975 drop(tx);
976 }))
977 };
978 self.futures.push(f);
979 }
980}
981
982impl<'a> Drop for Scope<'a> {
983 fn drop(&mut self) {
984 self.tx.take().unwrap();
985
986 // Wait until the channel is closed, which means that all of the spawned
987 // futures have resolved.
988 self.executor.block(self.rx.next());
989 }
990}
991
992impl<T> Task<T> {
993 pub fn ready(value: T) -> Self {
994 Self::Ready(Some(value))
995 }
996
997 fn local(any_task: AnyLocalTask) -> Self {
998 Self::Local {
999 any_task,
1000 result_type: PhantomData,
1001 }
1002 }
1003
1004 pub fn detach(self) {
1005 match self {
1006 Task::Ready(_) => {}
1007 Task::Local { any_task, .. } => any_task.detach(),
1008 Task::Send { any_task, .. } => any_task.detach(),
1009 }
1010 }
1011}
1012
1013// impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
1014// #[track_caller]
1015// pub fn detach_and_log_err(self, cx: &mut AppContext) {
1016// let caller = Location::caller();
1017// cx.spawn(|_| async move {
1018// if let Err(err) = self.await {
1019// log::error!("{}:{}: {:#}", caller.file(), caller.line(), err);
1020// }
1021// })
1022// .detach();
1023// }
1024// }
1025
1026impl<T: Send> Task<T> {
1027 fn send(any_task: AnyTask) -> Self {
1028 Self::Send {
1029 any_task,
1030 result_type: PhantomData,
1031 }
1032 }
1033}
1034
1035impl<T: fmt::Debug> fmt::Debug for Task<T> {
1036 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1037 match self {
1038 Task::Ready(value) => value.fmt(f),
1039 Task::Local { any_task, .. } => any_task.fmt(f),
1040 Task::Send { any_task, .. } => any_task.fmt(f),
1041 }
1042 }
1043}
1044
1045impl<T: 'static> Future for Task<T> {
1046 type Output = T;
1047
1048 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1049 match unsafe { self.get_unchecked_mut() } {
1050 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
1051 Task::Local { any_task, .. } => {
1052 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1053 }
1054 Task::Send { any_task, .. } => {
1055 any_task.poll(cx).map(|value| *value.downcast().unwrap())
1056 }
1057 }
1058 }
1059}
1060
1061fn any_future<T, F>(future: F) -> AnyFuture
1062where
1063 T: 'static + Send,
1064 F: Future<Output = T> + Send + 'static,
1065{
1066 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
1067}
1068
1069fn any_local_future<T, F>(future: F) -> AnyLocalFuture
1070where
1071 T: 'static,
1072 F: Future<Output = T> + 'static,
1073{
1074 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
1075}