1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use smol::{channel, prelude::*, Executor};
4use std::{
5 any::Any,
6 fmt::{self, Display},
7 marker::PhantomData,
8 mem,
9 pin::Pin,
10 rc::Rc,
11 sync::Arc,
12 task::{Context, Poll},
13 thread,
14 time::Duration,
15};
16
17use crate::{
18 platform::{self, Dispatcher},
19 util, MutableAppContext,
20};
21
22pub enum Foreground {
23 Platform {
24 dispatcher: Arc<dyn platform::Dispatcher>,
25 _not_send_or_sync: PhantomData<Rc<()>>,
26 },
27 #[cfg(any(test, feature = "test-support"))]
28 Deterministic {
29 cx_id: usize,
30 executor: Arc<Deterministic>,
31 },
32}
33
34pub enum Background {
35 #[cfg(any(test, feature = "test-support"))]
36 Deterministic { executor: Arc<Deterministic> },
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
44type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
45type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
46type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
47
48#[must_use]
49pub enum Task<T> {
50 Ready(Option<T>),
51 Local {
52 any_task: AnyLocalTask,
53 result_type: PhantomData<T>,
54 },
55 Send {
56 any_task: AnyTask,
57 result_type: PhantomData<T>,
58 },
59}
60
61unsafe impl<T: Send> Send for Task<T> {}
62
/// Mutable state shared by a `Deterministic` executor and the timers it hands out.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Source of every scheduling decision.
    rng: rand::prelude::StdRng,
    /// Seed `rng` was created from; used to re-seed it in `Foreground::forbid_parking`.
    seed: u64,
    /// Runnables queued by foreground tasks, keyed by foreground context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables queued by background tasks.
    scheduled_from_background: Vec<Runnable>,
    /// When set, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    /// Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Simulated current time; only moved forward by `Deterministic::advance_clock`.
    now: std::time::Instant,
    /// Id that will be given to the next timer.
    next_timer_id: usize,
    /// Outstanding timers as `(id, wake-up instant, barrier sender)`.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace recorded by `Foreground::start_waiting`, reported if parking is forbidden.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
76
/// A queued foreground runnable together with its origin.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    /// `true` when the runnable belongs to the main future passed to
    /// `Deterministic::run`; `run_internal` polls the main task after running it.
    main: bool,
}
82
/// Test executor whose scheduling order is decided by a seeded RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Queues, simulated clock and RNG; also shared with `DeterministicTimer`s.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the running thread while idle; unparkers are handed to schedulers and wakers.
    parker: parking_lot::Mutex<parking::Parker>,
}
88
89pub enum Timer {
90 Production(smol::Timer),
91 #[cfg(any(test, feature = "test-support"))]
92 Deterministic(DeterministicTimer),
93}
94
/// Timer registered in a deterministic executor's `pending_timers` list.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Identifies this timer's entry so it can be removed when the timer is dropped.
    id: usize,
    /// Executor state that owns the matching `pending_timers` entry.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
101
102#[cfg(any(test, feature = "test-support"))]
103impl Deterministic {
104 pub fn new(seed: u64) -> Arc<Self> {
105 use rand::prelude::*;
106
107 Arc::new(Self {
108 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
109 rng: StdRng::seed_from_u64(seed),
110 seed,
111 scheduled_from_foreground: Default::default(),
112 scheduled_from_background: Default::default(),
113 forbid_parking: false,
114 block_on_ticks: 0..=1000,
115 now: std::time::Instant::now(),
116 next_timer_id: Default::default(),
117 pending_timers: Default::default(),
118 waiting_backtrace: None,
119 })),
120 parker: Default::default(),
121 })
122 }
123
124 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
125 Arc::new(Background::Deterministic {
126 executor: self.clone(),
127 })
128 }
129
130 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
131 Rc::new(Foreground::Deterministic {
132 cx_id: id,
133 executor: self.clone(),
134 })
135 }
136
137 fn spawn_from_foreground(
138 &self,
139 cx_id: usize,
140 future: AnyLocalFuture,
141 main: bool,
142 ) -> AnyLocalTask {
143 let state = self.state.clone();
144 let unparker = self.parker.lock().unparker();
145 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
146 let mut state = state.lock();
147 state
148 .scheduled_from_foreground
149 .entry(cx_id)
150 .or_default()
151 .push(ForegroundRunnable { runnable, main });
152 unparker.unpark();
153 });
154 runnable.schedule();
155 task
156 }
157
158 fn spawn(&self, future: AnyFuture) -> AnyTask {
159 let state = self.state.clone();
160 let unparker = self.parker.lock().unparker();
161 let (runnable, task) = async_task::spawn(future, move |runnable| {
162 let mut state = state.lock();
163 state.scheduled_from_background.push(runnable);
164 unparker.unpark();
165 });
166 runnable.schedule();
167 task
168 }
169
170 fn run<'a>(
171 &self,
172 cx_id: usize,
173 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
174 ) -> Box<dyn Any> {
175 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
176
177 let woken = Arc::new(AtomicBool::new(false));
178
179 let state = self.state.clone();
180 let unparker = self.parker.lock().unparker();
181 let (runnable, mut main_task) = unsafe {
182 async_task::spawn_unchecked(main_future, move |runnable| {
183 let mut state = state.lock();
184 state
185 .scheduled_from_foreground
186 .entry(cx_id)
187 .or_default()
188 .push(ForegroundRunnable {
189 runnable,
190 main: true,
191 });
192 unparker.unpark();
193 })
194 };
195 runnable.schedule();
196
197 loop {
198 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
199 return result;
200 }
201
202 if !woken.load(SeqCst) {
203 self.state.lock().will_park();
204 }
205
206 woken.store(false, SeqCst);
207 self.parker.lock().park();
208 }
209 }
210
211 pub fn run_until_parked(&self) {
212 use std::sync::atomic::AtomicBool;
213 let woken = Arc::new(AtomicBool::new(false));
214 self.run_internal(woken, None);
215 }
216
217 fn run_internal(
218 &self,
219 woken: Arc<std::sync::atomic::AtomicBool>,
220 mut main_task: Option<&mut AnyLocalTask>,
221 ) -> Option<Box<dyn Any>> {
222 use rand::prelude::*;
223 use std::sync::atomic::Ordering::SeqCst;
224
225 let unparker = self.parker.lock().unparker();
226 let waker = waker_fn::waker_fn(move || {
227 woken.store(true, SeqCst);
228 unparker.unpark();
229 });
230
231 let mut cx = Context::from_waker(&waker);
232 loop {
233 let mut state = self.state.lock();
234
235 if state.scheduled_from_foreground.is_empty()
236 && state.scheduled_from_background.is_empty()
237 {
238 if let Some(main_task) = main_task {
239 if let Poll::Ready(result) = main_task.poll(&mut cx) {
240 return Some(result);
241 }
242 }
243
244 return None;
245 }
246
247 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
248 let background_len = state.scheduled_from_background.len();
249 let ix = state.rng.gen_range(0..background_len);
250 let runnable = state.scheduled_from_background.remove(ix);
251 drop(state);
252 runnable.run();
253 } else if !state.scheduled_from_foreground.is_empty() {
254 let available_cx_ids = state
255 .scheduled_from_foreground
256 .keys()
257 .copied()
258 .collect::<Vec<_>>();
259 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
260 let scheduled_from_cx = state
261 .scheduled_from_foreground
262 .get_mut(&cx_id_to_run)
263 .unwrap();
264 let foreground_runnable = scheduled_from_cx.remove(0);
265 if scheduled_from_cx.is_empty() {
266 state.scheduled_from_foreground.remove(&cx_id_to_run);
267 }
268
269 drop(state);
270
271 foreground_runnable.runnable.run();
272 if let Some(main_task) = main_task.as_mut() {
273 if foreground_runnable.main {
274 if let Poll::Ready(result) = main_task.poll(&mut cx) {
275 return Some(result);
276 }
277 }
278 }
279 }
280 }
281 }
282
283 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
284 where
285 F: Unpin + Future<Output = T>,
286 {
287 use rand::prelude::*;
288
289 let unparker = self.parker.lock().unparker();
290 let waker = waker_fn::waker_fn(move || {
291 unparker.unpark();
292 });
293
294 let mut cx = Context::from_waker(&waker);
295 for _ in 0..max_ticks {
296 let mut state = self.state.lock();
297 let runnable_count = state.scheduled_from_background.len();
298 let ix = state.rng.gen_range(0..=runnable_count);
299 if ix < state.scheduled_from_background.len() {
300 let runnable = state.scheduled_from_background.remove(ix);
301 drop(state);
302 runnable.run();
303 } else {
304 drop(state);
305 if let Poll::Ready(result) = future.poll(&mut cx) {
306 return Some(result);
307 }
308 let mut state = self.state.lock();
309 if state.scheduled_from_background.is_empty() {
310 state.will_park();
311 drop(state);
312 self.parker.lock().park();
313 }
314
315 continue;
316 }
317 }
318
319 None
320 }
321
322 pub fn timer(&self, duration: Duration) -> Timer {
323 let (tx, rx) = postage::barrier::channel();
324 let mut state = self.state.lock();
325 let wakeup_at = state.now + duration;
326 let id = util::post_inc(&mut state.next_timer_id);
327 state.pending_timers.push((id, wakeup_at, tx));
328 let state = self.state.clone();
329 Timer::Deterministic(DeterministicTimer { rx, id, state })
330 }
331
332 pub fn advance_clock(&self, duration: Duration) {
333 let mut state = self.state.lock();
334 state.now += duration;
335 let now = state.now;
336 let mut pending_timers = mem::take(&mut state.pending_timers);
337 drop(state);
338
339 pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
340 self.state.lock().pending_timers.extend(pending_timers);
341 }
342}
343
344impl Drop for Timer {
345 fn drop(&mut self) {
346 #[cfg(any(test, feature = "test-support"))]
347 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
348 state
349 .lock()
350 .pending_timers
351 .retain(|(timer_id, _, _)| timer_id != id)
352 }
353 }
354}
355
356impl Future for Timer {
357 type Output = ();
358
359 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
360 match &mut *self {
361 #[cfg(any(test, feature = "test-support"))]
362 Self::Deterministic(DeterministicTimer { rx, .. }) => {
363 use postage::stream::{PollRecv, Stream as _};
364 smol::pin!(rx);
365 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
366 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
367 PollRecv::Pending => Poll::Pending,
368 }
369 }
370 Self::Production(timer) => {
371 smol::pin!(timer);
372 match timer.poll(cx) {
373 Poll::Ready(_) => Poll::Ready(()),
374 Poll::Pending => Poll::Pending,
375 }
376 }
377 }
378 }
379}
380
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics when parking has been forbidden. If a waiting backtrace was
    /// recorded it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
402
403impl Foreground {
404 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
405 if dispatcher.is_main_thread() {
406 Ok(Self::Platform {
407 dispatcher,
408 _not_send_or_sync: PhantomData,
409 })
410 } else {
411 Err(anyhow!("must be constructed on main thread"))
412 }
413 }
414
415 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
416 let future = any_local_future(future);
417 let any_task = match self {
418 #[cfg(any(test, feature = "test-support"))]
419 Self::Deterministic { cx_id, executor } => {
420 executor.spawn_from_foreground(*cx_id, future, false)
421 }
422 Self::Platform { dispatcher, .. } => {
423 fn spawn_inner(
424 future: AnyLocalFuture,
425 dispatcher: &Arc<dyn Dispatcher>,
426 ) -> AnyLocalTask {
427 let dispatcher = dispatcher.clone();
428 let schedule =
429 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
430 let (runnable, task) = async_task::spawn_local(future, schedule);
431 runnable.schedule();
432 task
433 }
434 spawn_inner(future, dispatcher)
435 }
436 };
437 Task::local(any_task)
438 }
439
440 #[cfg(any(test, feature = "test-support"))]
441 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
442 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
443 let result = match self {
444 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
445 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
446 };
447 *result.downcast().unwrap()
448 }
449
450 #[cfg(any(test, feature = "test-support"))]
451 pub fn run_until_parked(&self) {
452 match self {
453 Self::Deterministic { executor, .. } => executor.run_until_parked(),
454 _ => panic!("this method can only be called on a deterministic executor"),
455 }
456 }
457
458 #[cfg(any(test, feature = "test-support"))]
459 pub fn parking_forbidden(&self) -> bool {
460 match self {
461 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
462 _ => panic!("this method can only be called on a deterministic executor"),
463 }
464 }
465
466 #[cfg(any(test, feature = "test-support"))]
467 pub fn start_waiting(&self) {
468 match self {
469 Self::Deterministic { executor, .. } => {
470 executor.state.lock().waiting_backtrace =
471 Some(backtrace::Backtrace::new_unresolved());
472 }
473 _ => panic!("this method can only be called on a deterministic executor"),
474 }
475 }
476
477 #[cfg(any(test, feature = "test-support"))]
478 pub fn finish_waiting(&self) {
479 match self {
480 Self::Deterministic { executor, .. } => {
481 executor.state.lock().waiting_backtrace.take();
482 }
483 _ => panic!("this method can only be called on a deterministic executor"),
484 }
485 }
486
487 #[cfg(any(test, feature = "test-support"))]
488 pub fn forbid_parking(&self) {
489 use rand::prelude::*;
490
491 match self {
492 Self::Deterministic { executor, .. } => {
493 let mut state = executor.state.lock();
494 state.forbid_parking = true;
495 state.rng = StdRng::seed_from_u64(state.seed);
496 }
497 _ => panic!("this method can only be called on a deterministic executor"),
498 }
499 }
500
501 #[cfg(any(test, feature = "test-support"))]
502 pub fn advance_clock(&self, duration: Duration) {
503 match self {
504 Self::Deterministic { executor, .. } => {
505 executor.run_until_parked();
506 executor.advance_clock(duration);
507 }
508 _ => panic!("this method can only be called on a deterministic executor"),
509 }
510 }
511
512 #[cfg(any(test, feature = "test-support"))]
513 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
514 match self {
515 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
516 _ => panic!("this method can only be called on a deterministic executor"),
517 }
518 }
519}
520
521impl Background {
522 pub fn new() -> Self {
523 let executor = Arc::new(Executor::new());
524 let stop = channel::unbounded::<()>();
525
526 for i in 0..2 * num_cpus::get() {
527 let executor = executor.clone();
528 let stop = stop.1.clone();
529 thread::Builder::new()
530 .name(format!("background-executor-{}", i))
531 .spawn(move || smol::block_on(executor.run(stop.recv())))
532 .unwrap();
533 }
534
535 Self::Production {
536 executor,
537 _stop: stop.0,
538 }
539 }
540
541 pub fn num_cpus(&self) -> usize {
542 num_cpus::get()
543 }
544
545 pub fn spawn<T, F>(&self, future: F) -> Task<T>
546 where
547 T: 'static + Send,
548 F: Send + Future<Output = T> + 'static,
549 {
550 let future = any_future(future);
551 let any_task = match self {
552 Self::Production { executor, .. } => executor.spawn(future),
553 #[cfg(any(test, feature = "test-support"))]
554 Self::Deterministic { executor } => executor.spawn(future),
555 };
556 Task::send(any_task)
557 }
558
559 pub fn block<F, T>(&self, future: F) -> T
560 where
561 F: Future<Output = T>,
562 {
563 smol::pin!(future);
564 match self {
565 Self::Production { .. } => smol::block_on(&mut future),
566 #[cfg(any(test, feature = "test-support"))]
567 Self::Deterministic { executor, .. } => {
568 executor.block(&mut future, usize::MAX).unwrap()
569 }
570 }
571 }
572
573 pub fn block_with_timeout<F, T>(
574 &self,
575 timeout: Duration,
576 future: F,
577 ) -> Result<T, impl Future<Output = T>>
578 where
579 T: 'static,
580 F: 'static + Unpin + Future<Output = T>,
581 {
582 let mut future = any_local_future(future);
583 if !timeout.is_zero() {
584 let output = match self {
585 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
586 #[cfg(any(test, feature = "test-support"))]
587 Self::Deterministic { executor, .. } => {
588 use rand::prelude::*;
589 let max_ticks = {
590 let mut state = executor.state.lock();
591 let range = state.block_on_ticks.clone();
592 state.rng.gen_range(range)
593 };
594 executor.block(&mut future, max_ticks)
595 }
596 };
597 if let Some(output) = output {
598 return Ok(*output.downcast().unwrap());
599 }
600 }
601 Err(async { *future.await.downcast().unwrap() })
602 }
603
604 pub async fn scoped<'scope, F>(&self, scheduler: F)
605 where
606 F: FnOnce(&mut Scope<'scope>),
607 {
608 let mut scope = Scope {
609 futures: Default::default(),
610 _phantom: PhantomData,
611 };
612 (scheduler)(&mut scope);
613 let spawned = scope
614 .futures
615 .into_iter()
616 .map(|f| self.spawn(f))
617 .collect::<Vec<_>>();
618 for task in spawned {
619 task.await;
620 }
621 }
622
623 pub fn timer(&self, duration: Duration) -> Timer {
624 match self {
625 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
626 #[cfg(any(test, feature = "test-support"))]
627 Background::Deterministic { executor } => executor.timer(duration),
628 }
629 }
630
631 #[cfg(any(test, feature = "test-support"))]
632 pub async fn simulate_random_delay(&self) {
633 use rand::prelude::*;
634 use smol::future::yield_now;
635
636 match self {
637 Self::Deterministic { executor, .. } => {
638 if executor.state.lock().rng.gen_bool(0.2) {
639 let yields = executor.state.lock().rng.gen_range(1..=10);
640 for _ in 0..yields {
641 yield_now().await;
642 }
643
644 let delay = Duration::from_millis(executor.state.lock().rng.gen_range(0..100));
645 executor.advance_clock(delay);
646 }
647 }
648 _ => panic!("this method can only be called on a deterministic executor"),
649 }
650 }
651}
652
/// Collects futures that may borrow data for `'a`, to be spawned later by
/// `Background::scoped`.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to the lifetime the registered futures may borrow for.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Registers `f` to be spawned when the scope is run.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: NOTE(review) — this only erases the `'a` bound to `'static`;
        // the layout is unchanged. Soundness depends on the caller finishing or
        // dropping every registered future before `'a` ends. `Background::scoped`
        // awaits all spawned tasks, but confirm the case where that future is
        // dropped before completing.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(scoped)
        };
        self.futures.push(erased);
    }
}
672
673impl<T> Task<T> {
674 pub fn ready(value: T) -> Self {
675 Self::Ready(Some(value))
676 }
677
678 fn local(any_task: AnyLocalTask) -> Self {
679 Self::Local {
680 any_task,
681 result_type: PhantomData,
682 }
683 }
684
685 pub fn detach(self) {
686 match self {
687 Task::Ready(_) => {}
688 Task::Local { any_task, .. } => any_task.detach(),
689 Task::Send { any_task, .. } => any_task.detach(),
690 }
691 }
692}
693
694impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
695 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
696 cx.spawn(|_| async move {
697 if let Err(err) = self.await {
698 log::error!("{}", err);
699 }
700 })
701 .detach();
702 }
703}
704
705impl<T: Send> Task<T> {
706 fn send(any_task: AnyTask) -> Self {
707 Self::Send {
708 any_task,
709 result_type: PhantomData,
710 }
711 }
712}
713
714impl<T: fmt::Debug> fmt::Debug for Task<T> {
715 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
716 match self {
717 Task::Ready(value) => value.fmt(f),
718 Task::Local { any_task, .. } => any_task.fmt(f),
719 Task::Send { any_task, .. } => any_task.fmt(f),
720 }
721 }
722}
723
724impl<T: 'static> Future for Task<T> {
725 type Output = T;
726
727 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
728 match unsafe { self.get_unchecked_mut() } {
729 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
730 Task::Local { any_task, .. } => {
731 any_task.poll(cx).map(|value| *value.downcast().unwrap())
732 }
733 Task::Send { any_task, .. } => {
734 any_task.poll(cx).map(|value| *value.downcast().unwrap())
735 }
736 }
737 }
738}
739
740fn any_future<T, F>(future: F) -> AnyFuture
741where
742 T: 'static + Send,
743 F: Future<Output = T> + Send + 'static,
744{
745 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
746}
747
748fn any_local_future<T, F>(future: F) -> AnyLocalFuture
749where
750 T: 'static,
751 F: Future<Output = T> + 'static,
752{
753 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
754}