profiler.rs

  1use scheduler::Instant;
  2use std::{
  3    cell::LazyCell,
  4    collections::HashMap,
  5    hash::Hasher,
  6    hash::{DefaultHasher, Hash},
  7    sync::Arc,
  8    thread::ThreadId,
  9};
 10
 11use serde::{Deserialize, Serialize};
 12
 13use crate::SharedString;
 14
/// A single timed task record kept in a thread's timing buffer.
///
/// `end` is `None` while the task is still in progress. `add_task_timing`
/// overwrites it when the most recent entry in the buffer has the same
/// `location` and `start` as a newly reported timing.
#[doc(hidden)]
#[derive(Debug, Copy, Clone)]
pub struct TaskTiming {
    /// Source location associated with this task.
    pub location: &'static core::panic::Location<'static>,
    /// When the task started.
    pub start: Instant,
    /// When the task finished, or `None` if it has not finished yet.
    pub end: Option<Instant>,
}
 22
/// Owned snapshot of one thread's timing buffer (as produced by
/// [`ThreadTaskTimings::convert`]).
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTaskTimings {
    /// Name of the thread, if it had one.
    pub thread_name: Option<String>,
    /// Id of the thread the timings were recorded on.
    pub thread_id: ThreadId,
    /// Buffered timings, copied front to back out of the thread's circular buffer.
    pub timings: Vec<TaskTiming>,
    /// Total number of entries ever pushed for this thread, including entries that
    /// have since been evicted from the circular buffer.
    pub total_pushed: u64,
}
 31
 32impl ThreadTaskTimings {
 33    /// Convert global thread timings into their structured format.
 34    pub fn convert(timings: &[GlobalThreadTimings]) -> Vec<Self> {
 35        timings
 36            .iter()
 37            .filter_map(|t| match t.timings.upgrade() {
 38                Some(timings) => Some((t.thread_id, timings)),
 39                _ => None,
 40            })
 41            .map(|(thread_id, timings)| {
 42                let timings = timings.lock();
 43                let thread_name = timings.thread_name.clone();
 44                let total_pushed = timings.total_pushed;
 45                let timings = &timings.timings;
 46
 47                let mut vec = Vec::with_capacity(timings.len());
 48
 49                let (s1, s2) = timings.as_slices();
 50                vec.extend_from_slice(s1);
 51                vec.extend_from_slice(s2);
 52
 53                ThreadTaskTimings {
 54                    thread_name,
 55                    thread_id,
 56                    timings: vec,
 57                    total_pushed,
 58                }
 59            })
 60            .collect()
 61    }
 62}
 63
/// Serializable variant of [`core::panic::Location`], holding an owned copy of
/// its file, line and column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedLocation {
    /// Source file, as reported by `Location::file`
    pub file: SharedString,
    /// Line in the source file
    pub line: u32,
    /// Column in the source file
    pub column: u32,
}
 74
 75impl From<&core::panic::Location<'static>> for SerializedLocation {
 76    fn from(value: &core::panic::Location<'static>) -> Self {
 77        SerializedLocation {
 78            file: value.file().into(),
 79            line: value.line(),
 80            column: value.column(),
 81        }
 82    }
 83}
 84
/// Serializable variant of [`TaskTiming`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedTaskTiming {
    /// Location of the timing
    pub location: SerializedLocation,
    /// Start of the measurement, in nanoseconds after the anchor instant passed to
    /// [`SerializedTaskTiming::convert`]
    pub start: u128,
    /// Duration of the measurement in nanoseconds. For a task that was still
    /// running at conversion time, this is measured up to the moment of conversion.
    pub duration: u128,
}
 95
 96impl SerializedTaskTiming {
 97    /// Convert an array of [`TaskTiming`] into their serializable format
 98    ///
 99    /// # Params
100    ///
101    /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
102    pub fn convert(anchor: Instant, timings: &[TaskTiming]) -> Vec<SerializedTaskTiming> {
103        let serialized = timings
104            .iter()
105            .map(|timing| {
106                let start = timing.start.duration_since(anchor).as_nanos();
107                let duration = timing
108                    .end
109                    .unwrap_or_else(|| Instant::now())
110                    .duration_since(timing.start)
111                    .as_nanos();
112                SerializedTaskTiming {
113                    location: timing.location.into(),
114                    start,
115                    duration,
116                }
117            })
118            .collect::<Vec<_>>();
119
120        serialized
121    }
122}
123
/// Serializable variant of [`ThreadTaskTimings`]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializedThreadTaskTimings {
    /// Thread name
    pub thread_name: Option<String>,
    /// `DefaultHasher` hash of the thread's [`ThreadId`]
    pub thread_id: u64,
    /// Timing records for this thread
    pub timings: Vec<SerializedTaskTiming>,
}
134
135impl SerializedThreadTaskTimings {
136    /// Convert [`ThreadTaskTimings`] into their serializable format
137    ///
138    /// # Params
139    ///
140    /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
141    pub fn convert(anchor: Instant, timings: ThreadTaskTimings) -> SerializedThreadTaskTimings {
142        let serialized_timings = SerializedTaskTiming::convert(anchor, &timings.timings);
143
144        let mut hasher = DefaultHasher::new();
145        timings.thread_id.hash(&mut hasher);
146        let thread_id = hasher.finish();
147
148        SerializedThreadTaskTimings {
149            thread_name: timings.thread_name,
150            thread_id,
151            timings: serialized_timings,
152        }
153    }
154}
155
/// Timings for one thread that completed since the previous
/// `ProfilingCollector::collect_unseen` call.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ThreadTimingsDelta {
    /// `DefaultHasher` hash of the thread's id
    pub thread_id: u64,
    /// Thread name, if known
    pub thread_name: Option<String>,
    /// New timings since the last call. If the circular buffer wrapped around
    /// since the previous poll, some entries may have been lost.
    pub new_timings: Vec<SerializedTaskTiming>,
}
167
/// Tracks which timing events have already been seen so that callers can request only unseen events.
#[doc(hidden)]
pub struct ProfilingCollector {
    /// Anchor that serialized `start` offsets are measured from.
    startup_time: Instant,
    /// Per-thread cursor keyed by hashed thread id: how many of that thread's
    /// entries (counted like `total_pushed`) have already been accounted for.
    cursors: HashMap<u64, u64>,
}
174
175impl ProfilingCollector {
176    pub fn new(startup_time: Instant) -> Self {
177        Self {
178            startup_time,
179            cursors: HashMap::default(),
180        }
181    }
182
183    pub fn startup_time(&self) -> Instant {
184        self.startup_time
185    }
186
187    pub fn collect_unseen(
188        &mut self,
189        all_timings: Vec<ThreadTaskTimings>,
190    ) -> Vec<ThreadTimingsDelta> {
191        let mut deltas = Vec::with_capacity(all_timings.len());
192
193        for thread in all_timings {
194            let mut hasher = DefaultHasher::new();
195            thread.thread_id.hash(&mut hasher);
196            let hashed_id = hasher.finish();
197
198            let prev_cursor = self.cursors.get(&hashed_id).copied().unwrap_or(0);
199            let buffer_len = thread.timings.len() as u64;
200            let buffer_start = thread.total_pushed.saturating_sub(buffer_len);
201
202            let mut slice = if prev_cursor < buffer_start {
203                // Cursor fell behind the buffer — some entries were evicted.
204                // Return everything still in the buffer.
205                thread.timings.as_slice()
206            } else {
207                let skip = (prev_cursor - buffer_start) as usize;
208                &thread.timings[skip..]
209            };
210
211            // Don't emit the last entry if it's still in-progress (end: None).
212            let incomplete_at_end = slice.last().is_some_and(|t| t.end.is_none());
213            if incomplete_at_end {
214                slice = &slice[..slice.len() - 1];
215            }
216
217            let cursor_advance = if incomplete_at_end {
218                thread.total_pushed - 1
219            } else {
220                thread.total_pushed
221            };
222
223            self.cursors.insert(hashed_id, cursor_advance);
224
225            if slice.is_empty() {
226                continue;
227            }
228
229            let new_timings = SerializedTaskTiming::convert(self.startup_time, slice);
230
231            deltas.push(ThreadTimingsDelta {
232                thread_id: hashed_id,
233                thread_name: thread.thread_name,
234                new_timings,
235            });
236        }
237
238        deltas
239    }
240
241    pub fn reset(&mut self) {
242        self.cursors.clear();
243    }
244}
245
246// Allow 20mb of task timing entries
247const MAX_TASK_TIMINGS: usize = (20 * 1024 * 1024) / core::mem::size_of::<TaskTiming>();
248
/// Fixed-capacity circular buffer holding up to [`MAX_TASK_TIMINGS`] timings.
#[doc(hidden)]
pub type TaskTimings = circular_buffer::CircularBuffer<MAX_TASK_TIMINGS, TaskTiming>;
/// A thread's [`ThreadTimings`] behind a spin lock, so other threads can snapshot
/// it through the weak handles kept in `GLOBAL_THREAD_TIMINGS`.
#[doc(hidden)]
pub type GuardedTaskTimings = spin::Mutex<ThreadTimings>;
253
/// Registry entry linking a thread id to that thread's timing buffer.
///
/// Only a weak reference is held, so the registry does not keep a thread's buffer
/// alive; `ThreadTimings::drop` removes the entry again.
#[doc(hidden)]
pub struct GlobalThreadTimings {
    /// Id of the thread that owns `timings`.
    pub thread_id: ThreadId,
    /// Weak handle to the thread's guarded timings; fails to upgrade once dropped.
    pub timings: std::sync::Weak<GuardedTaskTimings>,
}
259
/// Registry of the threads whose `THREAD_TIMINGS` have been initialized.
///
/// Entries are pushed by the `THREAD_TIMINGS` initializer and removed by
/// `ThreadTimings::drop`.
#[doc(hidden)]
pub static GLOBAL_THREAD_TIMINGS: spin::Mutex<Vec<GlobalThreadTimings>> =
    spin::Mutex::new(Vec::new());
263
264thread_local! {
265    #[doc(hidden)]
266    pub static THREAD_TIMINGS: LazyCell<Arc<GuardedTaskTimings>> = LazyCell::new(|| {
267        let current_thread = std::thread::current();
268        let thread_name = current_thread.name();
269        let thread_id = current_thread.id();
270        let timings = ThreadTimings::new(thread_name.map(|e| e.to_string()), thread_id);
271        let timings = Arc::new(spin::Mutex::new(timings));
272
273        {
274            let timings = Arc::downgrade(&timings);
275            let global_timings = GlobalThreadTimings {
276                thread_id: std::thread::current().id(),
277                timings,
278            };
279            GLOBAL_THREAD_TIMINGS.lock().push(global_timings);
280        }
281
282        timings
283    });
284}
285
/// Per-thread timing storage, owned (via `Arc`) by the thread-local `THREAD_TIMINGS`.
#[doc(hidden)]
pub struct ThreadTimings {
    /// Name of the owning thread, if it has one.
    pub thread_name: Option<String>,
    /// Id of the owning thread; used on drop to find its `GLOBAL_THREAD_TIMINGS` entry.
    pub thread_id: ThreadId,
    /// Heap-allocated circular buffer of recorded timings.
    pub timings: Box<TaskTimings>,
    /// Number of timings ever appended to `timings`, including evicted ones. It is
    /// not incremented when `add_task_timing` updates the last entry in place.
    pub total_pushed: u64,
}
293
294impl ThreadTimings {
295    pub fn new(thread_name: Option<String>, thread_id: ThreadId) -> Self {
296        ThreadTimings {
297            thread_name,
298            thread_id,
299            timings: TaskTimings::boxed(),
300            total_pushed: 0,
301        }
302    }
303}
304
305impl Drop for ThreadTimings {
306    fn drop(&mut self) {
307        let mut thread_timings = GLOBAL_THREAD_TIMINGS.lock();
308
309        let Some((index, _)) = thread_timings
310            .iter()
311            .enumerate()
312            .find(|(_, t)| t.thread_id == self.thread_id)
313        else {
314            return;
315        };
316        thread_timings.swap_remove(index);
317    }
318}
319
320#[doc(hidden)]
321#[allow(dead_code)] // Used by Linux and Windows dispatchers, not macOS
322pub fn add_task_timing(timing: TaskTiming) {
323    THREAD_TIMINGS.with(|timings| {
324        let mut timings = timings.lock();
325
326        if let Some(last_timing) = timings.timings.back_mut() {
327            if last_timing.location == timing.location && last_timing.start == timing.start {
328                last_timing.end = timing.end;
329                return;
330            }
331        }
332
333        timings.timings.push_back(timing);
334        timings.total_pushed += 1;
335    });
336}