executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use parking_lot::Mutex;
  5use postage::{barrier, prelude::Stream as _};
  6use rand::prelude::*;
  7use smol::{channel, prelude::*, Executor, Timer};
  8use std::{
  9    any::Any,
 10    fmt::{self, Debug, Display},
 11    marker::PhantomData,
 12    mem,
 13    ops::RangeInclusive,
 14    pin::Pin,
 15    rc::Rc,
 16    sync::{
 17        atomic::{AtomicBool, Ordering::SeqCst},
 18        Arc,
 19    },
 20    task::{Context, Poll},
 21    thread,
 22    time::{Duration, Instant},
 23};
 24use waker_fn::waker_fn;
 25
 26use crate::{
 27    platform::{self, Dispatcher},
 28    util, MutableAppContext,
 29};
 30
/// Executor for futures that are not required to be `Send`.
///
/// Each variant selects a different backend for running those futures.
pub enum Foreground {
    /// Schedules runnables through a platform dispatcher's
    /// `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker keeps the variant
        // from being either.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Backed by a `smol::LocalExecutor`.
    Test(smol::LocalExecutor<'static>),
    /// Backed by a seeded [`Deterministic`] executor.
    Deterministic(Arc<Deterministic>),
}
 39
/// Executor for `Send` futures.
pub enum Background {
    /// Routes spawned futures to a seeded [`Deterministic`] executor.
    Deterministic {
        executor: Arc<Deterministic>,
    },
    /// Runs futures on a `smol::Executor` driven by worker threads (see
    /// `Background::new`).
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Worker threads run the executor until `recv()` on the matching
        // receiver resolves; this sender is held only to keep the channel
        // open and is never sent on in this file.
        _stop: channel::Sender<()>,
    },
}
 49
/// Boxed, non-`Send` future whose output is type-erased as `Box<dyn Any>`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed `Send` future whose output is type-erased as `Box<dyn Any + Send>`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Task handle yielding a type-erased `Send` value.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Task handle yielding a type-erased value that need not be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 54
/// Typed handle to a value that is either already available or will be
/// produced by a spawned task.
///
/// Awaiting the handle yields `T`; the spawned variants store a type-erased
/// task and downcast its output back to `T` when polled.
#[must_use]
pub enum Task<T> {
    /// A value that is already available. It is taken out of the `Option` on
    /// the first poll.
    Ready(Option<T>),
    /// A task whose type-erased output is not required to be `Send`.
    Local {
        any_task: AnyLocalTask,
        // Records the concrete output type for the later downcast.
        result_type: PhantomData<T>,
    },
    /// A task whose type-erased output is `Send`.
    Send {
        any_task: AnyTask,
        // Records the concrete output type for the later downcast.
        result_type: PhantomData<T>,
    },
}
 67
// SAFETY: NOTE(review) - `Task::Local` holds an `AnyLocalTask`, whose erased
// output (`Box<dyn Any>`) is not `Send`, so the compiler would not derive
// `Send` on its own. This impl presumably relies on the erased value always
// being a `T`, which is `Send` here - confirm against async_task's guarantees
// for handles created by `spawn_local`.
unsafe impl<T: Send> Send for Task<T> {}
 69
/// Mutable state of the [`Deterministic`] executor, guarded by a mutex.
struct DeterministicState {
    // Random source used to choose which runnable executes next.
    rng: StdRng,
    // Seed the generator was created from; `Foreground::forbid_parking`
    // reseeds `rng` with it.
    seed: u64,
    // Foreground tasks that have been scheduled again after their first run,
    // each paired with the backtrace captured when the task was spawned.
    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
    // Runnables of tasks created through `Deterministic::spawn`.
    scheduled_from_background: Vec<(Runnable, Backtrace)>,
    // Foreground tasks scheduled for the first time; always consumed from the
    // front of the vector.
    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
    // When true, `will_park` panics instead of returning.
    forbid_parking: bool,
    // Range from which `block_on` draws its maximum number of iterations.
    block_on_ticks: RangeInclusive<usize>,
    // Simulated clock; moved forward only by `Foreground::advance_clock`.
    now: Instant,
    // Pending timers as (wakeup instant, barrier sender); an entry is dropped
    // once the simulated clock reaches its wakeup instant.
    pending_timers: Vec<(Instant, barrier::Sender)>,
    // Backtrace captured by `Foreground::start_waiting`; included in the
    // panic message produced by `will_park`.
    waiting_backtrace: Option<Backtrace>,
}
 82
/// Executor that picks the next runnable with a seeded random number
/// generator instead of relying on thread scheduling.
pub struct Deterministic {
    // Shared with the schedule callbacks of spawned tasks, which push their
    // runnables into it.
    state: Arc<Mutex<DeterministicState>>,
    // Parks the running thread when there is nothing to do; wakers and
    // schedule callbacks hold unparkers obtained from it.
    parker: Mutex<parking::Parker>,
}
 87
impl Deterministic {
    /// Creates an executor whose scheduling decisions are derived from `seed`.
    ///
    /// The initial state has empty queues, parking allowed, a `block_on` tick
    /// range of `0..=1000`, and a simulated clock starting at `Instant::now()`.
    fn new(seed: u64) -> Self {
        Self {
            state: Arc::new(Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                spawned_from_foreground: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: Instant::now(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
            })),
            parker: Default::default(),
        }
    }

    /// Spawns a non-`Send` future and returns its task handle.
    ///
    /// The first time the task is scheduled its runnable is queued in
    /// `spawned_from_foreground`; every later scheduling goes to
    /// `scheduled_from_foreground`. Each scheduling also unparks the executor.
    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
        // Captured unresolved here and cloned on every scheduling, so queue
        // entries identify where the task was spawned.
        let backtrace = Backtrace::new_unresolved();
        let scheduled_once = AtomicBool::new(false);
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            let backtrace = backtrace.clone();
            // `fetch_or` returns the previous value: false only on the very
            // first scheduling of this task.
            if scheduled_once.fetch_or(true, SeqCst) {
                state.scheduled_from_foreground.push((runnable, backtrace));
            } else {
                state.spawned_from_foreground.push((runnable, backtrace));
            }
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future and returns its task handle.
    ///
    /// Every scheduling of the task queues its runnable in
    /// `scheduled_from_background` and unparks the executor.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let backtrace = Backtrace::new_unresolved();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            state
                .scheduled_from_background
                .push((runnable, backtrace.clone()));
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Drives `future` to completion and returns its type-erased output.
    ///
    /// Whenever `run_internal` reports that no work is left, the thread is
    /// parked until something unparks it. If the future's waker was not
    /// invoked before parking, `will_park` is called first, which panics when
    /// parking has been forbidden.
    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
        let woken = Arc::new(AtomicBool::new(false));
        loop {
            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
                return result;
            }

            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs queued runnables until `run_internal` finds every queue empty.
    ///
    /// A future that never completes stands in for the main future, so the
    /// call can only end through the "no work left" path.
    fn run_until_parked(&self) {
        let woken = Arc::new(AtomicBool::new(false));
        let mut future = any_local_future(std::future::pending::<()>());
        self.run_internal(woken, &mut future);
    }

    /// Interleaves queued runnables with polls of `future` in random order.
    ///
    /// Returns `Some(output)` once `future` completes. Returns `None` when a
    /// poll of `future` is pending and all three queues are empty afterwards.
    /// `woken` is set to true whenever the waker handed to `future` is
    /// invoked.
    fn run_internal(
        &self,
        woken: Arc<AtomicBool>,
        future: &mut AnyLocalFuture,
    ) -> Option<Box<dyn Any>> {
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        let mut trace = Trace::default();
        loop {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_foreground.len()
                + state.scheduled_from_background.len()
                + state.spawned_from_foreground.len();

            // The range is inclusive: drawing `runnable_count` itself selects
            // the main future instead of a queued runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_foreground.len() {
                let (_, backtrace) = &state.scheduled_from_foreground[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_foreground.remove(ix).0;
                // Release the lock before running: the runnable's schedule
                // callback locks the state again.
                drop(state);
                runnable.run();
            } else if ix - state.scheduled_from_foreground.len()
                < state.scheduled_from_background.len()
            {
                let ix = ix - state.scheduled_from_foreground.len();
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                drop(state);
                runnable.run();
            } else if ix < runnable_count {
                // Newly spawned foreground tasks are always taken from the
                // front, regardless of the exact index drawn.
                let (_, backtrace) = &state.spawned_from_foreground[0];
                trace.record(&state, backtrace.clone());
                let runnable = state.spawned_from_foreground.remove(0).0;
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }

                let state = self.state.lock();

                // The future is pending and nothing is queued: report that
                // there is no work left.
                if state.scheduled_from_foreground.is_empty()
                    && state.scheduled_from_background.is_empty()
                    && state.spawned_from_foreground.is_empty()
                {
                    return None;
                }
            }
        }
    }

    /// Polls `future` for a randomly chosen number of iterations, running
    /// only background runnables in between.
    ///
    /// The iteration limit is drawn from `block_on_ticks`. Returns
    /// `Some(output)` if the future completes within the limit and `None`
    /// otherwise. When the future is pending and no background runnable is
    /// queued, `will_park` is called and the thread parks.
    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
        let unparker = self.parker.lock().unparker();
        let waker = waker_fn(move || {
            unparker.unpark();
        });
        let max_ticks = {
            let mut state = self.state.lock();
            let range = state.block_on_ticks.clone();
            state.rng.gen_range(range)
        };

        let mut cx = Context::from_waker(&waker);
        let mut trace = Trace::default();
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // Inclusive range: drawing `runnable_count` selects the future.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let (_, backtrace) = &state.scheduled_from_background[ix];
                trace.record(&state, backtrace.clone());
                let runnable = state.scheduled_from_background.remove(ix).0;
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    // Unlock before parking so schedule callbacks can push
                    // new runnables while this thread is parked.
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
}
264
265impl DeterministicState {
266    fn will_park(&mut self) {
267        if self.forbid_parking {
268            let mut backtrace_message = String::new();
269            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
270                backtrace.resolve();
271                backtrace_message = format!(
272                    "\nbacktrace of waiting future:\n{:?}",
273                    CwdBacktrace::new(backtrace)
274                );
275            }
276
277            panic!(
278                "deterministic executor parked after a call to forbid_parking{}",
279                backtrace_message
280            );
281        }
282    }
283}
284
/// Log of the deterministic executor's scheduling decisions.
///
/// The three vectors grow in lockstep: entry `i` of each describes the `i`-th
/// recorded step.
#[derive(Default)]
struct Trace {
    // Backtrace of the runnable chosen at each step.
    executed: Vec<Backtrace>,
    // Snapshot of the `scheduled_from_foreground` queue at each step.
    scheduled: Vec<Vec<Backtrace>>,
    // Snapshot of the `spawned_from_foreground` queue at each step.
    spawned_from_foreground: Vec<Vec<Backtrace>>,
}
291
292impl Trace {
293    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
294        self.scheduled.push(
295            state
296                .scheduled_from_foreground
297                .iter()
298                .map(|(_, backtrace)| backtrace.clone())
299                .collect(),
300        );
301        self.spawned_from_foreground.push(
302            state
303                .spawned_from_foreground
304                .iter()
305                .map(|(_, backtrace)| backtrace.clone())
306                .collect(),
307        );
308        self.executed.push(executed);
309    }
310
311    fn resolve(&mut self) {
312        for backtrace in &mut self.executed {
313            backtrace.resolve();
314        }
315
316        for backtraces in &mut self.scheduled {
317            for backtrace in backtraces {
318                backtrace.resolve();
319            }
320        }
321
322        for backtraces in &mut self.spawned_from_foreground {
323            for backtrace in backtraces {
324                backtrace.resolve();
325            }
326        }
327    }
328}
329
/// `Debug` adapter that prints only the frames of a backtrace whose source
/// file lies under the current working directory.
struct CwdBacktrace<'a> {
    backtrace: &'a Backtrace,
    // When true, printing stops after the first matching frame.
    first_frame_only: bool,
}
334
335impl<'a> CwdBacktrace<'a> {
336    fn new(backtrace: &'a Backtrace) -> Self {
337        Self {
338            backtrace,
339            first_frame_only: false,
340        }
341    }
342
343    fn first_frame(backtrace: &'a Backtrace) -> Self {
344        Self {
345            backtrace,
346            first_frame_only: true,
347        }
348    }
349}
350
351impl<'a> Debug for CwdBacktrace<'a> {
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
353        let cwd = std::env::current_dir().unwrap();
354        let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
355            fmt::Display::fmt(&path, fmt)
356        };
357        let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
358        for frame in self.backtrace.frames() {
359            let mut formatted_frame = fmt.frame();
360            if frame
361                .symbols()
362                .iter()
363                .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
364            {
365                formatted_frame.backtrace_frame(frame)?;
366                if self.first_frame_only {
367                    break;
368                }
369            }
370        }
371        fmt.finish()
372    }
373}
374
375impl Debug for Trace {
376    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377        for ((backtrace, scheduled), spawned_from_foreground) in self
378            .executed
379            .iter()
380            .zip(&self.scheduled)
381            .zip(&self.spawned_from_foreground)
382        {
383            writeln!(f, "Scheduled")?;
384            for backtrace in scheduled {
385                writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
386            }
387            if scheduled.is_empty() {
388                writeln!(f, "None")?;
389            }
390            writeln!(f, "==========")?;
391
392            writeln!(f, "Spawned from foreground")?;
393            for backtrace in spawned_from_foreground {
394                writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
395            }
396            if spawned_from_foreground.is_empty() {
397                writeln!(f, "None")?;
398            }
399            writeln!(f, "==========")?;
400
401            writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?;
402            writeln!(f, "+++++++++++++++++++")?;
403        }
404
405        Ok(())
406    }
407}
408
409impl Drop for Trace {
410    fn drop(&mut self) {
411        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
412            trace_on_panic == "1" || trace_on_panic == "true"
413        } else {
414            false
415        };
416        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
417            trace_always == "1" || trace_always == "true"
418        } else {
419            false
420        };
421
422        if trace_always || (trace_on_panic && thread::panicking()) {
423            self.resolve();
424            dbg!(self);
425        }
426    }
427}
428
429impl Foreground {
430    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
431        if dispatcher.is_main_thread() {
432            Ok(Self::Platform {
433                dispatcher,
434                _not_send_or_sync: PhantomData,
435            })
436        } else {
437            Err(anyhow!("must be constructed on main thread"))
438        }
439    }
440
441    pub fn test() -> Self {
442        Self::Test(smol::LocalExecutor::new())
443    }
444
445    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
446        let future = any_local_future(future);
447        let any_task = match self {
448            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
449            Self::Platform { dispatcher, .. } => {
450                fn spawn_inner(
451                    future: AnyLocalFuture,
452                    dispatcher: &Arc<dyn Dispatcher>,
453                ) -> AnyLocalTask {
454                    let dispatcher = dispatcher.clone();
455                    let schedule =
456                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
457                    let (runnable, task) = async_task::spawn_local(future, schedule);
458                    runnable.schedule();
459                    task
460                }
461                spawn_inner(future, dispatcher)
462            }
463            Self::Test(executor) => executor.spawn(future),
464        };
465        Task::local(any_task)
466    }
467
468    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
469        let future = any_local_future(future);
470        let any_value = match self {
471            Self::Deterministic(executor) => executor.run(future),
472            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
473            Self::Test(executor) => smol::block_on(executor.run(future)),
474        };
475        *any_value.downcast().unwrap()
476    }
477
478    pub fn parking_forbidden(&self) -> bool {
479        match self {
480            Self::Deterministic(executor) => executor.state.lock().forbid_parking,
481            _ => panic!("this method can only be called on a deterministic executor"),
482        }
483    }
484
485    pub fn start_waiting(&self) {
486        match self {
487            Self::Deterministic(executor) => {
488                executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
489            }
490            _ => panic!("this method can only be called on a deterministic executor"),
491        }
492    }
493
494    pub fn finish_waiting(&self) {
495        match self {
496            Self::Deterministic(executor) => {
497                executor.state.lock().waiting_backtrace.take();
498            }
499            _ => panic!("this method can only be called on a deterministic executor"),
500        }
501    }
502
503    pub fn forbid_parking(&self) {
504        match self {
505            Self::Deterministic(executor) => {
506                let mut state = executor.state.lock();
507                state.forbid_parking = true;
508                state.rng = StdRng::seed_from_u64(state.seed);
509            }
510            _ => panic!("this method can only be called on a deterministic executor"),
511        }
512    }
513
514    pub async fn timer(&self, duration: Duration) {
515        match self {
516            Self::Deterministic(executor) => {
517                let (tx, mut rx) = barrier::channel();
518                {
519                    let mut state = executor.state.lock();
520                    let wakeup_at = state.now + duration;
521                    state.pending_timers.push((wakeup_at, tx));
522                }
523                rx.recv().await;
524            }
525            _ => {
526                Timer::after(duration).await;
527            }
528        }
529    }
530
531    pub fn advance_clock(&self, duration: Duration) {
532        match self {
533            Self::Deterministic(executor) => {
534                executor.run_until_parked();
535
536                let mut state = executor.state.lock();
537                state.now += duration;
538                let now = state.now;
539                let mut pending_timers = mem::take(&mut state.pending_timers);
540                drop(state);
541
542                pending_timers.retain(|(wakeup, _)| *wakeup > now);
543                executor.state.lock().pending_timers.extend(pending_timers);
544            }
545            _ => panic!("this method can only be called on a deterministic executor"),
546        }
547    }
548
549    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
550        match self {
551            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
552            _ => panic!("this method can only be called on a deterministic executor"),
553        }
554    }
555}
556
557impl Background {
558    pub fn new() -> Self {
559        let executor = Arc::new(Executor::new());
560        let stop = channel::unbounded::<()>();
561
562        for i in 0..2 * num_cpus::get() {
563            let executor = executor.clone();
564            let stop = stop.1.clone();
565            thread::Builder::new()
566                .name(format!("background-executor-{}", i))
567                .spawn(move || smol::block_on(executor.run(stop.recv())))
568                .unwrap();
569        }
570
571        Self::Production {
572            executor,
573            _stop: stop.0,
574        }
575    }
576
577    pub fn num_cpus(&self) -> usize {
578        num_cpus::get()
579    }
580
581    pub fn spawn<T, F>(&self, future: F) -> Task<T>
582    where
583        T: 'static + Send,
584        F: Send + Future<Output = T> + 'static,
585    {
586        let future = any_future(future);
587        let any_task = match self {
588            Self::Production { executor, .. } => executor.spawn(future),
589            Self::Deterministic { executor, .. } => executor.spawn(future),
590        };
591        Task::send(any_task)
592    }
593
594    pub fn block_with_timeout<F, T>(
595        &self,
596        timeout: Duration,
597        future: F,
598    ) -> Result<T, impl Future<Output = T>>
599    where
600        T: 'static,
601        F: 'static + Unpin + Future<Output = T>,
602    {
603        let mut future = any_local_future(future);
604        if !timeout.is_zero() {
605            let output = match self {
606                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
607                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
608            };
609            if let Some(output) = output {
610                return Ok(*output.downcast().unwrap());
611            }
612        }
613        Err(async { *future.await.downcast().unwrap() })
614    }
615
616    pub async fn scoped<'scope, F>(&self, scheduler: F)
617    where
618        F: FnOnce(&mut Scope<'scope>),
619    {
620        let mut scope = Scope {
621            futures: Default::default(),
622            _phantom: PhantomData,
623        };
624        (scheduler)(&mut scope);
625        let spawned = scope
626            .futures
627            .into_iter()
628            .map(|f| self.spawn(f))
629            .collect::<Vec<_>>();
630        for task in spawned {
631            task.await;
632        }
633    }
634}
635
/// Collects futures that borrow data for `'a` so that `Background::scoped`
/// can spawn them.
pub struct Scope<'a> {
    // Stored with a `'static` bound; `Scope::spawn` erases the real lifetime.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to `'a` without storing a reference.
    _phantom: PhantomData<&'a ()>,
}
640
641impl<'a> Scope<'a> {
642    pub fn spawn<F>(&mut self, f: F)
643    where
644        F: Future<Output = ()> + Send + 'a,
645    {
646        let f = unsafe {
647            mem::transmute::<
648                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
649                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
650            >(Box::pin(f))
651        };
652        self.futures.push(f);
653    }
654}
655
656pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
657    let executor = Arc::new(Deterministic::new(seed));
658    (
659        Rc::new(Foreground::Deterministic(executor.clone())),
660        Arc::new(Background::Deterministic { executor }),
661    )
662}
663
664impl<T> Task<T> {
665    pub fn ready(value: T) -> Self {
666        Self::Ready(Some(value))
667    }
668
669    fn local(any_task: AnyLocalTask) -> Self {
670        Self::Local {
671            any_task,
672            result_type: PhantomData,
673        }
674    }
675
676    pub fn detach(self) {
677        match self {
678            Task::Ready(_) => {}
679            Task::Local { any_task, .. } => any_task.detach(),
680            Task::Send { any_task, .. } => any_task.detach(),
681        }
682    }
683}
684
685impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
686    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
687        cx.spawn(|_| async move {
688            if let Err(err) = self.await {
689                log::error!("{}", err);
690            }
691        })
692        .detach();
693    }
694}
695
696impl<T: Send> Task<T> {
697    fn send(any_task: AnyTask) -> Self {
698        Self::Send {
699            any_task,
700            result_type: PhantomData,
701        }
702    }
703}
704
705impl<T: fmt::Debug> fmt::Debug for Task<T> {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        match self {
708            Task::Ready(value) => value.fmt(f),
709            Task::Local { any_task, .. } => any_task.fmt(f),
710            Task::Send { any_task, .. } => any_task.fmt(f),
711        }
712    }
713}
714
715impl<T: 'static> Future for Task<T> {
716    type Output = T;
717
718    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
719        match unsafe { self.get_unchecked_mut() } {
720            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
721            Task::Local { any_task, .. } => {
722                any_task.poll(cx).map(|value| *value.downcast().unwrap())
723            }
724            Task::Send { any_task, .. } => {
725                any_task.poll(cx).map(|value| *value.downcast().unwrap())
726            }
727        }
728    }
729}
730
731fn any_future<T, F>(future: F) -> AnyFuture
732where
733    T: 'static + Send,
734    F: Future<Output = T> + Send + 'static,
735{
736    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
737}
738
739fn any_local_future<T, F>(future: F) -> AnyLocalFuture
740where
741    T: 'static,
742    F: Future<Output = T> + 'static,
743{
744    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
745}