dispatcher.rs

  1use std::{
  2    thread::{current, ThreadId},
  3    time::Duration,
  4};
  5
  6use anyhow::Context as _;
  7use async_task::Runnable;
  8use flume::Sender;
  9use parking::Parker;
 10use parking_lot::Mutex;
 11use util::ResultExt;
 12use windows::{
 13    Foundation::TimeSpan,
 14    System::Threading::{
 15        ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
 16        WorkItemPriority,
 17    },
 18    Win32::{
 19        Foundation::{LPARAM, WPARAM},
 20        UI::WindowsAndMessaging::PostThreadMessageW,
 21    },
 22};
 23
 24use crate::{PlatformDispatcher, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD};
 25
 26pub(crate) struct WindowsDispatcher {
 27    main_sender: Sender<Runnable>,
 28    parker: Mutex<Parker>,
 29    main_thread_id: ThreadId,
 30    main_thread_id_win32: u32,
 31    validation_number: usize,
 32}
 33
 34impl WindowsDispatcher {
 35    pub(crate) fn new(
 36        main_sender: Sender<Runnable>,
 37        main_thread_id_win32: u32,
 38        validation_number: usize,
 39    ) -> Self {
 40        let parker = Mutex::new(Parker::new());
 41        let main_thread_id = current().id();
 42
 43        WindowsDispatcher {
 44            main_sender,
 45            parker,
 46            main_thread_id,
 47            main_thread_id_win32,
 48            validation_number,
 49        }
 50    }
 51
 52    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 53        let handler = {
 54            let mut task_wrapper = Some(runnable);
 55            WorkItemHandler::new(move |_| {
 56                task_wrapper.take().unwrap().run();
 57                Ok(())
 58            })
 59        };
 60        ThreadPool::RunWithPriorityAndOptionsAsync(
 61            &handler,
 62            WorkItemPriority::High,
 63            WorkItemOptions::TimeSliced,
 64        )
 65        .log_err();
 66    }
 67
 68    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 69        let handler = {
 70            let mut task_wrapper = Some(runnable);
 71            TimerElapsedHandler::new(move |_| {
 72                task_wrapper.take().unwrap().run();
 73                Ok(())
 74            })
 75        };
 76        let delay = TimeSpan {
 77            // A time period expressed in 100-nanosecond units.
 78            // 10,000,000 ticks per second
 79            Duration: (duration.as_nanos() / 100) as i64,
 80        };
 81        ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
 82    }
 83}
 84
 85impl PlatformDispatcher for WindowsDispatcher {
 86    fn is_main_thread(&self) -> bool {
 87        current().id() == self.main_thread_id
 88    }
 89
 90    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
 91        self.dispatch_on_threadpool(runnable);
 92        if let Some(label) = label {
 93            log::debug!("TaskLabel: {label:?}");
 94        }
 95    }
 96
 97    fn dispatch_on_main_thread(&self, runnable: Runnable) {
 98        if self
 99            .main_sender
100            .send(runnable)
101            .context("Dispatch on main thread failed")
102            .log_err()
103            .is_some()
104        {
105            unsafe {
106                PostThreadMessageW(
107                    self.main_thread_id_win32,
108                    WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
109                    WPARAM(self.validation_number),
110                    LPARAM(0),
111                )
112                .log_err();
113            }
114        }
115    }
116
117    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
118        self.dispatch_on_threadpool_after(runnable, duration);
119    }
120
121    fn park(&self, timeout: Option<Duration>) -> bool {
122        if let Some(timeout) = timeout {
123            self.parker.lock().park_timeout(timeout)
124        } else {
125            self.parker.lock().park();
126            true
127        }
128    }
129
130    fn unparker(&self) -> parking::Unparker {
131        self.parker.lock().unparker()
132    }
133}