1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 ops::RangeInclusive,
13 pin::Pin,
14 rc::Rc,
15 sync::{
16 atomic::{AtomicBool, Ordering::SeqCst},
17 Arc,
18 },
19 task::{Context, Poll},
20 thread,
21 time::Duration,
22};
23use waker_fn::waker_fn;
24
25use crate::{platform, util};
26
27pub enum Foreground {
28 Platform {
29 dispatcher: Arc<dyn platform::Dispatcher>,
30 _not_send_or_sync: PhantomData<Rc<()>>,
31 },
32 Test(smol::LocalExecutor<'static>),
33 Deterministic(Arc<Deterministic>),
34}
35
36pub enum Background {
37 Deterministic(Arc<Deterministic>),
38 Production {
39 executor: Arc<smol::Executor<'static>>,
40 _stop: channel::Sender<()>,
41 },
42}
43
44struct DeterministicState {
45 rng: StdRng,
46 seed: u64,
47 scheduled: Vec<(Runnable, Backtrace)>,
48 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
49 forbid_parking: bool,
50 block_on_ticks: RangeInclusive<usize>,
51}
52
53pub struct Deterministic {
54 state: Arc<Mutex<DeterministicState>>,
55 parker: Mutex<parking::Parker>,
56}
57
58impl Deterministic {
59 fn new(seed: u64) -> Self {
60 Self {
61 state: Arc::new(Mutex::new(DeterministicState {
62 rng: StdRng::seed_from_u64(seed),
63 seed,
64 scheduled: Default::default(),
65 spawned_from_foreground: Default::default(),
66 forbid_parking: false,
67 block_on_ticks: 0..=1000,
68 })),
69 parker: Default::default(),
70 }
71 }
72
73 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
74 where
75 T: 'static,
76 F: Future<Output = T> + 'static,
77 {
78 let backtrace = Backtrace::new_unresolved();
79 let scheduled_once = AtomicBool::new(false);
80 let state = self.state.clone();
81 let unparker = self.parker.lock().unparker();
82 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
83 let mut state = state.lock();
84 let backtrace = backtrace.clone();
85 if scheduled_once.fetch_or(true, SeqCst) {
86 state.scheduled.push((runnable, backtrace));
87 } else {
88 state.spawned_from_foreground.push((runnable, backtrace));
89 }
90 unparker.unpark();
91 });
92 runnable.schedule();
93 task
94 }
95
96 pub fn spawn<F, T>(&self, future: F) -> Task<T>
97 where
98 T: 'static + Send,
99 F: 'static + Send + Future<Output = T>,
100 {
101 let backtrace = Backtrace::new_unresolved();
102 let state = self.state.clone();
103 let unparker = self.parker.lock().unparker();
104 let (runnable, task) = async_task::spawn(future, move |runnable| {
105 let mut state = state.lock();
106 state.scheduled.push((runnable, backtrace.clone()));
107 unparker.unpark();
108 });
109 runnable.schedule();
110 task
111 }
112
113 pub fn run<F, T>(&self, future: F) -> T
114 where
115 T: 'static,
116 F: Future<Output = T> + 'static,
117 {
118 self.block_on(usize::MAX, future).unwrap()
119 }
120
121 pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
122 where
123 T: 'static,
124 F: Future<Output = T>,
125 {
126 smol::pin!(future);
127
128 let unparker = self.parker.lock().unparker();
129 let waker = waker_fn(move || {
130 unparker.unpark();
131 });
132
133 let mut cx = Context::from_waker(&waker);
134 let mut trace = Trace::default();
135 for _ in 0..max_ticks {
136 let mut state = self.state.lock();
137 let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
138 let ix = state.rng.gen_range(0..=runnable_count);
139 if ix < state.scheduled.len() {
140 let (_, backtrace) = &state.scheduled[ix];
141 trace.record(&state, backtrace.clone());
142 let runnable = state.scheduled.remove(ix).0;
143 drop(state);
144 runnable.run();
145 } else if ix < runnable_count {
146 let (_, backtrace) = &state.spawned_from_foreground[0];
147 trace.record(&state, backtrace.clone());
148 let runnable = state.spawned_from_foreground.remove(0).0;
149 drop(state);
150 runnable.run();
151 } else {
152 drop(state);
153 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
154 return Some(result);
155 }
156 let state = self.state.lock();
157 if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
158 if state.forbid_parking {
159 panic!("deterministic executor parked after a call to forbid_parking");
160 }
161 drop(state);
162 self.parker.lock().park();
163 }
164
165 continue;
166 }
167 }
168
169 None
170 }
171}
172
173#[derive(Default)]
174struct Trace {
175 executed: Vec<Backtrace>,
176 scheduled: Vec<Vec<Backtrace>>,
177 spawned_from_foreground: Vec<Vec<Backtrace>>,
178}
179
180impl Trace {
181 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
182 self.scheduled.push(
183 state
184 .scheduled
185 .iter()
186 .map(|(_, backtrace)| backtrace.clone())
187 .collect(),
188 );
189 self.spawned_from_foreground.push(
190 state
191 .spawned_from_foreground
192 .iter()
193 .map(|(_, backtrace)| backtrace.clone())
194 .collect(),
195 );
196 self.executed.push(executed);
197 }
198
199 fn resolve(&mut self) {
200 for backtrace in &mut self.executed {
201 backtrace.resolve();
202 }
203
204 for backtraces in &mut self.scheduled {
205 for backtrace in backtraces {
206 backtrace.resolve();
207 }
208 }
209
210 for backtraces in &mut self.spawned_from_foreground {
211 for backtrace in backtraces {
212 backtrace.resolve();
213 }
214 }
215 }
216}
217
218impl Debug for Trace {
219 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
220 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
221
222 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
223 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
224 let cwd = std::env::current_dir().unwrap();
225 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
226 fmt::Display::fmt(&path, fmt)
227 };
228 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
229 for frame in self.0.frames() {
230 let mut formatted_frame = fmt.frame();
231 if frame
232 .symbols()
233 .iter()
234 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
235 {
236 formatted_frame.backtrace_frame(frame)?;
237 break;
238 }
239 }
240 fmt.finish()
241 }
242 }
243
244 for ((backtrace, scheduled), spawned_from_foreground) in self
245 .executed
246 .iter()
247 .zip(&self.scheduled)
248 .zip(&self.spawned_from_foreground)
249 {
250 writeln!(f, "Scheduled")?;
251 for backtrace in scheduled {
252 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
253 }
254 if scheduled.is_empty() {
255 writeln!(f, "None")?;
256 }
257 writeln!(f, "==========")?;
258
259 writeln!(f, "Spawned from foreground")?;
260 for backtrace in spawned_from_foreground {
261 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
262 }
263 if spawned_from_foreground.is_empty() {
264 writeln!(f, "None")?;
265 }
266 writeln!(f, "==========")?;
267
268 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
269 writeln!(f, "+++++++++++++++++++")?;
270 }
271
272 Ok(())
273 }
274}
275
276impl Drop for Trace {
277 fn drop(&mut self) {
278 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
279 trace_on_panic == "1" || trace_on_panic == "true"
280 } else {
281 false
282 };
283 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
284 trace_always == "1" || trace_always == "true"
285 } else {
286 false
287 };
288
289 if trace_always || (trace_on_panic && thread::panicking()) {
290 self.resolve();
291 dbg!(self);
292 }
293 }
294}
295
296impl Foreground {
297 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
298 if dispatcher.is_main_thread() {
299 Ok(Self::Platform {
300 dispatcher,
301 _not_send_or_sync: PhantomData,
302 })
303 } else {
304 Err(anyhow!("must be constructed on main thread"))
305 }
306 }
307
308 pub fn test() -> Self {
309 Self::Test(smol::LocalExecutor::new())
310 }
311
312 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
313 match self {
314 Self::Platform { dispatcher, .. } => {
315 let dispatcher = dispatcher.clone();
316 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
317 let (runnable, task) = async_task::spawn_local(future, schedule);
318 runnable.schedule();
319 task
320 }
321 Self::Test(executor) => executor.spawn(future),
322 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
323 }
324 }
325
326 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
327 match self {
328 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
329 Self::Test(executor) => smol::block_on(executor.run(future)),
330 Self::Deterministic(executor) => executor.run(future),
331 }
332 }
333
334 pub fn forbid_parking(&self) {
335 match self {
336 Self::Deterministic(executor) => {
337 let mut state = executor.state.lock();
338 state.forbid_parking = true;
339 state.rng = StdRng::seed_from_u64(state.seed);
340 }
341 _ => panic!("this method can only be called on a deterministic executor"),
342 }
343 }
344
345 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
346 match self {
347 Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
348 _ => panic!("this method can only be called on a deterministic executor"),
349 }
350 }
351}
352
353impl Background {
354 pub fn new() -> Self {
355 let executor = Arc::new(Executor::new());
356 let stop = channel::unbounded::<()>();
357
358 for i in 0..2 * num_cpus::get() {
359 let executor = executor.clone();
360 let stop = stop.1.clone();
361 thread::Builder::new()
362 .name(format!("background-executor-{}", i))
363 .spawn(move || smol::block_on(executor.run(stop.recv())))
364 .unwrap();
365 }
366
367 Self::Production {
368 executor,
369 _stop: stop.0,
370 }
371 }
372
373 pub fn num_cpus(&self) -> usize {
374 num_cpus::get()
375 }
376
377 pub fn spawn<T, F>(&self, future: F) -> Task<T>
378 where
379 T: 'static + Send,
380 F: Send + Future<Output = T> + 'static,
381 {
382 match self {
383 Self::Production { executor, .. } => executor.spawn(future),
384 Self::Deterministic(executor) => executor.spawn(future),
385 }
386 }
387
388 pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
389 where
390 T: 'static,
391 F: Future<Output = T>,
392 {
393 match self {
394 Self::Production { .. } => {
395 smol::block_on(async move { util::timeout(timeout, future).await.ok() })
396 }
397 Self::Deterministic(executor) => {
398 let max_ticks = {
399 let mut state = executor.state.lock();
400 let range = state.block_on_ticks.clone();
401 state.rng.gen_range(range)
402 };
403 executor.block_on(max_ticks, future)
404 }
405 }
406 }
407
408 pub async fn scoped<'scope, F>(&self, scheduler: F)
409 where
410 F: FnOnce(&mut Scope<'scope>),
411 {
412 let mut scope = Scope {
413 futures: Default::default(),
414 _phantom: PhantomData,
415 };
416 (scheduler)(&mut scope);
417 let spawned = scope
418 .futures
419 .into_iter()
420 .map(|f| self.spawn(f))
421 .collect::<Vec<_>>();
422 for task in spawned {
423 task.await;
424 }
425 }
426}
427
428pub struct Scope<'a> {
429 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
430 _phantom: PhantomData<&'a ()>,
431}
432
433impl<'a> Scope<'a> {
434 pub fn spawn<F>(&mut self, f: F)
435 where
436 F: Future<Output = ()> + Send + 'a,
437 {
438 let f = unsafe {
439 mem::transmute::<
440 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
441 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
442 >(Box::pin(f))
443 };
444 self.futures.push(f);
445 }
446}
447
448pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
449 let executor = Arc::new(Deterministic::new(seed));
450 (
451 Rc::new(Foreground::Deterministic(executor.clone())),
452 Arc::new(Background::Deterministic(executor)),
453 )
454}