dispatcher.rs

  1use std::{
  2    sync::atomic::{AtomicBool, Ordering},
  3    thread::{ThreadId, current},
  4    time::{Duration, Instant},
  5};
  6
  7use flume::Sender;
  8use util::ResultExt;
  9use windows::{
 10    System::Threading::{ThreadPool, ThreadPoolTimer, TimerElapsedHandler, WorkItemHandler},
 11    Win32::{
 12        Foundation::{LPARAM, WPARAM},
 13        UI::WindowsAndMessaging::PostMessageW,
 14    },
 15};
 16
 17use crate::{
 18    GLOBAL_THREAD_TIMINGS, HWND, PlatformDispatcher, RunnableVariant, SafeHwnd, THREAD_TIMINGS,
 19    TaskLabel, TaskTiming, ThreadTaskTimings, WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
 20};
 21
 22pub(crate) struct WindowsDispatcher {
 23    pub(crate) wake_posted: AtomicBool,
 24    main_sender: Sender<RunnableVariant>,
 25    main_thread_id: ThreadId,
 26    pub(crate) platform_window_handle: SafeHwnd,
 27    validation_number: usize,
 28}
 29
 30impl WindowsDispatcher {
 31    pub(crate) fn new(
 32        main_sender: Sender<RunnableVariant>,
 33        platform_window_handle: HWND,
 34        validation_number: usize,
 35    ) -> Self {
 36        let main_thread_id = current().id();
 37        let platform_window_handle = platform_window_handle.into();
 38
 39        WindowsDispatcher {
 40            main_sender,
 41            main_thread_id,
 42            platform_window_handle,
 43            validation_number,
 44            wake_posted: AtomicBool::new(false),
 45        }
 46    }
 47
 48    fn dispatch_on_threadpool(&self, runnable: RunnableVariant) {
 49        let handler = {
 50            let mut task_wrapper = Some(runnable);
 51            WorkItemHandler::new(move |_| {
 52                Self::execute_runnable(task_wrapper.take().unwrap());
 53                Ok(())
 54            })
 55        };
 56        ThreadPool::RunAsync(&handler).log_err();
 57    }
 58
 59    fn dispatch_on_threadpool_after(&self, runnable: RunnableVariant, duration: Duration) {
 60        let handler = {
 61            let mut task_wrapper = Some(runnable);
 62            TimerElapsedHandler::new(move |_| {
 63                Self::execute_runnable(task_wrapper.take().unwrap());
 64                Ok(())
 65            })
 66        };
 67        ThreadPoolTimer::CreateTimer(&handler, duration.into()).log_err();
 68    }
 69
 70    #[inline(always)]
 71    pub(crate) fn execute_runnable(runnable: RunnableVariant) {
 72        let start = Instant::now();
 73
 74        let mut timing = match runnable {
 75            RunnableVariant::Meta(runnable) => {
 76                let metadata = runnable.metadata();
 77                let location = metadata.location;
 78
 79                if !metadata.is_app_alive() {
 80                    drop(runnable);
 81                    return;
 82                }
 83
 84                let timing = TaskTiming {
 85                    location,
 86                    start,
 87                    end: None,
 88                };
 89                Self::add_task_timing(timing);
 90
 91                runnable.run();
 92
 93                timing
 94            }
 95            RunnableVariant::Compat(runnable) => {
 96                let timing = TaskTiming {
 97                    location: core::panic::Location::caller(),
 98                    start,
 99                    end: None,
100                };
101                Self::add_task_timing(timing);
102
103                runnable.run();
104
105                timing
106            }
107        };
108
109        let end = Instant::now();
110        timing.end = Some(end);
111
112        Self::add_task_timing(timing);
113    }
114
115    pub(crate) fn add_task_timing(timing: TaskTiming) {
116        THREAD_TIMINGS.with(|timings| {
117            let mut timings = timings.lock();
118            let timings = &mut timings.timings;
119
120            if let Some(last_timing) = timings.iter_mut().rev().next() {
121                if last_timing.location == timing.location {
122                    last_timing.end = timing.end;
123                    return;
124                }
125            }
126
127            timings.push_back(timing);
128        });
129    }
130}
131
132impl PlatformDispatcher for WindowsDispatcher {
133    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
134        let global_thread_timings = GLOBAL_THREAD_TIMINGS.lock();
135        ThreadTaskTimings::convert(&global_thread_timings)
136    }
137
138    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
139        THREAD_TIMINGS.with(|timings| {
140            let timings = timings.lock();
141            let timings = &timings.timings;
142
143            let mut vec = Vec::with_capacity(timings.len());
144
145            let (s1, s2) = timings.as_slices();
146            vec.extend_from_slice(s1);
147            vec.extend_from_slice(s2);
148            vec
149        })
150    }
151
152    fn is_main_thread(&self) -> bool {
153        current().id() == self.main_thread_id
154    }
155
156    fn dispatch(
157        &self,
158        runnable: RunnableVariant,
159        label: Option<TaskLabel>,
160        _priority: gpui::Priority,
161    ) {
162        self.dispatch_on_threadpool(runnable);
163        if let Some(label) = label {
164            log::debug!("TaskLabel: {label:?}");
165        }
166    }
167
168    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: gpui::Priority) {
169        match self.main_sender.send(runnable) {
170            Ok(_) => {
171                if !self.wake_posted.swap(true, Ordering::AcqRel) {
172                    unsafe {
173                        PostMessageW(
174                            Some(self.platform_window_handle.as_raw()),
175                            WM_GPUI_TASK_DISPATCHED_ON_MAIN_THREAD,
176                            WPARAM(self.validation_number),
177                            LPARAM(0),
178                        )
179                        .log_err();
180                    }
181                }
182            }
183            Err(runnable) => {
184                // NOTE: Runnable may wrap a Future that is !Send.
185                //
186                // This is usually safe because we only poll it on the main thread.
187                // However if the send fails, we know that:
188                // 1. main_receiver has been dropped (which implies the app is shutting down)
189                // 2. we are on a background thread.
190                // It is not safe to drop something !Send on the wrong thread, and
191                // the app will exit soon anyway, so we must forget the runnable.
192                std::mem::forget(runnable);
193            }
194        }
195    }
196
197    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
198        self.dispatch_on_threadpool_after(runnable, duration);
199    }
200
201    fn spawn_realtime(&self, _priority: crate::RealtimePriority, _f: Box<dyn FnOnce() + Send>) {
202        // disabled on windows for now.
203        unimplemented!();
204    }
205}