1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use collections::HashMap;
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, future::yield_now, prelude::*, Executor, Timer};
9use std::{
10 any::Any,
11 fmt::{self, Debug, Display},
12 marker::PhantomData,
13 mem,
14 ops::RangeInclusive,
15 pin::Pin,
16 rc::Rc,
17 sync::{
18 atomic::{AtomicBool, Ordering::SeqCst},
19 Arc,
20 },
21 task::{Context, Poll},
22 thread,
23 time::{Duration, Instant},
24};
25use waker_fn::waker_fn;
26
27use crate::{
28 platform::{self, Dispatcher},
29 util, MutableAppContext,
30};
31
/// Executor handle used to spawn and drive non-`Send` (local) futures.
pub enum Foreground {
    /// Schedules runnables through a platform dispatcher's main thread.
    Platform {
        /// Dispatcher that runnables are handed to via `run_on_main_thread`.
        dispatcher: Arc<dyn platform::Dispatcher>,
        /// `Rc` is neither `Send` nor `Sync`, so this marker keeps the variant
        /// (and therefore the enum) from being `Send` or `Sync`.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Schedules runnables on a shared, seeded `Deterministic` executor.
    Deterministic {
        /// Key under which this foreground's runnables are queued in the
        /// executor's `scheduled_from_foreground` map.
        cx_id: usize,
        /// The shared deterministic executor.
        executor: Arc<Deterministic>,
    },
}
42
/// Executor handle used to spawn and block on `Send` futures.
pub enum Background {
    /// Schedules runnables on a shared, seeded `Deterministic` executor.
    Deterministic {
        /// The shared deterministic executor.
        executor: Arc<Deterministic>,
    },
    /// Runs futures on a `smol` executor serviced by dedicated threads
    /// (see `Background::new`).
    Production {
        /// The executor that spawned futures are submitted to.
        executor: Arc<smol::Executor<'static>>,
        /// Sending half of the channel whose receiver each worker thread waits
        /// on; it is only stored, never sent on, in this file.
        // NOTE(review): presumably dropping it ends the workers' `recv()` and
        // lets the threads exit -- confirm against smol's channel semantics.
        _stop: channel::Sender<()>,
    },
}
52
// Type-erased building blocks: futures and task handles whose output is boxed
// as `dyn Any`, so the executors do not need to be generic over result types.

/// Boxed, pinned, non-`Send` future yielding a type-erased value.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, pinned, `Send` future yielding a type-erased `Send` value.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle whose output is a type-erased `Send` value.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle whose output is a type-erased, possibly non-`Send` value.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
57
/// A future resolving to a `T`, backed either by an already-available value or
/// by a type-erased task handle.
///
/// Marked `#[must_use]`: a task should be awaited or explicitly `detach`ed.
#[must_use]
pub enum Task<T> {
    /// A value that is available immediately; it is taken out of the `Option`
    /// the first time the task is polled.
    Ready(Option<T>),
    /// A task spawned from a foreground executor.
    Local {
        /// Type-erased handle; its output is downcast to `T` when polled.
        any_task: AnyLocalTask,
        /// Records the result type without storing a `T`.
        result_type: PhantomData<T>,
    },
    /// A task spawned from a background executor.
    Send {
        /// Type-erased handle; its output is downcast to `T` when polled.
        any_task: AnyTask,
        /// Records the result type without storing a `T`.
        result_type: PhantomData<T>,
    },
}
70
// SAFETY (NOTE(review)): this asserts that a `Task<T>` may be moved across
// threads whenever `T: Send`, including the `Local` variant whose handle type
// (`AnyLocalTask`) is not `Send` on its own because its erased output is
// `Box<dyn Any>`. Presumably this relies on the erased value always being a
// `T` -- confirm against the guarantees of `async_task::spawn_local`.
unsafe impl<T: Send> Send for Task<T> {}
72
/// Mutable state of the deterministic executor, guarded by a mutex in
/// `Deterministic`.
struct DeterministicState {
    /// Random source for every scheduling decision.
    rng: StdRng,
    /// Seed the RNG was created from; used to re-seed it in
    /// `Foreground::forbid_parking`.
    seed: u64,
    /// Runnables queued by foreground tasks, keyed by the spawning `cx_id`.
    scheduled_from_foreground: HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables queued by background tasks.
    scheduled_from_background: Vec<Runnable>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: RangeInclusive<usize>,
    /// Simulated current time, moved forward by `Foreground::advance_clock`.
    now: Instant,
    /// Outstanding timers as `(wakeup time, barrier sender)` pairs.
    pending_timers: Vec<(Instant, barrier::Sender)>,
    /// Backtrace captured by `Foreground::start_waiting`; included in the
    /// panic message produced by `will_park`.
    waiting_backtrace: Option<Backtrace>,
}
84
/// A queued foreground runnable plus a flag saying whether it belongs to the
/// main future passed to `Deterministic::run`.
struct ForegroundRunnable {
    /// The runnable to execute.
    runnable: Runnable,
    /// `true` only for runnables of the main task; after running one of
    /// these, `run_internal` polls the main task handle for completion.
    main: bool,
}
89
/// Executor that picks the next runnable using a seeded RNG.
pub struct Deterministic {
    /// Shared scheduling state; a clone of this `Arc` is captured by every
    /// schedule callback created in `spawn` / `spawn_from_foreground`.
    state: Arc<Mutex<DeterministicState>>,
    /// Parker used by `run` and `block` to wait until something is scheduled
    /// or a waker fires.
    parker: Mutex<parking::Parker>,
}
94
95impl Deterministic {
96 pub fn new(seed: u64) -> Arc<Self> {
97 Arc::new(Self {
98 state: Arc::new(Mutex::new(DeterministicState {
99 rng: StdRng::seed_from_u64(seed),
100 seed,
101 scheduled_from_foreground: Default::default(),
102 scheduled_from_background: Default::default(),
103 forbid_parking: false,
104 block_on_ticks: 0..=1000,
105 now: Instant::now(),
106 pending_timers: Default::default(),
107 waiting_backtrace: None,
108 })),
109 parker: Default::default(),
110 })
111 }
112
113 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
114 Arc::new(Background::Deterministic {
115 executor: self.clone(),
116 })
117 }
118
119 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
120 Rc::new(Foreground::Deterministic {
121 cx_id: id,
122 executor: self.clone(),
123 })
124 }
125
126 fn spawn_from_foreground(
127 &self,
128 cx_id: usize,
129 future: AnyLocalFuture,
130 main: bool,
131 ) -> AnyLocalTask {
132 let state = self.state.clone();
133 let unparker = self.parker.lock().unparker();
134 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
135 let mut state = state.lock();
136 state
137 .scheduled_from_foreground
138 .entry(cx_id)
139 .or_default()
140 .push(ForegroundRunnable { runnable, main });
141 unparker.unpark();
142 });
143 runnable.schedule();
144 task
145 }
146
147 fn spawn(&self, future: AnyFuture) -> AnyTask {
148 let state = self.state.clone();
149 let unparker = self.parker.lock().unparker();
150 let (runnable, task) = async_task::spawn(future, move |runnable| {
151 let mut state = state.lock();
152 state.scheduled_from_background.push(runnable);
153 unparker.unpark();
154 });
155 runnable.schedule();
156 task
157 }
158
159 fn run(&self, cx_id: usize, main_future: AnyLocalFuture) -> Box<dyn Any> {
160 let woken = Arc::new(AtomicBool::new(false));
161 let mut main_task = self.spawn_from_foreground(cx_id, main_future, true);
162
163 loop {
164 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
165 return result;
166 }
167
168 if !woken.load(SeqCst) {
169 self.state.lock().will_park();
170 }
171
172 woken.store(false, SeqCst);
173 self.parker.lock().park();
174 }
175 }
176
177 fn run_until_parked(&self) {
178 let woken = Arc::new(AtomicBool::new(false));
179 self.run_internal(woken, None);
180 }
181
182 fn run_internal(
183 &self,
184 woken: Arc<AtomicBool>,
185 mut main_task: Option<&mut AnyLocalTask>,
186 ) -> Option<Box<dyn Any>> {
187 let unparker = self.parker.lock().unparker();
188 let waker = waker_fn(move || {
189 woken.store(true, SeqCst);
190 unparker.unpark();
191 });
192
193 let mut cx = Context::from_waker(&waker);
194 loop {
195 let mut state = self.state.lock();
196
197 if state.scheduled_from_foreground.is_empty()
198 && state.scheduled_from_background.is_empty()
199 {
200 return None;
201 }
202
203 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
204 let background_len = state.scheduled_from_background.len();
205 let ix = state.rng.gen_range(0..background_len);
206 let runnable = state.scheduled_from_background.remove(ix);
207 drop(state);
208 runnable.run();
209 } else if !state.scheduled_from_foreground.is_empty() {
210 let available_cx_ids = state
211 .scheduled_from_foreground
212 .keys()
213 .copied()
214 .collect::<Vec<_>>();
215 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
216 let scheduled_from_cx = state
217 .scheduled_from_foreground
218 .get_mut(&cx_id_to_run)
219 .unwrap();
220 let foreground_runnable = scheduled_from_cx.remove(0);
221 if scheduled_from_cx.is_empty() {
222 state.scheduled_from_foreground.remove(&cx_id_to_run);
223 }
224
225 drop(state);
226
227 foreground_runnable.runnable.run();
228 if let Some(main_task) = main_task.as_mut() {
229 if foreground_runnable.main {
230 if let Poll::Ready(result) = main_task.poll(&mut cx) {
231 return Some(result);
232 }
233 }
234 }
235 }
236 }
237 }
238
239 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
240 where
241 F: Unpin + Future<Output = T>,
242 {
243 let unparker = self.parker.lock().unparker();
244 let waker = waker_fn(move || {
245 unparker.unpark();
246 });
247
248 let mut cx = Context::from_waker(&waker);
249 for _ in 0..max_ticks {
250 let mut state = self.state.lock();
251 let runnable_count = state.scheduled_from_background.len();
252 let ix = state.rng.gen_range(0..=runnable_count);
253 if ix < state.scheduled_from_background.len() {
254 let runnable = state.scheduled_from_background.remove(ix);
255 drop(state);
256 runnable.run();
257 } else {
258 drop(state);
259 if let Poll::Ready(result) = future.poll(&mut cx) {
260 return Some(result);
261 }
262 let mut state = self.state.lock();
263 if state.scheduled_from_background.is_empty() {
264 state.will_park();
265 drop(state);
266 self.parker.lock().park();
267 }
268
269 continue;
270 }
271 }
272
273 None
274 }
275}
276
277impl DeterministicState {
278 fn will_park(&mut self) {
279 if self.forbid_parking {
280 let mut backtrace_message = String::new();
281 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
282 backtrace.resolve();
283 backtrace_message = format!(
284 "\nbacktrace of waiting future:\n{:?}",
285 CwdBacktrace::new(backtrace)
286 );
287 }
288
289 panic!(
290 "deterministic executor parked after a call to forbid_parking{}",
291 backtrace_message
292 );
293 }
294 }
295}
296
/// Borrowed backtrace whose `Debug` output only includes frames that have a
/// symbol with a source file under the current working directory.
struct CwdBacktrace<'a> {
    /// The backtrace to print.
    backtrace: &'a Backtrace,
}
300
301impl<'a> CwdBacktrace<'a> {
302 fn new(backtrace: &'a Backtrace) -> Self {
303 Self { backtrace }
304 }
305}
306
307impl<'a> Debug for CwdBacktrace<'a> {
308 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
309 let cwd = std::env::current_dir().unwrap();
310 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
311 fmt::Display::fmt(&path, fmt)
312 };
313 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
314 for frame in self.backtrace.frames() {
315 let mut formatted_frame = fmt.frame();
316 if frame
317 .symbols()
318 .iter()
319 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
320 {
321 formatted_frame.backtrace_frame(frame)?;
322 }
323 }
324 fmt.finish()
325 }
326}
327
328impl Foreground {
329 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
330 if dispatcher.is_main_thread() {
331 Ok(Self::Platform {
332 dispatcher,
333 _not_send_or_sync: PhantomData,
334 })
335 } else {
336 Err(anyhow!("must be constructed on main thread"))
337 }
338 }
339
340 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
341 let future = any_local_future(future);
342 let any_task = match self {
343 Self::Deterministic { cx_id, executor } => {
344 executor.spawn_from_foreground(*cx_id, future, false)
345 }
346 Self::Platform { dispatcher, .. } => {
347 fn spawn_inner(
348 future: AnyLocalFuture,
349 dispatcher: &Arc<dyn Dispatcher>,
350 ) -> AnyLocalTask {
351 let dispatcher = dispatcher.clone();
352 let schedule =
353 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
354 let (runnable, task) = async_task::spawn_local(future, schedule);
355 runnable.schedule();
356 task
357 }
358 spawn_inner(future, dispatcher)
359 }
360 };
361 Task::local(any_task)
362 }
363
364 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
365 let future = any_local_future(future);
366 let any_value = match self {
367 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
368 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
369 };
370 *any_value.downcast().unwrap()
371 }
372
373 pub fn run_until_parked(&self) {
374 match self {
375 Self::Deterministic { executor, .. } => executor.run_until_parked(),
376 _ => panic!("this method can only be called on a deterministic executor"),
377 }
378 }
379
380 pub fn parking_forbidden(&self) -> bool {
381 match self {
382 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
383 _ => panic!("this method can only be called on a deterministic executor"),
384 }
385 }
386
387 pub fn start_waiting(&self) {
388 match self {
389 Self::Deterministic { executor, .. } => {
390 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
391 }
392 _ => panic!("this method can only be called on a deterministic executor"),
393 }
394 }
395
396 pub fn finish_waiting(&self) {
397 match self {
398 Self::Deterministic { executor, .. } => {
399 executor.state.lock().waiting_backtrace.take();
400 }
401 _ => panic!("this method can only be called on a deterministic executor"),
402 }
403 }
404
405 pub fn forbid_parking(&self) {
406 match self {
407 Self::Deterministic { executor, .. } => {
408 let mut state = executor.state.lock();
409 state.forbid_parking = true;
410 state.rng = StdRng::seed_from_u64(state.seed);
411 }
412 _ => panic!("this method can only be called on a deterministic executor"),
413 }
414 }
415
416 pub async fn timer(&self, duration: Duration) {
417 match self {
418 Self::Deterministic { executor, .. } => {
419 let (tx, mut rx) = barrier::channel();
420 {
421 let mut state = executor.state.lock();
422 let wakeup_at = state.now + duration;
423 state.pending_timers.push((wakeup_at, tx));
424 }
425 rx.recv().await;
426 }
427 _ => {
428 Timer::after(duration).await;
429 }
430 }
431 }
432
433 pub fn advance_clock(&self, duration: Duration) {
434 match self {
435 Self::Deterministic { executor, .. } => {
436 executor.run_until_parked();
437
438 let mut state = executor.state.lock();
439 state.now += duration;
440 let now = state.now;
441 let mut pending_timers = mem::take(&mut state.pending_timers);
442 drop(state);
443
444 pending_timers.retain(|(wakeup, _)| *wakeup > now);
445 executor.state.lock().pending_timers.extend(pending_timers);
446 }
447 _ => panic!("this method can only be called on a deterministic executor"),
448 }
449 }
450
451 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
452 match self {
453 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
454 _ => panic!("this method can only be called on a deterministic executor"),
455 }
456 }
457}
458
459impl Background {
460 pub fn new() -> Self {
461 let executor = Arc::new(Executor::new());
462 let stop = channel::unbounded::<()>();
463
464 for i in 0..2 * num_cpus::get() {
465 let executor = executor.clone();
466 let stop = stop.1.clone();
467 thread::Builder::new()
468 .name(format!("background-executor-{}", i))
469 .spawn(move || smol::block_on(executor.run(stop.recv())))
470 .unwrap();
471 }
472
473 Self::Production {
474 executor,
475 _stop: stop.0,
476 }
477 }
478
479 pub fn num_cpus(&self) -> usize {
480 num_cpus::get()
481 }
482
483 pub fn spawn<T, F>(&self, future: F) -> Task<T>
484 where
485 T: 'static + Send,
486 F: Send + Future<Output = T> + 'static,
487 {
488 let future = any_future(future);
489 let any_task = match self {
490 Self::Production { executor, .. } => executor.spawn(future),
491 Self::Deterministic { executor } => executor.spawn(future),
492 };
493 Task::send(any_task)
494 }
495
496 pub fn block<F, T>(&self, future: F) -> T
497 where
498 F: Future<Output = T>,
499 {
500 smol::pin!(future);
501 match self {
502 Self::Production { .. } => smol::block_on(&mut future),
503 Self::Deterministic { executor, .. } => {
504 executor.block(&mut future, usize::MAX).unwrap()
505 }
506 }
507 }
508
509 pub fn block_with_timeout<F, T>(
510 &self,
511 timeout: Duration,
512 future: F,
513 ) -> Result<T, impl Future<Output = T>>
514 where
515 T: 'static,
516 F: 'static + Unpin + Future<Output = T>,
517 {
518 let mut future = any_local_future(future);
519 if !timeout.is_zero() {
520 let output = match self {
521 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
522 Self::Deterministic { executor, .. } => {
523 let max_ticks = {
524 let mut state = executor.state.lock();
525 let range = state.block_on_ticks.clone();
526 state.rng.gen_range(range)
527 };
528 executor.block(&mut future, max_ticks)
529 }
530 };
531 if let Some(output) = output {
532 return Ok(*output.downcast().unwrap());
533 }
534 }
535 Err(async { *future.await.downcast().unwrap() })
536 }
537
538 pub async fn scoped<'scope, F>(&self, scheduler: F)
539 where
540 F: FnOnce(&mut Scope<'scope>),
541 {
542 let mut scope = Scope {
543 futures: Default::default(),
544 _phantom: PhantomData,
545 };
546 (scheduler)(&mut scope);
547 let spawned = scope
548 .futures
549 .into_iter()
550 .map(|f| self.spawn(f))
551 .collect::<Vec<_>>();
552 for task in spawned {
553 task.await;
554 }
555 }
556
557 pub async fn simulate_random_delay(&self) {
558 match self {
559 Self::Deterministic { executor, .. } => {
560 if executor.state.lock().rng.gen_bool(0.2) {
561 let yields = executor.state.lock().rng.gen_range(1..=10);
562 for _ in 0..yields {
563 yield_now().await;
564 }
565 }
566 }
567 _ => panic!("this method can only be called on a deterministic executor"),
568 }
569 }
570}
571
/// Collects futures registered inside `Background::scoped`.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`
    /// (see `Scope::spawn`).
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Carries the scope lifetime `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}
576
577impl<'a> Scope<'a> {
578 pub fn spawn<F>(&mut self, f: F)
579 where
580 F: Future<Output = ()> + Send + 'a,
581 {
582 let f = unsafe {
583 mem::transmute::<
584 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
585 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
586 >(Box::pin(f))
587 };
588 self.futures.push(f);
589 }
590}
591
592impl<T> Task<T> {
593 pub fn ready(value: T) -> Self {
594 Self::Ready(Some(value))
595 }
596
597 fn local(any_task: AnyLocalTask) -> Self {
598 Self::Local {
599 any_task,
600 result_type: PhantomData,
601 }
602 }
603
604 pub fn detach(self) {
605 match self {
606 Task::Ready(_) => {}
607 Task::Local { any_task, .. } => any_task.detach(),
608 Task::Send { any_task, .. } => any_task.detach(),
609 }
610 }
611}
612
613impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
614 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
615 cx.spawn(|_| async move {
616 if let Err(err) = self.await {
617 log::error!("{}", err);
618 }
619 })
620 .detach();
621 }
622}
623
624impl<T: Send> Task<T> {
625 fn send(any_task: AnyTask) -> Self {
626 Self::Send {
627 any_task,
628 result_type: PhantomData,
629 }
630 }
631}
632
633impl<T: fmt::Debug> fmt::Debug for Task<T> {
634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635 match self {
636 Task::Ready(value) => value.fmt(f),
637 Task::Local { any_task, .. } => any_task.fmt(f),
638 Task::Send { any_task, .. } => any_task.fmt(f),
639 }
640 }
641}
642
643impl<T: 'static> Future for Task<T> {
644 type Output = T;
645
646 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
647 match unsafe { self.get_unchecked_mut() } {
648 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
649 Task::Local { any_task, .. } => {
650 any_task.poll(cx).map(|value| *value.downcast().unwrap())
651 }
652 Task::Send { any_task, .. } => {
653 any_task.poll(cx).map(|value| *value.downcast().unwrap())
654 }
655 }
656 }
657}
658
659fn any_future<T, F>(future: F) -> AnyFuture
660where
661 T: 'static + Send,
662 F: Future<Output = T> + Send + 'static,
663{
664 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
665}
666
667fn any_local_future<T, F>(future: F) -> AnyLocalFuture
668where
669 T: 'static,
670 F: Future<Output = T> + 'static,
671{
672 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
673}