diagnostics.rs

  1use gpui::{Context, Task, WeakEntity};
  2use livekit_client::ConnectionQuality;
  3use std::time::Duration;
  4
  5use super::room::Room;
  6
  7#[derive(Clone, Default)]
  8pub struct CallStats {
  9    pub connection_quality: Option<ConnectionQuality>,
 10    pub effective_quality: Option<ConnectionQuality>,
 11    pub latency_ms: Option<f64>,
 12    pub jitter_ms: Option<f64>,
 13    pub packet_loss_pct: Option<f64>,
 14    pub input_lag: Option<Duration>,
 15}
 16
 17pub struct CallDiagnostics {
 18    stats: CallStats,
 19    room: WeakEntity<Room>,
 20    poll_task: Option<Task<()>>,
 21    stats_update_task: Option<Task<()>>,
 22}
 23
 24impl CallDiagnostics {
 25    pub fn new(room: WeakEntity<Room>, cx: &mut Context<Self>) -> Self {
 26        let mut this = Self {
 27            stats: CallStats::default(),
 28            room,
 29            poll_task: None,
 30            stats_update_task: None,
 31        };
 32        this.start_polling(cx);
 33        this
 34    }
 35
 36    pub fn stats(&self) -> &CallStats {
 37        &self.stats
 38    }
 39
 40    fn start_polling(&mut self, cx: &mut Context<Self>) {
 41        self.poll_task = Some(cx.spawn(async move |this, cx| {
 42            loop {
 43                if this.update(cx, |this, cx| this.poll_stats(cx)).is_err() {
 44                    break;
 45                }
 46                cx.background_executor().timer(Duration::from_secs(1)).await;
 47            }
 48        }));
 49    }
 50
 51    fn poll_stats(&mut self, cx: &mut Context<Self>) {
 52        let Some(room) = self.room.upgrade() else {
 53            return;
 54        };
 55
 56        let connection_quality = room.read(cx).connection_quality();
 57        self.stats.connection_quality = Some(connection_quality);
 58        self.stats.input_lag = room.read(cx).input_lag();
 59
 60        let stats_future = room.read(cx).get_stats(cx);
 61
 62        let background_task = cx.background_executor().spawn(async move {
 63            let session_stats = stats_future.await;
 64            session_stats.map(|stats| compute_network_stats(&stats))
 65        });
 66
 67        self.stats_update_task = Some(cx.spawn(async move |this, cx| {
 68            let result = background_task.await;
 69            this.update(cx, |this, cx| {
 70                if let Some(computed) = result {
 71                    this.stats.latency_ms = computed.latency_ms;
 72                    this.stats.jitter_ms = computed.jitter_ms;
 73                    this.stats.packet_loss_pct = computed.packet_loss_pct;
 74                }
 75                let quality = this
 76                    .stats
 77                    .connection_quality
 78                    .unwrap_or(ConnectionQuality::Lost);
 79                this.stats.effective_quality =
 80                    Some(effective_connection_quality(quality, &this.stats));
 81                cx.notify();
 82            })
 83            .ok();
 84        }));
 85    }
 86}
 87
 88struct ComputedNetworkStats {
 89    latency_ms: Option<f64>,
 90    jitter_ms: Option<f64>,
 91    packet_loss_pct: Option<f64>,
 92}
 93
 94fn compute_network_stats(stats: &livekit_client::SessionStats) -> ComputedNetworkStats {
 95    let mut min_rtt: Option<f64> = None;
 96    let mut max_jitter: Option<f64> = None;
 97    let mut total_packets_received: u64 = 0;
 98    let mut total_packets_lost: i64 = 0;
 99
100    let all_stats = stats
101        .publisher_stats
102        .iter()
103        .chain(stats.subscriber_stats.iter());
104
105    for stat in all_stats {
106        extract_metrics(
107            stat,
108            &mut min_rtt,
109            &mut max_jitter,
110            &mut total_packets_received,
111            &mut total_packets_lost,
112        );
113    }
114
115    let total_expected = total_packets_received as i64 + total_packets_lost;
116    let packet_loss_pct = if total_expected > 0 {
117        Some((total_packets_lost as f64 / total_expected as f64) * 100.0)
118    } else {
119        None
120    };
121
122    ComputedNetworkStats {
123        latency_ms: min_rtt.map(|rtt| rtt * 1000.0),
124        jitter_ms: max_jitter.map(|j| j * 1000.0),
125        packet_loss_pct,
126    }
127}
128
/// No-op variant of `extract_metrics`.
///
/// Compiled (unless the `rust_analyzer` cfg is set) for test builds, the
/// `test-support` feature, Windows with the GNU toolchain, and FreeBSD. It
/// leaves every accumulator untouched.
// NOTE(review): presumably the stats type available in these configurations
// does not expose the WebRTC report variants — confirm.
#[cfg(all(
    not(rust_analyzer),
    any(
        test,
        feature = "test-support",
        all(target_os = "windows", target_env = "gnu"),
        target_os = "freebsd"
    )
))]
fn extract_metrics(
    _stat: &livekit_client::RtcStats,
    _min_rtt: &mut Option<f64>,
    _max_jitter: &mut Option<f64>,
    _total_packets_received: &mut u64,
    _total_packets_lost: &mut i64,
) {
    // Intentionally empty: nothing is extracted in these configurations.
}
146
147#[cfg(any(
148    rust_analyzer,
149    not(any(
150        test,
151        feature = "test-support",
152        all(target_os = "windows", target_env = "gnu"),
153        target_os = "freebsd"
154    ))
155))]
156fn extract_metrics(
157    stat: &livekit_client::RtcStats,
158    min_rtt: &mut Option<f64>,
159    max_jitter: &mut Option<f64>,
160    total_packets_received: &mut u64,
161    total_packets_lost: &mut i64,
162) {
163    use livekit_client::RtcStats;
164
165    match stat {
166        RtcStats::CandidatePair(pair) => {
167            let rtt = pair.candidate_pair.current_round_trip_time;
168            if rtt > 0.0 {
169                *min_rtt = Some(match *min_rtt {
170                    Some(current) => current.min(rtt),
171                    None => rtt,
172                });
173            }
174        }
175        RtcStats::InboundRtp(inbound) => {
176            let jitter = inbound.received.jitter;
177            if jitter > 0.0 {
178                *max_jitter = Some(match *max_jitter {
179                    Some(current) => current.max(jitter),
180                    None => jitter,
181                });
182            }
183            *total_packets_received += inbound.received.packets_received;
184            *total_packets_lost += inbound.received.packets_lost;
185        }
186        RtcStats::RemoteInboundRtp(remote_inbound) => {
187            let rtt = remote_inbound.remote_inbound.round_trip_time;
188            if rtt > 0.0 {
189                *min_rtt = Some(match *min_rtt {
190                    Some(current) => current.min(rtt),
191                    None => rtt,
192                });
193            }
194        }
195        _ => {}
196    }
197}
198
199fn metric_quality(value: f64, warn_threshold: f64, error_threshold: f64) -> ConnectionQuality {
200    if value < warn_threshold {
201        ConnectionQuality::Excellent
202    } else if value < error_threshold {
203        ConnectionQuality::Poor
204    } else {
205        ConnectionQuality::Lost
206    }
207}
208
209/// Computes the effective connection quality by taking the worst of the
210/// LiveKit-reported quality and each individual metric rating.
211fn effective_connection_quality(
212    livekit_quality: ConnectionQuality,
213    stats: &CallStats,
214) -> ConnectionQuality {
215    let mut worst = livekit_quality;
216
217    if let Some(latency) = stats.latency_ms {
218        worst = worst.max(metric_quality(latency, 100.0, 300.0));
219    }
220    if let Some(jitter) = stats.jitter_ms {
221        worst = worst.max(metric_quality(jitter, 30.0, 75.0));
222    }
223    if let Some(loss) = stats.packet_loss_pct {
224        worst = worst.max(metric_quality(loss, 1.0, 5.0));
225    }
226    if let Some(lag) = stats.input_lag {
227        let lag_ms = lag.as_secs_f64() * 1000.0;
228        worst = worst.max(metric_quality(lag_ms, 20.0, 50.0));
229    }
230
231    worst
232}