1use std::{
2 sync::atomic::{AtomicBool, Ordering},
3 thread::{ThreadId, current},
4 time::{Duration, Instant},
5};
6
7use anyhow::Context;
8use util::ResultExt;
9use windows::{
10 System::Threading::{
11 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
12 },
13 Win32::{
14 Foundation::{LPARAM, WPARAM},
15 System::Threading::{
16 GetCurrentThread, HIGH_PRIORITY_CLASS, SetPriorityClass, SetThreadPriority,
17 THREAD_PRIORITY_TIME_CRITICAL,
18 },
19 UI::WindowsAndMessaging::PostMessageW,
20 },
21};
22
23use crate::{
24 GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, Priority, PriorityQueueSender,
25 RunnableVariant, SafeHwnd, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings,
26 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD, profiler,
27};
28
29pub(crate) struct WindowsDispatcher {
30 pub(crate) wake_posted: AtomicBool,
31 main_sender: PriorityQueueSender<RunnableVariant>,
32 main_thread_id: ThreadId,
33 pub(crate) platform_window_handle: SafeHwnd,
34 validation_number: usize,
35}
36
37impl WindowsDispatcher {
38 pub(crate) fn new(
39 main_sender: PriorityQueueSender<RunnableVariant>,
40 platform_window_handle: HWND,
41 validation_number: usize,
42 ) -> Self {
43 let main_thread_id = current().id();
44 let platform_window_handle = platform_window_handle.into();
45
46 WindowsDispatcher {
47 main_sender,
48 main_thread_id,
49 platform_window_handle,
50 validation_number,
51 wake_posted: AtomicBool::new(false),
52 }
53 }
54
55 fn dispatch_on_threadpool(&self, priority: WorkItemPriority, runnable: RunnableVariant) {
56 let handler = {
57 let mut task_wrapper = Some(runnable);
58 WorkItemHandler::new(move |_| {
59 let runnable = task_wrapper.take().unwrap();
60 // Check if the executor that spawned this task was closed
61 if runnable.metadata().is_closed() {
62 return Ok(());
63 }
64 Self::execute_runnable(runnable);
65 Ok(())
66 })
67 };
68
69 ThreadPool::RunWithPriorityAsync(&handler, priority).log_err();
70 }
71
72 fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) {
73 let handler = {
74 let mut task_wrapper = Some(runnable);
75 TimerElapsedHandler::new(move |_| {
76 let runnable = task_wrapper.take().unwrap();
77 // Check if the executor that spawned this task was closed
78 if runnable.metadata().is_closed() {
79 return Ok(());
80 }
81 Self::execute_runnable(runnable);
82 Ok(())
83 })
84 };
85 ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
86 }
87
88 #[inline(always)]
89 pub(crate) fn execute_runnable(runnable: RunnableVariant) {
90 let start = Instant::now();
91
92 let location = runnable.metadata().location;
93 let mut timing = TaskTiming {
94 location,
95 start,
96 end: None,
97 };
98 profiler::add_task_timing(timing);
99
100 runnable.run();
101
102 let end = Instant::now();
103 timing.end = Some(end);
104
105 profiler::add_task_timing(timing);
106 }
107}
108
109impl PlatformDispatcher for WindowsDispatcher {
110 fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
111 let global_thread_timings = GLOBAL_THREAD_TIMINGS.lock();
112 ThreadTaskTimings::convert(&global_thread_timings)
113 }
114
115 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
116 THREAD_TIMINGS.with(|timings| {
117 let timings = timings.lock();
118 let timings = &timings.timings;
119
120 let mut vec = Vec::with_capacity(timings.len());
121
122 let (s1, s2) = timings.as_slices();
123 vec.extend_from_slice(s1);
124 vec.extend_from_slice(s2);
125 vec
126 })
127 }
128
129 fn is_main_thread(&self) -> bool {
130 current().id() == self.main_thread_id
131 }
132
133 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
134 let priority = match priority {
135 Priority::RealtimeAudio => {
136 panic!("RealtimeAudio priority should use spawn_realtime, not dispatch")
137 }
138 Priority::High => WorkItemPriority::High,
139 Priority::Medium => WorkItemPriority::Normal,
140 Priority::Low => WorkItemPriority::Low,
141 };
142 self.dispatch_on_threadpool(priority, runnable);
143 }
144
145 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
146 match self.main_sender.send(priority, runnable) {
147 Ok(_) => {
148 if !self.wake_posted.swap(true, Ordering::AcqRel) {
149 unsafe {
150 PostMessageW(
151 Some(self.platform_window_handle.as_raw()),
152 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
153 WPARAM(self.validation_number),
154 LPARAM(0),
155 )
156 .log_err();
157 }
158 }
159 }
160 Err(runnable) => {
161 // NOTE: Runnable may wrap a Future that is !Send.
162 //
163 // This is usually safe because we only poll it on the main thread.
164 // However if the send fails, we know that:
165 // 1. main_receiver has been dropped (which implies the app is shutting down)
166 // 2. we are on a background thread.
167 // It is not safe to drop something !Send on the wrong thread, and
168 // the app will exit soon anyway, so we must forget the runnable.
169 std::mem::forget(runnable);
170 }
171 }
172 }
173
174 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
175 self.dispatch_on_threadpool_after(runnable, duration);
176 }
177
178 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
179 std::thread::spawn(move || {
180 // SAFETY: always safe to call
181 let thread_handle = unsafe { GetCurrentThread() };
182
183 // SAFETY: thread_handle is a valid handle to a thread
184 unsafe { SetPriorityClass(thread_handle, HIGH_PRIORITY_CLASS) }
185 .context("thread priority class")
186 .log_err();
187
188 // SAFETY: thread_handle is a valid handle to a thread
189 unsafe { SetThreadPriority(thread_handle, THREAD_PRIORITY_TIME_CRITICAL) }
190 .context("thread priority")
191 .log_err();
192
193 f();
194 });
195 }
196}