1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use smol::{channel, prelude::*, Executor};
4use std::{
5 any::Any,
6 fmt::{self, Display},
7 marker::PhantomData,
8 mem,
9 pin::Pin,
10 rc::Rc,
11 sync::{mpsc, Arc},
12 task::{Context, Poll},
13 thread,
14 time::Duration,
15};
16
17use crate::{
18 platform::{self, Dispatcher},
19 util, MutableAppContext,
20};
21
22pub enum Foreground {
23 Platform {
24 dispatcher: Arc<dyn platform::Dispatcher>,
25 _not_send_or_sync: PhantomData<Rc<()>>,
26 },
27 #[cfg(any(test, feature = "test-support"))]
28 Deterministic {
29 cx_id: usize,
30 executor: Arc<Deterministic>,
31 },
32}
33
34pub enum Background {
35 #[cfg(any(test, feature = "test-support"))]
36 Deterministic { executor: Arc<Deterministic> },
37 Production {
38 executor: Arc<smol::Executor<'static>>,
39 _stop: channel::Sender<()>,
40 },
41}
42
43type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
44type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
45type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
46type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
47
48#[must_use]
49pub enum Task<T> {
50 Ready(Option<T>),
51 Local {
52 any_task: AnyLocalTask,
53 result_type: PhantomData<T>,
54 },
55 Send {
56 any_task: AnyTask,
57 result_type: PhantomData<T>,
58 },
59}
60
61unsafe impl<T: Send> Send for Task<T> {}
62
63#[cfg(any(test, feature = "test-support"))]
64struct DeterministicState {
65 rng: rand::prelude::StdRng,
66 seed: u64,
67 scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
68 scheduled_from_background: Vec<Runnable>,
69 forbid_parking: bool,
70 block_on_ticks: std::ops::RangeInclusive<usize>,
71 now: std::time::Instant,
72 next_timer_id: usize,
73 pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
74 waiting_backtrace: Option<backtrace::Backtrace>,
75}
76
77#[cfg(any(test, feature = "test-support"))]
78struct ForegroundRunnable {
79 runnable: Runnable,
80 main: bool,
81}
82
83#[cfg(any(test, feature = "test-support"))]
84pub struct Deterministic {
85 state: Arc<parking_lot::Mutex<DeterministicState>>,
86 parker: parking_lot::Mutex<parking::Parker>,
87}
88
89pub enum Timer {
90 Production(smol::Timer),
91 #[cfg(any(test, feature = "test-support"))]
92 Deterministic(DeterministicTimer),
93}
94
95#[cfg(any(test, feature = "test-support"))]
96pub struct DeterministicTimer {
97 rx: postage::barrier::Receiver,
98 id: usize,
99 state: Arc<parking_lot::Mutex<DeterministicState>>,
100}
101
102#[cfg(any(test, feature = "test-support"))]
103impl Deterministic {
104 pub fn new(seed: u64) -> Arc<Self> {
105 use rand::prelude::*;
106
107 Arc::new(Self {
108 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
109 rng: StdRng::seed_from_u64(seed),
110 seed,
111 scheduled_from_foreground: Default::default(),
112 scheduled_from_background: Default::default(),
113 forbid_parking: false,
114 block_on_ticks: 0..=1000,
115 now: std::time::Instant::now(),
116 next_timer_id: Default::default(),
117 pending_timers: Default::default(),
118 waiting_backtrace: None,
119 })),
120 parker: Default::default(),
121 })
122 }
123
124 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
125 Arc::new(Background::Deterministic {
126 executor: self.clone(),
127 })
128 }
129
130 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
131 Rc::new(Foreground::Deterministic {
132 cx_id: id,
133 executor: self.clone(),
134 })
135 }
136
137 fn spawn_from_foreground(
138 &self,
139 cx_id: usize,
140 future: AnyLocalFuture,
141 main: bool,
142 ) -> AnyLocalTask {
143 let state = self.state.clone();
144 let unparker = self.parker.lock().unparker();
145 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
146 let mut state = state.lock();
147 state
148 .scheduled_from_foreground
149 .entry(cx_id)
150 .or_default()
151 .push(ForegroundRunnable { runnable, main });
152 unparker.unpark();
153 });
154 runnable.schedule();
155 task
156 }
157
158 fn spawn(&self, future: AnyFuture) -> AnyTask {
159 let state = self.state.clone();
160 let unparker = self.parker.lock().unparker();
161 let (runnable, task) = async_task::spawn(future, move |runnable| {
162 let mut state = state.lock();
163 state.scheduled_from_background.push(runnable);
164 unparker.unpark();
165 });
166 runnable.schedule();
167 task
168 }
169
170 fn run<'a>(
171 &self,
172 cx_id: usize,
173 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
174 ) -> Box<dyn Any> {
175 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
176
177 let woken = Arc::new(AtomicBool::new(false));
178
179 let state = self.state.clone();
180 let unparker = self.parker.lock().unparker();
181 let (runnable, mut main_task) = unsafe {
182 async_task::spawn_unchecked(main_future, move |runnable| {
183 let mut state = state.lock();
184 state
185 .scheduled_from_foreground
186 .entry(cx_id)
187 .or_default()
188 .push(ForegroundRunnable {
189 runnable,
190 main: true,
191 });
192 unparker.unpark();
193 })
194 };
195 runnable.schedule();
196
197 loop {
198 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
199 return result;
200 }
201
202 if !woken.load(SeqCst) {
203 self.state.lock().will_park();
204 }
205
206 woken.store(false, SeqCst);
207 self.parker.lock().park();
208 }
209 }
210
211 pub fn run_until_parked(&self) {
212 use std::sync::atomic::AtomicBool;
213 let woken = Arc::new(AtomicBool::new(false));
214 self.run_internal(woken, None);
215 }
216
217 fn run_internal(
218 &self,
219 woken: Arc<std::sync::atomic::AtomicBool>,
220 mut main_task: Option<&mut AnyLocalTask>,
221 ) -> Option<Box<dyn Any>> {
222 use rand::prelude::*;
223 use std::sync::atomic::Ordering::SeqCst;
224
225 let unparker = self.parker.lock().unparker();
226 let waker = waker_fn::waker_fn(move || {
227 woken.store(true, SeqCst);
228 unparker.unpark();
229 });
230
231 let mut cx = Context::from_waker(&waker);
232 loop {
233 let mut state = self.state.lock();
234
235 if state.scheduled_from_foreground.is_empty()
236 && state.scheduled_from_background.is_empty()
237 {
238 if let Some(main_task) = main_task {
239 if let Poll::Ready(result) = main_task.poll(&mut cx) {
240 return Some(result);
241 }
242 }
243
244 return None;
245 }
246
247 if !state.scheduled_from_background.is_empty() && state.rng.gen() {
248 let background_len = state.scheduled_from_background.len();
249 let ix = state.rng.gen_range(0..background_len);
250 let runnable = state.scheduled_from_background.remove(ix);
251 drop(state);
252 runnable.run();
253 } else if !state.scheduled_from_foreground.is_empty() {
254 let available_cx_ids = state
255 .scheduled_from_foreground
256 .keys()
257 .copied()
258 .collect::<Vec<_>>();
259 let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
260 let scheduled_from_cx = state
261 .scheduled_from_foreground
262 .get_mut(&cx_id_to_run)
263 .unwrap();
264 let foreground_runnable = scheduled_from_cx.remove(0);
265 if scheduled_from_cx.is_empty() {
266 state.scheduled_from_foreground.remove(&cx_id_to_run);
267 }
268
269 drop(state);
270
271 foreground_runnable.runnable.run();
272 if let Some(main_task) = main_task.as_mut() {
273 if foreground_runnable.main {
274 if let Poll::Ready(result) = main_task.poll(&mut cx) {
275 return Some(result);
276 }
277 }
278 }
279 }
280 }
281 }
282
283 fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
284 where
285 F: Unpin + Future<Output = T>,
286 {
287 use rand::prelude::*;
288
289 let unparker = self.parker.lock().unparker();
290 let waker = waker_fn::waker_fn(move || {
291 unparker.unpark();
292 });
293
294 let mut cx = Context::from_waker(&waker);
295 for _ in 0..max_ticks {
296 let mut state = self.state.lock();
297 let runnable_count = state.scheduled_from_background.len();
298 let ix = state.rng.gen_range(0..=runnable_count);
299 if ix < state.scheduled_from_background.len() {
300 let runnable = state.scheduled_from_background.remove(ix);
301 drop(state);
302 runnable.run();
303 } else {
304 drop(state);
305 if let Poll::Ready(result) = future.poll(&mut cx) {
306 return Some(result);
307 }
308 let mut state = self.state.lock();
309 if state.scheduled_from_background.is_empty() {
310 state.will_park();
311 drop(state);
312 self.parker.lock().park();
313 }
314
315 continue;
316 }
317 }
318
319 None
320 }
321
322 pub fn timer(&self, duration: Duration) -> Timer {
323 let (tx, rx) = postage::barrier::channel();
324 let mut state = self.state.lock();
325 let wakeup_at = state.now + duration;
326 let id = util::post_inc(&mut state.next_timer_id);
327 state.pending_timers.push((id, wakeup_at, tx));
328 let state = self.state.clone();
329 Timer::Deterministic(DeterministicTimer { rx, id, state })
330 }
331
332 pub fn advance_clock(&self, duration: Duration) {
333 let new_now = self.state.lock().now + duration;
334 loop {
335 self.run_until_parked();
336 let mut state = self.state.lock();
337
338 if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
339 let wakeup_time = *wakeup_time;
340 if wakeup_time <= new_now {
341 let timer_count = state
342 .pending_timers
343 .iter()
344 .take_while(|(_, t, _)| *t == wakeup_time)
345 .count();
346 state.now = wakeup_time;
347 let timers_to_wake = state
348 .pending_timers
349 .drain(0..timer_count)
350 .collect::<Vec<_>>();
351 drop(state);
352 drop(timers_to_wake);
353 continue;
354 }
355 }
356
357 break;
358 }
359
360 self.state.lock().now = new_now;
361 }
362}
363
364impl Drop for Timer {
365 fn drop(&mut self) {
366 #[cfg(any(test, feature = "test-support"))]
367 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
368 state
369 .lock()
370 .pending_timers
371 .retain(|(timer_id, _, _)| timer_id != id)
372 }
373 }
374}
375
376impl Future for Timer {
377 type Output = ();
378
379 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
380 match &mut *self {
381 #[cfg(any(test, feature = "test-support"))]
382 Self::Deterministic(DeterministicTimer { rx, .. }) => {
383 use postage::stream::{PollRecv, Stream as _};
384 smol::pin!(rx);
385 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
386 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
387 PollRecv::Pending => Poll::Pending,
388 }
389 }
390 Self::Production(timer) => {
391 smol::pin!(timer);
392 match timer.poll(cx) {
393 Poll::Ready(_) => Poll::Ready(()),
394 Poll::Pending => Poll::Pending,
395 }
396 }
397 }
398 }
399}
400
401#[cfg(any(test, feature = "test-support"))]
402impl DeterministicState {
403 fn will_park(&mut self) {
404 if self.forbid_parking {
405 let mut backtrace_message = String::new();
406 #[cfg(any(test, feature = "test-support"))]
407 if let Some(backtrace) = self.waiting_backtrace.as_mut() {
408 backtrace.resolve();
409 backtrace_message = format!(
410 "\nbacktrace of waiting future:\n{:?}",
411 util::CwdBacktrace(backtrace)
412 );
413 }
414
415 panic!(
416 "deterministic executor parked after a call to forbid_parking{}",
417 backtrace_message
418 );
419 }
420 }
421}
422
423impl Foreground {
424 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
425 if dispatcher.is_main_thread() {
426 Ok(Self::Platform {
427 dispatcher,
428 _not_send_or_sync: PhantomData,
429 })
430 } else {
431 Err(anyhow!("must be constructed on main thread"))
432 }
433 }
434
435 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
436 let future = any_local_future(future);
437 let any_task = match self {
438 #[cfg(any(test, feature = "test-support"))]
439 Self::Deterministic { cx_id, executor } => {
440 executor.spawn_from_foreground(*cx_id, future, false)
441 }
442 Self::Platform { dispatcher, .. } => {
443 fn spawn_inner(
444 future: AnyLocalFuture,
445 dispatcher: &Arc<dyn Dispatcher>,
446 ) -> AnyLocalTask {
447 let dispatcher = dispatcher.clone();
448 let schedule =
449 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
450 let (runnable, task) = async_task::spawn_local(future, schedule);
451 runnable.schedule();
452 task
453 }
454 spawn_inner(future, dispatcher)
455 }
456 };
457 Task::local(any_task)
458 }
459
460 #[cfg(any(test, feature = "test-support"))]
461 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
462 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
463 let result = match self {
464 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
465 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
466 };
467 *result.downcast().unwrap()
468 }
469
470 #[cfg(any(test, feature = "test-support"))]
471 pub fn run_until_parked(&self) {
472 match self {
473 Self::Deterministic { executor, .. } => executor.run_until_parked(),
474 _ => panic!("this method can only be called on a deterministic executor"),
475 }
476 }
477
478 #[cfg(any(test, feature = "test-support"))]
479 pub fn parking_forbidden(&self) -> bool {
480 match self {
481 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
482 _ => panic!("this method can only be called on a deterministic executor"),
483 }
484 }
485
486 #[cfg(any(test, feature = "test-support"))]
487 pub fn start_waiting(&self) {
488 match self {
489 Self::Deterministic { executor, .. } => {
490 executor.state.lock().waiting_backtrace =
491 Some(backtrace::Backtrace::new_unresolved());
492 }
493 _ => panic!("this method can only be called on a deterministic executor"),
494 }
495 }
496
497 #[cfg(any(test, feature = "test-support"))]
498 pub fn finish_waiting(&self) {
499 match self {
500 Self::Deterministic { executor, .. } => {
501 executor.state.lock().waiting_backtrace.take();
502 }
503 _ => panic!("this method can only be called on a deterministic executor"),
504 }
505 }
506
507 #[cfg(any(test, feature = "test-support"))]
508 pub fn forbid_parking(&self) {
509 use rand::prelude::*;
510
511 match self {
512 Self::Deterministic { executor, .. } => {
513 let mut state = executor.state.lock();
514 state.forbid_parking = true;
515 state.rng = StdRng::seed_from_u64(state.seed);
516 }
517 _ => panic!("this method can only be called on a deterministic executor"),
518 }
519 }
520
521 #[cfg(any(test, feature = "test-support"))]
522 pub fn advance_clock(&self, duration: Duration) {
523 match self {
524 Self::Deterministic { executor, .. } => {
525 executor.run_until_parked();
526 executor.advance_clock(duration);
527 }
528 _ => panic!("this method can only be called on a deterministic executor"),
529 }
530 }
531
532 #[cfg(any(test, feature = "test-support"))]
533 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
534 match self {
535 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
536 _ => panic!("this method can only be called on a deterministic executor"),
537 }
538 }
539}
540
541impl Background {
542 pub fn new() -> Self {
543 let executor = Arc::new(Executor::new());
544 let stop = channel::unbounded::<()>();
545
546 for i in 0..2 * num_cpus::get() {
547 let executor = executor.clone();
548 let stop = stop.1.clone();
549 thread::Builder::new()
550 .name(format!("background-executor-{}", i))
551 .spawn(move || smol::block_on(executor.run(stop.recv())))
552 .unwrap();
553 }
554
555 Self::Production {
556 executor,
557 _stop: stop.0,
558 }
559 }
560
561 pub fn num_cpus(&self) -> usize {
562 num_cpus::get()
563 }
564
565 pub fn spawn<T, F>(&self, future: F) -> Task<T>
566 where
567 T: 'static + Send,
568 F: Send + Future<Output = T> + 'static,
569 {
570 let future = any_future(future);
571 let any_task = match self {
572 Self::Production { executor, .. } => executor.spawn(future),
573 #[cfg(any(test, feature = "test-support"))]
574 Self::Deterministic { executor } => executor.spawn(future),
575 };
576 Task::send(any_task)
577 }
578
579 pub fn block<F, T>(&self, future: F) -> T
580 where
581 F: Future<Output = T>,
582 {
583 smol::pin!(future);
584 match self {
585 Self::Production { .. } => smol::block_on(&mut future),
586 #[cfg(any(test, feature = "test-support"))]
587 Self::Deterministic { executor, .. } => {
588 executor.block(&mut future, usize::MAX).unwrap()
589 }
590 }
591 }
592
593 pub fn block_with_timeout<F, T>(
594 &self,
595 timeout: Duration,
596 future: F,
597 ) -> Result<T, impl Future<Output = T>>
598 where
599 T: 'static,
600 F: 'static + Unpin + Future<Output = T>,
601 {
602 let mut future = any_local_future(future);
603 if !timeout.is_zero() {
604 let output = match self {
605 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
606 #[cfg(any(test, feature = "test-support"))]
607 Self::Deterministic { executor, .. } => {
608 use rand::prelude::*;
609 let max_ticks = {
610 let mut state = executor.state.lock();
611 let range = state.block_on_ticks.clone();
612 state.rng.gen_range(range)
613 };
614 executor.block(&mut future, max_ticks)
615 }
616 };
617 if let Some(output) = output {
618 return Ok(*output.downcast().unwrap());
619 }
620 }
621 Err(async { *future.await.downcast().unwrap() })
622 }
623
624 pub async fn scoped<'scope, F>(&self, scheduler: F)
625 where
626 F: FnOnce(&mut Scope<'scope>),
627 {
628 let mut scope = Scope::new();
629 (scheduler)(&mut scope);
630 let spawned = mem::take(&mut scope.futures)
631 .into_iter()
632 .map(|f| self.spawn(f))
633 .collect::<Vec<_>>();
634 for task in spawned {
635 task.await;
636 }
637 }
638
639 pub fn timer(&self, duration: Duration) -> Timer {
640 match self {
641 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
642 #[cfg(any(test, feature = "test-support"))]
643 Background::Deterministic { executor } => executor.timer(duration),
644 }
645 }
646
647 #[cfg(any(test, feature = "test-support"))]
648 pub async fn simulate_random_delay(&self) {
649 use rand::prelude::*;
650 use smol::future::yield_now;
651
652 match self {
653 Self::Deterministic { executor, .. } => {
654 if executor.state.lock().rng.gen_bool(0.2) {
655 let yields = executor.state.lock().rng.gen_range(1..=10);
656 for _ in 0..yields {
657 yield_now().await;
658 }
659 }
660 }
661 _ => panic!("this method can only be called on a deterministic executor"),
662 }
663 }
664}
665
666pub struct Scope<'a> {
667 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
668 tx: Option<mpsc::Sender<()>>,
669 rx: mpsc::Receiver<()>,
670 _phantom: PhantomData<&'a ()>,
671}
672
673impl<'a> Scope<'a> {
674 fn new() -> Self {
675 let (tx, rx) = mpsc::channel();
676 Self {
677 tx: Some(tx),
678 rx,
679 futures: Default::default(),
680 _phantom: PhantomData,
681 }
682 }
683
684 pub fn spawn<F>(&mut self, f: F)
685 where
686 F: Future<Output = ()> + Send + 'a,
687 {
688 let tx = self.tx.clone().unwrap();
689
690 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
691 // dropping this `Scope` blocks until all of the futures have resolved.
692 let f = unsafe {
693 mem::transmute::<
694 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
695 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
696 >(Box::pin(async move {
697 f.await;
698 drop(tx);
699 }))
700 };
701 self.futures.push(f);
702 }
703}
704
705impl<'a> Drop for Scope<'a> {
706 fn drop(&mut self) {
707 self.tx.take().unwrap();
708
709 // Wait until the channel is closed, which means that all of the spawned
710 // futures have resolved.
711 self.rx.recv().ok();
712 }
713}
714
715impl<T> Task<T> {
716 pub fn ready(value: T) -> Self {
717 Self::Ready(Some(value))
718 }
719
720 fn local(any_task: AnyLocalTask) -> Self {
721 Self::Local {
722 any_task,
723 result_type: PhantomData,
724 }
725 }
726
727 pub fn detach(self) {
728 match self {
729 Task::Ready(_) => {}
730 Task::Local { any_task, .. } => any_task.detach(),
731 Task::Send { any_task, .. } => any_task.detach(),
732 }
733 }
734}
735
736impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
737 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
738 cx.spawn(|_| async move {
739 if let Err(err) = self.await {
740 log::error!("{}", err);
741 }
742 })
743 .detach();
744 }
745}
746
747impl<T: Send> Task<T> {
748 fn send(any_task: AnyTask) -> Self {
749 Self::Send {
750 any_task,
751 result_type: PhantomData,
752 }
753 }
754}
755
756impl<T: fmt::Debug> fmt::Debug for Task<T> {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 match self {
759 Task::Ready(value) => value.fmt(f),
760 Task::Local { any_task, .. } => any_task.fmt(f),
761 Task::Send { any_task, .. } => any_task.fmt(f),
762 }
763 }
764}
765
766impl<T: 'static> Future for Task<T> {
767 type Output = T;
768
769 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
770 match unsafe { self.get_unchecked_mut() } {
771 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
772 Task::Local { any_task, .. } => {
773 any_task.poll(cx).map(|value| *value.downcast().unwrap())
774 }
775 Task::Send { any_task, .. } => {
776 any_task.poll(cx).map(|value| *value.downcast().unwrap())
777 }
778 }
779 }
780}
781
782fn any_future<T, F>(future: F) -> AnyFuture
783where
784 T: 'static + Send,
785 F: Future<Output = T> + Send + 'static,
786{
787 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
788}
789
790fn any_local_future<T, F>(future: F) -> AnyLocalFuture
791where
792 T: 'static,
793 F: Future<Output = T> + 'static,
794{
795 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
796}