1use crate::{App, PlatformDispatcher, PlatformScheduler};
2use futures::channel::mpsc;
3use scheduler::Scheduler;
4use smol::prelude::*;
5use std::{
6 fmt::Debug,
7 future::Future,
8 marker::PhantomData,
9 mem,
10 pin::Pin,
11 rc::Rc,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15use util::TryFutureExt;
16
17pub use scheduler::{FallibleTask, Priority};
18
19/// A pointer to the executor that is currently running,
20/// for spawning background tasks.
21#[derive(Clone)]
22pub struct BackgroundExecutor {
23 inner: scheduler::BackgroundExecutor,
24 dispatcher: Arc<dyn PlatformDispatcher>,
25}
26
27/// A pointer to the executor that is currently running,
28/// for spawning tasks on the main thread.
29#[derive(Clone)]
30pub struct ForegroundExecutor {
31 inner: scheduler::ForegroundExecutor,
32 dispatcher: Arc<dyn PlatformDispatcher>,
33 not_send: PhantomData<Rc<()>>,
34}
35
36/// Task is a primitive that allows work to happen in the background.
37///
38/// It implements [`Future`] so you can `.await` on it.
39///
40/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
41/// the task to continue running, but with no way to return a value.
42#[must_use]
43#[derive(Debug)]
44pub struct Task<T>(scheduler::Task<T>);
45
46impl<T> Task<T> {
47 /// Creates a new task that will resolve with the value.
48 pub fn ready(val: T) -> Self {
49 Task(scheduler::Task::ready(val))
50 }
51
52 /// Returns true if the task has completed or was created with `Task::ready`.
53 pub fn is_ready(&self) -> bool {
54 self.0.is_ready()
55 }
56
57 /// Detaching a task runs it to completion in the background.
58 pub fn detach(self) {
59 self.0.detach()
60 }
61
62 /// Wraps a scheduler::Task.
63 pub fn from_scheduler(task: scheduler::Task<T>) -> Self {
64 Task(task)
65 }
66
67 /// Converts this task into a fallible task that returns `Option<T>`.
68 ///
69 /// Unlike the standard `Task<T>`, a [`FallibleTask`] will return `None`
70 /// if the task was cancelled.
71 ///
72 /// # Example
73 ///
74 /// ```ignore
75 /// // Background task that gracefully handles cancellation:
76 /// cx.background_spawn(async move {
77 /// let result = foreground_task.fallible().await;
78 /// if let Some(value) = result {
79 /// // Process the value
80 /// }
81 /// // If None, task was cancelled - just exit gracefully
82 /// }).detach();
83 /// ```
84 pub fn fallible(self) -> FallibleTask<T> {
85 self.0.fallible()
86 }
87}
88
89impl<T, E> Task<Result<T, E>>
90where
91 T: 'static,
92 E: 'static + Debug,
93{
94 /// Run the task to completion in the background and log any errors that occur.
95 #[track_caller]
96 pub fn detach_and_log_err(self, cx: &App) {
97 let location = core::panic::Location::caller();
98 cx.foreground_executor()
99 .spawn(self.log_tracked_err(*location))
100 .detach();
101 }
102}
103
104impl<T> std::future::Future for Task<T> {
105 type Output = T;
106
107 fn poll(
108 self: std::pin::Pin<&mut Self>,
109 cx: &mut std::task::Context<'_>,
110 ) -> std::task::Poll<Self::Output> {
111 // SAFETY: Task is a repr(transparent) wrapper around scheduler::Task,
112 // and we're just projecting the pin through to the inner task.
113 let inner = unsafe { self.map_unchecked_mut(|t| &mut t.0) };
114 inner.poll(cx)
115 }
116}
117
118impl BackgroundExecutor {
119 /// Creates a new BackgroundExecutor from the given PlatformDispatcher.
120 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
121 #[cfg(any(test, feature = "test-support"))]
122 let scheduler: Arc<dyn Scheduler> = if let Some(test_dispatcher) = dispatcher.as_test() {
123 test_dispatcher.scheduler().clone()
124 } else {
125 Arc::new(PlatformScheduler::new(dispatcher.clone()))
126 };
127
128 #[cfg(not(any(test, feature = "test-support")))]
129 let scheduler: Arc<dyn Scheduler> = Arc::new(PlatformScheduler::new(dispatcher.clone()));
130
131 Self {
132 inner: scheduler::BackgroundExecutor::new(scheduler),
133 dispatcher,
134 }
135 }
136
137 /// Close this executor. Tasks will not run after this is called.
138 pub fn close(&self) {
139 self.inner.close();
140 }
141
142 /// Enqueues the given future to be run to completion on a background thread.
143 #[track_caller]
144 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
145 where
146 R: Send + 'static,
147 {
148 self.spawn_with_priority(Priority::default(), future)
149 }
150
151 /// Enqueues the given future to be run to completion on a background thread with the given priority.
152 ///
153 /// `Priority::Realtime` is currently treated as `Priority::High`.
154 ///
155 /// This is intentionally *not* a "downgrade" feature: realtime execution is effectively
156 /// disabled until we have an in-tree use case and are confident about the semantics and
157 /// failure modes (especially around channel backpressure and the risk of blocking
158 /// latency-sensitive threads). It should be straightforward to add a true realtime
159 /// implementation back once those constraints are well-defined.
160 #[track_caller]
161 pub fn spawn_with_priority<R>(
162 &self,
163 priority: Priority,
164 future: impl Future<Output = R> + Send + 'static,
165 ) -> Task<R>
166 where
167 R: Send + 'static,
168 {
169 Task::from_scheduler(self.inner.spawn_with_priority(priority, future))
170 }
171
172 /// Enqueues the given future to be run to completion on a background thread and blocking the current task on it.
173 ///
174 /// This allows to spawn background work that borrows from its scope. Note that the supplied future will run to
175 /// completion before the current task is resumed, even if the current task is slated for cancellation.
176 pub async fn await_on_background<R>(&self, future: impl Future<Output = R> + Send) -> R
177 where
178 R: Send,
179 {
180 use crate::RunnableMeta;
181 use parking_lot::{Condvar, Mutex};
182 use std::sync::{Arc, atomic::AtomicBool};
183
184 struct NotifyOnDrop<'a>(&'a (Condvar, Mutex<bool>));
185
186 impl Drop for NotifyOnDrop<'_> {
187 fn drop(&mut self) {
188 *self.0.1.lock() = true;
189 self.0.0.notify_all();
190 }
191 }
192
193 struct WaitOnDrop<'a>(&'a (Condvar, Mutex<bool>));
194
195 impl Drop for WaitOnDrop<'_> {
196 fn drop(&mut self) {
197 let mut done = self.0.1.lock();
198 if !*done {
199 self.0.0.wait(&mut done);
200 }
201 }
202 }
203
204 let dispatcher = self.dispatcher.clone();
205 let location = core::panic::Location::caller();
206 let closed = Arc::new(AtomicBool::new(false));
207
208 let pair = &(Condvar::new(), Mutex::new(false));
209 let _wait_guard = WaitOnDrop(pair);
210
211 let (runnable, task) = unsafe {
212 async_task::Builder::new()
213 .metadata(RunnableMeta { location, closed })
214 .spawn_unchecked(
215 move |_| async {
216 let _notify_guard = NotifyOnDrop(pair);
217 future.await
218 },
219 move |runnable| {
220 dispatcher.dispatch(runnable, Priority::default());
221 },
222 )
223 };
224 runnable.schedule();
225 task.await
226 }
227
228 /// Scoped lets you start a number of tasks and waits
229 /// for all of them to complete before returning.
230 pub async fn scoped<'scope, F>(&self, scheduler: F)
231 where
232 F: FnOnce(&mut Scope<'scope>),
233 {
234 let mut scope = Scope::new(self.clone(), Priority::default());
235 (scheduler)(&mut scope);
236 let spawned = mem::take(&mut scope.futures)
237 .into_iter()
238 .map(|f| self.spawn_with_priority(scope.priority, f))
239 .collect::<Vec<_>>();
240 for task in spawned {
241 task.await;
242 }
243 }
244
245 /// Scoped lets you start a number of tasks and waits
246 /// for all of them to complete before returning.
247 pub async fn scoped_priority<'scope, F>(&self, priority: Priority, scheduler: F)
248 where
249 F: FnOnce(&mut Scope<'scope>),
250 {
251 let mut scope = Scope::new(self.clone(), priority);
252 (scheduler)(&mut scope);
253 let spawned = mem::take(&mut scope.futures)
254 .into_iter()
255 .map(|f| self.spawn_with_priority(scope.priority, f))
256 .collect::<Vec<_>>();
257 for task in spawned {
258 task.await;
259 }
260 }
261
262 /// Get the current time.
263 ///
264 /// Calling this instead of `std::time::Instant::now` allows the use
265 /// of fake timers in tests.
266 pub fn now(&self) -> Instant {
267 self.inner.scheduler().clock().now()
268 }
269
270 /// Returns a task that will complete after the given duration.
271 /// Depending on other concurrent tasks the elapsed duration may be longer
272 /// than requested.
273 pub fn timer(&self, duration: Duration) -> Task<()> {
274 if duration.is_zero() {
275 return Task::ready(());
276 }
277 self.spawn(self.inner.scheduler().timer(duration))
278 }
279
280 /// In tests, run an arbitrary number of tasks (determined by the SEED environment variable)
281 #[cfg(any(test, feature = "test-support"))]
282 pub fn simulate_random_delay(&self) -> impl Future<Output = ()> + use<> {
283 self.dispatcher.as_test().unwrap().simulate_random_delay()
284 }
285
286 /// In tests, move time forward. This does not run any tasks, but does make `timer`s ready.
287 #[cfg(any(test, feature = "test-support"))]
288 pub fn advance_clock(&self, duration: Duration) {
289 self.dispatcher.as_test().unwrap().advance_clock(duration)
290 }
291
292 /// In tests, run one task.
293 #[cfg(any(test, feature = "test-support"))]
294 pub fn tick(&self) -> bool {
295 self.dispatcher.as_test().unwrap().scheduler().tick()
296 }
297
298 /// In tests, run tasks until the scheduler would park.
299 ///
300 /// Under the scheduler-backed test dispatcher, `tick()` will not advance the clock, so a pending
301 /// timer can keep `has_pending_tasks()` true even after all currently-runnable tasks have been
302 /// drained. To preserve the historical semantics that tests relied on (drain all work that can
303 /// make progress), we advance the clock to the next timer when no runnable tasks remain.
304 #[cfg(any(test, feature = "test-support"))]
305 pub fn run_until_parked(&self) {
306 let scheduler = self.dispatcher.as_test().unwrap().scheduler();
307 scheduler.run();
308 }
309
310 /// In tests, prevents `run_until_parked` from panicking if there are outstanding tasks.
311 #[cfg(any(test, feature = "test-support"))]
312 pub fn allow_parking(&self) {
313 self.dispatcher
314 .as_test()
315 .unwrap()
316 .scheduler()
317 .allow_parking();
318
319 if std::env::var("GPUI_RUN_UNTIL_PARKED_LOG").ok().as_deref() == Some("1") {
320 log::warn!("[gpui::executor] allow_parking: enabled");
321 }
322 }
323
324 /// Sets the range of ticks to run before timing out in block_on.
325 #[cfg(any(test, feature = "test-support"))]
326 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
327 self.dispatcher
328 .as_test()
329 .unwrap()
330 .scheduler()
331 .set_timeout_ticks(range);
332 }
333
334 /// Undoes the effect of [`Self::allow_parking`].
335 #[cfg(any(test, feature = "test-support"))]
336 pub fn forbid_parking(&self) {
337 self.dispatcher
338 .as_test()
339 .unwrap()
340 .scheduler()
341 .forbid_parking();
342 }
343
344 /// In tests, returns the rng used by the dispatcher.
345 #[cfg(any(test, feature = "test-support"))]
346 pub fn rng(&self) -> scheduler::SharedRng {
347 self.dispatcher.as_test().unwrap().scheduler().rng()
348 }
349
350 /// How many CPUs are available to the dispatcher.
351 pub fn num_cpus(&self) -> usize {
352 #[cfg(any(test, feature = "test-support"))]
353 if self.dispatcher.as_test().is_some() {
354 return 4;
355 }
356 num_cpus::get()
357 }
358
359 /// Whether we're on the main thread.
360 pub fn is_main_thread(&self) -> bool {
361 self.dispatcher.is_main_thread()
362 }
363
364 #[doc(hidden)]
365 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
366 &self.dispatcher
367 }
368}
369
370impl ForegroundExecutor {
371 /// Creates a new ForegroundExecutor from the given PlatformDispatcher.
372 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
373 #[cfg(any(test, feature = "test-support"))]
374 let (scheduler, session_id): (Arc<dyn Scheduler>, _) =
375 if let Some(test_dispatcher) = dispatcher.as_test() {
376 (
377 test_dispatcher.scheduler().clone(),
378 test_dispatcher.session_id(),
379 )
380 } else {
381 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
382 let session_id = platform_scheduler.allocate_session_id();
383 (platform_scheduler, session_id)
384 };
385
386 #[cfg(not(any(test, feature = "test-support")))]
387 let (scheduler, session_id): (Arc<dyn Scheduler>, _) = {
388 let platform_scheduler = Arc::new(PlatformScheduler::new(dispatcher.clone()));
389 let session_id = platform_scheduler.allocate_session_id();
390 (platform_scheduler, session_id)
391 };
392
393 let inner = scheduler::ForegroundExecutor::new(session_id, scheduler);
394
395 Self {
396 inner,
397 dispatcher,
398 not_send: PhantomData,
399 }
400 }
401
402 /// Close this executor. Tasks will not run after this is called.
403 pub fn close(&self) {
404 self.inner.close();
405 }
406
407 /// Enqueues the given Task to run on the main thread.
408 #[track_caller]
409 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
410 where
411 R: 'static,
412 {
413 Task::from_scheduler(self.inner.spawn(future))
414 }
415
416 /// Enqueues the given Task to run on the main thread with the given priority.
417 #[track_caller]
418 pub fn spawn_with_priority<R>(
419 &self,
420 _priority: Priority,
421 future: impl Future<Output = R> + 'static,
422 ) -> Task<R>
423 where
424 R: 'static,
425 {
426 // Priority is ignored for foreground tasks - they run in order on the main thread
427 Task::from_scheduler(self.inner.spawn(future))
428 }
429
430 /// Used by the test harness to run an async test in a synchronous fashion.
431 #[cfg(any(test, feature = "test-support"))]
432 #[track_caller]
433 pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
434 use std::cell::Cell;
435
436 let scheduler = self.inner.scheduler();
437
438 let output = Cell::new(None);
439 let future = async {
440 output.set(Some(future.await));
441 };
442 let mut future = std::pin::pin!(future);
443
444 // In async GPUI tests, we must allow foreground tasks scheduled by the test itself
445 // (which are associated with the test session) to make progress while we block.
446 // Otherwise, awaiting futures that depend on same-session foreground work can deadlock.
447 scheduler.block(None, future.as_mut(), None);
448
449 output.take().expect("block_test future did not complete")
450 }
451
452 /// Block the current thread until the given future resolves.
453 /// Consider using `block_with_timeout` instead.
454 pub fn block_on<R>(&self, future: impl Future<Output = R>) -> R {
455 self.inner.block_on(future)
456 }
457
458 /// Block the current thread until the given future resolves or the timeout elapses.
459 pub fn block_with_timeout<R, Fut: Future<Output = R>>(
460 &self,
461 duration: Duration,
462 future: Fut,
463 ) -> Result<R, impl Future<Output = R> + use<R, Fut>> {
464 self.inner.block_with_timeout(duration, future)
465 }
466
467 #[doc(hidden)]
468 pub fn dispatcher(&self) -> &Arc<dyn PlatformDispatcher> {
469 &self.dispatcher
470 }
471}
472
473/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
474pub struct Scope<'a> {
475 executor: BackgroundExecutor,
476 priority: Priority,
477 futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
478 tx: Option<mpsc::Sender<()>>,
479 rx: mpsc::Receiver<()>,
480 lifetime: PhantomData<&'a ()>,
481}
482
483impl<'a> Scope<'a> {
484 fn new(executor: BackgroundExecutor, priority: Priority) -> Self {
485 let (tx, rx) = mpsc::channel(1);
486 Self {
487 executor,
488 priority,
489 tx: Some(tx),
490 rx,
491 futures: Default::default(),
492 lifetime: PhantomData,
493 }
494 }
495
496 /// How many CPUs are available to the dispatcher.
497 pub fn num_cpus(&self) -> usize {
498 self.executor.num_cpus()
499 }
500
501 /// Spawn a future into this scope.
502 #[track_caller]
503 pub fn spawn<F>(&mut self, f: F)
504 where
505 F: Future<Output = ()> + Send + 'a,
506 {
507 let tx = self.tx.clone().unwrap();
508
509 // SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
510 // dropping this `Scope` blocks until all of the futures have resolved.
511 let f = unsafe {
512 mem::transmute::<
513 Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
514 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
515 >(Box::pin(async move {
516 f.await;
517 drop(tx);
518 }))
519 };
520 self.futures.push(f);
521 }
522}
523
524impl Drop for Scope<'_> {
525 fn drop(&mut self) {
526 self.tx.take().unwrap();
527
528 // Wait until the channel is closed, which means that all of the spawned
529 // futures have resolved.
530 let future = async {
531 self.rx.next().await;
532 };
533 let mut future = std::pin::pin!(future);
534 self.executor
535 .inner
536 .scheduler()
537 .block(None, future.as_mut(), None);
538 }
539}
540
541#[cfg(test)]
542mod test {
543 use super::*;
544 use crate::{App, TestDispatcher, TestPlatform};
545 use std::cell::RefCell;
546
547 /// Helper to create test infrastructure.
548 /// Returns (dispatcher, background_executor, app).
549 fn create_test_app() -> (TestDispatcher, BackgroundExecutor, Rc<crate::AppCell>) {
550 let dispatcher = TestDispatcher::new(0);
551 let arc_dispatcher = Arc::new(dispatcher.clone());
552 let background_executor = BackgroundExecutor::new(arc_dispatcher.clone());
553 let foreground_executor = ForegroundExecutor::new(arc_dispatcher);
554
555 let platform = TestPlatform::new(background_executor.clone(), foreground_executor);
556 let asset_source = Arc::new(());
557 let http_client = http_client::FakeHttpClient::with_404_response();
558
559 let app = App::new_app(platform, asset_source, http_client);
560 (dispatcher, background_executor, app)
561 }
562
563 #[test]
564 fn sanity_test_tasks_run() {
565 let (dispatcher, _background_executor, app) = create_test_app();
566 let foreground_executor = app.borrow().foreground_executor.clone();
567
568 let task_ran = Rc::new(RefCell::new(false));
569
570 foreground_executor
571 .spawn({
572 let task_ran = Rc::clone(&task_ran);
573 async move {
574 *task_ran.borrow_mut() = true;
575 }
576 })
577 .detach();
578
579 // Run dispatcher while app is still alive
580 dispatcher.run_until_parked();
581
582 // Task should have run
583 assert!(
584 *task_ran.borrow(),
585 "Task should run normally when app is alive"
586 );
587 }
588
589 #[test]
590 fn test_task_cancelled_when_app_dropped() {
591 let (dispatcher, _background_executor, app) = create_test_app();
592 let foreground_executor = app.borrow().foreground_executor.clone();
593 let app_weak = Rc::downgrade(&app);
594
595 let task_ran = Rc::new(RefCell::new(false));
596 let task_ran_clone = Rc::clone(&task_ran);
597
598 foreground_executor
599 .spawn(async move {
600 *task_ran_clone.borrow_mut() = true;
601 })
602 .detach();
603
604 drop(app);
605
606 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
607
608 dispatcher.run_until_parked();
609
610 // The task should have been cancelled, not run
611 assert!(
612 !*task_ran.borrow(),
613 "Task should have been cancelled when app was dropped, but it ran!"
614 );
615 }
616
617 #[test]
618 fn test_nested_tasks_both_cancel() {
619 let (dispatcher, _background_executor, app) = create_test_app();
620 let foreground_executor = app.borrow().foreground_executor.clone();
621 let app_weak = Rc::downgrade(&app);
622
623 let outer_completed = Rc::new(RefCell::new(false));
624 let inner_completed = Rc::new(RefCell::new(false));
625 let reached_await = Rc::new(RefCell::new(false));
626
627 let outer_flag = Rc::clone(&outer_completed);
628 let inner_flag = Rc::clone(&inner_completed);
629 let await_flag = Rc::clone(&reached_await);
630
631 // Channel to block the inner task until we're ready
632 let (tx, rx) = futures::channel::oneshot::channel::<()>();
633
634 let inner_executor = foreground_executor.clone();
635
636 foreground_executor
637 .spawn(async move {
638 let inner_task = inner_executor.spawn({
639 let inner_flag = Rc::clone(&inner_flag);
640 async move {
641 rx.await.ok();
642 *inner_flag.borrow_mut() = true;
643 }
644 });
645
646 *await_flag.borrow_mut() = true;
647
648 inner_task.await;
649
650 *outer_flag.borrow_mut() = true;
651 })
652 .detach();
653
654 // Run dispatcher until outer task reaches the await point
655 // The inner task will be blocked on the channel
656 dispatcher.run_until_parked();
657
658 // Verify we actually reached the await point before dropping the app
659 assert!(
660 *reached_await.borrow(),
661 "Outer task should have reached the await point"
662 );
663
664 // Neither task should have completed yet
665 assert!(
666 !*outer_completed.borrow(),
667 "Outer task should not have completed yet"
668 );
669 assert!(
670 !*inner_completed.borrow(),
671 "Inner task should not have completed yet"
672 );
673
674 // Drop the channel sender and app while outer is awaiting inner
675 drop(tx);
676 drop(app);
677 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
678
679 // Run dispatcher - both tasks should be cancelled
680 dispatcher.run_until_parked();
681
682 // Neither task should have completed (both were cancelled)
683 assert!(
684 !*outer_completed.borrow(),
685 "Outer task should have been cancelled, not completed"
686 );
687 assert!(
688 !*inner_completed.borrow(),
689 "Inner task should have been cancelled, not completed"
690 );
691 }
692
693 #[test]
694 #[should_panic]
695 fn test_polling_cancelled_task_panics() {
696 let (dispatcher, _background_executor, app) = create_test_app();
697 let foreground_executor = app.borrow().foreground_executor.clone();
698 let app_weak = Rc::downgrade(&app);
699
700 let task = foreground_executor.spawn(async move { 42 });
701
702 drop(app);
703
704 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
705
706 dispatcher.run_until_parked();
707
708 foreground_executor.block_on(task);
709 }
710
711 #[test]
712 fn test_polling_cancelled_task_returns_none_with_fallible() {
713 let (dispatcher, _background_executor, app) = create_test_app();
714 let foreground_executor = app.borrow().foreground_executor.clone();
715 let app_weak = Rc::downgrade(&app);
716
717 let task = foreground_executor.spawn(async move { 42 }).fallible();
718
719 drop(app);
720
721 assert!(app_weak.upgrade().is_none(), "App should have been dropped");
722
723 dispatcher.run_until_parked();
724
725 let result = foreground_executor.block_on(task);
726 assert_eq!(result, None, "Cancelled task should return None");
727 }
728}