dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4
  5use gpui::{
  6    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RunnableMeta, RunnableVariant,
  7    THREAD_TIMINGS, TaskTiming, ThreadTaskTimings,
  8};
  9use mach2::{
 10    kern_return::KERN_SUCCESS,
 11    mach_time::mach_timebase_info_data_t,
 12    thread_policy::{
 13        THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
 14        THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
 15        THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
 16        thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
 17    },
 18};
 19use util::ResultExt;
 20
 21use async_task::Runnable;
 22use objc::{
 23    class, msg_send,
 24    runtime::{BOOL, YES},
 25    sel, sel_impl,
 26};
 27use std::{
 28    ffi::c_void,
 29    ptr::{NonNull, addr_of},
 30    time::{Duration, Instant},
 31};
 32
 33/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
 34/// these pub items from leaking into public API.
 35pub(crate) mod dispatch_sys {
 36    include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
 37}
 38
 39use dispatch_sys::*;
 40pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
 41    addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
 42}
 43
 44pub(crate) struct MacDispatcher;
 45
 46impl MacDispatcher {
 47    pub fn new() -> Self {
 48        Self
 49    }
 50}
 51
 52impl PlatformDispatcher for MacDispatcher {
 53    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
 54        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
 55        ThreadTaskTimings::convert(&global_timings)
 56    }
 57
 58    fn get_current_thread_timings(&self) -> ThreadTaskTimings {
 59        THREAD_TIMINGS.with(|timings| {
 60            let timings = timings.lock();
 61            let thread_name = timings.thread_name.clone();
 62            let total_pushed = timings.total_pushed;
 63            let timings = &timings.timings;
 64
 65            let mut vec = Vec::with_capacity(timings.len());
 66
 67            let (s1, s2) = timings.as_slices();
 68            vec.extend_from_slice(s1);
 69            vec.extend_from_slice(s2);
 70
 71            ThreadTaskTimings {
 72                thread_name,
 73                thread_id: std::thread::current().id(),
 74                timings: vec,
 75                total_pushed,
 76            }
 77        })
 78    }
 79
 80    fn is_main_thread(&self) -> bool {
 81        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
 82        is_main_thread == YES
 83    }
 84
 85    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
 86        let context = runnable.into_raw().as_ptr() as *mut c_void;
 87
 88        let queue_priority = match priority {
 89            Priority::RealtimeAudio => {
 90                panic!("RealtimeAudio priority should use spawn_realtime, not dispatch")
 91            }
 92            Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH as isize,
 93            Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT as isize,
 94            Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as isize,
 95        };
 96
 97        unsafe {
 98            dispatch_async_f(
 99                dispatch_get_global_queue(queue_priority, 0),
100                context,
101                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
102            );
103        }
104    }
105
106    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
107        let context = runnable.into_raw().as_ptr() as *mut c_void;
108        unsafe {
109            dispatch_async_f(
110                dispatch_get_main_queue(),
111                context,
112                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
113            );
114        }
115    }
116
117    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
118        let context = runnable.into_raw().as_ptr() as *mut c_void;
119        unsafe {
120            let queue =
121                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
122            let when = dispatch_time(DISPATCH_TIME_NOW as u64, duration.as_nanos() as i64);
123            dispatch_after_f(
124                when,
125                queue,
126                context,
127                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
128            );
129        }
130    }
131
132    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
133        std::thread::spawn(move || {
134            set_audio_thread_priority().log_err();
135            f();
136        });
137    }
138}
139
140fn set_audio_thread_priority() -> anyhow::Result<()> {
141    // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
142
143    // SAFETY: always safe to call
144    let thread_id = unsafe { libc::pthread_self() };
145
146    // SAFETY: thread_id is a valid thread id
147    let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
148
149    // Fixed priority thread
150    let mut policy = thread_extended_policy_data_t { timeshare: 0 };
151
152    // SAFETY: thread_id is a valid thread id
153    // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
154    let result = unsafe {
155        mach2::thread_policy::thread_policy_set(
156            thread_id,
157            THREAD_EXTENDED_POLICY,
158            &mut policy as *mut _ as *mut _,
159            THREAD_EXTENDED_POLICY_COUNT,
160        )
161    };
162
163    if result != KERN_SUCCESS {
164        anyhow::bail!("failed to set thread extended policy");
165    }
166
167    // relatively high priority
168    let mut precedence = thread_precedence_policy_data_t { importance: 63 };
169
170    // SAFETY: thread_id is a valid thread id
171    // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
172    let result = unsafe {
173        mach2::thread_policy::thread_policy_set(
174            thread_id,
175            THREAD_PRECEDENCE_POLICY,
176            &mut precedence as *mut _ as *mut _,
177            THREAD_PRECEDENCE_POLICY_COUNT,
178        )
179    };
180
181    if result != KERN_SUCCESS {
182        anyhow::bail!("failed to set thread precedence policy");
183    }
184
185    const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
186    const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
187
188    // ~128 frames @ 44.1KHz
189    const TIME_QUANTUM: f32 = 2.9;
190
191    const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
192    const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
193
194    let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
195    // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
196    unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
197
198    let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
199
200    let mut time_constraints = thread_time_constraint_policy_data_t {
201        period: (TIME_QUANTUM * ms_to_abs_time) as u32,
202        computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
203        constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
204        preemptible: 0,
205    };
206
207    // SAFETY: thread_id is a valid thread id
208    // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
209    let result = unsafe {
210        mach2::thread_policy::thread_policy_set(
211            thread_id,
212            THREAD_TIME_CONSTRAINT_POLICY,
213            &mut time_constraints as *mut _ as *mut _,
214            THREAD_TIME_CONSTRAINT_POLICY_COUNT,
215        )
216    };
217
218    if result != KERN_SUCCESS {
219        anyhow::bail!("failed to set thread time constraint policy");
220    }
221
222    Ok(())
223}
224
225extern "C" fn trampoline(context: *mut c_void) {
226    let runnable =
227        unsafe { Runnable::<RunnableMeta>::from_raw(NonNull::new_unchecked(context as *mut ())) };
228
229    let metadata = runnable.metadata();
230
231    // Check if the executor that spawned this task was closed
232    if metadata.is_closed() {
233        return;
234    }
235
236    let location = metadata.location;
237
238    let start = Instant::now();
239    let timing = TaskTiming {
240        location,
241        start,
242        end: None,
243    };
244
245    THREAD_TIMINGS.with(|timings| {
246        let mut timings = timings.lock();
247        let timings = &mut timings.timings;
248        if let Some(last_timing) = timings.iter_mut().rev().next() {
249            if last_timing.location == timing.location {
250                return;
251            }
252        }
253
254        timings.push_back(timing);
255    });
256
257    runnable.run();
258    let end = Instant::now();
259
260    THREAD_TIMINGS.with(|timings| {
261        let mut timings = timings.lock();
262        let timings = &mut timings.timings;
263        let Some(last_timing) = timings.iter_mut().rev().next() else {
264            return;
265        };
266        last_timing.end = Some(end);
267    });
268}