executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
  4use collections::HashMap;
  5use parking_lot::Mutex;
  6use postage::{barrier, prelude::Stream as _};
  7use rand::prelude::*;
  8use smol::{channel, prelude::*, Executor, Timer};
  9use std::{
 10    any::Any,
 11    fmt::{self, Debug, Display},
 12    marker::PhantomData,
 13    mem,
 14    ops::RangeInclusive,
 15    pin::Pin,
 16    rc::Rc,
 17    sync::{
 18        atomic::{AtomicBool, Ordering::SeqCst},
 19        Arc,
 20    },
 21    task::{Context, Poll},
 22    thread,
 23    time::{Duration, Instant},
 24};
 25use waker_fn::waker_fn;
 26
 27use crate::{
 28    platform::{self, Dispatcher},
 29    util, MutableAppContext,
 30};
 31
 32pub enum Foreground {
 33    Platform {
 34        dispatcher: Arc<dyn platform::Dispatcher>,
 35        _not_send_or_sync: PhantomData<Rc<()>>,
 36    },
 37    Deterministic {
 38        cx_id: usize,
 39        executor: Arc<Deterministic>,
 40    },
 41}
 42
 43pub enum Background {
 44    Deterministic {
 45        executor: Arc<Deterministic>,
 46    },
 47    Production {
 48        executor: Arc<smol::Executor<'static>>,
 49        _stop: channel::Sender<()>,
 50    },
 51}
 52
 53type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 54type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 55type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 56type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 57
 58#[must_use]
 59pub enum Task<T> {
 60    Ready(Option<T>),
 61    Local {
 62        any_task: AnyLocalTask,
 63        result_type: PhantomData<T>,
 64    },
 65    Send {
 66        any_task: AnyTask,
 67        result_type: PhantomData<T>,
 68    },
 69}
 70
 71unsafe impl<T: Send> Send for Task<T> {}
 72
 73struct DeterministicState {
 74    rng: StdRng,
 75    seed: u64,
 76    scheduled_from_foreground: HashMap<usize, Vec<ScheduledForeground>>,
 77    scheduled_from_background: Vec<Runnable>,
 78    forbid_parking: bool,
 79    block_on_ticks: RangeInclusive<usize>,
 80    now: Instant,
 81    pending_timers: Vec<(Instant, barrier::Sender)>,
 82    waiting_backtrace: Option<Backtrace>,
 83}
 84
 85enum ScheduledForeground {
 86    MainFuture,
 87    Runnable(Runnable),
 88}
 89
 90pub struct Deterministic {
 91    state: Arc<Mutex<DeterministicState>>,
 92    parker: Mutex<parking::Parker>,
 93}
 94
 95impl Deterministic {
 96    pub fn new(seed: u64) -> Arc<Self> {
 97        Arc::new(Self {
 98            state: Arc::new(Mutex::new(DeterministicState {
 99                rng: StdRng::seed_from_u64(seed),
100                seed,
101                scheduled_from_foreground: Default::default(),
102                scheduled_from_background: Default::default(),
103                forbid_parking: false,
104                block_on_ticks: 0..=1000,
105                now: Instant::now(),
106                pending_timers: Default::default(),
107                waiting_backtrace: None,
108            })),
109            parker: Default::default(),
110        })
111    }
112
113    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
114        Arc::new(Background::Deterministic {
115            executor: self.clone(),
116        })
117    }
118
119    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
120        Rc::new(Foreground::Deterministic {
121            cx_id: id,
122            executor: self.clone(),
123        })
124    }
125
126    fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask {
127        let state = self.state.clone();
128        let unparker = self.parker.lock().unparker();
129        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
130            let mut state = state.lock();
131            state
132                .scheduled_from_foreground
133                .entry(cx_id)
134                .or_default()
135                .push(ScheduledForeground::Runnable(runnable));
136            unparker.unpark();
137        });
138        runnable.schedule();
139        task
140    }
141
142    fn spawn(&self, future: AnyFuture) -> AnyTask {
143        let state = self.state.clone();
144        let unparker = self.parker.lock().unparker();
145        let (runnable, task) = async_task::spawn(future, move |runnable| {
146            let mut state = state.lock();
147            state.scheduled_from_background.push(runnable);
148            unparker.unpark();
149        });
150        runnable.schedule();
151        task
152    }
153
154    fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box<dyn Any> {
155        let woken = Arc::new(AtomicBool::new(false));
156        loop {
157            if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) {
158                return result;
159            }
160
161            if !woken.load(SeqCst) {
162                self.state.lock().will_park();
163            }
164
165            woken.store(false, SeqCst);
166            self.parker.lock().park();
167        }
168    }
169
170    fn run_until_parked(&self, cx_id: usize) {
171        let woken = Arc::new(AtomicBool::new(false));
172        let mut future = any_local_future(std::future::pending::<()>());
173        self.run_internal(cx_id, woken, &mut future);
174    }
175
176    fn run_internal(
177        &self,
178        cx_id: usize,
179        woken: Arc<AtomicBool>,
180        future: &mut AnyLocalFuture,
181    ) -> Option<Box<dyn Any>> {
182        let unparker = self.parker.lock().unparker();
183        let scheduled_main_future = Arc::new(AtomicBool::new(true));
184        self.state
185            .lock()
186            .scheduled_from_foreground
187            .entry(cx_id)
188            .or_default()
189            .insert(0, ScheduledForeground::MainFuture);
190
191        let waker = waker_fn({
192            let state = self.state.clone();
193            let scheduled_main_future = scheduled_main_future.clone();
194            move || {
195                woken.store(true, SeqCst);
196                if !scheduled_main_future.load(SeqCst) {
197                    scheduled_main_future.store(true, SeqCst);
198                    state
199                        .lock()
200                        .scheduled_from_foreground
201                        .entry(cx_id)
202                        .or_default()
203                        .push(ScheduledForeground::MainFuture);
204                }
205
206                unparker.unpark();
207            }
208        });
209
210        let mut cx = Context::from_waker(&waker);
211        loop {
212            let mut state = self.state.lock();
213
214            if state.scheduled_from_foreground.is_empty()
215                && state.scheduled_from_background.is_empty()
216            {
217                return None;
218            }
219
220            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
221                let background_len = state.scheduled_from_background.len();
222                let ix = state.rng.gen_range(0..background_len);
223                let runnable = state.scheduled_from_background.remove(ix);
224                drop(state);
225                runnable.run();
226            } else if !state.scheduled_from_foreground.is_empty() {
227                let available_cx_ids = state
228                    .scheduled_from_foreground
229                    .keys()
230                    .copied()
231                    .collect::<Vec<_>>();
232                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
233                let scheduled_from_cx = state
234                    .scheduled_from_foreground
235                    .get_mut(&cx_id_to_run)
236                    .unwrap();
237                let runnable = scheduled_from_cx.remove(0);
238                if scheduled_from_cx.is_empty() {
239                    state.scheduled_from_foreground.remove(&cx_id_to_run);
240                }
241
242                drop(state);
243                match runnable {
244                    ScheduledForeground::MainFuture => {
245                        scheduled_main_future.store(false, SeqCst);
246                        if let Poll::Ready(result) = future.poll(&mut cx) {
247                            return Some(result);
248                        }
249                    }
250                    ScheduledForeground::Runnable(runnable) => {
251                        runnable.run();
252                    }
253                }
254            } else {
255                return None;
256            }
257        }
258    }
259
260    fn block_on(&self, future: &mut AnyLocalFuture) -> Option<Box<dyn Any>> {
261        let unparker = self.parker.lock().unparker();
262        let waker = waker_fn(move || {
263            unparker.unpark();
264        });
265        let max_ticks = {
266            let mut state = self.state.lock();
267            let range = state.block_on_ticks.clone();
268            state.rng.gen_range(range)
269        };
270
271        let mut cx = Context::from_waker(&waker);
272        for _ in 0..max_ticks {
273            let mut state = self.state.lock();
274            let runnable_count = state.scheduled_from_background.len();
275            let ix = state.rng.gen_range(0..=runnable_count);
276            if ix < state.scheduled_from_background.len() {
277                let runnable = state.scheduled_from_background.remove(ix);
278                drop(state);
279                runnable.run();
280            } else {
281                drop(state);
282                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
283                    return Some(result);
284                }
285                let mut state = self.state.lock();
286                if state.scheduled_from_background.is_empty() {
287                    state.will_park();
288                    drop(state);
289                    self.parker.lock().park();
290                }
291
292                continue;
293            }
294        }
295
296        None
297    }
298}
299
300impl DeterministicState {
301    fn will_park(&mut self) {
302        if self.forbid_parking {
303            let mut backtrace_message = String::new();
304            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
305                backtrace.resolve();
306                backtrace_message = format!(
307                    "\nbacktrace of waiting future:\n{:?}",
308                    CwdBacktrace::new(backtrace)
309                );
310            }
311
312            panic!(
313                "deterministic executor parked after a call to forbid_parking{}",
314                backtrace_message
315            );
316        }
317    }
318}
319
320struct CwdBacktrace<'a> {
321    backtrace: &'a Backtrace,
322}
323
324impl<'a> CwdBacktrace<'a> {
325    fn new(backtrace: &'a Backtrace) -> Self {
326        Self { backtrace }
327    }
328}
329
330impl<'a> Debug for CwdBacktrace<'a> {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
332        let cwd = std::env::current_dir().unwrap();
333        let mut print_path = |fmt: &mut fmt::Formatter<'_>, path: BytesOrWideString<'_>| {
334            fmt::Display::fmt(&path, fmt)
335        };
336        let mut fmt = BacktraceFmt::new(f, backtrace::PrintFmt::Full, &mut print_path);
337        for frame in self.backtrace.frames() {
338            let mut formatted_frame = fmt.frame();
339            if frame
340                .symbols()
341                .iter()
342                .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
343            {
344                formatted_frame.backtrace_frame(frame)?;
345            }
346        }
347        fmt.finish()
348    }
349}
350
351impl Foreground {
352    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
353        if dispatcher.is_main_thread() {
354            Ok(Self::Platform {
355                dispatcher,
356                _not_send_or_sync: PhantomData,
357            })
358        } else {
359            Err(anyhow!("must be constructed on main thread"))
360        }
361    }
362
363    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
364        let future = any_local_future(future);
365        let any_task = match self {
366            Self::Deterministic { cx_id, executor } => {
367                executor.spawn_from_foreground(*cx_id, future)
368            }
369            Self::Platform { dispatcher, .. } => {
370                fn spawn_inner(
371                    future: AnyLocalFuture,
372                    dispatcher: &Arc<dyn Dispatcher>,
373                ) -> AnyLocalTask {
374                    let dispatcher = dispatcher.clone();
375                    let schedule =
376                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
377                    let (runnable, task) = async_task::spawn_local(future, schedule);
378                    runnable.schedule();
379                    task
380                }
381                spawn_inner(future, dispatcher)
382            }
383        };
384        Task::local(any_task)
385    }
386
387    pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
388        let future = any_local_future(future);
389        let any_value = match self {
390            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
391            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
392        };
393        *any_value.downcast().unwrap()
394    }
395
396    pub fn parking_forbidden(&self) -> bool {
397        match self {
398            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
399            _ => panic!("this method can only be called on a deterministic executor"),
400        }
401    }
402
403    pub fn start_waiting(&self) {
404        match self {
405            Self::Deterministic { executor, .. } => {
406                executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
407            }
408            _ => panic!("this method can only be called on a deterministic executor"),
409        }
410    }
411
412    pub fn finish_waiting(&self) {
413        match self {
414            Self::Deterministic { executor, .. } => {
415                executor.state.lock().waiting_backtrace.take();
416            }
417            _ => panic!("this method can only be called on a deterministic executor"),
418        }
419    }
420
421    pub fn forbid_parking(&self) {
422        match self {
423            Self::Deterministic { executor, .. } => {
424                let mut state = executor.state.lock();
425                state.forbid_parking = true;
426                state.rng = StdRng::seed_from_u64(state.seed);
427            }
428            _ => panic!("this method can only be called on a deterministic executor"),
429        }
430    }
431
432    pub async fn timer(&self, duration: Duration) {
433        match self {
434            Self::Deterministic { executor, .. } => {
435                let (tx, mut rx) = barrier::channel();
436                {
437                    let mut state = executor.state.lock();
438                    let wakeup_at = state.now + duration;
439                    state.pending_timers.push((wakeup_at, tx));
440                }
441                rx.recv().await;
442            }
443            _ => {
444                Timer::after(duration).await;
445            }
446        }
447    }
448
449    pub fn advance_clock(&self, duration: Duration) {
450        match self {
451            Self::Deterministic { cx_id, executor } => {
452                executor.run_until_parked(*cx_id);
453
454                let mut state = executor.state.lock();
455                state.now += duration;
456                let now = state.now;
457                let mut pending_timers = mem::take(&mut state.pending_timers);
458                drop(state);
459
460                pending_timers.retain(|(wakeup, _)| *wakeup > now);
461                executor.state.lock().pending_timers.extend(pending_timers);
462            }
463            _ => panic!("this method can only be called on a deterministic executor"),
464        }
465    }
466
467    pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
468        match self {
469            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
470            _ => panic!("this method can only be called on a deterministic executor"),
471        }
472    }
473}
474
475impl Background {
476    pub fn new() -> Self {
477        let executor = Arc::new(Executor::new());
478        let stop = channel::unbounded::<()>();
479
480        for i in 0..2 * num_cpus::get() {
481            let executor = executor.clone();
482            let stop = stop.1.clone();
483            thread::Builder::new()
484                .name(format!("background-executor-{}", i))
485                .spawn(move || smol::block_on(executor.run(stop.recv())))
486                .unwrap();
487        }
488
489        Self::Production {
490            executor,
491            _stop: stop.0,
492        }
493    }
494
495    pub fn num_cpus(&self) -> usize {
496        num_cpus::get()
497    }
498
499    pub fn spawn<T, F>(&self, future: F) -> Task<T>
500    where
501        T: 'static + Send,
502        F: Send + Future<Output = T> + 'static,
503    {
504        let future = any_future(future);
505        let any_task = match self {
506            Self::Production { executor, .. } => executor.spawn(future),
507            Self::Deterministic { executor } => executor.spawn(future),
508        };
509        Task::send(any_task)
510    }
511
512    pub fn block_with_timeout<F, T>(
513        &self,
514        timeout: Duration,
515        future: F,
516    ) -> Result<T, impl Future<Output = T>>
517    where
518        T: 'static,
519        F: 'static + Unpin + Future<Output = T>,
520    {
521        let mut future = any_local_future(future);
522        if !timeout.is_zero() {
523            let output = match self {
524                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
525                Self::Deterministic { executor, .. } => executor.block_on(&mut future),
526            };
527            if let Some(output) = output {
528                return Ok(*output.downcast().unwrap());
529            }
530        }
531        Err(async { *future.await.downcast().unwrap() })
532    }
533
534    pub async fn scoped<'scope, F>(&self, scheduler: F)
535    where
536        F: FnOnce(&mut Scope<'scope>),
537    {
538        let mut scope = Scope {
539            futures: Default::default(),
540            _phantom: PhantomData,
541        };
542        (scheduler)(&mut scope);
543        let spawned = scope
544            .futures
545            .into_iter()
546            .map(|f| self.spawn(f))
547            .collect::<Vec<_>>();
548        for task in spawned {
549            task.await;
550        }
551    }
552}
553
/// Collects the futures registered inside a `Background::scoped` call.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Ties the scope to the lifetime the registered futures may borrow from.
    _phantom: PhantomData<&'a ()>,
}
558
559impl<'a> Scope<'a> {
560    pub fn spawn<F>(&mut self, f: F)
561    where
562        F: Future<Output = ()> + Send + 'a,
563    {
564        let f = unsafe {
565            mem::transmute::<
566                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
567                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
568            >(Box::pin(f))
569        };
570        self.futures.push(f);
571    }
572}
573
574impl<T> Task<T> {
575    pub fn ready(value: T) -> Self {
576        Self::Ready(Some(value))
577    }
578
579    fn local(any_task: AnyLocalTask) -> Self {
580        Self::Local {
581            any_task,
582            result_type: PhantomData,
583        }
584    }
585
586    pub fn detach(self) {
587        match self {
588            Task::Ready(_) => {}
589            Task::Local { any_task, .. } => any_task.detach(),
590            Task::Send { any_task, .. } => any_task.detach(),
591        }
592    }
593}
594
595impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
596    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
597        cx.spawn(|_| async move {
598            if let Err(err) = self.await {
599                log::error!("{}", err);
600            }
601        })
602        .detach();
603    }
604}
605
606impl<T: Send> Task<T> {
607    fn send(any_task: AnyTask) -> Self {
608        Self::Send {
609            any_task,
610            result_type: PhantomData,
611        }
612    }
613}
614
615impl<T: fmt::Debug> fmt::Debug for Task<T> {
616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617        match self {
618            Task::Ready(value) => value.fmt(f),
619            Task::Local { any_task, .. } => any_task.fmt(f),
620            Task::Send { any_task, .. } => any_task.fmt(f),
621        }
622    }
623}
624
625impl<T: 'static> Future for Task<T> {
626    type Output = T;
627
628    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
629        match unsafe { self.get_unchecked_mut() } {
630            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
631            Task::Local { any_task, .. } => {
632                any_task.poll(cx).map(|value| *value.downcast().unwrap())
633            }
634            Task::Send { any_task, .. } => {
635                any_task.poll(cx).map(|value| *value.downcast().unwrap())
636            }
637        }
638    }
639}
640
641fn any_future<T, F>(future: F) -> AnyFuture
642where
643    T: 'static + Send,
644    F: Future<Output = T> + Send + 'static,
645{
646    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
647}
648
649fn any_local_future<T, F>(future: F) -> AnyLocalFuture
650where
651    T: 'static,
652    F: Future<Output = T> + 'static,
653{
654    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
655}