executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display, Write as _},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util::{self, CwdBacktrace},
 21    MutableAppContext,
 22};
 23
 24pub enum Foreground {
 25    Platform {
 26        dispatcher: Arc<dyn platform::Dispatcher>,
 27        _not_send_or_sync: PhantomData<Rc<()>>,
 28    },
 29    #[cfg(any(test, feature = "test-support"))]
 30    Deterministic {
 31        cx_id: usize,
 32        executor: Arc<Deterministic>,
 33    },
 34}
 35
 36pub enum Background {
 37    #[cfg(any(test, feature = "test-support"))]
 38    Deterministic { executor: Arc<Deterministic> },
 39    Production {
 40        executor: Arc<smol::Executor<'static>>,
 41        _stop: channel::Sender<()>,
 42    },
 43}
 44
 45type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
 46type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
 47type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
 48type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 49
 50#[must_use]
 51pub enum Task<T> {
 52    Ready(Option<T>),
 53    Local {
 54        any_task: AnyLocalTask,
 55        result_type: PhantomData<T>,
 56    },
 57    Send {
 58        any_task: AnyTask,
 59        result_type: PhantomData<T>,
 60    },
 61}
 62
 63unsafe impl<T: Send> Send for Task<T> {}
 64
 65#[cfg(any(test, feature = "test-support"))]
 66struct DeterministicState {
 67    rng: rand::prelude::StdRng,
 68    seed: u64,
 69    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
 70    scheduled_from_background: Vec<BackgroundRunnable>,
 71    forbid_parking: bool,
 72    block_on_ticks: std::ops::RangeInclusive<usize>,
 73    now: std::time::Instant,
 74    next_timer_id: usize,
 75    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
 76    waiting_backtrace: Option<backtrace::Backtrace>,
 77    next_runnable_id: usize,
 78    poll_history: Vec<ExecutorEvent>,
 79    previous_poll_history: Option<Vec<ExecutorEvent>>,
 80    enable_runnable_backtraces: bool,
 81    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 82}
 83
 84#[derive(Copy, Clone, Debug, PartialEq, Eq)]
 85pub enum ExecutorEvent {
 86    PollRunnable { id: usize },
 87    EnqueuRunnable { id: usize },
 88}
 89
 90#[cfg(any(test, feature = "test-support"))]
 91struct ForegroundRunnable {
 92    id: usize,
 93    runnable: Runnable,
 94    main: bool,
 95}
 96
 97#[cfg(any(test, feature = "test-support"))]
 98struct BackgroundRunnable {
 99    id: usize,
100    runnable: Runnable,
101}
102
/// Test executor that draws its scheduling decisions from a seeded random
/// number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Queues, clock, timers and history, shared with timers and wakers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the running thread when there is nothing to run.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
109pub enum Timer {
110    Production(smol::Timer),
111    #[cfg(any(test, feature = "test-support"))]
112    Deterministic(DeterministicTimer),
113}
114
/// Timer registered with a [`Deterministic`] executor.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender sits in `pending_timers`.
    rx: postage::barrier::Receiver,
    /// Id used to remove the pending entry when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
121
122#[cfg(any(test, feature = "test-support"))]
123impl Deterministic {
124    pub fn new(seed: u64) -> Arc<Self> {
125        use rand::prelude::*;
126
127        Arc::new(Self {
128            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
129                rng: StdRng::seed_from_u64(seed),
130                seed,
131                scheduled_from_foreground: Default::default(),
132                scheduled_from_background: Default::default(),
133                forbid_parking: false,
134                block_on_ticks: 0..=1000,
135                now: std::time::Instant::now(),
136                next_timer_id: Default::default(),
137                pending_timers: Default::default(),
138                waiting_backtrace: None,
139                next_runnable_id: 0,
140                poll_history: Default::default(),
141                previous_poll_history: Default::default(),
142                enable_runnable_backtraces: false,
143                runnable_backtraces: Default::default(),
144            })),
145            parker: Default::default(),
146        })
147    }
148
149    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
150        self.state.lock().poll_history.clone()
151    }
152
153    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
154        self.state.lock().previous_poll_history = history;
155    }
156
157    pub fn enable_runnable_backtrace(&self) {
158        self.state.lock().enable_runnable_backtraces = true;
159    }
160
161    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
162        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
163        backtrace.resolve();
164        backtrace
165    }
166
167    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
168        Arc::new(Background::Deterministic {
169            executor: self.clone(),
170        })
171    }
172
173    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
174        Rc::new(Foreground::Deterministic {
175            cx_id: id,
176            executor: self.clone(),
177        })
178    }
179
180    fn spawn_from_foreground(
181        &self,
182        cx_id: usize,
183        future: AnyLocalFuture,
184        main: bool,
185    ) -> AnyLocalTask {
186        let state = self.state.clone();
187        let id;
188        {
189            let mut state = state.lock();
190            id = util::post_inc(&mut state.next_runnable_id);
191            if state.enable_runnable_backtraces {
192                state
193                    .runnable_backtraces
194                    .insert(id, backtrace::Backtrace::new_unresolved());
195            }
196        }
197
198        let unparker = self.parker.lock().unparker();
199        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
200            let mut state = state.lock();
201            state.push_to_history(ExecutorEvent::EnqueuRunnable { id });
202            state
203                .scheduled_from_foreground
204                .entry(cx_id)
205                .or_default()
206                .push(ForegroundRunnable { id, runnable, main });
207            unparker.unpark();
208        });
209        runnable.schedule();
210        task
211    }
212
213    fn spawn(&self, future: AnyFuture) -> AnyTask {
214        let state = self.state.clone();
215        let id;
216        {
217            let mut state = state.lock();
218            id = util::post_inc(&mut state.next_runnable_id);
219            if state.enable_runnable_backtraces {
220                state
221                    .runnable_backtraces
222                    .insert(id, backtrace::Backtrace::new_unresolved());
223            }
224        }
225
226        let unparker = self.parker.lock().unparker();
227        let (runnable, task) = async_task::spawn(future, move |runnable| {
228            let mut state = state.lock();
229            state
230                .poll_history
231                .push(ExecutorEvent::EnqueuRunnable { id });
232            state
233                .scheduled_from_background
234                .push(BackgroundRunnable { id, runnable });
235            unparker.unpark();
236        });
237        runnable.schedule();
238        task
239    }
240
241    fn run<'a>(
242        &self,
243        cx_id: usize,
244        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
245    ) -> Box<dyn Any> {
246        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
247
248        let woken = Arc::new(AtomicBool::new(false));
249
250        let state = self.state.clone();
251        let id;
252        {
253            let mut state = state.lock();
254            id = util::post_inc(&mut state.next_runnable_id);
255            if state.enable_runnable_backtraces {
256                state
257                    .runnable_backtraces
258                    .insert(id, backtrace::Backtrace::new_unresolved());
259            }
260        }
261
262        let unparker = self.parker.lock().unparker();
263        let (runnable, mut main_task) = unsafe {
264            async_task::spawn_unchecked(main_future, move |runnable| {
265                let state = &mut *state.lock();
266                state
267                    .scheduled_from_foreground
268                    .entry(cx_id)
269                    .or_default()
270                    .push(ForegroundRunnable {
271                        id: util::post_inc(&mut state.next_runnable_id),
272                        runnable,
273                        main: true,
274                    });
275                unparker.unpark();
276            })
277        };
278        runnable.schedule();
279
280        loop {
281            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
282                return result;
283            }
284
285            if !woken.load(SeqCst) {
286                self.state.lock().will_park();
287            }
288
289            woken.store(false, SeqCst);
290            self.parker.lock().park();
291        }
292    }
293
294    pub fn run_until_parked(&self) {
295        use std::sync::atomic::AtomicBool;
296        let woken = Arc::new(AtomicBool::new(false));
297        self.run_internal(woken, None);
298    }
299
300    fn run_internal(
301        &self,
302        woken: Arc<std::sync::atomic::AtomicBool>,
303        mut main_task: Option<&mut AnyLocalTask>,
304    ) -> Option<Box<dyn Any>> {
305        use rand::prelude::*;
306        use std::sync::atomic::Ordering::SeqCst;
307
308        let unparker = self.parker.lock().unparker();
309        let waker = waker_fn::waker_fn(move || {
310            woken.store(true, SeqCst);
311            unparker.unpark();
312        });
313
314        let mut cx = Context::from_waker(&waker);
315        loop {
316            let mut state = self.state.lock();
317
318            if state.scheduled_from_foreground.is_empty()
319                && state.scheduled_from_background.is_empty()
320            {
321                if let Some(main_task) = main_task {
322                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
323                        return Some(result);
324                    }
325                }
326
327                return None;
328            }
329
330            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
331                let background_len = state.scheduled_from_background.len();
332                let ix = state.rng.gen_range(0..background_len);
333                let background_runnable = state.scheduled_from_background.remove(ix);
334                state.push_to_history(ExecutorEvent::PollRunnable {
335                    id: background_runnable.id,
336                });
337                drop(state);
338                background_runnable.runnable.run();
339            } else if !state.scheduled_from_foreground.is_empty() {
340                let available_cx_ids = state
341                    .scheduled_from_foreground
342                    .keys()
343                    .copied()
344                    .collect::<Vec<_>>();
345                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
346                let scheduled_from_cx = state
347                    .scheduled_from_foreground
348                    .get_mut(&cx_id_to_run)
349                    .unwrap();
350                let foreground_runnable = scheduled_from_cx.remove(0);
351                if scheduled_from_cx.is_empty() {
352                    state.scheduled_from_foreground.remove(&cx_id_to_run);
353                }
354                state.push_to_history(ExecutorEvent::PollRunnable {
355                    id: foreground_runnable.id,
356                });
357
358                drop(state);
359
360                foreground_runnable.runnable.run();
361                if let Some(main_task) = main_task.as_mut() {
362                    if foreground_runnable.main {
363                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
364                            return Some(result);
365                        }
366                    }
367                }
368            }
369        }
370    }
371
372    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
373    where
374        F: Unpin + Future<Output = T>,
375    {
376        use rand::prelude::*;
377
378        let unparker = self.parker.lock().unparker();
379        let waker = waker_fn::waker_fn(move || {
380            unparker.unpark();
381        });
382
383        let mut cx = Context::from_waker(&waker);
384        for _ in 0..max_ticks {
385            let mut state = self.state.lock();
386            let runnable_count = state.scheduled_from_background.len();
387            let ix = state.rng.gen_range(0..=runnable_count);
388            if ix < state.scheduled_from_background.len() {
389                let background_runnable = state.scheduled_from_background.remove(ix);
390                state.push_to_history(ExecutorEvent::PollRunnable {
391                    id: background_runnable.id,
392                });
393                drop(state);
394                background_runnable.runnable.run();
395            } else {
396                drop(state);
397                if let Poll::Ready(result) = future.poll(&mut cx) {
398                    return Some(result);
399                }
400                let mut state = self.state.lock();
401                if state.scheduled_from_background.is_empty() {
402                    state.will_park();
403                    drop(state);
404                    self.parker.lock().park();
405                }
406
407                continue;
408            }
409        }
410
411        None
412    }
413
414    pub fn timer(&self, duration: Duration) -> Timer {
415        let (tx, rx) = postage::barrier::channel();
416        let mut state = self.state.lock();
417        let wakeup_at = state.now + duration;
418        let id = util::post_inc(&mut state.next_timer_id);
419        match state
420            .pending_timers
421            .binary_search_by_key(&wakeup_at, |e| e.1)
422        {
423            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
424        }
425        let state = self.state.clone();
426        Timer::Deterministic(DeterministicTimer { rx, id, state })
427    }
428
429    pub fn now(&self) -> std::time::Instant {
430        let state = self.state.lock();
431        state.now
432    }
433
434    pub fn advance_clock(&self, duration: Duration) {
435        let new_now = self.state.lock().now + duration;
436        loop {
437            self.run_until_parked();
438            let mut state = self.state.lock();
439
440            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
441                let wakeup_time = *wakeup_time;
442                if wakeup_time <= new_now {
443                    let timer_count = state
444                        .pending_timers
445                        .iter()
446                        .take_while(|(_, t, _)| *t == wakeup_time)
447                        .count();
448                    state.now = wakeup_time;
449                    let timers_to_wake = state
450                        .pending_timers
451                        .drain(0..timer_count)
452                        .collect::<Vec<_>>();
453                    drop(state);
454                    drop(timers_to_wake);
455                    continue;
456                }
457            }
458
459            break;
460        }
461
462        self.state.lock().now = new_now;
463    }
464
465    pub fn start_waiting(&self) {
466        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
467    }
468
469    pub fn finish_waiting(&self) {
470        self.state.lock().waiting_backtrace.take();
471    }
472
473    pub fn forbid_parking(&self) {
474        use rand::prelude::*;
475
476        let mut state = self.state.lock();
477        state.forbid_parking = true;
478        state.rng = StdRng::seed_from_u64(state.seed);
479    }
480
481    pub async fn simulate_random_delay(&self) {
482        use rand::prelude::*;
483        use smol::future::yield_now;
484        if self.state.lock().rng.gen_bool(0.2) {
485            let yields = self.state.lock().rng.gen_range(1..=10);
486            for _ in 0..yields {
487                yield_now().await;
488            }
489        }
490    }
491
492    pub fn record_backtrace(&self) {
493        let mut state = self.state.lock();
494        if state.enable_runnable_backtraces {
495            let current_id = state
496                .poll_history
497                .iter()
498                .rev()
499                .find_map(|event| match event {
500                    ExecutorEvent::PollRunnable { id } => Some(*id),
501                    _ => None,
502                });
503            if let Some(id) = current_id {
504                state
505                    .runnable_backtraces
506                    .insert(id, backtrace::Backtrace::new_unresolved());
507            }
508        }
509    }
510}
511
512impl Drop for Timer {
513    fn drop(&mut self) {
514        #[cfg(any(test, feature = "test-support"))]
515        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
516            state
517                .lock()
518                .pending_timers
519                .retain(|(timer_id, _, _)| timer_id != id)
520        }
521    }
522}
523
524impl Future for Timer {
525    type Output = ();
526
527    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
528        match &mut *self {
529            #[cfg(any(test, feature = "test-support"))]
530            Self::Deterministic(DeterministicTimer { rx, .. }) => {
531                use postage::stream::{PollRecv, Stream as _};
532                smol::pin!(rx);
533                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
534                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
535                    PollRecv::Pending => Poll::Pending,
536                }
537            }
538            Self::Production(timer) => {
539                smol::pin!(timer);
540                match timer.poll(cx) {
541                    Poll::Ready(_) => Poll::Ready(()),
542                    Poll::Pending => Poll::Pending,
543                }
544            }
545        }
546    }
547}
548
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Appends `event` to the execution history and, if a previous history
    /// was installed, checks that the same event occurred at the same
    /// position there.
    ///
    /// # Panics
    /// Panics with a "detected non-determinism" message when the event
    /// differs from the previous history at this position, or when the
    /// previous history has no event at this position at all.
    fn push_to_history(&mut self, event: ExecutorEvent) {
        self.poll_history.push(event);
        if let Some(prev_history) = &self.previous_poll_history {
            let ix = self.poll_history.len() - 1;
            // A current run that outgrows the previous one has diverged too;
            // report that explicitly instead of failing on a raw index panic.
            let prev_event = match prev_history.get(ix) {
                Some(prev_event) => *prev_event,
                None => panic!(
                    "detected non-determinism after {ix}. current execution produced {event:?}, \
                     but the previous execution recorded only {} events",
                    prev_history.len()
                ),
            };
            if event != prev_event {
                let mut message = String::new();
                writeln!(
                    &mut message,
                    "current runnable backtrace:\n{:?}",
                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
                        trace.resolve();
                        CwdBacktrace(trace)
                    })
                )
                .unwrap();
                writeln!(
                    &mut message,
                    "previous runnable backtrace:\n{:?}",
                    self.runnable_backtraces
                        .get_mut(&prev_event.id())
                        .map(|trace| {
                            trace.resolve();
                            CwdBacktrace(trace)
                        })
                )
                .unwrap();
                panic!("detected non-determinism after {ix}. {message}");
            }
        }
    }

    /// Called just before the executor would park.
    ///
    /// # Panics
    /// Panics if parking has been forbidden, including the backtrace captured
    /// by `start_waiting` in the message when one is present.
    fn will_park(&mut self) {
        if self.forbid_parking {
            let mut backtrace_message = String::new();
            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
                backtrace.resolve();
                backtrace_message = format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                );
            }

            panic!(
                "deterministic executor parked after a call to forbid_parking{}",
                backtrace_message
            );
        }
    }
}
602
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the runnable id carried by this event, whichever variant it is.
    pub fn id(&self) -> usize {
        match self {
            Self::PollRunnable { id } | Self::EnqueuRunnable { id } => *id,
        }
    }
}
612
613impl Foreground {
614    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
615        if dispatcher.is_main_thread() {
616            Ok(Self::Platform {
617                dispatcher,
618                _not_send_or_sync: PhantomData,
619            })
620        } else {
621            Err(anyhow!("must be constructed on main thread"))
622        }
623    }
624
625    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
626        let future = any_local_future(future);
627        let any_task = match self {
628            #[cfg(any(test, feature = "test-support"))]
629            Self::Deterministic { cx_id, executor } => {
630                executor.spawn_from_foreground(*cx_id, future, false)
631            }
632            Self::Platform { dispatcher, .. } => {
633                fn spawn_inner(
634                    future: AnyLocalFuture,
635                    dispatcher: &Arc<dyn Dispatcher>,
636                ) -> AnyLocalTask {
637                    let dispatcher = dispatcher.clone();
638                    let schedule =
639                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
640                    let (runnable, task) = async_task::spawn_local(future, schedule);
641                    runnable.schedule();
642                    task
643                }
644                spawn_inner(future, dispatcher)
645            }
646        };
647        Task::local(any_task)
648    }
649
650    #[cfg(any(test, feature = "test-support"))]
651    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
652        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
653        let result = match self {
654            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
655            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
656        };
657        *result.downcast().unwrap()
658    }
659
660    #[cfg(any(test, feature = "test-support"))]
661    pub fn run_until_parked(&self) {
662        match self {
663            Self::Deterministic { executor, .. } => executor.run_until_parked(),
664            _ => panic!("this method can only be called on a deterministic executor"),
665        }
666    }
667
668    #[cfg(any(test, feature = "test-support"))]
669    pub fn parking_forbidden(&self) -> bool {
670        match self {
671            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
672            _ => panic!("this method can only be called on a deterministic executor"),
673        }
674    }
675
676    #[cfg(any(test, feature = "test-support"))]
677    pub fn start_waiting(&self) {
678        match self {
679            Self::Deterministic { executor, .. } => executor.start_waiting(),
680            _ => panic!("this method can only be called on a deterministic executor"),
681        }
682    }
683
684    #[cfg(any(test, feature = "test-support"))]
685    pub fn finish_waiting(&self) {
686        match self {
687            Self::Deterministic { executor, .. } => executor.finish_waiting(),
688            _ => panic!("this method can only be called on a deterministic executor"),
689        }
690    }
691
692    #[cfg(any(test, feature = "test-support"))]
693    pub fn forbid_parking(&self) {
694        match self {
695            Self::Deterministic { executor, .. } => executor.forbid_parking(),
696            _ => panic!("this method can only be called on a deterministic executor"),
697        }
698    }
699
700    #[cfg(any(test, feature = "test-support"))]
701    pub fn advance_clock(&self, duration: Duration) {
702        match self {
703            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
704            _ => panic!("this method can only be called on a deterministic executor"),
705        }
706    }
707
708    #[cfg(any(test, feature = "test-support"))]
709    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
710        match self {
711            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
712            _ => panic!("this method can only be called on a deterministic executor"),
713        }
714    }
715}
716
717impl Background {
718    pub fn new() -> Self {
719        let executor = Arc::new(Executor::new());
720        let stop = channel::unbounded::<()>();
721
722        for i in 0..2 * num_cpus::get() {
723            let executor = executor.clone();
724            let stop = stop.1.clone();
725            thread::Builder::new()
726                .name(format!("background-executor-{}", i))
727                .spawn(move || smol::block_on(executor.run(stop.recv())))
728                .unwrap();
729        }
730
731        Self::Production {
732            executor,
733            _stop: stop.0,
734        }
735    }
736
737    pub fn num_cpus(&self) -> usize {
738        num_cpus::get()
739    }
740
741    pub fn spawn<T, F>(&self, future: F) -> Task<T>
742    where
743        T: 'static + Send,
744        F: Send + Future<Output = T> + 'static,
745    {
746        let future = any_future(future);
747        let any_task = match self {
748            Self::Production { executor, .. } => executor.spawn(future),
749            #[cfg(any(test, feature = "test-support"))]
750            Self::Deterministic { executor } => executor.spawn(future),
751        };
752        Task::send(any_task)
753    }
754
755    pub fn block<F, T>(&self, future: F) -> T
756    where
757        F: Future<Output = T>,
758    {
759        smol::pin!(future);
760        match self {
761            Self::Production { .. } => smol::block_on(&mut future),
762            #[cfg(any(test, feature = "test-support"))]
763            Self::Deterministic { executor, .. } => {
764                executor.block(&mut future, usize::MAX).unwrap()
765            }
766        }
767    }
768
769    pub fn block_with_timeout<F, T>(
770        &self,
771        timeout: Duration,
772        future: F,
773    ) -> Result<T, impl Future<Output = T>>
774    where
775        T: 'static,
776        F: 'static + Unpin + Future<Output = T>,
777    {
778        let mut future = any_local_future(future);
779        if !timeout.is_zero() {
780            let output = match self {
781                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
782                #[cfg(any(test, feature = "test-support"))]
783                Self::Deterministic { executor, .. } => {
784                    use rand::prelude::*;
785                    let max_ticks = {
786                        let mut state = executor.state.lock();
787                        let range = state.block_on_ticks.clone();
788                        state.rng.gen_range(range)
789                    };
790                    executor.block(&mut future, max_ticks)
791                }
792            };
793            if let Some(output) = output {
794                return Ok(*output.downcast().unwrap());
795            }
796        }
797        Err(async { *future.await.downcast().unwrap() })
798    }
799
800    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
801    where
802        F: FnOnce(&mut Scope<'scope>),
803    {
804        let mut scope = Scope::new(self.clone());
805        (scheduler)(&mut scope);
806        let spawned = mem::take(&mut scope.futures)
807            .into_iter()
808            .map(|f| self.spawn(f))
809            .collect::<Vec<_>>();
810        for task in spawned {
811            task.await;
812        }
813    }
814
815    pub fn timer(&self, duration: Duration) -> Timer {
816        match self {
817            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
818            #[cfg(any(test, feature = "test-support"))]
819            Background::Deterministic { executor } => executor.timer(duration),
820        }
821    }
822
823    pub fn now(&self) -> std::time::Instant {
824        match self {
825            Background::Production { .. } => std::time::Instant::now(),
826            #[cfg(any(test, feature = "test-support"))]
827            Background::Deterministic { executor } => executor.now(),
828        }
829    }
830
831    #[cfg(any(test, feature = "test-support"))]
832    pub async fn simulate_random_delay(&self) {
833        match self {
834            Self::Deterministic { executor, .. } => {
835                executor.simulate_random_delay().await;
836            }
837            _ => {
838                panic!("this method can only be called on a deterministic executor")
839            }
840        }
841    }
842
843    #[cfg(any(test, feature = "test-support"))]
844    pub fn record_backtrace(&self) {
845        match self {
846            Self::Deterministic { executor, .. } => executor.record_backtrace(),
847            _ => {
848                panic!("this method can only be called on a deterministic executor")
849            }
850        }
851    }
852}
853
854impl Default for Background {
855    fn default() -> Self {
856        Self::new()
857    }
858}
859
860pub struct Scope<'a> {
861    executor: Arc<Background>,
862    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
863    tx: Option<mpsc::Sender<()>>,
864    rx: mpsc::Receiver<()>,
865    _phantom: PhantomData<&'a ()>,
866}
867
868impl<'a> Scope<'a> {
869    fn new(executor: Arc<Background>) -> Self {
870        let (tx, rx) = mpsc::channel(1);
871        Self {
872            executor,
873            tx: Some(tx),
874            rx,
875            futures: Default::default(),
876            _phantom: PhantomData,
877        }
878    }
879
880    pub fn spawn<F>(&mut self, f: F)
881    where
882        F: Future<Output = ()> + Send + 'a,
883    {
884        let tx = self.tx.clone().unwrap();
885
886        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
887        // dropping this `Scope` blocks until all of the futures have resolved.
888        let f = unsafe {
889            mem::transmute::<
890                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
891                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
892            >(Box::pin(async move {
893                f.await;
894                drop(tx);
895            }))
896        };
897        self.futures.push(f);
898    }
899}
900
901impl<'a> Drop for Scope<'a> {
902    fn drop(&mut self) {
903        self.tx.take().unwrap();
904
905        // Wait until the channel is closed, which means that all of the spawned
906        // futures have resolved.
907        self.executor.block(self.rx.next());
908    }
909}
910
911impl<T> Task<T> {
912    pub fn ready(value: T) -> Self {
913        Self::Ready(Some(value))
914    }
915
916    fn local(any_task: AnyLocalTask) -> Self {
917        Self::Local {
918            any_task,
919            result_type: PhantomData,
920        }
921    }
922
923    pub fn detach(self) {
924        match self {
925            Task::Ready(_) => {}
926            Task::Local { any_task, .. } => any_task.detach(),
927            Task::Send { any_task, .. } => any_task.detach(),
928        }
929    }
930}
931
932impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
933    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
934        cx.spawn(|_| async move {
935            if let Err(err) = self.await {
936                log::error!("{}", err);
937            }
938        })
939        .detach();
940    }
941}
942
943impl<T: Send> Task<T> {
944    fn send(any_task: AnyTask) -> Self {
945        Self::Send {
946            any_task,
947            result_type: PhantomData,
948        }
949    }
950}
951
952impl<T: fmt::Debug> fmt::Debug for Task<T> {
953    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
954        match self {
955            Task::Ready(value) => value.fmt(f),
956            Task::Local { any_task, .. } => any_task.fmt(f),
957            Task::Send { any_task, .. } => any_task.fmt(f),
958        }
959    }
960}
961
962impl<T: 'static> Future for Task<T> {
963    type Output = T;
964
965    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
966        match unsafe { self.get_unchecked_mut() } {
967            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
968            Task::Local { any_task, .. } => {
969                any_task.poll(cx).map(|value| *value.downcast().unwrap())
970            }
971            Task::Send { any_task, .. } => {
972                any_task.poll(cx).map(|value| *value.downcast().unwrap())
973            }
974        }
975    }
976}
977
978fn any_future<T, F>(future: F) -> AnyFuture
979where
980    T: 'static + Send,
981    F: Future<Output = T> + Send + 'static,
982{
983    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
984}
985
986fn any_local_future<T, F>(future: F) -> AnyLocalFuture
987where
988    T: 'static,
989    F: Future<Output = T> + 'static,
990{
991    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
992}