1use scheduler::Instant;
2use std::{
3 cell::LazyCell,
4 collections::HashMap,
5 hash::Hasher,
6 hash::{DefaultHasher, Hash},
7 sync::Arc,
8 thread::ThreadId,
9};
10
11use serde::{Deserialize, Serialize};
12
13use crate::SharedString;
14
/// A single timed span recorded for a task on one thread.
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct TaskTiming {
    /// Source location attributed to this timing.
    pub location: &'static core::panic::Location<'static>,
    /// Instant at which the measurement started.
    pub start: Instant,
    /// Instant at which the measurement ended; `None` while it is still in progress.
    pub end: Option<Instant>,
}
22
/// Owned snapshot of the timings recorded for one thread.
///
/// Built by [`ThreadTaskTimings::convert`], which copies the data out of the thread's
/// buffer while holding that buffer's lock.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTaskTimings {
    /// Name of the thread, if it has one.
    pub thread_name: Option<String>,
    /// Identifier of the thread the timings belong to.
    pub thread_id: ThreadId,
    /// Copy of the buffered timings, in buffer order (front to back).
    pub timings: Vec<TaskTiming>,
    /// Value of the thread's push counter at the time the snapshot was taken.
    pub total_pushed: u64,
}
31
32impl ThreadTaskTimings {
33 /// Convert global thread timings into their structured format.
34 pub fn convert(timings: &[GlobalThreadTimings]) -> Vec<Self> {
35 timings
36 .iter()
37 .filter_map(|t| match t.timings.upgrade() {
38 Some(timings) => Some((t.thread_id, timings)),
39 _ => None,
40 })
41 .map(|(thread_id, timings)| {
42 let timings = timings.lock();
43 let thread_name = timings.thread_name.clone();
44 let total_pushed = timings.total_pushed;
45 let timings = &timings.timings;
46
47 let mut vec = Vec::with_capacity(timings.len());
48
49 let (s1, s2) = timings.as_slices();
50 vec.extend_from_slice(s1);
51 vec.extend_from_slice(s2);
52
53 ThreadTaskTimings {
54 thread_name,
55 thread_id,
56 timings: vec,
57 total_pushed,
58 }
59 })
60 .collect()
61 }
62}
63
/// Serializable variant of [`core::panic::Location`]
///
/// Holds owned copies of the file name, line and column so the value can be
/// serialized and deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedLocation {
    /// Name of the source file
    pub file: SharedString,
    /// Line in the source file
    pub line: u32,
    /// Column in the source file
    pub column: u32,
}
74
75impl From<&core::panic::Location<'static>> for SerializedLocation {
76 fn from(value: &core::panic::Location<'static>) -> Self {
77 SerializedLocation {
78 file: value.file().into(),
79 line: value.line(),
80 column: value.column(),
81 }
82 }
83}
84
/// Serializable variant of [`TaskTiming`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedTaskTiming {
    /// Location of the timing
    pub location: SerializedLocation,
    /// Start of the measurement, in nanoseconds elapsed since the anchor instant passed to
    /// [`SerializedTaskTiming::convert`]
    pub start: u128,
    /// Duration of the measurement in nanoseconds
    pub duration: u128,
}
95
96impl SerializedTaskTiming {
97 /// Convert an array of [`TaskTiming`] into their serializable format
98 ///
99 /// # Params
100 ///
101 /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
102 pub fn convert(anchor: Instant, timings: &[TaskTiming]) -> Vec<SerializedTaskTiming> {
103 let serialized = timings
104 .iter()
105 .map(|timing| {
106 let start = timing.start.duration_since(anchor).as_nanos();
107 let duration = timing
108 .end
109 .unwrap_or_else(|| Instant::now())
110 .duration_since(timing.start)
111 .as_nanos();
112 SerializedTaskTiming {
113 location: timing.location.into(),
114 start,
115 duration,
116 }
117 })
118 .collect::<Vec<_>>();
119
120 serialized
121 }
122}
123
/// Serializable variant of [`ThreadTaskTimings`]
///
/// The thread id is stored as a `u64` hash of the original `ThreadId`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedThreadTaskTimings {
    /// Thread name
    pub thread_name: Option<String>,
    /// Hash of the thread id
    pub thread_id: u64,
    /// Timing records for this thread
    pub timings: Vec<SerializedTaskTiming>,
}
134
135impl SerializedThreadTaskTimings {
136 /// Convert [`ThreadTaskTimings`] into their serializable format
137 ///
138 /// # Params
139 ///
140 /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
141 pub fn convert(anchor: Instant, timings: ThreadTaskTimings) -> SerializedThreadTaskTimings {
142 let serialized_timings = SerializedTaskTiming::convert(anchor, &timings.timings);
143
144 let mut hasher = DefaultHasher::new();
145 timings.thread_id.hash(&mut hasher);
146 let thread_id = hasher.finish();
147
148 SerializedThreadTaskTimings {
149 thread_name: timings.thread_name,
150 thread_id,
151 timings: serialized_timings,
152 }
153 }
154}
155
/// Timings for one thread that [`ProfilingCollector::collect_unseen`] has not returned before.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTimingsDelta {
    /// Hashed thread id
    pub thread_id: u64,
    /// Thread name, if known
    pub thread_name: Option<String>,
    /// New timings since the last call. If the circular buffer wrapped around
    /// since the previous poll, some entries may have been lost.
    pub new_timings: Vec<SerializedTaskTiming>,
}
167
/// Tracks which timing events have already been seen so that callers can request only unseen events.
#[doc(hidden)]
pub struct ProfilingCollector {
    /// Anchor instant that serialized timing starts are measured from.
    startup_time: Instant,
    /// Per-thread cursor, keyed by hashed thread id. The cursor is expressed in the same
    /// units as `ThreadTaskTimings::total_pushed`: the count of entries already consumed.
    cursors: HashMap<u64, u64>,
}
174
175impl ProfilingCollector {
176 pub fn new(startup_time: Instant) -> Self {
177 Self {
178 startup_time,
179 cursors: HashMap::default(),
180 }
181 }
182
183 pub fn startup_time(&self) -> Instant {
184 self.startup_time
185 }
186
187 pub fn collect_unseen(
188 &mut self,
189 all_timings: Vec<ThreadTaskTimings>,
190 ) -> Vec<ThreadTimingsDelta> {
191 let mut deltas = Vec::with_capacity(all_timings.len());
192
193 for thread in all_timings {
194 let mut hasher = DefaultHasher::new();
195 thread.thread_id.hash(&mut hasher);
196 let hashed_id = hasher.finish();
197
198 let prev_cursor = self.cursors.get(&hashed_id).copied().unwrap_or(0);
199 let buffer_len = thread.timings.len() as u64;
200 let buffer_start = thread.total_pushed.saturating_sub(buffer_len);
201
202 let mut slice = if prev_cursor < buffer_start {
203 // Cursor fell behind the buffer — some entries were evicted.
204 // Return everything still in the buffer.
205 thread.timings.as_slice()
206 } else {
207 let skip = (prev_cursor - buffer_start) as usize;
208 &thread.timings[skip..]
209 };
210
211 // Don't emit the last entry if it's still in-progress (end: None).
212 let incomplete_at_end = slice.last().is_some_and(|t| t.end.is_none());
213 if incomplete_at_end {
214 slice = &slice[..slice.len() - 1];
215 }
216
217 let cursor_advance = if incomplete_at_end {
218 thread.total_pushed - 1
219 } else {
220 thread.total_pushed
221 };
222
223 self.cursors.insert(hashed_id, cursor_advance);
224
225 if slice.is_empty() {
226 continue;
227 }
228
229 let new_timings = SerializedTaskTiming::convert(self.startup_time, slice);
230
231 deltas.push(ThreadTimingsDelta {
232 thread_id: hashed_id,
233 thread_name: thread.thread_name,
234 new_timings,
235 });
236 }
237
238 deltas
239 }
240
241 pub fn reset(&mut self) {
242 self.cursors.clear();
243 }
244}
245
246// Allow 20mb of task timing entries
247const MAX_TASK_TIMINGS: usize = (20 * 1024 * 1024) / core::mem::size_of::<TaskTiming>();
248
/// Circular buffer of [`TaskTiming`] entries with a capacity of `MAX_TASK_TIMINGS`.
#[doc(hidden)]
pub type TaskTimings = circular_buffer::CircularBuffer<MAX_TASK_TIMINGS, TaskTiming>;
/// A thread's [`ThreadTimings`] behind a spin mutex.
#[doc(hidden)]
pub type GuardedTaskTimings = spin::Mutex<ThreadTimings>;
253
/// Entry in [`GLOBAL_THREAD_TIMINGS`]: a thread id paired with a weak handle to that
/// thread's timings.
#[doc(hidden)]
pub struct GlobalThreadTimings {
    /// Identifier of the thread that owns the timings.
    pub thread_id: ThreadId,
    /// Weak handle to the thread's timings; upgrading fails once the owning `Arc` is gone.
    pub timings: std::sync::Weak<GuardedTaskTimings>,
}
259
/// Registry of per-thread timing handles, guarded by a spin mutex.
///
/// An entry is pushed when a thread first initializes `THREAD_TIMINGS`, and the entry with
/// the matching thread id is removed when that thread's [`ThreadTimings`] is dropped.
#[doc(hidden)]
pub static GLOBAL_THREAD_TIMINGS: spin::Mutex<Vec<GlobalThreadTimings>> =
    spin::Mutex::new(Vec::new());
263
264thread_local! {
265 #[doc(hidden)]
266 pub static THREAD_TIMINGS: LazyCell<Arc<GuardedTaskTimings>> = LazyCell::new(|| {
267 let current_thread = std::thread::current();
268 let thread_name = current_thread.name();
269 let thread_id = current_thread.id();
270 let timings = ThreadTimings::new(thread_name.map(|e| e.to_string()), thread_id);
271 let timings = Arc::new(spin::Mutex::new(timings));
272
273 {
274 let timings = Arc::downgrade(&timings);
275 let global_timings = GlobalThreadTimings {
276 thread_id: std::thread::current().id(),
277 timings,
278 };
279 GLOBAL_THREAD_TIMINGS.lock().push(global_timings);
280 }
281
282 timings
283 });
284}
285
/// Timing state owned by a single thread.
///
/// Dropping it removes the thread's entry from [`GLOBAL_THREAD_TIMINGS`].
#[doc(hidden)]
pub struct ThreadTimings {
    /// Name of the thread, if it has one.
    pub thread_name: Option<String>,
    /// Identifier of the owning thread.
    pub thread_id: ThreadId,
    /// Heap-allocated circular buffer of recorded timings.
    pub timings: Box<TaskTimings>,
    /// Number of entries pushed into `timings` so far; incremented by `add_task_timing`
    /// on every push.
    pub total_pushed: u64,
}
293
294impl ThreadTimings {
295 pub fn new(thread_name: Option<String>, thread_id: ThreadId) -> Self {
296 ThreadTimings {
297 thread_name,
298 thread_id,
299 timings: TaskTimings::boxed(),
300 total_pushed: 0,
301 }
302 }
303}
304
305impl Drop for ThreadTimings {
306 fn drop(&mut self) {
307 let mut thread_timings = GLOBAL_THREAD_TIMINGS.lock();
308
309 let Some((index, _)) = thread_timings
310 .iter()
311 .enumerate()
312 .find(|(_, t)| t.thread_id == self.thread_id)
313 else {
314 return;
315 };
316 thread_timings.swap_remove(index);
317 }
318}
319
320#[doc(hidden)]
321#[allow(dead_code)] // Used by Linux and Windows dispatchers, not macOS
322pub fn add_task_timing(timing: TaskTiming) {
323 THREAD_TIMINGS.with(|timings| {
324 let mut timings = timings.lock();
325
326 if let Some(last_timing) = timings.timings.back_mut() {
327 if last_timing.location == timing.location && last_timing.start == timing.start {
328 last_timing.end = timing.end;
329 return;
330 }
331 }
332
333 timings.timings.push_back(timing);
334 timings.total_pushed += 1;
335 });
336}