1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use smol::{channel, prelude::*, Executor, Timer};
4use std::{
5 any::Any,
6 fmt::{self, Display},
7 marker::PhantomData,
8 mem,
9 pin::Pin,
10 rc::Rc,
11 sync::Arc,
12 task::{Context, Poll},
13 thread,
14 time::Duration,
15};
16
17use crate::{
18 platform::{self, Dispatcher},
19 util, MutableAppContext,
20};
21
/// Executor handle for futures that are not required to be `Send`.
///
/// The `Platform` variant schedules runnables through a platform dispatcher's
/// `run_on_main_thread`; the `Deterministic` variant (test builds only)
/// queues them on a seeded `Deterministic` scheduler.
pub enum Foreground {
    /// Backed by the platform dispatcher; built by `Foreground::platform`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this marker keeps the
        // variant (and therefore the enum) from being either.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by the deterministic test scheduler.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this handle's runnables are queued in the
        // scheduler's per-context foreground queues.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
33
/// Executor handle for `Send` futures.
///
/// `Production` wraps a `smol::Executor`; `Deterministic` (test builds only)
/// queues work on a seeded `Deterministic` scheduler.
pub enum Background {
    /// Backed by the deterministic test scheduler.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Backed by a shared `smol` executor.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sender half of the channel whose receiver the worker threads wait
        // on (see `Background::new`); it is stored but never sent on here.
        // NOTE(review): presumably dropping it is what ends the workers — confirm.
        _stop: channel::Sender<()>,
    },
}
42
/// Boxed, pinned, non-`Send` future whose output has been erased to `Box<dyn Any>`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, pinned, `Send` future whose output has been erased to `Box<dyn Any + Send>`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// `async_task` handle for a task produced from an [`AnyFuture`].
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// `async_task` handle for a task produced from an [`AnyLocalFuture`].
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
47
/// Handle to a value of type `T` that is either already available or will be
/// produced by a spawned task.
///
/// Awaiting the handle yields the value. For the spawned variants the inner
/// task's type-erased output is downcast back to `T`.
#[must_use]
pub enum Task<T> {
    /// An already-computed value; the `Option` is emptied the first time the
    /// task is polled.
    Ready(Option<T>),
    /// A task spawned from a non-`Send` future.
    Local {
        any_task: AnyLocalTask,
        // Records the output type the erased result is downcast to.
        result_type: PhantomData<T>,
    },
    /// A task spawned from a `Send` future.
    Send {
        any_task: AnyTask,
        // Records the output type the erased result is downcast to.
        result_type: PhantomData<T>,
    },
}
60
// Asserts that a `Task<T>` may be sent across threads whenever `T: Send`,
// including the `Local` variant, whose erased output type (`Box<dyn Any>`)
// carries no `Send` bound of its own.
// NOTE(review): presumably sound because the only value that travels through
// the handle is the `T` output — confirm against async_task's guarantees.
unsafe impl<T: Send> Send for Task<T> {}
62
/// Mutable state shared by the deterministic scheduler and the schedule
/// callbacks of the tasks it spawns.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision; seeded from `seed`.
    rng: rand::prelude::StdRng,
    // Kept so that `Foreground::forbid_parking` can re-seed `rng`.
    seed: u64,
    // Runnables queued by foreground tasks, keyed by context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables queued by background tasks.
    scheduled_from_background: Vec<Runnable>,
    // When true, `will_park` panics instead of returning.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated clock; moved forward by `Deterministic::advance_clock`.
    now: std::time::Instant,
    // Id handed to the next timer created by `Deterministic::timer`.
    next_timer_id: usize,
    // Outstanding timers as (id, wake-up instant, barrier sender).
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Captured by `Foreground::start_waiting`, cleared by `finish_waiting`,
    // and included in the panic message produced by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
76
/// A queued foreground runnable together with a flag telling the scheduler
/// whether it belongs to the main future passed to `Deterministic::run`.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    // True only for runnables scheduled by the main task; after running one
    // of these, `run_internal` polls the main task for completion.
    main: bool,
}
82
/// Test-only scheduler that picks the next runnable using a seeded random
/// number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with the schedule callbacks of spawned tasks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Used to park the driving thread; schedule callbacks and wakers hold
    // unparkers obtained from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
88
89#[cfg(any(test, feature = "test-support"))]
90impl Deterministic {
91 pub fn new(seed: u64) -> Arc<Self> {
92 use rand::prelude::*;
93
94 Arc::new(Self {
95 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
96 rng: StdRng::seed_from_u64(seed),
97 seed,
98 scheduled_from_foreground: Default::default(),
99 scheduled_from_background: Default::default(),
100 forbid_parking: false,
101 block_on_ticks: 0..=1000,
102 now: std::time::Instant::now(),
103 next_timer_id: Default::default(),
104 pending_timers: Default::default(),
105 waiting_backtrace: None,
106 })),
107 parker: Default::default(),
108 })
109 }
110
111 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
112 Arc::new(Background::Deterministic {
113 executor: self.clone(),
114 })
115 }
116
117 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
118 Rc::new(Foreground::Deterministic {
119 cx_id: id,
120 executor: self.clone(),
121 })
122 }
123
124 fn spawn_from_foreground(
125 &self,
126 cx_id: usize,
127 future: AnyLocalFuture,
128 main: bool,
129 ) -> AnyLocalTask {
130 let state = self.state.clone();
131 let unparker = self.parker.lock().unparker();
132 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
133 let mut state = state.lock();
134 state
135 .scheduled_from_foreground
136 .entry(cx_id)
137 .or_default()
138 .push(ForegroundRunnable { runnable, main });
139 unparker.unpark();
140 });
141 runnable.schedule();
142 task
143 }
144
145 fn spawn(&self, future: AnyFuture) -> AnyTask {
146 let state = self.state.clone();
147 let unparker = self.parker.lock().unparker();
148 let (runnable, task) = async_task::spawn(future, move |runnable| {
149 let mut state = state.lock();
150 state.scheduled_from_background.push(runnable);
151 unparker.unpark();
152 });
153 runnable.schedule();
154 task
155 }
156
    /// Drives `main_future` to completion on the calling thread and returns
    /// its boxed output.
    ///
    /// The future is wrapped in a task whose wake-ups are queued under
    /// `cx_id` and flagged as `main`. The loop then alternates between
    /// running scheduled work via `run_internal` and parking the thread.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set to true by the waker that `run_internal` hands to the main task.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // `spawn_unchecked` is used because `main_future` is only bounded by
        // `'a` and carries no `Send` bound.
        // NOTE(review): soundness presumably relies on the task being run
        // only on this thread and being finished or dropped before this
        // function returns — confirm.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let mut state = state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // No wake-up was recorded while running, so the thread is about
            // to park with nothing pending; `will_park` panics if parking
            // has been forbidden.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            // Reset the flag, then block until a schedule callback or waker
            // unparks this thread.
            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
197
198 pub fn run_until_parked(&self) {
199 use std::sync::atomic::AtomicBool;
200 let woken = Arc::new(AtomicBool::new(false));
201 self.run_internal(woken, None);
202 }
203
    /// Runs scheduled runnables until both queues are empty or the main task
    /// completes.
    ///
    /// Returns `Some(output)` as soon as a poll of `main_task` reports ready.
    /// Returns `None` once no runnable is queued and the main task (if one
    /// was supplied) is still pending.
    ///
    /// `woken` is set to `true` whenever the waker given to `main_task`
    /// fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        // Waker used when polling the main task: records the wake-up and
        // unparks the driving thread.
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            // Re-acquired on every iteration; released before any runnable
            // is executed.
            let mut state = self.state.lock();

            // Nothing left to run: poll the main task one last time and
            // return either way.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // When background work is queued, a random boolean decides
            // whether to run a randomly chosen background runnable. If it
            // comes up false and no foreground work exists, this iteration
            // runs nothing and the loop retries.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let runnable = state.scheduled_from_background.remove(ix);
                drop(state);
                runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context that has queued work, then take that
                // context's oldest runnable (front of its queue).
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so the map's keys are exactly the
                // contexts with pending work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }

                drop(state);

                foreground_runnable.runnable.run();
                // Only runnables flagged `main` can have advanced the main
                // task, so it is polled only after running one of those.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
269
270 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
271 where
272 F: Unpin + Future<Output = T>,
273 {
274 use rand::prelude::*;
275
276 let unparker = self.parker.lock().unparker();
277 let waker = waker_fn::waker_fn(move || {
278 unparker.unpark();
279 });
280
281 let mut cx = Context::from_waker(&waker);
282 for _ in 0..max_ticks {
283 let mut state = self.state.lock();
284 let runnable_count = state.scheduled_from_background.len();
285 let ix = state.rng.gen_range(0..=runnable_count);
286 if ix < state.scheduled_from_background.len() {
287 let runnable = state.scheduled_from_background.remove(ix);
288 drop(state);
289 runnable.run();
290 } else {
291 drop(state);
292 if let Poll::Ready(result) = future.poll(&mut cx) {
293 return Some(result);
294 }
295 let mut state = self.state.lock();
296 if state.scheduled_from_background.is_empty() {
297 state.will_park();
298 drop(state);
299 self.parker.lock().park();
300 }
301
302 continue;
303 }
304 }
305
306 None
307 }
308
309 pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
310 let (tx, mut rx) = postage::barrier::channel();
311 let timer_id;
312 {
313 let mut state = self.state.lock();
314 let wakeup_at = state.now + duration;
315 timer_id = util::post_inc(&mut state.next_timer_id);
316 state.pending_timers.push((timer_id, wakeup_at, tx));
317 }
318
319 let remove_timer = util::defer({
320 let state = self.state.clone();
321 move || {
322 state
323 .lock()
324 .pending_timers
325 .retain(|(id, _, _)| *id != timer_id);
326 }
327 });
328
329 async move {
330 postage::prelude::Stream::recv(&mut rx).await;
331 drop(remove_timer);
332 }
333 }
334
335 pub fn advance_clock(&self, duration: Duration) {
336 let mut state = self.state.lock();
337 state.now += duration;
338 let now = state.now;
339 let mut pending_timers = mem::take(&mut state.pending_timers);
340 drop(state);
341
342 pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
343 self.state.lock().pending_timers.extend(pending_timers);
344 }
345}
346
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called just before the driving thread parks.
    ///
    /// Does nothing unless parking has been forbidden. Otherwise it panics,
    /// appending the resolved `waiting_backtrace` to the message when one
    /// was recorded.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
368
369impl Foreground {
370 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
371 if dispatcher.is_main_thread() {
372 Ok(Self::Platform {
373 dispatcher,
374 _not_send_or_sync: PhantomData,
375 })
376 } else {
377 Err(anyhow!("must be constructed on main thread"))
378 }
379 }
380
381 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
382 let future = any_local_future(future);
383 let any_task = match self {
384 #[cfg(any(test, feature = "test-support"))]
385 Self::Deterministic { cx_id, executor } => {
386 executor.spawn_from_foreground(*cx_id, future, false)
387 }
388 Self::Platform { dispatcher, .. } => {
389 fn spawn_inner(
390 future: AnyLocalFuture,
391 dispatcher: &Arc<dyn Dispatcher>,
392 ) -> AnyLocalTask {
393 let dispatcher = dispatcher.clone();
394 let schedule =
395 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
396 let (runnable, task) = async_task::spawn_local(future, schedule);
397 runnable.schedule();
398 task
399 }
400 spawn_inner(future, dispatcher)
401 }
402 };
403 Task::local(any_task)
404 }
405
406 #[cfg(any(test, feature = "test-support"))]
407 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
408 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
409 let result = match self {
410 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
411 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
412 };
413 *result.downcast().unwrap()
414 }
415
416 #[cfg(any(test, feature = "test-support"))]
417 pub fn run_until_parked(&self) {
418 match self {
419 Self::Deterministic { executor, .. } => executor.run_until_parked(),
420 _ => panic!("this method can only be called on a deterministic executor"),
421 }
422 }
423
424 #[cfg(any(test, feature = "test-support"))]
425 pub fn parking_forbidden(&self) -> bool {
426 match self {
427 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
428 _ => panic!("this method can only be called on a deterministic executor"),
429 }
430 }
431
432 #[cfg(any(test, feature = "test-support"))]
433 pub fn start_waiting(&self) {
434 match self {
435 Self::Deterministic { executor, .. } => {
436 executor.state.lock().waiting_backtrace =
437 Some(backtrace::Backtrace::new_unresolved());
438 }
439 _ => panic!("this method can only be called on a deterministic executor"),
440 }
441 }
442
443 #[cfg(any(test, feature = "test-support"))]
444 pub fn finish_waiting(&self) {
445 match self {
446 Self::Deterministic { executor, .. } => {
447 executor.state.lock().waiting_backtrace.take();
448 }
449 _ => panic!("this method can only be called on a deterministic executor"),
450 }
451 }
452
453 #[cfg(any(test, feature = "test-support"))]
454 pub fn forbid_parking(&self) {
455 use rand::prelude::*;
456
457 match self {
458 Self::Deterministic { executor, .. } => {
459 let mut state = executor.state.lock();
460 state.forbid_parking = true;
461 state.rng = StdRng::seed_from_u64(state.seed);
462 }
463 _ => panic!("this method can only be called on a deterministic executor"),
464 }
465 }
466
467 pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
468 let mut timer = None;
469
470 #[cfg(any(test, feature = "test-support"))]
471 if let Self::Deterministic { executor, .. } = self {
472 timer = Some(executor.timer(duration));
473 }
474
475 async move {
476 if let Some(timer) = timer {
477 timer.await;
478 } else {
479 Timer::after(duration).await;
480 }
481 }
482 }
483
484 #[cfg(any(test, feature = "test-support"))]
485 pub fn advance_clock(&self, duration: Duration) {
486 match self {
487 Self::Deterministic { executor, .. } => {
488 executor.run_until_parked();
489 executor.advance_clock(duration);
490 }
491 _ => panic!("this method can only be called on a deterministic executor"),
492 }
493 }
494
495 #[cfg(any(test, feature = "test-support"))]
496 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
497 match self {
498 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
499 _ => panic!("this method can only be called on a deterministic executor"),
500 }
501 }
502}
503
504impl Background {
505 pub fn new() -> Self {
506 let executor = Arc::new(Executor::new());
507 let stop = channel::unbounded::<()>();
508
509 for i in 0..2 * num_cpus::get() {
510 let executor = executor.clone();
511 let stop = stop.1.clone();
512 thread::Builder::new()
513 .name(format!("background-executor-{}", i))
514 .spawn(move || smol::block_on(executor.run(stop.recv())))
515 .unwrap();
516 }
517
518 Self::Production {
519 executor,
520 _stop: stop.0,
521 }
522 }
523
    /// Returns the value reported by `num_cpus::get()`.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
527
528 pub fn spawn<T, F>(&self, future: F) -> Task<T>
529 where
530 T: 'static + Send,
531 F: Send + Future<Output = T> + 'static,
532 {
533 let future = any_future(future);
534 let any_task = match self {
535 Self::Production { executor, .. } => executor.spawn(future),
536 #[cfg(any(test, feature = "test-support"))]
537 Self::Deterministic { executor } => executor.spawn(future),
538 };
539 Task::send(any_task)
540 }
541
542 pub fn block<F, T>(&self, future: F) -> T
543 where
544 F: Future<Output = T>,
545 {
546 smol::pin!(future);
547 match self {
548 Self::Production { .. } => smol::block_on(&mut future),
549 #[cfg(any(test, feature = "test-support"))]
550 Self::Deterministic { executor, .. } => {
551 executor.block(&mut future, usize::MAX).unwrap()
552 }
553 }
554 }
555
556 pub fn block_with_timeout<F, T>(
557 &self,
558 timeout: Duration,
559 future: F,
560 ) -> Result<T, impl Future<Output = T>>
561 where
562 T: 'static,
563 F: 'static + Unpin + Future<Output = T>,
564 {
565 let mut future = any_local_future(future);
566 if !timeout.is_zero() {
567 let output = match self {
568 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
569 #[cfg(any(test, feature = "test-support"))]
570 Self::Deterministic { executor, .. } => {
571 use rand::prelude::*;
572 let max_ticks = {
573 let mut state = executor.state.lock();
574 let range = state.block_on_ticks.clone();
575 state.rng.gen_range(range)
576 };
577 executor.block(&mut future, max_ticks)
578 }
579 };
580 if let Some(output) = output {
581 return Ok(*output.downcast().unwrap());
582 }
583 }
584 Err(async { *future.await.downcast().unwrap() })
585 }
586
587 pub async fn scoped<'scope, F>(&self, scheduler: F)
588 where
589 F: FnOnce(&mut Scope<'scope>),
590 {
591 let mut scope = Scope {
592 futures: Default::default(),
593 _phantom: PhantomData,
594 };
595 (scheduler)(&mut scope);
596 let spawned = scope
597 .futures
598 .into_iter()
599 .map(|f| self.spawn(f))
600 .collect::<Vec<_>>();
601 for task in spawned {
602 task.await;
603 }
604 }
605
606 pub fn timer(&self, duration: Duration) -> impl Future<Output = ()> {
607 let mut timer = None;
608
609 #[cfg(any(test, feature = "test-support"))]
610 if let Self::Deterministic { executor, .. } = self {
611 timer = Some(executor.timer(duration));
612 }
613
614 async move {
615 if let Some(timer) = timer {
616 timer.await;
617 } else {
618 Timer::after(duration).await;
619 }
620 }
621 }
622
623 #[cfg(any(test, feature = "test-support"))]
624 pub async fn simulate_random_delay(&self) {
625 use rand::prelude::*;
626 use smol::future::yield_now;
627
628 match self {
629 Self::Deterministic { executor, .. } => {
630 if executor.state.lock().rng.gen_bool(0.2) {
631 let yields = executor.state.lock().rng.gen_range(1..=10);
632 for _ in 0..yields {
633 yield_now().await;
634 }
635
636 let delay = Duration::from_millis(executor.state.lock().rng.gen_range(0..100));
637 executor.advance_clock(delay);
638 }
639 }
640 _ => panic!("this method can only be called on a deterministic executor"),
641 }
642 }
643}
644
/// Collects futures that may borrow data living for `'a` so that they can be
/// spawned together by `Background::scoped`.
pub struct Scope<'a> {
    // Stored with an erased `'static` bound; see `spawn` for why.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to the lifetime its futures are allowed to borrow from.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Queues `f` to be spawned when the scope is run.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: the transmute changes only the trait object's lifetime
        // bound; the pinned box itself is untouched. Soundness depends on the
        // queued futures never outliving `'a`: `Background::scoped` awaits
        // every spawned task before it returns.
        // NOTE(review): confirm no task can keep running past `'a` if the
        // `scoped` future itself is dropped before completing.
        let erased: Pin<Box<dyn Future<Output = ()> + Send + 'static>> =
            unsafe { mem::transmute(scoped) };
        self.futures.push(erased);
    }
}
664
665impl<T> Task<T> {
666 pub fn ready(value: T) -> Self {
667 Self::Ready(Some(value))
668 }
669
670 fn local(any_task: AnyLocalTask) -> Self {
671 Self::Local {
672 any_task,
673 result_type: PhantomData,
674 }
675 }
676
677 pub fn detach(self) {
678 match self {
679 Task::Ready(_) => {}
680 Task::Local { any_task, .. } => any_task.detach(),
681 Task::Send { any_task, .. } => any_task.detach(),
682 }
683 }
684}
685
686impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
687 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
688 cx.spawn(|_| async move {
689 if let Err(err) = self.await {
690 log::error!("{}", err);
691 }
692 })
693 .detach();
694 }
695}
696
697impl<T: Send> Task<T> {
698 fn send(any_task: AnyTask) -> Self {
699 Self::Send {
700 any_task,
701 result_type: PhantomData,
702 }
703 }
704}
705
706impl<T: fmt::Debug> fmt::Debug for Task<T> {
707 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708 match self {
709 Task::Ready(value) => value.fmt(f),
710 Task::Local { any_task, .. } => any_task.fmt(f),
711 Task::Send { any_task, .. } => any_task.fmt(f),
712 }
713 }
714}
715
716impl<T: 'static> Future for Task<T> {
717 type Output = T;
718
719 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
720 match unsafe { self.get_unchecked_mut() } {
721 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
722 Task::Local { any_task, .. } => {
723 any_task.poll(cx).map(|value| *value.downcast().unwrap())
724 }
725 Task::Send { any_task, .. } => {
726 any_task.poll(cx).map(|value| *value.downcast().unwrap())
727 }
728 }
729 }
730}
731
732fn any_future<T, F>(future: F) -> AnyFuture
733where
734 T: 'static + Send,
735 F: Future<Output = T> + Send + 'static,
736{
737 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
738}
739
740fn any_local_future<T, F>(future: F) -> AnyLocalFuture
741where
742 T: 'static,
743 F: Future<Output = T> + 'static,
744{
745 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
746}