1use std::{
2 thread::{ThreadId, current},
3 time::Duration,
4};
5
6use async_task::Runnable;
7use flume::Sender;
8use parking::Parker;
9use parking_lot::Mutex;
10use util::ResultExt;
11use windows::{
12 Foundation::TimeSpan,
13 System::Threading::{
14 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemOptions,
15 WorkItemPriority,
16 },
17 Win32::{
18 Foundation::{LPARAM, WPARAM},
19 UI::WindowsAndMessaging::PostThreadMessageW,
20 },
21};
22
23use crate::{PlatformDispatcher, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD};
24
25pub(crate) struct WindowsDispatcher {
26 main_sender: Sender<Runnable>,
27 parker: Mutex<Parker>,
28 main_thread_id: ThreadId,
29 main_thread_id_win32: u32,
30 validation_number: usize,
31}
32
33impl WindowsDispatcher {
34 pub(crate) fn new(
35 main_sender: Sender<Runnable>,
36 main_thread_id_win32: u32,
37 validation_number: usize,
38 ) -> Self {
39 let parker = Mutex::new(Parker::new());
40 let main_thread_id = current().id();
41
42 WindowsDispatcher {
43 main_sender,
44 parker,
45 main_thread_id,
46 main_thread_id_win32,
47 validation_number,
48 }
49 }
50
51 fn dispatch_on_threadpool(&self, runnable: Runnable) {
52 let handler = {
53 let mut task_wrapper = Some(runnable);
54 WorkItemHandler::new(move |_| {
55 task_wrapper.take().unwrap().run();
56 Ok(())
57 })
58 };
59 ThreadPool::RunWithPriorityAndOptionsAsync(
60 &handler,
61 WorkItemPriority::High,
62 WorkItemOptions::TimeSliced,
63 )
64 .log_err();
65 }
66
67 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
68 let handler = {
69 let mut task_wrapper = Some(runnable);
70 TimerElapsedHandler::new(move |_| {
71 task_wrapper.take().unwrap().run();
72 Ok(())
73 })
74 };
75 let delay = TimeSpan {
76 // A time period expressed in 100-nanosecond units.
77 // 10,000,000 ticks per second
78 Duration: (duration.as_nanos() / 100) as i64,
79 };
80 ThreadPoolTimer::CreateTimer(&handler, delay).log_err();
81 }
82}
83
84impl PlatformDispatcher for WindowsDispatcher {
85 fn is_main_thread(&self) -> bool {
86 current().id() == self.main_thread_id
87 }
88
89 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
90 self.dispatch_on_threadpool(runnable);
91 if let Some(label) = label {
92 log::debug!("TaskLabel: {label:?}");
93 }
94 }
95
96 fn dispatch_on_main_thread(&self, runnable: Runnable) {
97 match self.main_sender.send(runnable) {
98 Ok(_) => unsafe {
99 PostThreadMessageW(
100 self.main_thread_id_win32,
101 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
102 WPARAM(self.validation_number),
103 LPARAM(0),
104 )
105 .log_err();
106 },
107 Err(runnable) => {
108 // NOTE: Runnable may wrap a Future that is !Send.
109 //
110 // This is usually safe because we only poll it on the main thread.
111 // However if the send fails, we know that:
112 // 1. main_receiver has been dropped (which implies the app is shutting down)
113 // 2. we are on a background thread.
114 // It is not safe to drop something !Send on the wrong thread, and
115 // the app will exit soon anyway, so we must forget the runnable.
116 std::mem::forget(runnable);
117 }
118 }
119 }
120
121 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
122 self.dispatch_on_threadpool_after(runnable, duration);
123 }
124
125 fn park(&self, timeout: Option<Duration>) -> bool {
126 if let Some(timeout) = timeout {
127 self.parker.lock().park_timeout(timeout)
128 } else {
129 self.parker.lock().park();
130 true
131 }
132 }
133
134 fn unparker(&self) -> parking::Unparker {
135 self.parker.lock().unparker()
136 }
137}