dispatcher.rs

  1use calloop::{
  2    EventLoop, PostAction,
  3    channel::{self, Sender},
  4    timer::TimeoutAction,
  5};
  6use util::ResultExt;
  7
  8use std::{
  9    mem::MaybeUninit,
 10    thread,
 11    time::{Duration, Instant},
 12};
 13
 14use crate::{
 15    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
 16    PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
 17};
 18
 19struct TimerAfter {
 20    duration: Duration,
 21    runnable: RunnableVariant,
 22}
 23
 24pub(crate) struct LinuxDispatcher {
 25    main_sender: PriorityQueueCalloopSender<RunnableVariant>,
 26    timer_sender: Sender<TimerAfter>,
 27    background_sender: PriorityQueueSender<RunnableVariant>,
 28    _background_threads: Vec<thread::JoinHandle<()>>,
 29    main_thread_id: thread::ThreadId,
 30}
 31
 32const MIN_THREADS: usize = 2;
 33
 34impl LinuxDispatcher {
 35    pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
 36        let (background_sender, background_receiver) = PriorityQueueReceiver::new();
 37        let thread_count =
 38            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 39
 40        let mut background_threads = (0..thread_count)
 41            .map(|i| {
 42                let mut receiver: PriorityQueueReceiver<RunnableVariant> =
 43                    background_receiver.clone();
 44                std::thread::Builder::new()
 45                    .name(format!("Worker-{i}"))
 46                    .spawn(move || {
 47                        for runnable in receiver.iter() {
 48                            // Check if the executor that spawned this task was closed
 49                            if runnable.metadata().is_closed() {
 50                                continue;
 51                            }
 52
 53                            let start = Instant::now();
 54
 55                            let location = runnable.metadata().location;
 56                            let mut timing = TaskTiming {
 57                                location,
 58                                start,
 59                                end: None,
 60                            };
 61                            profiler::add_task_timing(timing);
 62
 63                            runnable.run();
 64
 65                            let end = Instant::now();
 66                            timing.end = Some(end);
 67                            profiler::add_task_timing(timing);
 68
 69                            log::trace!(
 70                                "background thread {}: ran runnable. took: {:?}",
 71                                i,
 72                                start.elapsed()
 73                            );
 74                        }
 75                    })
 76                    .unwrap()
 77            })
 78            .collect::<Vec<_>>();
 79
 80        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 81        let timer_thread = std::thread::Builder::new()
 82            .name("Timer".to_owned())
 83            .spawn(move || {
 84                let mut event_loop: EventLoop<()> =
 85                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 86
 87                let handle = event_loop.handle();
 88                let timer_handle = event_loop.handle();
 89                handle
 90                    .insert_source(timer_channel, move |e, _, _| {
 91                        if let channel::Event::Msg(timer) = e {
 92                            let mut runnable = Some(timer.runnable);
 93                            timer_handle
 94                                .insert_source(
 95                                    calloop::timer::Timer::from_duration(timer.duration),
 96                                    move |_, _, _| {
 97                                        if let Some(runnable) = runnable.take() {
 98                                            // Check if the executor that spawned this task was closed
 99                                            if runnable.metadata().is_closed() {
100                                                return TimeoutAction::Drop;
101                                            }
102
103                                            let start = Instant::now();
104                                            let location = runnable.metadata().location;
105                                            let mut timing = TaskTiming {
106                                                location,
107                                                start,
108                                                end: None,
109                                            };
110                                            profiler::add_task_timing(timing);
111
112                                            runnable.run();
113                                            let end = Instant::now();
114
115                                            timing.end = Some(end);
116                                            profiler::add_task_timing(timing);
117                                        }
118                                        TimeoutAction::Drop
119                                    },
120                                )
121                                .expect("Failed to start timer");
122                        }
123                    })
124                    .expect("Failed to start timer thread");
125
126                event_loop.run(None, &mut (), |_| {}).log_err();
127            })
128            .unwrap();
129
130        background_threads.push(timer_thread);
131
132        Self {
133            main_sender,
134            timer_sender,
135            background_sender,
136            _background_threads: background_threads,
137            main_thread_id: thread::current().id(),
138        }
139    }
140}
141
142impl PlatformDispatcher for LinuxDispatcher {
143    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
144        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
145        ThreadTaskTimings::convert(&global_timings)
146    }
147
148    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
149        THREAD_TIMINGS.with(|timings| {
150            let timings = timings.lock();
151            let timings = &timings.timings;
152
153            let mut vec = Vec::with_capacity(timings.len());
154
155            let (s1, s2) = timings.as_slices();
156            vec.extend_from_slice(s1);
157            vec.extend_from_slice(s2);
158            vec
159        })
160    }
161
162    fn is_main_thread(&self) -> bool {
163        thread::current().id() == self.main_thread_id
164    }
165
166    fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
167        self.background_sender
168            .send(priority, runnable)
169            .unwrap_or_else(|_| panic!("blocking sender returned without value"));
170    }
171
172    fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
173        self.main_sender
174            .send(priority, runnable)
175            .unwrap_or_else(|runnable| {
176                // NOTE: Runnable may wrap a Future that is !Send.
177                //
178                // This is usually safe because we only poll it on the main thread.
179                // However if the send fails, we know that:
180                // 1. main_receiver has been dropped (which implies the app is shutting down)
181                // 2. we are on a background thread.
182                // It is not safe to drop something !Send on the wrong thread, and
183                // the app will exit soon anyway, so we must forget the runnable.
184                std::mem::forget(runnable);
185            });
186    }
187
188    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
189        self.timer_sender
190            .send(TimerAfter { duration, runnable })
191            .ok();
192    }
193
194    fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
195        std::thread::spawn(move || {
196            // SAFETY: always safe to call
197            let thread_id = unsafe { libc::pthread_self() };
198
199            let policy = libc::SCHED_FIFO;
200            let sched_priority = 65;
201
202            // SAFETY: all sched_param members are valid when initialized to zero.
203            let mut sched_param =
204                unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
205            sched_param.sched_priority = sched_priority;
206            // SAFETY: sched_param is a valid initialized structure
207            let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
208            if result != 0 {
209                log::warn!("failed to set realtime thread priority");
210            }
211
212            f();
213        });
214    }
215}
216
217pub struct PriorityQueueCalloopSender<T> {
218    sender: PriorityQueueSender<T>,
219    ping: calloop::ping::Ping,
220}
221
222impl<T> PriorityQueueCalloopSender<T> {
223    fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
224        Self { sender: tx, ping }
225    }
226
227    fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
228        let res = self.sender.send(priority, item);
229        if res.is_ok() {
230            self.ping.ping();
231        }
232        res
233    }
234}
235
236impl<T> Drop for PriorityQueueCalloopSender<T> {
237    fn drop(&mut self) {
238        self.ping.ping();
239    }
240}
241
242pub struct PriorityQueueCalloopReceiver<T> {
243    receiver: PriorityQueueReceiver<T>,
244    source: calloop::ping::PingSource,
245    ping: calloop::ping::Ping,
246}
247
248impl<T> PriorityQueueCalloopReceiver<T> {
249    pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
250        let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
251
252        let (tx, rx) = PriorityQueueReceiver::new();
253
254        (
255            PriorityQueueCalloopSender::new(tx, ping.clone()),
256            Self {
257                receiver: rx,
258                source,
259                ping,
260            },
261        )
262    }
263}
264
265use calloop::channel::Event;
266
267#[derive(Debug)]
268pub struct ChannelError(calloop::ping::PingError);
269
270impl std::fmt::Display for ChannelError {
271    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        std::fmt::Display::fmt(&self.0, f)
274    }
275}
276
277impl std::error::Error for ChannelError {
278    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280        Some(&self.0)
281    }
282}
283
284impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
285    type Event = Event<T>;
286    type Metadata = ();
287    type Ret = ();
288    type Error = ChannelError;
289
290    fn process_events<F>(
291        &mut self,
292        readiness: calloop::Readiness,
293        token: calloop::Token,
294        mut callback: F,
295    ) -> Result<calloop::PostAction, Self::Error>
296    where
297        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
298    {
299        let mut clear_readiness = false;
300        let mut disconnected = false;
301
302        let action = self
303            .source
304            .process_events(readiness, token, |(), &mut ()| {
305                let mut is_empty = true;
306
307                let mut receiver = self.receiver.clone();
308                for runnable in receiver.try_iter() {
309                    match runnable {
310                        Ok(r) => {
311                            callback(Event::Msg(r), &mut ());
312                            is_empty = false;
313                        }
314                        Err(_) => {
315                            disconnected = true;
316                        }
317                    }
318                }
319
320                if disconnected {
321                    callback(Event::Closed, &mut ());
322                }
323
324                if is_empty {
325                    clear_readiness = true;
326                }
327            })
328            .map_err(ChannelError)?;
329
330        if disconnected {
331            Ok(PostAction::Remove)
332        } else if clear_readiness {
333            Ok(action)
334        } else {
335            // Re-notify the ping source so we can try again.
336            self.ping.ping();
337            Ok(PostAction::Continue)
338        }
339    }
340
341    fn register(
342        &mut self,
343        poll: &mut calloop::Poll,
344        token_factory: &mut calloop::TokenFactory,
345    ) -> calloop::Result<()> {
346        self.source.register(poll, token_factory)
347    }
348
349    fn reregister(
350        &mut self,
351        poll: &mut calloop::Poll,
352        token_factory: &mut calloop::TokenFactory,
353    ) -> calloop::Result<()> {
354        self.source.reregister(poll, token_factory)
355    }
356
357    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
358        self.source.unregister(poll)
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Flags recording which channel events the event-loop callback has seen.
    struct Data {
        got_msg: bool,
        got_closed: bool,
    }

    /// Drives a sender/receiver pair through a calloop event loop and checks
    /// that a sent message and the sender's drop are both observed.
    #[test]
    fn calloop_works() {
        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();

        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
                Event::Msg(()) => data.got_msg = true,
                Event::Closed => data.got_closed = true,
            })
            .unwrap();

        let mut data = Data {
            got_msg: false,
            got_closed: false,
        };

        // One non-blocking pass over the event loop.
        let mut pump = |data: &mut Data| {
            event_loop
                .dispatch(Some(::std::time::Duration::ZERO), data)
                .unwrap();
        };

        // nothing is sent, nothing is received
        pump(&mut data);
        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // a message is sent
        tx.send(Priority::Medium, ()).unwrap();
        pump(&mut data);
        assert!(data.got_msg);
        assert!(!data.got_closed);

        // the sender is dropped
        drop(tx);
        pump(&mut data);
        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
422
423// running 1 test
424// test platform::linux::dispatcher::tests::tomato ... FAILED
425
426// failures:
427
428// ---- platform::linux::dispatcher::tests::tomato stdout ----
429// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
430// returning 1 tasks to process
431// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
432//     (),
433// )
434// returning 0 tasks to process
435
436// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
437// assertion failed: data.got_closed
438// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace