dispatcher.rs

  1use crate::{PlatformDispatcher, TaskLabel};
  2use async_task::Runnable;
  3use calloop::{
  4    EventLoop,
  5    channel::{self, Sender},
  6    timer::TimeoutAction,
  7};
  8use std::{
  9    thread,
 10    time::{Duration, Instant},
 11};
 12use util::ResultExt;
 13
 14struct TimerAfter {
 15    duration: Duration,
 16    runnable: Runnable,
 17}
 18
 19pub(crate) struct LinuxDispatcher {
 20    main_sender: Sender<Runnable>,
 21    timer_sender: Sender<TimerAfter>,
 22    background_sender: flume::Sender<Runnable>,
 23    _background_threads: Vec<thread::JoinHandle<()>>,
 24    main_thread_id: thread::ThreadId,
 25}
 26
 27impl LinuxDispatcher {
 28    pub fn new(main_sender: Sender<Runnable>) -> Self {
 29        let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
 30        let thread_count = std::thread::available_parallelism()
 31            .map(|i| i.get())
 32            .unwrap_or(1);
 33
 34        let mut background_threads = (0..thread_count)
 35            .map(|i| {
 36                let receiver = background_receiver.clone();
 37                std::thread::Builder::new()
 38                    .name(format!("Worker-{i}"))
 39                    .spawn(move || {
 40                        for runnable in receiver {
 41                            let start = Instant::now();
 42
 43                            runnable.run();
 44
 45                            log::trace!(
 46                                "background thread {}: ran runnable. took: {:?}",
 47                                i,
 48                                start.elapsed()
 49                            );
 50                        }
 51                    })
 52                    .unwrap()
 53            })
 54            .collect::<Vec<_>>();
 55
 56        let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
 57        let timer_thread = std::thread::Builder::new()
 58            .name("Timer".to_owned())
 59            .spawn(|| {
 60                let mut event_loop: EventLoop<()> =
 61                    EventLoop::try_new().expect("Failed to initialize timer loop!");
 62
 63                let handle = event_loop.handle();
 64                let timer_handle = event_loop.handle();
 65                handle
 66                    .insert_source(timer_channel, move |e, _, _| {
 67                        if let channel::Event::Msg(timer) = e {
 68                            // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
 69                            let mut runnable = Some(timer.runnable);
 70                            timer_handle
 71                                .insert_source(
 72                                    calloop::timer::Timer::from_duration(timer.duration),
 73                                    move |_, _, _| {
 74                                        if let Some(runnable) = runnable.take() {
 75                                            runnable.run();
 76                                        }
 77                                        TimeoutAction::Drop
 78                                    },
 79                                )
 80                                .expect("Failed to start timer");
 81                        }
 82                    })
 83                    .expect("Failed to start timer thread");
 84
 85                event_loop.run(None, &mut (), |_| {}).log_err();
 86            })
 87            .unwrap();
 88
 89        background_threads.push(timer_thread);
 90
 91        Self {
 92            main_sender,
 93            timer_sender,
 94            background_sender,
 95            _background_threads: background_threads,
 96            main_thread_id: thread::current().id(),
 97        }
 98    }
 99}
100
101impl PlatformDispatcher for LinuxDispatcher {
102    fn is_main_thread(&self) -> bool {
103        thread::current().id() == self.main_thread_id
104    }
105
106    fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
107        self.background_sender.send(runnable).unwrap();
108    }
109
110    fn dispatch_on_main_thread(&self, runnable: Runnable) {
111        self.main_sender.send(runnable).unwrap_or_else(|runnable| {
112            // NOTE: Runnable may wrap a Future that is !Send.
113            //
114            // This is usually safe because we only poll it on the main thread.
115            // However if the send fails, we know that:
116            // 1. main_receiver has been dropped (which implies the app is shutting down)
117            // 2. we are on a background thread.
118            // It is not safe to drop something !Send on the wrong thread, and
119            // the app will exit soon anyway, so we must forget the runnable.
120            std::mem::forget(runnable);
121        });
122    }
123
124    fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
125        self.timer_sender
126            .send(TimerAfter { duration, runnable })
127            .ok();
128    }
129}