1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 mpsc::SyncSender,
17 Arc,
18 },
19 thread,
20};
21
22use crate::platform;
23
24pub enum Foreground {
25 Platform {
26 dispatcher: Arc<dyn platform::Dispatcher>,
27 _not_send_or_sync: PhantomData<Rc<()>>,
28 },
29 Test(smol::LocalExecutor<'static>),
30 Deterministic(Arc<Deterministic>),
31}
32
33pub enum Background {
34 Deterministic(Arc<Deterministic>),
35 Production {
36 executor: Arc<smol::Executor<'static>>,
37 _stop: channel::Sender<()>,
38 },
39}
40
41struct DeterministicState {
42 rng: StdRng,
43 seed: u64,
44 scheduled: Vec<(Runnable, Backtrace)>,
45 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
46 waker: Option<SyncSender<()>>,
47}
48
49pub struct Deterministic(Arc<Mutex<DeterministicState>>);
50
51impl Deterministic {
52 fn new(seed: u64) -> Self {
53 Self(Arc::new(Mutex::new(DeterministicState {
54 rng: StdRng::seed_from_u64(seed),
55 seed,
56 scheduled: Default::default(),
57 spawned_from_foreground: Default::default(),
58 waker: None,
59 })))
60 }
61
62 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
63 where
64 T: 'static,
65 F: Future<Output = T> + 'static,
66 {
67 let backtrace = Backtrace::new_unresolved();
68 let scheduled_once = AtomicBool::new(false);
69 let state = self.0.clone();
70 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
71 let mut state = state.lock();
72 let backtrace = backtrace.clone();
73 if scheduled_once.fetch_or(true, SeqCst) {
74 state.scheduled.push((runnable, backtrace));
75 } else {
76 state.spawned_from_foreground.push((runnable, backtrace));
77 }
78 if let Some(waker) = state.waker.as_ref() {
79 waker.send(()).ok();
80 }
81 });
82 runnable.schedule();
83 task
84 }
85
86 pub fn spawn<F, T>(&self, future: F) -> Task<T>
87 where
88 T: 'static + Send,
89 F: 'static + Send + Future<Output = T>,
90 {
91 let backtrace = Backtrace::new_unresolved();
92 let state = self.0.clone();
93 let (runnable, task) = async_task::spawn(future, move |runnable| {
94 let mut state = state.lock();
95 state.scheduled.push((runnable, backtrace.clone()));
96 if let Some(waker) = state.waker.as_ref() {
97 waker.send(()).ok();
98 }
99 });
100 runnable.schedule();
101 task
102 }
103
104 pub fn run<F, T>(&self, future: F) -> T
105 where
106 T: 'static,
107 F: Future<Output = T> + 'static,
108 {
109 let (wake_tx, wake_rx) = std::sync::mpsc::sync_channel(32);
110 let state = self.0.clone();
111 state.lock().waker = Some(wake_tx);
112
113 let (output_tx, output_rx) = std::sync::mpsc::channel();
114 self.spawn_from_foreground(async move {
115 let output = future.await;
116 output_tx.send(output).unwrap();
117 })
118 .detach();
119
120 let mut trace = Trace::default();
121 loop {
122 if let Ok(value) = output_rx.try_recv() {
123 state.lock().waker = None;
124 return value;
125 }
126
127 wake_rx.recv().unwrap();
128 let runnable = {
129 let state = &mut *state.lock();
130 let ix = state
131 .rng
132 .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len());
133 if ix < state.scheduled.len() {
134 let (_, backtrace) = &state.scheduled[ix];
135 trace.record(&state, backtrace.clone());
136 state.scheduled.remove(ix).0
137 } else {
138 let (_, backtrace) = &state.spawned_from_foreground[0];
139 trace.record(&state, backtrace.clone());
140 state.spawned_from_foreground.remove(0).0
141 }
142 };
143
144 runnable.run();
145 }
146 }
147}
148
149#[derive(Default)]
150struct Trace {
151 executed: Vec<Backtrace>,
152 scheduled: Vec<Vec<Backtrace>>,
153 spawned_from_foreground: Vec<Vec<Backtrace>>,
154}
155
156impl Trace {
157 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
158 self.scheduled.push(
159 state
160 .scheduled
161 .iter()
162 .map(|(_, backtrace)| backtrace.clone())
163 .collect(),
164 );
165 self.spawned_from_foreground.push(
166 state
167 .spawned_from_foreground
168 .iter()
169 .map(|(_, backtrace)| backtrace.clone())
170 .collect(),
171 );
172 self.executed.push(executed);
173 }
174
175 fn resolve(&mut self) {
176 for backtrace in &mut self.executed {
177 backtrace.resolve();
178 }
179
180 for backtraces in &mut self.scheduled {
181 for backtrace in backtraces {
182 backtrace.resolve();
183 }
184 }
185
186 for backtraces in &mut self.spawned_from_foreground {
187 for backtrace in backtraces {
188 backtrace.resolve();
189 }
190 }
191 }
192}
193
194impl Debug for Trace {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
197
198 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
200 let cwd = std::env::current_dir().unwrap();
201 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
202 fmt::Display::fmt(&path, fmt)
203 };
204 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
205 for frame in self.0.frames() {
206 let mut formatted_frame = fmt.frame();
207 if frame
208 .symbols()
209 .iter()
210 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
211 {
212 formatted_frame.backtrace_frame(frame)?;
213 break;
214 }
215 }
216 fmt.finish()
217 }
218 }
219
220 for ((backtrace, scheduled), spawned_from_foreground) in self
221 .executed
222 .iter()
223 .zip(&self.scheduled)
224 .zip(&self.spawned_from_foreground)
225 {
226 writeln!(f, "Scheduled")?;
227 for backtrace in scheduled {
228 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
229 }
230 if scheduled.is_empty() {
231 writeln!(f, "None")?;
232 }
233 writeln!(f, "==========")?;
234
235 writeln!(f, "Spawned from foreground")?;
236 for backtrace in spawned_from_foreground {
237 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
238 }
239 if spawned_from_foreground.is_empty() {
240 writeln!(f, "None")?;
241 }
242 writeln!(f, "==========")?;
243
244 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
245 writeln!(f, "+++++++++++++++++++")?;
246 }
247
248 Ok(())
249 }
250}
251
252impl Drop for Trace {
253 fn drop(&mut self) {
254 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
255 trace_on_panic == "1" || trace_on_panic == "true"
256 } else {
257 false
258 };
259 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
260 trace_always == "1" || trace_always == "true"
261 } else {
262 false
263 };
264
265 if trace_always || (trace_on_panic && thread::panicking()) {
266 self.resolve();
267 dbg!(self);
268 }
269 }
270}
271
272impl Foreground {
273 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
274 if dispatcher.is_main_thread() {
275 Ok(Self::Platform {
276 dispatcher,
277 _not_send_or_sync: PhantomData,
278 })
279 } else {
280 Err(anyhow!("must be constructed on main thread"))
281 }
282 }
283
284 pub fn test() -> Self {
285 Self::Test(smol::LocalExecutor::new())
286 }
287
288 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
289 match self {
290 Self::Platform { dispatcher, .. } => {
291 let dispatcher = dispatcher.clone();
292 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
293 let (runnable, task) = async_task::spawn_local(future, schedule);
294 runnable.schedule();
295 task
296 }
297 Self::Test(executor) => executor.spawn(future),
298 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
299 }
300 }
301
302 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
303 match self {
304 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
305 Self::Test(executor) => smol::block_on(executor.run(future)),
306 Self::Deterministic(executor) => executor.run(future),
307 }
308 }
309
310 pub fn reset(&self) {
311 match self {
312 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
313 Self::Test(_) => panic!("can't call this method on a test executor"),
314 Self::Deterministic(executor) => {
315 let state = &mut *executor.0.lock();
316 state.rng = StdRng::seed_from_u64(state.seed);
317 }
318 }
319 }
320}
321
322impl Background {
323 pub fn new() -> Self {
324 let executor = Arc::new(Executor::new());
325 let stop = channel::unbounded::<()>();
326
327 for i in 0..2 * num_cpus::get() {
328 let executor = executor.clone();
329 let stop = stop.1.clone();
330 thread::Builder::new()
331 .name(format!("background-executor-{}", i))
332 .spawn(move || smol::block_on(executor.run(stop.recv())))
333 .unwrap();
334 }
335
336 Self::Production {
337 executor,
338 _stop: stop.0,
339 }
340 }
341
342 pub fn num_cpus(&self) -> usize {
343 num_cpus::get()
344 }
345
346 pub fn spawn<T, F>(&self, future: F) -> Task<T>
347 where
348 T: 'static + Send,
349 F: Send + Future<Output = T> + 'static,
350 {
351 match self {
352 Self::Production { executor, .. } => executor.spawn(future),
353 Self::Deterministic(executor) => executor.spawn(future),
354 }
355 }
356
357 pub async fn scoped<'scope, F>(&self, scheduler: F)
358 where
359 F: FnOnce(&mut Scope<'scope>),
360 {
361 let mut scope = Scope {
362 futures: Default::default(),
363 _phantom: PhantomData,
364 };
365 (scheduler)(&mut scope);
366 let spawned = scope
367 .futures
368 .into_iter()
369 .map(|f| self.spawn(f))
370 .collect::<Vec<_>>();
371 for task in spawned {
372 task.await;
373 }
374 }
375}
376
/// Collects futures that may borrow data for the lifetime `'a`.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    /// Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Boxes and pins `f`, then appends it to the list of registered futures.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: the two types differ only in the trait object's lifetime
        // bound, so the transmute changes no layout. The erased `'static` is
        // not a real guarantee: whoever drains `futures` must finish or drop
        // every future before `'a` ends.
        // NOTE(review): confirm every consumer of `futures` upholds that.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(scoped)
        };
        self.futures.push(erased);
    }
}
396
397pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
398 let executor = Arc::new(Deterministic::new(seed));
399 (
400 Rc::new(Foreground::Deterministic(executor.clone())),
401 Arc::new(Background::Deterministic(executor)),
402 )
403}