dispatcher.rs

  1use std::{
  2    thread::{current, ThreadId},
  3    time::Duration,
  4};
  5
  6use async_task::Runnable;
  7use parking::Parker;
  8use parking_lot::Mutex;
  9use util::ResultExt;
 10use windows::{
 11    Foundation::TimeSpan,
 12    System::{
 13        DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler,
 14        Threading::{
 15            ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
 16            WorkItemPriority,
 17        },
 18    },
 19    Win32::System::WinRT::{
 20        CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE,
 21        DQTYPE_THREAD_CURRENT,
 22    },
 23};
 24
 25use crate::{PlatformDispatcher, TaskLabel};
 26
 27pub(crate) struct WindowsDispatcher {
 28    controller: DispatcherQueueController,
 29    main_queue: DispatcherQueue,
 30    parker: Mutex<Parker>,
 31    main_thread_id: ThreadId,
 32}
 33
 34impl WindowsDispatcher {
 35    pub(crate) fn new() -> Self {
 36        let controller = unsafe {
 37            let options = DispatcherQueueOptions {
 38                dwSize: std::mem::size_of::<DispatcherQueueOptions>() as u32,
 39                threadType: DQTYPE_THREAD_CURRENT,
 40                apartmentType: DQTAT_COM_NONE,
 41            };
 42            CreateDispatcherQueueController(options).unwrap()
 43        };
 44        let main_queue = controller.DispatcherQueue().unwrap();
 45        let parker = Mutex::new(Parker::new());
 46        let main_thread_id = current().id();
 47
 48        WindowsDispatcher {
 49            controller,
 50            main_queue,
 51            parker,
 52            main_thread_id,
 53        }
 54    }
 55
 56    fn dispatch_on_threadpool(&self, runnable: Runnable) {
 57        let handler = {
 58            let mut task_wrapper = Some(runnable);
 59            WorkItemHandler::new(move |_| {
 60                task_wrapper.take().unwrap().run();
 61                Ok(())
 62            })
 63        };
 64        ThreadPool::RunWithPriorityAndOptionsAsync(
 65            &handler,
 66            WorkItemPriority::High,
 67            WorkItemOptions::TimeSliced,
 68        )
 69        .log_err();
 70    }
 71
 72    fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
 73        let handler = {
 74            let mut task_wrapper = Some(runnable);
 75            TimerElapsedHandler::new(move |_| {
 76                task_wrapper.take().unwrap().run();
 77                Ok(())
 78            })
 79        };
 80        let delay = TimeSpan {
 81            // A time period expressed in 100-nanosecond units.
 82            // 10,000,000 ticks per second
 83            Duration: (duration.as_nanos() / 100) as i64,
 84        };
 85        ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
 86    }
 87}
 88
 89impl Drop for WindowsDispatcher {
 90    fn drop(&mut self) {
 91        self.controller.ShutdownQueueAsync().log_err();
 92    }
 93}
 94
 95impl PlatformDispatcher for WindowsDispatcher {
 96    fn is_main_thread(&self) -> bool {
 97        current().id() == self.main_thread_id
 98    }
 99
100    fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
101        self.dispatch_on_threadpool(runnable);
102        if let Some(label) = label {
103            log::debug!("TaskLabel: {label:?}");
104        }
105    }
106
107    fn dispatch_on_main_thread(&self, runnable: Runnable) {
108        let handler = {
109            let mut task_wrapper = Some(runnable);
110            DispatcherQueueHandler::new(move || {
111                task_wrapper.take().unwrap().run();
112                Ok(())
113            })
114        };
115        self.main_queue.TryEnqueue(&handler).log_err();
116    }
117
118    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
119        self.dispatch_on_threadpool_after(runnable, duration);
120    }
121
122    fn park(&self, timeout: Option<Duration>) -> bool {
123        if let Some(timeout) = timeout {
124            self.parker.lock().park_timeout(timeout)
125        } else {
126            self.parker.lock().park();
127            true
128        }
129    }
130
131    fn unparker(&self) -> parking::Unparker {
132        self.parker.lock().unparker()
133    }
134}