1use std::{
2 thread::{current, ThreadId},
3 time::Duration,
4};
5
6use anyhow::Context;
7use async_task::Runnable;
8use flume::Sender;
9use parking::Parker;
10use parking_lot::Mutex;
11use util::ResultExt;
12use windows::{
13 Foundation::TimeSpan,
14 System::Threading::{
15 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
16 WorkItemPriority,
17 },
18 Win32::{Foundation::HANDLE, System::Threading::SetEvent},
19};
20
21use crate::{PlatformDispatcher, SafeHandle, TaskLabel};
22
23pub(crate) struct WindowsDispatcher {
24 main_sender: Sender<Runnable>,
25 dispatch_event: SafeHandle,
26 parker: Mutex<Parker>,
27 main_thread_id: ThreadId,
28}
29
30impl WindowsDispatcher {
31 pub(crate) fn new(main_sender: Sender<Runnable>, dispatch_event: HANDLE) -> Self {
32 let dispatch_event = dispatch_event.into();
33 let parker = Mutex::new(Parker::new());
34 let main_thread_id = current().id();
35
36 WindowsDispatcher {
37 main_sender,
38 dispatch_event,
39 parker,
40 main_thread_id,
41 }
42 }
43
44 fn dispatch_on_threadpool(&self, runnable: Runnable) {
45 let handler = {
46 let mut task_wrapper = Some(runnable);
47 WorkItemHandler::new(move |_| {
48 task_wrapper.take().unwrap().run();
49 Ok(())
50 })
51 };
52 ThreadPool::RunWithPriorityAndOptionsAsync(
53 &handler,
54 WorkItemPriority::High,
55 WorkItemOptions::TimeSliced,
56 )
57 .log_err();
58 }
59
60 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
61 let handler = {
62 let mut task_wrapper = Some(runnable);
63 TimerElapsedHandler::new(move |_| {
64 task_wrapper.take().unwrap().run();
65 Ok(())
66 })
67 };
68 let delay = TimeSpan {
69 // A time period expressed in 100-nanosecond units.
70 // 10,000,000 ticks per second
71 Duration: (duration.as_nanos() / 100) as i64,
72 };
73 ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
74 }
75}
76
77impl PlatformDispatcher for WindowsDispatcher {
78 fn is_main_thread(&self) -> bool {
79 current().id() == self.main_thread_id
80 }
81
82 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
83 self.dispatch_on_threadpool(runnable);
84 if let Some(label) = label {
85 log::debug!("TaskLabel: {label:?}");
86 }
87 }
88
89 fn dispatch_on_main_thread(&self, runnable: Runnable) {
90 self.main_sender
91 .send(runnable)
92 .context("Dispatch on main thread failed")
93 .log_err();
94 unsafe { SetEvent(*self.dispatch_event).log_err() };
95 }
96
97 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
98 self.dispatch_on_threadpool_after(runnable, duration);
99 }
100
101 fn park(&self, timeout: Option<Duration>) -> bool {
102 if let Some(timeout) = timeout {
103 self.parker.lock().park_timeout(timeout)
104 } else {
105 self.parker.lock().park();
106 true
107 }
108 }
109
110 fn unparker(&self) -> parking::Unparker {
111 self.parker.lock().unparker()
112 }
113}