1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::{channel, prelude::*, Executor};
5use std::{
6 any::Any,
7 fmt::{self, Display},
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 task::{Context, Poll},
14 thread,
15 time::Duration,
16};
17
18use crate::{
19 platform::{self, Dispatcher},
20 util, MutableAppContext,
21};
22
23pub enum Foreground {
24 Platform {
25 dispatcher: Arc<dyn platform::Dispatcher>,
26 _not_send_or_sync: PhantomData<Rc<()>>,
27 },
28 #[cfg(any(test, feature = "test-support"))]
29 Deterministic {
30 cx_id: usize,
31 executor: Arc<Deterministic>,
32 },
33}
34
35pub enum Background {
36 #[cfg(any(test, feature = "test-support"))]
37 Deterministic { executor: Arc<Deterministic> },
38 Production {
39 executor: Arc<smol::Executor<'static>>,
40 _stop: channel::Sender<()>,
41 },
42}
43
44type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
45type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
46type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
47type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
48
49#[must_use]
50pub enum Task<T> {
51 Ready(Option<T>),
52 Local {
53 any_task: AnyLocalTask,
54 result_type: PhantomData<T>,
55 },
56 Send {
57 any_task: AnyTask,
58 result_type: PhantomData<T>,
59 },
60}
61
62unsafe impl<T: Send> Send for Task<T> {}
63
/// Mutable bookkeeping shared by a [`Deterministic`] executor, its timers, and the schedule
/// callbacks of the tasks it spawns.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    /// Source of every scheduling decision.
    rng: rand::prelude::StdRng,
    /// Seed `rng` was created from; `forbid_parking` re-seeds `rng` with it.
    seed: u64,
    /// Runnables waiting to run, grouped by the `cx_id` of the foreground that scheduled them.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    /// Runnables scheduled by background tasks and waiting to run.
    scheduled_from_background: Vec<BackgroundRunnable>,
    /// When set, `will_park` panics instead of letting the executor park.
    forbid_parking: bool,
    /// Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    /// Current simulated time.
    now: std::time::Instant,
    next_timer_id: usize,
    /// Outstanding timers as `(id, wake-up time, barrier sender)`, kept sorted by wake-up time.
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    /// Backtrace captured by `start_waiting`, reported if the executor parks while forbidden.
    waiting_backtrace: Option<backtrace::Backtrace>,
    next_runnable_id: usize,
    /// Ids of the runnables that have been run, in execution order.
    poll_history: Vec<usize>,
    /// Whether a backtrace is captured each time a runnable id is allocated.
    enable_runnable_backtraces: bool,
    /// Unresolved backtraces keyed by runnable id.
    runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
}
81
/// A queued unit of foreground work.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    /// Id recorded in the poll history when this runnable is run.
    id: usize,
    runnable: Runnable,
    /// Whether this runnable drives the future passed to `Deterministic::run`.
    main: bool,
}
88
/// A queued unit of background work.
#[cfg(any(test, feature = "test-support"))]
struct BackgroundRunnable {
    /// Id recorded in the poll history when this runnable is run.
    id: usize,
    runnable: Runnable,
}
94
/// Test executor that runs foreground and background work on the calling thread, choosing
/// what to run next with a seeded random number generator.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    /// Shared with timers and with the schedule callbacks of spawned tasks.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    /// Parks the driving thread while there is nothing to run.
    parker: parking_lot::Mutex<parking::Parker>,
}
100
101pub enum Timer {
102 Production(smol::Timer),
103 #[cfg(any(test, feature = "test-support"))]
104 Deterministic(DeterministicTimer),
105}
106
/// Timer registered with a [`Deterministic`] executor.
#[cfg(any(test, feature = "test-support"))]
pub struct DeterministicTimer {
    /// Receiving half of the barrier whose sender is stored in the executor's pending timers.
    rx: postage::barrier::Receiver,
    /// Identifies this timer's pending entry so it can be removed when the timer is dropped.
    id: usize,
    state: Arc<parking_lot::Mutex<DeterministicState>>,
}
113
#[cfg(any(test, feature = "test-support"))]
impl Deterministic {
    /// Creates an executor whose scheduling decisions come from an RNG seeded with `seed`.
    ///
    /// Parking starts out permitted, runnable backtraces start out disabled, and the simulated
    /// clock starts at the real `Instant::now()`.
    pub fn new(seed: u64) -> Arc<Self> {
        use rand::prelude::*;

        Arc::new(Self {
            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: std::time::Instant::now(),
                next_timer_id: Default::default(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
                next_runnable_id: 0,
                poll_history: Default::default(),
                enable_runnable_backtraces: false,
                runnable_backtraces: Default::default(),
            })),
            parker: Default::default(),
        })
    }

    /// Returns the ids of all runnables run so far, in the order they were run.
    pub fn runnable_history(&self) -> Vec<usize> {
        self.state.lock().poll_history.clone()
    }

    /// Starts capturing a backtrace for every runnable id allocated from now on.
    pub fn enable_runnable_backtrace(&self) {
        self.state.lock().enable_runnable_backtraces = true;
    }

    /// Returns the resolved backtrace captured when `runnable_id` was allocated.
    ///
    /// # Panics
    ///
    /// Panics if no backtrace was recorded for `runnable_id`, for example because
    /// [`Self::enable_runnable_backtrace`] had not been called when the id was allocated.
    pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace {
        let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone();
        backtrace.resolve();
        backtrace
    }

    /// Builds a [`Background`] executor backed by this deterministic executor.
    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
        Arc::new(Background::Deterministic {
            executor: self.clone(),
        })
    }

    /// Builds a [`Foreground`] executor backed by this deterministic executor.
    ///
    /// `id` becomes the `cx_id` under which the foreground's runnables are queued.
    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
        Rc::new(Foreground::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    /// Allocates the next runnable id and, when backtraces are enabled, records an unresolved
    /// backtrace for it.
    fn register_runnable(&self) -> usize {
        let mut state = self.state.lock();
        let id = util::post_inc(&mut state.next_runnable_id);
        if state.enable_runnable_backtraces {
            state
                .runnable_backtraces
                .insert(id, backtrace::Backtrace::new_unresolved());
        }
        id
    }

    /// Spawns a non-`Send` future whose runnable is queued under `cx_id` each time it is
    /// scheduled, and schedules it once immediately.
    ///
    /// `main` is stored on every queued runnable; `run_internal` polls the main task after
    /// running a runnable that carries it.
    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let id = self.register_runnable();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { id, runnable, main });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future whose runnable is pushed onto the background queue each time it
    /// is scheduled, and schedules it once immediately.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let id = self.register_runnable();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            state
                .scheduled_from_background
                .push(BackgroundRunnable { id, runnable });
            unparker.unpark();
        });
        runnable.schedule();
        task
    }

    /// Drives `main_future` to completion on the calling thread and returns its output.
    ///
    /// Queued runnables are run in between polls. When nothing is left to run the thread
    /// parks; if it is about to park without having been woken and parking has been forbidden,
    /// `will_park` panics.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        let woken = Arc::new(AtomicBool::new(false));

        let id = self.register_runnable();
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is used because `main_future` is neither `'static`
        // nor `Send`. This relies on the task being polled and dropped inside this call, on
        // this thread — confirm against the `async_task::spawn_unchecked` contract.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let mut state = state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        // Reuse the id allocated above so that the poll history entry matches
                        // the backtrace recorded for this runnable.
                        id,
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs queued runnables until both the foreground and background queues are empty.
    pub fn run_until_parked(&self) {
        use std::sync::atomic::AtomicBool;
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    /// Runs queued runnables, picking the next one at random, until both queues are empty.
    ///
    /// When `main_task` is given it is polled after every runnable flagged `main` and once
    /// more when the queues drain; its output is returned as soon as it is ready. Returns
    /// `None` when the queues drain without the main task completing. `woken` is set whenever
    /// the waker used for those polls fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                if let Some(main_task) = main_task {
                    if let Poll::Ready(result) = main_task.poll(&mut cx) {
                        return Some(result);
                    }
                }

                return None;
            }

            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.poll_history.push(background_runnable.id);
                // Running the task may schedule more work, which locks the state again.
                drop(state);
                background_runnable.runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                // Runnables of a single foreground run in the order they were scheduled.
                let foreground_runnable = scheduled_from_cx.remove(0);
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }
                state.poll_history.push(foreground_runnable.id);

                drop(state);

                foreground_runnable.runnable.run();
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

    /// Polls `future` for at most `max_ticks` ticks, returning its output if it completes.
    ///
    /// Each tick either runs one randomly chosen background runnable or polls `future`.
    /// Foreground runnables are never run here. If the future is still pending and no
    /// background work is queued, the thread parks (subject to `will_park`). Returns `None`
    /// when the tick budget runs out.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // Drawing `runnable_count` itself selects the blocked future.
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < runnable_count {
                let background_runnable = state.scheduled_from_background.remove(ix);
                state.poll_history.push(background_runnable.id);
                drop(state);
                background_runnable.runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }
            }
        }

        None
    }

    /// Registers a timer that fires once the simulated clock has advanced by `duration`.
    pub fn timer(&self, duration: Duration) -> Timer {
        let (tx, rx) = postage::barrier::channel();
        let mut state = self.state.lock();
        let wakeup_at = state.now + duration;
        let id = util::post_inc(&mut state.next_timer_id);
        // Insert at the position that keeps `pending_timers` sorted by wake-up time.
        match state
            .pending_timers
            .binary_search_by_key(&wakeup_at, |e| e.1)
        {
            Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)),
        }
        let state = self.state.clone();
        Timer::Deterministic(DeterministicTimer { rx, id, state })
    }

    /// Returns the current simulated time.
    pub fn now(&self) -> std::time::Instant {
        let state = self.state.lock();
        state.now
    }

    /// Advances the simulated clock by `duration`, firing every timer that becomes due.
    ///
    /// Timers fire in wake-up order. Before each group of timers sharing a wake-up time is
    /// fired, and once more after the last one, all queued work is run to quiescence.
    pub fn advance_clock(&self, duration: Duration) {
        let new_now = self.state.lock().now + duration;
        loop {
            self.run_until_parked();
            let mut state = self.state.lock();

            let next_due = state
                .pending_timers
                .first()
                .map(|(_, wakeup_time, _)| *wakeup_time)
                .filter(|wakeup_time| *wakeup_time <= new_now);
            let wakeup_time = match next_due {
                Some(wakeup_time) => wakeup_time,
                None => break,
            };

            let timer_count = state
                .pending_timers
                .iter()
                .take_while(|(_, t, _)| *t == wakeup_time)
                .count();
            state.now = wakeup_time;
            let timers_to_wake = state
                .pending_timers
                .drain(0..timer_count)
                .collect::<Vec<_>>();
            // Release the lock before dropping the barrier senders: dropping them is what
            // completes the timers, and woken tasks lock the state to reschedule themselves.
            drop(state);
            drop(timers_to_wake);
        }

        self.state.lock().now = new_now;
    }

    /// Records a backtrace of the caller, to be reported if the executor later parks while
    /// parking is forbidden.
    pub fn start_waiting(&self) {
        self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved());
    }

    /// Discards the backtrace recorded by [`Self::start_waiting`].
    pub fn finish_waiting(&self) {
        self.state.lock().waiting_backtrace.take();
    }

    /// Makes any later attempt to park panic, and re-seeds the RNG with the original seed.
    pub fn forbid_parking(&self) {
        use rand::prelude::*;

        let mut state = self.state.lock();
        state.forbid_parking = true;
        state.rng = StdRng::seed_from_u64(state.seed);
    }

    /// With probability 0.2, yields to the executor between 1 and 10 times.
    pub async fn simulate_random_delay(&self) {
        use rand::prelude::*;
        use smol::future::yield_now;
        // Each lock guard is a temporary that is released before the first `.await`.
        if self.state.lock().rng.gen_bool(0.2) {
            let yields = self.state.lock().rng.gen_range(1..=10);
            for _ in 0..yields {
                yield_now().await;
            }
        }
    }
}
469
470impl Drop for Timer {
471 fn drop(&mut self) {
472 #[cfg(any(test, feature = "test-support"))]
473 if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self {
474 state
475 .lock()
476 .pending_timers
477 .retain(|(timer_id, _, _)| timer_id != id)
478 }
479 }
480}
481
482impl Future for Timer {
483 type Output = ();
484
485 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
486 match &mut *self {
487 #[cfg(any(test, feature = "test-support"))]
488 Self::Deterministic(DeterministicTimer { rx, .. }) => {
489 use postage::stream::{PollRecv, Stream as _};
490 smol::pin!(rx);
491 match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) {
492 PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()),
493 PollRecv::Pending => Poll::Pending,
494 }
495 }
496 Self::Production(timer) => {
497 smol::pin!(timer);
498 match timer.poll(cx) {
499 Poll::Ready(_) => Poll::Ready(()),
500 Poll::Pending => Poll::Pending,
501 }
502 }
503 }
504 }
505}
506
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks its thread.
    ///
    /// # Panics
    ///
    /// Panics when parking has been forbidden. The message includes the backtrace recorded by
    /// `start_waiting`, if there is one.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
528
529impl Foreground {
530 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
531 if dispatcher.is_main_thread() {
532 Ok(Self::Platform {
533 dispatcher,
534 _not_send_or_sync: PhantomData,
535 })
536 } else {
537 Err(anyhow!("must be constructed on main thread"))
538 }
539 }
540
541 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
542 let future = any_local_future(future);
543 let any_task = match self {
544 #[cfg(any(test, feature = "test-support"))]
545 Self::Deterministic { cx_id, executor } => {
546 executor.spawn_from_foreground(*cx_id, future, false)
547 }
548 Self::Platform { dispatcher, .. } => {
549 fn spawn_inner(
550 future: AnyLocalFuture,
551 dispatcher: &Arc<dyn Dispatcher>,
552 ) -> AnyLocalTask {
553 let dispatcher = dispatcher.clone();
554 let schedule =
555 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
556 let (runnable, task) = async_task::spawn_local(future, schedule);
557 runnable.schedule();
558 task
559 }
560 spawn_inner(future, dispatcher)
561 }
562 };
563 Task::local(any_task)
564 }
565
566 #[cfg(any(test, feature = "test-support"))]
567 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
568 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
569 let result = match self {
570 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
571 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
572 };
573 *result.downcast().unwrap()
574 }
575
576 #[cfg(any(test, feature = "test-support"))]
577 pub fn run_until_parked(&self) {
578 match self {
579 Self::Deterministic { executor, .. } => executor.run_until_parked(),
580 _ => panic!("this method can only be called on a deterministic executor"),
581 }
582 }
583
584 #[cfg(any(test, feature = "test-support"))]
585 pub fn parking_forbidden(&self) -> bool {
586 match self {
587 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
588 _ => panic!("this method can only be called on a deterministic executor"),
589 }
590 }
591
592 #[cfg(any(test, feature = "test-support"))]
593 pub fn start_waiting(&self) {
594 match self {
595 Self::Deterministic { executor, .. } => executor.start_waiting(),
596 _ => panic!("this method can only be called on a deterministic executor"),
597 }
598 }
599
600 #[cfg(any(test, feature = "test-support"))]
601 pub fn finish_waiting(&self) {
602 match self {
603 Self::Deterministic { executor, .. } => executor.finish_waiting(),
604 _ => panic!("this method can only be called on a deterministic executor"),
605 }
606 }
607
608 #[cfg(any(test, feature = "test-support"))]
609 pub fn forbid_parking(&self) {
610 match self {
611 Self::Deterministic { executor, .. } => executor.forbid_parking(),
612 _ => panic!("this method can only be called on a deterministic executor"),
613 }
614 }
615
616 #[cfg(any(test, feature = "test-support"))]
617 pub fn advance_clock(&self, duration: Duration) {
618 match self {
619 Self::Deterministic { executor, .. } => executor.advance_clock(duration),
620 _ => panic!("this method can only be called on a deterministic executor"),
621 }
622 }
623
624 #[cfg(any(test, feature = "test-support"))]
625 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
626 match self {
627 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
628 _ => panic!("this method can only be called on a deterministic executor"),
629 }
630 }
631}
632
633impl Background {
634 pub fn new() -> Self {
635 let executor = Arc::new(Executor::new());
636 let stop = channel::unbounded::<()>();
637
638 for i in 0..2 * num_cpus::get() {
639 let executor = executor.clone();
640 let stop = stop.1.clone();
641 thread::Builder::new()
642 .name(format!("background-executor-{}", i))
643 .spawn(move || smol::block_on(executor.run(stop.recv())))
644 .unwrap();
645 }
646
647 Self::Production {
648 executor,
649 _stop: stop.0,
650 }
651 }
652
653 pub fn num_cpus(&self) -> usize {
654 num_cpus::get()
655 }
656
657 pub fn spawn<T, F>(&self, future: F) -> Task<T>
658 where
659 T: 'static + Send,
660 F: Send + Future<Output = T> + 'static,
661 {
662 let future = any_future(future);
663 let any_task = match self {
664 Self::Production { executor, .. } => executor.spawn(future),
665 #[cfg(any(test, feature = "test-support"))]
666 Self::Deterministic { executor } => executor.spawn(future),
667 };
668 Task::send(any_task)
669 }
670
671 pub fn block<F, T>(&self, future: F) -> T
672 where
673 F: Future<Output = T>,
674 {
675 smol::pin!(future);
676 match self {
677 Self::Production { .. } => smol::block_on(&mut future),
678 #[cfg(any(test, feature = "test-support"))]
679 Self::Deterministic { executor, .. } => {
680 executor.block(&mut future, usize::MAX).unwrap()
681 }
682 }
683 }
684
685 pub fn block_with_timeout<F, T>(
686 &self,
687 timeout: Duration,
688 future: F,
689 ) -> Result<T, impl Future<Output = T>>
690 where
691 T: 'static,
692 F: 'static + Unpin + Future<Output = T>,
693 {
694 let mut future = any_local_future(future);
695 if !timeout.is_zero() {
696 let output = match self {
697 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
698 #[cfg(any(test, feature = "test-support"))]
699 Self::Deterministic { executor, .. } => {
700 use rand::prelude::*;
701 let max_ticks = {
702 let mut state = executor.state.lock();
703 let range = state.block_on_ticks.clone();
704 state.rng.gen_range(range)
705 };
706 executor.block(&mut future, max_ticks)
707 }
708 };
709 if let Some(output) = output {
710 return Ok(*output.downcast().unwrap());
711 }
712 }
713 Err(async { *future.await.downcast().unwrap() })
714 }
715
716 pub async fn scoped<'scope, F>(self: &Arc<Self>, scheduler: F)
717 where
718 F: FnOnce(&mut Scope<'scope>),
719 {
720 let mut scope = Scope::new(self.clone());
721 (scheduler)(&mut scope);
722 let spawned = mem::take(&mut scope.futures)
723 .into_iter()
724 .map(|f| self.spawn(f))
725 .collect::<Vec<_>>();
726 for task in spawned {
727 task.await;
728 }
729 }
730
731 pub fn timer(&self, duration: Duration) -> Timer {
732 match self {
733 Background::Production { .. } => Timer::Production(smol::Timer::after(duration)),
734 #[cfg(any(test, feature = "test-support"))]
735 Background::Deterministic { executor } => executor.timer(duration),
736 }
737 }
738
739 pub fn now(&self) -> std::time::Instant {
740 match self {
741 Background::Production { .. } => std::time::Instant::now(),
742 #[cfg(any(test, feature = "test-support"))]
743 Background::Deterministic { executor } => executor.now(),
744 }
745 }
746
747 #[cfg(any(test, feature = "test-support"))]
748 pub async fn simulate_random_delay(&self) {
749 match self {
750 Self::Deterministic { executor, .. } => {
751 executor.simulate_random_delay().await;
752 }
753 _ => {
754 panic!("this method can only be called on a deterministic executor")
755 }
756 }
757 }
758}
759
760impl Default for Background {
761 fn default() -> Self {
762 Self::new()
763 }
764}
765
766pub struct Scope<'a> {
767 executor: Arc<Background>,
768 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
769 tx: Option<mpsc::Sender<()>>,
770 rx: mpsc::Receiver<()>,
771 _phantom: PhantomData<&'a ()>,
772}
773
774impl<'a> Scope<'a> {
775 fn new(executor: Arc<Background>) -> Self {
776 let (tx, rx) = mpsc::channel(1);
777 Self {
778 executor,
779 tx: Some(tx),
780 rx,
781 futures: Default::default(),
782 _phantom: PhantomData,
783 }
784 }
785
786 pub fn spawn<F>(&mut self, f: F)
787 where
788 F: Future<Output = ()> + Send + 'a,
789 {
790 let tx = self.tx.clone().unwrap();
791
792 // Safety: The 'a lifetime is guaranteed to outlive any of these futures because
793 // dropping this `Scope` blocks until all of the futures have resolved.
794 let f = unsafe {
795 mem::transmute::<
796 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
797 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
798 >(Box::pin(async move {
799 f.await;
800 drop(tx);
801 }))
802 };
803 self.futures.push(f);
804 }
805}
806
807impl<'a> Drop for Scope<'a> {
808 fn drop(&mut self) {
809 self.tx.take().unwrap();
810
811 // Wait until the channel is closed, which means that all of the spawned
812 // futures have resolved.
813 self.executor.block(self.rx.next());
814 }
815}
816
817impl<T> Task<T> {
818 pub fn ready(value: T) -> Self {
819 Self::Ready(Some(value))
820 }
821
822 fn local(any_task: AnyLocalTask) -> Self {
823 Self::Local {
824 any_task,
825 result_type: PhantomData,
826 }
827 }
828
829 pub fn detach(self) {
830 match self {
831 Task::Ready(_) => {}
832 Task::Local { any_task, .. } => any_task.detach(),
833 Task::Send { any_task, .. } => any_task.detach(),
834 }
835 }
836}
837
838impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
839 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
840 cx.spawn(|_| async move {
841 if let Err(err) = self.await {
842 log::error!("{}", err);
843 }
844 })
845 .detach();
846 }
847}
848
849impl<T: Send> Task<T> {
850 fn send(any_task: AnyTask) -> Self {
851 Self::Send {
852 any_task,
853 result_type: PhantomData,
854 }
855 }
856}
857
858impl<T: fmt::Debug> fmt::Debug for Task<T> {
859 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
860 match self {
861 Task::Ready(value) => value.fmt(f),
862 Task::Local { any_task, .. } => any_task.fmt(f),
863 Task::Send { any_task, .. } => any_task.fmt(f),
864 }
865 }
866}
867
868impl<T: 'static> Future for Task<T> {
869 type Output = T;
870
871 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
872 match unsafe { self.get_unchecked_mut() } {
873 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
874 Task::Local { any_task, .. } => {
875 any_task.poll(cx).map(|value| *value.downcast().unwrap())
876 }
877 Task::Send { any_task, .. } => {
878 any_task.poll(cx).map(|value| *value.downcast().unwrap())
879 }
880 }
881 }
882}
883
884fn any_future<T, F>(future: F) -> AnyFuture
885where
886 T: 'static + Send,
887 F: Future<Output = T> + Send + 'static,
888{
889 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
890}
891
892fn any_local_future<T, F>(future: F) -> AnyLocalFuture
893where
894 T: 'static,
895 F: Future<Output = T> + 'static,
896{
897 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
898}