dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    channel::{self, Sender},
  5    timer::TimeoutAction,
  6    EventLoop,
  7};
  8use mio::Waker;
  9use parking::{Parker, Unparker};
 10use parking_lot::Mutex;
 11use std::{sync::Arc, thread, time::Duration};
 12use util::ResultExt;
 13
 14struct TimerAfter {
 15    duration: Duration,
 16    runnable: Runnable,
 17}
 18
 19pub(crate) struct LinuxDispatcher {
 20    parker: Mutex<Parker>,
 21    main_sender: Sender<Runnable>,
 22    main_waker: Option<Arc<Waker>>,
 23    timer_sender: Sender<TimerAfter>,
 24    background_sender: flume::Sender<Runnable>,
 25    _background_threads: Vec<thread::JoinHandle<()>>,
 26    main_thread_id: thread::ThreadId,
 27}
 28
 29impl LinuxDispatcher {
 30    pub fn new(main_sender: Sender<Runnable>, main_waker: Option<Arc<Waker>>) -> Self {
 31        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 32        let thread_count = std::thread::available_parallelism()
 33            .map(|i| i.get())
 34            .unwrap_or(1);
 35
 36        let mut background_threads = (0..thread_count)
 37            .map(|_| {
 38                let receiver = background_receiver.clone();
 39                std::thread::spawn(move || {
 40                    for runnable in receiver {
 41                        runnable.run();
 42                    }
 43                })
 44            })
 45            .collect::<Vec<_>>();
 46
 47        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 48        let timer_thread = std::thread::spawn(|| {
 49            let mut event_loop: EventLoop<()> =
 50                EventLoop::try_new().expect("Failed to initialize timer loop!");
 51
 52            let handle = event_loop.handle();
 53            let timer_handle = event_loop.handle();
 54            handle
 55                .insert_source(timer_channel, move |e, _, _| {
 56                    if let channel::Event::Msg(timer) = e {
 57                        // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 58                        let mut runnable = Some(timer.runnable);
 59                        timer_handle
 60                            .insert_source(
 61                                calloop::timer::Timer::from_duration(timer.duration),
 62                                move |_, _, _| {
 63                                    if let Some(runnable) = runnable.take() {
 64                                        runnable.run();
 65                                    }
 66                                    TimeoutAction::Drop
 67                                },
 68                            )
 69                            .expect("Failed to start timer");
 70                    }
 71                })
 72                .expect("Failed to start timer thread");
 73
 74            event_loop.run(None, &mut (), |_| {}).log_err();
 75        });
 76
 77        background_threads.push(timer_thread);
 78
 79        Self {
 80            parker: Mutex::new(Parker::new()),
 81            main_sender,
 82            main_waker,
 83            timer_sender,
 84            background_sender,
 85            _background_threads: background_threads,
 86            main_thread_id: thread::current().id(),
 87        }
 88    }
 89}
 90
 91impl PlatformDispatcher for LinuxDispatcher {
 92    fn is_main_thread(&self) -> bool {
 93        thread::current().id() == self.main_thread_id
 94    }
 95
 96    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
 97        self.background_sender.send(runnable).unwrap();
 98    }
 99
100    fn dispatch_on_main_thread(&self, runnable: Runnable) {
101        self.main_sender.send(runnable).ok();
102        if let Some(main_waker) = self.main_waker.as_ref() {
103            main_waker.wake().ok();
104        }
105    }
106
107    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
108        self.timer_sender
109            .send(TimerAfter { duration, runnable })
110            .ok();
111    }
112
113    fn park(&self, timeout: Option<Duration>) -> bool {
114        if let Some(timeout) = timeout {
115            self.parker.lock().park_timeout(timeout)
116        } else {
117            self.parker.lock().park();
118            true
119        }
120    }
121
122    fn unparker(&self) -> Unparker {
123        self.parker.lock().unparker()
124    }
125}