dispatcher.rs

  1#![allow(non_upper_case_globals)]
  2#![allow(non_camel_case_types)]
  3#![allow(non_snake_case)]
  4
  5use crate::{
  6    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RealtimePriority, RunnableMeta,
  7    RunnableVariant, THREAD_TIMINGS, TaskLabel, TaskTiming, ThreadTaskTimings,
  8};
  9
 10use anyhow::Context;
 11use async_task::Runnable;
 12use mach2::{
 13    kern_return::KERN_SUCCESS,
 14    mach_time::mach_timebase_info_data_t,
 15    thread_policy::{
 16        THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
 17        THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
 18        THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
 19        thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
 20    },
 21};
 22use objc::{
 23    class, msg_send,
 24    runtime::{BOOL, YES},
 25    sel, sel_impl,
 26};
 27use std::{
 28    ffi::c_void,
 29    mem::MaybeUninit,
 30    ptr::{NonNull, addr_of},
 31    time::{Duration, Instant},
 32};
 33use util::ResultExt;
 34
 35/// All items in the generated file are marked as pub, so we're gonna wrap it in a separate mod to prevent
 36/// these pub items from leaking into public API.
 37pub(crate) mod dispatch_sys {
 38    include!(concat!(env!("OUT_DIR"), "/dispatch_sys.rs"));
 39}
 40
 41use dispatch_sys::*;
 42pub(crate) fn dispatch_get_main_queue() -> dispatch_queue_t {
 43    addr_of!(_dispatch_main_q) as *const _ as dispatch_queue_t
 44}
 45
 46pub(crate) struct MacDispatcher;
 47
 48impl PlatformDispatcher for MacDispatcher {
 49    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
 50        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
 51        ThreadTaskTimings::convert(&global_timings)
 52    }
 53
 54    fn get_current_thread_timings(&self) -> Vec<TaskTiming> {
 55        THREAD_TIMINGS.with(|timings| {
 56            let timings = &timings.lock().timings;
 57
 58            let mut vec = Vec::with_capacity(timings.len());
 59
 60            let (s1, s2) = timings.as_slices();
 61            vec.extend_from_slice(s1);
 62            vec.extend_from_slice(s2);
 63            vec
 64        })
 65    }
 66
 67    fn is_main_thread(&self) -> bool {
 68        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
 69        is_main_thread == YES
 70    }
 71
 72    fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>, priority: Priority) {
 73        let (context, trampoline) = match runnable {
 74            RunnableVariant::Meta(runnable) => (
 75                runnable.into_raw().as_ptr() as *mut c_void,
 76                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
 77            ),
 78            RunnableVariant::Compat(runnable) => (
 79                runnable.into_raw().as_ptr() as *mut c_void,
 80                Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)),
 81            ),
 82        };
 83
 84        let queue_priority = match priority {
 85            Priority::Realtime(_) => unreachable!(),
 86            Priority::High => DISPATCH_QUEUE_PRIORITY_HIGH as isize,
 87            Priority::Medium => DISPATCH_QUEUE_PRIORITY_DEFAULT as isize,
 88            Priority::Low => DISPATCH_QUEUE_PRIORITY_LOW as isize,
 89        };
 90
 91        unsafe {
 92            dispatch_async_f(
 93                dispatch_get_global_queue(queue_priority, 0),
 94                context,
 95                trampoline,
 96            );
 97        }
 98    }
 99
100    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
101        let (context, trampoline) = match runnable {
102            RunnableVariant::Meta(runnable) => (
103                runnable.into_raw().as_ptr() as *mut c_void,
104                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
105            ),
106            RunnableVariant::Compat(runnable) => (
107                runnable.into_raw().as_ptr() as *mut c_void,
108                Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)),
109            ),
110        };
111        unsafe {
112            dispatch_async_f(dispatch_get_main_queue(), context, trampoline);
113        }
114    }
115
116    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
117        let (context, trampoline) = match runnable {
118            RunnableVariant::Meta(runnable) => (
119                runnable.into_raw().as_ptr() as *mut c_void,
120                Some(trampoline as unsafe extern "C" fn(*mut c_void)),
121            ),
122            RunnableVariant::Compat(runnable) => (
123                runnable.into_raw().as_ptr() as *mut c_void,
124                Some(trampoline_compat as unsafe extern "C" fn(*mut c_void)),
125            ),
126        };
127        unsafe {
128            let queue =
129                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
130            let when = dispatch_time(DISPATCH_TIME_NOW as u64, duration.as_nanos() as i64);
131            dispatch_after_f(when, queue, context, trampoline);
132        }
133    }
134
135    fn spawn_realtime(&self, priority: RealtimePriority, f: Box<dyn FnOnce() + Send>) {
136        std::thread::spawn(move || {
137            match priority {
138                RealtimePriority::Audio => set_audio_thread_priority(),
139                RealtimePriority::Other => set_high_thread_priority(),
140            }
141            .context(format!("for priority {:?}", priority))
142            .log_err();
143
144            f();
145        });
146    }
147}
148
149fn set_high_thread_priority() -> anyhow::Result<()> {
150    // SAFETY: always safe to call
151    let thread_id = unsafe { libc::pthread_self() };
152
153    // SAFETY: all sched_param members are valid when initialized to zero.
154    let mut sched_param = unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
155    sched_param.sched_priority = 45;
156
157    let result = unsafe { libc::pthread_setschedparam(thread_id, libc::SCHED_FIFO, &sched_param) };
158    if result != 0 {
159        anyhow::bail!("failed to set realtime thread priority")
160    }
161
162    Ok(())
163}
164
165fn set_audio_thread_priority() -> anyhow::Result<()> {
166    // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
167
168    // SAFETY: always safe to call
169    let thread_id = unsafe { libc::pthread_self() };
170
171    // SAFETY: thread_id is a valid thread id
172    let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
173
174    // Fixed priority thread
175    let mut policy = thread_extended_policy_data_t { timeshare: 0 };
176
177    // SAFETY: thread_id is a valid thread id
178    // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
179    let result = unsafe {
180        mach2::thread_policy::thread_policy_set(
181            thread_id,
182            THREAD_EXTENDED_POLICY,
183            &mut policy as *mut _ as *mut _,
184            THREAD_EXTENDED_POLICY_COUNT,
185        )
186    };
187
188    if result != KERN_SUCCESS {
189        anyhow::bail!("failed to set thread extended policy");
190    }
191
192    // relatively high priority
193    let mut precedence = thread_precedence_policy_data_t { importance: 63 };
194
195    // SAFETY: thread_id is a valid thread id
196    // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
197    let result = unsafe {
198        mach2::thread_policy::thread_policy_set(
199            thread_id,
200            THREAD_PRECEDENCE_POLICY,
201            &mut precedence as *mut _ as *mut _,
202            THREAD_PRECEDENCE_POLICY_COUNT,
203        )
204    };
205
206    if result != KERN_SUCCESS {
207        anyhow::bail!("failed to set thread precedence policy");
208    }
209
210    const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
211    const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
212
213    // ~128 frames @ 44.1KHz
214    const TIME_QUANTUM: f32 = 2.9;
215
216    const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
217    const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
218
219    let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
220    // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
221    unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
222
223    let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
224
225    let mut time_constraints = thread_time_constraint_policy_data_t {
226        period: (TIME_QUANTUM * ms_to_abs_time) as u32,
227        computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
228        constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
229        preemptible: 0,
230    };
231
232    // SAFETY: thread_id is a valid thread id
233    // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
234    let result = unsafe {
235        mach2::thread_policy::thread_policy_set(
236            thread_id,
237            THREAD_TIME_CONSTRAINT_POLICY,
238            &mut time_constraints as *mut _ as *mut _,
239            THREAD_TIME_CONSTRAINT_POLICY_COUNT,
240        )
241    };
242
243    if result != KERN_SUCCESS {
244        anyhow::bail!("failed to set thread time constraint policy");
245    }
246
247    Ok(())
248}
249
250extern "C" fn trampoline(runnable: *mut c_void) {
251    let task =
252        unsafe { Runnable::<RunnableMeta>::from_raw(NonNull::new_unchecked(runnable as *mut ())) };
253
254    let location = task.metadata().location;
255
256    let start = Instant::now();
257    let timing = TaskTiming {
258        location,
259        start,
260        end: None,
261    };
262
263    THREAD_TIMINGS.with(|timings| {
264        let mut timings = timings.lock();
265        let timings = &mut timings.timings;
266        if let Some(last_timing) = timings.iter_mut().rev().next() {
267            if last_timing.location == timing.location {
268                return;
269            }
270        }
271
272        timings.push_back(timing);
273    });
274
275    task.run();
276    let end = Instant::now();
277
278    THREAD_TIMINGS.with(|timings| {
279        let mut timings = timings.lock();
280        let timings = &mut timings.timings;
281        let Some(last_timing) = timings.iter_mut().rev().next() else {
282            return;
283        };
284        last_timing.end = Some(end);
285    });
286}
287
288extern "C" fn trampoline_compat(runnable: *mut c_void) {
289    let task = unsafe { Runnable::<()>::from_raw(NonNull::new_unchecked(runnable as *mut ())) };
290
291    let location = core::panic::Location::caller();
292
293    let start = Instant::now();
294    let timing = TaskTiming {
295        location,
296        start,
297        end: None,
298    };
299    THREAD_TIMINGS.with(|timings| {
300        let mut timings = timings.lock();
301        let timings = &mut timings.timings;
302        if let Some(last_timing) = timings.iter_mut().rev().next() {
303            if last_timing.location == timing.location {
304                return;
305            }
306        }
307
308        timings.push_back(timing);
309    });
310
311    task.run();
312    let end = Instant::now();
313
314    THREAD_TIMINGS.with(|timings| {
315        let mut timings = timings.lock();
316        let timings = &mut timings.timings;
317        let Some(last_timing) = timings.iter_mut().rev().next() else {
318            return;
319        };
320        last_timing.end = Some(end);
321    });
322}