dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    EventLoop,
  5    channel::{self, Sender},
  6    timer::TimeoutAction,
  7};
  8use parking::{Parker, Unparker};
  9use parking_lot::Mutex;
 10use std::{
 11    thread,
 12    time::{Duration, Instant},
 13};
 14use util::ResultExt;
 15
 16struct TimerAfter {
 17    duration: Duration,
 18    runnable: Runnable,
 19}
 20
 21pub(crate) struct LinuxDispatcher {
 22    parker: Mutex<Parker>,
 23    main_sender: Sender<Runnable>,
 24    timer_sender: Sender<TimerAfter>,
 25    background_sender: flume::Sender<Runnable>,
 26    _background_threads: Vec<thread::JoinHandle<()>>,
 27    main_thread_id: thread::ThreadId,
 28}
 29
 30impl LinuxDispatcher {
 31    pub fn new(main_sender: Sender<Runnable>) -> Self {
 32        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 33        let thread_count = std::thread::available_parallelism()
 34            .map(|i| i.get())
 35            .unwrap_or(1);
 36
 37        let mut background_threads = (0..thread_count)
 38            .map(|i| {
 39                let receiver = background_receiver.clone();
 40                std::thread::spawn(move || {
 41                    for runnable in receiver {
 42                        let start = Instant::now();
 43
 44                        runnable.run();
 45
 46                        log::trace!(
 47                            "background thread {}: ran runnable. took: {:?}",
 48                            i,
 49                            start.elapsed()
 50                        );
 51                    }
 52                })
 53            })
 54            .collect::<Vec<_>>();
 55
 56        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 57        let timer_thread = std::thread::spawn(|| {
 58            let mut event_loop: EventLoop<()> =
 59                EventLoop::try_new().expect("Failed to initialize timer loop!");
 60
 61            let handle = event_loop.handle();
 62            let timer_handle = event_loop.handle();
 63            handle
 64                .insert_source(timer_channel, move |e, _, _| {
 65                    if let channel::Event::Msg(timer) = e {
 66                        // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 67                        let mut runnable = Some(timer.runnable);
 68                        timer_handle
 69                            .insert_source(
 70                                calloop::timer::Timer::from_duration(timer.duration),
 71                                move |_, _, _| {
 72                                    if let Some(runnable) = runnable.take() {
 73                                        runnable.run();
 74                                    }
 75                                    TimeoutAction::Drop
 76                                },
 77                            )
 78                            .expect("Failed to start timer");
 79                    }
 80                })
 81                .expect("Failed to start timer thread");
 82
 83            event_loop.run(None, &mut (), |_| {}).log_err();
 84        });
 85
 86        background_threads.push(timer_thread);
 87
 88        Self {
 89            parker: Mutex::new(Parker::new()),
 90            main_sender,
 91            timer_sender,
 92            background_sender,
 93            _background_threads: background_threads,
 94            main_thread_id: thread::current().id(),
 95        }
 96    }
 97}
 98
 99impl PlatformDispatcher for LinuxDispatcher {
100    fn is_main_thread(&self) -> bool {
101        thread::current().id() == self.main_thread_id
102    }
103
104    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
105        self.background_sender.send(runnable).unwrap();
106    }
107
108    fn dispatch_on_main_thread(&self, runnable: Runnable) {
109        self.main_sender.send(runnable).unwrap_or_else(|runnable| {
110            // NOTE: Runnable may wrap a Future that is !Send.
111            //
112            // This is usually safe because we only poll it on the main thread.
113            // However if the send fails, we know that:
114            // 1. main_receiver has been dropped (which implies the app is shutting down)
115            // 2. we are on a background thread.
116            // It is not safe to drop something !Send on the wrong thread, and
117            // the app will exit soon anyway, so we must forget the runnable.
118            std::mem::forget(runnable);
119        });
120    }
121
122    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
123        self.timer_sender
124            .send(TimerAfter { duration, runnable })
125            .ok();
126    }
127
128    fn park(&self, timeout: Option<Duration>) -> bool {
129        if let Some(timeout) = timeout {
130            self.parker.lock().park_timeout(timeout)
131        } else {
132            self.parker.lock().park();
133            true
134        }
135    }
136
137    fn unparker(&self) -> Unparker {
138        self.parker.lock().unparker()
139    }
140}