profiler.rs

  1use scheduler::Instant;
  2use std::{
  3    cell::LazyCell,
  4    collections::{HashMap, VecDeque},
  5    hash::{DefaultHasher, Hash, Hasher},
  6    sync::Arc,
  7    thread::ThreadId,
  8};
  9
 10use serde::{Deserialize, Serialize};
 11
 12use crate::SharedString;
 13
 14#[doc(hidden)]
 15#[derive(Debug, Copy, Clone)]
 16pub struct TaskTiming {
 17    pub location: &'static core::panic::Location<'static>,
 18    pub start: Instant,
 19    pub end: Option<Instant>,
 20}
 21
 22#[doc(hidden)]
 23#[derive(Debug, Clone)]
 24pub struct ThreadTaskTimings {
 25    pub thread_name: Option<String>,
 26    pub thread_id: ThreadId,
 27    pub timings: Vec<TaskTiming>,
 28    pub total_pushed: u64,
 29}
 30
 31impl ThreadTaskTimings {
 32    /// Convert global thread timings into their structured format.
 33    pub fn convert(timings: &[GlobalThreadTimings]) -> Vec<Self> {
 34        timings
 35            .iter()
 36            .filter_map(|t| match t.timings.upgrade() {
 37                Some(timings) => Some((t.thread_id, timings)),
 38                _ => None,
 39            })
 40            .map(|(thread_id, timings)| {
 41                let timings = timings.lock();
 42                let thread_name = timings.thread_name.clone();
 43                let total_pushed = timings.total_pushed;
 44                let timings = &timings.timings;
 45
 46                let mut vec = Vec::with_capacity(timings.len());
 47                let (s1, s2) = timings.as_slices();
 48                vec.extend_from_slice(s1);
 49                vec.extend_from_slice(s2);
 50
 51                ThreadTaskTimings {
 52                    thread_name,
 53                    thread_id,
 54                    timings: vec,
 55                    total_pushed,
 56                }
 57            })
 58            .collect()
 59    }
 60}
 61
 62/// Serializable variant of [`core::panic::Location`]
 63#[derive(Debug, Clone, Serialize, Deserialize)]
 64pub struct SerializedLocation {
 65    /// Name of the source file
 66    pub file: SharedString,
 67    /// Line in the source file
 68    pub line: u32,
 69    /// Column in the source file
 70    pub column: u32,
 71}
 72
 73impl From<&core::panic::Location<'static>> for SerializedLocation {
 74    fn from(value: &core::panic::Location<'static>) -> Self {
 75        SerializedLocation {
 76            file: value.file().into(),
 77            line: value.line(),
 78            column: value.column(),
 79        }
 80    }
 81}
 82
 83/// Serializable variant of [`TaskTiming`]
 84#[derive(Debug, Clone, Serialize, Deserialize)]
 85pub struct SerializedTaskTiming {
 86    /// Location of the timing
 87    pub location: SerializedLocation,
 88    /// Time at which the measurement was reported in nanoseconds
 89    pub start: u128,
 90    /// Duration of the measurement in nanoseconds
 91    pub duration: u128,
 92}
 93
 94impl SerializedTaskTiming {
 95    /// Convert an array of [`TaskTiming`] into their serializable format
 96    ///
 97    /// # Params
 98    ///
 99    /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
100    pub fn convert(anchor: Instant, timings: &[TaskTiming]) -> Vec<SerializedTaskTiming> {
101        let serialized = timings
102            .iter()
103            .map(|timing| {
104                let start = timing.start.duration_since(anchor).as_nanos();
105                let duration = timing
106                    .end
107                    .unwrap_or_else(|| Instant::now())
108                    .duration_since(timing.start)
109                    .as_nanos();
110                SerializedTaskTiming {
111                    location: timing.location.into(),
112                    start,
113                    duration,
114                }
115            })
116            .collect::<Vec<_>>();
117
118        serialized
119    }
120}
121
122/// Serializable variant of [`ThreadTaskTimings`]
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct SerializedThreadTaskTimings {
125    /// Thread name
126    pub thread_name: Option<String>,
127    /// Hash of the thread id
128    pub thread_id: u64,
129    /// Timing records for this thread
130    pub timings: Vec<SerializedTaskTiming>,
131}
132
133impl SerializedThreadTaskTimings {
134    /// Convert [`ThreadTaskTimings`] into their serializable format
135    ///
136    /// # Params
137    ///
138    /// `anchor` - [`Instant`] that should be earlier than all timings to use as base anchor
139    pub fn convert(anchor: Instant, timings: ThreadTaskTimings) -> SerializedThreadTaskTimings {
140        let serialized_timings = SerializedTaskTiming::convert(anchor, &timings.timings);
141
142        let mut hasher = DefaultHasher::new();
143        timings.thread_id.hash(&mut hasher);
144        let thread_id = hasher.finish();
145
146        SerializedThreadTaskTimings {
147            thread_name: timings.thread_name,
148            thread_id,
149            timings: serialized_timings,
150        }
151    }
152}
153
154#[doc(hidden)]
155#[derive(Debug, Clone)]
156pub struct ThreadTimingsDelta {
157    /// Hashed thread id
158    pub thread_id: u64,
159    /// Thread name, if known
160    pub thread_name: Option<String>,
161    /// New timings since the last call. If the circular buffer wrapped around
162    /// since the previous poll, some entries may have been lost.
163    pub new_timings: Vec<SerializedTaskTiming>,
164}
165
166/// Tracks which timing events have already been seen so that callers can request only unseen events.
167#[doc(hidden)]
168pub struct ProfilingCollector {
169    startup_time: Instant,
170    cursors: HashMap<ThreadId, u64>,
171}
172
173impl ProfilingCollector {
174    pub fn new(startup_time: Instant) -> Self {
175        Self {
176            startup_time,
177            cursors: HashMap::default(),
178        }
179    }
180
181    pub fn startup_time(&self) -> Instant {
182        self.startup_time
183    }
184
185    pub fn collect_unseen(
186        &mut self,
187        all_timings: Vec<ThreadTaskTimings>,
188    ) -> Vec<ThreadTimingsDelta> {
189        let mut deltas = Vec::with_capacity(all_timings.len());
190
191        for thread in all_timings {
192            let mut hasher = DefaultHasher::new();
193            thread.thread_id.hash(&mut hasher);
194            let hashed_id = hasher.finish();
195
196            let prev_cursor = self.cursors.get(&thread.thread_id).copied().unwrap_or(0);
197            let buffer_len = thread.timings.len() as u64;
198            let buffer_start = thread.total_pushed.saturating_sub(buffer_len);
199
200            let mut slice = if prev_cursor < buffer_start {
201                // Cursor fell behind the buffer — some entries were evicted.
202                // Return everything still in the buffer.
203                thread.timings.as_slice()
204            } else {
205                let skip = (prev_cursor - buffer_start) as usize;
206                &thread.timings[skip.min(thread.timings.len())..]
207            };
208
209            // Don't emit the last entry if it's still in-progress (end: None).
210            let incomplete_at_end = slice.last().is_some_and(|t| t.end.is_none());
211            if incomplete_at_end {
212                slice = &slice[..slice.len() - 1];
213            }
214
215            let cursor_advance = if incomplete_at_end {
216                thread.total_pushed.saturating_sub(1)
217            } else {
218                thread.total_pushed
219            };
220
221            self.cursors.insert(thread.thread_id, cursor_advance);
222
223            if slice.is_empty() {
224                continue;
225            }
226
227            let new_timings = SerializedTaskTiming::convert(self.startup_time, slice);
228
229            deltas.push(ThreadTimingsDelta {
230                thread_id: hashed_id,
231                thread_name: thread.thread_name,
232                new_timings,
233            });
234        }
235
236        deltas
237    }
238
239    pub fn reset(&mut self) {
240        self.cursors.clear();
241    }
242}
243
244// Allow 16MiB of task timing entries.
245// VecDeque grows by doubling its capacity when full, so keep this a power of 2 to avoid wasting
246// memory.
247const MAX_TASK_TIMINGS: usize = (16 * 1024 * 1024) / core::mem::size_of::<TaskTiming>();
248
249#[doc(hidden)]
250pub(crate) type TaskTimings = VecDeque<TaskTiming>;
251
252#[doc(hidden)]
253pub type GuardedTaskTimings = spin::Mutex<ThreadTimings>;
254
255#[doc(hidden)]
256pub struct GlobalThreadTimings {
257    pub thread_id: ThreadId,
258    pub timings: std::sync::Weak<GuardedTaskTimings>,
259}
260
261#[doc(hidden)]
262pub static GLOBAL_THREAD_TIMINGS: spin::Mutex<Vec<GlobalThreadTimings>> =
263    spin::Mutex::new(Vec::new());
264
265thread_local! {
266    #[doc(hidden)]
267    pub static THREAD_TIMINGS: LazyCell<Arc<GuardedTaskTimings>> = LazyCell::new(|| {
268        let current_thread = std::thread::current();
269        let thread_name = current_thread.name();
270        let thread_id = current_thread.id();
271        let timings = ThreadTimings::new(thread_name.map(|e| e.to_string()), thread_id);
272        let timings = Arc::new(spin::Mutex::new(timings));
273
274        {
275            let timings = Arc::downgrade(&timings);
276            let global_timings = GlobalThreadTimings {
277                thread_id: std::thread::current().id(),
278                timings,
279            };
280            GLOBAL_THREAD_TIMINGS.lock().push(global_timings);
281        }
282
283        timings
284    });
285}
286
287#[doc(hidden)]
288pub struct ThreadTimings {
289    pub thread_name: Option<String>,
290    pub thread_id: ThreadId,
291    pub timings: TaskTimings,
292    pub total_pushed: u64,
293}
294
295impl ThreadTimings {
296    pub fn new(thread_name: Option<String>, thread_id: ThreadId) -> Self {
297        ThreadTimings {
298            thread_name,
299            thread_id,
300            timings: TaskTimings::new(),
301            total_pushed: 0,
302        }
303    }
304
305    /// If this task is the same as the last task, update the end time of the last task.
306    ///
307    /// Otherwise, add the new task timing to the list.
308    pub fn add_task_timing(&mut self, timing: TaskTiming) {
309        if let Some(last_timing) = self.timings.back_mut()
310            && last_timing.location == timing.location
311            && last_timing.start == timing.start
312        {
313            last_timing.end = timing.end;
314        } else {
315            while self.timings.len() + 1 > MAX_TASK_TIMINGS {
316                // This should only ever pop one element because it matches the insertion below.
317                self.timings.pop_front();
318            }
319            self.timings.push_back(timing);
320            self.total_pushed += 1;
321        }
322    }
323
324    pub fn get_thread_task_timings(&self) -> ThreadTaskTimings {
325        ThreadTaskTimings {
326            thread_name: self.thread_name.clone(),
327            thread_id: self.thread_id,
328            timings: self.timings.iter().cloned().collect(),
329            total_pushed: self.total_pushed,
330        }
331    }
332}
333
334impl Drop for ThreadTimings {
335    fn drop(&mut self) {
336        let mut thread_timings = GLOBAL_THREAD_TIMINGS.lock();
337
338        let Some((index, _)) = thread_timings
339            .iter()
340            .enumerate()
341            .find(|(_, t)| t.thread_id == self.thread_id)
342        else {
343            return;
344        };
345        thread_timings.swap_remove(index);
346    }
347}
348
349#[doc(hidden)]
350pub fn add_task_timing(timing: TaskTiming) {
351    THREAD_TIMINGS.with(|timings| {
352        timings.lock().add_task_timing(timing);
353    });
354}
355
356#[doc(hidden)]
357pub fn get_current_thread_task_timings() -> ThreadTaskTimings {
358    THREAD_TIMINGS.with(|timings| timings.lock().get_thread_task_timings())
359}