1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
4use collections::HashMap;
5use parking_lot::Mutex;
6use postage::{barrier, prelude::Stream as _};
7use rand::prelude::*;
8use smol::{channel, prelude::*, Executor, Timer};
9use std::{
10 any::Any,
11 fmt::{self, Debug, Display},
12 marker::PhantomData,
13 mem,
14 ops::RangeInclusive,
15 pin::Pin,
16 rc::Rc,
17 sync::{
18 atomic::{AtomicBool, Ordering::SeqCst},
19 Arc,
20 },
21 task::{Context, Poll},
22 thread,
23 time::{Duration, Instant},
24};
25use waker_fn::waker_fn;
26
27use crate::{
28 platform::{self, Dispatcher},
29 util, MutableAppContext,
30};
31
32pub enum Foreground {
33 Platform {
34 dispatcher: Arc<dyn platform::Dispatcher>,
35 _not_send_or_sync: PhantomData<Rc<()>>,
36 },
37 Deterministic {
38 cx_id: usize,
39 executor: Arc<Deterministic>,
40 },
41}
42
43pub enum Background {
44 Deterministic {
45 executor: Arc<Deterministic>,
46 },
47 Production {
48 executor: Arc<smol::Executor<'static>>,
49 _stop: channel::Sender<()>,
50 },
51}
52
53type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
54type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
55type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
56type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
57
58#[must_use]
59pub enum Task<T> {
60 Ready(Option<T>),
61 Local {
62 any_task: AnyLocalTask,
63 result_type: PhantomData<T>,
64 },
65 Send {
66 any_task: AnyTask,
67 result_type: PhantomData<T>,
68 },
69}
70
71unsafe impl<T: Send> Send for Task<T> {}
72
73struct DeterministicState {
74 rng: StdRng,
75 seed: u64,
76 scheduled_from_foreground: HashMap<usize, Vec<ScheduledForeground>>,
77 scheduled_from_background: Vec<Runnable>,
78 forbid_parking: bool,
79 block_on_ticks: RangeInclusive<usize>,
80 now: Instant,
81 pending_timers: Vec<(Instant, barrier::Sender)>,
82 waiting_backtrace: Option<Backtrace>,
83}
84
85enum ScheduledForeground {
86 MainFuture,
87 Runnable(Runnable),
88}
89
90pub struct Deterministic {
91 state: Arc<Mutex<DeterministicState>>,
92 parker: Mutex<parking::Parker>,
93}
94
95impl Deterministic {
96 pub fn new(seed: u64) -> Arc<Self> {
97 Arc::new(Self {
98 state: Arc::new(Mutex::new(DeterministicState {
99 rng: StdRng::seed_from_u64(seed),
100 seed,
101 scheduled_from_foreground: Default::default(),
102 scheduled_from_background: Default::default(),
103 forbid_parking: false,
104 block_on_ticks: 0..=1000,
105 now: Instant::now(),
106 pending_timers: Default::default(),
107 waiting_backtrace: None,
108 })),
109 parker: Default::default(),
110 })
111 }
112
113 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
114 Arc::new(Background::Deterministic {
115 executor: self.clone(),
116 })
117 }
118
119 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
120 Rc::new(Foreground::Deterministic {
121 cx_id: id,
122 executor: self.clone(),
123 })
124 }
125
126 fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask {
127 let state = self.state.clone();
128 let unparker = self.parker.lock().unparker();
129 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
130 let mut state = state.lock();
131 state
132 .scheduled_from_foreground
133 .entry(cx_id)
134 .or_default()
135 .push(ScheduledForeground::Runnable(runnable));
136 unparker.unpark();
137 });
138 runnable.schedule();
139 task
140 }
141
142 fn spawn(&self, future: AnyFuture) -> AnyTask {
143 let state = self.state.clone();
144 let unparker = self.parker.lock().unparker();
145 let (runnable, task) = async_task::spawn(future, move |runnable| {
146 let mut state = state.lock();
147 state.scheduled_from_background.push(runnable);
148 unparker.unpark();
149 });
150 runnable.schedule();
151 task
152 }
153
154 fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box<dyn Any> {
155 let woken = Arc::new(AtomicBool::new(false));
156 loop {
157 if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) {
158 return result;
159 }
160
161 if !woken.load(SeqCst) {
162 self.state.lock().will_park();
163 }
164
165 woken.store(false, SeqCst);
166 self.parker.lock().park();
167 }
168 }
169
170 fn run_until_parked(&self, cx_id: usize) {
171 let woken = Arc::new(AtomicBool::new(false));
172 let mut future = any_local_future(std::future::pending::<()>());
173 self.run_internal(cx_id, woken, &mut future);
174 }
175
176 fn run_internal(
177 &self,
178 cx_id: usize,
179 woken: Arc<AtomicBool>,
180 future: &mut AnyLocalFuture,
181 ) -> Option<Box<dyn Any>> {
182 let unparker = self.parker.lock().unparker();
183 let scheduled_main_future = Arc::new(AtomicBool::new(true));
184 self.state
185 .lock()
186 .scheduled_from_foreground
187 .entry(cx_id)
188 .or_default()
189 .insert(0, ScheduledForeground::MainFuture);
190
191 let waker = waker_fn({
192 let state = self.state.clone();
193 let scheduled_main_future = scheduled_main_future.clone();
194 move || {
195 woken.store(true, SeqCst);
196 if !scheduled_main_future.load(SeqCst) {
197 scheduled_main_future.store(true, SeqCst);
198 state
199 .lock()
200 .scheduled_from_foreground
201 .entry(cx_id)
202 .or_default()
203 .push(ScheduledForeground::MainFuture);
204 }
205
206 unparker.unpark();
207 }
208 });
209
210 let mut cx = Context::from_waker(&waker);
211 loop {
212 let mut state = self.state.lock();
213
214 if state.scheduled_from_foreground.is_empty()
215 && state.scheduled_from_background.is_empty()
216 {
217 return None;
218 }
219
220 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
221 let background_len = state.scheduled_from_background.len();
222 let ix = state.rng.gen_range(0..background_len);
223 let runnable = state.scheduled_from_background.remove(ix);
224 drop(state);
225 runnable.run();
226 } else if !state.scheduled_from_foreground.is_empty() {
227 let available_cx_ids = state
228 .scheduled_from_foreground
229 .keys()
230 .copied()
231 .collect::<Vec<_>>();
232 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
233 let scheduled_from_cx = state
234 .scheduled_from_foreground
235 .get_mut(&cx_id_to_run)
236 .unwrap();
237 let runnable = scheduled_from_cx.remove(0);
238 if scheduled_from_cx.is_empty() {
239 state.scheduled_from_foreground.remove(&cx_id_to_run);
240 }
241
242 drop(state);
243 match runnable {
244 ScheduledForeground::MainFuture => {
245 scheduled_main_future.store(false, SeqCst);
246 if let Poll::Ready(result) = future.poll(&mut cx) {
247 return Some(result);
248 }
249 }
250 ScheduledForeground::Runnable(runnable) => {
251 runnable.run();
252 }
253 }
254 } else {
255 return None;
256 }
257 }
258 }
259
260 fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
261 let unparker = self.parker.lock().unparker();
262 let waker = waker_fn(move || {
263 unparker.unpark();
264 });
265 let max_ticks = {
266 let mut state = self.state.lock();
267 let range = state.block_on_ticks.clone();
268 state.rng.gen_range(range)
269 };
270
271 let mut cx = Context::from_waker(&waker);
272 for _ in 0..max_ticks {
273 let mut state = self.state.lock();
274 let runnable_count = state.scheduled_from_background.len();
275 let ix = state.rng.gen_range(0..=runnable_count);
276 if ix < state.scheduled_from_background.len() {
277 let runnable = state.scheduled_from_background.remove(ix);
278 drop(state);
279 runnable.run();
280 } else {
281 drop(state);
282 if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
283 return Some(result);
284 }
285 let mut state = self.state.lock();
286 if state.scheduled_from_background.is_empty() {
287 state.will_park();
288 drop(state);
289 self.parker.lock().park();
290 }
291
292 continue;
293 }
294 }
295
296 None
297 }
298}
299
300impl DeterministicState {
301 fn will_park(&mut self) {
302 if self.forbid_parking {
303 let mut backtrace_message = String::new();
304 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
305 backtrace.resolve();
306 backtrace_message = format!(
307 "\nbacktrace of waiting future:\n{:?}",
308 CwdBacktrace::new(backtrace)
309 );
310 }
311
312 panic!(
313 "deterministic executor parked after a call to forbid_parking{}",
314 backtrace_message
315 );
316 }
317 }
318}
319
320struct CwdBacktrace<'a> {
321 backtrace: &'a Backtrace,
322}
323
324impl<'a> CwdBacktrace<'a> {
325 fn new(backtrace: &'a Backtrace) -> Self {
326 Self { backtrace }
327 }
328}
329
330impl<'a> Debug for CwdBacktrace<'a> {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
332 let cwd = std::env::current_dir().unwrap();
333 let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
334 fmt::Display::fmt(&path, fmt)
335 };
336 let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
337 for frame in self.backtrace.frames() {
338 let mut formatted_frame = fmt.frame();
339 if frame
340 .symbols()
341 .iter()
342 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
343 {
344 formatted_frame.backtrace_frame(frame)?;
345 }
346 }
347 fmt.finish()
348 }
349}
350
351impl Foreground {
352 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
353 if dispatcher.is_main_thread() {
354 Ok(Self::Platform {
355 dispatcher,
356 _not_send_or_sync: PhantomData,
357 })
358 } else {
359 Err(anyhow!("must be constructed on main thread"))
360 }
361 }
362
363 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
364 let future = any_local_future(future);
365 let any_task = match self {
366 Self::Deterministic { cx_id, executor } => {
367 executor.spawn_from_foreground(*cx_id, future)
368 }
369 Self::Platform { dispatcher, .. } => {
370 fn spawn_inner(
371 future: AnyLocalFuture,
372 dispatcher: &Arc<dyn Dispatcher>,
373 ) -> AnyLocalTask {
374 let dispatcher = dispatcher.clone();
375 let schedule =
376 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
377 let (runnable, task) = async_task::spawn_local(future, schedule);
378 runnable.schedule();
379 task
380 }
381 spawn_inner(future, dispatcher)
382 }
383 };
384 Task::local(any_task)
385 }
386
387 pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
388 let future = any_local_future(future);
389 let any_value = match self {
390 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
391 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
392 };
393 *any_value.downcast().unwrap()
394 }
395
396 pub fn parking_forbidden(&self) -> bool {
397 match self {
398 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
399 _ => panic!("this method can only be called on a deterministic executor"),
400 }
401 }
402
403 pub fn start_waiting(&self) {
404 match self {
405 Self::Deterministic { executor, .. } => {
406 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
407 }
408 _ => panic!("this method can only be called on a deterministic executor"),
409 }
410 }
411
412 pub fn finish_waiting(&self) {
413 match self {
414 Self::Deterministic { executor, .. } => {
415 executor.state.lock().waiting_backtrace.take();
416 }
417 _ => panic!("this method can only be called on a deterministic executor"),
418 }
419 }
420
421 pub fn forbid_parking(&self) {
422 match self {
423 Self::Deterministic { executor, .. } => {
424 let mut state = executor.state.lock();
425 state.forbid_parking = true;
426 state.rng = StdRng::seed_from_u64(state.seed);
427 }
428 _ => panic!("this method can only be called on a deterministic executor"),
429 }
430 }
431
432 pub async fn timer(&self, duration: Duration) {
433 match self {
434 Self::Deterministic { executor, .. } => {
435 let (tx, mut rx) = barrier::channel();
436 {
437 let mut state = executor.state.lock();
438 let wakeup_at = state.now + duration;
439 state.pending_timers.push((wakeup_at, tx));
440 }
441 rx.recv().await;
442 }
443 _ => {
444 Timer::after(duration).await;
445 }
446 }
447 }
448
449 pub fn advance_clock(&self, duration: Duration) {
450 match self {
451 Self::Deterministic { cx_id, executor } => {
452 executor.run_until_parked(*cx_id);
453
454 let mut state = executor.state.lock();
455 state.now += duration;
456 let now = state.now;
457 let mut pending_timers = mem::take(&mut state.pending_timers);
458 drop(state);
459
460 pending_timers.retain(|(wakeup, _)| *wakeup > now);
461 executor.state.lock().pending_timers.extend(pending_timers);
462 }
463 _ => panic!("this method can only be called on a deterministic executor"),
464 }
465 }
466
467 pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
468 match self {
469 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
470 _ => panic!("this method can only be called on a deterministic executor"),
471 }
472 }
473}
474
475impl Background {
476 pub fn new() -> Self {
477 let executor = Arc::new(Executor::new());
478 let stop = channel::unbounded::<()>();
479
480 for i in 0..2 * num_cpus::get() {
481 let executor = executor.clone();
482 let stop = stop.1.clone();
483 thread::Builder::new()
484 .name(format!("background-executor-{}", i))
485 .spawn(move || smol::block_on(executor.run(stop.recv())))
486 .unwrap();
487 }
488
489 Self::Production {
490 executor,
491 _stop: stop.0,
492 }
493 }
494
495 pub fn num_cpus(&self) -> usize {
496 num_cpus::get()
497 }
498
499 pub fn spawn<T, F>(&self, future: F) -> Task<T>
500 where
501 T: 'static + Send,
502 F: Send + Future<Output = T> + 'static,
503 {
504 let future = any_future(future);
505 let any_task = match self {
506 Self::Production { executor, .. } => executor.spawn(future),
507 Self::Deterministic { executor } => executor.spawn(future),
508 };
509 Task::send(any_task)
510 }
511
512 pub fn block_with_timeout<F, T>(
513 &self,
514 timeout: Duration,
515 future: F,
516 ) -> Result<T, impl Future<Output = T>>
517 where
518 T: 'static,
519 F: 'static + Unpin + Future<Output = T>,
520 {
521 let mut future = any_local_future(future);
522 if !timeout.is_zero() {
523 let output = match self {
524 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
525 Self::Deterministic { executor, .. } => executor.block_on(&mut future),
526 };
527 if let Some(output) = output {
528 return Ok(*output.downcast().unwrap());
529 }
530 }
531 Err(async { *future.await.downcast().unwrap() })
532 }
533
534 pub async fn scoped<'scope, F>(&self, scheduler: F)
535 where
536 F: FnOnce(&mut Scope<'scope>),
537 {
538 let mut scope = Scope {
539 futures: Default::default(),
540 _phantom: PhantomData,
541 };
542 (scheduler)(&mut scope);
543 let spawned = scope
544 .futures
545 .into_iter()
546 .map(|f| self.spawn(f))
547 .collect::<Vec<_>>();
548 for task in spawned {
549 task.await;
550 }
551 }
552}
553
554pub struct Scope<'a> {
555 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
556 _phantom: PhantomData<&'a ()>,
557}
558
559impl<'a> Scope<'a> {
560 pub fn spawn<F>(&mut self, f: F)
561 where
562 F: Future<Output = ()> + Send + 'a,
563 {
564 let f = unsafe {
565 mem::transmute::<
566 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
567 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
568 >(Box::pin(f))
569 };
570 self.futures.push(f);
571 }
572}
573
574impl<T> Task<T> {
575 pub fn ready(value: T) -> Self {
576 Self::Ready(Some(value))
577 }
578
579 fn local(any_task: AnyLocalTask) -> Self {
580 Self::Local {
581 any_task,
582 result_type: PhantomData,
583 }
584 }
585
586 pub fn detach(self) {
587 match self {
588 Task::Ready(_) => {}
589 Task::Local { any_task, .. } => any_task.detach(),
590 Task::Send { any_task, .. } => any_task.detach(),
591 }
592 }
593}
594
595impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
596 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
597 cx.spawn(|_| async move {
598 if let Err(err) = self.await {
599 log::error!("{}", err);
600 }
601 })
602 .detach();
603 }
604}
605
606impl<T: Send> Task<T> {
607 fn send(any_task: AnyTask) -> Self {
608 Self::Send {
609 any_task,
610 result_type: PhantomData,
611 }
612 }
613}
614
615impl<T: fmt::Debug> fmt::Debug for Task<T> {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 match self {
618 Task::Ready(value) => value.fmt(f),
619 Task::Local { any_task, .. } => any_task.fmt(f),
620 Task::Send { any_task, .. } => any_task.fmt(f),
621 }
622 }
623}
624
625impl<T: 'static> Future for Task<T> {
626 type Output = T;
627
628 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
629 match unsafe { self.get_unchecked_mut() } {
630 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
631 Task::Local { any_task, .. } => {
632 any_task.poll(cx).map(|value| *value.downcast().unwrap())
633 }
634 Task::Send { any_task, .. } => {
635 any_task.poll(cx).map(|value| *value.downcast().unwrap())
636 }
637 }
638 }
639}
640
641fn any_future<T, F>(future: F) -> AnyFuture
642where
643 T: 'static + Send,
644 F: Future<Output = T> + Send + 'static,
645{
646 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
647}
648
649fn any_local_future<T, F>(future: F) -> AnyLocalFuture
650where
651 T: 'static,
652 F: Future<Output = T> + 'static,
653{
654 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
655}