dispatcher.rs

  1use std::{
  2    thread::{current, ThreadId},
  3    time::Duration,
  4};
  5
  6use async_task::Runnable;
  7use parking::Parker;
  8use parking_lot::Mutex;
  9use util::ResultExt;
 10use windows::{
 11    Foundation::TimeSpan,
 12    System::{
 13        DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler,
 14        Threading::{
 15            ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
 16            WorkItemPriority,
 17        },
 18    },
 19    Win32::System::WinRT::{
 20        CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE,
 21        DQTYPE_THREAD_CURRENT,
 22    },
 23};
 24
 25use crate::{PlatformDispatcher, TaskLabel};
 26
/// Windows implementation of the platform dispatcher.
///
/// Main-thread work goes through a `DispatcherQueue`; background and delayed
/// work goes to the system thread pool.
pub(crate) struct WindowsDispatcher {
    /// Controller for the dispatcher queue created in `new`; asked to shut the
    /// queue down when the dispatcher is dropped.
    controller: DispatcherQueueController,
    /// Queue obtained from `controller`; receives the handlers enqueued by
    /// `dispatch_on_main_thread`.
    main_queue: DispatcherQueue,
    /// Parker behind `park`; `unparker` hands out its matching unparker.
    parker: Mutex<Parker>,
    /// Id of the thread that called `new`, compared against in `is_main_thread`.
    main_thread_id: ThreadId,
}
 33
// SAFETY: NOTE(review): these impls assert that the contained
// `DispatcherQueueController` / `DispatcherQueue` handles may be moved to and
// shared between threads. Nothing in this file proves that — presumably it
// relies on the underlying WinRT objects being agile; confirm against the
// `windows` crate and the WinRT documentation.
unsafe impl Send for WindowsDispatcher {}
unsafe impl Sync for WindowsDispatcher {}
 36
 37impl WindowsDispatcher {
 38    pub(crate) fn new() -> Self {
 39        let controller = unsafe {
 40            let options = DispatcherQueueOptions {
 41                dwSize: std::mem::size_of::<DispatcherQueueOptions>() as u32,
 42                threadType: DQTYPE_THREAD_CURRENT,
 43                apartmentType: DQTAT_COM_NONE,
 44            };
 45            CreateDispatcherQueueController(options).unwrap()
 46        };
 47        let main_queue = controller.DispatcherQueue().unwrap();
 48        let parker = Mutex::new(Parker::new());
 49        let main_thread_id = current().id();
 50
 51        WindowsDispatcher {
 52            controller,
 53            main_queue,
 54            parker,
 55            main_thread_id,
 56        }
 57    }
 58
 59    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 60        let handler = {
 61            let mut task_wrapper = Some(runnable);
 62            WorkItemHandler::new(move |_| {
 63                task_wrapper.take().unwrap().run();
 64                Ok(())
 65            })
 66        };
 67        ThreadPool::RunWithPriorityAndOptionsAsync(
 68            &handler,
 69            WorkItemPriority::High,
 70            WorkItemOptions::TimeSliced,
 71        )
 72        .log_err();
 73    }
 74
 75    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 76        let handler = {
 77            let mut task_wrapper = Some(runnable);
 78            TimerElapsedHandler::new(move |_| {
 79                task_wrapper.take().unwrap().run();
 80                Ok(())
 81            })
 82        };
 83        let delay = TimeSpan {
 84            // A time period expressed in 100-nanosecond units.
 85            // 10,000,000 ticks per second
 86            Duration: (duration.as_nanos() / 100) as i64,
 87        };
 88        ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
 89    }
 90}
 91
 92impl Drop for WindowsDispatcher {
 93    fn drop(&mut self) {
 94        self.controller.ShutdownQueueAsync().log_err();
 95    }
 96}
 97
 98impl PlatformDispatcher for WindowsDispatcher {
 99    fn is_main_thread(&self) -> bool {
100        current().id() == self.main_thread_id
101    }
102
103    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
104        self.dispatch_on_threadpool(runnable);
105        if let Some(label) = label {
106            log::debug!("TaskLabel: {label:?}");
107        }
108    }
109
110    fn dispatch_on_main_thread(&self, runnable: Runnable) {
111        let handler = {
112            let mut task_wrapper = Some(runnable);
113            DispatcherQueueHandler::new(move || {
114                task_wrapper.take().unwrap().run();
115                Ok(())
116            })
117        };
118        self.main_queue.TryEnqueue(&handler).log_err();
119    }
120
121    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
122        self.dispatch_on_threadpool_after(runnable, duration);
123    }
124
125    fn park(&self, timeout: Option<Duration>) -> bool {
126        if let Some(timeout) = timeout {
127            self.parker.lock().park_timeout(timeout)
128        } else {
129            self.parker.lock().park();
130            true
131        }
132    }
133
134    fn unparker(&self) -> parking::Unparker {
135        self.parker.lock().unparker()
136    }
137}