1use crate::{PlatformDispatcher, TaskLabel};
2use async_task::Runnable;
3use calloop::{
4 EventLoop,
5 channel::{self, Sender},
6 timer::TimeoutAction,
7};
8use std::{
9 thread,
10 time::{Duration, Instant},
11};
12use util::ResultExt;
13
14struct TimerAfter {
15 duration: Duration,
16 runnable: Runnable,
17}
18
19pub(crate) struct LinuxDispatcher {
20 main_sender: Sender<Runnable>,
21 timer_sender: Sender<TimerAfter>,
22 background_sender: flume::Sender<Runnable>,
23 _background_threads: Vec<thread::JoinHandle<()>>,
24 main_thread_id: thread::ThreadId,
25}
26
27impl LinuxDispatcher {
28 pub fn new(main_sender: Sender<Runnable>) -> Self {
29 let (background_sender, background_receiver) = flume::unbounded::<Runnable>();
30 let thread_count = std::thread::available_parallelism()
31 .map(|i| i.get())
32 .unwrap_or(1);
33
34 let mut background_threads = (0..thread_count)
35 .map(|i| {
36 let receiver = background_receiver.clone();
37 std::thread::Builder::new()
38 .name(format!("Worker-{i}"))
39 .spawn(move || {
40 for runnable in receiver {
41 let start = Instant::now();
42
43 runnable.run();
44
45 log::trace!(
46 "background thread {}: ran runnable. took: {:?}",
47 i,
48 start.elapsed()
49 );
50 }
51 })
52 .unwrap()
53 })
54 .collect::<Vec<_>>();
55
56 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
57 let timer_thread = std::thread::Builder::new()
58 .name("Timer".to_owned())
59 .spawn(|| {
60 let mut event_loop: EventLoop<()> =
61 EventLoop::try_new().expect("Failed to initialize timer loop!");
62
63 let handle = event_loop.handle();
64 let timer_handle = event_loop.handle();
65 handle
66 .insert_source(timer_channel, move |e, _, _| {
67 if let channel::Event::Msg(timer) = e {
68 // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
69 let mut runnable = Some(timer.runnable);
70 timer_handle
71 .insert_source(
72 calloop::timer::Timer::from_duration(timer.duration),
73 move |_, _, _| {
74 if let Some(runnable) = runnable.take() {
75 runnable.run();
76 }
77 TimeoutAction::Drop
78 },
79 )
80 .expect("Failed to start timer");
81 }
82 })
83 .expect("Failed to start timer thread");
84
85 event_loop.run(None, &mut (), |_| {}).log_err();
86 })
87 .unwrap();
88
89 background_threads.push(timer_thread);
90
91 Self {
92 main_sender,
93 timer_sender,
94 background_sender,
95 _background_threads: background_threads,
96 main_thread_id: thread::current().id(),
97 }
98 }
99}
100
101impl PlatformDispatcher for LinuxDispatcher {
102 fn is_main_thread(&self) -> bool {
103 thread::current().id() == self.main_thread_id
104 }
105
106 fn dispatch(&self, runnable: Runnable, _: Option<TaskLabel>) {
107 self.background_sender.send(runnable).unwrap();
108 }
109
110 fn dispatch_on_main_thread(&self, runnable: Runnable) {
111 self.main_sender.send(runnable).unwrap_or_else(|runnable| {
112 // NOTE: Runnable may wrap a Future that is !Send.
113 //
114 // This is usually safe because we only poll it on the main thread.
115 // However if the send fails, we know that:
116 // 1. main_receiver has been dropped (which implies the app is shutting down)
117 // 2. we are on a background thread.
118 // It is not safe to drop something !Send on the wrong thread, and
119 // the app will exit soon anyway, so we must forget the runnable.
120 std::mem::forget(runnable);
121 });
122 }
123
124 fn dispatch_after(&self, duration: Duration, runnable: Runnable) {
125 self.timer_sender
126 .send(TimerAfter { duration, runnable })
127 .ok();
128 }
129}