dispatcher.rs

  1use crate::{
  2    GLOBAL_THREAD_TIMINGS, PlatformDispatcher, RunnableVariant, THREAD_TIMINGS, TaskLabel,
  3    TaskTiming, ThreadTaskTimings,
  4};
  5use calloop::{
  6    EventLoop,
  7    channel::{self, Sender},
  8    timer::TimeoutAction,
  9};
 10use std::{
 11    thread,
 12    time::{Duration, Instant},
 13};
 14use util::ResultExt;
 15
 16struct TimerAfter {
 17    duration: Duration,
 18    runnable: RunnableVariant,
 19}
 20
 21pub(crate) struct LinuxDispatcher {
 22    main_sender: Sender<RunnableVariant>,
 23    timer_sender: Sender<TimerAfter>,
 24    background_sender: flume::Sender<RunnableVariant>,
 25    _background_threads: Vec<thread::JoinHandle<()>>,
 26    main_thread_id: thread::ThreadId,
 27}
 28
 29const MIN_THREADS: usize = 2;
 30
 31impl LinuxDispatcher {
 32    pub fn new(main_sender: Sender<RunnableVariant>) -> Self {
 33        let (background_sender, background_receiver) = flume::unbounded::<RunnableVariant>();
 34        let thread_count =
 35            std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
 36
 37        let mut background_threads = (0..thread_count)
 38            .map(|i| {
 39                let receiver = background_receiver.clone();
 40                std::thread::Builder::new()
 41                    .name(format!("Worker-{i}"))
 42                    .spawn(move || {
 43                        for runnable in receiver {
 44                            let start = Instant::now();
 45
 46                            let mut location = match runnable {
 47                                RunnableVariant::Meta(runnable) => {
 48                                    let location = runnable.metadata().location;
 49                                    let timing = TaskTiming {
 50                                        location,
 51                                        start,
 52                                        end: None,
 53                                    };
 54                                    Self::add_task_timing(timing);
 55
 56                                    runnable.run();
 57                                    timing
 58                                }
 59                                RunnableVariant::Compat(runnable) => {
 60                                    let location = core::panic::Location::caller();
 61                                    let timing = TaskTiming {
 62                                        location,
 63                                        start,
 64                                        end: None,
 65                                    };
 66                                    Self::add_task_timing(timing);
 67
 68                                    runnable.run();
 69                                    timing
 70                                }
 71                            };
 72
 73                            let end = Instant::now();
 74                            location.end = Some(end);
 75                            Self::add_task_timing(location);
 76
 77                            log::trace!(
 78                                "background thread {}: ran runnable. took: {:?}",
 79                                i,
 80                                start.elapsed()
 81                            );
 82                        }
 83                    })
 84                    .unwrap()
 85            })
 86            .collect::<Vec<_>>();
 87
 88        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 89        let timer_thread = std::thread::Builder::new()
 90            .name("Timer".to_owned())
 91            .spawn(|| {
 92                let mut event_loop: EventLoop<()> =
 93                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 94
 95                let handle = event_loop.handle();
 96                let timer_handle = event_loop.handle();
 97                handle
 98                    .insert_source(timer_channel, move |e, _, _| {
 99                        if let channel::Event::Msg(timer) = e {
100                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
101                            let mut runnable = Some(timer.runnable);
102                            timer_handle
103                                .insert_source(
104                                    calloop::timer::Timer::from_duration(timer.duration),
105                                    move |_, _, _| {
106                                        if let Some(runnable) = runnable.take() {
107                                            let start = Instant::now();
108                                            let mut timing = match runnable {
109                                                RunnableVariant::Meta(runnable) => {
110                                                    let location = runnable.metadata().location;
111                                                    let timing = TaskTiming {
112                                                        location,
113                                                        start,
114                                                        end: None,
115                                                    };
116                                                    Self::add_task_timing(timing);
117
118                                                    runnable.run();
119                                                    timing
120                                                }
121                                                RunnableVariant::Compat(runnable) => {
122                                                    let timing = TaskTiming {
123                                                        location: core::panic::Location::caller(),
124                                                        start,
125                                                        end: None,
126                                                    };
127                                                    Self::add_task_timing(timing);
128
129                                                    runnable.run();
130                                                    timing
131                                                }
132                                            };
133                                            let end = Instant::now();
134
135                                            timing.end = Some(end);
136                                            Self::add_task_timing(timing);
137                                        }
138                                        TimeoutAction::Drop
139                                    },
140                                )
141                                .expect("Failed to start timer");
142                        }
143                    })
144                    .expect("Failed to start timer thread");
145
146                event_loop.run(None, &mut (), |_| {}).log_err();
147            })
148            .unwrap();
149
150        background_threads.push(timer_thread);
151
152        Self {
153            main_sender,
154            timer_sender,
155            background_sender,
156            _background_threads: background_threads,
157            main_thread_id: thread::current().id(),
158        }
159    }
160
161    pub(crate) fn add_task_timing(timing: TaskTiming) {
162        THREAD_TIMINGS.with(|timings| {
163            let mut timings = timings.lock();
164            let timings = &mut timings.timings;
165
166            if let Some(last_timing) = timings.iter_mut().rev().next() {
167                if last_timing.location == timing.location {
168                    last_timing.end = timing.end;
169                    return;
170                }
171            }
172
173            timings.push_back(timing);
174        });
175    }
176}
177
178impl PlatformDispatcher for LinuxDispatcher {
179    fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
180        let global_timings = GLOBAL_THREAD_TIMINGS.lock();
181        ThreadTaskTimings::convert(&global_timings)
182    }
183
184    fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
185        THREAD_TIMINGS.with(|timings| {
186            let timings = timings.lock();
187            let timings = &timings.timings;
188
189            let mut vec = Vec::with_capacity(timings.len());
190
191            let (s1, s2) = timings.as_slices();
192            vec.extend_from_slice(s1);
193            vec.extend_from_slice(s2);
194            vec
195        })
196    }
197
198    fn is_main_thread(&self) -> bool {
199        thread::current().id() == self.main_thread_id
200    }
201
202    fn dispatch(&self, runnable: RunnableVariant, _: Option<TaskLabel>) {
203        self.background_sender.send(runnable).unwrap();
204    }
205
206    fn dispatch_on_main_thread(&self, runnable: RunnableVariant) {
207        self.main_sender.send(runnable).unwrap_or_else(|runnable| {
208            // NOTE: Runnable may wrap a Future that is !Send.
209            //
210            // This is usually safe because we only poll it on the main thread.
211            // However if the send fails, we know that:
212            // 1. main_receiver has been dropped (which implies the app is shutting down)
213            // 2. we are on a background thread.
214            // It is not safe to drop something !Send on the wrong thread, and
215            // the app will exit soon anyway, so we must forget the runnable.
216            std::mem::forget(runnable);
217        });
218    }
219
220    fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
221        self.timer_sender
222            .send(TimerAfter { duration, runnable })
223            .ok();
224    }
225}