dispatcher.rs

  1use std::{
  2    thread::{ThreadId, current},
  3    time::Duration,
  4};
  5
  6use async_task::Runnable;
  7use flume::Sender;
  8use parking::Parker;
  9use parking_lot::Mutex;
 10use util::ResultExt;
 11use windows::{
 12    Foundation::TimeSpan,
 13    System::Threading::{
 14        ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
 15        WorkItemPriority,
 16    },
 17    Win32::{
 18        Foundation::{LPARAM, WPARAM},
 19        UI::WindowsAndMessaging::PostThreadMessageW,
 20    },
 21};
 22
 23use crate::{PlatformDispatcher, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD};
 24
 25pub(crate) struct WindowsDispatcher {
 26    main_sender: Sender<Runnable>,
 27    parker: Mutex<Parker>,
 28    main_thread_id: ThreadId,
 29    main_thread_id_win32: u32,
 30    validation_number: usize,
 31}
 32
 33impl WindowsDispatcher {
 34    pub(crate) fn new(
 35        main_sender: Sender<Runnable>,
 36        main_thread_id_win32: u32,
 37        validation_number: usize,
 38    ) -> Self {
 39        let parker = Mutex::new(Parker::new());
 40        let main_thread_id = current().id();
 41
 42        WindowsDispatcher {
 43            main_sender,
 44            parker,
 45            main_thread_id,
 46            main_thread_id_win32,
 47            validation_number,
 48        }
 49    }
 50
 51    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 52        let handler = {
 53            let mut task_wrapper = Some(runnable);
 54            WorkItemHandler::new(move |_| {
 55                task_wrapper.take().unwrap().run();
 56                Ok(())
 57            })
 58        };
 59        ThreadPool::RunWithPriorityAndOptionsAsync(
 60            &handler,
 61            WorkItemPriority::High,
 62            WorkItemOptions::TimeSliced,
 63        )
 64        .log_err();
 65    }
 66
 67    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 68        let handler = {
 69            let mut task_wrapper = Some(runnable);
 70            TimerElapsedHandler::new(move |_| {
 71                task_wrapper.take().unwrap().run();
 72                Ok(())
 73            })
 74        };
 75        let delay = TimeSpan {
 76            // A time period expressed in 100-nanosecond units.
 77            // 10,000,000 ticks per second
 78            Duration: (duration.as_nanos() / 100) as i64,
 79        };
 80        ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
 81    }
 82}
 83
 84impl PlatformDispatcher for WindowsDispatcher {
 85    fn is_main_thread(&self) -> bool {
 86        current().id() == self.main_thread_id
 87    }
 88
 89    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
 90        self.dispatch_on_threadpool(runnable);
 91        if let Some(label) = label {
 92            log::debug!("TaskLabel: {label:?}");
 93        }
 94    }
 95
 96    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 97        match self.main_sender.send(runnable) {
 98            Ok(_) => unsafe {
 99                PostThreadMessageW(
100                    self.main_thread_id_win32,
101                    WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
102                    WPARAM(self.validation_number),
103                    LPARAM(0),
104                )
105                .log_err();
106            },
107            Err(runnable) => {
108                // NOTE: Runnable may wrap a Future that is !Send.
109                //
110                // This is usually safe because we only poll it on the main thread.
111                // However if the send fails, we know that:
112                // 1. main_receiver has been dropped (which implies the app is shutting down)
113                // 2. we are on a background thread.
114                // It is not safe to drop something !Send on the wrong thread, and
115                // the app will exit soon anyway, so we must forget the runnable.
116                std::mem::forget(runnable);
117            }
118        }
119    }
120
121    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
122        self.dispatch_on_threadpool_after(runnable, duration);
123    }
124
125    fn park(&self, timeout: Option<Duration>) -> bool {
126        if let Some(timeout) = timeout {
127            self.parker.lock().park_timeout(timeout)
128        } else {
129            self.parker.lock().park();
130            true
131        }
132    }
133
134    fn unparker(&self) -> parking::Unparker {
135        self.parker.lock().unparker()
136    }
137}