1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 ops::RangeInclusive,
13 pin::Pin,
14 rc::Rc,
15 sync::{
16 atomic::{AtomicBool, Ordering::SeqCst},
17 Arc,
18 },
19 task::{Context, Poll},
20 thread,
21 time::Duration,
22};
23use waker_fn::waker_fn;
24
25use crate::{platform, util};
26
27pub enum Foreground {
28 Platform {
29 dispatcher: Arc<dyn platform::Dispatcher>,
30 _not_send_or_sync: PhantomData<Rc<()>>,
31 },
32 Test(smol::LocalExecutor<'static>),
33 Deterministic(Arc<Deterministic>),
34}
35
36pub enum Background {
37 Deterministic(Arc<Deterministic>),
38 Production {
39 executor: Arc<smol::Executor<'static>>,
40 _stop: channel::Sender<()>,
41 },
42}
43
44struct DeterministicState {
45 rng: StdRng,
46 seed: u64,
47 scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
48 scheduled_from_background: Vec<(Runnable, Backtrace)>,
49 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
50 forbid_parking: bool,
51 block_on_ticks: RangeInclusive<usize>,
52}
53
54pub struct Deterministic {
55 state: Arc<Mutex<DeterministicState>>,
56 parker: Mutex<parking::Parker>,
57}
58
59impl Deterministic {
60 fn new(seed: u64) -> Self {
61 Self {
62 state: Arc::new(Mutex::new(DeterministicState {
63 rng: StdRng::seed_from_u64(seed),
64 seed,
65 scheduled_from_foreground: Default::default(),
66 scheduled_from_background: Default::default(),
67 spawned_from_foreground: Default::default(),
68 forbid_parking: false,
69 block_on_ticks: 0..=1000,
70 })),
71 parker: Default::default(),
72 }
73 }
74
75 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
76 where
77 T: 'static,
78 F: Future<Output = T> + 'static,
79 {
80 let backtrace = Backtrace::new_unresolved();
81 let scheduled_once = AtomicBool::new(false);
82 let state = self.state.clone();
83 let unparker = self.parker.lock().unparker();
84 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
85 let mut state = state.lock();
86 let backtrace = backtrace.clone();
87 if scheduled_once.fetch_or(true, SeqCst) {
88 state.scheduled_from_foreground.push((runnable, backtrace));
89 } else {
90 state.spawned_from_foreground.push((runnable, backtrace));
91 }
92 unparker.unpark();
93 });
94 runnable.schedule();
95 task
96 }
97
98 pub fn spawn<F, T>(&self, future: F) -> Task<T>
99 where
100 T: 'static + Send,
101 F: 'static + Send + Future<Output = T>,
102 {
103 let backtrace = Backtrace::new_unresolved();
104 let state = self.state.clone();
105 let unparker = self.parker.lock().unparker();
106 let (runnable, task) = async_task::spawn(future, move |runnable| {
107 let mut state = state.lock();
108 state
109 .scheduled_from_background
110 .push((runnable, backtrace.clone()));
111 unparker.unpark();
112 });
113 runnable.schedule();
114 task
115 }
116
117 pub fn run<F, T>(&self, future: F) -> T
118 where
119 T: 'static,
120 F: Future<Output = T> + 'static,
121 {
122 smol::pin!(future);
123
124 let unparker = self.parker.lock().unparker();
125 let waker = waker_fn(move || {
126 unparker.unpark();
127 });
128
129 let mut cx = Context::from_waker(&waker);
130 let mut trace = Trace::default();
131 loop {
132 let mut state = self.state.lock();
133 let runnable_count = state.scheduled_from_foreground.len()
134 + state.scheduled_from_background.len()
135 + state.spawned_from_foreground.len();
136
137 let ix = state.rng.gen_range(0..=runnable_count);
138 if ix < state.scheduled_from_foreground.len() {
139 let (_, backtrace) = &state.scheduled_from_foreground[ix];
140 trace.record(&state, backtrace.clone());
141 let runnable = state.scheduled_from_foreground.remove(ix).0;
142 drop(state);
143 runnable.run();
144 } else if ix - state.scheduled_from_foreground.len()
145 < state.scheduled_from_background.len()
146 {
147 let ix = ix - state.scheduled_from_foreground.len();
148 let (_, backtrace) = &state.scheduled_from_background[ix];
149 trace.record(&state, backtrace.clone());
150 let runnable = state.scheduled_from_background.remove(ix).0;
151 drop(state);
152 runnable.run();
153 } else if ix < runnable_count {
154 let (_, backtrace) = &state.spawned_from_foreground[0];
155 trace.record(&state, backtrace.clone());
156 let runnable = state.spawned_from_foreground.remove(0).0;
157 drop(state);
158 runnable.run();
159 } else {
160 drop(state);
161 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
162 return result;
163 }
164 let state = self.state.lock();
165 if state.scheduled_from_foreground.is_empty()
166 && state.scheduled_from_background.is_empty()
167 && state.spawned_from_foreground.is_empty()
168 {
169 if state.forbid_parking {
170 panic!("deterministic executor parked after a call to forbid_parking");
171 }
172 drop(state);
173 self.parker.lock().park();
174 }
175
176 continue;
177 }
178 }
179 }
180
181 pub fn block_on<F, T>(&self, future: F) -> Option<T>
182 where
183 T: 'static,
184 F: Future<Output = T>,
185 {
186 smol::pin!(future);
187
188 let unparker = self.parker.lock().unparker();
189 let waker = waker_fn(move || {
190 unparker.unpark();
191 });
192 let max_ticks = {
193 let mut state = self.state.lock();
194 let range = state.block_on_ticks.clone();
195 state.rng.gen_range(range)
196 };
197
198 let mut cx = Context::from_waker(&waker);
199 let mut trace = Trace::default();
200 for _ in 0..max_ticks {
201 let mut state = self.state.lock();
202 let runnable_count = state.scheduled_from_background.len();
203 let ix = state.rng.gen_range(0..=runnable_count);
204 if ix < state.scheduled_from_background.len() {
205 let (_, backtrace) = &state.scheduled_from_background[ix];
206 trace.record(&state, backtrace.clone());
207 let runnable = state.scheduled_from_background.remove(ix).0;
208 drop(state);
209 runnable.run();
210 } else {
211 drop(state);
212 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
213 return Some(result);
214 }
215 let state = self.state.lock();
216 if state.scheduled_from_background.is_empty() {
217 if state.forbid_parking {
218 panic!("deterministic executor parked after a call to forbid_parking");
219 }
220 drop(state);
221 self.parker.lock().park();
222 }
223
224 continue;
225 }
226 }
227
228 None
229 }
230}
231
232#[derive(Default)]
233struct Trace {
234 executed: Vec<Backtrace>,
235 scheduled: Vec<Vec<Backtrace>>,
236 spawned_from_foreground: Vec<Vec<Backtrace>>,
237}
238
239impl Trace {
240 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
241 self.scheduled.push(
242 state
243 .scheduled_from_foreground
244 .iter()
245 .map(|(_, backtrace)| backtrace.clone())
246 .collect(),
247 );
248 self.spawned_from_foreground.push(
249 state
250 .spawned_from_foreground
251 .iter()
252 .map(|(_, backtrace)| backtrace.clone())
253 .collect(),
254 );
255 self.executed.push(executed);
256 }
257
258 fn resolve(&mut self) {
259 for backtrace in &mut self.executed {
260 backtrace.resolve();
261 }
262
263 for backtraces in &mut self.scheduled {
264 for backtrace in backtraces {
265 backtrace.resolve();
266 }
267 }
268
269 for backtraces in &mut self.spawned_from_foreground {
270 for backtrace in backtraces {
271 backtrace.resolve();
272 }
273 }
274 }
275}
276
277impl Debug for Trace {
278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
279 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
280
281 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
283 let cwd = std::env::current_dir().unwrap();
284 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
285 fmt::Display::fmt(&path, fmt)
286 };
287 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
288 for frame in self.0.frames() {
289 let mut formatted_frame = fmt.frame();
290 if frame
291 .symbols()
292 .iter()
293 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
294 {
295 formatted_frame.backtrace_frame(frame)?;
296 break;
297 }
298 }
299 fmt.finish()
300 }
301 }
302
303 for ((backtrace, scheduled), spawned_from_foreground) in self
304 .executed
305 .iter()
306 .zip(&self.scheduled)
307 .zip(&self.spawned_from_foreground)
308 {
309 writeln!(f, "Scheduled")?;
310 for backtrace in scheduled {
311 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
312 }
313 if scheduled.is_empty() {
314 writeln!(f, "None")?;
315 }
316 writeln!(f, "==========")?;
317
318 writeln!(f, "Spawned from foreground")?;
319 for backtrace in spawned_from_foreground {
320 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
321 }
322 if spawned_from_foreground.is_empty() {
323 writeln!(f, "None")?;
324 }
325 writeln!(f, "==========")?;
326
327 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
328 writeln!(f, "+++++++++++++++++++")?;
329 }
330
331 Ok(())
332 }
333}
334
335impl Drop for Trace {
336 fn drop(&mut self) {
337 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
338 trace_on_panic == "1" || trace_on_panic == "true"
339 } else {
340 false
341 };
342 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
343 trace_always == "1" || trace_always == "true"
344 } else {
345 false
346 };
347
348 if trace_always || (trace_on_panic && thread::panicking()) {
349 self.resolve();
350 dbg!(self);
351 }
352 }
353}
354
355impl Foreground {
356 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
357 if dispatcher.is_main_thread() {
358 Ok(Self::Platform {
359 dispatcher,
360 _not_send_or_sync: PhantomData,
361 })
362 } else {
363 Err(anyhow!("must be constructed on main thread"))
364 }
365 }
366
367 pub fn test() -> Self {
368 Self::Test(smol::LocalExecutor::new())
369 }
370
371 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
372 match self {
373 Self::Platform { dispatcher, .. } => {
374 let dispatcher = dispatcher.clone();
375 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
376 let (runnable, task) = async_task::spawn_local(future, schedule);
377 runnable.schedule();
378 task
379 }
380 Self::Test(executor) => executor.spawn(future),
381 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
382 }
383 }
384
385 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
386 match self {
387 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
388 Self::Test(executor) => smol::block_on(executor.run(future)),
389 Self::Deterministic(executor) => executor.run(future),
390 }
391 }
392
393 pub fn forbid_parking(&self) {
394 match self {
395 Self::Deterministic(executor) => {
396 let mut state = executor.state.lock();
397 state.forbid_parking = true;
398 state.rng = StdRng::seed_from_u64(state.seed);
399 }
400 _ => panic!("this method can only be called on a deterministic executor"),
401 }
402 }
403
404 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
405 match self {
406 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
407 _ => panic!("this method can only be called on a deterministic executor"),
408 }
409 }
410}
411
412impl Background {
413 pub fn new() -> Self {
414 let executor = Arc::new(Executor::new());
415 let stop = channel::unbounded::<()>();
416
417 for i in 0..2 * num_cpus::get() {
418 let executor = executor.clone();
419 let stop = stop.1.clone();
420 thread::Builder::new()
421 .name(format!("background-executor-{}", i))
422 .spawn(move || smol::block_on(executor.run(stop.recv())))
423 .unwrap();
424 }
425
426 Self::Production {
427 executor,
428 _stop: stop.0,
429 }
430 }
431
432 pub fn num_cpus(&self) -> usize {
433 num_cpus::get()
434 }
435
436 pub fn spawn<T, F>(&self, future: F) -> Task<T>
437 where
438 T: 'static + Send,
439 F: Send + Future<Output = T> + 'static,
440 {
441 match self {
442 Self::Production { executor, .. } => executor.spawn(future),
443 Self::Deterministic(executor) => executor.spawn(future),
444 }
445 }
446
447 pub fn block_with_timeout<F, T>(&self, timeout: Duration, mut future: F) -> Result<T, F>
448 where
449 T: 'static,
450 F: 'static + Unpin + Future<Output = T>,
451 {
452 let output = match self {
453 Self::Production { .. } => {
454 smol::block_on(util::timeout(timeout, Pin::new(&mut future))).ok()
455 }
456 Self::Deterministic(executor) => executor.block_on(Pin::new(&mut future)),
457 };
458
459 if let Some(output) = output {
460 Ok(output)
461 } else {
462 Err(future)
463 }
464 }
465
466 pub async fn scoped<'scope, F>(&self, scheduler: F)
467 where
468 F: FnOnce(&mut Scope<'scope>),
469 {
470 let mut scope = Scope {
471 futures: Default::default(),
472 _phantom: PhantomData,
473 };
474 (scheduler)(&mut scope);
475 let spawned = scope
476 .futures
477 .into_iter()
478 .map(|f| self.spawn(f))
479 .collect::<Vec<_>>();
480 for task in spawned {
481 task.await;
482 }
483 }
484}
485
/// Collects the futures registered during a `Background::scoped` call.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to the lifetime `'a` that `spawn` accepts.
    _phantom: PhantomData<&'a ()>,
}
490
491impl<'a> Scope<'a> {
492 pub fn spawn<F>(&mut self, f: F)
493 where
494 F: Future<Output = ()> + Send + 'a,
495 {
496 let f = unsafe {
497 mem::transmute::<
498 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
499 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
500 >(Box::pin(f))
501 };
502 self.futures.push(f);
503 }
504}
505
506pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
507 let executor = Arc::new(Deterministic::new(seed));
508 (
509 Rc::new(Foreground::Deterministic(executor.clone())),
510 Arc::new(Background::Deterministic(executor)),
511 )
512}