1use crate::{App, PlatformDispatcher, RunnableMeta, RunnableVariant, TaskTiming, profiler};
2use async_task::Runnable;
3use futures::channel::mpsc;
4use smol::prelude::*;
5use std::{
6 fmt::Debug,
7 marker::PhantomData,
8 mem::{self, ManuallyDrop},
9 num::NonZeroUsize,
10 panic::Location,
11 pin::Pin,
12 rc::Rc,
13 sync::{
14 Arc,
15 atomic::{AtomicUsize, Ordering},
16 },
17 task::{Context, Poll},
18 thread::{self, ThreadId},
19 time::{Duration, Instant},
20};
21use util::TryFutureExt;
22use waker_fn::waker_fn;
23
24#[cfg(any(test, feature = "test-support"))]
25use rand::rngs::StdRng;
26
27/// A pointer to the executor that is currently running,
28/// for spawning background tasks.
29#[derive(Clone)]
30pub struct BackgroundExecutor {
31 #[doc(hidden)]
32 pub dispatcher: Arc<dyn PlatformDispatcher>,
33}
34
35/// A pointer to the executor that is currently running,
36/// for spawning tasks on the main thread.
37///
38/// This is intentionally `!Send` via the `not_send` marker field. This is because
39/// `ForegroundExecutor::spawn` does not require `Send` but checks at runtime that the future is
40/// only polled from the same thread it was spawned from. These checks would fail when spawning
41/// foreground tasks from background threads.
42#[derive(Clone)]
43pub struct ForegroundExecutor {
44 #[doc(hidden)]
45 pub dispatcher: Arc<dyn PlatformDispatcher>,
46 not_send: PhantomData<Rc<()>>,
47}
48
49/// Realtime task priority
50#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
51#[repr(u8)]
52pub enum RealtimePriority {
53 /// Audio task
54 Audio,
55 /// Other realtime task
56 #[default]
57 Other,
58}
59
60/// Task priority
61#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
62#[repr(u8)]
63pub enum Priority {
64 /// Realtime priority
65 ///
66 /// Spawning a task with this priority will spin it off on a separate thread dedicated just to that task.
67 Realtime(RealtimePriority),
68 /// High priority
69 ///
70 /// Only use for tasks that are critical to the user experience / responsiveness of the editor.
71 High,
72 /// Medium priority, probably suits most of your use cases.
73 #[default]
74 Medium,
75 /// Low priority
76 ///
77 /// Prioritize this for background work that can come in large quantities
78 /// to not starve the executor of resources for high priority tasks
79 Low,
80}
81
82impl Priority {
83 #[allow(dead_code)]
84 pub(crate) const fn probability(&self) -> u32 {
85 match self {
86 // realtime priorities are not considered for probability scheduling
87 Priority::Realtime(_) => 0,
88 Priority::High => 60,
89 Priority::Medium => 30,
90 Priority::Low => 10,
91 }
92 }
93}
94
95/// Task is a primitive that allows work to happen in the background.
96///
97/// It implements [`Future`] so you can `.await` on it.
98///
99/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
100/// the task to continue running, but with no way to return a value.
101#[must_use]
102#[derive(Debug)]
103pub struct Task<T>(TaskState<T>);
104
105#[derive(Debug)]
106enum TaskState<T> {
107 /// A task that is ready to return a value
108 Ready(Option<T>),
109
110 /// A task that is currently running.
111 Spawned(async_task::Task<T, RunnableMeta>),
112}
113
114impl<T> Task<T> {
115 /// Creates a new task that will resolve with the value
116 pub fn ready(val: T) -> Self {
117 Task(TaskState::Ready(Some(val)))
118 }
119
120 /// Detaching a task runs it to completion in the background
121 pub fn detach(self) {
122 match self {
123 Task(TaskState::Ready(_)) => {}
124 Task(TaskState::Spawned(task)) => task.detach(),
125 }
126 }
127}
128
129impl<E, T> Task<Result<T, E>>
130where
131 T: 'static,
132 E: 'static + Debug,
133{
134 /// Run the task to completion in the background and log any
135 /// errors that occur.
136 #[track_caller]
137 pub fn detach_and_log_err(self, cx: &App) {
138 let location = core::panic::Location::caller();
139 cx.foreground_executor()
140 .spawn(self.log_tracked_err(*location))
141 .detach();
142 }
143}
144
145impl<T> Future for Task<T> {
146 type Output = T;
147
148 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
149 match unsafe { self.get_unchecked_mut() } {
150 Task(TaskState::Ready(val)) => Poll::Ready(val.take().unwrap()),
151 Task(TaskState::Spawned(task)) => task.poll(cx),
152 }
153 }
154}
155
156/// A task label is an opaque identifier that you can use to
157/// refer to a task in tests.
158#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
159pub struct TaskLabel(NonZeroUsize);
160
161impl Default for TaskLabel {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166
167impl TaskLabel {
168 /// Construct a new task label.
169 pub fn new() -> Self {
170 static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
171 Self(
172 NEXT_TASK_LABEL
173 .fetch_add(1, Ordering::SeqCst)
174 .try_into()
175 .unwrap(),
176 )
177 }
178}
179
180type AnyLocalFuture<R> = Pin<Box<dyn 'static + Future<Output = R>>>;
181
182type AnyFuture<R> = Pin<Box<dyn 'static + Send + Future<Output = R>>>;
183
184/// BackgroundExecutor lets you run things on background threads.
185/// In production this is a thread pool with no ordering guarantees.
186/// In tests this is simulated by running tasks one by one in a deterministic
187/// (but arbitrary) order controlled by the `SEED` environment variable.
188impl BackgroundExecutor {
189 #[doc(hidden)]
190 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
191 Self { dispatcher }
192 }
193
194 /// Enqueues the given future to be run to completion on a background thread.
195 #[track_caller]
196 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
197 where
198 R: Send + 'static,
199 {
200 self.spawn_with_priority(Priority::default(), future)
201 }
202
203 /// Enqueues the given future to be run to completion on a background thread.
204 #[track_caller]
205 pub fn spawn_with_priority<R>(
206 &self,
207 priority: Priority,
208 future: impl Future<Output = R> + Send + 'static,
209 ) -> Task<R>
210 where
211 R: Send + 'static,
212 {
213 self.spawn_internal::<R>(Box::pin(future), None, priority)
214 }
215
216 /// Enqueues the given future to be run to completion on a background thread.
217 /// The given label can be used to control the priority of the task in tests.
218 #[track_caller]
219 pub fn spawn_labeled<R>(
220 &self,
221 label: TaskLabel,
222 future: impl Future<Output = R> + Send + 'static,
223 ) -> Task<R>
224 where
225 R: Send + 'static,
226 {
227 self.spawn_internal::<R>(Box::pin(future), Some(label), Priority::default())
228 }
229
230 #[track_caller]
231 fn spawn_internal<R: Send + 'static>(
232 &self,
233 future: AnyFuture<R>,
234 label: Option<TaskLabel>,
235 priority: Priority,
236 ) -> Task<R> {
237 let dispatcher = self.dispatcher.clone();
238 let (runnable, task) = if let Priority::Realtime(realtime) = priority {
239 let location = core::panic::Location::caller();
240 let (mut tx, rx) = flume::bounded::<Runnable<RunnableMeta>>(1);
241
242 dispatcher.spawn_realtime(
243 realtime,
244 Box::new(move || {
245 while let Ok(runnable) = rx.recv() {
246 let start = Instant::now();
247 let location = runnable.metadata().location;
248 let mut timing = TaskTiming {
249 location,
250 start,
251 end: None,
252 };
253 profiler::add_task_timing(timing);
254
255 runnable.run();
256
257 let end = Instant::now();
258 timing.end = Some(end);
259 profiler::add_task_timing(timing);
260 }
261 }),
262 );
263
264 async_task::Builder::new()
265 .metadata(RunnableMeta { location })
266 .spawn(
267 move |_| future,
268 move |runnable| {
269 let _ = tx.send(runnable);
270 },
271 )
272 } else {
273 let location = core::panic::Location::caller();
274 async_task::Builder::new()
275 .metadata(RunnableMeta { location })
276 .spawn(
277 move |_| future,
278 move |runnable| {
279 dispatcher.dispatch(RunnableVariant::Meta(runnable), label, priority)
280 },
281 )
282 };
283
284 runnable.schedule();
285 Task(TaskState::Spawned(task))
286 }
287
288 /// Used by the test harness to run an async test in a synchronous fashion.
289 #[cfg(any(test, feature = "test-support"))]
290 #[track_caller]
291 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
292 if let Ok(value) = self.block_internal(false, future, None) {
293 value
294 } else {
295 unreachable!()
296 }
297 }
298
299 /// Block the current thread until the given future resolves.
300 /// Consider using `block_with_timeout` instead.
301 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
302 if let Ok(value) = self.block_internal(true, future, None) {
303 value
304 } else {
305 unreachable!()
306 }
307 }
308
309 #[cfg(not(any(test, feature = "test-support")))]
310 pub(crate) fn block_internal<Fut: Future>(
311 &self,
312 _background_only: bool,
313 future: Fut,
314 timeout: Option<Duration>,
315 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
316 use std::time::Instant;
317
318 let mut future = Box::pin(future);
319 if timeout == Some(Duration::ZERO) {
320 return Err(future);
321 }
322 let deadline = timeout.map(|timeout| Instant::now() + timeout);
323
324 let parker = parking::Parker::new();
325 let unparker = parker.unparker();
326 let waker = waker_fn(move || {
327 unparker.unpark();
328 });
329 let mut cx = std::task::Context::from_waker(&waker);
330
331 loop {
332 match future.as_mut().poll(&mut cx) {
333 Poll::Ready(result) => return Ok(result),
334 Poll::Pending => {
335 let timeout =
336 deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
337 if let Some(timeout) = timeout {
338 if !parker.park_timeout(timeout)
339 && deadline.is_some_and(|deadline| deadline < Instant::now())
340 {
341 return Err(future);
342 }
343 } else {
344 parker.park();
345 }
346 }
347 }
348 }
349 }
350
351 #[cfg(any(test, feature = "test-support"))]
352 #[track_caller]
353 pub(crate) fn block_internal<Fut: Future>(
354 &self,
355 background_only: bool,
356 future: Fut,
357 timeout: Option<Duration>,
358 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
359 use std::sync::atomic::AtomicBool;
360
361 use parking::Parker;
362
363 let mut future = Box::pin(future);
364 if timeout == Some(Duration::ZERO) {
365 return Err(future);
366 }
367 let Some(dispatcher) = self.dispatcher.as_test() else {
368 return Err(future);
369 };
370
371 let mut max_ticks = if timeout.is_some() {
372 dispatcher.gen_block_on_ticks()
373 } else {
374 usize::MAX
375 };
376
377 let parker = Parker::new();
378 let unparker = parker.unparker();
379
380 let awoken = Arc::new(AtomicBool::new(false));
381 let waker = waker_fn({
382 let awoken = awoken.clone();
383 let unparker = unparker.clone();
384 move || {
385 awoken.store(true, Ordering::SeqCst);
386 unparker.unpark();
387 }
388 });
389 let mut cx = std::task::Context::from_waker(&waker);
390
391 let duration = Duration::from_secs(
392 option_env!("GPUI_TEST_TIMEOUT")
393 .and_then(|s| s.parse::<u64>().ok())
394 .unwrap_or(180),
395 );
396 let mut test_should_end_by = Instant::now() + duration;
397
398 loop {
399 match future.as_mut().poll(&mut cx) {
400 Poll::Ready(result) => return Ok(result),
401 Poll::Pending => {
402 if max_ticks == 0 {
403 return Err(future);
404 }
405 max_ticks -= 1;
406
407 if !dispatcher.tick(background_only) {
408 if awoken.swap(false, Ordering::SeqCst) {
409 continue;
410 }
411
412 if !dispatcher.parking_allowed() {
413 if dispatcher.advance_clock_to_next_delayed() {
414 continue;
415 }
416 let mut backtrace_message = String::new();
417 let mut waiting_message = String::new();
418 if let Some(backtrace) = dispatcher.waiting_backtrace() {
419 backtrace_message =
420 format!("\nbacktrace of waiting future:\n{:?}", backtrace);
421 }
422 if let Some(waiting_hint) = dispatcher.waiting_hint() {
423 waiting_message = format!("\n waiting on: {}\n", waiting_hint);
424 }
425 panic!(
426 "parked with nothing left to run{waiting_message}{backtrace_message}",
427 )
428 }
429 dispatcher.push_unparker(unparker.clone());
430 parker.park_timeout(Duration::from_millis(1));
431 if Instant::now() > test_should_end_by {
432 panic!("test timed out after {duration:?} with allow_parking")
433 }
434 }
435 }
436 }
437 }
438 }
439
440 /// Block the current thread until the given future resolves
441 /// or `duration` has elapsed.
442 pub fn block_with_timeout<Fut: Future>(
443 &self,
444 duration: Duration,
445 future: Fut,
446 ) -> Result<Fut::Output, impl Future<Output = Fut::Output> + use<Fut>> {
447 self.block_internal(true, future, Some(duration))
448 }
449
450 /// Scoped lets you start a number of tasks and waits
451 /// for all of them to complete before returning.
452 pub async fn scoped<'scope, F>(&self, scheduler: F)
453 where
454 F: FnOnce(&mut Scope<'scope>),
455 {
456 let mut scope = Scope::new(self.clone(), Priority::default());
457 (scheduler)(&mut scope);
458 let spawned = mem::take(&mut scope.futures)
459 .into_iter()
460 .map(|f| self.spawn_with_priority(scope.priority, f))
461 .collect::<Vec<_>>();
462 for task in spawned {
463 task.await;
464 }
465 }
466
467 /// Scoped lets you start a number of tasks and waits
468 /// for all of them to complete before returning.
469 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
470 where
471 F: FnOnce(&mut Scope<'scope>),
472 {
473 let mut scope = Scope::new(self.clone(), priority);
474 (scheduler)(&mut scope);
475 let spawned = mem::take(&mut scope.futures)
476 .into_iter()
477 .map(|f| self.spawn_with_priority(scope.priority, f))
478 .collect::<Vec<_>>();
479 for task in spawned {
480 task.await;
481 }
482 }
483
484 /// Get the current time.
485 ///
486 /// Calling this instead of `std::time::Instant::now` allows the use
487 /// of fake timers in tests.
488 pub fn now(&self) -> Instant {
489 self.dispatcher.now()
490 }
491
492 /// Returns a task that will complete after the given duration.
493 /// Depending on other concurrent tasks the elapsed duration may be longer
494 /// than requested.
495 pub fn timer(&self, duration: Duration) -> Task<()> {
496 if duration.is_zero() {
497 return Task::ready(());
498 }
499 let location = core::panic::Location::caller();
500 let (runnable, task) = async_task::Builder::new()
501 .metadata(RunnableMeta { location })
502 .spawn(move |_| async move {}, {
503 let dispatcher = self.dispatcher.clone();
504 move |runnable| dispatcher.dispatch_after(duration, RunnableVariant::Meta(runnable))
505 });
506 runnable.schedule();
507 Task(TaskState::Spawned(task))
508 }
509
510 /// in tests, start_waiting lets you indicate which task is waiting (for debugging only)
511 #[cfg(any(test, feature = "test-support"))]
512 pub fn start_waiting(&self) {
513 self.dispatcher.as_test().unwrap().start_waiting();
514 }
515
516 /// in tests, removes the debugging data added by start_waiting
517 #[cfg(any(test, feature = "test-support"))]
518 pub fn finish_waiting(&self) {
519 self.dispatcher.as_test().unwrap().finish_waiting();
520 }
521
522 /// in tests, run an arbitrary number of tasks (determined by the SEED environment variable)
523 #[cfg(any(test, feature = "test-support"))]
524 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
525 self.dispatcher.as_test().unwrap().simulate_random_delay()
526 }
527
528 /// in tests, indicate that a given task from `spawn_labeled` should run after everything else
529 #[cfg(any(test, feature = "test-support"))]
530 pub fn deprioritize(&self, task_label: TaskLabel) {
531 self.dispatcher.as_test().unwrap().deprioritize(task_label)
532 }
533
534 /// in tests, move time forward. This does not run any tasks, but does make `timer`s ready.
535 #[cfg(any(test, feature = "test-support"))]
536 pub fn advance_clock(&self, duration: Duration) {
537 self.dispatcher.as_test().unwrap().advance_clock(duration)
538 }
539
540 /// in tests, run one task.
541 #[cfg(any(test, feature = "test-support"))]
542 pub fn tick(&self) -> bool {
543 self.dispatcher.as_test().unwrap().tick(false)
544 }
545
546 /// in tests, run all tasks that are ready to run. If after doing so
547 /// the test still has outstanding tasks, this will panic. (See also [`Self::allow_parking`])
548 #[cfg(any(test, feature = "test-support"))]
549 pub fn run_until_parked(&self) {
550 self.dispatcher.as_test().unwrap().run_until_parked()
551 }
552
553 /// in tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
554 /// This is useful when you are integrating other (non-GPUI) futures, like disk access, that
555 /// do take real async time to run.
556 #[cfg(any(test, feature = "test-support"))]
557 pub fn allow_parking(&self) {
558 self.dispatcher.as_test().unwrap().allow_parking();
559 }
560
561 /// undoes the effect of [`Self::allow_parking`].
562 #[cfg(any(test, feature = "test-support"))]
563 pub fn forbid_parking(&self) {
564 self.dispatcher.as_test().unwrap().forbid_parking();
565 }
566
567 /// adds detail to the "parked with nothing let to run" message.
568 #[cfg(any(test, feature = "test-support"))]
569 pub fn set_waiting_hint(&self, msg: Option<String>) {
570 self.dispatcher.as_test().unwrap().set_waiting_hint(msg);
571 }
572
573 /// in tests, returns the rng used by the dispatcher and seeded by the `SEED` environment variable
574 #[cfg(any(test, feature = "test-support"))]
575 pub fn rng(&self) -> StdRng {
576 self.dispatcher.as_test().unwrap().rng()
577 }
578
579 /// How many CPUs are available to the dispatcher.
580 pub fn num_cpus(&self) -> usize {
581 #[cfg(any(test, feature = "test-support"))]
582 return 4;
583
584 #[cfg(not(any(test, feature = "test-support")))]
585 return num_cpus::get();
586 }
587
588 /// Whether we're on the main thread.
589 pub fn is_main_thread(&self) -> bool {
590 self.dispatcher.is_main_thread()
591 }
592
593 #[cfg(any(test, feature = "test-support"))]
594 /// in tests, control the number of ticks that `block_with_timeout` will run before timing out.
595 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
596 self.dispatcher.as_test().unwrap().set_block_on_ticks(range);
597 }
598}
599
600/// ForegroundExecutor runs things on the main thread.
601impl ForegroundExecutor {
602 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
603 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
604 Self {
605 dispatcher,
606 not_send: PhantomData,
607 }
608 }
609
610 /// Enqueues the given Task to run on the main thread at some point in the future.
611 #[track_caller]
612 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
613 where
614 R: 'static,
615 {
616 self.spawn_with_priority(Priority::default(), future)
617 }
618
619 /// Enqueues the given Task to run on the main thread at some point in the future.
620 #[track_caller]
621 pub fn spawn_with_priority<R>(
622 &self,
623 priority: Priority,
624 future: impl Future<Output = R> + 'static,
625 ) -> Task<R>
626 where
627 R: 'static,
628 {
629 let dispatcher = self.dispatcher.clone();
630 let location = core::panic::Location::caller();
631
632 #[track_caller]
633 fn inner<R: 'static>(
634 dispatcher: Arc<dyn PlatformDispatcher>,
635 future: AnyLocalFuture<R>,
636 location: &'static core::panic::Location<'static>,
637 priority: Priority,
638 ) -> Task<R> {
639 let (runnable, task) = spawn_local_with_source_location(
640 future,
641 move |runnable| {
642 dispatcher.dispatch_on_main_thread(RunnableVariant::Meta(runnable), priority)
643 },
644 RunnableMeta { location },
645 );
646 runnable.schedule();
647 Task(TaskState::Spawned(task))
648 }
649 inner::<R>(dispatcher, Box::pin(future), location, priority)
650 }
651}
652
653/// Variant of `async_task::spawn_local` that includes the source location of the spawn in panics.
654///
655/// Copy-modified from:
656/// <https://github.com/smol-rs/async-task/blob/ca9dbe1db9c422fd765847fa91306e30a6bb58a9/src/runnable.rs#L405>
657#[track_caller]
658fn spawn_local_with_source_location<Fut, S, M>(
659 future: Fut,
660 schedule: S,
661 metadata: M,
662) -> (Runnable<M>, async_task::Task<Fut::Output, M>)
663where
664 Fut: Future + 'static,
665 Fut::Output: 'static,
666 S: async_task::Schedule<M> + Send + Sync + 'static,
667 M: 'static,
668{
669 #[inline]
670 fn thread_id() -> ThreadId {
671 std::thread_local! {
672 static ID: ThreadId = thread::current().id();
673 }
674 ID.try_with(|id| *id)
675 .unwrap_or_else(|_| thread::current().id())
676 }
677
678 struct Checked<F> {
679 id: ThreadId,
680 inner: ManuallyDrop<F>,
681 location: &'static Location<'static>,
682 }
683
684 impl<F> Drop for Checked<F> {
685 fn drop(&mut self) {
686 assert!(
687 self.id == thread_id(),
688 "local task dropped by a thread that didn't spawn it. Task spawned at {}",
689 self.location
690 );
691 unsafe { ManuallyDrop::drop(&mut self.inner) };
692 }
693 }
694
695 impl<F: Future> Future for Checked<F> {
696 type Output = F::Output;
697
698 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
699 assert!(
700 self.id == thread_id(),
701 "local task polled by a thread that didn't spawn it. Task spawned at {}",
702 self.location
703 );
704 unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
705 }
706 }
707
708 // Wrap the future into one that checks which thread it's on.
709 let future = Checked {
710 id: thread_id(),
711 inner: ManuallyDrop::new(future),
712 location: Location::caller(),
713 };
714
715 unsafe {
716 async_task::Builder::new()
717 .metadata(metadata)
718 .spawn_unchecked(move |_| future, schedule)
719 }
720}
721
722/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
723pub struct Scope<'a> {
724 executor: BackgroundExecutor,
725 priority: Priority,
726 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
727 tx: Option<mpsc::Sender<()>>,
728 rx: mpsc::Receiver<()>,
729 lifetime: PhantomData<&'a ()>,
730}
731
732impl<'a> Scope<'a> {
733 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
734 let (tx, rx) = mpsc::channel(1);
735 Self {
736 executor,
737 priority,
738 tx: Some(tx),
739 rx,
740 futures: Default::default(),
741 lifetime: PhantomData,
742 }
743 }
744
745 /// How many CPUs are available to the dispatcher.
746 pub fn num_cpus(&self) -> usize {
747 self.executor.num_cpus()
748 }
749
750 /// Spawn a future into this scope.
751 #[track_caller]
752 pub fn spawn<F>(&mut self, f: F)
753 where
754 F: Future<Output = ()> + Send + 'a,
755 {
756 let tx = self.tx.clone().unwrap();
757
758 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
759 // dropping this `Scope` blocks until all of the futures have resolved.
760 let f = unsafe {
761 mem::transmute::<
762 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
763 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
764 >(Box::pin(async move {
765 f.await;
766 drop(tx);
767 }))
768 };
769 self.futures.push(f);
770 }
771}
772
773impl Drop for Scope<'_> {
774 fn drop(&mut self) {
775 self.tx.take().unwrap();
776
777 // Wait until the channel is closed, which means that all of the spawned
778 // futures have resolved.
779 self.executor.block(self.rx.next());
780 }
781}