1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::Backtrace;
4use collections::HashMap;
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, future::yield_now, prelude::*, Executor, Timer};
9use std::{
10 any::Any,
11 fmt::{self, Display},
12 marker::PhantomData,
13 mem,
14 ops::RangeInclusive,
15 pin::Pin,
16 rc::Rc,
17 sync::{
18 atomic::{AtomicBool, Ordering::SeqCst},
19 Arc,
20 },
21 task::{Context, Poll},
22 thread,
23 time::{Duration, Instant},
24};
25use waker_fn::waker_fn;
26
27use crate::{
28 platform::{self, Dispatcher},
29 util, MutableAppContext,
30};
31
32pub enum Foreground {
33 Platform {
34 dispatcher: Arc<dyn platform::Dispatcher>,
35 _not_send_or_sync: PhantomData<Rc<()>>,
36 },
37 Deterministic {
38 cx_id: usize,
39 executor: Arc<Deterministic>,
40 },
41}
42
43pub enum Background {
44 Deterministic {
45 executor: Arc<Deterministic>,
46 },
47 Production {
48 executor: Arc<smol::Executor<'static>>,
49 _stop: channel::Sender<()>,
50 },
51}
52
53type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
54type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
55type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
56type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
57
58#[must_use]
59pub enum Task<T> {
60 Ready(Option<T>),
61 Local {
62 any_task: AnyLocalTask,
63 result_type: PhantomData<T>,
64 },
65 Send {
66 any_task: AnyTask,
67 result_type: PhantomData<T>,
68 },
69}
70
71unsafe impl<T: Send> Send for Task<T> {}
72
73struct DeterministicState {
74 rng: StdRng,
75 seed: u64,
76 scheduled_from_foreground: HashMap<usize, Vec<ForegroundRunnable>>,
77 scheduled_from_background: Vec<Runnable>,
78 forbid_parking: bool,
79 block_on_ticks: RangeInclusive<usize>,
80 now: Instant,
81 pending_timers: Vec<(Instant, barrier::Sender)>,
82 waiting_backtrace: Option<Backtrace>,
83}
84
85struct ForegroundRunnable {
86 runnable: Runnable,
87 main: bool,
88}
89
90pub struct Deterministic {
91 state: Arc<Mutex<DeterministicState>>,
92 parker: Mutex<parking::Parker>,
93}
94
95impl Deterministic {
96 pub fn new(seed: u64) -> Arc<Self> {
97 Arc::new(Self {
98 state: Arc::new(Mutex::new(DeterministicState {
99 rng: StdRng::seed_from_u64(seed),
100 seed,
101 scheduled_from_foreground: Default::default(),
102 scheduled_from_background: Default::default(),
103 forbid_parking: false,
104 block_on_ticks: 0..=1000,
105 now: Instant::now(),
106 pending_timers: Default::default(),
107 waiting_backtrace: None,
108 })),
109 parker: Default::default(),
110 })
111 }
112
113 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
114 Arc::new(Background::Deterministic {
115 executor: self.clone(),
116 })
117 }
118
119 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
120 Rc::new(Foreground::Deterministic {
121 cx_id: id,
122 executor: self.clone(),
123 })
124 }
125
126 fn spawn_from_foreground(
127 &self,
128 cx_id: usize,
129 future: AnyLocalFuture,
130 main: bool,
131 ) -> AnyLocalTask {
132 let state = self.state.clone();
133 let unparker = self.parker.lock().unparker();
134 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
135 let mut state = state.lock();
136 state
137 .scheduled_from_foreground
138 .entry(cx_id)
139 .or_default()
140 .push(ForegroundRunnable { runnable, main });
141 unparker.unpark();
142 });
143 runnable.schedule();
144 task
145 }
146
147 fn spawn(&self, future: AnyFuture) -> AnyTask {
148 let state = self.state.clone();
149 let unparker = self.parker.lock().unparker();
150 let (runnable, task) = async_task::spawn(future, move |runnable| {
151 let mut state = state.lock();
152 state.scheduled_from_background.push(runnable);
153 unparker.unpark();
154 });
155 runnable.schedule();
156 task
157 }
158
159 fn run<'a>(
160 &self,
161 cx_id: usize,
162 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
163 ) -> Box<dyn Any> {
164 let woken = Arc::new(AtomicBool::new(false));
165
166 let state = self.state.clone();
167 let unparker = self.parker.lock().unparker();
168 let (runnable, mut main_task) = unsafe {
169 async_task::spawn_unchecked(main_future, move |runnable| {
170 let mut state = state.lock();
171 state
172 .scheduled_from_foreground
173 .entry(cx_id)
174 .or_default()
175 .push(ForegroundRunnable {
176 runnable,
177 main: true,
178 });
179 unparker.unpark();
180 })
181 };
182 runnable.schedule();
183
184 loop {
185 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
186 return result;
187 }
188
189 if !woken.load(SeqCst) {
190 self.state.lock().will_park();
191 }
192
193 woken.store(false, SeqCst);
194 self.parker.lock().park();
195 }
196 }
197
198 pub(crate) fn run_until_parked(&self) {
199 let woken = Arc::new(AtomicBool::new(false));
200 self.run_internal(woken, None);
201 }
202
203 fn run_internal(
204 &self,
205 woken: Arc<AtomicBool>,
206 mut main_task: Option<&mut AnyLocalTask>,
207 ) -> Option<Box<dyn Any>> {
208 let unparker = self.parker.lock().unparker();
209 let waker = waker_fn(move || {
210 woken.store(true, SeqCst);
211 unparker.unpark();
212 });
213
214 let mut cx = Context::from_waker(&waker);
215 loop {
216 let mut state = self.state.lock();
217
218 if state.scheduled_from_foreground.is_empty()
219 && state.scheduled_from_background.is_empty()
220 {
221 return None;
222 }
223
224 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
225 let background_len = state.scheduled_from_background.len();
226 let ix = state.rng.gen_range(0..background_len);
227 let runnable = state.scheduled_from_background.remove(ix);
228 drop(state);
229 runnable.run();
230 } else if !state.scheduled_from_foreground.is_empty() {
231 let available_cx_ids = state
232 .scheduled_from_foreground
233 .keys()
234 .copied()
235 .collect::<Vec<_>>();
236 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
237 let scheduled_from_cx = state
238 .scheduled_from_foreground
239 .get_mut(&cx_id_to_run)
240 .unwrap();
241 let foreground_runnable = scheduled_from_cx.remove(0);
242 if scheduled_from_cx.is_empty() {
243 state.scheduled_from_foreground.remove(&cx_id_to_run);
244 }
245
246 drop(state);
247
248 foreground_runnable.runnable.run();
249 if let Some(main_task) = main_task.as_mut() {
250 if foreground_runnable.main {
251 if let Poll::Ready(result) = main_task.poll(&mut cx) {
252 return Some(result);
253 }
254 }
255 }
256 }
257 }
258 }
259
260 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
261 where
262 F: Unpin + Future<Output = T>,
263 {
264 let unparker = self.parker.lock().unparker();
265 let waker = waker_fn(move || {
266 unparker.unpark();
267 });
268
269 let mut cx = Context::from_waker(&waker);
270 for _ in 0..max_ticks {
271 let mut state = self.state.lock();
272 let runnable_count = state.scheduled_from_background.len();
273 let ix = state.rng.gen_range(0..=runnable_count);
274 if ix < state.scheduled_from_background.len() {
275 let runnable = state.scheduled_from_background.remove(ix);
276 drop(state);
277 runnable.run();
278 } else {
279 drop(state);
280 if let Poll::Ready(result) = future.poll(&mut cx) {
281 return Some(result);
282 }
283 let mut state = self.state.lock();
284 if state.scheduled_from_background.is_empty() {
285 state.will_park();
286 drop(state);
287 self.parker.lock().park();
288 }
289
290 continue;
291 }
292 }
293
294 None
295 }
296}
297
298impl DeterministicState {
299 fn will_park(&mut self) {
300 if self.forbid_parking {
301 let mut backtrace_message = String::new();
302 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
303 backtrace.resolve();
304 backtrace_message = format!(
305 "\nbacktrace of waiting future:\n{:?}",
306 util::CwdBacktrace(backtrace)
307 );
308 }
309
310 panic!(
311 "deterministic executor parked after a call to forbid_parking{}",
312 backtrace_message
313 );
314 }
315 }
316}
317
318impl Foreground {
319 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
320 if dispatcher.is_main_thread() {
321 Ok(Self::Platform {
322 dispatcher,
323 _not_send_or_sync: PhantomData,
324 })
325 } else {
326 Err(anyhow!("must be constructed on main thread"))
327 }
328 }
329
330 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
331 let future = any_local_future(future);
332 let any_task = match self {
333 Self::Deterministic { cx_id, executor } => {
334 executor.spawn_from_foreground(*cx_id, future, false)
335 }
336 Self::Platform { dispatcher, .. } => {
337 fn spawn_inner(
338 future: AnyLocalFuture,
339 dispatcher: &Arc<dyn Dispatcher>,
340 ) -> AnyLocalTask {
341 let dispatcher = dispatcher.clone();
342 let schedule =
343 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
344 let (runnable, task) = async_task::spawn_local(future, schedule);
345 runnable.schedule();
346 task
347 }
348 spawn_inner(future, dispatcher)
349 }
350 };
351 Task::local(any_task)
352 }
353
354 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
355 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
356 let result = match self {
357 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
358 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
359 };
360 *result.downcast().unwrap()
361 }
362
363 pub fn run_until_parked(&self) {
364 match self {
365 Self::Deterministic { executor, .. } => executor.run_until_parked(),
366 _ => panic!("this method can only be called on a deterministic executor"),
367 }
368 }
369
370 pub fn parking_forbidden(&self) -> bool {
371 match self {
372 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
373 _ => panic!("this method can only be called on a deterministic executor"),
374 }
375 }
376
377 pub fn start_waiting(&self) {
378 match self {
379 Self::Deterministic { executor, .. } => {
380 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
381 }
382 _ => panic!("this method can only be called on a deterministic executor"),
383 }
384 }
385
386 pub fn finish_waiting(&self) {
387 match self {
388 Self::Deterministic { executor, .. } => {
389 executor.state.lock().waiting_backtrace.take();
390 }
391 _ => panic!("this method can only be called on a deterministic executor"),
392 }
393 }
394
395 pub fn forbid_parking(&self) {
396 match self {
397 Self::Deterministic { executor, .. } => {
398 let mut state = executor.state.lock();
399 state.forbid_parking = true;
400 state.rng = StdRng::seed_from_u64(state.seed);
401 }
402 _ => panic!("this method can only be called on a deterministic executor"),
403 }
404 }
405
406 pub async fn timer(&self, duration: Duration) {
407 match self {
408 Self::Deterministic { executor, .. } => {
409 let (tx, mut rx) = barrier::channel();
410 {
411 let mut state = executor.state.lock();
412 let wakeup_at = state.now + duration;
413 state.pending_timers.push((wakeup_at, tx));
414 }
415 rx.recv().await;
416 }
417 _ => {
418 Timer::after(duration).await;
419 }
420 }
421 }
422
423 pub fn advance_clock(&self, duration: Duration) {
424 match self {
425 Self::Deterministic { executor, .. } => {
426 executor.run_until_parked();
427
428 let mut state = executor.state.lock();
429 state.now += duration;
430 let now = state.now;
431 let mut pending_timers = mem::take(&mut state.pending_timers);
432 drop(state);
433
434 pending_timers.retain(|(wakeup, _)| *wakeup > now);
435 executor.state.lock().pending_timers.extend(pending_timers);
436 }
437 _ => panic!("this method can only be called on a deterministic executor"),
438 }
439 }
440
441 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
442 match self {
443 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
444 _ => panic!("this method can only be called on a deterministic executor"),
445 }
446 }
447}
448
449impl Background {
450 pub fn new() -> Self {
451 let executor = Arc::new(Executor::new());
452 let stop = channel::unbounded::<()>();
453
454 for i in 0..2 * num_cpus::get() {
455 let executor = executor.clone();
456 let stop = stop.1.clone();
457 thread::Builder::new()
458 .name(format!("background-executor-{}", i))
459 .spawn(move || smol::block_on(executor.run(stop.recv())))
460 .unwrap();
461 }
462
463 Self::Production {
464 executor,
465 _stop: stop.0,
466 }
467 }
468
469 pub fn num_cpus(&self) -> usize {
470 num_cpus::get()
471 }
472
473 pub fn spawn<T, F>(&self, future: F) -> Task<T>
474 where
475 T: 'static + Send,
476 F: Send + Future<Output = T> + 'static,
477 {
478 let future = any_future(future);
479 let any_task = match self {
480 Self::Production { executor, .. } => executor.spawn(future),
481 Self::Deterministic { executor } => executor.spawn(future),
482 };
483 Task::send(any_task)
484 }
485
486 pub fn block<F, T>(&self, future: F) -> T
487 where
488 F: Future<Output = T>,
489 {
490 smol::pin!(future);
491 match self {
492 Self::Production { .. } => smol::block_on(&mut future),
493 Self::Deterministic { executor, .. } => {
494 executor.block(&mut future, usize::MAX).unwrap()
495 }
496 }
497 }
498
499 pub fn block_with_timeout<F, T>(
500 &self,
501 timeout: Duration,
502 future: F,
503 ) -> Result<T, impl Future<Output = T>>
504 where
505 T: 'static,
506 F: 'static + Unpin + Future<Output = T>,
507 {
508 let mut future = any_local_future(future);
509 if !timeout.is_zero() {
510 let output = match self {
511 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
512 Self::Deterministic { executor, .. } => {
513 let max_ticks = {
514 let mut state = executor.state.lock();
515 let range = state.block_on_ticks.clone();
516 state.rng.gen_range(range)
517 };
518 executor.block(&mut future, max_ticks)
519 }
520 };
521 if let Some(output) = output {
522 return Ok(*output.downcast().unwrap());
523 }
524 }
525 Err(async { *future.await.downcast().unwrap() })
526 }
527
528 pub async fn scoped<'scope, F>(&self, scheduler: F)
529 where
530 F: FnOnce(&mut Scope<'scope>),
531 {
532 let mut scope = Scope {
533 futures: Default::default(),
534 _phantom: PhantomData,
535 };
536 (scheduler)(&mut scope);
537 let spawned = scope
538 .futures
539 .into_iter()
540 .map(|f| self.spawn(f))
541 .collect::<Vec<_>>();
542 for task in spawned {
543 task.await;
544 }
545 }
546
547 pub async fn simulate_random_delay(&self) {
548 match self {
549 Self::Deterministic { executor, .. } => {
550 if executor.state.lock().rng.gen_bool(0.2) {
551 let yields = executor.state.lock().rng.gen_range(1..=10);
552 for _ in 0..yields {
553 yield_now().await;
554 }
555 }
556 }
557 _ => panic!("this method can only be called on a deterministic executor"),
558 }
559 }
560}
561
562pub struct Scope<'a> {
563 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
564 _phantom: PhantomData<&'a ()>,
565}
566
567impl<'a> Scope<'a> {
568 pub fn spawn<F>(&mut self, f: F)
569 where
570 F: Future<Output = ()> + Send + 'a,
571 {
572 let f = unsafe {
573 mem::transmute::<
574 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
575 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
576 >(Box::pin(f))
577 };
578 self.futures.push(f);
579 }
580}
581
582impl<T> Task<T> {
583 pub fn ready(value: T) -> Self {
584 Self::Ready(Some(value))
585 }
586
587 fn local(any_task: AnyLocalTask) -> Self {
588 Self::Local {
589 any_task,
590 result_type: PhantomData,
591 }
592 }
593
594 pub fn detach(self) {
595 match self {
596 Task::Ready(_) => {}
597 Task::Local { any_task, .. } => any_task.detach(),
598 Task::Send { any_task, .. } => any_task.detach(),
599 }
600 }
601}
602
603impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
604 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
605 cx.spawn(|_| async move {
606 if let Err(err) = self.await {
607 log::error!("{}", err);
608 }
609 })
610 .detach();
611 }
612}
613
614impl<T: Send> Task<T> {
615 fn send(any_task: AnyTask) -> Self {
616 Self::Send {
617 any_task,
618 result_type: PhantomData,
619 }
620 }
621}
622
623impl<T: fmt::Debug> fmt::Debug for Task<T> {
624 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625 match self {
626 Task::Ready(value) => value.fmt(f),
627 Task::Local { any_task, .. } => any_task.fmt(f),
628 Task::Send { any_task, .. } => any_task.fmt(f),
629 }
630 }
631}
632
633impl<T: 'static> Future for Task<T> {
634 type Output = T;
635
636 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637 match unsafe { self.get_unchecked_mut() } {
638 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
639 Task::Local { any_task, .. } => {
640 any_task.poll(cx).map(|value| *value.downcast().unwrap())
641 }
642 Task::Send { any_task, .. } => {
643 any_task.poll(cx).map(|value| *value.downcast().unwrap())
644 }
645 }
646 }
647}
648
649fn any_future<T, F>(future: F) -> AnyFuture
650where
651 T: 'static + Send,
652 F: Future<Output = T> + Send + 'static,
653{
654 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
655}
656
657fn any_local_future<T, F>(future: F) -> AnyLocalFuture
658where
659 T: 'static,
660 F: Future<Output = T> + 'static,
661{
662 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
663}