1use std::{
2 sync::atomic::{AtomicBool, Ordering},
3 thread::{ThreadId, current},
4 time::{Duration, Instant},
5};
6
7use flume::Sender;
8use util::ResultExt;
9use windows::{
10 System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler},
11 Win32::{
12 Foundation::{LPARAM, WPARAM},
13 UI::WindowsAndMessaging::PostMessageW,
14 },
15};
16
17use crate::{
18 GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS,
19 TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
20};
21
22pub(crate) struct WindowsDispatcher {
23 pub(crate) wake_posted: AtomicBool,
24 main_sender: Sender<RunnableVariant>,
25 main_thread_id: ThreadId,
26 pub(crate) platform_window_handle: SafeHwnd,
27 validation_number: usize,
28}
29
30impl WindowsDispatcher {
31 pub(crate) fn new(
32 main_sender: Sender<RunnableVariant>,
33 platform_window_handle: HWND,
34 validation_number: usize,
35 ) -> Self {
36 let main_thread_id = current().id();
37 let platform_window_handle = platform_window_handle.into();
38
39 WindowsDispatcher {
40 main_sender,
41 main_thread_id,
42 platform_window_handle,
43 validation_number,
44 wake_posted: AtomicBool::new(false),
45 }
46 }
47
48 fn dispatch_on_threadpool(&self, runnable: RunnableVariant) {
49 let handler = {
50 let mut task_wrapper = Some(runnable);
51 WorkItemHandler::new(move |_| {
52 Self::execute_runnable(task_wrapper.take().unwrap());
53 Ok(())
54 })
55 };
56 ThreadPool::RunAsync(&handler).log_err();
57 }
58
59 fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) {
60 let handler = {
61 let mut task_wrapper = Some(runnable);
62 TimerElapsedHandler::new(move |_| {
63 Self::execute_runnable(task_wrapper.take().unwrap());
64 Ok(())
65 })
66 };
67 ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
68 }
69
70 #[inline(always)]
71 pub(crate) fn execute_runnable(runnable: RunnableVariant) {
72 let start = Instant::now();
73
74 let mut timing = match runnable {
75 RunnableVariant::Meta(runnable) => {
76 let metadata = runnable.metadata();
77 let location = metadata.location;
78
79 if !metadata.is_app_alive() {
80 drop(runnable);
81 return;
82 }
83
84 let timing = TaskTiming {
85 location,
86 start,
87 end: None,
88 };
89 Self::add_task_timing(timing);
90
91 runnable.run();
92
93 timing
94 }
95 RunnableVariant::Compat(runnable) => {
96 let timing = TaskTiming {
97 location: core::panic::Location::caller(),
98 start,
99 end: None,
100 };
101 Self::add_task_timing(timing);
102
103 runnable.run();
104
105 timing
106 }
107 };
108
109 let end = Instant::now();
110 timing.end = Some(end);
111
112 Self::add_task_timing(timing);
113 }
114
115 pub(crate) fn add_task_timing(timing: TaskTiming) {
116 THREAD_TIMINGS.with(|timings| {
117 let mut timings = timings.lock();
118 let timings = &mut timings.timings;
119
120 if let Some(last_timing) = timings.iter_mut().rev().next() {
121 if last_timing.location == timing.location {
122 last_timing.end = timing.end;
123 return;
124 }
125 }
126
127 timings.push_back(timing);
128 });
129 }
130}
131
132impl PlatformDispatcher for WindowsDispatcher {
133 fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
134 let global_thread_timings = GLOBAL_THREAD_TIMINGS.lock();
135 ThreadTaskTimings::convert(&global_thread_timings)
136 }
137
138 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
139 THREAD_TIMINGS.with(|timings| {
140 let timings = timings.lock();
141 let timings = &timings.timings;
142
143 let mut vec = Vec::with_capacity(timings.len());
144
145 let (s1, s2) = timings.as_slices();
146 vec.extend_from_slice(s1);
147 vec.extend_from_slice(s2);
148 vec
149 })
150 }
151
152 fn is_main_thread(&self) -> bool {
153 current().id() == self.main_thread_id
154 }
155
156 fn dispatch(
157 &self,
158 runnable: RunnableVariant,
159 label: Option<TaskLabel>,
160 _priority: gpui::Priority,
161 ) {
162 self.dispatch_on_threadpool(runnable);
163 if let Some(label) = label {
164 log::debug!("TaskLabel: {label:?}");
165 }
166 }
167
168 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: gpui::Priority) {
169 match self.main_sender.send(runnable) {
170 Ok(_) => {
171 if !self.wake_posted.swap(true, Ordering::AcqRel) {
172 unsafe {
173 PostMessageW(
174 Some(self.platform_window_handle.as_raw()),
175 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
176 WPARAM(self.validation_number),
177 LPARAM(0),
178 )
179 .log_err();
180 }
181 }
182 }
183 Err(runnable) => {
184 // NOTE: Runnable may wrap a Future that is !Send.
185 //
186 // This is usually safe because we only poll it on the main thread.
187 // However if the send fails, we know that:
188 // 1. main_receiver has been dropped (which implies the app is shutting down)
189 // 2. we are on a background thread.
190 // It is not safe to drop something !Send on the wrong thread, and
191 // the app will exit soon anyway, so we must forget the runnable.
192 std::mem::forget(runnable);
193 }
194 }
195 }
196
197 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
198 self.dispatch_on_threadpool_after(runnable, duration);
199 }
200
201 fn spawn_realtime(&self, _priority: crate::RealtimePriority, _f: Box<dyn FnOnce() + Send>) {
202 // disabled on windows for now.
203 unimplemented!();
204 }
205}