dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    EventLoop,
  5    channel::{self, Sender},
  6    timer::TimeoutAction,
  7};
  8use parking::{Parker, Unparker};
  9use parking_lot::Mutex;
 10use std::{
 11    thread,
 12    time::{Duration, Instant},
 13};
 14use util::ResultExt;
 15
 16struct TimerAfter {
 17    duration: Duration,
 18    runnable: Runnable,
 19}
 20
 21pub(crate) struct LinuxDispatcher {
 22    parker: Mutex<Parker>,
 23    main_sender: Sender<Runnable>,
 24    timer_sender: Sender<TimerAfter>,
 25    background_sender: flume::Sender<Runnable>,
 26    _background_threads: Vec<thread::JoinHandle<()>>,
 27    main_thread_id: thread::ThreadId,
 28}
 29
 30impl LinuxDispatcher {
 31    pub fn new(main_sender: Sender<Runnable>) -> Self {
 32        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 33        let thread_count = std::thread::available_parallelism()
 34            .map(|i| i.get())
 35            .unwrap_or(1);
 36
 37        let mut background_threads = (0..thread_count)
 38            .map(|i| {
 39                let receiver = background_receiver.clone();
 40                std::thread::Builder::new()
 41                    .name(format!("Worker-{i}"))
 42                    .spawn(move || {
 43                        for runnable in receiver {
 44                            let start = Instant::now();
 45
 46                            runnable.run();
 47
 48                            log::trace!(
 49                                "background thread {}: ran runnable. took: {:?}",
 50                                i,
 51                                start.elapsed()
 52                            );
 53                        }
 54                    })
 55                    .unwrap()
 56            })
 57            .collect::<Vec<_>>();
 58
 59        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 60        let timer_thread = std::thread::Builder::new()
 61            .name("Timer".to_owned())
 62            .spawn(|| {
 63                let mut event_loop: EventLoop<()> =
 64                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 65
 66                let handle = event_loop.handle();
 67                let timer_handle = event_loop.handle();
 68                handle
 69                    .insert_source(timer_channel, move |e, _, _| {
 70                        if let channel::Event::Msg(timer) = e {
 71                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 72                            let mut runnable = Some(timer.runnable);
 73                            timer_handle
 74                                .insert_source(
 75                                    calloop::timer::Timer::from_duration(timer.duration),
 76                                    move |_, _, _| {
 77                                        if let Some(runnable) = runnable.take() {
 78                                            runnable.run();
 79                                        }
 80                                        TimeoutAction::Drop
 81                                    },
 82                                )
 83                                .expect("Failed to start timer");
 84                        }
 85                    })
 86                    .expect("Failed to start timer thread");
 87
 88                event_loop.run(None, &mut (), |_| {}).log_err();
 89            })
 90            .unwrap();
 91
 92        background_threads.push(timer_thread);
 93
 94        Self {
 95            parker: Mutex::new(Parker::new()),
 96            main_sender,
 97            timer_sender,
 98            background_sender,
 99            _background_threads: background_threads,
100            main_thread_id: thread::current().id(),
101        }
102    }
103}
104
105impl PlatformDispatcher for LinuxDispatcher {
106    fn is_main_thread(&self) -> bool {
107        thread::current().id() == self.main_thread_id
108    }
109
110    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
111        self.background_sender.send(runnable).unwrap();
112    }
113
114    fn dispatch_on_main_thread(&self, runnable: Runnable) {
115        self.main_sender.send(runnable).unwrap_or_else(|runnable| {
116            // NOTE: Runnable may wrap a Future that is !Send.
117            //
118            // This is usually safe because we only poll it on the main thread.
119            // However if the send fails, we know that:
120            // 1. main_receiver has been dropped (which implies the app is shutting down)
121            // 2. we are on a background thread.
122            // It is not safe to drop something !Send on the wrong thread, and
123            // the app will exit soon anyway, so we must forget the runnable.
124            std::mem::forget(runnable);
125        });
126    }
127
128    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
129        self.timer_sender
130            .send(TimerAfter { duration, runnable })
131            .ok();
132    }
133
134    fn park(&self, timeout: Option<Duration>) -> bool {
135        if let Some(timeout) = timeout {
136            self.parker.lock().park_timeout(timeout)
137        } else {
138            self.parker.lock().park();
139            true
140        }
141    }
142
143    fn unparker(&self) -> Unparker {
144        self.parker.lock().unparker()
145    }
146}