executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use smol::{channel, prelude::*, Executor, Timer};
  4use std::{
  5    any::Any,
  6    fmt::{self, Display},
  7    marker::PhantomData,
  8    mem,
  9    pin::Pin,
 10    rc::Rc,
 11    sync::Arc,
 12    task::{Context, Poll},
 13    thread,
 14    time::Duration,
 15};
 16
 17use crate::{
 18    platform::{self, Dispatcher},
 19    util, MutableAppContext,
 20};
 21
 22pub enum Foreground {
 23    Platform {
 24        dispatcher: Arc<dyn platform::Dispatcher>,
 25        _not_send_or_sync: PhantomData<Rc<()>>,
 26    },
 27    #[cfg(any(test, feature = "test-support"))]
 28    Deterministic {
 29        cx_id: usize,
 30        executor: Arc<Deterministic>,
 31    },
 32}
 33
 34pub enum Background {
 35    #[cfg(any(test, feature = "test-support"))]
 36    Deterministic { executor: Arc<Deterministic> },
 37    Production {
 38        executor: Arc<smol::Executor<'static>>,
 39        _stop: channel::Sender<()>,
 40    },
 41}
 42
 43type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 44type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 45type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 46type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 47
 48#[must_use]
 49pub enum Task<T> {
 50    Ready(Option<T>),
 51    Local {
 52        any_task: AnyLocalTask,
 53        result_type: PhantomData<T>,
 54    },
 55    Send {
 56        any_task: AnyTask,
 57        result_type: PhantomData<T>,
 58    },
 59}
 60
 61unsafe impl<T: Send> Send for Task<T> {}
 62
 63#[cfg(any(test, feature = "test-support"))]
 64struct DeterministicState {
 65    rng: rand::prelude::StdRng,
 66    seed: u64,
 67    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 68    scheduled_from_background: Vec<Runnable>,
 69    forbid_parking: bool,
 70    block_on_ticks: std::ops::RangeInclusive<usize>,
 71    now: std::time::Instant,
 72    pending_timers: Vec<(std::time::Instant, postage::barrier::Sender)>,
 73    waiting_backtrace: Option<backtrace::Backtrace>,
 74}
 75
 76#[cfg(any(test, feature = "test-support"))]
 77struct ForegroundRunnable {
 78    runnable: Runnable,
 79    main: bool,
 80}
 81
 82#[cfg(any(test, feature = "test-support"))]
 83pub struct Deterministic {
 84    state: Arc<parking_lot::Mutex<DeterministicState>>,
 85    parker: parking_lot::Mutex<parking::Parker>,
 86}
 87
 88#[cfg(any(test, feature = "test-support"))]
 89impl Deterministic {
 90    pub fn new(seed: u64) -> Arc<Self> {
 91        use rand::prelude::*;
 92
 93        Arc::new(Self {
 94            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
 95                rng: StdRng::seed_from_u64(seed),
 96                seed,
 97                scheduled_from_foreground: Default::default(),
 98                scheduled_from_background: Default::default(),
 99                forbid_parking: false,
100                block_on_ticks: 0..=1000,
101                now: std::time::Instant::now(),
102                pending_timers: Default::default(),
103                waiting_backtrace: None,
104            })),
105            parker: Default::default(),
106        })
107    }
108
109    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
110        Arc::new(Background::Deterministic {
111            executor: self.clone(),
112        })
113    }
114
115    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
116        Rc::new(Foreground::Deterministic {
117            cx_id: id,
118            executor: self.clone(),
119        })
120    }
121
122    fn spawn_from_foreground(
123        &self,
124        cx_id: usize,
125        future: AnyLocalFuture,
126        main: bool,
127    ) -> AnyLocalTask {
128        let state = self.state.clone();
129        let unparker = self.parker.lock().unparker();
130        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
131            let mut state = state.lock();
132            state
133                .scheduled_from_foreground
134                .entry(cx_id)
135                .or_default()
136                .push(ForegroundRunnable { runnable, main });
137            unparker.unpark();
138        });
139        runnable.schedule();
140        task
141    }
142
143    fn spawn(&self, future: AnyFuture) -> AnyTask {
144        let state = self.state.clone();
145        let unparker = self.parker.lock().unparker();
146        let (runnable, task) = async_task::spawn(future, move |runnable| {
147            let mut state = state.lock();
148            state.scheduled_from_background.push(runnable);
149            unparker.unpark();
150        });
151        runnable.schedule();
152        task
153    }
154
155    fn run<'a>(
156        &self,
157        cx_id: usize,
158        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
159    ) -> Box<dyn Any> {
160        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
161
162        let woken = Arc::new(AtomicBool::new(false));
163
164        let state = self.state.clone();
165        let unparker = self.parker.lock().unparker();
166        let (runnable, mut main_task) = unsafe {
167            async_task::spawn_unchecked(main_future, move |runnable| {
168                let mut state = state.lock();
169                state
170                    .scheduled_from_foreground
171                    .entry(cx_id)
172                    .or_default()
173                    .push(ForegroundRunnable {
174                        runnable,
175                        main: true,
176                    });
177                unparker.unpark();
178            })
179        };
180        runnable.schedule();
181
182        loop {
183            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
184                return result;
185            }
186
187            if !woken.load(SeqCst) {
188                self.state.lock().will_park();
189            }
190
191            woken.store(false, SeqCst);
192            self.parker.lock().park();
193        }
194    }
195
196    pub fn run_until_parked(&self) {
197        use std::sync::atomic::AtomicBool;
198        let woken = Arc::new(AtomicBool::new(false));
199        self.run_internal(woken, None);
200    }
201
202    fn run_internal(
203        &self,
204        woken: Arc<std::sync::atomic::AtomicBool>,
205        mut main_task: Option<&mut AnyLocalTask>,
206    ) -> Option<Box<dyn Any>> {
207        use rand::prelude::*;
208        use std::sync::atomic::Ordering::SeqCst;
209
210        let unparker = self.parker.lock().unparker();
211        let waker = waker_fn::waker_fn(move || {
212            woken.store(true, SeqCst);
213            unparker.unpark();
214        });
215
216        let mut cx = Context::from_waker(&waker);
217        loop {
218            let mut state = self.state.lock();
219
220            if state.scheduled_from_foreground.is_empty()
221                && state.scheduled_from_background.is_empty()
222            {
223                return None;
224            }
225
226            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
227                let background_len = state.scheduled_from_background.len();
228                let ix = state.rng.gen_range(0..background_len);
229                let runnable = state.scheduled_from_background.remove(ix);
230                drop(state);
231                runnable.run();
232            } else if !state.scheduled_from_foreground.is_empty() {
233                let available_cx_ids = state
234                    .scheduled_from_foreground
235                    .keys()
236                    .copied()
237                    .collect::<Vec<_>>();
238                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
239                let scheduled_from_cx = state
240                    .scheduled_from_foreground
241                    .get_mut(&cx_id_to_run)
242                    .unwrap();
243                let foreground_runnable = scheduled_from_cx.remove(0);
244                if scheduled_from_cx.is_empty() {
245                    state.scheduled_from_foreground.remove(&cx_id_to_run);
246                }
247
248                drop(state);
249
250                foreground_runnable.runnable.run();
251                if let Some(main_task) = main_task.as_mut() {
252                    if foreground_runnable.main {
253                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
254                            return Some(result);
255                        }
256                    }
257                }
258            }
259        }
260    }
261
262    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
263    where
264        F: Unpin + Future<Output = T>,
265    {
266        use rand::prelude::*;
267
268        let unparker = self.parker.lock().unparker();
269        let waker = waker_fn::waker_fn(move || {
270            unparker.unpark();
271        });
272
273        let mut cx = Context::from_waker(&waker);
274        for _ in 0..max_ticks {
275            let mut state = self.state.lock();
276            let runnable_count = state.scheduled_from_background.len();
277            let ix = state.rng.gen_range(0..=runnable_count);
278            if ix < state.scheduled_from_background.len() {
279                let runnable = state.scheduled_from_background.remove(ix);
280                drop(state);
281                runnable.run();
282            } else {
283                drop(state);
284                if let Poll::Ready(result) = future.poll(&mut cx) {
285                    return Some(result);
286                }
287                let mut state = self.state.lock();
288                if state.scheduled_from_background.is_empty() {
289                    state.will_park();
290                    drop(state);
291                    self.parker.lock().park();
292                }
293
294                continue;
295            }
296        }
297
298        None
299    }
300}
301
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called just before the executor thread parks.
    ///
    /// Does nothing unless parking has been forbidden.
    ///
    /// # Panics
    ///
    /// Panics when `forbid_parking` is set. If a waiting backtrace has been
    /// recorded, it is resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
323
324impl Foreground {
325    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
326        if dispatcher.is_main_thread() {
327            Ok(Self::Platform {
328                dispatcher,
329                _not_send_or_sync: PhantomData,
330            })
331        } else {
332            Err(anyhow!("must be constructed on main thread"))
333        }
334    }
335
336    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
337        let future = any_local_future(future);
338        let any_task = match self {
339            #[cfg(any(test, feature = "test-support"))]
340            Self::Deterministic { cx_id, executor } => {
341                executor.spawn_from_foreground(*cx_id, future, false)
342            }
343            Self::Platform { dispatcher, .. } => {
344                fn spawn_inner(
345                    future: AnyLocalFuture,
346                    dispatcher: &Arc<dyn Dispatcher>,
347                ) -> AnyLocalTask {
348                    let dispatcher = dispatcher.clone();
349                    let schedule =
350                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
351                    let (runnable, task) = async_task::spawn_local(future, schedule);
352                    runnable.schedule();
353                    task
354                }
355                spawn_inner(future, dispatcher)
356            }
357        };
358        Task::local(any_task)
359    }
360
361    #[cfg(any(test, feature = "test-support"))]
362    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
363        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
364        let result = match self {
365            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
366            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
367        };
368        *result.downcast().unwrap()
369    }
370
371    #[cfg(any(test, feature = "test-support"))]
372    pub fn run_until_parked(&self) {
373        match self {
374            Self::Deterministic { executor, .. } => executor.run_until_parked(),
375            _ => panic!("this method can only be called on a deterministic executor"),
376        }
377    }
378
379    #[cfg(any(test, feature = "test-support"))]
380    pub fn parking_forbidden(&self) -> bool {
381        match self {
382            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
383            _ => panic!("this method can only be called on a deterministic executor"),
384        }
385    }
386
387    #[cfg(any(test, feature = "test-support"))]
388    pub fn start_waiting(&self) {
389        match self {
390            Self::Deterministic { executor, .. } => {
391                executor.state.lock().waiting_backtrace =
392                    Some(backtrace::Backtrace::new_unresolved());
393            }
394            _ => panic!("this method can only be called on a deterministic executor"),
395        }
396    }
397
398    #[cfg(any(test, feature = "test-support"))]
399    pub fn finish_waiting(&self) {
400        match self {
401            Self::Deterministic { executor, .. } => {
402                executor.state.lock().waiting_backtrace.take();
403            }
404            _ => panic!("this method can only be called on a deterministic executor"),
405        }
406    }
407
408    #[cfg(any(test, feature = "test-support"))]
409    pub fn forbid_parking(&self) {
410        use rand::prelude::*;
411
412        match self {
413            Self::Deterministic { executor, .. } => {
414                let mut state = executor.state.lock();
415                state.forbid_parking = true;
416                state.rng = StdRng::seed_from_u64(state.seed);
417            }
418            _ => panic!("this method can only be called on a deterministic executor"),
419        }
420    }
421
422    pub async fn timer(&self, duration: Duration) {
423        match self {
424            #[cfg(any(test, feature = "test-support"))]
425            Self::Deterministic { executor, .. } => {
426                use postage::prelude::Stream as _;
427
428                let (tx, mut rx) = postage::barrier::channel();
429                {
430                    let mut state = executor.state.lock();
431                    let wakeup_at = state.now + duration;
432                    state.pending_timers.push((wakeup_at, tx));
433                }
434                rx.recv().await;
435            }
436            _ => {
437                Timer::after(duration).await;
438            }
439        }
440    }
441
442    #[cfg(any(test, feature = "test-support"))]
443    pub fn advance_clock(&self, duration: Duration) {
444        match self {
445            Self::Deterministic { executor, .. } => {
446                executor.run_until_parked();
447
448                let mut state = executor.state.lock();
449                state.now += duration;
450                let now = state.now;
451                let mut pending_timers = mem::take(&mut state.pending_timers);
452                drop(state);
453
454                pending_timers.retain(|(wakeup, _)| *wakeup > now);
455                executor.state.lock().pending_timers.extend(pending_timers);
456            }
457            _ => panic!("this method can only be called on a deterministic executor"),
458        }
459    }
460
461    #[cfg(any(test, feature = "test-support"))]
462    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
463        match self {
464            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
465            _ => panic!("this method can only be called on a deterministic executor"),
466        }
467    }
468}
469
470impl Background {
471    pub fn new() -> Self {
472        let executor = Arc::new(Executor::new());
473        let stop = channel::unbounded::<()>();
474
475        for i in 0..2 * num_cpus::get() {
476            let executor = executor.clone();
477            let stop = stop.1.clone();
478            thread::Builder::new()
479                .name(format!("background-executor-{}", i))
480                .spawn(move || smol::block_on(executor.run(stop.recv())))
481                .unwrap();
482        }
483
484        Self::Production {
485            executor,
486            _stop: stop.0,
487        }
488    }
489
490    pub fn num_cpus(&self) -> usize {
491        num_cpus::get()
492    }
493
494    pub fn spawn<T, F>(&self, future: F) -> Task<T>
495    where
496        T: 'static + Send,
497        F: Send + Future<Output = T> + 'static,
498    {
499        let future = any_future(future);
500        let any_task = match self {
501            Self::Production { executor, .. } => executor.spawn(future),
502            #[cfg(any(test, feature = "test-support"))]
503            Self::Deterministic { executor } => executor.spawn(future),
504        };
505        Task::send(any_task)
506    }
507
508    pub fn block<F, T>(&self, future: F) -> T
509    where
510        F: Future<Output = T>,
511    {
512        smol::pin!(future);
513        match self {
514            Self::Production { .. } => smol::block_on(&mut future),
515            #[cfg(any(test, feature = "test-support"))]
516            Self::Deterministic { executor, .. } => {
517                executor.block(&mut future, usize::MAX).unwrap()
518            }
519        }
520    }
521
522    pub fn block_with_timeout<F, T>(
523        &self,
524        timeout: Duration,
525        future: F,
526    ) -> Result<T, impl Future<Output = T>>
527    where
528        T: 'static,
529        F: 'static + Unpin + Future<Output = T>,
530    {
531        let mut future = any_local_future(future);
532        if !timeout.is_zero() {
533            let output = match self {
534                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
535                #[cfg(any(test, feature = "test-support"))]
536                Self::Deterministic { executor, .. } => {
537                    use rand::prelude::*;
538                    let max_ticks = {
539                        let mut state = executor.state.lock();
540                        let range = state.block_on_ticks.clone();
541                        state.rng.gen_range(range)
542                    };
543                    executor.block(&mut future, max_ticks)
544                }
545            };
546            if let Some(output) = output {
547                return Ok(*output.downcast().unwrap());
548            }
549        }
550        Err(async { *future.await.downcast().unwrap() })
551    }
552
553    pub async fn scoped<'scope, F>(&self, scheduler: F)
554    where
555        F: FnOnce(&mut Scope<'scope>),
556    {
557        let mut scope = Scope {
558            futures: Default::default(),
559            _phantom: PhantomData,
560        };
561        (scheduler)(&mut scope);
562        let spawned = scope
563            .futures
564            .into_iter()
565            .map(|f| self.spawn(f))
566            .collect::<Vec<_>>();
567        for task in spawned {
568            task.await;
569        }
570    }
571
572    #[cfg(any(test, feature = "test-support"))]
573    pub async fn simulate_random_delay(&self) {
574        use rand::prelude::*;
575        use smol::future::yield_now;
576
577        match self {
578            Self::Deterministic { executor, .. } => {
579                if executor.state.lock().rng.gen_bool(0.2) {
580                    let yields = executor.state.lock().rng.gen_range(1..=10);
581                    for _ in 0..yields {
582                        yield_now().await;
583                    }
584                }
585            }
586            _ => panic!("this method can only be called on a deterministic executor"),
587        }
588    }
589}
590
/// Collects futures that may borrow data living for `'a`, to be spawned
/// together by `Background::scoped`.
pub struct Scope<'a> {
    // Stored with an erased `'static` lifetime; see `Scope::spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Queues `f` on this scope; nothing runs until the queue is spawned.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // SAFETY: only the lifetime bound changes, so both types have the same
        // layout. NOTE(review): nothing in this type keeps the borrowed data
        // alive; soundness depends on the queued futures finishing or being
        // dropped before `'a` ends — confirm `Background::scoped` guarantees
        // this even when its own future is dropped early.
        let erased: Pin<Box<dyn Future<Output = ()> + Send + 'static>> =
            unsafe { mem::transmute(scoped) };
        self.futures.push(erased);
    }
}
610
611impl<T> Task<T> {
612    pub fn ready(value: T) -> Self {
613        Self::Ready(Some(value))
614    }
615
616    fn local(any_task: AnyLocalTask) -> Self {
617        Self::Local {
618            any_task,
619            result_type: PhantomData,
620        }
621    }
622
623    pub fn detach(self) {
624        match self {
625            Task::Ready(_) => {}
626            Task::Local { any_task, .. } => any_task.detach(),
627            Task::Send { any_task, .. } => any_task.detach(),
628        }
629    }
630}
631
632impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
633    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
634        cx.spawn(|_| async move {
635            if let Err(err) = self.await {
636                log::error!("{}", err);
637            }
638        })
639        .detach();
640    }
641}
642
643impl<T: Send> Task<T> {
644    fn send(any_task: AnyTask) -> Self {
645        Self::Send {
646            any_task,
647            result_type: PhantomData,
648        }
649    }
650}
651
652impl<T: fmt::Debug> fmt::Debug for Task<T> {
653    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
654        match self {
655            Task::Ready(value) => value.fmt(f),
656            Task::Local { any_task, .. } => any_task.fmt(f),
657            Task::Send { any_task, .. } => any_task.fmt(f),
658        }
659    }
660}
661
662impl<T: 'static> Future for Task<T> {
663    type Output = T;
664
665    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
666        match unsafe { self.get_unchecked_mut() } {
667            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
668            Task::Local { any_task, .. } => {
669                any_task.poll(cx).map(|value| *value.downcast().unwrap())
670            }
671            Task::Send { any_task, .. } => {
672                any_task.poll(cx).map(|value| *value.downcast().unwrap())
673            }
674        }
675    }
676}
677
678fn any_future<T, F>(future: F) -> AnyFuture
679where
680    T: 'static + Send,
681    F: Future<Output = T> + Send + 'static,
682{
683    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
684}
685
686fn any_local_future<T, F>(future: F) -> AnyLocalFuture
687where
688    T: 'static,
689    F: Future<Output = T> + 'static,
690{
691    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
692}