1use crate::{AppContext, PlatformDispatcher};
2use smol::prelude::*;
3use std::{
4 fmt::Debug,
5 pin::Pin,
6 sync::Arc,
7 task::{Context, Poll},
8};
9use util::TryFutureExt;
10
/// Handle used to schedule work through a [`PlatformDispatcher`].
///
/// The dispatcher is stored behind an `Arc`, so cloning an `Executor` produces
/// another handle that shares the same dispatcher.
#[derive(Clone)]
pub struct Executor {
    /// Backend that receives runnables to execute and answers
    /// `is_main_thread` queries.
    dispatcher: Arc<dyn PlatformDispatcher>,
}
15
/// A handle to a value that is either available right away or still being
/// produced by a spawned task.
///
/// The type is `#[must_use]`, so the compiler warns when a `Task` is left
/// unused; callers are expected to await it or call `detach`.
#[must_use]
pub enum Task<T> {
    /// A value that is already available. It is wrapped in an `Option` so that
    /// polling can move it out with `take()`.
    Ready(Option<T>),
    /// A handle to a task that was created through `async_task`.
    Spawned(async_task::Task<T>),
}
21
22impl<T> Task<T> {
23 pub fn ready(val: T) -> Self {
24 Task::Ready(Some(val))
25 }
26
27 pub fn detach(self) {
28 match self {
29 Task::Ready(_) => {}
30 Task::Spawned(task) => task.detach(),
31 }
32 }
33}
34
35impl<E, T> Task<Result<T, E>>
36where
37 T: 'static + Send,
38 E: 'static + Send + Debug,
39{
40 pub fn detach_and_log_err(self, cx: &mut AppContext) {
41 cx.executor().spawn(self.log_err()).detach();
42 }
43}
44
45impl<T> Future for Task<T> {
46 type Output = T;
47
48 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
49 match unsafe { self.get_unchecked_mut() } {
50 Task::Ready(val) => Poll::Ready(val.take().unwrap()),
51 Task::Spawned(task) => task.poll(cx),
52 }
53 }
54}
55
56impl Executor {
57 pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
58 Self { dispatcher }
59 }
60
61 /// Enqueues the given closure to be run on any thread. The closure returns
62 /// a future which will be run to completion on any available thread.
63 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
64 where
65 R: Send + 'static,
66 {
67 let dispatcher = self.dispatcher.clone();
68 let (runnable, task) =
69 async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable));
70 runnable.schedule();
71 Task::Spawned(task)
72 }
73
74 /// Enqueues the given closure to run on the application's event loop.
75 /// Returns the result asynchronously.
76 pub fn run_on_main<F, R>(&self, func: F) -> Task<R>
77 where
78 F: FnOnce() -> R + Send + 'static,
79 R: Send + 'static,
80 {
81 if self.dispatcher.is_main_thread() {
82 Task::ready(func())
83 } else {
84 self.spawn_on_main(move || async move { func() })
85 }
86 }
87
88 /// Enqueues the given closure to be run on the application's event loop. The
89 /// closure returns a future which will be run to completion on the main thread.
90 pub fn spawn_on_main<F, R>(&self, func: impl FnOnce() -> F + Send + 'static) -> Task<R>
91 where
92 F: Future<Output = R> + 'static,
93 R: Send + 'static,
94 {
95 let (runnable, task) = async_task::spawn(
96 {
97 let this = self.clone();
98 async move {
99 let task = this.spawn_on_main_local(func());
100 task.await
101 }
102 },
103 {
104 let dispatcher = self.dispatcher.clone();
105 move |runnable| dispatcher.dispatch_on_main_thread(runnable)
106 },
107 );
108 runnable.schedule();
109 Task::Spawned(task)
110 }
111
112 /// Enqueues the given closure to be run on the application's event loop. Must
113 /// be called on the main thread.
114 pub fn spawn_on_main_local<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
115 where
116 R: 'static,
117 {
118 assert!(
119 self.dispatcher.is_main_thread(),
120 "must be called on main thread"
121 );
122
123 let dispatcher = self.dispatcher.clone();
124 let (runnable, task) = async_task::spawn_local(future, move |runnable| {
125 dispatcher.dispatch_on_main_thread(runnable)
126 });
127 runnable.schedule();
128 Task::Spawned(task)
129 }
130
131 pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
132 // todo!("integrate with deterministic dispatcher")
133 futures::executor::block_on(future)
134 }
135
136 pub fn is_main_thread(&self) -> bool {
137 self.dispatcher.is_main_thread()
138 }
139}