1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc,
17 },
18 task::{Context, Poll},
19 thread,
20 time::Duration,
21};
22use waker_fn::waker_fn;
23
24use crate::{platform, util};
25
26pub enum Foreground {
27 Platform {
28 dispatcher: Arc<dyn platform::Dispatcher>,
29 _not_send_or_sync: PhantomData<Rc<()>>,
30 },
31 Test(smol::LocalExecutor<'static>),
32 Deterministic(Arc<Deterministic>),
33}
34
35pub enum Background {
36 Deterministic(Arc<Deterministic>),
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43struct DeterministicState {
44 rng: StdRng,
45 seed: u64,
46 scheduled: Vec<(Runnable, Backtrace)>,
47 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
48 forbid_parking: bool,
49}
50
51pub struct Deterministic {
52 state: Arc<Mutex<DeterministicState>>,
53 parker: Mutex<parking::Parker>,
54}
55
56impl Deterministic {
57 fn new(seed: u64) -> Self {
58 Self {
59 state: Arc::new(Mutex::new(DeterministicState {
60 rng: StdRng::seed_from_u64(seed),
61 seed,
62 scheduled: Default::default(),
63 spawned_from_foreground: Default::default(),
64 forbid_parking: false,
65 })),
66 parker: Default::default(),
67 }
68 }
69
70 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
71 where
72 T: 'static,
73 F: Future<Output = T> + 'static,
74 {
75 let backtrace = Backtrace::new_unresolved();
76 let scheduled_once = AtomicBool::new(false);
77 let state = self.state.clone();
78 let unparker = self.parker.lock().unparker();
79 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
80 let mut state = state.lock();
81 let backtrace = backtrace.clone();
82 if scheduled_once.fetch_or(true, SeqCst) {
83 state.scheduled.push((runnable, backtrace));
84 } else {
85 state.spawned_from_foreground.push((runnable, backtrace));
86 }
87 unparker.unpark();
88 });
89 runnable.schedule();
90 task
91 }
92
93 pub fn spawn<F, T>(&self, future: F) -> Task<T>
94 where
95 T: 'static + Send,
96 F: 'static + Send + Future<Output = T>,
97 {
98 let backtrace = Backtrace::new_unresolved();
99 let state = self.state.clone();
100 let unparker = self.parker.lock().unparker();
101 let (runnable, task) = async_task::spawn(future, move |runnable| {
102 let mut state = state.lock();
103 state.scheduled.push((runnable, backtrace.clone()));
104 unparker.unpark();
105 });
106 runnable.schedule();
107 task
108 }
109
110 pub fn run<F, T>(&self, future: F) -> T
111 where
112 T: 'static,
113 F: Future<Output = T> + 'static,
114 {
115 self.block_on(usize::MAX, future).unwrap()
116 }
117
118 pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
119 where
120 T: 'static,
121 F: Future<Output = T>,
122 {
123 smol::pin!(future);
124
125 let unparker = self.parker.lock().unparker();
126 let waker = waker_fn(move || {
127 unparker.unpark();
128 });
129
130 let mut cx = Context::from_waker(&waker);
131 let mut trace = Trace::default();
132 for _ in 0..max_ticks {
133 let mut state = self.state.lock();
134 let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
135 let ix = state.rng.gen_range(0..=runnable_count);
136 if ix < state.scheduled.len() {
137 let (_, backtrace) = &state.scheduled[ix];
138 trace.record(&state, backtrace.clone());
139 let runnable = state.scheduled.remove(ix).0;
140 drop(state);
141 runnable.run();
142 } else if ix < runnable_count {
143 let (_, backtrace) = &state.spawned_from_foreground[0];
144 trace.record(&state, backtrace.clone());
145 let runnable = state.spawned_from_foreground.remove(0).0;
146 drop(state);
147 runnable.run();
148 } else {
149 drop(state);
150 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
151 return Some(result);
152 }
153 let state = &mut *self.state.lock();
154 if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
155 if state.forbid_parking {
156 panic!("deterministic executor parked after a call to forbid_parking");
157 }
158 drop(state);
159 self.parker.lock().park();
160 }
161
162 continue;
163 }
164 }
165
166 None
167 }
168}
169
170#[derive(Default)]
171struct Trace {
172 executed: Vec<Backtrace>,
173 scheduled: Vec<Vec<Backtrace>>,
174 spawned_from_foreground: Vec<Vec<Backtrace>>,
175}
176
177impl Trace {
178 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
179 self.scheduled.push(
180 state
181 .scheduled
182 .iter()
183 .map(|(_, backtrace)| backtrace.clone())
184 .collect(),
185 );
186 self.spawned_from_foreground.push(
187 state
188 .spawned_from_foreground
189 .iter()
190 .map(|(_, backtrace)| backtrace.clone())
191 .collect(),
192 );
193 self.executed.push(executed);
194 }
195
196 fn resolve(&mut self) {
197 for backtrace in &mut self.executed {
198 backtrace.resolve();
199 }
200
201 for backtraces in &mut self.scheduled {
202 for backtrace in backtraces {
203 backtrace.resolve();
204 }
205 }
206
207 for backtraces in &mut self.spawned_from_foreground {
208 for backtrace in backtraces {
209 backtrace.resolve();
210 }
211 }
212 }
213}
214
215impl Debug for Trace {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
218
219 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
221 let cwd = std::env::current_dir().unwrap();
222 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
223 fmt::Display::fmt(&path, fmt)
224 };
225 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
226 for frame in self.0.frames() {
227 let mut formatted_frame = fmt.frame();
228 if frame
229 .symbols()
230 .iter()
231 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
232 {
233 formatted_frame.backtrace_frame(frame)?;
234 break;
235 }
236 }
237 fmt.finish()
238 }
239 }
240
241 for ((backtrace, scheduled), spawned_from_foreground) in self
242 .executed
243 .iter()
244 .zip(&self.scheduled)
245 .zip(&self.spawned_from_foreground)
246 {
247 writeln!(f, "Scheduled")?;
248 for backtrace in scheduled {
249 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
250 }
251 if scheduled.is_empty() {
252 writeln!(f, "None")?;
253 }
254 writeln!(f, "==========")?;
255
256 writeln!(f, "Spawned from foreground")?;
257 for backtrace in spawned_from_foreground {
258 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
259 }
260 if spawned_from_foreground.is_empty() {
261 writeln!(f, "None")?;
262 }
263 writeln!(f, "==========")?;
264
265 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
266 writeln!(f, "+++++++++++++++++++")?;
267 }
268
269 Ok(())
270 }
271}
272
273impl Drop for Trace {
274 fn drop(&mut self) {
275 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
276 trace_on_panic == "1" || trace_on_panic == "true"
277 } else {
278 false
279 };
280 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
281 trace_always == "1" || trace_always == "true"
282 } else {
283 false
284 };
285
286 if trace_always || (trace_on_panic && thread::panicking()) {
287 self.resolve();
288 dbg!(self);
289 }
290 }
291}
292
293impl Foreground {
294 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
295 if dispatcher.is_main_thread() {
296 Ok(Self::Platform {
297 dispatcher,
298 _not_send_or_sync: PhantomData,
299 })
300 } else {
301 Err(anyhow!("must be constructed on main thread"))
302 }
303 }
304
305 pub fn test() -> Self {
306 Self::Test(smol::LocalExecutor::new())
307 }
308
309 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
310 match self {
311 Self::Platform { dispatcher, .. } => {
312 let dispatcher = dispatcher.clone();
313 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
314 let (runnable, task) = async_task::spawn_local(future, schedule);
315 runnable.schedule();
316 task
317 }
318 Self::Test(executor) => executor.spawn(future),
319 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
320 }
321 }
322
323 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
324 match self {
325 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
326 Self::Test(executor) => smol::block_on(executor.run(future)),
327 Self::Deterministic(executor) => executor.run(future),
328 }
329 }
330
331 pub fn reset(&self) {
332 match self {
333 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
334 Self::Test(_) => panic!("can't call this method on a test executor"),
335 Self::Deterministic(executor) => {
336 let state = &mut *executor.state.lock();
337 state.rng = StdRng::seed_from_u64(state.seed);
338 }
339 }
340 }
341
342 pub fn forbid_parking(&self) {
343 match self {
344 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
345 Self::Test(_) => panic!("can't call this method on a test executor"),
346 Self::Deterministic(executor) => {
347 executor.state.lock().forbid_parking = true;
348 }
349 }
350 }
351}
352
353impl Background {
354 pub fn new() -> Self {
355 let executor = Arc::new(Executor::new());
356 let stop = channel::unbounded::<()>();
357
358 for i in 0..2 * num_cpus::get() {
359 let executor = executor.clone();
360 let stop = stop.1.clone();
361 thread::Builder::new()
362 .name(format!("background-executor-{}", i))
363 .spawn(move || smol::block_on(executor.run(stop.recv())))
364 .unwrap();
365 }
366
367 Self::Production {
368 executor,
369 _stop: stop.0,
370 }
371 }
372
373 pub fn num_cpus(&self) -> usize {
374 num_cpus::get()
375 }
376
377 pub fn spawn<T, F>(&self, future: F) -> Task<T>
378 where
379 T: 'static + Send,
380 F: Send + Future<Output = T> + 'static,
381 {
382 match self {
383 Self::Production { executor, .. } => executor.spawn(future),
384 Self::Deterministic(executor) => executor.spawn(future),
385 }
386 }
387
388 pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
389 where
390 T: 'static,
391 F: Future<Output = T>,
392 {
393 match self {
394 Self::Production { .. } => {
395 smol::block_on(async move { util::timeout(timeout, future).await.ok() })
396 }
397 Self::Deterministic(executor) => {
398 let max_ticks = executor.state.lock().rng.gen_range(1..=1000);
399 executor.block_on(max_ticks, future)
400 }
401 }
402 }
403
404 pub async fn scoped<'scope, F>(&self, scheduler: F)
405 where
406 F: FnOnce(&mut Scope<'scope>),
407 {
408 let mut scope = Scope {
409 futures: Default::default(),
410 _phantom: PhantomData,
411 };
412 (scheduler)(&mut scope);
413 let spawned = scope
414 .futures
415 .into_iter()
416 .map(|f| self.spawn(f))
417 .collect::<Vec<_>>();
418 for task in spawned {
419 task.await;
420 }
421 }
422}
423
424pub struct Scope<'a> {
425 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
426 _phantom: PhantomData<&'a ()>,
427}
428
429impl<'a> Scope<'a> {
430 pub fn spawn<F>(&mut self, f: F)
431 where
432 F: Future<Output = ()> + Send + 'a,
433 {
434 let f = unsafe {
435 mem::transmute::<
436 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
437 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
438 >(Box::pin(f))
439 };
440 self.futures.push(f);
441 }
442}
443
444pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
445 let executor = Arc::new(Deterministic::new(seed));
446 (
447 Rc::new(Foreground::Deterministic(executor.clone())),
448 Arc::new(Background::Deterministic(executor)),
449 )
450}