executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::Backtrace;
  4use collections::HashMap;
  5use parking_lot::Mutex;
  6use postage::{barrier, prelude::Stream as _};
  7use rand::prelude::*;
  8use smol::{channel, future::yield_now, prelude::*, Executor, Timer};
  9use std::{
 10    any::Any,
 11    fmt::{self, Display},
 12    marker::PhantomData,
 13    mem,
 14    ops::RangeInclusive,
 15    pin::Pin,
 16    rc::Rc,
 17    sync::{
 18        atomic::{AtomicBool, Ordering::SeqCst},
 19        Arc,
 20    },
 21    task::{Context, Poll},
 22    thread,
 23    time::{Duration, Instant},
 24};
 25use waker_fn::waker_fn;
 26
 27use crate::{
 28    platform::{self, Dispatcher},
 29    util, MutableAppContext,
 30};
 31
 32pub enum Foreground {
 33    Platform {
 34        dispatcher: Arc<dyn platform::Dispatcher>,
 35        _not_send_or_sync: PhantomData<Rc<()>>,
 36    },
 37    Deterministic {
 38        cx_id: usize,
 39        executor: Arc<Deterministic>,
 40    },
 41}
 42
 43pub enum Background {
 44    Deterministic {
 45        executor: Arc<Deterministic>,
 46    },
 47    Production {
 48        executor: Arc<smol::Executor<'static>>,
 49        _stop: channel::Sender<()>,
 50    },
 51}
 52
 53type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 54type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 55type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 56type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 57
 58#[must_use]
 59pub enum Task<T> {
 60    Ready(Option<T>),
 61    Local {
 62        any_task: AnyLocalTask,
 63        result_type: PhantomData<T>,
 64    },
 65    Send {
 66        any_task: AnyTask,
 67        result_type: PhantomData<T>,
 68    },
 69}
 70
 71unsafe impl<T: Send> Send for Task<T> {}
 72
 73struct DeterministicState {
 74    rng: StdRng,
 75    seed: u64,
 76    scheduled_from_foreground: HashMap<usize, Vec<ForegroundRunnable>>,
 77    scheduled_from_background: Vec<Runnable>,
 78    forbid_parking: bool,
 79    block_on_ticks: RangeInclusive<usize>,
 80    now: Instant,
 81    pending_timers: Vec<(Instant, barrier::Sender)>,
 82    waiting_backtrace: Option<Backtrace>,
 83}
 84
 85struct ForegroundRunnable {
 86    runnable: Runnable,
 87    main: bool,
 88}
 89
 90pub struct Deterministic {
 91    state: Arc<Mutex<DeterministicState>>,
 92    parker: Mutex<parking::Parker>,
 93}
 94
 95impl Deterministic {
 96    pub fn new(seed: u64) -> Arc<Self> {
 97        Arc::new(Self {
 98            state: Arc::new(Mutex::new(DeterministicState {
 99                rng: StdRng::seed_from_u64(seed),
100                seed,
101                scheduled_from_foreground: Default::default(),
102                scheduled_from_background: Default::default(),
103                forbid_parking: false,
104                block_on_ticks: 0..=1000,
105                now: Instant::now(),
106                pending_timers: Default::default(),
107                waiting_backtrace: None,
108            })),
109            parker: Default::default(),
110        })
111    }
112
113    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
114        Arc::new(Background::Deterministic {
115            executor: self.clone(),
116        })
117    }
118
119    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
120        Rc::new(Foreground::Deterministic {
121            cx_id: id,
122            executor: self.clone(),
123        })
124    }
125
126    fn spawn_from_foreground(
127        &self,
128        cx_id: usize,
129        future: AnyLocalFuture,
130        main: bool,
131    ) -> AnyLocalTask {
132        let state = self.state.clone();
133        let unparker = self.parker.lock().unparker();
134        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
135            let mut state = state.lock();
136            state
137                .scheduled_from_foreground
138                .entry(cx_id)
139                .or_default()
140                .push(ForegroundRunnable { runnable, main });
141            unparker.unpark();
142        });
143        runnable.schedule();
144        task
145    }
146
147    fn spawn(&self, future: AnyFuture) -> AnyTask {
148        let state = self.state.clone();
149        let unparker = self.parker.lock().unparker();
150        let (runnable, task) = async_task::spawn(future, move |runnable| {
151            let mut state = state.lock();
152            state.scheduled_from_background.push(runnable);
153            unparker.unpark();
154        });
155        runnable.schedule();
156        task
157    }
158
159    fn run<'a>(
160        &self,
161        cx_id: usize,
162        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
163    ) -> Box<dyn Any> {
164        let woken = Arc::new(AtomicBool::new(false));
165
166        let state = self.state.clone();
167        let unparker = self.parker.lock().unparker();
168        let (runnable, mut main_task) = unsafe {
169            async_task::spawn_unchecked(main_future, move |runnable| {
170                let mut state = state.lock();
171                state
172                    .scheduled_from_foreground
173                    .entry(cx_id)
174                    .or_default()
175                    .push(ForegroundRunnable {
176                        runnable,
177                        main: true,
178                    });
179                unparker.unpark();
180            })
181        };
182        runnable.schedule();
183
184        loop {
185            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
186                return result;
187            }
188
189            if !woken.load(SeqCst) {
190                self.state.lock().will_park();
191            }
192
193            woken.store(false, SeqCst);
194            self.parker.lock().park();
195        }
196    }
197
198    pub(crate) fn run_until_parked(&self) {
199        let woken = Arc::new(AtomicBool::new(false));
200        self.run_internal(woken, None);
201    }
202
203    fn run_internal(
204        &self,
205        woken: Arc<AtomicBool>,
206        mut main_task: Option<&mut AnyLocalTask>,
207    ) -> Option<Box<dyn Any>> {
208        let unparker = self.parker.lock().unparker();
209        let waker = waker_fn(move || {
210            woken.store(true, SeqCst);
211            unparker.unpark();
212        });
213
214        let mut cx = Context::from_waker(&waker);
215        loop {
216            let mut state = self.state.lock();
217
218            if state.scheduled_from_foreground.is_empty()
219                && state.scheduled_from_background.is_empty()
220            {
221                return None;
222            }
223
224            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
225                let background_len = state.scheduled_from_background.len();
226                let ix = state.rng.gen_range(0..background_len);
227                let runnable = state.scheduled_from_background.remove(ix);
228                drop(state);
229                runnable.run();
230            } else if !state.scheduled_from_foreground.is_empty() {
231                let available_cx_ids = state
232                    .scheduled_from_foreground
233                    .keys()
234                    .copied()
235                    .collect::<Vec<_>>();
236                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
237                let scheduled_from_cx = state
238                    .scheduled_from_foreground
239                    .get_mut(&cx_id_to_run)
240                    .unwrap();
241                let foreground_runnable = scheduled_from_cx.remove(0);
242                if scheduled_from_cx.is_empty() {
243                    state.scheduled_from_foreground.remove(&cx_id_to_run);
244                }
245
246                drop(state);
247
248                foreground_runnable.runnable.run();
249                if let Some(main_task) = main_task.as_mut() {
250                    if foreground_runnable.main {
251                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
252                            return Some(result);
253                        }
254                    }
255                }
256            }
257        }
258    }
259
260    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
261    where
262        F: Unpin + Future<Output = T>,
263    {
264        let unparker = self.parker.lock().unparker();
265        let waker = waker_fn(move || {
266            unparker.unpark();
267        });
268
269        let mut cx = Context::from_waker(&waker);
270        for _ in 0..max_ticks {
271            let mut state = self.state.lock();
272            let runnable_count = state.scheduled_from_background.len();
273            let ix = state.rng.gen_range(0..=runnable_count);
274            if ix < state.scheduled_from_background.len() {
275                let runnable = state.scheduled_from_background.remove(ix);
276                drop(state);
277                runnable.run();
278            } else {
279                drop(state);
280                if let Poll::Ready(result) = future.poll(&mut cx) {
281                    return Some(result);
282                }
283                let mut state = self.state.lock();
284                if state.scheduled_from_background.is_empty() {
285                    state.will_park();
286                    drop(state);
287                    self.parker.lock().park();
288                }
289
290                continue;
291            }
292        }
293
294        None
295    }
296}
297
298impl DeterministicState {
299    fn will_park(&mut self) {
300        if self.forbid_parking {
301            let mut backtrace_message = String::new();
302            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
303                backtrace.resolve();
304                backtrace_message = format!(
305                    "\nbacktrace of waiting future:\n{:?}",
306                    util::CwdBacktrace(backtrace)
307                );
308            }
309
310            panic!(
311                "deterministic executor parked after a call to forbid_parking{}",
312                backtrace_message
313            );
314        }
315    }
316}
317
318impl Foreground {
319    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
320        if dispatcher.is_main_thread() {
321            Ok(Self::Platform {
322                dispatcher,
323                _not_send_or_sync: PhantomData,
324            })
325        } else {
326            Err(anyhow!("must be constructed on main thread"))
327        }
328    }
329
330    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
331        let future = any_local_future(future);
332        let any_task = match self {
333            Self::Deterministic { cx_id, executor } => {
334                executor.spawn_from_foreground(*cx_id, future, false)
335            }
336            Self::Platform { dispatcher, .. } => {
337                fn spawn_inner(
338                    future: AnyLocalFuture,
339                    dispatcher: &Arc<dyn Dispatcher>,
340                ) -> AnyLocalTask {
341                    let dispatcher = dispatcher.clone();
342                    let schedule =
343                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
344                    let (runnable, task) = async_task::spawn_local(future, schedule);
345                    runnable.schedule();
346                    task
347                }
348                spawn_inner(future, dispatcher)
349            }
350        };
351        Task::local(any_task)
352    }
353
354    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
355        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
356        let result = match self {
357            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
358            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
359        };
360        *result.downcast().unwrap()
361    }
362
363    pub fn run_until_parked(&self) {
364        match self {
365            Self::Deterministic { executor, .. } => executor.run_until_parked(),
366            _ => panic!("this method can only be called on a deterministic executor"),
367        }
368    }
369
370    pub fn parking_forbidden(&self) -> bool {
371        match self {
372            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
373            _ => panic!("this method can only be called on a deterministic executor"),
374        }
375    }
376
377    pub fn start_waiting(&self) {
378        match self {
379            Self::Deterministic { executor, .. } => {
380                executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
381            }
382            _ => panic!("this method can only be called on a deterministic executor"),
383        }
384    }
385
386    pub fn finish_waiting(&self) {
387        match self {
388            Self::Deterministic { executor, .. } => {
389                executor.state.lock().waiting_backtrace.take();
390            }
391            _ => panic!("this method can only be called on a deterministic executor"),
392        }
393    }
394
395    pub fn forbid_parking(&self) {
396        match self {
397            Self::Deterministic { executor, .. } => {
398                let mut state = executor.state.lock();
399                state.forbid_parking = true;
400                state.rng = StdRng::seed_from_u64(state.seed);
401            }
402            _ => panic!("this method can only be called on a deterministic executor"),
403        }
404    }
405
406    pub async fn timer(&self, duration: Duration) {
407        match self {
408            Self::Deterministic { executor, .. } => {
409                let (tx, mut rx) = barrier::channel();
410                {
411                    let mut state = executor.state.lock();
412                    let wakeup_at = state.now + duration;
413                    state.pending_timers.push((wakeup_at, tx));
414                }
415                rx.recv().await;
416            }
417            _ => {
418                Timer::after(duration).await;
419            }
420        }
421    }
422
423    pub fn advance_clock(&self, duration: Duration) {
424        match self {
425            Self::Deterministic { executor, .. } => {
426                executor.run_until_parked();
427
428                let mut state = executor.state.lock();
429                state.now += duration;
430                let now = state.now;
431                let mut pending_timers = mem::take(&mut state.pending_timers);
432                drop(state);
433
434                pending_timers.retain(|(wakeup, _)| *wakeup > now);
435                executor.state.lock().pending_timers.extend(pending_timers);
436            }
437            _ => panic!("this method can only be called on a deterministic executor"),
438        }
439    }
440
441    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
442        match self {
443            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
444            _ => panic!("this method can only be called on a deterministic executor"),
445        }
446    }
447}
448
449impl Background {
450    pub fn new() -> Self {
451        let executor = Arc::new(Executor::new());
452        let stop = channel::unbounded::<()>();
453
454        for i in 0..2 * num_cpus::get() {
455            let executor = executor.clone();
456            let stop = stop.1.clone();
457            thread::Builder::new()
458                .name(format!("background-executor-{}", i))
459                .spawn(move || smol::block_on(executor.run(stop.recv())))
460                .unwrap();
461        }
462
463        Self::Production {
464            executor,
465            _stop: stop.0,
466        }
467    }
468
469    pub fn num_cpus(&self) -> usize {
470        num_cpus::get()
471    }
472
473    pub fn spawn<T, F>(&self, future: F) -> Task<T>
474    where
475        T: 'static + Send,
476        F: Send + Future<Output = T> + 'static,
477    {
478        let future = any_future(future);
479        let any_task = match self {
480            Self::Production { executor, .. } => executor.spawn(future),
481            Self::Deterministic { executor } => executor.spawn(future),
482        };
483        Task::send(any_task)
484    }
485
486    pub fn block<F, T>(&self, future: F) -> T
487    where
488        F: Future<Output = T>,
489    {
490        smol::pin!(future);
491        match self {
492            Self::Production { .. } => smol::block_on(&mut future),
493            Self::Deterministic { executor, .. } => {
494                executor.block(&mut future, usize::MAX).unwrap()
495            }
496        }
497    }
498
499    pub fn block_with_timeout<F, T>(
500        &self,
501        timeout: Duration,
502        future: F,
503    ) -> Result<T, impl Future<Output = T>>
504    where
505        T: 'static,
506        F: 'static + Unpin + Future<Output = T>,
507    {
508        let mut future = any_local_future(future);
509        if !timeout.is_zero() {
510            let output = match self {
511                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
512                Self::Deterministic { executor, .. } => {
513                    let max_ticks = {
514                        let mut state = executor.state.lock();
515                        let range = state.block_on_ticks.clone();
516                        state.rng.gen_range(range)
517                    };
518                    executor.block(&mut future, max_ticks)
519                }
520            };
521            if let Some(output) = output {
522                return Ok(*output.downcast().unwrap());
523            }
524        }
525        Err(async { *future.await.downcast().unwrap() })
526    }
527
528    pub async fn scoped<'scope, F>(&self, scheduler: F)
529    where
530        F: FnOnce(&mut Scope<'scope>),
531    {
532        let mut scope = Scope {
533            futures: Default::default(),
534            _phantom: PhantomData,
535        };
536        (scheduler)(&mut scope);
537        let spawned = scope
538            .futures
539            .into_iter()
540            .map(|f| self.spawn(f))
541            .collect::<Vec<_>>();
542        for task in spawned {
543            task.await;
544        }
545    }
546
547    pub async fn simulate_random_delay(&self) {
548        match self {
549            Self::Deterministic { executor, .. } => {
550                if executor.state.lock().rng.gen_bool(0.2) {
551                    let yields = executor.state.lock().rng.gen_range(1..=10);
552                    for _ in 0..yields {
553                        yield_now().await;
554                    }
555                }
556            }
557            _ => panic!("this method can only be called on a deterministic executor"),
558        }
559    }
560}
561
562pub struct Scope<'a> {
563    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
564    _phantom: PhantomData<&'a ()>,
565}
566
567impl<'a> Scope<'a> {
568    pub fn spawn<F>(&mut self, f: F)
569    where
570        F: Future<Output = ()> + Send + 'a,
571    {
572        let f = unsafe {
573            mem::transmute::<
574                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
575                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
576            >(Box::pin(f))
577        };
578        self.futures.push(f);
579    }
580}
581
582impl<T> Task<T> {
583    pub fn ready(value: T) -> Self {
584        Self::Ready(Some(value))
585    }
586
587    fn local(any_task: AnyLocalTask) -> Self {
588        Self::Local {
589            any_task,
590            result_type: PhantomData,
591        }
592    }
593
594    pub fn detach(self) {
595        match self {
596            Task::Ready(_) => {}
597            Task::Local { any_task, .. } => any_task.detach(),
598            Task::Send { any_task, .. } => any_task.detach(),
599        }
600    }
601}
602
603impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
604    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
605        cx.spawn(|_| async move {
606            if let Err(err) = self.await {
607                log::error!("{}", err);
608            }
609        })
610        .detach();
611    }
612}
613
614impl<T: Send> Task<T> {
615    fn send(any_task: AnyTask) -> Self {
616        Self::Send {
617            any_task,
618            result_type: PhantomData,
619        }
620    }
621}
622
623impl<T: fmt::Debug> fmt::Debug for Task<T> {
624    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625        match self {
626            Task::Ready(value) => value.fmt(f),
627            Task::Local { any_task, .. } => any_task.fmt(f),
628            Task::Send { any_task, .. } => any_task.fmt(f),
629        }
630    }
631}
632
633impl<T: 'static> Future for Task<T> {
634    type Output = T;
635
636    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637        match unsafe { self.get_unchecked_mut() } {
638            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
639            Task::Local { any_task, .. } => {
640                any_task.poll(cx).map(|value| *value.downcast().unwrap())
641            }
642            Task::Send { any_task, .. } => {
643                any_task.poll(cx).map(|value| *value.downcast().unwrap())
644            }
645        }
646    }
647}
648
649fn any_future<T, F>(future: F) -> AnyFuture
650where
651    T: 'static + Send,
652    F: Future<Output = T> + Send + 'static,
653{
654    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
655}
656
657fn any_local_future<T, F>(future: F) -> AnyLocalFuture
658where
659    T: 'static,
660    F: Future<Output = T> + 'static,
661{
662    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
663}