Detailed changes
@@ -2193,7 +2193,7 @@ version = "3.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365"
dependencies = [
- "darling 0.20.11",
+ "darling 0.21.3",
"ident_case",
"prettyplease",
"proc-macro2",
@@ -3319,6 +3319,7 @@ dependencies = [
"futures 0.3.31",
"fuzzy",
"gpui",
+ "livekit_client",
"log",
"menu",
"notifications",
@@ -3338,6 +3339,7 @@ dependencies = [
"ui",
"util",
"workspace",
+ "zed_actions",
]
[[package]]
@@ -17753,6 +17755,8 @@ dependencies = [
"feature_flags",
"git_ui",
"gpui",
+ "icons",
+ "livekit_client",
"notifications",
"platform_title_bar",
"project",
@@ -0,0 +1,6 @@
+<svg width="16" height="16" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
+<path d="M12.5 3V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M9.5 6V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M6.5 9V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M3.5 12V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+</svg>
@@ -0,0 +1,6 @@
+<svg width="16" height="16" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
+<path opacity="0.2" d="M12.5 3V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path opacity="0.2" d="M9.5 6V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M6.5 9V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M3.5 12V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+</svg>
@@ -0,0 +1,6 @@
+<svg width="16" height="16" viewBox="0 0 16 16" fill="none" xmlns="http://www.w3.org/2000/svg">
+<path opacity="0.2" d="M12.5 3V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M9.5 6V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M6.5 9V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+<path d="M3.5 12V14" stroke="#C6CAD0" stroke-width="1.2" stroke-linecap="round"/>
+</svg>
@@ -0,0 +1,232 @@
+use gpui::{Context, Task, WeakEntity};
+use livekit_client::ConnectionQuality;
+use std::time::Duration;
+
+use super::room::Room;
+
+#[derive(Clone, Default)]
+pub struct CallStats {
+ pub connection_quality: Option<ConnectionQuality>,
+ pub effective_quality: Option<ConnectionQuality>,
+ pub latency_ms: Option<f64>,
+ pub jitter_ms: Option<f64>,
+ pub packet_loss_pct: Option<f64>,
+ pub input_lag: Option<Duration>,
+}
+
+pub struct CallDiagnostics {
+ stats: CallStats,
+ room: WeakEntity<Room>,
+ poll_task: Option<Task<()>>,
+ stats_update_task: Option<Task<()>>,
+}
+
+impl CallDiagnostics {
+ pub fn new(room: WeakEntity<Room>, cx: &mut Context<Self>) -> Self {
+ let mut this = Self {
+ stats: CallStats::default(),
+ room,
+ poll_task: None,
+ stats_update_task: None,
+ };
+ this.start_polling(cx);
+ this
+ }
+
+ pub fn stats(&self) -> &CallStats {
+ &self.stats
+ }
+
+ fn start_polling(&mut self, cx: &mut Context<Self>) {
+ self.poll_task = Some(cx.spawn(async move |this, cx| {
+ loop {
+ if this.update(cx, |this, cx| this.poll_stats(cx)).is_err() {
+ break;
+ }
+ cx.background_executor().timer(Duration::from_secs(1)).await;
+ }
+ }));
+ }
+
+ fn poll_stats(&mut self, cx: &mut Context<Self>) {
+ let Some(room) = self.room.upgrade() else {
+ return;
+ };
+
+ let connection_quality = room.read(cx).connection_quality();
+ self.stats.connection_quality = Some(connection_quality);
+ self.stats.input_lag = room.read(cx).input_lag();
+
+ let stats_future = room.read(cx).get_stats(cx);
+
+ let background_task = cx.background_executor().spawn(async move {
+ let session_stats = stats_future.await;
+ session_stats.map(|stats| compute_network_stats(&stats))
+ });
+
+ self.stats_update_task = Some(cx.spawn(async move |this, cx| {
+ let result = background_task.await;
+ this.update(cx, |this, cx| {
+ if let Some(computed) = result {
+ this.stats.latency_ms = computed.latency_ms;
+ this.stats.jitter_ms = computed.jitter_ms;
+ this.stats.packet_loss_pct = computed.packet_loss_pct;
+ }
+ let quality = this
+ .stats
+ .connection_quality
+ .unwrap_or(ConnectionQuality::Lost);
+ this.stats.effective_quality =
+ Some(effective_connection_quality(quality, &this.stats));
+ cx.notify();
+ })
+ .ok();
+ }));
+ }
+}
+
+struct ComputedNetworkStats {
+ latency_ms: Option<f64>,
+ jitter_ms: Option<f64>,
+ packet_loss_pct: Option<f64>,
+}
+
+fn compute_network_stats(stats: &livekit_client::SessionStats) -> ComputedNetworkStats {
+ let mut min_rtt: Option<f64> = None;
+ let mut max_jitter: Option<f64> = None;
+ let mut total_packets_received: u64 = 0;
+ let mut total_packets_lost: i64 = 0;
+
+ let all_stats = stats
+ .publisher_stats
+ .iter()
+ .chain(stats.subscriber_stats.iter());
+
+ for stat in all_stats {
+ extract_metrics(
+ stat,
+ &mut min_rtt,
+ &mut max_jitter,
+ &mut total_packets_received,
+ &mut total_packets_lost,
+ );
+ }
+
+ let total_expected = total_packets_received as i64 + total_packets_lost;
+ let packet_loss_pct = if total_expected > 0 {
+ Some((total_packets_lost as f64 / total_expected as f64) * 100.0)
+ } else {
+ None
+ };
+
+ ComputedNetworkStats {
+ latency_ms: min_rtt.map(|rtt| rtt * 1000.0),
+ jitter_ms: max_jitter.map(|j| j * 1000.0),
+ packet_loss_pct,
+ }
+}
+
+#[cfg(all(
+ not(rust_analyzer),
+ any(
+ test,
+ feature = "test-support",
+ all(target_os = "windows", target_env = "gnu"),
+ target_os = "freebsd"
+ )
+))]
+fn extract_metrics(
+ _stat: &livekit_client::RtcStats,
+ _min_rtt: &mut Option<f64>,
+ _max_jitter: &mut Option<f64>,
+ _total_packets_received: &mut u64,
+ _total_packets_lost: &mut i64,
+) {
+}
+
+#[cfg(any(
+ rust_analyzer,
+ not(any(
+ test,
+ feature = "test-support",
+ all(target_os = "windows", target_env = "gnu"),
+ target_os = "freebsd"
+ ))
+))]
+fn extract_metrics(
+ stat: &livekit_client::RtcStats,
+ min_rtt: &mut Option<f64>,
+ max_jitter: &mut Option<f64>,
+ total_packets_received: &mut u64,
+ total_packets_lost: &mut i64,
+) {
+ use livekit_client::RtcStats;
+
+ match stat {
+ RtcStats::CandidatePair(pair) => {
+ let rtt = pair.candidate_pair.current_round_trip_time;
+ if rtt > 0.0 {
+ *min_rtt = Some(match *min_rtt {
+ Some(current) => current.min(rtt),
+ None => rtt,
+ });
+ }
+ }
+ RtcStats::InboundRtp(inbound) => {
+ let jitter = inbound.received.jitter;
+ if jitter > 0.0 {
+ *max_jitter = Some(match *max_jitter {
+ Some(current) => current.max(jitter),
+ None => jitter,
+ });
+ }
+ *total_packets_received += inbound.received.packets_received;
+ *total_packets_lost += inbound.received.packets_lost;
+ }
+ RtcStats::RemoteInboundRtp(remote_inbound) => {
+ let rtt = remote_inbound.remote_inbound.round_trip_time;
+ if rtt > 0.0 {
+ *min_rtt = Some(match *min_rtt {
+ Some(current) => current.min(rtt),
+ None => rtt,
+ });
+ }
+ }
+ _ => {}
+ }
+}
+
+fn metric_quality(value: f64, warn_threshold: f64, error_threshold: f64) -> ConnectionQuality {
+ if value < warn_threshold {
+ ConnectionQuality::Excellent
+ } else if value < error_threshold {
+ ConnectionQuality::Poor
+ } else {
+ ConnectionQuality::Lost
+ }
+}
+
+/// Computes the effective connection quality by taking the worst of the
+/// LiveKit-reported quality and each individual metric rating.
+fn effective_connection_quality(
+ livekit_quality: ConnectionQuality,
+ stats: &CallStats,
+) -> ConnectionQuality {
+ let mut worst = livekit_quality;
+
+ if let Some(latency) = stats.latency_ms {
+ worst = worst.max(metric_quality(latency, 100.0, 300.0));
+ }
+ if let Some(jitter) = stats.jitter_ms {
+ worst = worst.max(metric_quality(jitter, 30.0, 75.0));
+ }
+ if let Some(loss) = stats.packet_loss_pct {
+ worst = worst.max(metric_quality(loss, 1.0, 5.0));
+ }
+ if let Some(lag) = stats.input_lag {
+ let lag_ms = lag.as_secs_f64() * 1000.0;
+ worst = worst.max(metric_quality(lag_ms, 20.0, 50.0));
+ }
+
+ worst
+}
@@ -1,3 +1,4 @@
+pub mod diagnostics;
pub mod participant;
pub mod room;
@@ -23,7 +23,10 @@ use livekit_client::{self as livekit, AudioStream, TrackSid};
use postage::{sink::Sink, stream::Stream, watch};
use project::Project;
use settings::Settings as _;
+use std::sync::atomic::AtomicU64;
use std::{future::Future, mem, rc::Rc, sync::Arc, time::Duration, time::Instant};
+
+use super::diagnostics::CallDiagnostics;
use util::{ResultExt, TryFutureExt, paths::PathStyle, post_inc};
use workspace::ParticipantLocation;
@@ -69,6 +72,7 @@ pub struct Room {
id: u64,
channel_id: Option<ChannelId>,
live_kit: Option<LiveKitRoom>,
+ diagnostics: Option<Entity<CallDiagnostics>>,
status: RoomStatus,
shared_projects: HashSet<WeakEntity<Project>>,
joined_projects: HashSet<WeakEntity<Project>>,
@@ -136,6 +140,7 @@ impl Room {
id,
channel_id,
live_kit: None,
+ diagnostics: None,
status: RoomStatus::Online,
shared_projects: Default::default(),
joined_projects: Default::default(),
@@ -350,6 +355,7 @@ impl Room {
self.participant_user_ids.clear();
self.client_subscriptions.clear();
self.live_kit.take();
+ self.diagnostics.take();
self.pending_room_update.take();
self.maintain_connection.take();
}
@@ -540,6 +546,42 @@ impl Room {
}
}
+ pub fn get_stats(&self, cx: &App) -> Task<Option<livekit::SessionStats>> {
+ match self.live_kit.as_ref() {
+ Some(lk) => {
+ let task = lk.room.stats_task(cx);
+ cx.background_executor()
+ .spawn(async move { task.await.ok() })
+ }
+ None => Task::ready(None),
+ }
+ }
+
+ pub fn input_lag(&self) -> Option<Duration> {
+ let us = self
+ .live_kit
+ .as_ref()?
+ .input_lag_us
+ .as_ref()?
+ .load(std::sync::atomic::Ordering::Relaxed);
+ if us > 0 {
+ Some(Duration::from_micros(us))
+ } else {
+ None
+ }
+ }
+
+ pub fn diagnostics(&self) -> Option<&Entity<CallDiagnostics>> {
+ self.diagnostics.as_ref()
+ }
+
+ pub fn connection_quality(&self) -> livekit::ConnectionQuality {
+ self.live_kit
+ .as_ref()
+ .map(|lk| lk.room.local_participant().connection_quality())
+ .unwrap_or(livekit::ConnectionQuality::Lost)
+ }
+
pub fn status(&self) -> RoomStatus {
self.status
}
@@ -1383,7 +1425,7 @@ impl Room {
};
match publication {
- Ok((publication, stream)) => {
+ Ok((publication, stream, input_lag_us)) => {
if canceled {
cx.spawn(async move |_, cx| {
room.unpublish_local_track(publication.sid(), cx).await
@@ -1393,6 +1435,7 @@ impl Room {
if live_kit.muted_by_user || live_kit.deafened {
publication.mute(cx);
}
+ live_kit.input_lag_us = Some(input_lag_us);
live_kit.microphone_track = LocalTrack::Published {
track_publication: publication,
_stream: Box::new(stream),
@@ -1623,6 +1666,7 @@ fn spawn_room_connection(
livekit::Room::connect(connection_info.server_url, connection_info.token, cx)
.await?;
+ let weak_room = this.clone();
this.update(cx, |this, cx| {
let _handle_updates = cx.spawn(async move |this, cx| {
while let Some(event) = events.next().await {
@@ -1642,12 +1686,14 @@ fn spawn_room_connection(
room: Rc::new(room),
screen_track: LocalTrack::None,
microphone_track: LocalTrack::None,
+ input_lag_us: None,
next_publish_id: 0,
muted_by_user,
deafened: false,
speaking: false,
_handle_updates,
});
+ this.diagnostics = Some(cx.new(|cx| CallDiagnostics::new(weak_room, cx)));
if !muted_by_user && this.can_use_microphone() {
this.share_microphone(cx)
@@ -1665,6 +1711,9 @@ struct LiveKitRoom {
room: Rc<livekit::Room>,
screen_track: LocalTrack<dyn ScreenCaptureStream>,
microphone_track: LocalTrack<AudioStream>,
+ /// Shared atomic storing the most recent input lag measurement in microseconds.
+ /// Written by the audio capture/transmit pipeline, read here for diagnostics.
+ input_lag_us: Option<Arc<AtomicU64>>,
/// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
muted_by_user: bool,
deafened: bool,
@@ -1681,6 +1730,7 @@ impl LiveKitRoom {
} = mem::replace(&mut self.microphone_track, LocalTrack::None)
{
tracks_to_unpublish.push(track_publication.sid());
+ self.input_lag_us = None;
cx.notify();
}
@@ -1787,6 +1787,7 @@ async fn test_project_reconnect(
// While disconnected, close project 3
cx_a.update(|_| drop(project_a3));
+ executor.run_until_parked();
// Client B reconnects. They re-join the room and the remaining shared project.
server.allow_connections();
@@ -40,6 +40,7 @@ editor.workspace = true
futures.workspace = true
fuzzy.workspace = true
gpui.workspace = true
+livekit_client.workspace = true
log.workspace = true
menu.workspace = true
notifications.workspace = true
@@ -59,6 +60,7 @@ title_bar.workspace = true
ui.workspace = true
util.workspace = true
workspace.workspace = true
+zed_actions.workspace = true
[dev-dependencies]
call = { workspace = true, features = ["test-support"] }
@@ -0,0 +1,270 @@
+use call::{ActiveCall, Room, room};
+use gpui::{
+ DismissEvent, Entity, EventEmitter, FocusHandle, Focusable, FontWeight, Render, Subscription,
+ Window,
+};
+use livekit_client::ConnectionQuality;
+use ui::prelude::*;
+use workspace::{ModalView, Workspace};
+use zed_actions::ShowCallStats;
+
+pub fn init(cx: &mut App) {
+ cx.observe_new(|workspace: &mut Workspace, _, _cx| {
+ workspace.register_action(|workspace, _: &ShowCallStats, window, cx| {
+ workspace.toggle_modal(window, cx, |_window, cx| CallStatsModal::new(cx));
+ });
+ })
+ .detach();
+}
+
+pub struct CallStatsModal {
+ focus_handle: FocusHandle,
+ _active_call_subscription: Option<Subscription>,
+ _diagnostics_subscription: Option<Subscription>,
+}
+
+impl CallStatsModal {
+ fn new(cx: &mut Context<Self>) -> Self {
+ let mut this = Self {
+ focus_handle: cx.focus_handle(),
+ _active_call_subscription: None,
+ _diagnostics_subscription: None,
+ };
+
+ if let Some(active_call) = ActiveCall::try_global(cx) {
+ this._active_call_subscription =
+ Some(cx.subscribe(&active_call, Self::handle_call_event));
+ this.observe_diagnostics(cx);
+ }
+
+ this
+ }
+
+ fn observe_diagnostics(&mut self, cx: &mut Context<Self>) {
+ let diagnostics = active_room(cx).and_then(|room| room.read(cx).diagnostics().cloned());
+
+ if let Some(diagnostics) = diagnostics {
+ self._diagnostics_subscription = Some(cx.observe(&diagnostics, |_, _, cx| cx.notify()));
+ } else {
+ self._diagnostics_subscription = None;
+ }
+ }
+
+ fn handle_call_event(
+ &mut self,
+ _: Entity<ActiveCall>,
+ event: &room::Event,
+ cx: &mut Context<Self>,
+ ) {
+ match event {
+ room::Event::RoomJoined { .. } => {
+ self.observe_diagnostics(cx);
+ }
+ room::Event::RoomLeft { .. } => {
+ self._diagnostics_subscription = None;
+ cx.notify();
+ }
+ _ => {}
+ }
+ }
+
+ fn dismiss(&mut self, _: &menu::Cancel, _: &mut Window, cx: &mut Context<Self>) {
+ cx.emit(DismissEvent);
+ }
+}
+
+fn active_room(cx: &App) -> Option<Entity<Room>> {
+ ActiveCall::try_global(cx)?.read(cx).room().cloned()
+}
+
+fn quality_label(quality: Option<ConnectionQuality>) -> (&'static str, Color) {
+ match quality {
+ Some(ConnectionQuality::Excellent) => ("Excellent", Color::Success),
+ Some(ConnectionQuality::Good) => ("Good", Color::Success),
+ Some(ConnectionQuality::Poor) => ("Poor", Color::Warning),
+ Some(ConnectionQuality::Lost) => ("Lost", Color::Error),
+ None => ("โ", Color::Muted),
+ }
+}
+
+fn metric_rating(label: &str, value_ms: f64) -> (&'static str, Color) {
+ match label {
+ "Latency" => {
+ if value_ms < 100.0 {
+ ("Normal", Color::Success)
+ } else if value_ms < 300.0 {
+ ("High", Color::Warning)
+ } else {
+ ("Poor", Color::Error)
+ }
+ }
+ "Jitter" => {
+ if value_ms < 30.0 {
+ ("Normal", Color::Success)
+ } else if value_ms < 75.0 {
+ ("High", Color::Warning)
+ } else {
+ ("Poor", Color::Error)
+ }
+ }
+ _ => ("Normal", Color::Success),
+ }
+}
+
+fn input_lag_rating(value_ms: f64) -> (&'static str, Color) {
+ if value_ms < 20.0 {
+ ("Normal", Color::Success)
+ } else if value_ms < 50.0 {
+ ("High", Color::Warning)
+ } else {
+ ("Poor", Color::Error)
+ }
+}
+
+fn packet_loss_rating(loss_pct: f64) -> (&'static str, Color) {
+ if loss_pct < 1.0 {
+ ("Normal", Color::Success)
+ } else if loss_pct < 5.0 {
+ ("High", Color::Warning)
+ } else {
+ ("Poor", Color::Error)
+ }
+}
+
+impl EventEmitter<DismissEvent> for CallStatsModal {}
+impl ModalView for CallStatsModal {}
+
+impl Focusable for CallStatsModal {
+ fn focus_handle(&self, _cx: &App) -> FocusHandle {
+ self.focus_handle.clone()
+ }
+}
+
+impl Render for CallStatsModal {
+ fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
+ let room = active_room(cx);
+ let is_connected = room.is_some();
+ let stats = room
+ .and_then(|room| {
+ let diagnostics = room.read(cx).diagnostics()?;
+ Some(diagnostics.read(cx).stats().clone())
+ })
+ .unwrap_or_default();
+
+ let (quality_text, quality_color) = quality_label(stats.connection_quality);
+
+ v_flex()
+ .key_context("CallStatsModal")
+ .on_action(cx.listener(Self::dismiss))
+ .track_focus(&self.focus_handle)
+ .elevation_3(cx)
+ .w(rems(24.))
+ .p_4()
+ .gap_3()
+ .child(
+ h_flex()
+ .justify_between()
+ .child(Label::new("Call Diagnostics").size(LabelSize::Large))
+ .child(
+ Label::new(quality_text)
+ .size(LabelSize::Large)
+ .color(quality_color),
+ ),
+ )
+ .when(!is_connected, |this| {
+ this.child(
+ h_flex()
+ .justify_center()
+ .py_4()
+ .child(Label::new("Not in a call").color(Color::Muted)),
+ )
+ })
+ .when(is_connected, |this| {
+ this.child(
+ v_flex()
+ .gap_1()
+ .child(
+ h_flex()
+ .gap_2()
+ .child(Label::new("Network").weight(FontWeight::SEMIBOLD)),
+ )
+ .child(self.render_metric_row(
+ "Latency",
+ "Time for data to travel to the server",
+ stats.latency_ms,
+ |v| format!("{:.0}ms", v),
+ |v| metric_rating("Latency", v),
+ ))
+ .child(self.render_metric_row(
+ "Jitter",
+ "Variance or fluctuation in latency",
+ stats.jitter_ms,
+ |v| format!("{:.0}ms", v),
+ |v| metric_rating("Jitter", v),
+ ))
+ .child(self.render_metric_row(
+ "Packet loss",
+ "Amount of data lost during transfer",
+ stats.packet_loss_pct,
+ |v| format!("{:.1}%", v),
+ |v| packet_loss_rating(v),
+ ))
+ .child(self.render_metric_row(
+ "Input lag",
+ "Delay from audio capture to WebRTC",
+ stats.input_lag.map(|d| d.as_secs_f64() * 1000.0),
+ |v| format!("{:.1}ms", v),
+ |v| input_lag_rating(v),
+ )),
+ )
+ })
+ }
+}
+
+impl CallStatsModal {
+ fn render_metric_row(
+ &self,
+ title: &str,
+ description: &str,
+ value: Option<f64>,
+ format_value: impl Fn(f64) -> String,
+ rate: impl Fn(f64) -> (&'static str, Color),
+ ) -> impl IntoElement {
+ let (rating_text, rating_color, value_text) = match value {
+ Some(v) => {
+ let (rt, rc) = rate(v);
+ (rt, rc, format_value(v))
+ }
+ None => ("โ", Color::Muted, "โ".to_string()),
+ };
+
+ h_flex()
+ .px_2()
+ .py_1()
+ .rounded_md()
+ .justify_between()
+ .child(
+ v_flex()
+ .child(Label::new(title.to_string()).size(LabelSize::Default))
+ .child(
+ Label::new(description.to_string())
+ .size(LabelSize::Small)
+ .color(Color::Muted),
+ ),
+ )
+ .child(
+ v_flex()
+ .items_end()
+ .child(
+ Label::new(rating_text)
+ .size(LabelSize::Default)
+ .color(rating_color),
+ )
+ .child(
+ Label::new(value_text)
+ .size(LabelSize::Small)
+ .color(Color::Muted),
+ ),
+ )
+ }
+}
@@ -1,3 +1,4 @@
+mod call_stats_modal;
pub mod channel_view;
pub mod collab_panel;
pub mod notification_panel;
@@ -18,6 +19,7 @@ use workspace::AppState;
// Another comment, nice.
pub fn init(app_state: &Arc<AppState>, cx: &mut App) {
+ call_stats_modal::init(cx);
channel_view::init(cx);
collab_panel::init(cx);
notification_panel::init(cx);
@@ -219,6 +219,9 @@ pub enum IconName {
Settings,
ShieldCheck,
Shift,
+ SignalHigh,
+ SignalLow,
+ SignalMedium,
Slash,
Sliders,
Space,
@@ -255,7 +255,7 @@ impl LivekitWindow {
} else {
let room = self.room.clone();
cx.spawn_in(window, async move |this, cx| {
- let (publication, stream) = room
+ let (publication, stream, _input_lag_us) = room
.publish_local_microphone_track("test_user".to_string(), false, cx)
.await
.unwrap();
@@ -67,6 +67,14 @@ pub enum Participant {
Remote(RemoteParticipant),
}
+#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
+pub enum ConnectionQuality {
+ Excellent,
+ Good,
+ Poor,
+ Lost,
+}
+
#[derive(Debug, Clone)]
pub enum TrackPublication {
Local(LocalTrackPublication),
@@ -179,6 +187,10 @@ pub enum RoomEvent {
ActiveSpeakersChanged {
speakers: Vec<Participant>,
},
+ ConnectionQualityChanged {
+ participant: Participant,
+ quality: ConnectionQuality,
+ },
ConnectionStateChanged(ConnectionState),
Connected {
participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
@@ -7,13 +7,16 @@ use gpui_tokio::Tokio;
use log::info;
use playback::capture_local_video_track;
use settings::Settings;
+use std::sync::{Arc, atomic::AtomicU64};
mod playback;
use crate::{
- LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
+ ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication,
livekit_client::playback::Speaker,
};
+pub use livekit::SessionStats;
+pub use livekit::webrtc::stats::RtcStats;
pub use playback::AudioStream;
pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track};
@@ -107,8 +110,8 @@ impl Room {
user_name: String,
is_staff: bool,
cx: &mut AsyncApp,
- ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
- let (track, stream) = self
+ ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc<AtomicU64>)> {
+ let (track, stream, input_lag_us) = self
.playback
.capture_local_microphone_track(user_name, is_staff, &cx)?;
let publication = self
@@ -123,7 +126,7 @@ impl Room {
)
.await?;
- Ok((publication, stream))
+ Ok((publication, stream, input_lag_us))
}
pub async fn unpublish_local_track(
@@ -158,9 +161,32 @@ impl Room {
Err(anyhow!("Client version too old to play audio in call"))
}
}
+
+ pub async fn get_stats(&self) -> Result<livekit::SessionStats> {
+ self.room.get_stats().await.map_err(anyhow::Error::from)
+ }
+
+ /// Returns a `Task` that fetches room stats on the Tokio runtime.
+ ///
+ /// LiveKit's SDK is Tokio-based, so the stats fetch must run within
+ /// a Tokio context rather than on GPUI's smol-based background executor.
+ pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task<Result<livekit::SessionStats>> {
+ let inner = self.room.clone();
+ Tokio::spawn_result(cx, async move {
+ inner.get_stats().await.map_err(anyhow::Error::from)
+ })
+ }
}
impl LocalParticipant {
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ connection_quality_from_livekit(self.0.connection_quality())
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ self.0.audio_level()
+ }
+
pub async fn publish_screenshare_track(
&self,
source: &dyn ScreenCaptureSource,
@@ -234,6 +260,14 @@ impl LocalTrackPublication {
}
impl RemoteParticipant {
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ connection_quality_from_livekit(self.0.connection_quality())
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ self.0.audio_level()
+ }
+
pub fn identity(&self) -> ParticipantIdentity {
ParticipantIdentity(self.0.identity().0)
}
@@ -297,6 +331,31 @@ impl Participant {
}
}
}
+
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ match self {
+ Participant::Local(local_participant) => local_participant.connection_quality(),
+ Participant::Remote(remote_participant) => remote_participant.connection_quality(),
+ }
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ match self {
+ Participant::Local(local_participant) => local_participant.audio_level(),
+ Participant::Remote(remote_participant) => remote_participant.audio_level(),
+ }
+ }
+}
+
+fn connection_quality_from_livekit(
+ quality: livekit::prelude::ConnectionQuality,
+) -> ConnectionQuality {
+ match quality {
+ livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent,
+ livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good,
+ livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor,
+ livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost,
+ }
}
fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant {
@@ -474,6 +533,13 @@ fn room_event_from_livekit(event: livekit::RoomEvent) -> Option<RoomEvent> {
},
livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting,
livekit::RoomEvent::Reconnected => RoomEvent::Reconnected,
+ livekit::RoomEvent::ConnectionQualityChanged {
+ quality,
+ participant,
+ } => RoomEvent::ConnectionQualityChanged {
+ participant: participant_from_livekit(participant),
+ quality: connection_quality_from_livekit(quality),
+ },
_ => {
log::trace!("dropping livekit event: {:?}", event);
return None;
@@ -27,11 +27,16 @@ use serde::{Deserialize, Serialize};
use settings::Settings;
use std::cell::RefCell;
use std::sync::Weak;
-use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
-use std::time::Duration;
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering};
+use std::time::{Duration, Instant};
use std::{borrow::Cow, collections::VecDeque, sync::Arc};
use util::{ResultExt as _, maybe};
+struct TimestampedFrame {
+ frame: AudioFrame<'static>,
+ captured_at: Instant,
+}
+
mod source;
pub(crate) struct AudioStack {
@@ -162,7 +167,7 @@ impl AudioStack {
user_name: String,
is_staff: bool,
cx: &AsyncApp,
- ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
+ ) -> Result<(crate::LocalAudioTrack, AudioStream, Arc<AtomicU64>)> {
let legacy_audio_compatible =
AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
.unwrap_or(true);
@@ -202,11 +207,15 @@ impl AudioStack {
let apm = self.apm.clone();
- let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel(1);
+ let input_lag_us = Arc::new(AtomicU64::new(0));
+ let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel::<TimestampedFrame>(1);
let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
+ let input_lag_us = input_lag_us.clone();
async move {
- while let Some(frame) = frame_rx.next().await {
- source.capture_frame(&frame).await.log_err();
+ while let Some(timestamped) = frame_rx.next().await {
+ let lag = timestamped.captured_at.elapsed();
+ input_lag_us.store(lag.as_micros() as u64, Ordering::Relaxed);
+ source.capture_frame(×tamped.frame).await.log_err();
}
}
});
@@ -251,6 +260,7 @@ impl AudioStack {
AudioStream::Output {
_drop: Box::new(on_drop),
},
+ input_lag_us,
))
}
@@ -345,7 +355,7 @@ impl AudioStack {
async fn capture_input(
executor: BackgroundExecutor,
apm: Arc<Mutex<apm::AudioProcessingModule>>,
- frame_tx: Sender<AudioFrame<'static>>,
+ frame_tx: Sender<TimestampedFrame>,
sample_rate: u32,
num_channels: u32,
input_audio_device: Option<DeviceId>,
@@ -376,6 +386,7 @@ impl AudioStack {
&config.config(),
config.sample_format(),
move |data, _: &_| {
+ let captured_at = Instant::now();
let data = crate::get_sample_data(config.sample_format(), data)
.log_err();
let Some(data) = data else {
@@ -409,11 +420,14 @@ impl AudioStack {
.log_err();
buf.clear();
frame_tx
- .try_send(AudioFrame {
- data: Cow::Owned(sampled),
- sample_rate,
- num_channels,
- samples_per_channel: sample_rate / 100,
+ .try_send(TimestampedFrame {
+ frame: AudioFrame {
+ data: Cow::Owned(sampled),
+ sample_rate,
+ num_channels,
+ samples_per_channel: sample_rate / 100,
+ },
+ captured_at,
})
.ok();
}
@@ -446,7 +460,7 @@ pub struct Speaker {
pub sends_legacy_audio: bool,
}
-fn send_to_livekit(mut frame_tx: Sender<AudioFrame<'static>>, mut microphone: impl Source) {
+fn send_to_livekit(mut frame_tx: Sender<TimestampedFrame>, mut microphone: impl Source) {
use cpal::Sample;
let sample_rate = microphone.sample_rate().get();
let num_channels = microphone.channels().get() as u32;
@@ -459,11 +473,14 @@ fn send_to_livekit(mut frame_tx: Sender<AudioFrame<'static>>, mut microphone: im
.map(|s| s.to_sample())
.collect();
- match frame_tx.try_send(AudioFrame {
- sample_rate,
- num_channels,
- samples_per_channel: sampled.len() as u32 / num_channels,
- data: Cow::Owned(sampled),
+ match frame_tx.try_send(TimestampedFrame {
+ frame: AudioFrame {
+ sample_rate,
+ num_channels,
+ samples_per_channel: sampled.len() as u32 / num_channels,
+ data: Cow::Owned(sampled),
+ },
+ captured_at: Instant::now(),
}) {
Ok(_) => {}
Err(err) => {
@@ -15,7 +15,7 @@ pub type LocalTrackPublication = publication::LocalTrackPublication;
pub type LocalParticipant = participant::LocalParticipant;
pub type Room = test::Room;
-pub use test::{ConnectionState, ParticipantIdentity, TrackSid};
+pub use test::{ConnectionState, ParticipantIdentity, RtcStats, SessionStats, TrackSid};
pub struct AudioStream {}
@@ -1,6 +1,6 @@
use crate::{
- AudioStream, LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, Participant,
- ParticipantIdentity, RemoteTrack, RemoteTrackPublication, TrackSid,
+ AudioStream, ConnectionQuality, LocalAudioTrack, LocalTrackPublication, LocalVideoTrack,
+ Participant, ParticipantIdentity, RemoteTrack, RemoteTrackPublication, TrackSid,
test::{Room, WeakRoom},
};
use anyhow::Result;
@@ -8,6 +8,7 @@ use collections::HashMap;
use gpui::{
AsyncApp, DevicePixels, ScreenCaptureSource, ScreenCaptureStream, SourceMetadata, size,
};
+use std::sync::{Arc, atomic::AtomicU64};
#[derive(Clone, Debug)]
pub struct LocalParticipant {
@@ -28,9 +29,31 @@ impl Participant {
Participant::Remote(participant) => participant.identity.clone(),
}
}
+
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ match self {
+ Participant::Local(p) => p.connection_quality(),
+ Participant::Remote(p) => p.connection_quality(),
+ }
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ match self {
+ Participant::Local(p) => p.audio_level(),
+ Participant::Remote(p) => p.audio_level(),
+ }
+ }
}
impl LocalParticipant {
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ ConnectionQuality::Excellent
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ 0.0
+ }
+
pub async fn unpublish_track(&self, track: TrackSid, _cx: &AsyncApp) -> Result<()> {
self.room
.test_server()
@@ -41,7 +64,7 @@ impl LocalParticipant {
pub(crate) async fn publish_microphone_track(
&self,
_cx: &AsyncApp,
- ) -> Result<(LocalTrackPublication, AudioStream)> {
+ ) -> Result<(LocalTrackPublication, AudioStream, Arc<AtomicU64>)> {
let this = self.clone();
let server = this.room.test_server();
let sid = server
@@ -54,6 +77,7 @@ impl LocalParticipant {
sid,
},
AudioStream {},
+ Arc::new(AtomicU64::new(0)),
))
}
@@ -78,6 +102,14 @@ impl LocalParticipant {
}
impl RemoteParticipant {
+ pub fn connection_quality(&self) -> ConnectionQuality {
+ ConnectionQuality::Excellent
+ }
+
+ pub fn audio_level(&self) -> f32 {
+ 0.0
+ }
+
pub fn track_publications(&self) -> HashMap<TrackSid, RemoteTrackPublication> {
if let Some(room) = self.room.upgrade() {
let server = room.test_server();
@@ -10,7 +10,7 @@ use parking_lot::Mutex;
use postage::{mpsc, sink::Sink};
use std::sync::{
Arc, Weak,
- atomic::{AtomicBool, Ordering::SeqCst},
+ atomic::{AtomicBool, AtomicU64, Ordering::SeqCst},
};
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
@@ -40,6 +40,15 @@ pub enum ConnectionState {
Disconnected,
}
+#[derive(Clone, Debug, Default)]
+pub struct SessionStats {
+ pub publisher_stats: Vec<RtcStats>,
+ pub subscriber_stats: Vec<RtcStats>,
+}
+
+#[derive(Clone, Debug)]
+pub enum RtcStats {}
+
static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
pub struct TestServer {
@@ -739,9 +748,17 @@ impl Room {
_track_name: String,
_is_staff: bool,
cx: &mut AsyncApp,
- ) -> Result<(LocalTrackPublication, AudioStream)> {
+ ) -> Result<(LocalTrackPublication, AudioStream, Arc<AtomicU64>)> {
self.local_participant().publish_microphone_track(cx).await
}
+
+ pub async fn get_stats(&self) -> Result<SessionStats> {
+ Ok(SessionStats::default())
+ }
+
+ pub fn stats_task(&self, _cx: &impl gpui::AppContext) -> gpui::Task<Result<SessionStats>> {
+ gpui::Task::ready(Ok(SessionStats::default()))
+ }
}
impl Drop for RoomState {
@@ -41,6 +41,8 @@ db.workspace = true
feature_flags.workspace = true
git_ui.workspace = true
gpui = { workspace = true, features = ["screen-capture"] }
+icons.workspace = true
+livekit_client.workspace = true
notifications.workspace = true
project.workspace = true
recent_projects.workspace = true
@@ -9,6 +9,8 @@ use gpui::{
canvas, point,
};
use gpui::{App, Task, Window};
+use icons::IconName;
+use livekit_client::ConnectionQuality;
use project::WorktreeSettings;
use rpc::proto::{self};
use settings::{Settings as _, SettingsLocation};
@@ -19,9 +21,17 @@ use ui::{
};
use util::rel_path::RelPath;
use workspace::{ParticipantLocation, notifications::DetachAndPromptErr};
+use zed_actions::ShowCallStats;
use crate::TitleBar;
+fn format_stat(value: Option<f64>, format: impl Fn(f64) -> String) -> String {
+ match value {
+ Some(v) => format(v),
+ None => "โ".to_string(),
+ }
+}
+
pub fn toggle_screen_sharing(
screen: anyhow::Result<Option<Rc<dyn ScreenCaptureSource>>>,
window: &mut Window,
@@ -347,6 +357,11 @@ impl TitleBar {
let can_share_projects = room.can_share_projects();
let screen_sharing_supported = cx.is_screen_capture_supported();
+ let stats = room
+ .diagnostics()
+ .map(|d| d.read(cx).stats().clone())
+ .unwrap_or_default();
+
let channel_store = ChannelStore::global(cx);
let channel = room
.channel_id()
@@ -354,6 +369,45 @@ impl TitleBar {
let mut children = Vec::new();
+ let effective_quality = stats.effective_quality.unwrap_or(ConnectionQuality::Lost);
+ let (signal_icon, signal_color, quality_label) = match effective_quality {
+ ConnectionQuality::Excellent => {
+ (IconName::SignalHigh, Some(Color::Success), "Excellent")
+ }
+ ConnectionQuality::Good => (IconName::SignalHigh, None, "Good"),
+ ConnectionQuality::Poor => (IconName::SignalMedium, Some(Color::Warning), "Poor"),
+ ConnectionQuality::Lost => (IconName::SignalLow, Some(Color::Error), "Lost"),
+ };
+ let quality_label: SharedString = quality_label.into();
+ children.push(
+ IconButton::new("call-quality", signal_icon)
+ .style(ButtonStyle::Subtle)
+ .icon_size(IconSize::Small)
+ .when_some(signal_color, |button, color| button.icon_color(color))
+ .tooltip(move |_window, cx| {
+ let quality_label = quality_label.clone();
+ let latency = format_stat(stats.latency_ms, |v| format!("{:.0}ms", v));
+ let jitter = format_stat(stats.jitter_ms, |v| format!("{:.0}ms", v));
+ let packet_loss = format_stat(stats.packet_loss_pct, |v| format!("{:.1}%", v));
+ let input_lag =
+ format_stat(stats.input_lag.map(|d| d.as_secs_f64() * 1000.0), |v| {
+ format!("{:.1}ms", v)
+ });
+
+ Tooltip::with_meta(
+ format!("Connection: {quality_label}"),
+ Some(&ShowCallStats),
+ format!(
+ "Latency: {latency} ยท Jitter: {jitter} ยท Loss: {packet_loss} ยท Input lag: {input_lag}",
+ ),
+ cx,
+ )
+ })
+ .on_click(move |_, window, cx| {
+ window.dispatch_action(Box::new(ShowCallStats), cx);
+ })
+ .into_any_element(),
+ );
children.push(
h_flex()
.gap_1()
@@ -158,6 +158,7 @@ pub struct TitleBar {
banner: Entity<OnboardingBanner>,
update_version: Entity<UpdateVersion>,
screen_share_popover_handle: PopoverMenuHandle<ContextMenu>,
+ _diagnostics_subscription: Option<gpui::Subscription>,
}
impl Render for TitleBar {
@@ -400,7 +401,7 @@ impl TitleBar {
.detach();
}
- Self {
+ let mut this = Self {
platform_titlebar,
application_menu,
workspace: workspace.weak_handle(),
@@ -412,7 +413,12 @@ impl TitleBar {
banner,
update_version,
screen_share_popover_handle: PopoverMenuHandle::default(),
- }
+ _diagnostics_subscription: None,
+ };
+
+ this.observe_diagnostics(cx);
+
+ this
}
fn worktree_count(&self, cx: &App) -> usize {
@@ -956,9 +962,23 @@ impl TitleBar {
}
fn active_call_changed(&mut self, cx: &mut Context<Self>) {
+ self.observe_diagnostics(cx);
cx.notify();
}
+ fn observe_diagnostics(&mut self, cx: &mut Context<Self>) {
+ let diagnostics = ActiveCall::global(cx)
+ .read(cx)
+ .room()
+ .and_then(|room| room.read(cx).diagnostics().cloned());
+
+ if let Some(diagnostics) = diagnostics {
+ self._diagnostics_subscription = Some(cx.observe(&diagnostics, |_, _, cx| cx.notify()));
+ } else {
+ self._diagnostics_subscription = None;
+ }
+ }
+
fn share_project(&mut self, cx: &mut Context<Self>) {
let active_call = ActiveCall::global(cx);
let project = self.project.clone();
@@ -110,6 +110,12 @@ pub struct Extensions {
#[serde(deny_unknown_fields)]
pub struct AcpRegistry;
+/// Show call diagnostics and connection quality statistics.
+#[derive(PartialEq, Clone, Default, Debug, Deserialize, JsonSchema, Action)]
+#[action(namespace = collab)]
+#[serde(deny_unknown_fields)]
+pub struct ShowCallStats;
+
/// Decreases the font size in the editor buffer.
#[derive(PartialEq, Clone, Default, Debug, Deserialize, JsonSchema, Action)]
#[action(namespace = zed)]