1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use futures::task::noop_waker;
6use parking_lot::Mutex;
7use rand::prelude::*;
8use smol::{channel, prelude::*, Executor};
9use std::{
10 fmt::{self, Debug},
11 marker::PhantomData,
12 mem,
13 pin::Pin,
14 rc::Rc,
15 sync::{
16 atomic::{AtomicBool, Ordering::SeqCst},
17 Arc,
18 },
19 task::{Context, Poll},
20 thread,
21 time::Duration,
22};
23
24use crate::{platform, util};
25
26pub enum Foreground {
27 Platform {
28 dispatcher: Arc<dyn platform::Dispatcher>,
29 _not_send_or_sync: PhantomData<Rc<()>>,
30 },
31 Test(smol::LocalExecutor<'static>),
32 Deterministic(Arc<Deterministic>),
33}
34
35pub enum Background {
36 Deterministic(Arc<Deterministic>),
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43struct DeterministicState {
44 rng: StdRng,
45 seed: u64,
46 scheduled: Vec<(Runnable, Backtrace)>,
47 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
48}
49
50pub struct Deterministic(Arc<Mutex<DeterministicState>>);
51
52impl Deterministic {
53 fn new(seed: u64) -> Self {
54 Self(Arc::new(Mutex::new(DeterministicState {
55 rng: StdRng::seed_from_u64(seed),
56 seed,
57 scheduled: Default::default(),
58 spawned_from_foreground: Default::default(),
59 })))
60 }
61
62 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
63 where
64 T: 'static,
65 F: Future<Output = T> + 'static,
66 {
67 let backtrace = Backtrace::new_unresolved();
68 let scheduled_once = AtomicBool::new(false);
69 let state = self.0.clone();
70 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
71 let mut state = state.lock();
72 let backtrace = backtrace.clone();
73 if scheduled_once.fetch_or(true, SeqCst) {
74 state.scheduled.push((runnable, backtrace));
75 } else {
76 state.spawned_from_foreground.push((runnable, backtrace));
77 }
78 });
79 runnable.schedule();
80 task
81 }
82
83 pub fn spawn<F, T>(&self, future: F) -> Task<T>
84 where
85 T: 'static + Send,
86 F: 'static + Send + Future<Output = T>,
87 {
88 let backtrace = Backtrace::new_unresolved();
89 let state = self.0.clone();
90 let (runnable, task) = async_task::spawn(future, move |runnable| {
91 state.lock().scheduled.push((runnable, backtrace.clone()));
92 });
93 runnable.schedule();
94 task
95 }
96
97 pub fn run<F, T>(&self, future: F) -> T
98 where
99 T: 'static,
100 F: Future<Output = T> + 'static,
101 {
102 self.block_on(usize::MAX, future).unwrap()
103 }
104
105 pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
106 where
107 T: 'static,
108 F: Future<Output = T>,
109 {
110 smol::pin!(future);
111
112 let waker = noop_waker();
113 let mut cx = Context::from_waker(&waker);
114 let mut trace = Trace::default();
115 for _ in 0..max_ticks {
116 let runnable = {
117 let state = &mut *self.0.lock();
118 let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
119 let ix = state.rng.gen_range(0..=runnable_count);
120 if ix < state.scheduled.len() {
121 let (_, backtrace) = &state.scheduled[ix];
122 trace.record(&state, backtrace.clone());
123 state.scheduled.remove(ix).0
124 } else if ix < runnable_count {
125 let (_, backtrace) = &state.spawned_from_foreground[0];
126 trace.record(&state, backtrace.clone());
127 state.spawned_from_foreground.remove(0).0
128 } else {
129 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
130 return Some(result);
131 }
132
133 if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
134 panic!("detected non-determinism in deterministic executor");
135 } else {
136 continue;
137 }
138 }
139 };
140
141 runnable.run();
142 }
143
144 None
145 }
146}
147
148#[derive(Default)]
149struct Trace {
150 executed: Vec<Backtrace>,
151 scheduled: Vec<Vec<Backtrace>>,
152 spawned_from_foreground: Vec<Vec<Backtrace>>,
153}
154
155impl Trace {
156 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
157 self.scheduled.push(
158 state
159 .scheduled
160 .iter()
161 .map(|(_, backtrace)| backtrace.clone())
162 .collect(),
163 );
164 self.spawned_from_foreground.push(
165 state
166 .spawned_from_foreground
167 .iter()
168 .map(|(_, backtrace)| backtrace.clone())
169 .collect(),
170 );
171 self.executed.push(executed);
172 }
173
174 fn resolve(&mut self) {
175 for backtrace in &mut self.executed {
176 backtrace.resolve();
177 }
178
179 for backtraces in &mut self.scheduled {
180 for backtrace in backtraces {
181 backtrace.resolve();
182 }
183 }
184
185 for backtraces in &mut self.spawned_from_foreground {
186 for backtrace in backtraces {
187 backtrace.resolve();
188 }
189 }
190 }
191}
192
193impl Debug for Trace {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
196
197 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
199 let cwd = std::env::current_dir().unwrap();
200 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
201 fmt::Display::fmt(&path, fmt)
202 };
203 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
204 for frame in self.0.frames() {
205 let mut formatted_frame = fmt.frame();
206 if frame
207 .symbols()
208 .iter()
209 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
210 {
211 formatted_frame.backtrace_frame(frame)?;
212 break;
213 }
214 }
215 fmt.finish()
216 }
217 }
218
219 for ((backtrace, scheduled), spawned_from_foreground) in self
220 .executed
221 .iter()
222 .zip(&self.scheduled)
223 .zip(&self.spawned_from_foreground)
224 {
225 writeln!(f, "Scheduled")?;
226 for backtrace in scheduled {
227 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
228 }
229 if scheduled.is_empty() {
230 writeln!(f, "None")?;
231 }
232 writeln!(f, "==========")?;
233
234 writeln!(f, "Spawned from foreground")?;
235 for backtrace in spawned_from_foreground {
236 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
237 }
238 if spawned_from_foreground.is_empty() {
239 writeln!(f, "None")?;
240 }
241 writeln!(f, "==========")?;
242
243 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
244 writeln!(f, "+++++++++++++++++++")?;
245 }
246
247 Ok(())
248 }
249}
250
251impl Drop for Trace {
252 fn drop(&mut self) {
253 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
254 trace_on_panic == "1" || trace_on_panic == "true"
255 } else {
256 false
257 };
258 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
259 trace_always == "1" || trace_always == "true"
260 } else {
261 false
262 };
263
264 if trace_always || (trace_on_panic && thread::panicking()) {
265 self.resolve();
266 dbg!(self);
267 }
268 }
269}
270
271impl Foreground {
272 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
273 if dispatcher.is_main_thread() {
274 Ok(Self::Platform {
275 dispatcher,
276 _not_send_or_sync: PhantomData,
277 })
278 } else {
279 Err(anyhow!("must be constructed on main thread"))
280 }
281 }
282
283 pub fn test() -> Self {
284 Self::Test(smol::LocalExecutor::new())
285 }
286
287 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
288 match self {
289 Self::Platform { dispatcher, .. } => {
290 let dispatcher = dispatcher.clone();
291 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
292 let (runnable, task) = async_task::spawn_local(future, schedule);
293 runnable.schedule();
294 task
295 }
296 Self::Test(executor) => executor.spawn(future),
297 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
298 }
299 }
300
301 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
302 match self {
303 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
304 Self::Test(executor) => smol::block_on(executor.run(future)),
305 Self::Deterministic(executor) => executor.run(future),
306 }
307 }
308
309 pub fn reset(&self) {
310 match self {
311 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
312 Self::Test(_) => panic!("can't call this method on a test executor"),
313 Self::Deterministic(executor) => {
314 let state = &mut *executor.0.lock();
315 state.rng = StdRng::seed_from_u64(state.seed);
316 }
317 }
318 }
319}
320
321impl Background {
322 pub fn new() -> Self {
323 let executor = Arc::new(Executor::new());
324 let stop = channel::unbounded::<()>();
325
326 for i in 0..2 * num_cpus::get() {
327 let executor = executor.clone();
328 let stop = stop.1.clone();
329 thread::Builder::new()
330 .name(format!("background-executor-{}", i))
331 .spawn(move || smol::block_on(executor.run(stop.recv())))
332 .unwrap();
333 }
334
335 Self::Production {
336 executor,
337 _stop: stop.0,
338 }
339 }
340
341 pub fn num_cpus(&self) -> usize {
342 num_cpus::get()
343 }
344
345 pub fn spawn<T, F>(&self, future: F) -> Task<T>
346 where
347 T: 'static + Send,
348 F: Send + Future<Output = T> + 'static,
349 {
350 match self {
351 Self::Production { executor, .. } => executor.spawn(future),
352 Self::Deterministic(executor) => executor.spawn(future),
353 }
354 }
355
356 pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
357 where
358 T: 'static,
359 F: Future<Output = T>,
360 {
361 match self {
362 Self::Production { .. } => {
363 smol::block_on(async move { util::timeout(timeout, future).await.ok() })
364 }
365 Self::Deterministic(executor) => {
366 let max_ticks = executor.0.lock().rng.gen_range(1..=1000);
367 executor.block_on(max_ticks, future)
368 }
369 }
370 }
371
372 pub async fn scoped<'scope, F>(&self, scheduler: F)
373 where
374 F: FnOnce(&mut Scope<'scope>),
375 {
376 let mut scope = Scope {
377 futures: Default::default(),
378 _phantom: PhantomData,
379 };
380 (scheduler)(&mut scope);
381 let spawned = scope
382 .futures
383 .into_iter()
384 .map(|f| self.spawn(f))
385 .collect::<Vec<_>>();
386 for task in spawned {
387 task.await;
388 }
389 }
390}
391
392pub struct Scope<'a> {
393 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
394 _phantom: PhantomData<&'a ()>,
395}
396
397impl<'a> Scope<'a> {
398 pub fn spawn<F>(&mut self, f: F)
399 where
400 F: Future<Output = ()> + Send + 'a,
401 {
402 let f = unsafe {
403 mem::transmute::<
404 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
405 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
406 >(Box::pin(f))
407 };
408 self.futures.push(f);
409 }
410}
411
412pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
413 let executor = Arc::new(Deterministic::new(seed));
414 (
415 Rc::new(Foreground::Deterministic(executor.clone())),
416 Arc::new(Background::Deterministic(executor)),
417 )
418}