gpui_tokio.rs

  1use std::future::Future;
  2
  3use gpui::{App, AppContext, Global, ReadGlobal, Task};
  4use util::defer;
  5
  6pub use tokio::task::JoinError;
  7
  8/// Initializes the Tokio wrapper using a new Tokio runtime with 2 worker threads.
  9///
 10/// If you need more threads (or access to the runtime outside of GPUI), you can create the runtime
 11/// yourself and pass a Handle to `init_from_handle`.
 12pub fn init(cx: &mut App) {
 13    let runtime = tokio::runtime::Builder::new_multi_thread()
 14        // Since we now have two executors, let's try to keep our footprint small
 15        .worker_threads(2)
 16        .enable_all()
 17        .build()
 18        .expect("Failed to initialize Tokio");
 19
 20    let handle = runtime.handle().clone();
 21    cx.set_global(GlobalTokio {
 22        owned_runtime: Some(runtime),
 23        handle,
 24    });
 25}
 26
 27/// Initializes the Tokio wrapper using a Tokio runtime handle.
 28pub fn init_from_handle(cx: &mut App, handle: tokio::runtime::Handle) {
 29    cx.set_global(GlobalTokio {
 30        owned_runtime: None,
 31        handle,
 32    });
 33}
 34
 35struct GlobalTokio {
 36    owned_runtime: Option<tokio::runtime::Runtime>,
 37    handle: tokio::runtime::Handle,
 38}
 39
 40impl Global for GlobalTokio {}
 41
 42impl Drop for GlobalTokio {
 43    fn drop(&mut self) {
 44        if let Some(runtime) = self.owned_runtime.take() {
 45            runtime.shutdown_background();
 46        }
 47    }
 48}
 49
 50pub struct Tokio {}
 51
 52impl Tokio {
 53    /// Spawns the given future on Tokio's thread pool, and returns it via a GPUI task
 54    /// Note that the Tokio task will be cancelled if the GPUI task is dropped
 55    pub fn spawn<C, Fut, R>(cx: &C, f: Fut) -> Task<Result<R, JoinError>>
 56    where
 57        C: AppContext,
 58        Fut: Future<Output = R> + Send + 'static,
 59        R: Send + 'static,
 60    {
 61        cx.read_global(|tokio: &GlobalTokio, cx| {
 62            let join_handle = tokio.handle.spawn(f);
 63            let abort_handle = join_handle.abort_handle();
 64            let cancel = defer(move || {
 65                abort_handle.abort();
 66            });
 67            cx.background_spawn(async move {
 68                let result = join_handle.await;
 69                drop(cancel);
 70                result
 71            })
 72        })
 73    }
 74
 75    /// Spawns the given future on Tokio's thread pool, and returns it via a GPUI task
 76    /// Note that the Tokio task will be cancelled if the GPUI task is dropped
 77    pub fn spawn_result<C, Fut, R>(cx: &C, f: Fut) -> Task<anyhow::Result<R>>
 78    where
 79        C: AppContext,
 80        Fut: Future<Output = anyhow::Result<R>> + Send + 'static,
 81        R: Send + 'static,
 82    {
 83        cx.read_global(|tokio: &GlobalTokio, cx| {
 84            let join_handle = tokio.handle.spawn(f);
 85            let abort_handle = join_handle.abort_handle();
 86            let cancel = defer(move || {
 87                abort_handle.abort();
 88            });
 89            cx.background_spawn(async move {
 90                let result = join_handle.await?;
 91                drop(cancel);
 92                result
 93            })
 94        })
 95    }
 96
 97    pub fn handle(cx: &App) -> tokio::runtime::Handle {
 98        GlobalTokio::global(cx).handle.clone()
 99    }
100}