1use std::{
2 thread::{current, ThreadId},
3 time::Duration,
4};
5
6use async_task::Runnable;
7use parking::Parker;
8use parking_lot::Mutex;
9use util::ResultExt;
10use windows::{
11 Foundation::TimeSpan,
12 System::{
13 DispatcherQueue, DispatcherQueueController, DispatcherQueueHandler,
14 Threading::{
15 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
16 WorkItemPriority,
17 },
18 },
19 Win32::System::WinRT::{
20 CreateDispatcherQueueController, DispatcherQueueOptions, DQTAT_COM_NONE,
21 DQTYPE_THREAD_CURRENT,
22 },
23};
24
25use crate::{PlatformDispatcher, TaskLabel};
26
27pub(crate) struct WindowsDispatcher {
28 controller: DispatcherQueueController,
29 main_queue: DispatcherQueue,
30 parker: Mutex<Parker>,
31 main_thread_id: ThreadId,
32}
33
34unsafe impl Send for WindowsDispatcher {}
35unsafe impl Sync for WindowsDispatcher {}
36
37impl WindowsDispatcher {
38 pub(crate) fn new() -> Self {
39 let controller = unsafe {
40 let options = DispatcherQueueOptions {
41 dwSize: std::mem::size_of::<DispatcherQueueOptions>() as u32,
42 threadType: DQTYPE_THREAD_CURRENT,
43 apartmentType: DQTAT_COM_NONE,
44 };
45 CreateDispatcherQueueController(options).unwrap()
46 };
47 let main_queue = controller.DispatcherQueue().unwrap();
48 let parker = Mutex::new(Parker::new());
49 let main_thread_id = current().id();
50
51 WindowsDispatcher {
52 controller,
53 main_queue,
54 parker,
55 main_thread_id,
56 }
57 }
58
59 fn dispatch_on_threadpool(&self, runnable: Runnable) {
60 let handler = {
61 let mut task_wrapper = Some(runnable);
62 WorkItemHandler::new(move |_| {
63 task_wrapper.take().unwrap().run();
64 Ok(())
65 })
66 };
67 ThreadPool::RunWithPriorityAndOptionsAsync(
68 &handler,
69 WorkItemPriority::High,
70 WorkItemOptions::TimeSliced,
71 )
72 .log_err();
73 }
74
75 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
76 let handler = {
77 let mut task_wrapper = Some(runnable);
78 TimerElapsedHandler::new(move |_| {
79 task_wrapper.take().unwrap().run();
80 Ok(())
81 })
82 };
83 let delay = TimeSpan {
84 // A time period expressed in 100-nanosecond units.
85 // 10,000,000 ticks per second
86 Duration: (duration.as_nanos() / 100) as i64,
87 };
88 ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
89 }
90}
91
92impl Drop for WindowsDispatcher {
93 fn drop(&mut self) {
94 self.controller.ShutdownQueueAsync().log_err();
95 }
96}
97
98impl PlatformDispatcher for WindowsDispatcher {
99 fn is_main_thread(&self) -> bool {
100 current().id() == self.main_thread_id
101 }
102
103 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
104 self.dispatch_on_threadpool(runnable);
105 if let Some(label) = label {
106 log::debug!("TaskLabel: {label:?}");
107 }
108 }
109
110 fn dispatch_on_main_thread(&self, runnable: Runnable) {
111 let handler = {
112 let mut task_wrapper = Some(runnable);
113 DispatcherQueueHandler::new(move || {
114 task_wrapper.take().unwrap().run();
115 Ok(())
116 })
117 };
118 self.main_queue.TryEnqueue(&handler).log_err();
119 }
120
121 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
122 self.dispatch_on_threadpool_after(runnable, duration);
123 }
124
125 fn park(&self, timeout: Option<Duration>) -> bool {
126 if let Some(timeout) = timeout {
127 self.parker.lock().park_timeout(timeout)
128 } else {
129 self.parker.lock().park();
130 true
131 }
132 }
133
134 fn unparker(&self) -> parking::Unparker {
135 self.parker.lock().unparker()
136 }
137}