1use anyhow::{anyhow, Result};
2use async_task::Runnable;
3use pin_project::pin_project;
4use smol::prelude::*;
5use smol::{channel, Executor};
6use std::rc::Rc;
7use std::sync::Arc;
8use std::{marker::PhantomData, thread};
9
10use crate::platform;
11
12pub enum Foreground {
13 Platform {
14 dispatcher: Arc<dyn platform::Dispatcher>,
15 _not_send_or_sync: PhantomData<Rc<()>>,
16 },
17 Test(smol::LocalExecutor<'static>),
18}
19
20#[pin_project(project = ForegroundTaskProject)]
21pub enum ForegroundTask<T> {
22 Platform(#[pin] async_task::Task<T>),
23 Test(#[pin] smol::Task<T>),
24}
25
26impl<T> Future for ForegroundTask<T> {
27 type Output = T;
28
29 fn poll(
30 self: std::pin::Pin<&mut Self>,
31 ctx: &mut std::task::Context<'_>,
32 ) -> std::task::Poll<Self::Output> {
33 match self.project() {
34 ForegroundTaskProject::Platform(task) => task.poll(ctx),
35 ForegroundTaskProject::Test(task) => task.poll(ctx),
36 }
37 }
38}
39
40pub struct Background {
41 executor: Arc<smol::Executor<'static>>,
42 _stop: channel::Sender<()>,
43}
44
45#[must_use]
46pub type BackgroundTask<T> = smol::Task<T>;
47
48impl Foreground {
49 pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
50 if dispatcher.is_main_thread() {
51 Ok(Self::Platform {
52 dispatcher,
53 _not_send_or_sync: PhantomData,
54 })
55 } else {
56 Err(anyhow!("must be constructed on main thread"))
57 }
58 }
59
60 pub fn test() -> Self {
61 Self::Test(smol::LocalExecutor::new())
62 }
63
64 pub fn spawn<T: 'static>(
65 &self,
66 future: impl Future<Output = T> + 'static,
67 ) -> ForegroundTask<T> {
68 match self {
69 Self::Platform { dispatcher, .. } => {
70 let dispatcher = dispatcher.clone();
71 let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable);
72 let (runnable, task) = async_task::spawn_local(future, schedule);
73 runnable.schedule();
74 ForegroundTask::Platform(task)
75 }
76 Self::Test(executor) => ForegroundTask::Test(executor.spawn(future)),
77 }
78 }
79
80 pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
81 match self {
82 Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
83 Self::Test(executor) => executor.run(future).await,
84 }
85 }
86}
87
88#[must_use]
89impl<T> ForegroundTask<T> {
90 pub fn detach(self) {
91 match self {
92 Self::Platform(task) => task.detach(),
93 Self::Test(task) => task.detach(),
94 }
95 }
96
97 pub async fn cancel(self) -> Option<T> {
98 match self {
99 Self::Platform(task) => task.cancel().await,
100 Self::Test(task) => task.cancel().await,
101 }
102 }
103}
104
105impl Background {
106 pub fn new() -> Self {
107 let executor = Arc::new(Executor::new());
108 let stop = channel::unbounded::<()>();
109
110 for i in 0..num_cpus::get() {
111 let executor = executor.clone();
112 let stop = stop.1.clone();
113 thread::Builder::new()
114 .name(format!("background-executor-{}", i))
115 .spawn(move || smol::block_on(executor.run(stop.recv())))
116 .unwrap();
117 }
118
119 Self {
120 executor,
121 _stop: stop.0,
122 }
123 }
124
125 pub fn spawn<T>(&self, future: impl Send + Future<Output = T> + 'static) -> BackgroundTask<T>
126 where
127 T: 'static + Send,
128 {
129 self.executor.spawn(future)
130 }
131}