1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3pub use async_task::Task;
4use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
5use parking_lot::Mutex;
6use rand::prelude::*;
7use smol::{channel, prelude::*, Executor};
8use std::{
9 fmt::{self, Debug},
10 marker::PhantomData,
11 mem,
12 pin::Pin,
13 rc::Rc,
14 sync::{
15 atomic::{AtomicBool, Ordering::SeqCst},
16 mpsc::SyncSender,
17 Arc,
18 },
19 thread,
20};
21
22use crate::platform;
23
24pub enum Foreground {
25 Platform {
26 dispatcher: Arc<dyn platform::Dispatcher>,
27 _not_send_or_sync: PhantomData<Rc<()>>,
28 },
29 Test(smol::LocalExecutor<'static>),
30 Deterministic(Arc<Deterministic>),
31}
32
33pub enum Background {
34 Deterministic(Arc<Deterministic>),
35 Production {
36 executor: Arc<smol::Executor<'static>>,
37 threads: usize,
38 _stop: channel::Sender<()>,
39 },
40}
41
42struct DeterministicState {
43 rng: StdRng,
44 seed: u64,
45 scheduled: Vec<(Runnable, Backtrace)>,
46 spawned_from_foreground: Vec<(Runnable, Backtrace)>,
47 waker: Option<SyncSender<()>>,
48}
49
50pub struct Deterministic(Arc<Mutex<DeterministicState>>);
51
52impl Deterministic {
53 fn new(seed: u64) -> Self {
54 Self(Arc::new(Mutex::new(DeterministicState {
55 rng: StdRng::seed_from_u64(seed),
56 seed,
57 scheduled: Default::default(),
58 spawned_from_foreground: Default::default(),
59 waker: None,
60 })))
61 }
62
63 pub fn spawn_from_foreground<F, T>(&self, future: F) -> Task<T>
64 where
65 T: 'static,
66 F: Future<Output = T> + 'static,
67 {
68 let backtrace = Backtrace::new_unresolved();
69 let scheduled_once = AtomicBool::new(false);
70 let state = self.0.clone();
71 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
72 let mut state = state.lock();
73 let backtrace = backtrace.clone();
74 if scheduled_once.fetch_or(true, SeqCst) {
75 state.scheduled.push((runnable, backtrace));
76 } else {
77 state.spawned_from_foreground.push((runnable, backtrace));
78 }
79 if let Some(waker) = state.waker.as_ref() {
80 waker.send(()).ok();
81 }
82 });
83 runnable.schedule();
84 task
85 }
86
87 pub fn spawn<F, T>(&self, future: F) -> Task<T>
88 where
89 T: 'static + Send,
90 F: 'static + Send + Future<Output = T>,
91 {
92 let backtrace = Backtrace::new_unresolved();
93 let state = self.0.clone();
94 let (runnable, task) = async_task::spawn(future, move |runnable| {
95 let mut state = state.lock();
96 state.scheduled.push((runnable, backtrace.clone()));
97 if let Some(waker) = state.waker.as_ref() {
98 waker.send(()).ok();
99 }
100 });
101 runnable.schedule();
102 task
103 }
104
105 pub fn run<F, T>(&self, future: F) -> T
106 where
107 T: 'static,
108 F: Future<Output = T> + 'static,
109 {
110 let (wake_tx, wake_rx) = std::sync::mpsc::sync_channel(32);
111 let state = self.0.clone();
112 state.lock().waker = Some(wake_tx);
113
114 let (output_tx, output_rx) = std::sync::mpsc::channel();
115 self.spawn_from_foreground(async move {
116 let output = future.await;
117 output_tx.send(output).unwrap();
118 })
119 .detach();
120
121 let mut trace = Trace::default();
122 loop {
123 if let Ok(value) = output_rx.try_recv() {
124 state.lock().waker = None;
125 return value;
126 }
127
128 wake_rx.recv().unwrap();
129 let runnable = {
130 let state = &mut *state.lock();
131 let ix = state
132 .rng
133 .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len());
134 if ix < state.scheduled.len() {
135 let (_, backtrace) = &state.scheduled[ix];
136 trace.record(&state, backtrace.clone());
137 state.scheduled.remove(ix).0
138 } else {
139 let (_, backtrace) = &state.spawned_from_foreground[0];
140 trace.record(&state, backtrace.clone());
141 state.spawned_from_foreground.remove(0).0
142 }
143 };
144
145 runnable.run();
146 }
147 }
148}
149
150#[derive(Default)]
151struct Trace {
152 executed: Vec<Backtrace>,
153 scheduled: Vec<Vec<Backtrace>>,
154 spawned_from_foreground: Vec<Vec<Backtrace>>,
155}
156
157impl Trace {
158 fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
159 self.scheduled.push(
160 state
161 .scheduled
162 .iter()
163 .map(|(_, backtrace)| backtrace.clone())
164 .collect(),
165 );
166 self.spawned_from_foreground.push(
167 state
168 .spawned_from_foreground
169 .iter()
170 .map(|(_, backtrace)| backtrace.clone())
171 .collect(),
172 );
173 self.executed.push(executed);
174 }
175
176 fn resolve(&mut self) {
177 for backtrace in &mut self.executed {
178 backtrace.resolve();
179 }
180
181 for backtraces in &mut self.scheduled {
182 for backtrace in backtraces {
183 backtrace.resolve();
184 }
185 }
186
187 for backtraces in &mut self.spawned_from_foreground {
188 for backtrace in backtraces {
189 backtrace.resolve();
190 }
191 }
192 }
193}
194
195impl Debug for Trace {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 struct FirstCwdFrameInBacktrace<'a>(&'a Backtrace);
198
199 impl<'a> Debug for FirstCwdFrameInBacktrace<'a> {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
201 let cwd = std::env::current_dir().unwrap();
202 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
203 fmt::Display::fmt(&path, fmt)
204 };
205 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
206 for frame in self.0.frames() {
207 let mut formatted_frame = fmt.frame();
208 if frame
209 .symbols()
210 .iter()
211 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
212 {
213 formatted_frame.backtrace_frame(frame)?;
214 break;
215 }
216 }
217 fmt.finish()
218 }
219 }
220
221 for ((backtrace, scheduled), spawned_from_foreground) in self
222 .executed
223 .iter()
224 .zip(&self.scheduled)
225 .zip(&self.spawned_from_foreground)
226 {
227 writeln!(f, "Scheduled")?;
228 for backtrace in scheduled {
229 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
230 }
231 if scheduled.is_empty() {
232 writeln!(f, "None")?;
233 }
234 writeln!(f, "==========")?;
235
236 writeln!(f, "Spawned from foreground")?;
237 for backtrace in spawned_from_foreground {
238 writeln!(f, "- {:?}", FirstCwdFrameInBacktrace(backtrace))?;
239 }
240 if spawned_from_foreground.is_empty() {
241 writeln!(f, "None")?;
242 }
243 writeln!(f, "==========")?;
244
245 writeln!(f, "Run: {:?}", FirstCwdFrameInBacktrace(backtrace))?;
246 writeln!(f, "+++++++++++++++++++")?;
247 }
248
249 Ok(())
250 }
251}
252
253impl Drop for Trace {
254 fn drop(&mut self) {
255 let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
256 trace_on_panic == "1" || trace_on_panic == "true"
257 } else {
258 false
259 };
260 let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
261 trace_always == "1" || trace_always == "true"
262 } else {
263 false
264 };
265
266 if trace_always || (trace_on_panic && thread::panicking()) {
267 self.resolve();
268 dbg!(self);
269 }
270 }
271}
272
273impl Foreground {
274 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
275 if dispatcher.is_main_thread() {
276 Ok(Self::Platform {
277 dispatcher,
278 _not_send_or_sync: PhantomData,
279 })
280 } else {
281 Err(anyhow!("must be constructed on main thread"))
282 }
283 }
284
285 pub fn test() -> Self {
286 Self::Test(smol::LocalExecutor::new())
287 }
288
289 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
290 match self {
291 Self::Platform { dispatcher, .. } => {
292 let dispatcher = dispatcher.clone();
293 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
294 let (runnable, task) = async_task::spawn_local(future, schedule);
295 runnable.schedule();
296 task
297 }
298 Self::Test(executor) => executor.spawn(future),
299 Self::Deterministic(executor) => executor.spawn_from_foreground(future),
300 }
301 }
302
303 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
304 match self {
305 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
306 Self::Test(executor) => smol::block_on(executor.run(future)),
307 Self::Deterministic(executor) => executor.run(future),
308 }
309 }
310
311 pub fn reset(&self) {
312 match self {
313 Self::Platform { .. } => panic!("can't call this method on a platform executor"),
314 Self::Test(_) => panic!("can't call this method on a test executor"),
315 Self::Deterministic(executor) => {
316 let state = &mut *executor.0.lock();
317 state.rng = StdRng::seed_from_u64(state.seed);
318 }
319 }
320 }
321}
322
323impl Background {
324 pub fn new() -> Self {
325 let executor = Arc::new(Executor::new());
326 let stop = channel::unbounded::<()>();
327 let threads = num_cpus::get();
328
329 for i in 0..threads {
330 let executor = executor.clone();
331 let stop = stop.1.clone();
332 thread::Builder::new()
333 .name(format!("background-executor-{}", i))
334 .spawn(move || smol::block_on(executor.run(stop.recv())))
335 .unwrap();
336 }
337
338 Self::Production {
339 executor,
340 threads,
341 _stop: stop.0,
342 }
343 }
344
345 pub fn threads(&self) -> usize {
346 match self {
347 Self::Deterministic(_) => 1,
348 Self::Production { threads, .. } => *threads,
349 }
350 }
351
352 pub fn spawn<T, F>(&self, future: F) -> Task<T>
353 where
354 T: 'static + Send,
355 F: Send + Future<Output = T> + 'static,
356 {
357 match self {
358 Self::Production { executor, .. } => executor.spawn(future),
359 Self::Deterministic(executor) => executor.spawn(future),
360 }
361 }
362
363 pub async fn scoped<'scope, F>(&self, scheduler: F)
364 where
365 F: FnOnce(&mut Scope<'scope>),
366 {
367 let mut scope = Scope {
368 futures: Default::default(),
369 _phantom: PhantomData,
370 };
371 (scheduler)(&mut scope);
372 let spawned = scope
373 .futures
374 .into_iter()
375 .map(|f| self.spawn(f))
376 .collect::<Vec<_>>();
377 for task in spawned {
378 task.await;
379 }
380 }
381}
382
383pub struct Scope<'a> {
384 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
385 _phantom: PhantomData<&'a ()>,
386}
387
388impl<'a> Scope<'a> {
389 pub fn spawn<F>(&mut self, f: F)
390 where
391 F: Future<Output = ()> + Send + 'a,
392 {
393 let f = unsafe {
394 mem::transmute::<
395 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
396 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
397 >(Box::pin(f))
398 };
399 self.futures.push(f);
400 }
401}
402
403pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
404 let executor = Arc::new(Deterministic::new(seed));
405 (
406 Rc::new(Foreground::Deterministic(executor.clone())),
407 Arc::new(Background::Deterministic(executor)),
408 )
409}