1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use smol::{channel, prelude::*, Executor, Timer};
4use std::{
5 any::Any,
6 fmt::{self, Display},
7 marker::PhantomData,
8 mem,
9 pin::Pin,
10 rc::Rc,
11 sync::Arc,
12 task::{Context, Poll},
13 thread,
14 time::Duration,
15};
16
17use crate::{
18 platform::{self, Dispatcher},
19 util::{self, post_inc},
20 MutableAppContext,
21};
22
/// Executor for futures that run on the foreground (main) thread.
pub enum Foreground {
    /// Production variant: runnables are handed to the platform dispatcher.
    Platform {
        dispatcher: Arc<dyn platform::Dispatcher>,
        // `Rc` is neither `Send` nor `Sync`, so this marker makes the enum neither as well.
        _not_send_or_sync: PhantomData<Rc<()>>,
    },
    /// Test-only variant backed by a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic {
        // Key under which this context's runnables are queued in the executor state.
        cx_id: usize,
        executor: Arc<Deterministic>,
    },
}
34
/// Executor for `Send` futures that run off the foreground thread.
pub enum Background {
    /// Test-only variant backed by a shared [`Deterministic`] executor.
    #[cfg(any(test, feature = "test-support"))]
    Deterministic { executor: Arc<Deterministic> },
    /// Production variant: a `smol` executor driven by a pool of worker threads.
    Production {
        executor: Arc<smol::Executor<'static>>,
        // Never sent on; the worker threads block on the receiving half of this channel
        // (see `Background::new`).
        _stop: channel::Sender<()>,
    },
}
43
/// A boxed future with a type-erased output that is not required to be `Send`.
type AnyLocalFuture = Pin<Box<dyn 'static + Future<Output = Box<dyn Any + 'static>>>>;
/// A boxed `Send` future with a type-erased `Send` output.
type AnyFuture = Pin<Box<dyn 'static + Send + Future<Output = Box<dyn Any + Send + 'static>>>>;
/// An `async_task` handle whose type-erased output is `Send`.
type AnyTask = async_task::Task<Box<dyn Any + Send + 'static>>;
/// An `async_task` handle whose type-erased output is not required to be `Send`.
type AnyLocalTask = async_task::Task<Box<dyn Any + 'static>>;
48
/// Handle to a value of type `T` that is either already available or will be produced by a
/// spawned future. Awaiting the task yields the value.
#[must_use]
pub enum Task<T> {
    /// A value that is already available; it is taken out of the `Option` when polled.
    Ready(Option<T>),
    /// A task whose type-erased output is not required to be `Send`.
    Local {
        any_task: AnyLocalTask,
        // Records the concrete output type that `any_task` erases.
        result_type: PhantomData<T>,
    },
    /// A task whose type-erased output is `Send`.
    Send {
        any_task: AnyTask,
        // Records the concrete output type that `any_task` erases.
        result_type: PhantomData<T>,
    },
}
61
// NOTE(review): this asserts `Send` for every variant, including `Task::Local`, whose
// `AnyLocalTask` output is not required to be `Send`. Presumably callers never move a
// `Local` task across threads in a way that matters — confirm before relying on it.
unsafe impl<T: Send> Send for Task<T> {}
63
/// Mutable state of the deterministic test executor, shared behind a mutex.
#[cfg(any(test, feature = "test-support"))]
struct DeterministicState {
    // Source of every scheduling decision made by the executor.
    rng: rand::prelude::StdRng,
    // Seed the RNG was created from; `Foreground::forbid_parking` reseeds the RNG with it.
    seed: u64,
    // Runnables scheduled by foreground tasks, queued per foreground context id.
    scheduled_from_foreground: collections::HashMap<usize, Vec<ForegroundRunnable>>,
    // Runnables scheduled by background tasks.
    scheduled_from_background: Vec<Runnable>,
    // When set, `will_park` panics instead of allowing the executor to park.
    forbid_parking: bool,
    // Range from which `Background::block_with_timeout` draws its tick budget.
    block_on_ticks: std::ops::RangeInclusive<usize>,
    // Simulated clock; moved forward by `Foreground::advance_clock`.
    now: std::time::Instant,
    // Id handed to the next timer registered by `Foreground::timer`.
    next_timer_id: usize,
    // Registered timers as (timer id, wake-up instant, barrier sender).
    pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
    // Backtrace captured by `Foreground::start_waiting`; included in the `will_park` panic.
    waiting_backtrace: Option<backtrace::Backtrace>,
}
77
/// A runnable queued by a foreground context of the deterministic executor.
#[cfg(any(test, feature = "test-support"))]
struct ForegroundRunnable {
    runnable: Runnable,
    // True when the runnable belongs to the main future passed to `Deterministic::run`;
    // `run_internal` polls the main task after running such a runnable.
    main: bool,
}
83
/// Test executor that picks the next runnable using a seeded RNG.
#[cfg(any(test, feature = "test-support"))]
pub struct Deterministic {
    // Shared with the schedule closures created when tasks are spawned.
    state: Arc<parking_lot::Mutex<DeterministicState>>,
    // Used to park the running thread; schedule closures and wakers hold its unparker.
    parker: parking_lot::Mutex<parking::Parker>,
}
89
/// Construction, spawning and scheduling logic of the deterministic test executor.
#[cfg(any(test, feature = "test-support"))]
impl Deterministic {
    /// Creates an executor whose scheduling decisions come from an RNG seeded with `seed`.
    ///
    /// Parking starts out allowed, `block_on_ticks` starts as `0..=1000`, and the simulated
    /// clock starts at the current instant.
    pub fn new(seed: u64) -> Arc<Self> {
        use rand::prelude::*;

        Arc::new(Self {
            state: Arc::new(parking_lot::Mutex::new(DeterministicState {
                rng: StdRng::seed_from_u64(seed),
                seed,
                scheduled_from_foreground: Default::default(),
                scheduled_from_background: Default::default(),
                forbid_parking: false,
                block_on_ticks: 0..=1000,
                now: std::time::Instant::now(),
                next_timer_id: Default::default(),
                pending_timers: Default::default(),
                waiting_backtrace: None,
            })),
            parker: Default::default(),
        })
    }

    /// Returns a background executor that shares this deterministic executor.
    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
        Arc::new(Background::Deterministic {
            executor: self.clone(),
        })
    }

    /// Returns a foreground executor that shares this deterministic executor and queues its
    /// runnables under the context id `id`.
    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
        Rc::new(Foreground::Deterministic {
            cx_id: id,
            executor: self.clone(),
        })
    }

    /// Spawns a non-`Send` future on behalf of foreground context `cx_id`.
    ///
    /// Each time the task is scheduled, its runnable is appended to that context's queue,
    /// tagged with `main`, and the parked thread (if any) is unparked.
    fn spawn_from_foreground(
        &self,
        cx_id: usize,
        future: AnyLocalFuture,
        main: bool,
    ) -> AnyLocalTask {
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn_local(future, move |runnable| {
            let mut state = state.lock();
            state
                .scheduled_from_foreground
                .entry(cx_id)
                .or_default()
                .push(ForegroundRunnable { runnable, main });
            unparker.unpark();
        });
        // Queue the task right away so it gets a first poll.
        runnable.schedule();
        task
    }

    /// Spawns a `Send` future as a background task.
    ///
    /// Each time the task is scheduled, its runnable is appended to the background queue and
    /// the parked thread (if any) is unparked.
    fn spawn(&self, future: AnyFuture) -> AnyTask {
        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            let mut state = state.lock();
            state.scheduled_from_background.push(runnable);
            unparker.unpark();
        });
        // Queue the task right away so it gets a first poll.
        runnable.schedule();
        task
    }

    /// Drives `main_future` to completion on behalf of foreground context `cx_id`, running
    /// other scheduled runnables along the way, and returns its type-erased output.
    ///
    /// When no work is scheduled the thread parks; if parking has been forbidden and no
    /// wake-up was recorded, `will_park` panics instead.
    fn run<'a>(
        &self,
        cx_id: usize,
        main_future: Pin<Box<dyn 'a + Future<Output = Box<dyn Any>>>>,
    ) -> Box<dyn Any> {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};

        // Set by the waker that `run_internal` uses when polling the main task.
        let woken = Arc::new(AtomicBool::new(false));

        let state = self.state.clone();
        let unparker = self.parker.lock().unparker();
        // NOTE(review): `spawn_unchecked` is used because `main_future` is neither `'static`
        // nor `Send`. Soundness presumably relies on this function not returning before the
        // task has produced its result — confirm against async_task's safety contract.
        let (runnable, mut main_task) = unsafe {
            async_task::spawn_unchecked(main_future, move |runnable| {
                let mut state = state.lock();
                state
                    .scheduled_from_foreground
                    .entry(cx_id)
                    .or_default()
                    .push(ForegroundRunnable {
                        runnable,
                        main: true,
                    });
                unparker.unpark();
            })
        };
        runnable.schedule();

        loop {
            if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) {
                return result;
            }

            // Both queues are empty here. Only treat this as parking when the main task's
            // waker has not fired since the flag was last cleared.
            if !woken.load(SeqCst) {
                self.state.lock().will_park();
            }

            woken.store(false, SeqCst);
            self.parker.lock().park();
        }
    }

    /// Runs scheduled runnables until both queues are empty, without polling any main task.
    pub fn run_until_parked(&self) {
        use std::sync::atomic::AtomicBool;
        let woken = Arc::new(AtomicBool::new(false));
        self.run_internal(woken, None);
    }

    /// Runs scheduled runnables, chosen using the RNG, until both queues are empty.
    ///
    /// Returns `Some(output)` as soon as `main_task` (when provided) completes, and `None`
    /// once no runnables remain. `woken` is set whenever the waker used to poll `main_task`
    /// fires.
    fn run_internal(
        &self,
        woken: Arc<std::sync::atomic::AtomicBool>,
        mut main_task: Option<&mut AnyLocalTask>,
    ) -> Option<Box<dyn Any>> {
        use rand::prelude::*;
        use std::sync::atomic::Ordering::SeqCst;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            woken.store(true, SeqCst);
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        loop {
            let mut state = self.state.lock();

            if state.scheduled_from_foreground.is_empty()
                && state.scheduled_from_background.is_empty()
            {
                return None;
            }

            // With background work available, a random boolean decides whether to run it.
            // If the draw says no and there is no foreground work, the loop simply retries.
            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
                let background_len = state.scheduled_from_background.len();
                let ix = state.rng.gen_range(0..background_len);
                let runnable = state.scheduled_from_background.remove(ix);
                // Release the lock first: running may schedule more work, which locks the state.
                drop(state);
                runnable.run();
            } else if !state.scheduled_from_foreground.is_empty() {
                // Pick a random foreground context, then take its oldest runnable so that
                // order within a context is preserved.
                let available_cx_ids = state
                    .scheduled_from_foreground
                    .keys()
                    .copied()
                    .collect::<Vec<_>>();
                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
                let scheduled_from_cx = state
                    .scheduled_from_foreground
                    .get_mut(&cx_id_to_run)
                    .unwrap();
                let foreground_runnable = scheduled_from_cx.remove(0);
                // Remove drained queues so the map's emptiness reflects pending work.
                if scheduled_from_cx.is_empty() {
                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                }

                drop(state);

                foreground_runnable.runnable.run();
                // Only a runnable flagged `main` can have advanced the main task.
                if let Some(main_task) = main_task.as_mut() {
                    if foreground_runnable.main {
                        if let Poll::Ready(result) = main_task.poll(&mut cx) {
                            return Some(result);
                        }
                    }
                }
            }
        }
    }

    /// Polls `future` on the current thread for at most `max_ticks` iterations, interleaving
    /// it with background runnables.
    ///
    /// Returns `Some(output)` if the future completes within the budget and `None` otherwise.
    fn block<F, T>(&self, future: &mut F, max_ticks: usize) -> Option<T>
    where
        F: Unpin + Future<Output = T>,
    {
        use rand::prelude::*;

        let unparker = self.parker.lock().unparker();
        let waker = waker_fn::waker_fn(move || {
            unparker.unpark();
        });

        let mut cx = Context::from_waker(&waker);
        for _ in 0..max_ticks {
            let mut state = self.state.lock();
            let runnable_count = state.scheduled_from_background.len();
            // The range is inclusive: drawing `runnable_count` itself means "poll the future"
            // rather than "run a background runnable".
            let ix = state.rng.gen_range(0..=runnable_count);
            if ix < state.scheduled_from_background.len() {
                let runnable = state.scheduled_from_background.remove(ix);
                drop(state);
                runnable.run();
            } else {
                drop(state);
                if let Poll::Ready(result) = future.poll(&mut cx) {
                    return Some(result);
                }
                // The future is pending; park only if no background work is queued.
                let mut state = self.state.lock();
                if state.scheduled_from_background.is_empty() {
                    state.will_park();
                    drop(state);
                    self.parker.lock().park();
                }

                continue;
            }
        }

        None
    }
}
304
#[cfg(any(test, feature = "test-support"))]
impl DeterministicState {
    /// Called right before the executor parks.
    ///
    /// # Panics
    /// Panics when parking has been forbidden. If a waiting backtrace was recorded, it is
    /// resolved and appended to the panic message.
    fn will_park(&mut self) {
        if !self.forbid_parking {
            return;
        }

        let backtrace_message = match self.waiting_backtrace.as_mut() {
            Some(backtrace) => {
                backtrace.resolve();
                format!(
                    "\nbacktrace of waiting future:\n{:?}",
                    util::CwdBacktrace(backtrace)
                )
            }
            None => String::new(),
        };

        panic!(
            "deterministic executor parked after a call to forbid_parking{}",
            backtrace_message
        );
    }
}
326
327impl Foreground {
328 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
329 if dispatcher.is_main_thread() {
330 Ok(Self::Platform {
331 dispatcher,
332 _not_send_or_sync: PhantomData,
333 })
334 } else {
335 Err(anyhow!("must be constructed on main thread"))
336 }
337 }
338
339 pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
340 let future = any_local_future(future);
341 let any_task = match self {
342 #[cfg(any(test, feature = "test-support"))]
343 Self::Deterministic { cx_id, executor } => {
344 executor.spawn_from_foreground(*cx_id, future, false)
345 }
346 Self::Platform { dispatcher, .. } => {
347 fn spawn_inner(
348 future: AnyLocalFuture,
349 dispatcher: &Arc<dyn Dispatcher>,
350 ) -> AnyLocalTask {
351 let dispatcher = dispatcher.clone();
352 let schedule =
353 move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
354 let (runnable, task) = async_task::spawn_local(future, schedule);
355 runnable.schedule();
356 task
357 }
358 spawn_inner(future, dispatcher)
359 }
360 };
361 Task::local(any_task)
362 }
363
364 #[cfg(any(test, feature = "test-support"))]
365 pub fn run<T: 'static>(&self, future: impl Future<Output = T>) -> T {
366 let future = async move { Box::new(future.await) as Box<dyn Any> }.boxed_local();
367 let result = match self {
368 Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
369 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
370 };
371 *result.downcast().unwrap()
372 }
373
374 #[cfg(any(test, feature = "test-support"))]
375 pub fn run_until_parked(&self) {
376 match self {
377 Self::Deterministic { executor, .. } => executor.run_until_parked(),
378 _ => panic!("this method can only be called on a deterministic executor"),
379 }
380 }
381
382 #[cfg(any(test, feature = "test-support"))]
383 pub fn parking_forbidden(&self) -> bool {
384 match self {
385 Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
386 _ => panic!("this method can only be called on a deterministic executor"),
387 }
388 }
389
390 #[cfg(any(test, feature = "test-support"))]
391 pub fn start_waiting(&self) {
392 match self {
393 Self::Deterministic { executor, .. } => {
394 executor.state.lock().waiting_backtrace =
395 Some(backtrace::Backtrace::new_unresolved());
396 }
397 _ => panic!("this method can only be called on a deterministic executor"),
398 }
399 }
400
401 #[cfg(any(test, feature = "test-support"))]
402 pub fn finish_waiting(&self) {
403 match self {
404 Self::Deterministic { executor, .. } => {
405 executor.state.lock().waiting_backtrace.take();
406 }
407 _ => panic!("this method can only be called on a deterministic executor"),
408 }
409 }
410
411 #[cfg(any(test, feature = "test-support"))]
412 pub fn forbid_parking(&self) {
413 use rand::prelude::*;
414
415 match self {
416 Self::Deterministic { executor, .. } => {
417 let mut state = executor.state.lock();
418 state.forbid_parking = true;
419 state.rng = StdRng::seed_from_u64(state.seed);
420 }
421 _ => panic!("this method can only be called on a deterministic executor"),
422 }
423 }
424
425 pub async fn timer(&self, duration: Duration) {
426 match self {
427 #[cfg(any(test, feature = "test-support"))]
428 Self::Deterministic { executor, .. } => {
429 use postage::prelude::Stream as _;
430
431 let (tx, mut rx) = postage::barrier::channel();
432 let timer_id;
433 {
434 let mut state = executor.state.lock();
435 let wakeup_at = state.now + duration;
436 timer_id = post_inc(&mut state.next_timer_id);
437 state.pending_timers.push((timer_id, wakeup_at, tx));
438 }
439
440 struct DropTimer<'a>(usize, &'a Foreground);
441 impl<'a> Drop for DropTimer<'a> {
442 fn drop(&mut self) {
443 match self.1 {
444 Foreground::Deterministic { executor, .. } => {
445 executor
446 .state
447 .lock()
448 .pending_timers
449 .retain(|(timer_id, _, _)| *timer_id != self.0);
450 }
451 _ => unreachable!(),
452 }
453 }
454 }
455
456 let _guard = DropTimer(timer_id, self);
457 rx.recv().await;
458 }
459 _ => {
460 Timer::after(duration).await;
461 }
462 }
463 }
464
465 #[cfg(any(test, feature = "test-support"))]
466 pub fn advance_clock(&self, duration: Duration) {
467 match self {
468 Self::Deterministic { executor, .. } => {
469 executor.run_until_parked();
470
471 let mut state = executor.state.lock();
472 state.now += duration;
473 let now = state.now;
474 let mut pending_timers = mem::take(&mut state.pending_timers);
475 drop(state);
476
477 pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
478 executor.state.lock().pending_timers.extend(pending_timers);
479 }
480 _ => panic!("this method can only be called on a deterministic executor"),
481 }
482 }
483
484 #[cfg(any(test, feature = "test-support"))]
485 pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
486 match self {
487 Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
488 _ => panic!("this method can only be called on a deterministic executor"),
489 }
490 }
491}
492
493impl Background {
494 pub fn new() -> Self {
495 let executor = Arc::new(Executor::new());
496 let stop = channel::unbounded::<()>();
497
498 for i in 0..2 * num_cpus::get() {
499 let executor = executor.clone();
500 let stop = stop.1.clone();
501 thread::Builder::new()
502 .name(format!("background-executor-{}", i))
503 .spawn(move || smol::block_on(executor.run(stop.recv())))
504 .unwrap();
505 }
506
507 Self::Production {
508 executor,
509 _stop: stop.0,
510 }
511 }
512
513 pub fn num_cpus(&self) -> usize {
514 num_cpus::get()
515 }
516
517 pub fn spawn<T, F>(&self, future: F) -> Task<T>
518 where
519 T: 'static + Send,
520 F: Send + Future<Output = T> + 'static,
521 {
522 let future = any_future(future);
523 let any_task = match self {
524 Self::Production { executor, .. } => executor.spawn(future),
525 #[cfg(any(test, feature = "test-support"))]
526 Self::Deterministic { executor } => executor.spawn(future),
527 };
528 Task::send(any_task)
529 }
530
531 pub fn block<F, T>(&self, future: F) -> T
532 where
533 F: Future<Output = T>,
534 {
535 smol::pin!(future);
536 match self {
537 Self::Production { .. } => smol::block_on(&mut future),
538 #[cfg(any(test, feature = "test-support"))]
539 Self::Deterministic { executor, .. } => {
540 executor.block(&mut future, usize::MAX).unwrap()
541 }
542 }
543 }
544
545 pub fn block_with_timeout<F, T>(
546 &self,
547 timeout: Duration,
548 future: F,
549 ) -> Result<T, impl Future<Output = T>>
550 where
551 T: 'static,
552 F: 'static + Unpin + Future<Output = T>,
553 {
554 let mut future = any_local_future(future);
555 if !timeout.is_zero() {
556 let output = match self {
557 Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(),
558 #[cfg(any(test, feature = "test-support"))]
559 Self::Deterministic { executor, .. } => {
560 use rand::prelude::*;
561 let max_ticks = {
562 let mut state = executor.state.lock();
563 let range = state.block_on_ticks.clone();
564 state.rng.gen_range(range)
565 };
566 executor.block(&mut future, max_ticks)
567 }
568 };
569 if let Some(output) = output {
570 return Ok(*output.downcast().unwrap());
571 }
572 }
573 Err(async { *future.await.downcast().unwrap() })
574 }
575
576 pub async fn scoped<'scope, F>(&self, scheduler: F)
577 where
578 F: FnOnce(&mut Scope<'scope>),
579 {
580 let mut scope = Scope {
581 futures: Default::default(),
582 _phantom: PhantomData,
583 };
584 (scheduler)(&mut scope);
585 let spawned = scope
586 .futures
587 .into_iter()
588 .map(|f| self.spawn(f))
589 .collect::<Vec<_>>();
590 for task in spawned {
591 task.await;
592 }
593 }
594
595 #[cfg(any(test, feature = "test-support"))]
596 pub async fn simulate_random_delay(&self) {
597 use rand::prelude::*;
598 use smol::future::yield_now;
599
600 match self {
601 Self::Deterministic { executor, .. } => {
602 if executor.state.lock().rng.gen_bool(0.2) {
603 let yields = executor.state.lock().rng.gen_range(1..=10);
604 for _ in 0..yields {
605 yield_now().await;
606 }
607 }
608 }
609 _ => panic!("this method can only be called on a deterministic executor"),
610 }
611 }
612}
613
/// Collects futures that may borrow data living for `'a`, to be run by `Background::scoped`.
pub struct Scope<'a> {
    // Stored as `'static` trait objects; the real bound is the `'a` tracked by `_phantom`.
    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
    _phantom: PhantomData<&'a ()>,
}

impl<'a> Scope<'a> {
    /// Registers `f` to be run as part of this scope.
    pub fn spawn<F>(&mut self, f: F)
    where
        F: Future<Output = ()> + Send + 'a,
    {
        let scoped: Pin<Box<dyn Future<Output = ()> + Send + 'a>> = Box::pin(f);
        // NOTE(review): this erases the `'a` bound, so it is only sound if every stored
        // future is completed or dropped before `'a` ends. Presumably `Background::scoped`
        // guarantees that by awaiting all spawned tasks — confirm.
        let erased = unsafe {
            mem::transmute::<
                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
            >(scoped)
        };
        self.futures.push(erased);
    }
}
633
634impl<T> Task<T> {
635 pub fn ready(value: T) -> Self {
636 Self::Ready(Some(value))
637 }
638
639 fn local(any_task: AnyLocalTask) -> Self {
640 Self::Local {
641 any_task,
642 result_type: PhantomData,
643 }
644 }
645
646 pub fn detach(self) {
647 match self {
648 Task::Ready(_) => {}
649 Task::Local { any_task, .. } => any_task.detach(),
650 Task::Send { any_task, .. } => any_task.detach(),
651 }
652 }
653}
654
655impl<T: 'static, E: 'static + Display> Task<Result<T, E>> {
656 pub fn detach_and_log_err(self, cx: &mut MutableAppContext) {
657 cx.spawn(|_| async move {
658 if let Err(err) = self.await {
659 log::error!("{}", err);
660 }
661 })
662 .detach();
663 }
664}
665
666impl<T: Send> Task<T> {
667 fn send(any_task: AnyTask) -> Self {
668 Self::Send {
669 any_task,
670 result_type: PhantomData,
671 }
672 }
673}
674
675impl<T: fmt::Debug> fmt::Debug for Task<T> {
676 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
677 match self {
678 Task::Ready(value) => value.fmt(f),
679 Task::Local { any_task, .. } => any_task.fmt(f),
680 Task::Send { any_task, .. } => any_task.fmt(f),
681 }
682 }
683}
684
685impl<T: 'static> Future for Task<T> {
686 type Output = T;
687
688 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
689 match unsafe { self.get_unchecked_mut() } {
690 Task::Ready(value) => Poll::Ready(value.take().unwrap()),
691 Task::Local { any_task, .. } => {
692 any_task.poll(cx).map(|value| *value.downcast().unwrap())
693 }
694 Task::Send { any_task, .. } => {
695 any_task.poll(cx).map(|value| *value.downcast().unwrap())
696 }
697 }
698 }
699}
700
701fn any_future<T, F>(future: F) -> AnyFuture
702where
703 T: 'static + Send,
704 F: Future<Output = T> + Send + 'static,
705{
706 async { Box::new(future.await) as Box<dyn Any + Send> }.boxed()
707}
708
709fn any_local_future<T, F>(future: F) -> AnyLocalFuture
710where
711 T: 'static,
712 F: Future<Output = T> + 'static,
713{
714 async { Box::new(future.await) as Box<dyn Any> }.boxed_local()
715}