1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display},
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 task::{Context, Poll},
14 thread,
15 time::Duration,
16};
17
18use crate::{
19 platform::{self, Dispatcher},
20 util, MutableAppContext,
21};
22
/// Executor handle for futures that are not required to be `Send`.
pub enum Foreground {
    /// Schedules runnables by handing them to the platform dispatcher's
    /// `run_on_main_thread`.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker field prevents the
        // enum from being either.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant that queues runnables on a shared [`Deterministic`]
    /// executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this foreground's runnables are queued in the
        // executor's `scheduled_from_foreground` map.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
34
/// Executor handle for `Send` futures.
pub enum Background {
    /// Test-only variant that queues runnables on a shared [`Deterministic`]
    /// executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Backed by a `smol::Executor` that is driven by the worker threads
    /// started in `Background::new`.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Sender half of the channel whose receiver each worker thread waits on.
        // Nothing is ever sent on it in this file; presumably dropping it closes
        // the channel and lets the workers exit — TODO confirm.
        _stop: channel::Sender<()>,
    },
}
43
// Boxed, pinned future with a type-erased output; not required to be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
// `Send` counterpart of `AnyLocalFuture`.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
// Task handle that yields a type-erased `Send` output.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
// Task handle that yields a type-erased output that need not be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
48
/// Handle to a value of type `T` that is either already available or is being
/// produced by a spawned task.
///
/// Awaiting the handle yields the value; the spawned variants store a
/// type-erased task and downcast its output back to `T` when polled.
#[must_use]
pub enum Task<T> {
    /// A value that is already available; it is taken out on the first poll.
    Ready(Option<T>),
    /// A task spawned through a [`Foreground`] executor.
    Local {
        any_task: AnyLocalTask,
        // Records the output type that `any_task`'s boxed result downcasts to.
        result_type: PhantomData<T>,
    },
    /// A task spawned through a [`Background`] executor.
    Send {
        any_task: AnyTask,
        // Records the output type that `any_task`'s boxed result downcasts to.
        result_type: PhantomData<T>,
    },
}
61
// NOTE(review): this makes every `Task<T>` with `T: Send` sendable, including the
// `Local` variant, whose handle is created with `async_task::spawn_local`.
// Confirm that moving such a handle to another thread meets that API's
// requirements.
unsafe impl<T: Send> Send for Task<T> {}
63
/// Mutable state shared by a [`Deterministic`] executor, the schedule closures
/// of the tasks it spawns, and its timers.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision; reseeded from `seed` by `forbid_parking`.
    rng: rand::prelude::StdRng,
    // Seed the executor was created with.
    seed: u64,
    // Runnables scheduled by foreground tasks, keyed by `cx_id`; each queue is
    // consumed from the front.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables scheduled by background tasks; consumed at a random index.
    scheduled_from_background: Vec<BackgroundRunnable>,
    // When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` samples its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated current time; only moved forward by `advance_clock`.
    now: std::time::Instant,
    next_timer_id: usize,
    // `(timer id, wake-up time, barrier sender)`; `timer` inserts entries at the
    // position found by a binary search on the wake-up time.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Set by `start_waiting`, cleared by `finish_waiting`, and included in the
    // panic message produced by `will_park`.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    // Ids of the runnables that were run, in execution order.
    poll_history: Vec<usize>,
    // Backtrace captured when each runnable id was allocated.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
80
/// A runnable queued by a foreground task on the deterministic executor.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    // Id recorded in `poll_history` when the runnable is run.
    id: usize,
    runnable: Runnable,
    // When set, `run_internal` polls the main task right after running this
    // runnable.
    main: bool,
}
87
/// A runnable queued by a background task on the deterministic executor.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    // Id recorded in `poll_history` when the runnable is run.
    id: usize,
    runnable: Runnable,
}
93
/// Test executor that runs foreground and background runnables on the calling
/// thread, choosing what to run next with a seeded random number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with the schedule closures of spawned tasks and with timers.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Parks the calling thread in `run` and `block`; wakers and schedule
    // closures hold unparkers created from it.
    parker: parking_lot::Mutex<parking::Parker>,
}
99
/// Future that completes once a timer fires.
pub enum Timer {
    /// Wraps a `smol::Timer`.
    Production(smol::Timer),
    /// Test-only timer registered with a [`Deterministic`] executor's simulated
    /// clock.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic(DeterministicTimer),
}
105
/// Timer registered in a deterministic executor's `pending_timers` list.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    // Polled by `Timer::poll`; the timer completes when this yields a value or
    // reports that the channel is closed.
    rx: postage::barrier::Receiver,
    // Identifies this timer's entry so `Timer`'s `Drop` impl can remove it.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
112
113#[cfg(any(test, feature = "test-support"))]
114impl Deterministic {
115 pub fn new(seed: u64) -> Arc<Self> {
116 use rand::prelude::*;
117
118 Arc::new(Self {
119 state: Arc::new(parking_lot::Mutex::new(DeterministicState {
120 rng: StdRng::seed_from_u64(seed),
121 seed,
122 scheduled_from_foreground: Default::default(),
123 scheduled_from_background: Default::default(),
124 forbid_parking: false,
125 block_on_ticks: 0..=1000,
126 now: std::time::Instant::now(),
127 next_timer_id: Default::default(),
128 pending_timers: Default::default(),
129 waiting_backtrace: None,
130 next_runnable_id: 0,
131 poll_history: Default::default(),
132 runnable_backtraces: Default::default(),
133 })),
134 parker: Default::default(),
135 })
136 }
137
138 pub fn runnable_history(&self) -> Vec<usize> {
139 self.state.lock().poll_history.clone()
140 }
141
142 pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
143 let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
144 backtrace.resolve();
145 backtrace
146 }
147
148 pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
149 Arc::new(Background::Deterministic {
150 executor: self.clone(),
151 })
152 }
153
154 pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
155 Rc::new(Foreground::Deterministic {
156 cx_id: id,
157 executor: self.clone(),
158 })
159 }
160
161 fn spawn_from_foreground(
162 &self,
163 cx_id: usize,
164 future: AnyLocalFuture,
165 main: bool,
166 ) -> AnyLocalTask {
167 let state = self.state.clone();
168 let id;
169 {
170 let mut state = state.lock();
171 id = util::post_inc(&mut state.next_runnable_id);
172 state
173 .runnable_backtraces
174 .insert(id, backtrace::Backtrace::new_unresolved());
175 }
176
177 let unparker = self.parker.lock().unparker();
178 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
179 let mut state = state.lock();
180 state
181 .scheduled_from_foreground
182 .entry(cx_id)
183 .or_default()
184 .push(ForegroundRunnable { id, runnable, main });
185 unparker.unpark();
186 });
187 runnable.schedule();
188 task
189 }
190
191 fn spawn(&self, future: AnyFuture) -> AnyTask {
192 let state = self.state.clone();
193 let id;
194 {
195 let mut state = state.lock();
196 id = util::post_inc(&mut state.next_runnable_id);
197 state
198 .runnable_backtraces
199 .insert(id, backtrace::Backtrace::new_unresolved());
200 }
201
202 let unparker = self.parker.lock().unparker();
203 let (runnable, task) = async_task::spawn(future, move |runnable| {
204 let mut state = state.lock();
205 state
206 .scheduled_from_background
207 .push(BackgroundRunnable { id, runnable });
208 unparker.unpark();
209 });
210 runnable.schedule();
211 task
212 }
213
214 fn run<'a>(
215 &self,
216 cx_id: usize,
217 main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
218 ) -> Box<dyn Any> {
219 use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
220
221 let woken = Arc::new(AtomicBool::new(false));
222
223 let state = self.state.clone();
224 let id;
225 {
226 let mut state = state.lock();
227 id = util::post_inc(&mut state.next_runnable_id);
228 state
229 .runnable_backtraces
230 .insert(id, backtrace::Backtrace::new());
231 }
232
233 let unparker = self.parker.lock().unparker();
234 let (runnable, mut main_task) = unsafe {
235 async_task::spawn_unchecked(main_future, move |runnable| {
236 let state = &mut *state.lock();
237 state
238 .scheduled_from_foreground
239 .entry(cx_id)
240 .or_default()
241 .push(ForegroundRunnable {
242 id: util::post_inc(&mut state.next_runnable_id),
243 runnable,
244 main: true,
245 });
246 unparker.unpark();
247 })
248 };
249 runnable.schedule();
250
251 loop {
252 if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
253 return result;
254 }
255
256 if !woken.load(SeqCst) {
257 self.state.lock().will_park();
258 }
259
260 woken.store(false, SeqCst);
261 self.parker.lock().park();
262 }
263 }
264
265 pub fn run_until_parked(&self) {
266 use std::sync::atomic::AtomicBool;
267 let woken = Arc::new(AtomicBool::new(false));
268 self.run_internal(woken, None);
269 }
270
    /// Runs scheduled runnables until both queues are empty or the main task
    /// completes.
    ///
    /// Returns `Some(output)` as soon as `main_task` is observed to be ready,
    /// and `None` once nothing is left to run while the main task, if any, is
    /// still pending.
    ///
    /// `woken` is set to `true` by the waker used to poll `main_task`, so the
    /// caller can tell whether that waker fired.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            // Nothing is queued: poll the main task one last time, then
            // report that the executor is idle.
            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            // When background work is queued, a random boolean decides whether
            // this iteration runs a background runnable (picked at a random
            // index) or falls through to the foreground queues.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.poll_history.push(background_runnable.id);
                // Release the lock before running: the runnable's schedule
                // closure locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random foreground context, then take the oldest
                // runnable from its queue.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Drop emptied queues so the emptiness check above and the
                // random choice only see contexts that have work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.poll_history.push(foreground_runnable.id);

                drop(state);

                foreground_runnable.runnable.run();
                // After running a runnable flagged as `main`, check whether
                // the main task has produced its output.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }
338
    /// Polls `future` on the calling thread for at most `max_ticks` iterations,
    /// randomly interleaving it with queued background runnables.
    ///
    /// Returns `Some(output)` if the future completes within the tick budget
    /// and `None` otherwise. Foreground runnables are never run here.
    ///
    /// # Panics
    ///
    /// Panics via `will_park` if the future is pending, no background runnable
    /// is queued, and parking has been forbidden.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: an index equal to the queue length
            // means "poll the blocked future" instead of a runnable.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.poll_history.push(background_runnable.id);
                // Release the lock before running: the runnable's schedule
                // closure locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                // The future is pending. If no background work is queued
                // either, wait until something unparks this thread.
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
378
379 pub fn timer(&self, duration: Duration) -> Timer {
380 let (tx, rx) = postage::barrier::channel();
381 let mut state = self.state.lock();
382 let wakeup_at = state.now + duration;
383 let id = util::post_inc(&mut state.next_timer_id);
384 match state
385 .pending_timers
386 .binary_search_by_key(&wakeup_at, |e| e.1)
387 {
388 Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
389 }
390 let state = self.state.clone();
391 Timer::Deterministic(DeterministicTimer { rx, id, state })
392 }
393
394 pub fn now(&self) -> std::time::Instant {
395 let state = self.state.lock();
396 state.now
397 }
398
    /// Advances the simulated clock by `duration`, firing every pending timer
    /// whose wake-up time is reached along the way.
    ///
    /// The clock moves forward one wake-up time at a time. Before each step,
    /// and after the last group of timers has fired, scheduled work is run
    /// until the executor is idle.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
                let wakeup_time = *wakeup_time;
                if wakeup_time <= new_now {
                    // Count the leading timers that share this wake-up time.
                    let timer_count = state
                        .pending_timers
                        .iter()
                        .take_while(|(_, t, _)| *t == wakeup_time)
                        .count();
                    state.now = wakeup_time;
                    let timers_to_wake = state
                        .pending_timers
                        .drain(0..timer_count)
                        .collect::<Vec<_>>();
                    // Release the lock before dropping the drained entries.
                    // Presumably dropping a barrier sender is what wakes the
                    // matching `Timer` — TODO confirm against postage docs.
                    drop(state);
                    drop(timers_to_wake);
                    continue;
                }
            }

            break;
        }

        self.state.lock().now = new_now;
    }
429
430 pub fn start_waiting(&self) {
431 self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
432 }
433
434 pub fn finish_waiting(&self) {
435 self.state.lock().waiting_backtrace.take();
436 }
437
438 pub fn forbid_parking(&self) {
439 use rand::prelude::*;
440
441 let mut state = self.state.lock();
442 state.forbid_parking = true;
443 state.rng = StdRng::seed_from_u64(state.seed);
444 }
445
446 pub async fn simulate_random_delay(&self) {
447 use rand::prelude::*;
448 use smol::future::yield_now;
449 if self.state.lock().rng.gen_bool(0.2) {
450 let yields = self.state.lock().rng.gen_range(1..=10);
451 for _ in 0..yields {
452 yield_now().await;
453 }
454 }
455 }
456}
457
458impl Drop for Timer {
459 fn drop(&mut self) {
460 #[cfg(any(test, feature = "test-support"))]
461 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
462 state
463 .lock()
464 .pending_timers
465 .retain(|(timer_id, _, _)| timer_id != id)
466 }
467 }
468}
469
470impl Future for Timer {
471 type Output = ();
472
473 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
474 match &mut *self {
475 #[cfg(any(test, feature = "test-support"))]
476 Self::Deterministic(DeterministicTimer { rx, .. }) => {
477 use postage::stream::{PollRecv, Stream as _};
478 smol::pin!(rx);
479 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
480 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
481 PollRecv::Pending => Poll::Pending,
482 }
483 }
484 Self::Production(timer) => {
485 smol::pin!(timer);
486 match timer.poll(cx) {
487 Poll::Ready(_) => Poll::Ready(()),
488 Poll::Pending => Poll::Pending,
489 }
490 }
491 }
492 }
493}
494
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks.
    ///
    /// # Panics
    ///
    /// Panics if parking has been forbidden. The message includes the
    /// backtrace recorded by `start_waiting` when one is present.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
516
517impl Foreground {
518 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
519 if dispatcher.is_main_thread() {
520 Ok(Self::Platform {
521 dispatcher,
522 _not_send_or_sync: PhantomData,
523 })
524 } else {
525 Err(anyhow!("must be constructed on main thread"))
526 }
527 }
528
529 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
530 let future = any_local_future(future);
531 let any_task = match self {
532 #[cfg(any(test, feature = "test-support"))]
533 Self::Deterministic { cx_id, executor } => {
534 executor.spawn_from_foreground(*cx_id, future, false)
535 }
536 Self::Platform { dispatcher, .. } => {
537 fn spawn_inner(
538 future: AnyLocalFuture,
539 dispatcher: &Arc<dyn Dispatcher>,
540 ) -> AnyLocalTask {
541 let dispatcher = dispatcher.clone();
542 let schedule =
543 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
544 let (runnable, task) = async_task::spawn_local(future, schedule);
545 runnable.schedule();
546 task
547 }
548 spawn_inner(future, dispatcher)
549 }
550 };
551 Task::local(any_task)
552 }
553
554 #[cfg(any(test, feature = "test-support"))]
555 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
556 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
557 let result = match self {
558 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
559 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
560 };
561 *result.downcast().unwrap()
562 }
563
564 #[cfg(any(test, feature = "test-support"))]
565 pub fn run_until_parked(&self) {
566 match self {
567 Self::Deterministic { executor, .. } => executor.run_until_parked(),
568 _ => panic!("this method can only be called on a deterministic executor"),
569 }
570 }
571
572 #[cfg(any(test, feature = "test-support"))]
573 pub fn parking_forbidden(&self) -> bool {
574 match self {
575 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
576 _ => panic!("this method can only be called on a deterministic executor"),
577 }
578 }
579
580 #[cfg(any(test, feature = "test-support"))]
581 pub fn start_waiting(&self) {
582 match self {
583 Self::Deterministic { executor, .. } => executor.start_waiting(),
584 _ => panic!("this method can only be called on a deterministic executor"),
585 }
586 }
587
588 #[cfg(any(test, feature = "test-support"))]
589 pub fn finish_waiting(&self) {
590 match self {
591 Self::Deterministic { executor, .. } => executor.finish_waiting(),
592 _ => panic!("this method can only be called on a deterministic executor"),
593 }
594 }
595
596 #[cfg(any(test, feature = "test-support"))]
597 pub fn forbid_parking(&self) {
598 match self {
599 Self::Deterministic { executor, .. } => executor.forbid_parking(),
600 _ => panic!("this method can only be called on a deterministic executor"),
601 }
602 }
603
604 #[cfg(any(test, feature = "test-support"))]
605 pub fn advance_clock(&self, duration: Duration) {
606 match self {
607 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
608 _ => panic!("this method can only be called on a deterministic executor"),
609 }
610 }
611
612 #[cfg(any(test, feature = "test-support"))]
613 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
614 match self {
615 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
616 _ => panic!("this method can only be called on a deterministic executor"),
617 }
618 }
619}
620
621impl Background {
622 pub fn new() -> Self {
623 let executor = Arc::new(Executor::new());
624 let stop = channel::unbounded::<()>();
625
626 for i in 0..2 * num_cpus::get() {
627 let executor = executor.clone();
628 let stop = stop.1.clone();
629 thread::Builder::new()
630 .name(format!("background-executor-{}", i))
631 .spawn(move || smol::block_on(executor.run(stop.recv())))
632 .unwrap();
633 }
634
635 Self::Production {
636 executor,
637 _stop: stop.0,
638 }
639 }
640
    /// Returns the CPU count reported by `num_cpus::get()`; the result does
    /// not depend on which executor variant this is.
    pub fn num_cpus(&self) -> usize {
        num_cpus::get()
    }
644
645 pub fn spawn<T, F>(&self, future: F) -> Task<T>
646 where
647 T: 'static + Send,
648 F: Send + Future<Output = T> + 'static,
649 {
650 let future = any_future(future);
651 let any_task = match self {
652 Self::Production { executor, .. } => executor.spawn(future),
653 #[cfg(any(test, feature = "test-support"))]
654 Self::Deterministic { executor } => executor.spawn(future),
655 };
656 Task::send(any_task)
657 }
658
    /// Blocks the calling thread until `future` completes and returns its
    /// output.
    ///
    /// In production this uses `smol::block_on`. On a deterministic executor
    /// it delegates to `Deterministic::block` with a budget of `usize::MAX`
    /// ticks.
    ///
    /// # Panics
    ///
    /// On a deterministic executor, panics if `Deterministic::block` returns
    /// without a result.
    pub fn block<F, T>(&self, future: F) -> T
    where
        F: Future<Output = T>,
    {
        // Pin the future on the stack so it can be polled by reference.
        smol::pin!(future);
        match self {
            Self::Production { .. } => smol::block_on(&mut future),
            #[cfg(any(test, feature = "test-support"))]
            Self::Deterministic { executor, .. } => {
                executor.block(&mut future, usize::MAX).unwrap()
            }
        }
    }
672
673 pub fn block_with_timeout<F, T>(
674 &self,
675 timeout: Duration,
676 future: F,
677 ) -> Result<T, impl Future<Output = T>>
678 where
679 T: 'static,
680 F: 'static + Unpin + Future<Output = T>,
681 {
682 let mut future = any_local_future(future);
683 if !timeout.is_zero() {
684 let output = match self {
685 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
686 #[cfg(any(test, feature = "test-support"))]
687 Self::Deterministic { executor, .. } => {
688 use rand::prelude::*;
689 let max_ticks = {
690 let mut state = executor.state.lock();
691 let range = state.block_on_ticks.clone();
692 state.rng.gen_range(range)
693 };
694 executor.block(&mut future, max_ticks)
695 }
696 };
697 if let Some(output) = output {
698 return Ok(*output.downcast().unwrap());
699 }
700 }
701 Err(async { *future.await.downcast().unwrap() })
702 }
703
704 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
705 where
706 F: FnOnce(&mut Scope<'scope>),
707 {
708 let mut scope = Scope::new(self.clone());
709 (scheduler)(&mut scope);
710 let spawned = mem::take(&mut scope.futures)
711 .into_iter()
712 .map(|f| self.spawn(f))
713 .collect::<Vec<_>>();
714 for task in spawned {
715 task.await;
716 }
717 }
718
719 pub fn timer(&self, duration: Duration) -> Timer {
720 match self {
721 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
722 #[cfg(any(test, feature = "test-support"))]
723 Background::Deterministic { executor } => executor.timer(duration),
724 }
725 }
726
727 pub fn now(&self) -> std::time::Instant {
728 match self {
729 Background::Production { .. } => std::time::Instant::now(),
730 #[cfg(any(test, feature = "test-support"))]
731 Background::Deterministic { executor } => executor.now(),
732 }
733 }
734
735 #[cfg(any(test, feature = "test-support"))]
736 pub async fn simulate_random_delay(&self) {
737 match self {
738 Self::Deterministic { executor, .. } => {
739 executor.simulate_random_delay().await;
740 }
741 _ => {
742 panic!("this method can only be called on a deterministic executor")
743 }
744 }
745 }
746}
747
748impl Default for Background {
749 fn default() -> Self {
750 Self::new()
751 }
752}
753
/// Collects futures that borrow data for `'a` so that `Background::scoped` can
/// spawn them on a background executor.
pub struct Scope<'a> {
    // Used by `Drop` to block until every registered future has finished.
    executor: Arc<Background>,
    // Registered futures, with their lifetime erased to `'static` in `spawn`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    // Each registered future owns a clone of this sender and drops it when it
    // finishes; the scope's own copy is dropped in `Drop`.
    tx: Option<mpsc::Sender<()>>,
    // Waited on in `Drop` so the scope outlives every clone of `tx`.
    rx: mpsc::Receiver<()>,
    _phantom: PhantomData<&'a ()>,
}
761
762impl<'a> Scope<'a> {
763 fn new(executor: Arc<Background>) -> Self {
764 let (tx, rx) = mpsc::channel(1);
765 Self {
766 executor,
767 tx: Some(tx),
768 rx,
769 futures: Default::default(),
770 _phantom: PhantomData,
771 }
772 }
773
774 pub fn spawn<F>(&mut self, f: F)
775 where
776 F: Future<Output = ()> + Send + 'a,
777 {
778 let tx = self.tx.clone().unwrap();
779
780 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
781 // dropping this `Scope` blocks until all of the futures have resolved.
782 let f = unsafe {
783 mem::transmute::<
784 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
785 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
786 >(Box::pin(async move {
787 f.await;
788 drop(tx);
789 }))
790 };
791 self.futures.push(f);
792 }
793}
794
795impl<'a> Drop for Scope<'a> {
796 fn drop(&mut self) {
797 self.tx.take().unwrap();
798
799 // Wait until the channel is closed, which means that all of the spawned
800 // futures have resolved.
801 self.executor.block(self.rx.next());
802 }
803}
804
805impl<T> Task<T> {
806 pub fn ready(value: T) -> Self {
807 Self::Ready(Some(value))
808 }
809
810 fn local(any_task: AnyLocalTask) -> Self {
811 Self::Local {
812 any_task,
813 result_type: PhantomData,
814 }
815 }
816
817 pub fn detach(self) {
818 match self {
819 Task::Ready(_) => {}
820 Task::Local { any_task, .. } => any_task.detach(),
821 Task::Send { any_task, .. } => any_task.detach(),
822 }
823 }
824}
825
826impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
827 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
828 cx.spawn(|_| async move {
829 if let Err(err) = self.await {
830 log::error!("{}", err);
831 }
832 })
833 .detach();
834 }
835}
836
837impl<T: Send> Task<T> {
838 fn send(any_task: AnyTask) -> Self {
839 Self::Send {
840 any_task,
841 result_type: PhantomData,
842 }
843 }
844}
845
846impl<T: fmt::Debug> fmt::Debug for Task<T> {
847 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
848 match self {
849 Task::Ready(value) => value.fmt(f),
850 Task::Local { any_task, .. } => any_task.fmt(f),
851 Task::Send { any_task, .. } => any_task.fmt(f),
852 }
853 }
854}
855
impl<T: 'static> Future for Task<T> {
    type Output = T;

    /// Yields the ready value, or polls the underlying task and downcasts its
    /// type-erased output back to `T`.
    ///
    /// # Panics
    ///
    /// Panics if a `Ready` task is polled again after its value was taken, or
    /// if the task's output is not a `T`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // NOTE(review): `get_unchecked_mut` sidesteps the `Unpin` requirement on
        // `T`. No new `Pin` to the inner value is created in this function;
        // confirm nothing else relies on `T` staying pinned inside `Ready`.
        match unsafe { self.get_unchecked_mut() } {
            Task::Ready(value) => Poll::Ready(value.take().unwrap()),
            Task::Local { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
            Task::Send { any_task, .. } => {
                any_task.poll(cx).map(|value| *value.downcast().unwrap())
            }
        }
    }
}
871
872fn any_future<T, F>(future: F) -> AnyFuture
873where
874 T: 'static + Send,
875 F: Future<Output = T> + Send + 'static,
876{
877 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
878}
879
880fn any_local_future<T, F>(future: F) -> AnyLocalFuture
881where
882 T: 'static,
883 F: Future<Output = T> + 'static,
884{
885 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
886}