dispatcher.rs

  1use std::{
  2    thread::{ThreadId, current},
  3    time::Duration,
  4};
  5
  6use async_task::Runnable;
  7use flume::Sender;
  8use parking::Parker;
  9use parking_lot::Mutex;
 10use util::ResultExt;
 11use windows::{
 12    System::Threading::{
 13        ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
 14    },
 15    Win32::{
 16        Foundation::{LPARAM, WPARAM},
 17        UI::WindowsAndMessaging::PostThreadMessageW,
 18    },
 19};
 20
 21use crate::{PlatformDispatcher, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD};
 22
 23pub(crate) struct WindowsDispatcher {
 24    main_sender: Sender<Runnable>,
 25    parker: Mutex<Parker>,
 26    main_thread_id: ThreadId,
 27    main_thread_id_win32: u32,
 28    validation_number: usize,
 29}
 30
 31impl WindowsDispatcher {
 32    pub(crate) fn new(
 33        main_sender: Sender<Runnable>,
 34        main_thread_id_win32: u32,
 35        validation_number: usize,
 36    ) -> Self {
 37        let parker = Mutex::new(Parker::new());
 38        let main_thread_id = current().id();
 39
 40        WindowsDispatcher {
 41            main_sender,
 42            parker,
 43            main_thread_id,
 44            main_thread_id_win32,
 45            validation_number,
 46        }
 47    }
 48
 49    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 50        let handler = {
 51            let mut task_wrapper = Some(runnable);
 52            WorkItemHandler::new(move |_| {
 53                task_wrapper.take().unwrap().run();
 54                Ok(())
 55            })
 56        };
 57        ThreadPool::RunWithPriorityAsync(&handler, WorkItemPriority::High).log_err();
 58    }
 59
 60    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 61        let handler = {
 62            let mut task_wrapper = Some(runnable);
 63            TimerElapsedHandler::new(move |_| {
 64                task_wrapper.take().unwrap().run();
 65                Ok(())
 66            })
 67        };
 68        ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
 69    }
 70}
 71
 72impl PlatformDispatcher for WindowsDispatcher {
 73    fn is_main_thread(&self) -> bool {
 74        current().id() == self.main_thread_id
 75    }
 76
 77    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
 78        self.dispatch_on_threadpool(runnable);
 79        if let Some(label) = label {
 80            log::debug!("TaskLabel: {label:?}");
 81        }
 82    }
 83
 84    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 85        match self.main_sender.send(runnable) {
 86            Ok(_) => unsafe {
 87                PostThreadMessageW(
 88                    self.main_thread_id_win32,
 89                    WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
 90                    WPARAM(self.validation_number),
 91                    LPARAM(0),
 92                )
 93                .log_err();
 94            },
 95            Err(runnable) => {
 96                // NOTE: Runnable may wrap a Future that is !Send.
 97                //
 98                // This is usually safe because we only poll it on the main thread.
 99                // However if the send fails, we know that:
100                // 1. main_receiver has been dropped (which implies the app is shutting down)
101                // 2. we are on a background thread.
102                // It is not safe to drop something !Send on the wrong thread, and
103                // the app will exit soon anyway, so we must forget the runnable.
104                std::mem::forget(runnable);
105            }
106        }
107    }
108
109    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
110        self.dispatch_on_threadpool_after(runnable, duration);
111    }
112
113    fn park(&self, timeout: Option<Duration>) -> bool {
114        if let Some(timeout) = timeout {
115            self.parker.lock().park_timeout(timeout)
116        } else {
117            self.parker.lock().park();
118            true
119        }
120    }
121
122    fn unparker(&self) -> parking::Unparker {
123        self.parker.lock().unparker()
124    }
125}