1use scheduler::Instant;
2use std::{
3 cell::LazyCell,
4 collections::{HashMap, VecDeque},
5 hash::{DefaultHasher, Hash, Hasher},
6 sync::Arc,
7 thread::ThreadId,
8};
9
10use serde::{Deserialize, Serialize};
11
12use crate::SharedString;
13
/// A single measured task execution on one thread.
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct TaskTiming {
    /// Source location identifying where the task was created.
    pub location: &'static core::panic::Location<'static>,
    /// Instant at which the task started running.
    pub start: Instant,
    /// Instant at which the task finished; `None` while still in progress.
    pub end: Option<Instant>,
}
21
/// Owned snapshot of one thread's timing buffer.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTaskTimings {
    /// Name of the thread, if it had one.
    pub thread_name: Option<String>,
    /// Identifier of the thread the timings belong to.
    pub thread_id: ThreadId,
    /// Timing entries, oldest first.
    pub timings: Vec<TaskTiming>,
    /// Total number of entries ever pushed on this thread (the buffer is
    /// bounded, so this can exceed `timings.len()`).
    pub total_pushed: u64,
}
30
31impl ThreadTaskTimings {
32 /// Convert global thread timings into their structured format.
33 pub fn convert(timings: &[GlobalThreadTimings]) -> Vec<Self> {
34 timings
35 .iter()
36 .filter_map(|t| match t.timings.upgrade() {
37 Some(timings) => Some((t.thread_id, timings)),
38 _ => None,
39 })
40 .map(|(thread_id, timings)| {
41 let timings = timings.lock();
42 let thread_name = timings.thread_name.clone();
43 let total_pushed = timings.total_pushed;
44 let timings = &timings.timings;
45
46 let mut vec = Vec::with_capacity(timings.len());
47 let (s1, s2) = timings.as_slices();
48 vec.extend_from_slice(s1);
49 vec.extend_from_slice(s2);
50
51 ThreadTaskTimings {
52 thread_name,
53 thread_id,
54 timings: vec,
55 total_pushed,
56 }
57 })
58 .collect()
59 }
60}
61
/// Serializable variant of [`core::panic::Location`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedLocation {
    /// Path of the source file as reported by the compiler
    pub file: SharedString,
    /// 1-based line in the source file
    pub line: u32,
    /// 1-based column in the source file
    pub column: u32,
}
72
73impl From<&core::panic::Location<'static>> for SerializedLocation {
74 fn from(value: &core::panic::Location<'static>) -> Self {
75 SerializedLocation {
76 file: value.file().into(),
77 line: value.line(),
78 column: value.column(),
79 }
80 }
81}
82
/// Serializable variant of [`TaskTiming`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedTaskTiming {
    /// Location of the timing
    pub location: SerializedLocation,
    /// Start of the measurement, in nanoseconds since the conversion anchor
    pub start: u128,
    /// Duration of the measurement in nanoseconds
    pub duration: u128,
}
93
94impl SerializedTaskTiming {
95 /// Convert an array of [`TaskTiming`] into their serializable format
96 ///
97 /// # Params
98 ///
99 /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
100 pub fn convert(anchor: Instant, timings: &[TaskTiming]) -> Vec<SerializedTaskTiming> {
101 let serialized = timings
102 .iter()
103 .map(|timing| {
104 let start = timing.start.duration_since(anchor).as_nanos();
105 let duration = timing
106 .end
107 .unwrap_or_else(|| Instant::now())
108 .duration_since(timing.start)
109 .as_nanos();
110 SerializedTaskTiming {
111 location: timing.location.into(),
112 start,
113 duration,
114 }
115 })
116 .collect::<Vec<_>>();
117
118 serialized
119 }
120}
121
/// Serializable variant of [`ThreadTaskTimings`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedThreadTaskTimings {
    /// Thread name
    pub thread_name: Option<String>,
    /// Hash of the thread id (`ThreadId` itself is opaque and not serializable)
    pub thread_id: u64,
    /// Timing records for this thread
    pub timings: Vec<SerializedTaskTiming>,
}
132
133impl SerializedThreadTaskTimings {
134 /// Convert [`ThreadTaskTimings`] into their serializable format
135 ///
136 /// # Params
137 ///
138 /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
139 pub fn convert(anchor: Instant, timings: ThreadTaskTimings) -> SerializedThreadTaskTimings {
140 let serialized_timings = SerializedTaskTiming::convert(anchor, &timings.timings);
141
142 let mut hasher = DefaultHasher::new();
143 timings.thread_id.hash(&mut hasher);
144 let thread_id = hasher.finish();
145
146 SerializedThreadTaskTimings {
147 thread_name: timings.thread_name,
148 thread_id,
149 timings: serialized_timings,
150 }
151 }
152}
153
/// Incremental batch of timings for one thread, produced by
/// [`ProfilingCollector::collect_unseen`].
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTimingsDelta {
    /// Hashed thread id
    pub thread_id: u64,
    /// Thread name, if known
    pub thread_name: Option<String>,
    /// New timings since the last call. If the circular buffer wrapped around
    /// since the previous poll, some entries may have been lost.
    pub new_timings: Vec<SerializedTaskTiming>,
}
165
/// Tracks which timing events have already been seen so that callers can request only unseen events.
#[doc(hidden)]
pub struct ProfilingCollector {
    // Anchor instant; serialized timings are reported relative to this.
    startup_time: Instant,
    // Per-thread cursor into that thread's `total_pushed` count: the global
    // index of the next entry that has not yet been emitted.
    cursors: HashMap<ThreadId, u64>,
}
172
impl ProfilingCollector {
    /// Create a collector whose serialized timings are reported relative to
    /// `startup_time`.
    pub fn new(startup_time: Instant) -> Self {
        Self {
            startup_time,
            cursors: HashMap::default(),
        }
    }

    /// The anchor instant used as time zero for serialized timings.
    pub fn startup_time(&self) -> Instant {
        self.startup_time
    }

    /// Return, per thread, only the timing entries not already seen by a
    /// previous call, serialized relative to [`Self::startup_time`].
    ///
    /// Each thread's progress is tracked with a cursor over `total_pushed`
    /// (a monotonically increasing count of entries ever pushed). Because the
    /// per-thread buffer is bounded, entries evicted between polls are lost;
    /// in that case everything still buffered is returned.
    pub fn collect_unseen(
        &mut self,
        all_timings: Vec<ThreadTaskTimings>,
    ) -> Vec<ThreadTimingsDelta> {
        let mut deltas = Vec::with_capacity(all_timings.len());

        for thread in all_timings {
            // `ThreadId` is opaque; the delta exposes a hash of it instead.
            let mut hasher = DefaultHasher::new();
            thread.thread_id.hash(&mut hasher);
            let hashed_id = hasher.finish();

            let prev_cursor = self.cursors.get(&thread.thread_id).copied().unwrap_or(0);
            let buffer_len = thread.timings.len() as u64;
            // Global index of the oldest entry still present in the buffer.
            let buffer_start = thread.total_pushed.saturating_sub(buffer_len);

            let mut slice = if prev_cursor < buffer_start {
                // Cursor fell behind the buffer — some entries were evicted.
                // Return everything still in the buffer.
                thread.timings.as_slice()
            } else {
                // Skip the entries already emitted; the `min` guards against
                // a cursor past the end of the snapshot.
                let skip = (prev_cursor - buffer_start) as usize;
                &thread.timings[skip.min(thread.timings.len())..]
            };

            // Don't emit the last entry if it's still in-progress (end: None).
            let incomplete_at_end = slice.last().is_some_and(|t| t.end.is_none());
            if incomplete_at_end {
                slice = &slice[..slice.len() - 1];
            }

            // Leave the cursor just before an in-progress trailing entry so a
            // later poll emits it once it has completed.
            let cursor_advance = if incomplete_at_end {
                thread.total_pushed.saturating_sub(1)
            } else {
                thread.total_pushed
            };

            self.cursors.insert(thread.thread_id, cursor_advance);

            if slice.is_empty() {
                continue;
            }

            let new_timings = SerializedTaskTiming::convert(self.startup_time, slice);

            deltas.push(ThreadTimingsDelta {
                thread_id: hashed_id,
                thread_name: thread.thread_name,
                new_timings,
            });
        }

        deltas
    }

    /// Forget all per-thread cursors; the next poll re-emits everything
    /// still buffered.
    pub fn reset(&mut self) {
        self.cursors.clear();
    }
}
243
// Allow 16MiB of task timing entries.
// VecDeque grows by doubling its capacity when full, so keep this a power of 2 to avoid wasting
// memory.
// NOTE(review): the quotient is only a power of two when
// `size_of::<TaskTiming>()` is itself a power of two — worth confirming on
// the targets this is built for.
const MAX_TASK_TIMINGS: usize = (16 * 1024 * 1024) / core::mem::size_of::<TaskTiming>();
248
/// Per-thread circular buffer of task timings (bounded by `MAX_TASK_TIMINGS`).
#[doc(hidden)]
pub(crate) type TaskTimings = VecDeque<TaskTiming>;

/// Mutex-guarded per-thread timing storage, shared via `Arc`/`Weak`.
#[doc(hidden)]
pub type GuardedTaskTimings = spin::Mutex<ThreadTimings>;
254
/// Registry entry pointing (weakly) at one thread's timing storage.
#[doc(hidden)]
pub struct GlobalThreadTimings {
    /// Identifier of the owning thread.
    pub thread_id: ThreadId,
    /// Weak handle to the thread's storage; upgrading fails once the thread
    /// (and thus its thread-local storage) has exited.
    pub timings: std::sync::Weak<GuardedTaskTimings>,
}

/// Global registry of all live threads' timing storage.
#[doc(hidden)]
pub static GLOBAL_THREAD_TIMINGS: spin::Mutex<Vec<GlobalThreadTimings>> =
    spin::Mutex::new(Vec::new());
264
265thread_local! {
266 #[doc(hidden)]
267 pub static THREAD_TIMINGS: LazyCell<Arc<GuardedTaskTimings>> = LazyCell::new(|| {
268 let current_thread = std::thread::current();
269 let thread_name = current_thread.name();
270 let thread_id = current_thread.id();
271 let timings = ThreadTimings::new(thread_name.map(|e| e.to_string()), thread_id);
272 let timings = Arc::new(spin::Mutex::new(timings));
273
274 {
275 let timings = Arc::downgrade(&timings);
276 let global_timings = GlobalThreadTimings {
277 thread_id: std::thread::current().id(),
278 timings,
279 };
280 GLOBAL_THREAD_TIMINGS.lock().push(global_timings);
281 }
282
283 timings
284 });
285}
286
/// Mutable per-thread timing state stored behind `THREAD_TIMINGS`.
#[doc(hidden)]
pub struct ThreadTimings {
    /// Name of the owning thread, if it had one.
    pub thread_name: Option<String>,
    /// Identifier of the owning thread.
    pub thread_id: ThreadId,
    /// Bounded circular buffer of timing entries.
    pub timings: TaskTimings,
    /// Total number of entries ever pushed (not decremented on eviction).
    pub total_pushed: u64,
}
294
295impl ThreadTimings {
296 pub fn new(thread_name: Option<String>, thread_id: ThreadId) -> Self {
297 ThreadTimings {
298 thread_name,
299 thread_id,
300 timings: TaskTimings::new(),
301 total_pushed: 0,
302 }
303 }
304
305 /// If this task is the same as the last task, update the end time of the last task.
306 ///
307 /// Otherwise, add the new task timing to the list.
308 pub fn add_task_timing(&mut self, timing: TaskTiming) {
309 if let Some(last_timing) = self.timings.back_mut()
310 && last_timing.location == timing.location
311 && last_timing.start == timing.start
312 {
313 last_timing.end = timing.end;
314 } else {
315 while self.timings.len() + 1 > MAX_TASK_TIMINGS {
316 // This should only ever pop one element because it matches the insertion below.
317 self.timings.pop_front();
318 }
319 self.timings.push_back(timing);
320 self.total_pushed += 1;
321 }
322 }
323
324 pub fn get_thread_task_timings(&self) -> ThreadTaskTimings {
325 ThreadTaskTimings {
326 thread_name: self.thread_name.clone(),
327 thread_id: self.thread_id,
328 timings: self.timings.iter().cloned().collect(),
329 total_pushed: self.total_pushed,
330 }
331 }
332}
333
334impl Drop for ThreadTimings {
335 fn drop(&mut self) {
336 let mut thread_timings = GLOBAL_THREAD_TIMINGS.lock();
337
338 let Some((index, _)) = thread_timings
339 .iter()
340 .enumerate()
341 .find(|(_, t)| t.thread_id == self.thread_id)
342 else {
343 return;
344 };
345 thread_timings.swap_remove(index);
346 }
347}
348
349#[doc(hidden)]
350pub fn add_task_timing(timing: TaskTiming) {
351 THREAD_TIMINGS.with(|timings| {
352 timings.lock().add_task_timing(timing);
353 });
354}
355
356#[doc(hidden)]
357pub fn get_current_thread_task_timings() -> ThreadTaskTimings {
358 THREAD_TIMINGS.with(|timings| timings.lock().get_thread_task_timings())
359}