1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use parking_lot::Mutex;
5use postage::{barrier, prelude::Stream as _};
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor, Timer};
8use std::{
9 any::Any,
10 fmt::{self, Debug, Display},
11 marker::PhantomData,
12 mem,
13 ops::RangeInclusive,
14 pin::Pin,
15 rc::Rc,
16 sync::{
17 atomic::{AtomicBool, Ordering::SeqCst},
18 Arc,
19 },
20 task::{Context, Poll},
21 thread,
22 time::{Duration, Instant},
23};
24use waker_fn::waker_fn;
25
26use crate::{
27 platform::{self, Dispatcher},
28 util, MutableAppContext,
29};
30
/// Executor handle for futures that do not need to be `Send`.
///
/// Three backends exist: a platform-dispatcher backend, a `smol` local executor for
/// tests, and the seeded [`Deterministic`] executor.
pub enum Foreground {
    /// Schedules runnables through the platform dispatcher's `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this marker makes the enum
        // `!Send` and `!Sync` as well.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by the seeded deterministic executor, shared with a [`Background`] handle
    /// when created through [`deterministic`].
    Deterministic(Arc<Deterministic>),
}
39
/// Executor handle for `Send` futures.
pub enum Background {
    /// Backed by the seeded deterministic executor.
    Deterministic {
        executor: Arc<Deterministic>,
    },
    /// Backed by a `smol::Executor` that is driven by dedicated worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Each worker thread runs the executor until `recv()` on the matching receiver
        // resolves. NOTE(review): presumably dropping this sender closes the channel
        // and thereby stops the workers — confirm against the `smol::channel` docs.
        _stop: channel::Sender<()>,
    },
}
49
/// Boxed, pinned, non-`Send` future whose output has been type-erased to `Box<dyn Any>`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed, pinned, `Send` future whose output has been type-erased to `Box<dyn Any + Send>`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle for a type-erased `Send` future.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle for a type-erased non-`Send` future.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
54
/// Handle to a value of type `T` that is either already available or still being
/// computed by a spawned future.
///
/// The underlying tasks are type-erased; `result_type` only records which `T` the
/// erased output is downcast to when the task is polled.
#[must_use]
pub enum Task<T> {
    /// An already computed value. It is taken out of the `Option` on the first poll.
    Ready(Option<T>),
    /// A task spawned through a [`Foreground`] executor.
    Local {
        any_task: AnyLocalTask,
        result_type: PhantomData<T>,
    },
    /// A task spawned through a [`Background`] executor.
    Send {
        any_task: AnyTask,
        result_type: PhantomData<T>,
    },
}
67
// NOTE(review): this asserts that any `Task<T>` with `T: Send` may be moved to another
// thread. That includes the `Local` variant, whose inner task is created with
// `async_task::spawn_local` and whose erased output (`Box<dyn Any>`) is not `Send`.
// Nothing in this file states the invariant that makes this sound — confirm and
// document it.
unsafe impl<T: Send> Send for Task<T> {}
69
/// Mutable state of the [`Deterministic`] executor, guarded by a mutex.
struct DeterministicState {
    /// Source of every scheduling decision; seeded from `seed`.
    rng: StdRng,
    /// Seed used to build `rng`; `Foreground::forbid_parking` re-seeds `rng` from it.
    seed: u64,
    /// Foreground-spawned runnables that were scheduled again after their first
    /// scheduling, each paired with the backtrace captured when its task was spawned.
    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
    /// Runnables of tasks spawned through `Deterministic::spawn`.
    scheduled_from_background: Vec<(Runnable, Backtrace)>,
    /// Foreground-spawned runnables on their first scheduling. `run_internal` only
    /// ever takes the element at index 0 of this queue.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `block_on` draws its maximum number of ticks.
    block_on_ticks: RangeInclusive<usize>,
    /// Simulated clock; only moved by `Foreground::advance_clock`.
    now: Instant,
    /// Timers created by `Foreground::timer`: wake-up instant plus the barrier sender
    /// whose receiver the timer future awaits.
    pending_timers: Vec<(Instant, barrier::Sender)>,
    /// Backtrace recorded by `Foreground::start_waiting`, cleared by `finish_waiting`,
    /// and included in the panic message raised by `will_park`.
    waiting_backtrace: Option<Backtrace>,
}
82
/// Executor whose interleaving of runnables is chosen by a seeded random number
/// generator.
pub struct Deterministic {
    /// Queues, RNG and simulated clock; shared with the schedule callbacks of spawned tasks.
    state: Arc<Mutex<DeterministicState>>,
    /// Parks the running thread when there is nothing to do; its unparkers are handed
    /// to wakers and schedule callbacks.
    parker: Mutex<parking::Parker>,
}
87
88impl Deterministic {
89 fn new(seed: u64) -> Self {
90 Self {
91 state: Arc::new(Mutex::new(DeterministicState {
92 rng: StdRng::seed_from_u64(seed),
93 seed,
94 scheduled_from_foreground: Default::default(),
95 scheduled_from_background: Default::default(),
96 spawned_from_foreground: Default::default(),
97 forbid_parking: false,
98 block_on_ticks: 0..=1000,
99 now: Instant::now(),
100 pending_timers: Default::default(),
101 waiting_backtrace: None,
102 })),
103 parker: Default::default(),
104 }
105 }
106
107 fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
108 let backtrace = Backtrace::new_unresolved();
109 let scheduled_once = AtomicBool::new(false);
110 let state = self.state.clone();
111 let unparker = self.parker.lock().unparker();
112 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
113 let mut state = state.lock();
114 let backtrace = backtrace.clone();
115 if scheduled_once.fetch_or(true, SeqCst) {
116 state.scheduled_from_foreground.push((runnable, backtrace));
117 } else {
118 state.spawned_from_foreground.push((runnable, backtrace));
119 }
120 unparker.unpark();
121 });
122 runnable.schedule();
123 task
124 }
125
126 fn spawn(&self, future: AnyFuture) -> AnyTask {
127 let backtrace = Backtrace::new_unresolved();
128 let state = self.state.clone();
129 let unparker = self.parker.lock().unparker();
130 let (runnable, task) = async_task::spawn(future, move |runnable| {
131 let mut state = state.lock();
132 state
133 .scheduled_from_background
134 .push((runnable, backtrace.clone()));
135 unparker.unpark();
136 });
137 runnable.schedule();
138 task
139 }
140
141 fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
142 let woken = Arc::new(AtomicBool::new(false));
143 loop {
144 if let Some(result) = self.run_internal(woken.clone(), &mut future) {
145 return result;
146 }
147
148 if !woken.load(SeqCst) {
149 self.state.lock().will_park();
150 }
151
152 woken.store(false, SeqCst);
153 self.parker.lock().park();
154 }
155 }
156
157 fn run_until_parked(&self) {
158 let woken = Arc::new(AtomicBool::new(false));
159 let mut future = any_local_future(std::future::pending::<()>());
160 self.run_internal(woken, &mut future);
161 }
162
163 fn run_internal(
164 &self,
165 woken: Arc<AtomicBool>,
166 future: &mut AnyLocalFuture,
167 ) -> Option<Box<dyn Any>> {
168 let unparker = self.parker.lock().unparker();
169 let waker = waker_fn(move || {
170 woken.store(true, SeqCst);
171 unparker.unpark();
172 });
173
174 let mut cx = Context::from_waker(&waker);
175 let mut trace = Trace::default();
176 loop {
177 let mut state = self.state.lock();
178 let runnable_count = state.scheduled_from_foreground.len()
179 + state.scheduled_from_background.len()
180 + state.spawned_from_foreground.len();
181
182 let ix = state.rng.gen_range(0..=runnable_count);
183 if ix < state.scheduled_from_foreground.len() {
184 let (_, backtrace) = &state.scheduled_from_foreground[ix];
185 trace.record(&state, backtrace.clone());
186 let runnable = state.scheduled_from_foreground.remove(ix).0;
187 drop(state);
188 runnable.run();
189 } else if ix - state.scheduled_from_foreground.len()
190 < state.scheduled_from_background.len()
191 {
192 let ix = ix - state.scheduled_from_foreground.len();
193 let (_, backtrace) = &state.scheduled_from_background[ix];
194 trace.record(&state, backtrace.clone());
195 let runnable = state.scheduled_from_background.remove(ix).0;
196 drop(state);
197 runnable.run();
198 } else if ix < runnable_count {
199 let (_, backtrace) = &state.spawned_from_foreground[0];
200 trace.record(&state, backtrace.clone());
201 let runnable = state.spawned_from_foreground.remove(0).0;
202 drop(state);
203 runnable.run();
204 } else {
205 drop(state);
206 if let Poll::Ready(result) = future.poll(&mut cx) {
207 return Some(result);
208 }
209
210 let state = self.state.lock();
211
212 if state.scheduled_from_foreground.is_empty()
213 && state.scheduled_from_background.is_empty()
214 && state.spawned_from_foreground.is_empty()
215 {
216 return None;
217 }
218 }
219 }
220 }
221
222 fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
223 let unparker = self.parker.lock().unparker();
224 let waker = waker_fn(move || {
225 unparker.unpark();
226 });
227 let max_ticks = {
228 let mut state = self.state.lock();
229 let range = state.block_on_ticks.clone();
230 state.rng.gen_range(range)
231 };
232
233 let mut cx = Context::from_waker(&waker);
234 let mut trace = Trace::default();
235 for _ in 0..max_ticks {
236 let mut state = self.state.lock();
237 let runnable_count = state.scheduled_from_background.len();
238 let ix = state.rng.gen_range(0..=runnable_count);
239 if ix < state.scheduled_from_background.len() {
240 let (_, backtrace) = &state.scheduled_from_background[ix];
241 trace.record(&state, backtrace.clone());
242 let runnable = state.scheduled_from_background.remove(ix).0;
243 drop(state);
244 runnable.run();
245 } else {
246 drop(state);
247 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
248 return Some(result);
249 }
250 let mut state = self.state.lock();
251 if state.scheduled_from_background.is_empty() {
252 state.will_park();
253 drop(state);
254 self.parker.lock().park();
255 }
256
257 continue;
258 }
259 }
260
261 None
262 }
263}
264
265impl DeterministicState {
266 fn will_park(&mut self) {
267 if self.forbid_parking {
268 let mut backtrace_message = String::new();
269 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
270 backtrace.resolve();
271 backtrace_message = format!(
272 "\nbacktrace of waiting future:\n{:?}",
273 CwdBacktrace::new(backtrace)
274 );
275 }
276
277 panic!(
278 "deterministic executor parked after a call to forbid_parking{}",
279 backtrace_message
280 );
281 }
282 }
283}
284
/// Log of the scheduling decisions made during one executor run.
///
/// The three vectors are parallel: entry `i` of each describes the `i`-th recorded step.
#[derive(Default)]
struct Trace {
    /// Spawn backtrace of the runnable that was chosen at each step.
    executed: Vec<Backtrace>,
    /// Snapshot of the `scheduled_from_foreground` backtraces at each step.
    scheduled: Vec<Vec<Backtrace>>,
    /// Snapshot of the `spawned_from_foreground` backtraces at each step.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
291
292impl Trace {
293 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
294 self.scheduled.push(
295 state
296 .scheduled_from_foreground
297 .iter()
298 .map(|(_, backtrace)| backtrace.clone())
299 .collect(),
300 );
301 self.spawned_from_foreground.push(
302 state
303 .spawned_from_foreground
304 .iter()
305 .map(|(_, backtrace)| backtrace.clone())
306 .collect(),
307 );
308 self.executed.push(executed);
309 }
310
311 fn resolve(&mut self) {
312 for backtrace in &mut self.executed {
313 backtrace.resolve();
314 }
315
316 for backtraces in &mut self.scheduled {
317 for backtrace in backtraces {
318 backtrace.resolve();
319 }
320 }
321
322 for backtraces in &mut self.spawned_from_foreground {
323 for backtrace in backtraces {
324 backtrace.resolve();
325 }
326 }
327 }
328}
329
/// `Debug` adapter that prints only the backtrace frames whose source file lies under
/// the current working directory.
struct CwdBacktrace<'a> {
    backtrace: &'a Backtrace,
    /// When set, printing stops after the first matching frame.
    first_frame_only: bool,
}
334
335impl<'a> CwdBacktrace<'a> {
336 fn new(backtrace: &'a Backtrace) -> Self {
337 Self {
338 backtrace,
339 first_frame_only: false,
340 }
341 }
342
343 fn first_frame(backtrace: &'a Backtrace) -> Self {
344 Self {
345 backtrace,
346 first_frame_only: true,
347 }
348 }
349}
350
351impl<'a> Debug for CwdBacktrace<'a> {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
353 let cwd = std::env::current_dir().unwrap();
354 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
355 fmt::Display::fmt(&path, fmt)
356 };
357 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
358 for frame in self.backtrace.frames() {
359 let mut formatted_frame = fmt.frame();
360 if frame
361 .symbols()
362 .iter()
363 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
364 {
365 formatted_frame.backtrace_frame(frame)?;
366 if self.first_frame_only {
367 break;
368 }
369 }
370 }
371 fmt.finish()
372 }
373}
374
375impl Debug for Trace {
376 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377 for ((backtrace, scheduled), spawned_from_foreground) in self
378 .executed
379 .iter()
380 .zip(&self.scheduled)
381 .zip(&self.spawned_from_foreground)
382 {
383 writeln!(f, "Scheduled")?;
384 for backtrace in scheduled {
385 writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
386 }
387 if scheduled.is_empty() {
388 writeln!(f, "None")?;
389 }
390 writeln!(f, "==========")?;
391
392 writeln!(f, "Spawned from foreground")?;
393 for backtrace in spawned_from_foreground {
394 writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
395 }
396 if spawned_from_foreground.is_empty() {
397 writeln!(f, "None")?;
398 }
399 writeln!(f, "==========")?;
400
401 writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?;
402 writeln!(f, "+++++++++++++++++++")?;
403 }
404
405 Ok(())
406 }
407}
408
409impl Drop for Trace {
410 fn drop(&mut self) {
411 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
412 trace_on_panic == "1" || trace_on_panic == "true"
413 } else {
414 false
415 };
416 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
417 trace_always == "1" || trace_always == "true"
418 } else {
419 false
420 };
421
422 if trace_always || (trace_on_panic && thread::panicking()) {
423 self.resolve();
424 dbg!(self);
425 }
426 }
427}
428
429impl Foreground {
430 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
431 if dispatcher.is_main_thread() {
432 Ok(Self::Platform {
433 dispatcher,
434 _not_send_or_sync: PhantomData,
435 })
436 } else {
437 Err(anyhow!("must be constructed on main thread"))
438 }
439 }
440
441 pub fn test() -> Self {
442 Self::Test(smol::LocalExecutor::new())
443 }
444
445 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
446 let future = any_local_future(future);
447 let any_task = match self {
448 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
449 Self::Platform { dispatcher, .. } => {
450 fn spawn_inner(
451 future: AnyLocalFuture,
452 dispatcher: &Arc<dyn Dispatcher>,
453 ) -> AnyLocalTask {
454 let dispatcher = dispatcher.clone();
455 let schedule =
456 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
457 let (runnable, task) = async_task::spawn_local(future, schedule);
458 runnable.schedule();
459 task
460 }
461 spawn_inner(future, dispatcher)
462 }
463 Self::Test(executor) => executor.spawn(future),
464 };
465 Task::local(any_task)
466 }
467
468 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
469 let future = any_local_future(future);
470 let any_value = match self {
471 Self::Deterministic(executor) => executor.run(future),
472 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
473 Self::Test(executor) => smol::block_on(executor.run(future)),
474 };
475 *any_value.downcast().unwrap()
476 }
477
478 pub fn parking_forbidden(&self) -> bool {
479 match self {
480 Self::Deterministic(executor) => executor.state.lock().forbid_parking,
481 _ => panic!("this method can only be called on a deterministic executor"),
482 }
483 }
484
485 pub fn start_waiting(&self) {
486 match self {
487 Self::Deterministic(executor) => {
488 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
489 }
490 _ => panic!("this method can only be called on a deterministic executor"),
491 }
492 }
493
494 pub fn finish_waiting(&self) {
495 match self {
496 Self::Deterministic(executor) => {
497 executor.state.lock().waiting_backtrace.take();
498 }
499 _ => panic!("this method can only be called on a deterministic executor"),
500 }
501 }
502
503 pub fn forbid_parking(&self) {
504 match self {
505 Self::Deterministic(executor) => {
506 let mut state = executor.state.lock();
507 state.forbid_parking = true;
508 state.rng = StdRng::seed_from_u64(state.seed);
509 }
510 _ => panic!("this method can only be called on a deterministic executor"),
511 }
512 }
513
514 pub async fn timer(&self, duration: Duration) {
515 match self {
516 Self::Deterministic(executor) => {
517 let (tx, mut rx) = barrier::channel();
518 {
519 let mut state = executor.state.lock();
520 let wakeup_at = state.now + duration;
521 state.pending_timers.push((wakeup_at, tx));
522 }
523 rx.recv().await;
524 }
525 _ => {
526 Timer::after(duration).await;
527 }
528 }
529 }
530
531 pub fn advance_clock(&self, duration: Duration) {
532 match self {
533 Self::Deterministic(executor) => {
534 executor.run_until_parked();
535
536 let mut state = executor.state.lock();
537 state.now += duration;
538 let now = state.now;
539 let mut pending_timers = mem::take(&mut state.pending_timers);
540 drop(state);
541
542 pending_timers.retain(|(wakeup, _)| *wakeup > now);
543 executor.state.lock().pending_timers.extend(pending_timers);
544 }
545 _ => panic!("this method can only be called on a deterministic executor"),
546 }
547 }
548
549 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
550 match self {
551 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
552 _ => panic!("this method can only be called on a deterministic executor"),
553 }
554 }
555}
556
557impl Background {
558 pub fn new() -> Self {
559 let executor = Arc::new(Executor::new());
560 let stop = channel::unbounded::<()>();
561
562 for i in 0..2 * num_cpus::get() {
563 let executor = executor.clone();
564 let stop = stop.1.clone();
565 thread::Builder::new()
566 .name(format!("background-executor-{}", i))
567 .spawn(move || smol::block_on(executor.run(stop.recv())))
568 .unwrap();
569 }
570
571 Self::Production {
572 executor,
573 _stop: stop.0,
574 }
575 }
576
577 pub fn num_cpus(&self) -> usize {
578 num_cpus::get()
579 }
580
581 pub fn spawn<T, F>(&self, future: F) -> Task<T>
582 where
583 T: 'static + Send,
584 F: Send + Future<Output = T> + 'static,
585 {
586 let future = any_future(future);
587 let any_task = match self {
588 Self::Production { executor, .. } => executor.spawn(future),
589 Self::Deterministic { executor, .. } => executor.spawn(future),
590 };
591 Task::send(any_task)
592 }
593
594 pub fn block_with_timeout<F, T>(
595 &self,
596 timeout: Duration,
597 future: F,
598 ) -> Result<T, impl Future<Output = T>>
599 where
600 T: 'static,
601 F: 'static + Unpin + Future<Output = T>,
602 {
603 let mut future = any_local_future(future);
604 if !timeout.is_zero() {
605 let output = match self {
606 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
607 Self::Deterministic { executor, .. } => executor.block_on(&mut future),
608 };
609 if let Some(output) = output {
610 return Ok(*output.downcast().unwrap());
611 }
612 }
613 Err(async { *future.await.downcast().unwrap() })
614 }
615
616 pub async fn scoped<'scope, F>(&self, scheduler: F)
617 where
618 F: FnOnce(&mut Scope<'scope>),
619 {
620 let mut scope = Scope {
621 futures: Default::default(),
622 _phantom: PhantomData,
623 };
624 (scheduler)(&mut scope);
625 let spawned = scope
626 .futures
627 .into_iter()
628 .map(|f| self.spawn(f))
629 .collect::<Vec<_>>();
630 for task in spawned {
631 task.await;
632 }
633 }
634}
635
/// Collects futures that borrow data for `'a` so that `Background::scoped` can spawn
/// them and wait for all of them.
pub struct Scope<'a> {
    /// Registered futures. Their real lifetime bound is `'a`; it has been erased to
    /// `'static` in `Scope::spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}
640
641impl<'a> Scope<'a> {
642 pub fn spawn<F>(&mut self, f: F)
643 where
644 F: Future<Output = ()> + Send + 'a,
645 {
646 let f = unsafe {
647 mem::transmute::<
648 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
649 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
650 >(Box::pin(f))
651 };
652 self.futures.push(f);
653 }
654}
655
656pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
657 let executor = Arc::new(Deterministic::new(seed));
658 (
659 Rc::new(Foreground::Deterministic(executor.clone())),
660 Arc::new(Background::Deterministic { executor }),
661 )
662}
663
664impl<T> Task<T> {
665 pub fn ready(value: T) -> Self {
666 Self::Ready(Some(value))
667 }
668
669 fn local(any_task: AnyLocalTask) -> Self {
670 Self::Local {
671 any_task,
672 result_type: PhantomData,
673 }
674 }
675
676 pub fn detach(self) {
677 match self {
678 Task::Ready(_) => {}
679 Task::Local { any_task, .. } => any_task.detach(),
680 Task::Send { any_task, .. } => any_task.detach(),
681 }
682 }
683}
684
685impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
686 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
687 cx.spawn(|_| async move {
688 if let Err(err) = self.await {
689 log::error!("{}", err);
690 }
691 })
692 .detach();
693 }
694}
695
696impl<T: Send> Task<T> {
697 fn send(any_task: AnyTask) -> Self {
698 Self::Send {
699 any_task,
700 result_type: PhantomData,
701 }
702 }
703}
704
705impl<T: fmt::Debug> fmt::Debug for Task<T> {
706 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707 match self {
708 Task::Ready(value) => value.fmt(f),
709 Task::Local { any_task, .. } => any_task.fmt(f),
710 Task::Send { any_task, .. } => any_task.fmt(f),
711 }
712 }
713}
714
715impl<T: 'static> Future for Task<T> {
716 type Output = T;
717
718 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
719 match unsafe { self.get_unchecked_mut() } {
720 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
721 Task::Local { any_task, .. } => {
722 any_task.poll(cx).map(|value| *value.downcast().unwrap())
723 }
724 Task::Send { any_task, .. } => {
725 any_task.poll(cx).map(|value| *value.downcast().unwrap())
726 }
727 }
728 }
729}
730
731fn any_future<T, F>(future: F) -> AnyFuture
732where
733 T: 'static + Send,
734 F: Future<Output = T> + Send + 'static,
735{
736 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
737}
738
739fn any_local_future<T, F>(future: F) -> AnyLocalFuture
740where
741 T: 'static,
742 F: Future<Output = T> + 'static,
743{
744 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
745}