1use std::{
2 thread::{ThreadId, current},
3 time::Duration,
4};
5
6use async_task::Runnable;
7use flume::Sender;
8use util::ResultExt;
9use windows::{
10 System::Threading::{
11 ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler, WorkItemPriority,
12 },
13 Win32::{
14 Foundation::{LPARAM, WPARAM},
15 UI::WindowsAndMessaging::PostMessageW,
16 },
17};
18
19use crate::{
20 HWND, PlatformDispatcher, SafeHwnd, TaskLabel, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
21};
22
23pub(crate) struct WindowsDispatcher {
24 main_sender: Sender<Runnable>,
25 main_thread_id: ThreadId,
26 platform_window_handle: SafeHwnd,
27 validation_number: usize,
28}
29
30impl WindowsDispatcher {
31 pub(crate) fn new(
32 main_sender: Sender<Runnable>,
33 platform_window_handle: HWND,
34 validation_number: usize,
35 ) -> Self {
36 let main_thread_id = current().id();
37 let platform_window_handle = platform_window_handle.into();
38
39 WindowsDispatcher {
40 main_sender,
41 main_thread_id,
42 platform_window_handle,
43 validation_number,
44 }
45 }
46
47 fn dispatch_on_threadpool(&self, runnable: Runnable) {
48 let handler = {
49 let mut task_wrapper = Some(runnable);
50 WorkItemHandler::new(move |_| {
51 task_wrapper.take().unwrap().run();
52 Ok(())
53 })
54 };
55 ThreadPool::RunWithPriorityAsync(&handler, WorkItemPriority::High).log_err();
56 }
57
58 fn dispatch_on_threadpool_after(&self, runnable: Runnable, duration: Duration) {
59 let handler = {
60 let mut task_wrapper = Some(runnable);
61 TimerElapsedHandler::new(move |_| {
62 task_wrapper.take().unwrap().run();
63 Ok(())
64 })
65 };
66 ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
67 }
68}
69
70impl PlatformDispatcher for WindowsDispatcher {
71 fn is_main_thread(&self) -> bool {
72 current().id() == self.main_thread_id
73 }
74
75 fn dispatch(&self, runnable: Runnable, label: Option<TaskLabel>) {
76 self.dispatch_on_threadpool(runnable);
77 if let Some(label) = label {
78 log::debug!("TaskLabel: {label:?}");
79 }
80 }
81
82 fn dispatch_on_main_thread(&self, runnable: Runnable) {
83 let pre_len = self.main_sender.len();
84 match self.main_sender.send(runnable) {
85 Ok(_) => unsafe {
86 // Only send a `WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD` to the
87 // queue if we have no runnables queued up yet, otherwise we
88 // risk filling the message queue with gpui messages causing us
89 // to starve the message loop of system messages, resulting in a
90 // process hang.
91 //
92 // When the message loop receives a
93 // `WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD` message we drain the
94 // runnable queue entirely.
95 //
96 // Note that we check whether our task sending did not increase
97 // the channel length, that would imply the main thread is
98 // processing tasks right now which means we might have observed
99 // a non empty channel while missing the processing with our
100 // task in that case we send another
101 // `WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD` message to make sure
102 // we get processed
103 if pre_len == 0 || self.main_sender.len() <= pre_len {
104 PostMessageW(
105 Some(self.platform_window_handle.as_raw()),
106 WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
107 WPARAM(self.validation_number),
108 LPARAM(0),
109 )
110 .log_err();
111 }
112 },
113 Err(runnable) => {
114 // NOTE: Runnable may wrap a Future that is !Send.
115 //
116 // This is usually safe because we only poll it on the main thread.
117 // However if the send fails, we know that:
118 // 1. main_receiver has been dropped (which implies the app is shutting down)
119 // 2. we are on a background thread.
120 // It is not safe to drop something !Send on the wrong thread, and
121 // the app will exit soon anyway, so we must forget the runnable.
122 std::mem::forget(runnable);
123 }
124 }
125 }
126
127 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
128 self.dispatch_on_threadpool_after(runnable, duration);
129 }
130}