1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use collections::HashMap;
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, future::yield_now, prelude::*, Executor, Timer};
9use std::{
10 any::Any,
11 fmt::{self, Debug, Display},
12 marker::PhantomData,
13 mem,
14 ops::RangeInclusive,
15 pin::Pin,
16 rc::Rc,
17 sync::{
18 atomic::{AtomicBool, Ordering::SeqCst},
19 Arc,
20 },
21 task::{Context, Poll},
22 thread,
23 time::{Duration, Instant},
24};
25use waker_fn::waker_fn;
26
27use crate::{
28 platform::{self, Dispatcher},
29 util, MutableAppContext,
30};
31
/// Executor handle for futures that are not required to be `Send`.
///
/// Either backed by the platform's dispatcher or by a shared [`Deterministic`]
/// executor.
pub enum Foreground {
    /// Schedules runnables through `Dispatcher::run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this zero-sized marker keeps the
        // whole enum from being automatically `Send`/`Sync`.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Schedules runnables on a [`Deterministic`] executor.
    Deterministic {
        /// Key under which this handle's runnables are queued inside the executor.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
42
/// Executor handle for `Send` futures.
pub enum Background {
    /// Runs tasks on a shared [`Deterministic`] executor.
    Deterministic {
        executor: Arc<Deterministic>,
    },
    /// Runs tasks on a `smol` executor driven by the threads started in
    /// `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sending half of the channel whose receivers the worker threads wait on.
        // NOTE(review): presumably dropping it is what lets the workers exit — confirm.
        _stop: channel::Sender<()>,
    },
}
52
/// Boxed, pinned, non-`Send` future whose output has been type-erased to `Box<dyn Any>`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, pinned, `Send` future whose output has been type-erased to `Box<dyn Any + Send>`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle produced by spawning an [`AnyFuture`].
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle produced by spawning an [`AnyLocalFuture`].
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
57
/// Typed handle to a value that is either already available or will be produced by
/// a spawned, type-erased task.
///
/// Awaiting the handle yields the `T`; the erased output is downcast back to `T`
/// when the task completes.
#[must_use]
pub enum Task<T> {
    /// An already-computed value. The `Option` is emptied when the value is handed out.
    Ready(Option<T>),
    /// A task spawned from a non-`Send` future.
    Local {
        any_task: AnyLocalTask,
        // Records the output type that `any_task`'s erased result is downcast to.
        result_type: PhantomData<T>,
    },
    /// A task spawned from a `Send` future.
    Send {
        any_task: AnyTask,
        // Records the output type that `any_task`'s erased result is downcast to.
        result_type: PhantomData<T>,
    },
}
70
// Manual `Send` impl: the `Local` variant holds a task whose erased output is
// `Box<dyn Any>` (no `Send` bound), so the compiler cannot derive `Send` itself.
// NOTE(review): soundness presumably rests on the real output being a `T: Send`
// and on the task runtime tolerating the handle moving across threads — confirm.
unsafe impl<T: Send> Send for Task<T> {}
72
/// Mutable state of the [`Deterministic`] executor, guarded by a mutex.
struct DeterministicState {
    /// Random source used for every scheduling decision.
    rng: StdRng,
    /// Seed `rng` was created from; used to re-seed it in `Foreground::forbid_parking`.
    seed: u64,
    /// Pending foreground runnables, keyed by the `cx_id` they were spawned with.
    scheduled_from_foreground: HashMap<usize, Vec<ForegroundRunnable>>,
    /// Pending background runnables.
    scheduled_from_background: Vec<Runnable>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `block_on` draws its maximum number of ticks.
    block_on_ticks: RangeInclusive<usize>,
    /// Current time of the simulated clock; moved forward by `Foreground::advance_clock`.
    now: Instant,
    /// Timers that have not fired yet: wake-up time plus the barrier sender whose
    /// drop releases the waiting `Foreground::timer` call.
    pending_timers: Vec<(Instant, barrier::Sender)>,
    /// Backtrace captured by `Foreground::start_waiting`, included in the
    /// `will_park` panic message.
    waiting_backtrace: Option<Backtrace>,
}
84
/// A queued foreground runnable together with a flag identifying the main task.
struct ForegroundRunnable {
    runnable: Runnable,
    /// `true` when the runnable belongs to the future passed to `Deterministic::run`.
    main: bool,
}
89
/// Executor that picks the next runnable using a seeded random number generator.
pub struct Deterministic {
    /// Shared with the schedule callbacks of spawned tasks.
    state: Arc<Mutex<DeterministicState>>,
    /// Parker used to block the running thread until a task is scheduled or woken.
    parker: Mutex<parking::Parker>,
}
94
impl Deterministic {
    /// Creates an executor whose scheduling decisions are driven by `seed`.
    ///
    /// Parking is initially allowed, `block_on` may run up to 1000 ticks, and the
    /// simulated clock starts at the current instant.
    pub fn new(seed: u64) -> Arc<Self> {
        Arc::new(Self {
            state: Arc::new(Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: Instant::now(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
            })),
            parker: Default::default(),
        })
    }

    /// Returns a [`Background`] handle backed by this executor.
    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
        Arc::new(Background::Deterministic {
            executor: self.clone(),
        })
    }

    /// Returns a [`Foreground`] handle backed by this executor whose runnables are
    /// queued under `id`.
    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
        Rc::new(Foreground::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    /// Spawns a non-`Send` future whose runnables are queued under `cx_id`.
    ///
    /// `main` is stored with every queued runnable so `run_internal` can tell when
    /// the main task has just been run.
    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // Schedule callback: append to the per-`cx_id` queue, then unpark the
        // executor so it notices the new work.
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { runnable, main });
            unparker.unpark();
        });
        // Queue the task for its first run.
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future onto the background queue.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // Schedule callback: append to the background queue, then unpark the executor.
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            state.scheduled_from_background.push(runnable);
            unparker.unpark();
        });
        // Queue the task for its first run.
        runnable.schedule();
        task
    }

    /// Runs `main_future` to completion and returns its type-erased output.
    ///
    /// Whenever both queues drain before the main task finishes, the thread parks
    /// until something unparks it. If the waker handed to the main task has not
    /// fired by then, `will_park` is consulted first, which panics when parking
    /// has been forbidden.
    fn run(&self, cx_id: usize, main_future: AnyLocalFuture) -> Box<dyn Any> {
        // Set by the waker created in `run_internal`.
        let woken = Arc::new(AtomicBool::new(false));
        let mut main_task = self.spawn_from_foreground(cx_id, main_future, true);

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs queued runnables until both queues are empty, without parking.
    fn run_until_parked(&self) {
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    /// Runs queued runnables, one random pick at a time, until both queues are
    /// empty or the main task completes.
    ///
    /// Returns `Some(output)` when `main_task` was supplied and finished, `None`
    /// once there is nothing left to run.
    fn run_internal(
        &self,
        woken: Arc<AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        let unparker = self.parker.lock().unparker();
        // Waker passed when polling the main task handle: records the wake-up and
        // unparks the executor.
        let waker = waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                return None;
            }

            // If background work exists, a random boolean decides whether to run it.
            // The draw only happens when the background queue is non-empty.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let runnable = state.scheduled_from_background.remove(ix);
                // Release the lock first: running may schedule more work, and the
                // schedule callbacks lock the state.
                drop(state);
                runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then run that context's oldest runnable.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so the map's key set only lists contexts
                // that still have work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }

                drop(state);

                foreground_runnable.runnable.run();
                // After running the main task's runnable, check whether it finished.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

    /// Polls `future` on the current thread for a randomly chosen number of ticks,
    /// interleaving it with queued background runnables.
    ///
    /// Returns `Some(output)` if the future completed within the tick budget and
    /// `None` otherwise. Foreground runnables are never run here.
    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        // Tick budget drawn from the configured `block_on_ticks` range.
        let max_ticks = {
            let mut state = self.state.lock();
            let range = state.block_on_ticks.clone();
            state.rng.gen_range(range)
        };

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: drawing `runnable_count` itself selects the
            // blocked future rather than a background runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let runnable = state.scheduled_from_background.remove(ix);
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return Some(result);
                }
                // The future is pending; park only if no background work could
                // make progress. `will_park` panics when parking is forbidden.
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
}
278
279impl DeterministicState {
280 fn will_park(&mut self) {
281 if self.forbid_parking {
282 let mut backtrace_message = String::new();
283 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
284 backtrace.resolve();
285 backtrace_message = format!(
286 "\nbacktrace of waiting future:\n{:?}",
287 CwdBacktrace::new(backtrace)
288 );
289 }
290
291 panic!(
292 "deterministic executor parked after a call to forbid_parking{}",
293 backtrace_message
294 );
295 }
296 }
297}
298
/// Wrapper whose `Debug` output lists only the frames of `backtrace` that have a
/// symbol located under the current working directory.
struct CwdBacktrace<'a> {
    backtrace: &'a Backtrace,
}
302
303impl<'a> CwdBacktrace<'a> {
304 fn new(backtrace: &'a Backtrace) -> Self {
305 Self { backtrace }
306 }
307}
308
309impl<'a> Debug for CwdBacktrace<'a> {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
311 let cwd = std::env::current_dir().unwrap();
312 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
313 fmt::Display::fmt(&path, fmt)
314 };
315 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
316 for frame in self.backtrace.frames() {
317 let mut formatted_frame = fmt.frame();
318 if frame
319 .symbols()
320 .iter()
321 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
322 {
323 formatted_frame.backtrace_frame(frame)?;
324 }
325 }
326 fmt.finish()
327 }
328}
329
330impl Foreground {
331 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
332 if dispatcher.is_main_thread() {
333 Ok(Self::Platform {
334 dispatcher,
335 _not_send_or_sync: PhantomData,
336 })
337 } else {
338 Err(anyhow!("must be constructed on main thread"))
339 }
340 }
341
342 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
343 let future = any_local_future(future);
344 let any_task = match self {
345 Self::Deterministic { cx_id, executor } => {
346 executor.spawn_from_foreground(*cx_id, future, false)
347 }
348 Self::Platform { dispatcher, .. } => {
349 fn spawn_inner(
350 future: AnyLocalFuture,
351 dispatcher: &Arc<dyn Dispatcher>,
352 ) -> AnyLocalTask {
353 let dispatcher = dispatcher.clone();
354 let schedule =
355 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
356 let (runnable, task) = async_task::spawn_local(future, schedule);
357 runnable.schedule();
358 task
359 }
360 spawn_inner(future, dispatcher)
361 }
362 };
363 Task::local(any_task)
364 }
365
366 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
367 let future = any_local_future(future);
368 let any_value = match self {
369 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
370 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
371 };
372 *any_value.downcast().unwrap()
373 }
374
375 pub fn parking_forbidden(&self) -> bool {
376 match self {
377 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
378 _ => panic!("this method can only be called on a deterministic executor"),
379 }
380 }
381
382 pub fn start_waiting(&self) {
383 match self {
384 Self::Deterministic { executor, .. } => {
385 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
386 }
387 _ => panic!("this method can only be called on a deterministic executor"),
388 }
389 }
390
391 pub fn finish_waiting(&self) {
392 match self {
393 Self::Deterministic { executor, .. } => {
394 executor.state.lock().waiting_backtrace.take();
395 }
396 _ => panic!("this method can only be called on a deterministic executor"),
397 }
398 }
399
400 pub fn forbid_parking(&self) {
401 match self {
402 Self::Deterministic { executor, .. } => {
403 let mut state = executor.state.lock();
404 state.forbid_parking = true;
405 state.rng = StdRng::seed_from_u64(state.seed);
406 }
407 _ => panic!("this method can only be called on a deterministic executor"),
408 }
409 }
410
411 pub async fn timer(&self, duration: Duration) {
412 match self {
413 Self::Deterministic { executor, .. } => {
414 let (tx, mut rx) = barrier::channel();
415 {
416 let mut state = executor.state.lock();
417 let wakeup_at = state.now + duration;
418 state.pending_timers.push((wakeup_at, tx));
419 }
420 rx.recv().await;
421 }
422 _ => {
423 Timer::after(duration).await;
424 }
425 }
426 }
427
428 pub fn advance_clock(&self, duration: Duration) {
429 match self {
430 Self::Deterministic { executor, .. } => {
431 executor.run_until_parked();
432
433 let mut state = executor.state.lock();
434 state.now += duration;
435 let now = state.now;
436 let mut pending_timers = mem::take(&mut state.pending_timers);
437 drop(state);
438
439 pending_timers.retain(|(wakeup, _)| *wakeup > now);
440 executor.state.lock().pending_timers.extend(pending_timers);
441 }
442 _ => panic!("this method can only be called on a deterministic executor"),
443 }
444 }
445
446 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
447 match self {
448 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
449 _ => panic!("this method can only be called on a deterministic executor"),
450 }
451 }
452}
453
454impl Background {
455 pub fn new() -> Self {
456 let executor = Arc::new(Executor::new());
457 let stop = channel::unbounded::<()>();
458
459 for i in 0..2 * num_cpus::get() {
460 let executor = executor.clone();
461 let stop = stop.1.clone();
462 thread::Builder::new()
463 .name(format!("background-executor-{}", i))
464 .spawn(move || smol::block_on(executor.run(stop.recv())))
465 .unwrap();
466 }
467
468 Self::Production {
469 executor,
470 _stop: stop.0,
471 }
472 }
473
474 pub fn num_cpus(&self) -> usize {
475 num_cpus::get()
476 }
477
478 pub fn spawn<T, F>(&self, future: F) -> Task<T>
479 where
480 T: 'static + Send,
481 F: Send + Future<Output = T> + 'static,
482 {
483 let future = any_future(future);
484 let any_task = match self {
485 Self::Production { executor, .. } => executor.spawn(future),
486 Self::Deterministic { executor } => executor.spawn(future),
487 };
488 Task::send(any_task)
489 }
490
491 pub fn block_with_timeout<F, T>(
492 &self,
493 timeout: Duration,
494 future: F,
495 ) -> Result<T, impl Future<Output = T>>
496 where
497 T: 'static,
498 F: 'static + Unpin + Future<Output = T>,
499 {
500 let mut future = any_local_future(future);
501 if !timeout.is_zero() {
502 let output = match self {
503 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
504 Self::Deterministic { executor, .. } => executor.block_on(&mut future),
505 };
506 if let Some(output) = output {
507 return Ok(*output.downcast().unwrap());
508 }
509 }
510 Err(async { *future.await.downcast().unwrap() })
511 }
512
513 pub async fn scoped<'scope, F>(&self, scheduler: F)
514 where
515 F: FnOnce(&mut Scope<'scope>),
516 {
517 let mut scope = Scope {
518 futures: Default::default(),
519 _phantom: PhantomData,
520 };
521 (scheduler)(&mut scope);
522 let spawned = scope
523 .futures
524 .into_iter()
525 .map(|f| self.spawn(f))
526 .collect::<Vec<_>>();
527 for task in spawned {
528 task.await;
529 }
530 }
531
532 pub async fn simulate_random_delay(&self) {
533 match self {
534 Self::Deterministic { executor, .. } => {
535 if executor.state.lock().rng.gen_range(0..100) < 20 {
536 yield_now().await;
537 }
538 }
539 _ => panic!("this method can only be called on a deterministic executor"),
540 }
541 }
542}
543
/// Collects futures that borrow data for `'a` so `Background::scoped` can spawn
/// and await them together.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to the real lifetime `'a` of the registered futures.
    _phantom: PhantomData<&'a ()>,
}
548
549impl<'a> Scope<'a> {
550 pub fn spawn<F>(&mut self, f: F)
551 where
552 F: Future<Output = ()> + Send + 'a,
553 {
554 let f = unsafe {
555 mem::transmute::<
556 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
557 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
558 >(Box::pin(f))
559 };
560 self.futures.push(f);
561 }
562}
563
564impl<T> Task<T> {
565 pub fn ready(value: T) -> Self {
566 Self::Ready(Some(value))
567 }
568
569 fn local(any_task: AnyLocalTask) -> Self {
570 Self::Local {
571 any_task,
572 result_type: PhantomData,
573 }
574 }
575
576 pub fn detach(self) {
577 match self {
578 Task::Ready(_) => {}
579 Task::Local { any_task, .. } => any_task.detach(),
580 Task::Send { any_task, .. } => any_task.detach(),
581 }
582 }
583}
584
585impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
586 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
587 cx.spawn(|_| async move {
588 if let Err(err) = self.await {
589 log::error!("{}", err);
590 }
591 })
592 .detach();
593 }
594}
595
596impl<T: Send> Task<T> {
597 fn send(any_task: AnyTask) -> Self {
598 Self::Send {
599 any_task,
600 result_type: PhantomData,
601 }
602 }
603}
604
605impl<T: fmt::Debug> fmt::Debug for Task<T> {
606 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
607 match self {
608 Task::Ready(value) => value.fmt(f),
609 Task::Local { any_task, .. } => any_task.fmt(f),
610 Task::Send { any_task, .. } => any_task.fmt(f),
611 }
612 }
613}
614
615impl<T: 'static> Future for Task<T> {
616 type Output = T;
617
618 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
619 match unsafe { self.get_unchecked_mut() } {
620 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
621 Task::Local { any_task, .. } => {
622 any_task.poll(cx).map(|value| *value.downcast().unwrap())
623 }
624 Task::Send { any_task, .. } => {
625 any_task.poll(cx).map(|value| *value.downcast().unwrap())
626 }
627 }
628 }
629}
630
631fn any_future<T, F>(future: F) -> AnyFuture
632where
633 T: 'static + Send,
634 F: Future<Output = T> + Send + 'static,
635{
636 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
637}
638
639fn any_local_future<T, F>(future: F) -> AnyLocalFuture
640where
641 T: 'static,
642 F: Future<Output = T> + 'static,
643{
644 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
645}