1use calloop::{
2 EventLoop, PostAction,
3 channel::{self, Sender},
4 timer::TimeoutAction,
5};
6use util::ResultExt;
7
8use std::{
9 mem::MaybeUninit,
10 thread,
11 time::{Duration, Instant},
12};
13
14use gpui::{
15 GLOBAL_THREAD_TIMINGS, PlatformDispatcher, Priority, PriorityQueueReceiver,
16 PriorityQueueSender, RunnableVariant, THREAD_TIMINGS, TaskTiming, ThreadTaskTimings, profiler,
17};
18
19struct TimerAfter {
20 duration: Duration,
21 runnable: RunnableVariant,
22}
23
24pub(crate) struct LinuxDispatcher {
25 main_sender: PriorityQueueCalloopSender<RunnableVariant>,
26 timer_sender: Sender<TimerAfter>,
27 background_sender: PriorityQueueSender<RunnableVariant>,
28 _background_threads: Vec<thread::JoinHandle<()>>,
29 main_thread_id: thread::ThreadId,
30}
31
32const MIN_THREADS: usize = 2;
33
34impl LinuxDispatcher {
35 pub fn new(main_sender: PriorityQueueCalloopSender<RunnableVariant>) -> Self {
36 let (background_sender, background_receiver) = PriorityQueueReceiver::new();
37 let thread_count =
38 std::thread::available_parallelism().map_or(MIN_THREADS, |i| i.get().max(MIN_THREADS));
39
40 let mut background_threads = (0..thread_count)
41 .map(|i| {
42 let receiver: PriorityQueueReceiver<RunnableVariant> = background_receiver.clone();
43 std::thread::Builder::new()
44 .name(format!("Worker-{i}"))
45 .spawn(move || {
46 for runnable in receiver.iter() {
47 // Check if the executor that spawned this task was closed
48 if runnable.metadata().is_closed() {
49 continue;
50 }
51
52 let start = Instant::now();
53
54 let location = runnable.metadata().location;
55 let mut timing = TaskTiming {
56 location,
57 start,
58 end: None,
59 };
60 profiler::add_task_timing(timing);
61
62 runnable.run();
63
64 let end = Instant::now();
65 timing.end = Some(end);
66 profiler::add_task_timing(timing);
67
68 log::trace!(
69 "background thread {}: ran runnable. took: {:?}",
70 i,
71 start.elapsed()
72 );
73 }
74 })
75 .unwrap()
76 })
77 .collect::<Vec<_>>();
78
79 let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
80 let timer_thread = std::thread::Builder::new()
81 .name("Timer".to_owned())
82 .spawn(move || {
83 let mut event_loop: EventLoop<()> =
84 EventLoop::try_new().expect("Failed to initialize timer loop!");
85
86 let handle = event_loop.handle();
87 let timer_handle = event_loop.handle();
88 handle
89 .insert_source(timer_channel, move |e, _, _| {
90 if let channel::Event::Msg(timer) = e {
91 let mut runnable = Some(timer.runnable);
92 timer_handle
93 .insert_source(
94 calloop::timer::Timer::from_duration(timer.duration),
95 move |_, _, _| {
96 if let Some(runnable) = runnable.take() {
97 // Check if the executor that spawned this task was closed
98 if runnable.metadata().is_closed() {
99 return TimeoutAction::Drop;
100 }
101
102 let start = Instant::now();
103 let location = runnable.metadata().location;
104 let mut timing = TaskTiming {
105 location,
106 start,
107 end: None,
108 };
109 profiler::add_task_timing(timing);
110
111 runnable.run();
112 let end = Instant::now();
113
114 timing.end = Some(end);
115 profiler::add_task_timing(timing);
116 }
117 TimeoutAction::Drop
118 },
119 )
120 .expect("Failed to start timer");
121 }
122 })
123 .expect("Failed to start timer thread");
124
125 event_loop.run(None, &mut (), |_| {}).log_err();
126 })
127 .unwrap();
128
129 background_threads.push(timer_thread);
130
131 Self {
132 main_sender,
133 timer_sender,
134 background_sender,
135 _background_threads: background_threads,
136 main_thread_id: thread::current().id(),
137 }
138 }
139}
140
141impl PlatformDispatcher for LinuxDispatcher {
142 fn get_all_timings(&self) -> Vec<gpui::ThreadTaskTimings> {
143 let global_timings = GLOBAL_THREAD_TIMINGS.lock();
144 ThreadTaskTimings::convert(&global_timings)
145 }
146
147 fn get_current_thread_timings(&self) -> gpui::ThreadTaskTimings {
148 THREAD_TIMINGS.with(|timings| {
149 let timings = timings.lock();
150 let thread_name = timings.thread_name.clone();
151 let total_pushed = timings.total_pushed;
152 let timings = &timings.timings;
153
154 let mut vec = Vec::with_capacity(timings.len());
155
156 let (s1, s2) = timings.as_slices();
157 vec.extend_from_slice(s1);
158 vec.extend_from_slice(s2);
159
160 gpui::ThreadTaskTimings {
161 thread_name,
162 thread_id: std::thread::current().id(),
163 timings: vec,
164 total_pushed,
165 }
166 })
167 }
168
169 fn is_main_thread(&self) -> bool {
170 thread::current().id() == self.main_thread_id
171 }
172
173 fn dispatch(&self, runnable: RunnableVariant, priority: Priority) {
174 self.background_sender
175 .send(priority, runnable)
176 .unwrap_or_else(|_| panic!("blocking sender returned without value"));
177 }
178
179 fn dispatch_on_main_thread(&self, runnable: RunnableVariant, priority: Priority) {
180 self.main_sender
181 .send(priority, runnable)
182 .unwrap_or_else(|runnable| {
183 // NOTE: Runnable may wrap a Future that is !Send.
184 //
185 // This is usually safe because we only poll it on the main thread.
186 // However if the send fails, we know that:
187 // 1. main_receiver has been dropped (which implies the app is shutting down)
188 // 2. we are on a background thread.
189 // It is not safe to drop something !Send on the wrong thread, and
190 // the app will exit soon anyway, so we must forget the runnable.
191 std::mem::forget(runnable);
192 });
193 }
194
195 fn dispatch_after(&self, duration: Duration, runnable: RunnableVariant) {
196 self.timer_sender
197 .send(TimerAfter { duration, runnable })
198 .ok();
199 }
200
201 fn spawn_realtime(&self, f: Box<dyn FnOnce() + Send>) {
202 std::thread::spawn(move || {
203 // SAFETY: always safe to call
204 let thread_id = unsafe { libc::pthread_self() };
205
206 let policy = libc::SCHED_FIFO;
207 let sched_priority = 65;
208
209 // SAFETY: all sched_param members are valid when initialized to zero.
210 let mut sched_param =
211 unsafe { MaybeUninit::<libc::sched_param>::zeroed().assume_init() };
212 sched_param.sched_priority = sched_priority;
213 // SAFETY: sched_param is a valid initialized structure
214 let result = unsafe { libc::pthread_setschedparam(thread_id, policy, &sched_param) };
215 if result != 0 {
216 log::warn!("failed to set realtime thread priority");
217 }
218
219 f();
220 });
221 }
222}
223
224pub struct PriorityQueueCalloopSender<T> {
225 sender: PriorityQueueSender<T>,
226 ping: calloop::ping::Ping,
227}
228
229impl<T> PriorityQueueCalloopSender<T> {
230 fn new(tx: PriorityQueueSender<T>, ping: calloop::ping::Ping) -> Self {
231 Self { sender: tx, ping }
232 }
233
234 fn send(&self, priority: Priority, item: T) -> Result<(), gpui::queue::SendError<T>> {
235 let res = self.sender.send(priority, item);
236 if res.is_ok() {
237 self.ping.ping();
238 }
239 res
240 }
241}
242
243impl<T> Drop for PriorityQueueCalloopSender<T> {
244 fn drop(&mut self) {
245 self.ping.ping();
246 }
247}
248
249pub struct PriorityQueueCalloopReceiver<T> {
250 receiver: PriorityQueueReceiver<T>,
251 source: calloop::ping::PingSource,
252 ping: calloop::ping::Ping,
253}
254
255impl<T> PriorityQueueCalloopReceiver<T> {
256 pub fn new() -> (PriorityQueueCalloopSender<T>, Self) {
257 let (ping, source) = calloop::ping::make_ping().expect("Failed to create a Ping.");
258
259 let (tx, rx) = PriorityQueueReceiver::new();
260
261 (
262 PriorityQueueCalloopSender::new(tx, ping.clone()),
263 Self {
264 receiver: rx,
265 source,
266 ping,
267 },
268 )
269 }
270}
271
272use calloop::channel::Event;
273
274#[derive(Debug)]
275pub struct ChannelError(calloop::ping::PingError);
276
277impl std::fmt::Display for ChannelError {
278 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 std::fmt::Display::fmt(&self.0, f)
281 }
282}
283
284impl std::error::Error for ChannelError {
285 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
286 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
287 Some(&self.0)
288 }
289}
290
291impl<T> calloop::EventSource for PriorityQueueCalloopReceiver<T> {
292 type Event = Event<T>;
293 type Metadata = ();
294 type Ret = ();
295 type Error = ChannelError;
296
297 fn process_events<F>(
298 &mut self,
299 readiness: calloop::Readiness,
300 token: calloop::Token,
301 mut callback: F,
302 ) -> Result<calloop::PostAction, Self::Error>
303 where
304 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
305 {
306 let mut clear_readiness = false;
307 let mut disconnected = false;
308
309 let action = self
310 .source
311 .process_events(readiness, token, |(), &mut ()| {
312 let mut is_empty = true;
313
314 let receiver = self.receiver.clone();
315 for runnable in receiver.try_iter() {
316 match runnable {
317 Ok(r) => {
318 callback(Event::Msg(r), &mut ());
319 is_empty = false;
320 }
321 Err(_) => {
322 disconnected = true;
323 }
324 }
325 }
326
327 if disconnected {
328 callback(Event::Closed, &mut ());
329 }
330
331 if is_empty {
332 clear_readiness = true;
333 }
334 })
335 .map_err(ChannelError)?;
336
337 if disconnected {
338 Ok(PostAction::Remove)
339 } else if clear_readiness {
340 Ok(action)
341 } else {
342 // Re-notify the ping source so we can try again.
343 self.ping.ping();
344 Ok(PostAction::Continue)
345 }
346 }
347
348 fn register(
349 &mut self,
350 poll: &mut calloop::Poll,
351 token_factory: &mut calloop::TokenFactory,
352 ) -> calloop::Result<()> {
353 self.source.register(poll, token_factory)
354 }
355
356 fn reregister(
357 &mut self,
358 poll: &mut calloop::Poll,
359 token_factory: &mut calloop::TokenFactory,
360 ) -> calloop::Result<()> {
361 self.source.reregister(poll, token_factory)
362 }
363
364 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
365 self.source.unregister(poll)
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use super::*;
372
373 #[test]
374 fn calloop_works() {
375 let mut event_loop = calloop::EventLoop::try_new().unwrap();
376 let handle = event_loop.handle();
377
378 let (tx, rx) = PriorityQueueCalloopReceiver::new();
379
380 struct Data {
381 got_msg: bool,
382 got_closed: bool,
383 }
384
385 let mut data = Data {
386 got_msg: false,
387 got_closed: false,
388 };
389
390 let _channel_token = handle
391 .insert_source(rx, move |evt, &mut (), data: &mut Data| match evt {
392 Event::Msg(()) => {
393 data.got_msg = true;
394 }
395
396 Event::Closed => {
397 data.got_closed = true;
398 }
399 })
400 .unwrap();
401
402 // nothing is sent, nothing is received
403 event_loop
404 .dispatch(Some(::std::time::Duration::ZERO), &mut data)
405 .unwrap();
406
407 assert!(!data.got_msg);
408 assert!(!data.got_closed);
409 // a message is send
410
411 tx.send(Priority::Medium, ()).unwrap();
412 event_loop
413 .dispatch(Some(::std::time::Duration::ZERO), &mut data)
414 .unwrap();
415
416 assert!(data.got_msg);
417 assert!(!data.got_closed);
418
419 // the sender is dropped
420 drop(tx);
421 event_loop
422 .dispatch(Some(::std::time::Duration::ZERO), &mut data)
423 .unwrap();
424
425 assert!(data.got_msg);
426 assert!(data.got_closed);
427 }
428}
429
430// running 1 test
431// test linux::dispatcher::tests::tomato ... FAILED
432
433// failures:
434
435// ---- linux::dispatcher::tests::tomato stdout ----
436// [crates/gpui/src/platform/linux/dispatcher.rs:262:9]
437// returning 1 tasks to process
438// [crates/gpui/src/platform/linux/dispatcher.rs:480:75] evt = Msg(
439// (),
440// )
441// returning 0 tasks to process
442
443// thread 'linux::dispatcher::tests::tomato' (478301) panicked at crates/gpui/src/platform/linux/dispatcher.rs:515:9:
444// assertion failed: data.got_closed
445// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace