1use calloop::{
2 EventLoop, PostAction,
3 channel::{self, Sender},
4 timer::TimeoutAction,
5};
6use util::ResultExt;
7
8use std::{
9 mem::MaybeUninit,
10 thread,
11 time::{Duration, Instant},
12};
13
14use crate::{
15 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
16 PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
17};
18
19struct TimerAfter {
20 duration: Duration,
21 runnable: RunnableVariant,
22}
23
24pub(crate) struct LinuxDispatcher {
25 main_sender: PriorityQueueCalloopSender<RunnableVariant>,
26 timer_sender: Sender<TimerAfter>,
27 background_sender: PriorityQueueSender<RunnableVariant>,
28 _background_threads: Vec<thread::JoinHandle<()>>,
29 main_thread_id: thread::ThreadId,
30}
31
/// Lower bound on the number of background worker threads spawned by
/// `LinuxDispatcher::new`, used when the available parallelism is smaller or
/// cannot be determined.
const MIN_THREADS: usize = 2;
33
34impl LinuxDispatcher {
35 pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
36 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
37 let thread_count =
38 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
39
40 let mut background_threads = (0..thread_count)
41 .map(|i| {
42 let mut receiver: PriorityQueueReceiver<RunnableVariant> =
43 background_receiver.clone();
44 std::thread::Builder::new()
45 .name(format!("Worker-{i}"))
46 .spawn(move || {
47 for runnable in receiver.iter() {
48 // Check if the executor that spawned this task was closed
49 if runnable.metadata().is_closed() {
50 continue;
51 }
52
53 let start = Instant::now();
54
55 let location = runnable.metadata().location;
56 let mut timing = TaskTiming {
57 location,
58 start,
59 end: None,
60 };
61 profiler::add_task_timing(timing);
62
63 runnable.run();
64
65 let end = Instant::now();
66 timing.end = Some(end);
67 profiler::add_task_timing(timing);
68
69 log::trace!(
70 "background thread {}: ran runnable. took: {:?}",
71 i,
72 start.elapsed()
73 );
74 }
75 })
76 .unwrap()
77 })
78 .collect::<Vec<_>>();
79
80 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
81 let timer_thread = std::thread::Builder::new()
82 .name("Timer".to_owned())
83 .spawn(move || {
84 let mut event_loop: EventLoop<()> =
85 EventLoop::try_new().expect("Failed to initialize timer loop!");
86
87 let handle = event_loop.handle();
88 let timer_handle = event_loop.handle();
89 handle
90 .insert_source(timer_channel, move |e, _, _| {
91 if let channel::Event::Msg(timer) = e {
92 let mut runnable = Some(timer.runnable);
93 timer_handle
94 .insert_source(
95 calloop::timer::Timer::from_duration(timer.duration),
96 move |_, _, _| {
97 if let Some(runnable) = runnable.take() {
98 // Check if the executor that spawned this task was closed
99 if runnable.metadata().is_closed() {
100 return TimeoutAction::Drop;
101 }
102
103 let start = Instant::now();
104 let location = runnable.metadata().location;
105 let mut timing = TaskTiming {
106 location,
107 start,
108 end: None,
109 };
110 profiler::add_task_timing(timing);
111
112 runnable.run();
113 let end = Instant::now();
114
115 timing.end = Some(end);
116 profiler::add_task_timing(timing);
117 }
118 TimeoutAction::Drop
119 },
120 )
121 .expect("Failed to start timer");
122 }
123 })
124 .expect("Failed to start timer thread");
125
126 event_loop.run(None, &mut (), |_| {}).log_err();
127 })
128 .unwrap();
129
130 background_threads.push(timer_thread);
131
132 Self {
133 main_sender,
134 timer_sender,
135 background_sender,
136 _background_threads: background_threads,
137 main_thread_id: thread::current().id(),
138 }
139 }
140}
141
142impl PlatformDispatcher for LinuxDispatcher {
143 fn get_all_timings(&self) -> Vec<crate::ThreadTaskTimings> {
144 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
145 ThreadTaskTimings::convert(&global_timings)
146 }
147
148 fn get_current_thread_timings(&self) -> Vec<crate::TaskTiming> {
149 THREAD_TIMINGS.with(|timings| {
150 let timings = timings.lock();
151 let timings = &timings.timings;
152
153 let mut vec = Vec::with_capacity(timings.len());
154
155 let (s1, s2) = timings.as_slices();
156 vec.extend_from_slice(s1);
157 vec.extend_from_slice(s2);
158 vec
159 })
160 }
161
162 fn is_main_thread(&self) -> bool {
163 thread::current().id() == self.main_thread_id
164 }
165
166 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
167 self.background_sender
168 .send(priority, runnable)
169 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
170 }
171
172 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
173 self.main_sender
174 .send(priority, runnable)
175 .unwrap_or_else(|runnable| {
176 // NOTE: Runnable may wrap a Future that is !Send.
177 //
178 // This is usually safe because we only poll it on the main thread.
179 // However if the send fails, we know that:
180 // 1. main_receiver has been dropped (which implies the app is shutting down)
181 // 2. we are on a background thread.
182 // It is not safe to drop something !Send on the wrong thread, and
183 // the app will exit soon anyway, so we must forget the runnable.
184 std::mem::forget(runnable);
185 });
186 }
187
188 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
189 self.timer_sender
190 .send(TimerAfter { duration, runnable })
191 .ok();
192 }
193
194 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
195 std::thread::spawn(move || {
196 // SAFETY: always safe to call
197 let thread_id = unsafe { libc::pthread_self() };
198
199 let policy = libc::SCHED_FIFO;
200 let sched_priority = 65;
201
202 // SAFETY: all sched_param members are valid when initialized to zero.
203 let mut sched_param =
204 unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
205 sched_param.sched_priority = sched_priority;
206 // SAFETY: sched_param is a valid initialized structure
207 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
208 if result != 0 {
209 log::warn!("failed to set realtime thread priority");
210 }
211
212 f();
213 });
214 }
215}
216
217pub struct PriorityQueueCalloopSender<T> {
218 sender: PriorityQueueSender<T>,
219 ping: calloop::ping::Ping,
220}
221
222impl<T> PriorityQueueCalloopSender<T> {
223 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
224 Self { sender: tx, ping }
225 }
226
227 fn send(&self, priority: Priority, item: T) -> Result<(), crate::queue::SendError<T>> {
228 let res = self.sender.send(priority, item);
229 if res.is_ok() {
230 self.ping.ping();
231 }
232 res
233 }
234}
235
236impl<T> Drop for PriorityQueueCalloopSender<T> {
237 fn drop(&mut self) {
238 self.ping.ping();
239 }
240}
241
242pub struct PriorityQueueCalloopReceiver<T> {
243 receiver: PriorityQueueReceiver<T>,
244 source: calloop::ping::PingSource,
245 ping: calloop::ping::Ping,
246}
247
248impl<T> PriorityQueueCalloopReceiver<T> {
249 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
250 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
251
252 let (tx, rx) = PriorityQueueReceiver::new();
253
254 (
255 PriorityQueueCalloopSender::new(tx, ping.clone()),
256 Self {
257 receiver: rx,
258 source,
259 ping,
260 },
261 )
262 }
263}
264
265use calloop::channel::Event;
266
267#[derive(Debug)]
268pub struct ChannelError(calloop::ping::PingError);
269
270impl std::fmt::Display for ChannelError {
271 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 std::fmt::Display::fmt(&self.0, f)
274 }
275}
276
277impl std::error::Error for ChannelError {
278 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
280 Some(&self.0)
281 }
282}
283
284impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
285 type Event = Event<T>;
286 type Metadata = ();
287 type Ret = ();
288 type Error = ChannelError;
289
290 fn process_events<F>(
291 &mut self,
292 readiness: calloop::Readiness,
293 token: calloop::Token,
294 mut callback: F,
295 ) -> Result<calloop::PostAction, Self::Error>
296 where
297 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
298 {
299 let mut clear_readiness = false;
300 let mut disconnected = false;
301
302 let action = self
303 .source
304 .process_events(readiness, token, |(), &mut ()| {
305 let mut is_empty = true;
306
307 let mut receiver = self.receiver.clone();
308 for runnable in receiver.try_iter() {
309 match runnable {
310 Ok(r) => {
311 callback(Event::Msg(r), &mut ());
312 is_empty = false;
313 }
314 Err(_) => {
315 disconnected = true;
316 }
317 }
318 }
319
320 if disconnected {
321 callback(Event::Closed, &mut ());
322 }
323
324 if is_empty {
325 clear_readiness = true;
326 }
327 })
328 .map_err(ChannelError)?;
329
330 if disconnected {
331 Ok(PostAction::Remove)
332 } else if clear_readiness {
333 Ok(action)
334 } else {
335 // Re-notify the ping source so we can try again.
336 self.ping.ping();
337 Ok(PostAction::Continue)
338 }
339 }
340
341 fn register(
342 &mut self,
343 poll: &mut calloop::Poll,
344 token_factory: &mut calloop::TokenFactory,
345 ) -> calloop::Result<()> {
346 self.source.register(poll, token_factory)
347 }
348
349 fn reregister(
350 &mut self,
351 poll: &mut calloop::Poll,
352 token_factory: &mut calloop::TokenFactory,
353 ) -> calloop::Result<()> {
354 self.source.reregister(poll, token_factory)
355 }
356
357 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
358 self.source.unregister(poll)
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Records which channel events the event loop has delivered so far.
    struct Data {
        got_msg: bool,
        got_closed: bool,
    }

    /// Exercises the sender/receiver pair through a real calloop event loop:
    /// idle dispatch, message delivery, and closure after the sender is dropped.
    #[test]
    fn calloop_works() {
        let mut event_loop = calloop::EventLoop::try_new().unwrap();
        let (tx, rx) = PriorityQueueCalloopReceiver::new();

        let mut data = Data {
            got_msg: false,
            got_closed: false,
        };

        let _channel_token = event_loop
            .handle()
            .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
                Event::Msg(()) => data.got_msg = true,
                Event::Closed => data.got_closed = true,
            })
            .unwrap();

        // Runs one non-blocking dispatch of the event loop.
        let mut dispatch_once = |data: &mut Data| {
            event_loop
                .dispatch(Some(::std::time::Duration::ZERO), data)
                .unwrap();
        };

        // Nothing is sent, so nothing is received.
        dispatch_once(&mut data);
        assert!(!data.got_msg);
        assert!(!data.got_closed);

        // A message is sent.
        tx.send(Priority::Medium, ()).unwrap();
        dispatch_once(&mut data);
        assert!(data.got_msg);
        assert!(!data.got_closed);

        // The sender is dropped.
        drop(tx);
        dispatch_once(&mut data);
        assert!(data.got_msg);
        assert!(data.got_closed);
    }
}
422
423// running 1 test
424// test platform::linux::dispatcher::tests::tomato ... FAILED
425
426// failures:
427
428// ---- platform::linux::dispatcher::tests::tomato stdout ----
429// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
430// returning 1 tasks to process
431// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
432// (),
433// )
434// returning 0 tasks to process
435
436// thread 'platform::linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
437// assertion failed: data.got_closed
438// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace