dispatcher.rs

  1use dispatch2::{DispatchQueue, DispatchQueueGlobalPriority, DispatchTime, GlobalQueueIdentifier};
  2use gpui::{
  3    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, RunnableMeta, RunnableVariant,
  4    THREAD_TIMINGS, TaskTiming, ThreadTaskTimings,
  5};
  6use mach2::{
  7    kern_return::KERN_SUCCESS,
  8    mach_time::mach_timebase_info_data_t,
  9    thread_policy::{
 10        THREAD_EXTENDED_POLICY, THREAD_EXTENDED_POLICY_COUNT, THREAD_PRECEDENCE_POLICY,
 11        THREAD_PRECEDENCE_POLICY_COUNT, THREAD_TIME_CONSTRAINT_POLICY,
 12        THREAD_TIME_CONSTRAINT_POLICY_COUNT, thread_extended_policy_data_t,
 13        thread_precedence_policy_data_t, thread_time_constraint_policy_data_t,
 14    },
 15};
 16use util::ResultExt;
 17
 18use async_task::Runnable;
 19use objc::{
 20    class, msg_send,
 21    runtime::{BOOL, YES},
 22    sel, sel_impl,
 23};
 24use std::{
 25    ffi::c_void,
 26    ptr::NonNull,
 27    time::{Duration, Instant},
 28};
 29
 30pub(crate) struct MacDispatcher;
 31
 32impl MacDispatcher {
 33    pub fn new() -> Self {
 34        Self
 35    }
 36}
 37
 38impl PlatformDispatcher for MacDispatcher {
 39    fn get_all_timings(&self) -> Vec<ThreadTaskTimings> {
 40        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
 41        ThreadTaskTimings::convert(&global_timings)
 42    }
 43
 44    fn get_current_thread_timings(&self) -> ThreadTaskTimings {
 45        THREAD_TIMINGS.with(|timings| {
 46            let timings = timings.lock();
 47            let thread_name = timings.thread_name.clone();
 48            let total_pushed = timings.total_pushed;
 49            let timings = &timings.timings;
 50
 51            let mut vec = Vec::with_capacity(timings.len());
 52
 53            let (s1, s2) = timings.as_slices();
 54            vec.extend_from_slice(s1);
 55            vec.extend_from_slice(s2);
 56
 57            ThreadTaskTimings {
 58                thread_name,
 59                thread_id: std::thread::current().id(),
 60                timings: vec,
 61                total_pushed,
 62            }
 63        })
 64    }
 65
 66    fn is_main_thread(&self) -> bool {
 67        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
 68        is_main_thread == YES
 69    }
 70
 71    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
 72        let context = runnable.into_raw().as_ptr() as *mut c_void;
 73
 74        let queue_priority = match priority {
 75            Priority::RealtimeAudio => {
 76                panic!("RealtimeAudio priority should use spawn_realtime, not dispatch")
 77            }
 78            Priority::High => DispatchQueueGlobalPriority::High,
 79            Priority::Medium => DispatchQueueGlobalPriority::Default,
 80            Priority::Low => DispatchQueueGlobalPriority::Low,
 81        };
 82
 83        unsafe {
 84            DispatchQueue::global_queue(GlobalQueueIdentifier::Priority(queue_priority))
 85                .exec_async_f(context, trampoline);
 86        }
 87    }
 88
 89    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, _priority: Priority) {
 90        let context = runnable.into_raw().as_ptr() as *mut c_void;
 91        unsafe {
 92            DispatchQueue::main().exec_async_f(context, trampoline);
 93        }
 94    }
 95
 96    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
 97        let context = runnable.into_raw().as_ptr() as *mut c_void;
 98        let queue = DispatchQueue::global_queue(GlobalQueueIdentifier::Priority(
 99            DispatchQueueGlobalPriority::High,
100        ));
101        let when = DispatchTime::NOW.time(duration.as_nanos() as i64);
102        unsafe {
103            DispatchQueue::exec_after_f(when, &queue, context, trampoline);
104        }
105    }
106
107    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
108        std::thread::spawn(move || {
109            set_audio_thread_priority().log_err();
110            f();
111        });
112    }
113}
114
115fn set_audio_thread_priority() -> anyhow::Result<()> {
116    // https://chromium.googlesource.com/chromium/chromium/+/master/base/threading/platform_thread_mac.mm#93
117
118    // SAFETY: always safe to call
119    let thread_id = unsafe { libc::pthread_self() };
120
121    // SAFETY: thread_id is a valid thread id
122    let thread_id = unsafe { libc::pthread_mach_thread_np(thread_id) };
123
124    // Fixed priority thread
125    let mut policy = thread_extended_policy_data_t { timeshare: 0 };
126
127    // SAFETY: thread_id is a valid thread id
128    // SAFETY: thread_extended_policy_data_t is passed as THREAD_EXTENDED_POLICY
129    let result = unsafe {
130        mach2::thread_policy::thread_policy_set(
131            thread_id,
132            THREAD_EXTENDED_POLICY,
133            &mut policy as *mut _ as *mut _,
134            THREAD_EXTENDED_POLICY_COUNT,
135        )
136    };
137
138    if result != KERN_SUCCESS {
139        anyhow::bail!("failed to set thread extended policy");
140    }
141
142    // relatively high priority
143    let mut precedence = thread_precedence_policy_data_t { importance: 63 };
144
145    // SAFETY: thread_id is a valid thread id
146    // SAFETY: thread_precedence_policy_data_t is passed as THREAD_PRECEDENCE_POLICY
147    let result = unsafe {
148        mach2::thread_policy::thread_policy_set(
149            thread_id,
150            THREAD_PRECEDENCE_POLICY,
151            &mut precedence as *mut _ as *mut _,
152            THREAD_PRECEDENCE_POLICY_COUNT,
153        )
154    };
155
156    if result != KERN_SUCCESS {
157        anyhow::bail!("failed to set thread precedence policy");
158    }
159
160    const GUARANTEED_AUDIO_DUTY_CYCLE: f32 = 0.75;
161    const MAX_AUDIO_DUTY_CYCLE: f32 = 0.85;
162
163    // ~128 frames @ 44.1KHz
164    const TIME_QUANTUM: f32 = 2.9;
165
166    const AUDIO_TIME_NEEDED: f32 = GUARANTEED_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
167    const MAX_TIME_ALLOWED: f32 = MAX_AUDIO_DUTY_CYCLE * TIME_QUANTUM;
168
169    let mut timebase_info = mach_timebase_info_data_t { numer: 0, denom: 0 };
170    // SAFETY: timebase_info is a valid pointer to a mach_timebase_info_data_t struct
171    unsafe { mach2::mach_time::mach_timebase_info(&mut timebase_info) };
172
173    let ms_to_abs_time = ((timebase_info.denom as f32) / (timebase_info.numer as f32)) * 1000000f32;
174
175    let mut time_constraints = thread_time_constraint_policy_data_t {
176        period: (TIME_QUANTUM * ms_to_abs_time) as u32,
177        computation: (AUDIO_TIME_NEEDED * ms_to_abs_time) as u32,
178        constraint: (MAX_TIME_ALLOWED * ms_to_abs_time) as u32,
179        preemptible: 0,
180    };
181
182    // SAFETY: thread_id is a valid thread id
183    // SAFETY: thread_precedence_pthread_time_constraint_policy_data_t is passed as THREAD_TIME_CONSTRAINT_POLICY
184    let result = unsafe {
185        mach2::thread_policy::thread_policy_set(
186            thread_id,
187            THREAD_TIME_CONSTRAINT_POLICY,
188            &mut time_constraints as *mut _ as *mut _,
189            THREAD_TIME_CONSTRAINT_POLICY_COUNT,
190        )
191    };
192
193    if result != KERN_SUCCESS {
194        anyhow::bail!("failed to set thread time constraint policy");
195    }
196
197    Ok(())
198}
199
200extern "C" fn trampoline(context: *mut c_void) {
201    let runnable =
202        unsafe { Runnable::<RunnableMeta>::from_raw(NonNull::new_unchecked(context as *mut ())) };
203
204    let metadata = runnable.metadata();
205
206    // Check if the executor that spawned this task was closed
207    if metadata.is_closed() {
208        return;
209    }
210
211    let location = metadata.location;
212
213    let start = Instant::now();
214    let timing = TaskTiming {
215        location,
216        start,
217        end: None,
218    };
219
220    THREAD_TIMINGS.with(|timings| {
221        let mut timings = timings.lock();
222        let timings = &mut timings.timings;
223        if let Some(last_timing) = timings.iter_mut().rev().next() {
224            if last_timing.location == timing.location {
225                return;
226            }
227        }
228
229        timings.push_back(timing);
230    });
231
232    runnable.run();
233    let end = Instant::now();
234
235    THREAD_TIMINGS.with(|timings| {
236        let mut timings = timings.lock();
237        let timings = &mut timings.timings;
238        let Some(last_timing) = timings.iter_mut().rev().next() else {
239            return;
240        };
241        last_timing.end = Some(end);
242    });
243}