executor.rs

  1use anyhow::{anyhow, Result};
  2use async_task::Runnable;
  3use futures::channel::mpsc;
  4use smol::{channel, prelude::*, Executor};
  5use std::{
  6    any::Any,
  7    fmt::{self, Display, Write as _},
  8    marker::PhantomData,
  9    mem,
 10    pin::Pin,
 11    rc::Rc,
 12    sync::Arc,
 13    task::{Context, Poll},
 14    thread,
 15    time::Duration,
 16};
 17
 18use crate::{
 19    platform::{self, Dispatcher},
 20    util::{self, CwdBacktrace},
 21    MutableAppContext,
 22};
 23
/// Executor for futures that do not need to be `Send`.
///
/// The platform variant can only be constructed on the main thread (see
/// `Foreground::platform`) and hands every runnable to the platform dispatcher.
pub enum Foreground {
    /// Schedules runnables through `Dispatcher::run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc<()>` is neither `Send` nor `Sync`, so this marker keeps the
        // variant from being shared with or moved to another thread.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant that queues runnables on a shared `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this executor's runnables are queued in
        // `DeterministicState::scheduled_from_foreground`.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
 35
/// Executor for `Send` futures.
pub enum Background {
    /// Test-only variant that queues runnables on a shared `Deterministic` executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Runs tasks on a `smol::Executor` driven by the worker threads started in `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Nothing in this file sends on this channel. Each worker thread runs
        // the executor until `recv` on the paired receiver resolves —
        // presumably when this sender is dropped and the channel closes
        // (TODO confirm against the `smol::channel` docs).
        _stop: channel::Sender<()>,
    },
}
 44
// Type-erased futures and task handles. Outputs are boxed as `dyn Any` so the
// spawn paths are not generic over the output type; `Task<T>` downcasts the
// box back to `T` when the task completes.

/// Boxed future that is not required to be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// Boxed `Send` future with a `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// Handle to a spawned `AnyFuture`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// Handle to a spawned `AnyLocalFuture`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
 49
/// Handle to a value that is either already available or is being produced by
/// a spawned task.
///
/// Awaiting the handle yields `T`. Use `detach` to give up the handle of a
/// spawned task explicitly.
#[must_use]
pub enum Task<T> {
    /// A value that is already available. It is taken out (leaving `None`) the
    /// first time the task is polled.
    Ready(Option<T>),
    /// Task spawned through `Foreground::spawn`; its type-erased output is
    /// downcast to `T` on completion.
    Local {
        any_task: AnyLocalTask,
        result_type: PhantomData<T>,
    },
    /// Task spawned through `Background::spawn`; its type-erased output is
    /// downcast to `T` on completion.
    Send {
        any_task: AnyTask,
        result_type: PhantomData<T>,
    },
}
 62
// SAFETY(review): `Task::Local` wraps a task handle whose erased output is not
// `Send`, so the compiler would not derive `Send` for `Task<T>`. This impl
// asserts that moving any `Task<T>` across threads is sound whenever
// `T: Send`. The justification is not visible in this file — TODO confirm
// against `async_task`'s guarantees for handles created by `spawn_local`.
unsafe impl<T: Send> Send for Task<T> {}
 64
/// Mutable state of the deterministic (test) executor, shared behind a mutex
/// between the executor, the schedule callbacks of spawned tasks and timers.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision made by `run_internal` and `block`.
    rng: rand::prelude::StdRng,
    // Seed `rng` was created from; `forbid_parking` re-seeds `rng` with it.
    seed: u64,
    // Runnables queued by foreground tasks, keyed by the `cx_id` of the
    // foreground executor that owns them. Entries are removed once empty.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables queued by background tasks.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Virtual clock: initialised from the real clock in `Deterministic::new`
    // and afterwards only changed by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // (timer id, wake-up instant, barrier sender). `timer` inserts at the
    // position found by a binary search on the wake-up instant, and
    // `advance_clock` relies on the earliest timer being first.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Set by `start_waiting`, cleared by `finish_waiting`; included in the
    // panic message of `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Enqueue/poll events recorded during this run.
    poll_history: Vec<ExecutorEvent>,
    // History of an earlier run; when present, `push_to_history` compares
    // each new event against it.
    previous_poll_history: Option<Vec<ExecutorEvent>>,
    // When set, a backtrace is captured for every runnable id that is created.
    enable_runnable_backtraces: bool,
    // Captured backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
 83
/// Entry in the deterministic executor's execution history.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum ExecutorEvent {
    /// The runnable with this id was taken off a queue and run.
    PollRunnable { id: usize },
    /// The runnable with this id was placed on a queue.
    // NOTE(review): "Enqueu" looks like a misspelling of "Enqueue"; the
    // variant is public, so renaming it would be a breaking change.
    EnqueuRunnable { id: usize },
}
 89
/// Foreground runnable waiting in `DeterministicState::scheduled_from_foreground`.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    // Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
    // True for the main future driven by `Deterministic::run`; after running
    // such a runnable, `run_internal` polls the main task for its result.
    main: bool,
}
 96
/// Background runnable waiting in `DeterministicState::scheduled_from_background`.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    // Id recorded in the execution history when this runnable is polled.
    id: usize,
    runnable: Runnable,
}
102
/// Test executor that picks the next runnable using a seeded random number
/// generator instead of real threads.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with the schedule callbacks of spawned tasks and with timers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Parks the calling thread in `run` and `block`; wakers and schedule
    // callbacks hold unparkers created from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
108
/// Future returned by `Background::timer`; resolves to `()`.
pub enum Timer {
    /// Real timer backed by `smol::Timer`.
    Production(smol::Timer),
    /// Timer on the deterministic executor's virtual clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
114
/// Timer registered in `DeterministicState::pending_timers`.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Receiving half of the barrier whose sender is stored in `pending_timers`;
    // the timer resolves once the receiver reports ready or closed.
    rx: postage::barrier::Receiver,
    // Id used to remove the pending entry when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
121
122#[cfg(any(test, feature = "test-support"))]
123impl Deterministic {
124    pub fn new(seed: u64) -> Arc<Self> {
125        use rand::prelude::*;
126
127        Arc::new(Self {
128            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
129                rng: StdRng::seed_from_u64(seed),
130                seed,
131                scheduled_from_foreground: Default::default(),
132                scheduled_from_background: Default::default(),
133                forbid_parking: false,
134                block_on_ticks: 0..=1000,
135                now: std::time::Instant::now(),
136                next_timer_id: Default::default(),
137                pending_timers: Default::default(),
138                waiting_backtrace: None,
139                next_runnable_id: 0,
140                poll_history: Default::default(),
141                previous_poll_history: Default::default(),
142                enable_runnable_backtraces: false,
143                runnable_backtraces: Default::default(),
144            })),
145            parker: Default::default(),
146        })
147    }
148
149    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
150        self.state.lock().poll_history.clone()
151    }
152
153    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
154        self.state.lock().previous_poll_history = history;
155    }
156
157    pub fn enable_runnable_backtrace(&self) {
158        self.state.lock().enable_runnable_backtraces = true;
159    }
160
161    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
162        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
163        backtrace.resolve();
164        backtrace
165    }
166
167    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
168        Arc::new(Background::Deterministic {
169            executor: self.clone(),
170        })
171    }
172
173    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
174        Rc::new(Foreground::Deterministic {
175            cx_id: id,
176            executor: self.clone(),
177        })
178    }
179
180    fn spawn_from_foreground(
181        &self,
182        cx_id: usize,
183        future: AnyLocalFuture,
184        main: bool,
185    ) -> AnyLocalTask {
186        let state = self.state.clone();
187        let id;
188        {
189            let mut state = state.lock();
190            id = util::post_inc(&mut state.next_runnable_id);
191            if state.enable_runnable_backtraces {
192                state
193                    .runnable_backtraces
194                    .insert(id, backtrace::Backtrace::new_unresolved());
195            }
196        }
197
198        let unparker = self.parker.lock().unparker();
199        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
200            let mut state = state.lock();
201            state
202                .poll_history
203                .push(ExecutorEvent::EnqueuRunnable { id });
204            state
205                .scheduled_from_foreground
206                .entry(cx_id)
207                .or_default()
208                .push(ForegroundRunnable { id, runnable, main });
209            unparker.unpark();
210        });
211        runnable.schedule();
212        task
213    }
214
215    fn spawn(&self, future: AnyFuture) -> AnyTask {
216        let state = self.state.clone();
217        let id;
218        {
219            let mut state = state.lock();
220            id = util::post_inc(&mut state.next_runnable_id);
221            if state.enable_runnable_backtraces {
222                state
223                    .runnable_backtraces
224                    .insert(id, backtrace::Backtrace::new_unresolved());
225            }
226        }
227
228        let unparker = self.parker.lock().unparker();
229        let (runnable, task) = async_task::spawn(future, move |runnable| {
230            let mut state = state.lock();
231            state
232                .poll_history
233                .push(ExecutorEvent::EnqueuRunnable { id });
234            state
235                .scheduled_from_background
236                .push(BackgroundRunnable { id, runnable });
237            unparker.unpark();
238        });
239        runnable.schedule();
240        task
241    }
242
    /// Drives `main_future` (and every other scheduled runnable) on the
    /// calling thread until `main_future` completes, then returns its output.
    ///
    /// Whenever there is nothing left to run, the thread parks until a waker
    /// or a schedule callback unparks it.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker that `run_internal` creates; tells the loop below
        // whether parking is an expected wait or a potential hang.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let id;
        {
            let mut state = state.lock();
            id = util::post_inc(&mut state.next_runnable_id);
            if state.enable_runnable_backtraces {
                state
                    .runnable_backtraces
                    .insert(id, backtrace::Backtrace::new_unresolved());
            }
        }

        let unparker = self.parker.lock().unparker();
        // SAFETY(review): `spawn_unchecked` skips the `'static`/`Send` checks,
        // which `main_future` (bounded only by `'a`) would not pass. This
        // function only returns once `main_task` has produced its output;
        // whether that satisfies every requirement of `spawn_unchecked`
        // should be confirmed against the `async_task` documentation.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let state = &mut *state.lock();
                // NOTE(review): unlike `spawn` and `spawn_from_foreground`,
                // this callback allocates a fresh id on every enqueue (the
                // `id` reserved above is only used as the backtrace key) and
                // does not record an enqueue event — confirm this is intended.
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        id: util::post_inc(&mut state.next_runnable_id),
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Nothing is runnable and no wake-up has been observed:
            // `will_park` panics if parking has been forbidden.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }
295
296    pub fn run_until_parked(&self) {
297        use std::sync::atomic::AtomicBool;
298        let woken = Arc::new(AtomicBool::new(false));
299        self.run_internal(woken, None);
300    }
301
    /// Runs scheduled runnables, chosen with the seeded RNG, until both
    /// queues are empty.
    ///
    /// Returns `Some(output)` as soon as `main_task` is observed to be
    /// complete, and `None` when the queues are empty and `main_task` is
    /// absent or still pending. `woken` is set whenever the waker handed to
    /// `main_task` fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Both queues drained: check the main task one last time and stop.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // Coin flip between a random background runnable and the
            // foreground. If the flip picks the foreground while it is empty,
            // neither branch runs and the loop draws again.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: background_runnable.id,
                });
                // Release the lock before running: the runnable's schedule
                // callback locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random context, then take its oldest runnable so
                // that runnables of one context run in the order queued.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Drop empty queues so that the emptiness check above and the
                // key selection only see contexts with pending work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.push_to_history(ExecutorEvent::PollRunnable {
                    id: foreground_runnable.id,
                });

                drop(state);

                foreground_runnable.runnable.run();
                // Only a runnable flagged `main` can have completed the main
                // task, so the task is polled only after running one.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
373
374    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
375    where
376        F: Unpin + Future<Output = T>,
377    {
378        use rand::prelude::*;
379
380        let unparker = self.parker.lock().unparker();
381        let waker = waker_fn::waker_fn(move || {
382            unparker.unpark();
383        });
384
385        let mut cx = Context::from_waker(&waker);
386        for _ in 0..max_ticks {
387            let mut state = self.state.lock();
388            let runnable_count = state.scheduled_from_background.len();
389            let ix = state.rng.gen_range(0..=runnable_count);
390            if ix < state.scheduled_from_background.len() {
391                let background_runnable = state.scheduled_from_background.remove(ix);
392                state.push_to_history(ExecutorEvent::PollRunnable {
393                    id: background_runnable.id,
394                });
395                drop(state);
396                background_runnable.runnable.run();
397            } else {
398                drop(state);
399                if let Poll::Ready(result) = future.poll(&mut cx) {
400                    return Some(result);
401                }
402                let mut state = self.state.lock();
403                if state.scheduled_from_background.is_empty() {
404                    state.will_park();
405                    drop(state);
406                    self.parker.lock().park();
407                }
408
409                continue;
410            }
411        }
412
413        None
414    }
415
416    pub fn timer(&self, duration: Duration) -> Timer {
417        let (tx, rx) = postage::barrier::channel();
418        let mut state = self.state.lock();
419        let wakeup_at = state.now + duration;
420        let id = util::post_inc(&mut state.next_timer_id);
421        match state
422            .pending_timers
423            .binary_search_by_key(&wakeup_at, |e| e.1)
424        {
425            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
426        }
427        let state = self.state.clone();
428        Timer::Deterministic(DeterministicTimer { rx, id, state })
429    }
430
431    pub fn now(&self) -> std::time::Instant {
432        let state = self.state.lock();
433        state.now
434    }
435
436    pub fn advance_clock(&self, duration: Duration) {
437        let new_now = self.state.lock().now + duration;
438        loop {
439            self.run_until_parked();
440            let mut state = self.state.lock();
441
442            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
443                let wakeup_time = *wakeup_time;
444                if wakeup_time <= new_now {
445                    let timer_count = state
446                        .pending_timers
447                        .iter()
448                        .take_while(|(_, t, _)| *t == wakeup_time)
449                        .count();
450                    state.now = wakeup_time;
451                    let timers_to_wake = state
452                        .pending_timers
453                        .drain(0..timer_count)
454                        .collect::<Vec<_>>();
455                    drop(state);
456                    drop(timers_to_wake);
457                    continue;
458                }
459            }
460
461            break;
462        }
463
464        self.state.lock().now = new_now;
465    }
466
467    pub fn start_waiting(&self) {
468        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
469    }
470
471    pub fn finish_waiting(&self) {
472        self.state.lock().waiting_backtrace.take();
473    }
474
475    pub fn forbid_parking(&self) {
476        use rand::prelude::*;
477
478        let mut state = self.state.lock();
479        state.forbid_parking = true;
480        state.rng = StdRng::seed_from_u64(state.seed);
481    }
482
483    pub async fn simulate_random_delay(&self) {
484        use rand::prelude::*;
485        use smol::future::yield_now;
486        if self.state.lock().rng.gen_bool(0.2) {
487            let yields = self.state.lock().rng.gen_range(1..=10);
488            for _ in 0..yields {
489                yield_now().await;
490            }
491        }
492    }
493
494    pub fn record_backtrace(&self) {
495        let mut state = self.state.lock();
496        if state.enable_runnable_backtraces {
497            let current_id = state
498                .poll_history
499                .iter()
500                .rev()
501                .find_map(|event| match event {
502                    ExecutorEvent::PollRunnable { id } => Some(*id),
503                    _ => None,
504                });
505            if let Some(id) = current_id {
506                state
507                    .runnable_backtraces
508                    .insert(id, backtrace::Backtrace::new_unresolved());
509            }
510        }
511    }
512}
513
514impl Drop for Timer {
515    fn drop(&mut self) {
516        #[cfg(any(test, feature = "test-support"))]
517        if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
518            state
519                .lock()
520                .pending_timers
521                .retain(|(timer_id, _, _)| timer_id != id)
522        }
523    }
524}
525
526impl Future for Timer {
527    type Output = ();
528
529    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
530        match &mut *self {
531            #[cfg(any(test, feature = "test-support"))]
532            Self::Deterministic(DeterministicTimer { rx, .. }) => {
533                use postage::stream::{PollRecv, Stream as _};
534                smol::pin!(rx);
535                match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
536                    PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
537                    PollRecv::Pending => Poll::Pending,
538                }
539            }
540            Self::Production(timer) => {
541                smol::pin!(timer);
542                match timer.poll(cx) {
543                    Poll::Ready(_) => Poll::Ready(()),
544                    Poll::Pending => Poll::Pending,
545                }
546            }
547        }
548    }
549}
550
551#[cfg(any(test, feature = "test-support"))]
552impl DeterministicState {
553    fn push_to_history(&mut self, event: ExecutorEvent) {
554        self.poll_history.push(event);
555        if let Some(prev_history) = &self.previous_poll_history {
556            let ix = self.poll_history.len() - 1;
557            let prev_event = prev_history[ix];
558            if event != prev_event {
559                let mut message = String::new();
560                writeln!(
561                    &mut message,
562                    "current runnable backtrace:\n{:?}",
563                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
564                        trace.resolve();
565                        CwdBacktrace(trace)
566                    })
567                )
568                .unwrap();
569                writeln!(
570                    &mut message,
571                    "previous runnable backtrace:\n{:?}",
572                    self.runnable_backtraces
573                        .get_mut(&prev_event.id())
574                        .map(|trace| {
575                            trace.resolve();
576                            CwdBacktrace(trace)
577                        })
578                )
579                .unwrap();
580                panic!("detected non-determinism after {ix}. {message}");
581            }
582        }
583    }
584
585    fn will_park(&mut self) {
586        if self.forbid_parking {
587            let mut backtrace_message = String::new();
588            #[cfg(any(test, feature = "test-support"))]
589            if let Some(backtrace) = self.waiting_backtrace.as_mut() {
590                backtrace.resolve();
591                backtrace_message = format!(
592                    "\nbacktrace of waiting future:\n{:?}",
593                    util::CwdBacktrace(backtrace)
594                );
595            }
596
597            panic!(
598                "deterministic executor parked after a call to forbid_parking{}",
599                backtrace_message
600            );
601        }
602    }
603}
604
#[cfg(any(test, feature = "test-support"))]
impl ExecutorEvent {
    /// Returns the runnable id carried by the event, whatever its kind.
    pub fn id(&self) -> usize {
        match *self {
            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => id,
        }
    }
}
614
615impl Foreground {
616    pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
617        if dispatcher.is_main_thread() {
618            Ok(Self::Platform {
619                dispatcher,
620                _not_send_or_sync: PhantomData,
621            })
622        } else {
623            Err(anyhow!("must be constructed on main thread"))
624        }
625    }
626
627    pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
628        let future = any_local_future(future);
629        let any_task = match self {
630            #[cfg(any(test, feature = "test-support"))]
631            Self::Deterministic { cx_id, executor } => {
632                executor.spawn_from_foreground(*cx_id, future, false)
633            }
634            Self::Platform { dispatcher, .. } => {
635                fn spawn_inner(
636                    future: AnyLocalFuture,
637                    dispatcher: &Arc<dyn Dispatcher>,
638                ) -> AnyLocalTask {
639                    let dispatcher = dispatcher.clone();
640                    let schedule =
641                        move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
642                    let (runnable, task) = async_task::spawn_local(future, schedule);
643                    runnable.schedule();
644                    task
645                }
646                spawn_inner(future, dispatcher)
647            }
648        };
649        Task::local(any_task)
650    }
651
652    #[cfg(any(test, feature = "test-support"))]
653    pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
654        let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
655        let result = match self {
656            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
657            Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
658        };
659        *result.downcast().unwrap()
660    }
661
662    #[cfg(any(test, feature = "test-support"))]
663    pub fn run_until_parked(&self) {
664        match self {
665            Self::Deterministic { executor, .. } => executor.run_until_parked(),
666            _ => panic!("this method can only be called on a deterministic executor"),
667        }
668    }
669
670    #[cfg(any(test, feature = "test-support"))]
671    pub fn parking_forbidden(&self) -> bool {
672        match self {
673            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
674            _ => panic!("this method can only be called on a deterministic executor"),
675        }
676    }
677
678    #[cfg(any(test, feature = "test-support"))]
679    pub fn start_waiting(&self) {
680        match self {
681            Self::Deterministic { executor, .. } => executor.start_waiting(),
682            _ => panic!("this method can only be called on a deterministic executor"),
683        }
684    }
685
686    #[cfg(any(test, feature = "test-support"))]
687    pub fn finish_waiting(&self) {
688        match self {
689            Self::Deterministic { executor, .. } => executor.finish_waiting(),
690            _ => panic!("this method can only be called on a deterministic executor"),
691        }
692    }
693
694    #[cfg(any(test, feature = "test-support"))]
695    pub fn forbid_parking(&self) {
696        match self {
697            Self::Deterministic { executor, .. } => executor.forbid_parking(),
698            _ => panic!("this method can only be called on a deterministic executor"),
699        }
700    }
701
702    #[cfg(any(test, feature = "test-support"))]
703    pub fn advance_clock(&self, duration: Duration) {
704        match self {
705            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
706            _ => panic!("this method can only be called on a deterministic executor"),
707        }
708    }
709
710    #[cfg(any(test, feature = "test-support"))]
711    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
712        match self {
713            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
714            _ => panic!("this method can only be called on a deterministic executor"),
715        }
716    }
717}
718
719impl Background {
720    pub fn new() -> Self {
721        let executor = Arc::new(Executor::new());
722        let stop = channel::unbounded::<()>();
723
724        for i in 0..2 * num_cpus::get() {
725            let executor = executor.clone();
726            let stop = stop.1.clone();
727            thread::Builder::new()
728                .name(format!("background-executor-{}", i))
729                .spawn(move || smol::block_on(executor.run(stop.recv())))
730                .unwrap();
731        }
732
733        Self::Production {
734            executor,
735            _stop: stop.0,
736        }
737    }
738
    /// Returns the CPU count reported by the `num_cpus` crate. The value does
    /// not depend on which executor variant this is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
742
743    pub fn spawn<T, F>(&self, future: F) -> Task<T>
744    where
745        T: 'static + Send,
746        F: Send + Future<Output = T> + 'static,
747    {
748        let future = any_future(future);
749        let any_task = match self {
750            Self::Production { executor, .. } => executor.spawn(future),
751            #[cfg(any(test, feature = "test-support"))]
752            Self::Deterministic { executor } => executor.spawn(future),
753        };
754        Task::send(any_task)
755    }
756
757    pub fn block<F, T>(&self, future: F) -> T
758    where
759        F: Future<Output = T>,
760    {
761        smol::pin!(future);
762        match self {
763            Self::Production { .. } => smol::block_on(&mut future),
764            #[cfg(any(test, feature = "test-support"))]
765            Self::Deterministic { executor, .. } => {
766                executor.block(&mut future, usize::MAX).unwrap()
767            }
768        }
769    }
770
771    pub fn block_with_timeout<F, T>(
772        &self,
773        timeout: Duration,
774        future: F,
775    ) -> Result<T, impl Future<Output = T>>
776    where
777        T: 'static,
778        F: 'static + Unpin + Future<Output = T>,
779    {
780        let mut future = any_local_future(future);
781        if !timeout.is_zero() {
782            let output = match self {
783                Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
784                #[cfg(any(test, feature = "test-support"))]
785                Self::Deterministic { executor, .. } => {
786                    use rand::prelude::*;
787                    let max_ticks = {
788                        let mut state = executor.state.lock();
789                        let range = state.block_on_ticks.clone();
790                        state.rng.gen_range(range)
791                    };
792                    executor.block(&mut future, max_ticks)
793                }
794            };
795            if let Some(output) = output {
796                return Ok(*output.downcast().unwrap());
797            }
798        }
799        Err(async { *future.await.downcast().unwrap() })
800    }
801
802    pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
803    where
804        F: FnOnce(&mut Scope<'scope>),
805    {
806        let mut scope = Scope::new(self.clone());
807        (scheduler)(&mut scope);
808        let spawned = mem::take(&mut scope.futures)
809            .into_iter()
810            .map(|f| self.spawn(f))
811            .collect::<Vec<_>>();
812        for task in spawned {
813            task.await;
814        }
815    }
816
817    pub fn timer(&self, duration: Duration) -> Timer {
818        match self {
819            Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
820            #[cfg(any(test, feature = "test-support"))]
821            Background::Deterministic { executor } => executor.timer(duration),
822        }
823    }
824
825    pub fn now(&self) -> std::time::Instant {
826        match self {
827            Background::Production { .. } => std::time::Instant::now(),
828            #[cfg(any(test, feature = "test-support"))]
829            Background::Deterministic { executor } => executor.now(),
830        }
831    }
832
833    #[cfg(any(test, feature = "test-support"))]
834    pub async fn simulate_random_delay(&self) {
835        match self {
836            Self::Deterministic { executor, .. } => {
837                executor.simulate_random_delay().await;
838            }
839            _ => {
840                panic!("this method can only be called on a deterministic executor")
841            }
842        }
843    }
844
845    #[cfg(any(test, feature = "test-support"))]
846    pub fn record_backtrace(&self) {
847        match self {
848            Self::Deterministic { executor, .. } => executor.record_backtrace(),
849            _ => {
850                panic!("this method can only be called on a deterministic executor")
851            }
852        }
853    }
854}
855
856impl Default for Background {
857    fn default() -> Self {
858        Self::new()
859    }
860}
861
/// Collects futures that borrow data for `'a` so that `Background::scoped`
/// can spawn them; dropping the scope blocks until all of them have finished.
pub struct Scope<'a> {
    // Executor used by `Drop` to block on `rx`.
    executor: Arc<Background>,
    // Futures registered through `spawn`, with their lifetime erased to
    // `'static`; `Background::scoped` takes them out and spawns them.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each registered future holds a clone of this sender and drops it when
    // it finishes. The original is dropped in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // `Drop` blocks on this receiver's next item; nothing is ever sent.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
869
870impl<'a> Scope<'a> {
871    fn new(executor: Arc<Background>) -> Self {
872        let (tx, rx) = mpsc::channel(1);
873        Self {
874            executor,
875            tx: Some(tx),
876            rx,
877            futures: Default::default(),
878            _phantom: PhantomData,
879        }
880    }
881
882    pub fn spawn<F>(&mut self, f: F)
883    where
884        F: Future<Output = ()> + Send + 'a,
885    {
886        let tx = self.tx.clone().unwrap();
887
888        // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
889        // dropping this `Scope` blocks until all of the futures have resolved.
890        let f = unsafe {
891            mem::transmute::<
892                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
893                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
894            >(Box::pin(async move {
895                f.await;
896                drop(tx);
897            }))
898        };
899        self.futures.push(f);
900    }
901}
902
903impl<'a> Drop for Scope<'a> {
904    fn drop(&mut self) {
905        self.tx.take().unwrap();
906
907        // Wait until the channel is closed, which means that all of the spawned
908        // futures have resolved.
909        self.executor.block(self.rx.next());
910    }
911}
912
913impl<T> Task<T> {
914    pub fn ready(value: T) -> Self {
915        Self::Ready(Some(value))
916    }
917
918    fn local(any_task: AnyLocalTask) -> Self {
919        Self::Local {
920            any_task,
921            result_type: PhantomData,
922        }
923    }
924
925    pub fn detach(self) {
926        match self {
927            Task::Ready(_) => {}
928            Task::Local { any_task, .. } => any_task.detach(),
929            Task::Send { any_task, .. } => any_task.detach(),
930        }
931    }
932}
933
934impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
935    pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
936        cx.spawn(|_| async move {
937            if let Err(err) = self.await {
938                log::error!("{}", err);
939            }
940        })
941        .detach();
942    }
943}
944
945impl<T: Send> Task<T> {
946    fn send(any_task: AnyTask) -> Self {
947        Self::Send {
948            any_task,
949            result_type: PhantomData,
950        }
951    }
952}
953
954impl<T: fmt::Debug> fmt::Debug for Task<T> {
955    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956        match self {
957            Task::Ready(value) => value.fmt(f),
958            Task::Local { any_task, .. } => any_task.fmt(f),
959            Task::Send { any_task, .. } => any_task.fmt(f),
960        }
961    }
962}
963
964impl<T: 'static> Future for Task<T> {
965    type Output = T;
966
967    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
968        match unsafe { self.get_unchecked_mut() } {
969            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
970            Task::Local { any_task, .. } => {
971                any_task.poll(cx).map(|value| *value.downcast().unwrap())
972            }
973            Task::Send { any_task, .. } => {
974                any_task.poll(cx).map(|value| *value.downcast().unwrap())
975            }
976        }
977    }
978}
979
980fn any_future<T, F>(future: F) -> AnyFuture
981where
982    T: 'static + Send,
983    F: Future<Output = T> + Send + 'static,
984{
985    async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
986}
987
988fn any_local_future<T, F>(future: F) -> AnyLocalFuture
989where
990    T: 'static,
991    F: Future<Output = T> + 'static,
992{
993    async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
994}