1use std::{
2 thread::{current, ThreadId},
3 time::Duration,
4};
5
6use async_task::Runnable;
7use parking::Parker;
8use parking_lot::Mutex;
9use util::ResultExt;
10use windows::{
11 Foundation::TimeSpan,
12 System::{
13 DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler,
14 Threading::{
15 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
16 WorkItemPriority,
17 },
18 },
19 Win32::System::WinRT::{
20 CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE,
21 DQTYPE_THREAD_CURRENT,
22 },
23};
24
25use crate::{PlatformDispatcher, TaskLabel};
26
27pub(crate) struct WindowsDispatcher {
28 controller: DispatcherQueueController,
29 main_queue: DispatcherQueue,
30 parker: Mutex<Parker>,
31 main_thread_id: ThreadId,
32}
33
34impl WindowsDispatcher {
35 pub(crate) fn new() -> Self {
36 let controller = unsafe {
37 let options = DispatcherQueueOptions {
38 dwSize: std::mem::size_of::<DispatcherQueueOptions>() as u32,
39 threadType: DQTYPE_THREAD_CURRENT,
40 apartmentType: DQTAT_COM_NONE,
41 };
42 CreateDispatcherQueueController(options).unwrap()
43 };
44 let main_queue = controller.DispatcherQueue().unwrap();
45 let parker = Mutex::new(Parker::new());
46 let main_thread_id = current().id();
47
48 WindowsDispatcher {
49 controller,
50 main_queue,
51 parker,
52 main_thread_id,
53 }
54 }
55
56 fn dispatch_on_threadpool(&self, runnable: Runnable) {
57 let handler = {
58 let mut task_wrapper = Some(runnable);
59 WorkItemHandler::new(move |_| {
60 task_wrapper.take().unwrap().run();
61 Ok(())
62 })
63 };
64 ThreadPool::RunWithPriorityAndOptionsAsync(
65 &handler,
66 WorkItemPriority::High,
67 WorkItemOptions::TimeSliced,
68 )
69 .log_err();
70 }
71
72 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
73 let handler = {
74 let mut task_wrapper = Some(runnable);
75 TimerElapsedHandler::new(move |_| {
76 task_wrapper.take().unwrap().run();
77 Ok(())
78 })
79 };
80 let delay = TimeSpan {
81 // A time period expressed in 100-nanosecond units.
82 // 10,000,000 ticks per second
83 Duration: (duration.as_nanos() / 100) as i64,
84 };
85 ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
86 }
87}
88
89impl Drop for WindowsDispatcher {
90 fn drop(&mut self) {
91 self.controller.ShutdownQueueAsync().log_err();
92 }
93}
94
95impl PlatformDispatcher for WindowsDispatcher {
96 fn is_main_thread(&self) -> bool {
97 current().id() == self.main_thread_id
98 }
99
100 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
101 self.dispatch_on_threadpool(runnable);
102 if let Some(label) = label {
103 log::debug!("TaskLabel: {label:?}");
104 }
105 }
106
107 fn dispatch_on_main_thread(&self, runnable: Runnable) {
108 let handler = {
109 let mut task_wrapper = Some(runnable);
110 DispatcherQueueHandler::new(move || {
111 task_wrapper.take().unwrap().run();
112 Ok(())
113 })
114 };
115 self.main_queue.TryEnqueue(&handler).log_err();
116 }
117
118 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
119 self.dispatch_on_threadpool_after(runnable, duration);
120 }
121
122 fn park(&self, timeout: Option<Duration>) -> bool {
123 if let Some(timeout) = timeout {
124 self.parker.lock().park_timeout(timeout)
125 } else {
126 self.parker.lock().park();
127 true
128 }
129 }
130
131 fn unparker(&self) -> parking::Unparker {
132 self.parker.lock().unparker()
133 }
134}