executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use smol::{channel, prelude::*, Executor, Timer};
  4use std::{
  5    any::Any,
  6    fmt::{self, Display},
  7    marker::PhantomData,
  8    mem,
  9    pin::Pin,
 10    rc::Rc,
 11    sync::Arc,
 12    task::{Context, Poll},
 13    thread,
 14    time::Duration,
 15};
 16
 17use crate::{
 18    platform::{self, Dispatcher},
 19    util::{self, post_inc},
 20    MutableAppContext,
 21};
 22
 23pub enum Foreground {
 24    Platform {
 25        dispatcher: Arc<dyn platform::Dispatcher>,
 26        _not_send_or_sync: PhantomData<Rc<()>>,
 27    },
 28    #[cfg(any(test, feature = "test-support"))]
 29    Deterministic {
 30        cx_id: usize,
 31        executor: Arc<Deterministic>,
 32    },
 33}
 34
 35pub enum Background {
 36    #[cfg(any(test, feature = "test-support"))]
 37    Deterministic { executor: Arc<Deterministic> },
 38    Production {
 39        executor: Arc<smol::Executor<'static>>,
 40        _stop: channel::Sender<()>,
 41    },
 42}
 43
 44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 48
 49#[must_use]
 50pub enum Task<T> {
 51    Ready(Option<T>),
 52    Local {
 53        any_task: AnyLocalTask,
 54        result_type: PhantomData<T>,
 55    },
 56    Send {
 57        any_task: AnyTask,
 58        result_type: PhantomData<T>,
 59    },
 60}
 61
 62unsafe impl<T: Send> Send for Task<T> {}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65struct DeterministicState {
 66    rng: rand::prelude::StdRng,
 67    seed: u64,
 68    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 69    scheduled_from_background: Vec<Runnable>,
 70    forbid_parking: bool,
 71    block_on_ticks: std::ops::RangeInclusive<usize>,
 72    now: std::time::Instant,
 73    next_timer_id: usize,
 74    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 75    waiting_backtrace: Option<backtrace::Backtrace>,
 76}
 77
 78#[cfg(any(test, feature = "test-support"))]
 79struct ForegroundRunnable {
 80    runnable: Runnable,
 81    main: bool,
 82}
 83
 84#[cfg(any(test, feature = "test-support"))]
 85pub struct Deterministic {
 86    state: Arc<parking_lot::Mutex<DeterministicState>>,
 87    parker: parking_lot::Mutex<parking::Parker>,
 88}
 89
 90#[cfg(any(test, feature = "test-support"))]
 91impl Deterministic {
 92    pub fn new(seed: u64) -> Arc<Self> {
 93        use rand::prelude::*;
 94
 95        Arc::new(Self {
 96            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 97                rng: StdRng::seed_from_u64(seed),
 98                seed,
 99                scheduled_from_foreground: Default::default(),
100                scheduled_from_background: Default::default(),
101                forbid_parking: false,
102                block_on_ticks: 0..=1000,
103                now: std::time::Instant::now(),
104                next_timer_id: Default::default(),
105                pending_timers: Default::default(),
106                waiting_backtrace: None,
107            })),
108            parker: Default::default(),
109        })
110    }
111
112    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
113        Arc::new(Background::Deterministic {
114            executor: self.clone(),
115        })
116    }
117
118    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
119        Rc::new(Foreground::Deterministic {
120            cx_id: id,
121            executor: self.clone(),
122        })
123    }
124
125    fn spawn_from_foreground(
126        &self,
127        cx_id: usize,
128        future: AnyLocalFuture,
129        main: bool,
130    ) -> AnyLocalTask {
131        let state = self.state.clone();
132        let unparker = self.parker.lock().unparker();
133        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
134            let mut state = state.lock();
135            state
136                .scheduled_from_foreground
137                .entry(cx_id)
138                .or_default()
139                .push(ForegroundRunnable { runnable, main });
140            unparker.unpark();
141        });
142        runnable.schedule();
143        task
144    }
145
146    fn spawn(&self, future: AnyFuture) -> AnyTask {
147        let state = self.state.clone();
148        let unparker = self.parker.lock().unparker();
149        let (runnable, task) = async_task::spawn(future, move |runnable| {
150            let mut state = state.lock();
151            state.scheduled_from_background.push(runnable);
152            unparker.unpark();
153        });
154        runnable.schedule();
155        task
156    }
157
158    fn run<'a>(
159        &self,
160        cx_id: usize,
161        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
162    ) -> Box<dyn Any> {
163        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
164
165        let woken = Arc::new(AtomicBool::new(false));
166
167        let state = self.state.clone();
168        let unparker = self.parker.lock().unparker();
169        let (runnable, mut main_task) = unsafe {
170            async_task::spawn_unchecked(main_future, move |runnable| {
171                let mut state = state.lock();
172                state
173                    .scheduled_from_foreground
174                    .entry(cx_id)
175                    .or_default()
176                    .push(ForegroundRunnable {
177                        runnable,
178                        main: true,
179                    });
180                unparker.unpark();
181            })
182        };
183        runnable.schedule();
184
185        loop {
186            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
187                return result;
188            }
189
190            if !woken.load(SeqCst) {
191                self.state.lock().will_park();
192            }
193
194            woken.store(false, SeqCst);
195            self.parker.lock().park();
196        }
197    }
198
199    pub fn run_until_parked(&self) {
200        use std::sync::atomic::AtomicBool;
201        let woken = Arc::new(AtomicBool::new(false));
202        self.run_internal(woken, None);
203    }
204
205    fn run_internal(
206        &self,
207        woken: Arc<std::sync::atomic::AtomicBool>,
208        mut main_task: Option<&mut AnyLocalTask>,
209    ) -> Option<Box<dyn Any>> {
210        use rand::prelude::*;
211        use std::sync::atomic::Ordering::SeqCst;
212
213        let unparker = self.parker.lock().unparker();
214        let waker = waker_fn::waker_fn(move || {
215            woken.store(true, SeqCst);
216            unparker.unpark();
217        });
218
219        let mut cx = Context::from_waker(&waker);
220        loop {
221            let mut state = self.state.lock();
222
223            if state.scheduled_from_foreground.is_empty()
224                && state.scheduled_from_background.is_empty()
225            {
226                return None;
227            }
228
229            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
230                let background_len = state.scheduled_from_background.len();
231                let ix = state.rng.gen_range(0..background_len);
232                let runnable = state.scheduled_from_background.remove(ix);
233                drop(state);
234                runnable.run();
235            } else if !state.scheduled_from_foreground.is_empty() {
236                let available_cx_ids = state
237                    .scheduled_from_foreground
238                    .keys()
239                    .copied()
240                    .collect::<Vec<_>>();
241                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
242                let scheduled_from_cx = state
243                    .scheduled_from_foreground
244                    .get_mut(&cx_id_to_run)
245                    .unwrap();
246                let foreground_runnable = scheduled_from_cx.remove(0);
247                if scheduled_from_cx.is_empty() {
248                    state.scheduled_from_foreground.remove(&cx_id_to_run);
249                }
250
251                drop(state);
252
253                foreground_runnable.runnable.run();
254                if let Some(main_task) = main_task.as_mut() {
255                    if foreground_runnable.main {
256                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
257                            return Some(result);
258                        }
259                    }
260                }
261            }
262        }
263    }
264
265    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
266    where
267        F: Unpin + Future<Output = T>,
268    {
269        use rand::prelude::*;
270
271        let unparker = self.parker.lock().unparker();
272        let waker = waker_fn::waker_fn(move || {
273            unparker.unpark();
274        });
275
276        let mut cx = Context::from_waker(&waker);
277        for _ in 0..max_ticks {
278            let mut state = self.state.lock();
279            let runnable_count = state.scheduled_from_background.len();
280            let ix = state.rng.gen_range(0..=runnable_count);
281            if ix < state.scheduled_from_background.len() {
282                let runnable = state.scheduled_from_background.remove(ix);
283                drop(state);
284                runnable.run();
285            } else {
286                drop(state);
287                if let Poll::Ready(result) = future.poll(&mut cx) {
288                    return Some(result);
289                }
290                let mut state = self.state.lock();
291                if state.scheduled_from_background.is_empty() {
292                    state.will_park();
293                    drop(state);
294                    self.parker.lock().park();
295                }
296
297                continue;
298            }
299        }
300
301        None
302    }
303}
304
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics when `forbid_parking` is set. If a waiting backtrace was recorded, it
    /// is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
326
327impl Foreground {
328    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
329        if dispatcher.is_main_thread() {
330            Ok(Self::Platform {
331                dispatcher,
332                _not_send_or_sync: PhantomData,
333            })
334        } else {
335            Err(anyhow!("must be constructed on main thread"))
336        }
337    }
338
339    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
340        let future = any_local_future(future);
341        let any_task = match self {
342            #[cfg(any(test, feature = "test-support"))]
343            Self::Deterministic { cx_id, executor } => {
344                executor.spawn_from_foreground(*cx_id, future, false)
345            }
346            Self::Platform { dispatcher, .. } => {
347                fn spawn_inner(
348                    future: AnyLocalFuture,
349                    dispatcher: &Arc<dyn Dispatcher>,
350                ) -> AnyLocalTask {
351                    let dispatcher = dispatcher.clone();
352                    let schedule =
353                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
354                    let (runnable, task) = async_task::spawn_local(future, schedule);
355                    runnable.schedule();
356                    task
357                }
358                spawn_inner(future, dispatcher)
359            }
360        };
361        Task::local(any_task)
362    }
363
364    #[cfg(any(test, feature = "test-support"))]
365    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
366        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
367        let result = match self {
368            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
369            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
370        };
371        *result.downcast().unwrap()
372    }
373
374    #[cfg(any(test, feature = "test-support"))]
375    pub fn run_until_parked(&self) {
376        match self {
377            Self::Deterministic { executor, .. } => executor.run_until_parked(),
378            _ => panic!("this method can only be called on a deterministic executor"),
379        }
380    }
381
382    #[cfg(any(test, feature = "test-support"))]
383    pub fn parking_forbidden(&self) -> bool {
384        match self {
385            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
386            _ => panic!("this method can only be called on a deterministic executor"),
387        }
388    }
389
390    #[cfg(any(test, feature = "test-support"))]
391    pub fn start_waiting(&self) {
392        match self {
393            Self::Deterministic { executor, .. } => {
394                executor.state.lock().waiting_backtrace =
395                    Some(backtrace::Backtrace::new_unresolved());
396            }
397            _ => panic!("this method can only be called on a deterministic executor"),
398        }
399    }
400
401    #[cfg(any(test, feature = "test-support"))]
402    pub fn finish_waiting(&self) {
403        match self {
404            Self::Deterministic { executor, .. } => {
405                executor.state.lock().waiting_backtrace.take();
406            }
407            _ => panic!("this method can only be called on a deterministic executor"),
408        }
409    }
410
411    #[cfg(any(test, feature = "test-support"))]
412    pub fn forbid_parking(&self) {
413        use rand::prelude::*;
414
415        match self {
416            Self::Deterministic { executor, .. } => {
417                let mut state = executor.state.lock();
418                state.forbid_parking = true;
419                state.rng = StdRng::seed_from_u64(state.seed);
420            }
421            _ => panic!("this method can only be called on a deterministic executor"),
422        }
423    }
424
425    pub async fn timer(&self, duration: Duration) {
426        match self {
427            #[cfg(any(test, feature = "test-support"))]
428            Self::Deterministic { executor, .. } => {
429                use postage::prelude::Stream as _;
430
431                let (tx, mut rx) = postage::barrier::channel();
432                let timer_id;
433                {
434                    let mut state = executor.state.lock();
435                    let wakeup_at = state.now + duration;
436                    timer_id = post_inc(&mut state.next_timer_id);
437                    state.pending_timers.push((timer_id, wakeup_at, tx));
438                }
439
440                struct DropTimer<'a>(usize, &'a Foreground);
441                impl<'a> Drop for DropTimer<'a> {
442                    fn drop(&mut self) {
443                        match self.1 {
444                            Foreground::Deterministic { executor, .. } => {
445                                executor
446                                    .state
447                                    .lock()
448                                    .pending_timers
449                                    .retain(|(timer_id, _, _)| *timer_id != self.0);
450                            }
451                            _ => unreachable!(),
452                        }
453                    }
454                }
455
456                let _guard = DropTimer(timer_id, self);
457                rx.recv().await;
458            }
459            _ => {
460                Timer::after(duration).await;
461            }
462        }
463    }
464
465    #[cfg(any(test, feature = "test-support"))]
466    pub fn advance_clock(&self, duration: Duration) {
467        match self {
468            Self::Deterministic { executor, .. } => {
469                executor.run_until_parked();
470
471                let mut state = executor.state.lock();
472                state.now += duration;
473                let now = state.now;
474                let mut pending_timers = mem::take(&mut state.pending_timers);
475                drop(state);
476
477                pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
478                executor.state.lock().pending_timers.extend(pending_timers);
479            }
480            _ => panic!("this method can only be called on a deterministic executor"),
481        }
482    }
483
484    #[cfg(any(test, feature = "test-support"))]
485    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
486        match self {
487            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
488            _ => panic!("this method can only be called on a deterministic executor"),
489        }
490    }
491}
492
493impl Background {
494    pub fn new() -> Self {
495        let executor = Arc::new(Executor::new());
496        let stop = channel::unbounded::<()>();
497
498        for i in 0..2 * num_cpus::get() {
499            let executor = executor.clone();
500            let stop = stop.1.clone();
501            thread::Builder::new()
502                .name(format!("background-executor-{}", i))
503                .spawn(move || smol::block_on(executor.run(stop.recv())))
504                .unwrap();
505        }
506
507        Self::Production {
508            executor,
509            _stop: stop.0,
510        }
511    }
512
513    pub fn num_cpus(&self) -> usize {
514        num_cpus::get()
515    }
516
517    pub fn spawn<T, F>(&self, future: F) -> Task<T>
518    where
519        T: 'static + Send,
520        F: Send + Future<Output = T> + 'static,
521    {
522        let future = any_future(future);
523        let any_task = match self {
524            Self::Production { executor, .. } => executor.spawn(future),
525            #[cfg(any(test, feature = "test-support"))]
526            Self::Deterministic { executor } => executor.spawn(future),
527        };
528        Task::send(any_task)
529    }
530
531    pub fn block<F, T>(&self, future: F) -> T
532    where
533        F: Future<Output = T>,
534    {
535        smol::pin!(future);
536        match self {
537            Self::Production { .. } => smol::block_on(&mut future),
538            #[cfg(any(test, feature = "test-support"))]
539            Self::Deterministic { executor, .. } => {
540                executor.block(&mut future, usize::MAX).unwrap()
541            }
542        }
543    }
544
545    pub fn block_with_timeout<F, T>(
546        &self,
547        timeout: Duration,
548        future: F,
549    ) -> Result<T, impl Future<Output = T>>
550    where
551        T: 'static,
552        F: 'static + Unpin + Future<Output = T>,
553    {
554        let mut future = any_local_future(future);
555        if !timeout.is_zero() {
556            let output = match self {
557                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
558                #[cfg(any(test, feature = "test-support"))]
559                Self::Deterministic { executor, .. } => {
560                    use rand::prelude::*;
561                    let max_ticks = {
562                        let mut state = executor.state.lock();
563                        let range = state.block_on_ticks.clone();
564                        state.rng.gen_range(range)
565                    };
566                    executor.block(&mut future, max_ticks)
567                }
568            };
569            if let Some(output) = output {
570                return Ok(*output.downcast().unwrap());
571            }
572        }
573        Err(async { *future.await.downcast().unwrap() })
574    }
575
576    pub async fn scoped<'scope, F>(&self, scheduler: F)
577    where
578        F: FnOnce(&mut Scope<'scope>),
579    {
580        let mut scope = Scope {
581            futures: Default::default(),
582            _phantom: PhantomData,
583        };
584        (scheduler)(&mut scope);
585        let spawned = scope
586            .futures
587            .into_iter()
588            .map(|f| self.spawn(f))
589            .collect::<Vec<_>>();
590        for task in spawned {
591            task.await;
592        }
593    }
594
595    #[cfg(any(test, feature = "test-support"))]
596    pub async fn simulate_random_delay(&self) {
597        use rand::prelude::*;
598        use smol::future::yield_now;
599
600        match self {
601            Self::Deterministic { executor, .. } => {
602                if executor.state.lock().rng.gen_bool(0.2) {
603                    let yields = executor.state.lock().rng.gen_range(1..=10);
604                    for _ in 0..yields {
605                        yield_now().await;
606                    }
607                }
608            }
609            _ => panic!("this method can only be called on a deterministic executor"),
610        }
611    }
612}
613
/// Collects futures that borrow data for `'a` so they can be spawned together.
pub struct Scope<'a> {
    /// Registered futures, stored with their lifetime erased to `'static`.
    futures: Vec<Pin<Box<dyn Send + Future<Output = ()> + 'static>>>,
    /// Ties the scope to the lifetime the registered futures may borrow from.
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Registers `f` on this scope. The future is only stored here, not polled.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);

        // SAFETY: the source and target types differ only in the trait object's
        // lifetime bound. NOTE(review): soundness depends on every registered future
        // being completed or dropped before `'a` ends. `Background::scoped` awaits
        // each spawned task, but confirm what happens if that future is dropped early.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(scoped)
        };

        self.futures.push(erased);
    }
}
633
634impl<T> Task<T> {
635    pub fn ready(value: T) -> Self {
636        Self::Ready(Some(value))
637    }
638
639    fn local(any_task: AnyLocalTask) -> Self {
640        Self::Local {
641            any_task,
642            result_type: PhantomData,
643        }
644    }
645
646    pub fn detach(self) {
647        match self {
648            Task::Ready(_) => {}
649            Task::Local { any_task, .. } => any_task.detach(),
650            Task::Send { any_task, .. } => any_task.detach(),
651        }
652    }
653}
654
655impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
656    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
657        cx.spawn(|_| async move {
658            if let Err(err) = self.await {
659                log::error!("{}", err);
660            }
661        })
662        .detach();
663    }
664}
665
666impl<T: Send> Task<T> {
667    fn send(any_task: AnyTask) -> Self {
668        Self::Send {
669            any_task,
670            result_type: PhantomData,
671        }
672    }
673}
674
675impl<T: fmt::Debug> fmt::Debug for Task<T> {
676    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
677        match self {
678            Task::Ready(value) => value.fmt(f),
679            Task::Local { any_task, .. } => any_task.fmt(f),
680            Task::Send { any_task, .. } => any_task.fmt(f),
681        }
682    }
683}
684
685impl<T: 'static> Future for Task<T> {
686    type Output = T;
687
688    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
689        match unsafe { self.get_unchecked_mut() } {
690            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
691            Task::Local { any_task, .. } => {
692                any_task.poll(cx).map(|value| *value.downcast().unwrap())
693            }
694            Task::Send { any_task, .. } => {
695                any_task.poll(cx).map(|value| *value.downcast().unwrap())
696            }
697        }
698    }
699}
700
701fn any_future<T, F>(future: F) -> AnyFuture
702where
703    T: 'static + Send,
704    F: Future<Output = T> + Send + 'static,
705{
706    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
707}
708
709fn any_local_future<T, F>(future: F) -> AnyLocalFuture
710where
711    T: 'static,
712    F: Future<Output = T> + 'static,
713{
714    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
715}