dispatcher.rs

  1use std::{
  2    thread::{ThreadId, current},
  3    time::Duration,
  4};
  5
  6use async_task::Runnable;
  7use flume::Sender;
  8use parking::Parker;
  9use parking_lot::Mutex;
 10use util::ResultExt;
 11use windows::{
 12    System::Threading::{
 13        ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
 14    },
 15    Win32::{
 16        Foundation::{LPARAM, WPARAM},
 17        UI::WindowsAndMessaging::PostMessageW,
 18    },
 19};
 20
 21use crate::{
 22    HWND, PlatformDispatcher, SafeHwnd, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
 23};
 24
 25pub(crate) struct WindowsDispatcher {
 26    main_sender: Sender<Runnable>,
 27    parker: Mutex<Parker>,
 28    main_thread_id: ThreadId,
 29    platform_window_handle: SafeHwnd,
 30    validation_number: usize,
 31}
 32
 33impl WindowsDispatcher {
 34    pub(crate) fn new(
 35        main_sender: Sender<Runnable>,
 36        platform_window_handle: HWND,
 37        validation_number: usize,
 38    ) -> Self {
 39        let parker = Mutex::new(Parker::new());
 40        let main_thread_id = current().id();
 41        let platform_window_handle = platform_window_handle.into();
 42
 43        WindowsDispatcher {
 44            main_sender,
 45            parker,
 46            main_thread_id,
 47            platform_window_handle,
 48            validation_number,
 49        }
 50    }
 51
 52    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 53        let handler = {
 54            let mut task_wrapper = Some(runnable);
 55            WorkItemHandler::new(move |_| {
 56                profiling::register_thread!();
 57                task_wrapper.take().unwrap().run();
 58                Ok(())
 59            })
 60        };
 61        ThreadPool::RunWithPriorityAsync(&handler, WorkItemPriority::High).log_err();
 62    }
 63
 64    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 65        let handler = {
 66            let mut task_wrapper = Some(runnable);
 67            TimerElapsedHandler::new(move |_| {
 68                profiling::register_thread!();
 69                task_wrapper.take().unwrap().run();
 70                Ok(())
 71            })
 72        };
 73        ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
 74    }
 75}
 76
 77impl PlatformDispatcher for WindowsDispatcher {
 78    fn is_main_thread(&self) -> bool {
 79        current().id() == self.main_thread_id
 80    }
 81
 82    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
 83        self.dispatch_on_threadpool(runnable);
 84        if let Some(label) = label {
 85            log::debug!("TaskLabel: {label:?}");
 86        }
 87    }
 88
 89    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 90        match self.main_sender.send(runnable) {
 91            Ok(_) => unsafe {
 92                PostMessageW(
 93                    Some(self.platform_window_handle.as_raw()),
 94                    WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
 95                    WPARAM(self.validation_number),
 96                    LPARAM(0),
 97                )
 98                .log_err();
 99            },
100            Err(runnable) => {
101                // NOTE: Runnable may wrap a Future that is !Send.
102                //
103                // This is usually safe because we only poll it on the main thread.
104                // However if the send fails, we know that:
105                // 1. main_receiver has been dropped (which implies the app is shutting down)
106                // 2. we are on a background thread.
107                // It is not safe to drop something !Send on the wrong thread, and
108                // the app will exit soon anyway, so we must forget the runnable.
109                std::mem::forget(runnable);
110            }
111        }
112    }
113
114    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
115        self.dispatch_on_threadpool_after(runnable, duration);
116    }
117
118    fn park(&self, timeout: Option<Duration>) -> bool {
119        if let Some(timeout) = timeout {
120            self.parker.lock().park_timeout(timeout)
121        } else {
122            self.parker.lock().park();
123            true
124        }
125    }
126
127    fn unparker(&self) -> parking::Unparker {
128        self.parker.lock().unparker()
129    }
130}