1use std::{
2 thread::{ThreadId, current},
3 time::Duration,
4};
5
6use async_task::Runnable;
7use flume::Sender;
8use parking::Parker;
9use parking_lot::Mutex;
10use util::ResultExt;
11use windows::{
12 System::Threading::{
13 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
14 },
15 Win32::{
16 Foundation::{LPARAM, WPARAM},
17 UI::WindowsAndMessaging::PostMessageW,
18 },
19};
20
21use crate::{
22 HWND, PlatformDispatcher, SafeHwnd, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
23};
24
/// Windows implementation of the platform task dispatcher.
///
/// Background work is handed to the system thread pool, while main-thread
/// work is queued on a channel and announced to the platform window with a
/// posted window message.
pub(crate) struct WindowsDispatcher {
    /// Sending half of the channel that carries runnables destined for the
    /// main thread.
    main_sender: Sender<Runnable>,
    /// Parker backing `park` / `unparker`; the mutex provides the access
    /// needed through `&self`.
    parker: Mutex<Parker>,
    /// Id of the thread that constructed the dispatcher; `is_main_thread`
    /// compares the calling thread against it.
    main_thread_id: ThreadId,
    /// Window that receives `WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD` after a
    /// runnable has been queued for the main thread.
    platform_window_handle: SafeHwnd,
    /// Value sent as the `WPARAM` of the posted message.
    // NOTE(review): presumably lets the receiver validate the message origin — confirm.
    validation_number: usize,
}
32
33impl WindowsDispatcher {
34 pub(crate) fn new(
35 main_sender: Sender<Runnable>,
36 platform_window_handle: HWND,
37 validation_number: usize,
38 ) -> Self {
39 let parker = Mutex::new(Parker::new());
40 let main_thread_id = current().id();
41 let platform_window_handle = platform_window_handle.into();
42
43 WindowsDispatcher {
44 main_sender,
45 parker,
46 main_thread_id,
47 platform_window_handle,
48 validation_number,
49 }
50 }
51
52 fn dispatch_on_threadpool(&self, runnable: Runnable) {
53 let handler = {
54 let mut task_wrapper = Some(runnable);
55 WorkItemHandler::new(move |_| {
56 profiling::register_thread!();
57 task_wrapper.take().unwrap().run();
58 Ok(())
59 })
60 };
61 ThreadPool::RunWithPriorityAsync(&handler, WorkItemPriority::High).log_err();
62 }
63
64 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
65 let handler = {
66 let mut task_wrapper = Some(runnable);
67 TimerElapsedHandler::new(move |_| {
68 profiling::register_thread!();
69 task_wrapper.take().unwrap().run();
70 Ok(())
71 })
72 };
73 ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
74 }
75}
76
77impl PlatformDispatcher for WindowsDispatcher {
78 fn is_main_thread(&self) -> bool {
79 current().id() == self.main_thread_id
80 }
81
82 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
83 self.dispatch_on_threadpool(runnable);
84 if let Some(label) = label {
85 log::debug!("TaskLabel: {label:?}");
86 }
87 }
88
89 fn dispatch_on_main_thread(&self, runnable: Runnable) {
90 match self.main_sender.send(runnable) {
91 Ok(_) => unsafe {
92 PostMessageW(
93 Some(self.platform_window_handle.as_raw()),
94 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
95 WPARAM(self.validation_number),
96 LPARAM(0),
97 )
98 .log_err();
99 },
100 Err(runnable) => {
101 // NOTE: Runnable may wrap a Future that is !Send.
102 //
103 // This is usually safe because we only poll it on the main thread.
104 // However if the send fails, we know that:
105 // 1. main_receiver has been dropped (which implies the app is shutting down)
106 // 2. we are on a background thread.
107 // It is not safe to drop something !Send on the wrong thread, and
108 // the app will exit soon anyway, so we must forget the runnable.
109 std::mem::forget(runnable);
110 }
111 }
112 }
113
114 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
115 self.dispatch_on_threadpool_after(runnable, duration);
116 }
117
118 fn park(&self, timeout: Option<Duration>) -> bool {
119 if let Some(timeout) = timeout {
120 self.parker.lock().park_timeout(timeout)
121 } else {
122 self.parker.lock().park();
123 true
124 }
125 }
126
127 fn unparker(&self) -> parking::Unparker {
128 self.parker.lock().unparker()
129 }
130}