dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4
  5use crate::{
  6    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RunnableMeta, RunnableVariant,
  7    THREAD_TIMINGS, TaskTiming, ThreadTaskTimings,
  8};
  9use mach2::{
 10    kern_return::KERN_SUCCESS,
 11    mach_time::mach_timebase_info_data_t,
 12    thread_policy::{
 13        THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
 14        THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
 15        THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
 16        thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
 17    },
 18};
 19use util::ResultExt;
 20
 21use async_task::Runnable;
 22use objc::{
 23    class, msg_send,
 24    runtime::{BOOL, YES},
 25    sel, sel_impl,
 26};
 27use std::{
 28    ffi::c_void,
 29    ptr::{NonNull, addr_of},
 30    time::{Duration, Instant},
 31};
 32
 33/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
 34/// these pub items from leaking into public API.
 35pub(crate) mod dispatch_sys {
 36    include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
 37}
 38
 39use dispatch_sys::*;
 40pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
 41    addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
 42}
 43
 44pub(crate) struct MacDispatcher;
 45
 46impl MacDispatcher {
 47    pub fn new() -> Self {
 48        Self
 49    }
 50}
 51
 52impl PlatformDispatcher for MacDispatcher {
 53    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
 54        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
 55        ThreadTaskTimings::convert(&global_timings)
 56    }
 57
 58    fn get_current_thread_timings(&self) -> Vec<TaskTiming> {
 59        THREAD_TIMINGS.with(|timings| {
 60            let timings = &timings.lock().timings;
 61
 62            let mut vec = Vec::with_capacity(timings.len());
 63
 64            let (s1, s2) = timings.as_slices();
 65            vec.extend_from_slice(s1);
 66            vec.extend_from_slice(s2);
 67            vec
 68        })
 69    }
 70
 71    fn is_main_thread(&self) -> bool {
 72        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
 73        is_main_thread == YES
 74    }
 75
 76    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
 77        let context = runnable.into_raw().as_ptr() as *mut c_void;
 78
 79        let queue_priority = match priority {
 80            Priority::RealtimeAudio => {
 81                panic!("RealtimeAudio priority should use spawn_realtime, not dispatch")
 82            }
 83            Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH as isize,
 84            Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT as isize,
 85            Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as isize,
 86        };
 87
 88        unsafe {
 89            dispatch_async_f(
 90                dispatch_get_global_queue(queue_priority, 0),
 91                context,
 92                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
 93            );
 94        }
 95    }
 96
 97    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
 98        let context = runnable.into_raw().as_ptr() as *mut c_void;
 99        unsafe {
100            dispatch_async_f(
101                dispatch_get_main_queue(),
102                context,
103                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
104            );
105        }
106    }
107
108    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
109        let context = runnable.into_raw().as_ptr() as *mut c_void;
110        unsafe {
111            let queue =
112                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
113            let when = dispatch_time(DISPATCH_TIME_NOW as u64, duration.as_nanos() as i64);
114            dispatch_after_f(
115                when,
116                queue,
117                context,
118                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
119            );
120        }
121    }
122
123    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
124        std::thread::spawn(move || {
125            set_audio_thread_priority().log_err();
126            f();
127        });
128    }
129}
130
131fn set_audio_thread_priority() -> anyhow::Result<()> {
132    // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
133
134    // SAFETY: always safe to call
135    let thread_id = unsafe { libc::pthread_self() };
136
137    // SAFETY: thread_id is a valid thread id
138    let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
139
140    // Fixed priority thread
141    let mut policy = thread_extended_policy_data_t { timeshare: 0 };
142
143    // SAFETY: thread_id is a valid thread id
144    // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
145    let result = unsafe {
146        mach2::thread_policy::thread_policy_set(
147            thread_id,
148            THREAD_EXTENDED_POLICY,
149            &mut policy as *mut _ as *mut _,
150            THREAD_EXTENDED_POLICY_COUNT,
151        )
152    };
153
154    if result != KERN_SUCCESS {
155        anyhow::bail!("failed to set thread extended policy");
156    }
157
158    // relatively high priority
159    let mut precedence = thread_precedence_policy_data_t { importance: 63 };
160
161    // SAFETY: thread_id is a valid thread id
162    // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
163    let result = unsafe {
164        mach2::thread_policy::thread_policy_set(
165            thread_id,
166            THREAD_PRECEDENCE_POLICY,
167            &mut precedence as *mut _ as *mut _,
168            THREAD_PRECEDENCE_POLICY_COUNT,
169        )
170    };
171
172    if result != KERN_SUCCESS {
173        anyhow::bail!("failed to set thread precedence policy");
174    }
175
176    const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
177    const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
178
179    // ~128 frames @ 44.1KHz
180    const TIME_QUANTUM: f32 = 2.9;
181
182    const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
183    const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
184
185    let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
186    // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
187    unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
188
189    let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
190
191    let mut time_constraints = thread_time_constraint_policy_data_t {
192        period: (TIME_QUANTUM * ms_to_abs_time) as u32,
193        computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
194        constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
195        preemptible: 0,
196    };
197
198    // SAFETY: thread_id is a valid thread id
199    // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
200    let result = unsafe {
201        mach2::thread_policy::thread_policy_set(
202            thread_id,
203            THREAD_TIME_CONSTRAINT_POLICY,
204            &mut time_constraints as *mut _ as *mut _,
205            THREAD_TIME_CONSTRAINT_POLICY_COUNT,
206        )
207    };
208
209    if result != KERN_SUCCESS {
210        anyhow::bail!("failed to set thread time constraint policy");
211    }
212
213    Ok(())
214}
215
216extern "C" fn trampoline(context: *mut c_void) {
217    let runnable =
218        unsafe { Runnable::<RunnableMeta>::from_raw(NonNull::new_unchecked(context as *mut ())) };
219
220    let metadata = runnable.metadata();
221
222    // Check if the executor that spawned this task was closed
223    if metadata.is_closed() {
224        return;
225    }
226
227    let location = metadata.location;
228
229    let start = Instant::now();
230    let timing = TaskTiming {
231        location,
232        start,
233        end: None,
234    };
235
236    THREAD_TIMINGS.with(|timings| {
237        let mut timings = timings.lock();
238        let timings = &mut timings.timings;
239        if let Some(last_timing) = timings.iter_mut().rev().next() {
240            if last_timing.location == timing.location {
241                return;
242            }
243        }
244
245        timings.push_back(timing);
246    });
247
248    runnable.run();
249    let end = Instant::now();
250
251    THREAD_TIMINGS.with(|timings| {
252        let mut timings = timings.lock();
253        let timings = &mut timings.timings;
254        let Some(last_timing) = timings.iter_mut().rev().next() else {
255            return;
256        };
257        last_timing.end = Some(end);
258    });
259}