1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 Arc,
17 },
18 task::{Context, Poll},
19 thread,
20 time::Duration,
21};
22use waker_fn::waker_fn;
23
24use crate::{platform, util};
25
26pub enum Foreground {
27 Platform {
28 dispatcher: Arc<dyn platform::Dispatcher>,
29 _not_send_or_sync: PhantomData<Rc<()>>,
30 },
31 Test(smol::LocalExecutor<'static>),
32 Deterministic(Arc<Deterministic>),
33}
34
35pub enum Background {
36 Deterministic(Arc<Deterministic>),
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43struct DeterministicState {
44 rng: StdRng,
45 seed: u64,
46 scheduled: Vec<(Runnable, Backtrace)>,
47 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
48 forbid_parking: bool,
49}
50
51pub struct Deterministic {
52 state: Arc<Mutex<DeterministicState>>,
53 parker: Mutex<parking::Parker>,
54}
55
56impl Deterministic {
57 fn new(seed: u64) -> Self {
58 Self {
59 state: Arc::new(Mutex::new(DeterministicState {
60 rng: StdRng::seed_from_u64(seed),
61 seed,
62 scheduled: Default::default(),
63 spawned_from_foreground: Default::default(),
64 forbid_parking: false,
65 })),
66 parker: Default::default(),
67 }
68 }
69
70 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
71 where
72 T: 'static,
73 F: Future<Output = T> + 'static,
74 {
75 let backtrace = Backtrace::new_unresolved();
76 let scheduled_once = AtomicBool::new(false);
77 let state = self.state.clone();
78 let unparker = self.parker.lock().unparker();
79 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
80 let mut state = state.lock();
81 let backtrace = backtrace.clone();
82 if scheduled_once.fetch_or(true, SeqCst) {
83 state.scheduled.push((runnable, backtrace));
84 } else {
85 state.spawned_from_foreground.push((runnable, backtrace));
86 }
87 unparker.unpark();
88 });
89 runnable.schedule();
90 task
91 }
92
93 pub fn spawn<F, T>(&self, future: F) -> Task<T>
94 where
95 T: 'static + Send,
96 F: 'static + Send + Future<Output = T>,
97 {
98 let backtrace = Backtrace::new_unresolved();
99 let state = self.state.clone();
100 let unparker = self.parker.lock().unparker();
101 let (runnable, task) = async_task::spawn(future, move |runnable| {
102 let mut state = state.lock();
103 state.scheduled.push((runnable, backtrace.clone()));
104 unparker.unpark();
105 });
106 runnable.schedule();
107 task
108 }
109
110 pub fn run<F, T>(&self, future: F) -> T
111 where
112 T: 'static,
113 F: Future<Output = T> + 'static,
114 {
115 self.block_on(usize::MAX, future).unwrap()
116 }
117
118 pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
119 where
120 T: 'static,
121 F: Future<Output = T>,
122 {
123 smol::pin!(future);
124
125 let unparker = self.parker.lock().unparker();
126 let waker = waker_fn(move || {
127 unparker.unpark();
128 });
129
130 let mut cx = Context::from_waker(&waker);
131 let mut trace = Trace::default();
132 for _ in 0..max_ticks {
133 let mut state = self.state.lock();
134 let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
135 let ix = state.rng.gen_range(0..=runnable_count);
136 if ix < state.scheduled.len() {
137 let (_, backtrace) = &state.scheduled[ix];
138 trace.record(&state, backtrace.clone());
139 let runnable = state.scheduled.remove(ix).0;
140 drop(state);
141 runnable.run();
142 } else if ix < runnable_count {
143 let (_, backtrace) = &state.spawned_from_foreground[0];
144 trace.record(&state, backtrace.clone());
145 let runnable = state.spawned_from_foreground.remove(0).0;
146 drop(state);
147 runnable.run();
148 } else {
149 drop(state);
150 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
151 return Some(result);
152 }
153 let state = &mut *self.state.lock();
154 if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
155 if state.forbid_parking {
156 panic!("deterministic executor parked after a call to forbid_parking");
157 }
158 drop(state);
159 self.parker.lock().park();
160 }
161
162 continue;
163 }
164 }
165
166 None
167 }
168}
169
170#[derive(Default)]
171struct Trace {
172 executed: Vec<Backtrace>,
173 scheduled: Vec<Vec<Backtrace>>,
174 spawned_from_foreground: Vec<Vec<Backtrace>>,
175}
176
177impl Trace {
178 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
179 self.scheduled.push(
180 state
181 .scheduled
182 .iter()
183 .map(|(_, backtrace)| backtrace.clone())
184 .collect(),
185 );
186 self.spawned_from_foreground.push(
187 state
188 .spawned_from_foreground
189 .iter()
190 .map(|(_, backtrace)| backtrace.clone())
191 .collect(),
192 );
193 self.executed.push(executed);
194 }
195
196 fn resolve(&mut self) {
197 for backtrace in &mut self.executed {
198 backtrace.resolve();
199 }
200
201 for backtraces in &mut self.scheduled {
202 for backtrace in backtraces {
203 backtrace.resolve();
204 }
205 }
206
207 for backtraces in &mut self.spawned_from_foreground {
208 for backtrace in backtraces {
209 backtrace.resolve();
210 }
211 }
212 }
213}
214
215impl Debug for Trace {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
218
219 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
221 let cwd = std::env::current_dir().unwrap();
222 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
223 fmt::Display::fmt(&path, fmt)
224 };
225 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
226 for frame in self.0.frames() {
227 let mut formatted_frame = fmt.frame();
228 if frame
229 .symbols()
230 .iter()
231 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
232 {
233 formatted_frame.backtrace_frame(frame)?;
234 break;
235 }
236 }
237 fmt.finish()
238 }
239 }
240
241 for ((backtrace, scheduled), spawned_from_foreground) in self
242 .executed
243 .iter()
244 .zip(&self.scheduled)
245 .zip(&self.spawned_from_foreground)
246 {
247 writeln!(f, "Scheduled")?;
248 for backtrace in scheduled {
249 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
250 }
251 if scheduled.is_empty() {
252 writeln!(f, "None")?;
253 }
254 writeln!(f, "==========")?;
255
256 writeln!(f, "Spawned from foreground")?;
257 for backtrace in spawned_from_foreground {
258 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
259 }
260 if spawned_from_foreground.is_empty() {
261 writeln!(f, "None")?;
262 }
263 writeln!(f, "==========")?;
264
265 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
266 writeln!(f, "+++++++++++++++++++")?;
267 }
268
269 Ok(())
270 }
271}
272
273impl Drop for Trace {
274 fn drop(&mut self) {
275 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
276 trace_on_panic == "1" || trace_on_panic == "true"
277 } else {
278 false
279 };
280 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
281 trace_always == "1" || trace_always == "true"
282 } else {
283 false
284 };
285
286 if trace_always || (trace_on_panic && thread::panicking()) {
287 self.resolve();
288 dbg!(self);
289 }
290 }
291}
292
293impl Foreground {
294 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
295 if dispatcher.is_main_thread() {
296 Ok(Self::Platform {
297 dispatcher,
298 _not_send_or_sync: PhantomData,
299 })
300 } else {
301 Err(anyhow!("must be constructed on main thread"))
302 }
303 }
304
305 pub fn test() -> Self {
306 Self::Test(smol::LocalExecutor::new())
307 }
308
309 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
310 match self {
311 Self::Platform { dispatcher, .. } => {
312 let dispatcher = dispatcher.clone();
313 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
314 let (runnable, task) = async_task::spawn_local(future, schedule);
315 runnable.schedule();
316 task
317 }
318 Self::Test(executor) => executor.spawn(future),
319 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
320 }
321 }
322
323 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
324 match self {
325 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
326 Self::Test(executor) => smol::block_on(executor.run(future)),
327 Self::Deterministic(executor) => executor.run(future),
328 }
329 }
330
331 pub fn forbid_parking(&self) {
332 match self {
333 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
334 Self::Test(_) => panic!("can't call this method on a test executor"),
335 Self::Deterministic(executor) => {
336 let mut state = executor.state.lock();
337 state.forbid_parking = true;
338 state.rng = StdRng::seed_from_u64(state.seed);
339 }
340 }
341 }
342}
343
344impl Background {
345 pub fn new() -> Self {
346 let executor = Arc::new(Executor::new());
347 let stop = channel::unbounded::<()>();
348
349 for i in 0..2 * num_cpus::get() {
350 let executor = executor.clone();
351 let stop = stop.1.clone();
352 thread::Builder::new()
353 .name(format!("background-executor-{}", i))
354 .spawn(move || smol::block_on(executor.run(stop.recv())))
355 .unwrap();
356 }
357
358 Self::Production {
359 executor,
360 _stop: stop.0,
361 }
362 }
363
364 pub fn num_cpus(&self) -> usize {
365 num_cpus::get()
366 }
367
368 pub fn spawn<T, F>(&self, future: F) -> Task<T>
369 where
370 T: 'static + Send,
371 F: Send + Future<Output = T> + 'static,
372 {
373 match self {
374 Self::Production { executor, .. } => executor.spawn(future),
375 Self::Deterministic(executor) => executor.spawn(future),
376 }
377 }
378
379 pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
380 where
381 T: 'static,
382 F: Future<Output = T>,
383 {
384 match self {
385 Self::Production { .. } => {
386 smol::block_on(async move { util::timeout(timeout, future).await.ok() })
387 }
388 Self::Deterministic(executor) => {
389 let max_ticks = executor.state.lock().rng.gen_range(1..=1000);
390 executor.block_on(max_ticks, future)
391 }
392 }
393 }
394
395 pub async fn scoped<'scope, F>(&self, scheduler: F)
396 where
397 F: FnOnce(&mut Scope<'scope>),
398 {
399 let mut scope = Scope {
400 futures: Default::default(),
401 _phantom: PhantomData,
402 };
403 (scheduler)(&mut scope);
404 let spawned = scope
405 .futures
406 .into_iter()
407 .map(|f| self.spawn(f))
408 .collect::<Vec<_>>();
409 for task in spawned {
410 task.await;
411 }
412 }
413}
414
415pub struct Scope<'a> {
416 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
417 _phantom: PhantomData<&'a ()>,
418}
419
420impl<'a> Scope<'a> {
421 pub fn spawn<F>(&mut self, f: F)
422 where
423 F: Future<Output = ()> + Send + 'a,
424 {
425 let f = unsafe {
426 mem::transmute::<
427 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
428 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
429 >(Box::pin(f))
430 };
431 self.futures.push(f);
432 }
433}
434
435pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
436 let executor = Arc::new(Deterministic::new(seed));
437 (
438 Rc::new(Foreground::Deterministic(executor.clone())),
439 Arc::new(Background::Deterministic(executor)),
440 )
441}