diff --git a/Cargo.lock b/Cargo.lock index c20230354cbd0c4082e8d7b66ffffcbd39e29adf..73614e6257995ba65fcc6ce46b1ad995fd821485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,7 +2193,7 @@ version = "3.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365" dependencies = [ - "darling 0.20.11", + "darling 0.21.3", "ident_case", "prettyplease", "proc-macro2", @@ -3319,6 +3319,7 @@ dependencies = [ "futures 0.3.31", "fuzzy", "gpui", + "livekit_client", "log", "menu", "notifications", @@ -3338,6 +3339,7 @@ dependencies = [ "ui", "util", "workspace", + "zed_actions", ] [[package]] @@ -17753,6 +17755,8 @@ dependencies = [ "feature_flags", "git_ui", "gpui", + "icons", + "livekit_client", "notifications", "platform_title_bar", "project", diff --git a/assets/icons/signal_high.svg b/assets/icons/signal_high.svg new file mode 100644 index 0000000000000000000000000000000000000000..6c1fec96098242444407fb9f66a025d03a10e50b --- /dev/null +++ b/assets/icons/signal_high.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/assets/icons/signal_low.svg b/assets/icons/signal_low.svg new file mode 100644 index 0000000000000000000000000000000000000000..b0ebccdd4c8897e8fdaf013a56cc4498dc5e0fe7 --- /dev/null +++ b/assets/icons/signal_low.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/assets/icons/signal_medium.svg b/assets/icons/signal_medium.svg new file mode 100644 index 0000000000000000000000000000000000000000..3652724dc8b095dd68eb9977108711e71ffe67cb --- /dev/null +++ b/assets/icons/signal_medium.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/crates/call/src/call_impl/diagnostics.rs b/crates/call/src/call_impl/diagnostics.rs new file mode 100644 index 0000000000000000000000000000000000000000..1aa1774dfb0f598f6024c72b67e2079c01b2b8f0 --- /dev/null +++ b/crates/call/src/call_impl/diagnostics.rs @@ -0,0 +1,232 @@ +use gpui::{Context, Task, WeakEntity}; +use livekit_client::ConnectionQuality; +use std::time::Duration; + +use super::room::Room; + +#[derive(Clone, Default)] +pub struct CallStats { + pub connection_quality: Option, + pub effective_quality: Option, + pub latency_ms: Option, + pub jitter_ms: Option, + pub packet_loss_pct: Option, + pub input_lag: Option, +} + +pub struct CallDiagnostics { + stats: CallStats, + room: WeakEntity, + poll_task: Option>, + stats_update_task: Option>, +} + +impl CallDiagnostics { + pub fn new(room: WeakEntity, cx: &mut Context) -> Self { + let mut this = Self { + stats: CallStats::default(), + room, + poll_task: None, + stats_update_task: None, + }; + this.start_polling(cx); + this + } + + pub fn stats(&self) -> &CallStats { + &self.stats + } + + fn start_polling(&mut self, cx: &mut Context) { + self.poll_task = Some(cx.spawn(async move |this, cx| { + loop { + if this.update(cx, |this, cx| this.poll_stats(cx)).is_err() { + break; + } + cx.background_executor().timer(Duration::from_secs(1)).await; + } + })); + } + + fn poll_stats(&mut self, cx: &mut Context) { + let Some(room) = self.room.upgrade() else { + return; + }; + + let connection_quality = room.read(cx).connection_quality(); + self.stats.connection_quality = Some(connection_quality); + self.stats.input_lag = room.read(cx).input_lag(); + + let stats_future = room.read(cx).get_stats(cx); + + let background_task = cx.background_executor().spawn(async move { + let session_stats = stats_future.await; + session_stats.map(|stats| compute_network_stats(&stats)) + }); + + self.stats_update_task = Some(cx.spawn(async move |this, cx| { + let result = background_task.await; + this.update(cx, |this, cx| { + if let Some(computed) = result { + this.stats.latency_ms = computed.latency_ms; + this.stats.jitter_ms = computed.jitter_ms; + this.stats.packet_loss_pct = computed.packet_loss_pct; + } + let quality = this + .stats + .connection_quality + .unwrap_or(ConnectionQuality::Lost); + this.stats.effective_quality = + Some(effective_connection_quality(quality, &this.stats)); + cx.notify(); + }) + .ok(); + })); + } +} + +struct ComputedNetworkStats { + latency_ms: Option, + jitter_ms: Option, + packet_loss_pct: Option, +} + +fn compute_network_stats(stats: &livekit_client::SessionStats) -> ComputedNetworkStats { + let mut min_rtt: Option = None; + let mut max_jitter: Option = None; + let mut total_packets_received: u64 = 0; + let mut total_packets_lost: i64 = 0; + + let all_stats = stats + .publisher_stats + .iter() + .chain(stats.subscriber_stats.iter()); + + for stat in all_stats { + extract_metrics( + stat, + &mut min_rtt, + &mut max_jitter, + &mut total_packets_received, + &mut total_packets_lost, + ); + } + + let total_expected = total_packets_received as i64 + total_packets_lost; + let packet_loss_pct = if total_expected > 0 { + Some((total_packets_lost as f64 / total_expected as f64) * 100.0) + } else { + None + }; + + ComputedNetworkStats { + latency_ms: min_rtt.map(|rtt| rtt * 1000.0), + jitter_ms: max_jitter.map(|j| j * 1000.0), + packet_loss_pct, + } +} + +#[cfg(all( + not(rust_analyzer), + any( + test, + feature = "test-support", + all(target_os = "windows", target_env = "gnu"), + target_os = "freebsd" + ) +))] +fn extract_metrics( + _stat: &livekit_client::RtcStats, + _min_rtt: &mut Option, + _max_jitter: &mut Option, + _total_packets_received: &mut u64, + _total_packets_lost: &mut i64, +) { +} + +#[cfg(any( + rust_analyzer, + not(any( + test, + feature = "test-support", + all(target_os = "windows", target_env = "gnu"), + target_os = "freebsd" + )) +))] +fn extract_metrics( + stat: &livekit_client::RtcStats, + min_rtt: &mut Option, + max_jitter: &mut Option, + total_packets_received: &mut u64, + total_packets_lost: &mut i64, +) { + use livekit_client::RtcStats; + + match stat { + RtcStats::CandidatePair(pair) => { + let rtt = pair.candidate_pair.current_round_trip_time; + if rtt > 0.0 { + *min_rtt = Some(match *min_rtt { + Some(current) => current.min(rtt), + None => rtt, + }); + } + } + RtcStats::InboundRtp(inbound) => { + let jitter = inbound.received.jitter; + if jitter > 0.0 { + *max_jitter = Some(match *max_jitter { + Some(current) => current.max(jitter), + None => jitter, + }); + } + *total_packets_received += inbound.received.packets_received; + *total_packets_lost += inbound.received.packets_lost; + } + RtcStats::RemoteInboundRtp(remote_inbound) => { + let rtt = remote_inbound.remote_inbound.round_trip_time; + if rtt > 0.0 { + *min_rtt = Some(match *min_rtt { + Some(current) => current.min(rtt), + None => rtt, + }); + } + } + _ => {} + } +} + +fn metric_quality(value: f64, warn_threshold: f64, error_threshold: f64) -> ConnectionQuality { + if value < warn_threshold { + ConnectionQuality::Excellent + } else if value < error_threshold { + ConnectionQuality::Poor + } else { + ConnectionQuality::Lost + } +} + +/// Computes the effective connection quality by taking the worst of the +/// LiveKit-reported quality and each individual metric rating. +fn effective_connection_quality( + livekit_quality: ConnectionQuality, + stats: &CallStats, +) -> ConnectionQuality { + let mut worst = livekit_quality; + + if let Some(latency) = stats.latency_ms { + worst = worst.max(metric_quality(latency, 100.0, 300.0)); + } + if let Some(jitter) = stats.jitter_ms { + worst = worst.max(metric_quality(jitter, 30.0, 75.0)); + } + if let Some(loss) = stats.packet_loss_pct { + worst = worst.max(metric_quality(loss, 1.0, 5.0)); + } + if let Some(lag) = stats.input_lag { + let lag_ms = lag.as_secs_f64() * 1000.0; + worst = worst.max(metric_quality(lag_ms, 20.0, 50.0)); + } + + worst +} diff --git a/crates/call/src/call_impl/mod.rs b/crates/call/src/call_impl/mod.rs index e3945cf2c746f4c598caa7996deb2c76fc859e64..e060ec5edae6277a92c2c09ab54ded449bc56e11 100644 --- a/crates/call/src/call_impl/mod.rs +++ b/crates/call/src/call_impl/mod.rs @@ -1,3 +1,4 @@ +pub mod diagnostics; pub mod participant; pub mod room; diff --git a/crates/call/src/call_impl/room.rs b/crates/call/src/call_impl/room.rs index 701d7dd65423f97b3f4d5cfa4a198083593211e6..117789e233ab6fbc101b28e5e9f485ec17c1f79d 100644 --- a/crates/call/src/call_impl/room.rs +++ b/crates/call/src/call_impl/room.rs @@ -23,7 +23,10 @@ use livekit_client::{self as livekit, AudioStream, TrackSid}; use postage::{sink::Sink, stream::Stream, watch}; use project::Project; use settings::Settings as _; +use std::sync::atomic::AtomicU64; use std::{future::Future, mem, rc::Rc, sync::Arc, time::Duration, time::Instant}; + +use super::diagnostics::CallDiagnostics; use util::{ResultExt, TryFutureExt, paths::PathStyle, post_inc}; use workspace::ParticipantLocation; @@ -69,6 +72,7 @@ pub struct Room { id: u64, channel_id: Option, live_kit: Option, + diagnostics: Option>, status: RoomStatus, shared_projects: HashSet>, joined_projects: HashSet>, @@ -136,6 +140,7 @@ impl Room { id, channel_id, live_kit: None, + diagnostics: None, status: RoomStatus::Online, shared_projects: Default::default(), joined_projects: Default::default(), @@ -350,6 +355,7 @@ impl Room { self.participant_user_ids.clear(); self.client_subscriptions.clear(); self.live_kit.take(); + self.diagnostics.take(); self.pending_room_update.take(); self.maintain_connection.take(); } @@ -540,6 +546,42 @@ impl Room { } } + pub fn get_stats(&self, cx: &App) -> Task> { + match self.live_kit.as_ref() { + Some(lk) => { + let task = lk.room.stats_task(cx); + cx.background_executor() + .spawn(async move { task.await.ok() }) + } + None => Task::ready(None), + } + } + + pub fn input_lag(&self) -> Option { + let us = self + .live_kit + .as_ref()? + .input_lag_us + .as_ref()? + .load(std::sync::atomic::Ordering::Relaxed); + if us > 0 { + Some(Duration::from_micros(us)) + } else { + None + } + } + + pub fn diagnostics(&self) -> Option<&Entity> { + self.diagnostics.as_ref() + } + + pub fn connection_quality(&self) -> livekit::ConnectionQuality { + self.live_kit + .as_ref() + .map(|lk| lk.room.local_participant().connection_quality()) + .unwrap_or(livekit::ConnectionQuality::Lost) + } + pub fn status(&self) -> RoomStatus { self.status } @@ -1383,7 +1425,7 @@ impl Room { }; match publication { - Ok((publication, stream)) => { + Ok((publication, stream, input_lag_us)) => { if canceled { cx.spawn(async move |_, cx| { room.unpublish_local_track(publication.sid(), cx).await @@ -1393,6 +1435,7 @@ impl Room { if live_kit.muted_by_user || live_kit.deafened { publication.mute(cx); } + live_kit.input_lag_us = Some(input_lag_us); live_kit.microphone_track = LocalTrack::Published { track_publication: publication, _stream: Box::new(stream), @@ -1623,6 +1666,7 @@ fn spawn_room_connection( livekit::Room::connect(connection_info.server_url, connection_info.token, cx) .await?; + let weak_room = this.clone(); this.update(cx, |this, cx| { let _handle_updates = cx.spawn(async move |this, cx| { while let Some(event) = events.next().await { @@ -1642,12 +1686,14 @@ fn spawn_room_connection( room: Rc::new(room), screen_track: LocalTrack::None, microphone_track: LocalTrack::None, + input_lag_us: None, next_publish_id: 0, muted_by_user, deafened: false, speaking: false, _handle_updates, }); + this.diagnostics = Some(cx.new(|cx| CallDiagnostics::new(weak_room, cx))); if !muted_by_user && this.can_use_microphone() { this.share_microphone(cx) @@ -1665,6 +1711,9 @@ struct LiveKitRoom { room: Rc, screen_track: LocalTrack, microphone_track: LocalTrack, + /// Shared atomic storing the most recent input lag measurement in microseconds. + /// Written by the audio capture/transmit pipeline, read here for diagnostics. + input_lag_us: Option>, /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user. muted_by_user: bool, deafened: bool, @@ -1681,6 +1730,7 @@ impl LiveKitRoom { } = mem::replace(&mut self.microphone_track, LocalTrack::None) { tracks_to_unpublish.push(track_publication.sid()); + self.input_lag_us = None; cx.notify(); } diff --git a/crates/collab/tests/integration/integration_tests.rs b/crates/collab/tests/integration/integration_tests.rs index 3bad9c82c26392a935f67efc578b5d293b2cab3d..8c817c1fc7cc9bb7d33c01ba467d13c971453ac3 100644 --- a/crates/collab/tests/integration/integration_tests.rs +++ b/crates/collab/tests/integration/integration_tests.rs @@ -1787,6 +1787,7 @@ async fn test_project_reconnect( // While disconnected, close project 3 cx_a.update(|_| drop(project_a3)); + executor.run_until_parked(); // Client B reconnects. They re-join the room and the remaining shared project. server.allow_connections(); diff --git a/crates/collab_ui/Cargo.toml b/crates/collab_ui/Cargo.toml index 0ac413d1863dbbcdbcd81ad2bb3907f7a370c866..498f3f0bd76e002797389a279a17849448e6e873 100644 --- a/crates/collab_ui/Cargo.toml +++ b/crates/collab_ui/Cargo.toml @@ -40,6 +40,7 @@ editor.workspace = true futures.workspace = true fuzzy.workspace = true gpui.workspace = true +livekit_client.workspace = true log.workspace = true menu.workspace = true notifications.workspace = true @@ -59,6 +60,7 @@ title_bar.workspace = true ui.workspace = true util.workspace = true workspace.workspace = true +zed_actions.workspace = true [dev-dependencies] call = { workspace = true, features = ["test-support"] } diff --git a/crates/collab_ui/src/call_stats_modal.rs b/crates/collab_ui/src/call_stats_modal.rs new file mode 100644 index 0000000000000000000000000000000000000000..cfbdf82c3bb95854e90aa12c1ec313fa82527c33 --- /dev/null +++ b/crates/collab_ui/src/call_stats_modal.rs @@ -0,0 +1,270 @@ +use call::{ActiveCall, Room, room}; +use gpui::{ + DismissEvent, Entity, EventEmitter, FocusHandle, Focusable, FontWeight, Render, Subscription, + Window, +}; +use livekit_client::ConnectionQuality; +use ui::prelude::*; +use workspace::{ModalView, Workspace}; +use zed_actions::ShowCallStats; + +pub fn init(cx: &mut App) { + cx.observe_new(|workspace: &mut Workspace, _, _cx| { + workspace.register_action(|workspace, _: &ShowCallStats, window, cx| { + workspace.toggle_modal(window, cx, |_window, cx| CallStatsModal::new(cx)); + }); + }) + .detach(); +} + +pub struct CallStatsModal { + focus_handle: FocusHandle, + _active_call_subscription: Option, + _diagnostics_subscription: Option, +} + +impl CallStatsModal { + fn new(cx: &mut Context) -> Self { + let mut this = Self { + focus_handle: cx.focus_handle(), + _active_call_subscription: None, + _diagnostics_subscription: None, + }; + + if let Some(active_call) = ActiveCall::try_global(cx) { + this._active_call_subscription = + Some(cx.subscribe(&active_call, Self::handle_call_event)); + this.observe_diagnostics(cx); + } + + this + } + + fn observe_diagnostics(&mut self, cx: &mut Context) { + let diagnostics = active_room(cx).and_then(|room| room.read(cx).diagnostics().cloned()); + + if let Some(diagnostics) = diagnostics { + self._diagnostics_subscription = Some(cx.observe(&diagnostics, |_, _, cx| cx.notify())); + } else { + self._diagnostics_subscription = None; + } + } + + fn handle_call_event( + &mut self, + _: Entity, + event: &room::Event, + cx: &mut Context, + ) { + match event { + room::Event::RoomJoined { .. } => { + self.observe_diagnostics(cx); + } + room::Event::RoomLeft { .. } => { + self._diagnostics_subscription = None; + cx.notify(); + } + _ => {} + } + } + + fn dismiss(&mut self, _: &menu::Cancel, _: &mut Window, cx: &mut Context) { + cx.emit(DismissEvent); + } +} + +fn active_room(cx: &App) -> Option> { + ActiveCall::try_global(cx)?.read(cx).room().cloned() +} + +fn quality_label(quality: Option) -> (&'static str, Color) { + match quality { + Some(ConnectionQuality::Excellent) => ("Excellent", Color::Success), + Some(ConnectionQuality::Good) => ("Good", Color::Success), + Some(ConnectionQuality::Poor) => ("Poor", Color::Warning), + Some(ConnectionQuality::Lost) => ("Lost", Color::Error), + None => ("—", Color::Muted), + } +} + +fn metric_rating(label: &str, value_ms: f64) -> (&'static str, Color) { + match label { + "Latency" => { + if value_ms < 100.0 { + ("Normal", Color::Success) + } else if value_ms < 300.0 { + ("High", Color::Warning) + } else { + ("Poor", Color::Error) + } + } + "Jitter" => { + if value_ms < 30.0 { + ("Normal", Color::Success) + } else if value_ms < 75.0 { + ("High", Color::Warning) + } else { + ("Poor", Color::Error) + } + } + _ => ("Normal", Color::Success), + } +} + +fn input_lag_rating(value_ms: f64) -> (&'static str, Color) { + if value_ms < 20.0 { + ("Normal", Color::Success) + } else if value_ms < 50.0 { + ("High", Color::Warning) + } else { + ("Poor", Color::Error) + } +} + +fn packet_loss_rating(loss_pct: f64) -> (&'static str, Color) { + if loss_pct < 1.0 { + ("Normal", Color::Success) + } else if loss_pct < 5.0 { + ("High", Color::Warning) + } else { + ("Poor", Color::Error) + } +} + +impl EventEmitter for CallStatsModal {} +impl ModalView for CallStatsModal {} + +impl Focusable for CallStatsModal { + fn focus_handle(&self, _cx: &App) -> FocusHandle { + self.focus_handle.clone() + } +} + +impl Render for CallStatsModal { + fn render(&mut self, _window: &mut Window, cx: &mut Context) -> impl IntoElement { + let room = active_room(cx); + let is_connected = room.is_some(); + let stats = room + .and_then(|room| { + let diagnostics = room.read(cx).diagnostics()?; + Some(diagnostics.read(cx).stats().clone()) + }) + .unwrap_or_default(); + + let (quality_text, quality_color) = quality_label(stats.connection_quality); + + v_flex() + .key_context("CallStatsModal") + .on_action(cx.listener(Self::dismiss)) + .track_focus(&self.focus_handle) + .elevation_3(cx) + .w(rems(24.)) + .p_4() + .gap_3() + .child( + h_flex() + .justify_between() + .child(Label::new("Call Diagnostics").size(LabelSize::Large)) + .child( + Label::new(quality_text) + .size(LabelSize::Large) + .color(quality_color), + ), + ) + .when(!is_connected, |this| { + this.child( + h_flex() + .justify_center() + .py_4() + .child(Label::new("Not in a call").color(Color::Muted)), + ) + }) + .when(is_connected, |this| { + this.child( + v_flex() + .gap_1() + .child( + h_flex() + .gap_2() + .child(Label::new("Network").weight(FontWeight::SEMIBOLD)), + ) + .child(self.render_metric_row( + "Latency", + "Time for data to travel to the server", + stats.latency_ms, + |v| format!("{:.0}ms", v), + |v| metric_rating("Latency", v), + )) + .child(self.render_metric_row( + "Jitter", + "Variance or fluctuation in latency", + stats.jitter_ms, + |v| format!("{:.0}ms", v), + |v| metric_rating("Jitter", v), + )) + .child(self.render_metric_row( + "Packet loss", + "Amount of data lost during transfer", + stats.packet_loss_pct, + |v| format!("{:.1}%", v), + |v| packet_loss_rating(v), + )) + .child(self.render_metric_row( + "Input lag", + "Delay from audio capture to WebRTC", + stats.input_lag.map(|d| d.as_secs_f64() * 1000.0), + |v| format!("{:.1}ms", v), + |v| input_lag_rating(v), + )), + ) + }) + } +} + +impl CallStatsModal { + fn render_metric_row( + &self, + title: &str, + description: &str, + value: Option, + format_value: impl Fn(f64) -> String, + rate: impl Fn(f64) -> (&'static str, Color), + ) -> impl IntoElement { + let (rating_text, rating_color, value_text) = match value { + Some(v) => { + let (rt, rc) = rate(v); + (rt, rc, format_value(v)) + } + None => ("—", Color::Muted, "—".to_string()), + }; + + h_flex() + .px_2() + .py_1() + .rounded_md() + .justify_between() + .child( + v_flex() + .child(Label::new(title.to_string()).size(LabelSize::Default)) + .child( + Label::new(description.to_string()) + .size(LabelSize::Small) + .color(Color::Muted), + ), + ) + .child( + v_flex() + .items_end() + .child( + Label::new(rating_text) + .size(LabelSize::Default) + .color(rating_color), + ) + .child( + Label::new(value_text) + .size(LabelSize::Small) + .color(Color::Muted), + ), + ) + } +} diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index 7155defc2c06e01f6c7465b60eccfd44adfc420b..107b2ffa7f625d98dd9c54bb6bbf75df8b72d020 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -1,3 +1,4 @@ +mod call_stats_modal; pub mod channel_view; pub mod collab_panel; pub mod notification_panel; @@ -18,6 +19,7 @@ use workspace::AppState; // Another comment, nice. pub fn init(app_state: &Arc, cx: &mut App) { + call_stats_modal::init(cx); channel_view::init(cx); collab_panel::init(cx); notification_panel::init(cx); diff --git a/crates/icons/src/icons.rs b/crates/icons/src/icons.rs index ca12f1a90383f1aa682531f201dc21a8065fe3f5..d1450abdac49b34f240e375e9a4318d186c1f1da 100644 --- a/crates/icons/src/icons.rs +++ b/crates/icons/src/icons.rs @@ -219,6 +219,9 @@ pub enum IconName { Settings, ShieldCheck, Shift, + SignalHigh, + SignalLow, + SignalMedium, Slash, Sliders, Space, diff --git a/crates/livekit_client/examples/test_app.rs b/crates/livekit_client/examples/test_app.rs index 06b9a1402a5c313117dfe559d1f293b6393c6172..959944d0830bc8270c4f0b85896cbfc9351e5197 100644 --- a/crates/livekit_client/examples/test_app.rs +++ b/crates/livekit_client/examples/test_app.rs @@ -255,7 +255,7 @@ impl LivekitWindow { } else { let room = self.room.clone(); cx.spawn_in(window, async move |this, cx| { - let (publication, stream) = room + let (publication, stream, _input_lag_us) = room .publish_local_microphone_track("test_user".to_string(), false, cx) .await .unwrap(); diff --git a/crates/livekit_client/src/lib.rs b/crates/livekit_client/src/lib.rs index 352776cf6bbe02381957a197eca9a64fff094892..aa4831562c2f70cf505d1042f1c9446504dce9fd 100644 --- a/crates/livekit_client/src/lib.rs +++ b/crates/livekit_client/src/lib.rs @@ -67,6 +67,14 @@ pub enum Participant { Remote(RemoteParticipant), } +#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)] +pub enum ConnectionQuality { + Excellent, + Good, + Poor, + Lost, +} + #[derive(Debug, Clone)] pub enum TrackPublication { Local(LocalTrackPublication), @@ -179,6 +187,10 @@ pub enum RoomEvent { ActiveSpeakersChanged { speakers: Vec, }, + ConnectionQualityChanged { + participant: Participant, + quality: ConnectionQuality, + }, ConnectionStateChanged(ConnectionState), Connected { participants_with_tracks: Vec<(RemoteParticipant, Vec)>, diff --git a/crates/livekit_client/src/livekit_client.rs b/crates/livekit_client/src/livekit_client.rs index 863cf0dc527300f1e85df6867d99e367b5c7fa15..31a13e64e7dff17f0d1a36662761cdd2c51de4e7 100644 --- a/crates/livekit_client/src/livekit_client.rs +++ b/crates/livekit_client/src/livekit_client.rs @@ -7,13 +7,16 @@ use gpui_tokio::Tokio; use log::info; use playback::capture_local_video_track; use settings::Settings; +use std::sync::{Arc, atomic::AtomicU64}; mod playback; use crate::{ - LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication, + ConnectionQuality, LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication, livekit_client::playback::Speaker, }; +pub use livekit::SessionStats; +pub use livekit::webrtc::stats::RtcStats; pub use playback::AudioStream; pub(crate) use playback::{RemoteVideoFrame, play_remote_video_track}; @@ -107,8 +110,8 @@ impl Room { user_name: String, is_staff: bool, cx: &mut AsyncApp, - ) -> Result<(LocalTrackPublication, playback::AudioStream)> { - let (track, stream) = self + ) -> Result<(LocalTrackPublication, playback::AudioStream, Arc)> { + let (track, stream, input_lag_us) = self .playback .capture_local_microphone_track(user_name, is_staff, &cx)?; let publication = self @@ -123,7 +126,7 @@ impl Room { ) .await?; - Ok((publication, stream)) + Ok((publication, stream, input_lag_us)) } pub async fn unpublish_local_track( @@ -158,9 +161,32 @@ impl Room { Err(anyhow!("Client version too old to play audio in call")) } } + + pub async fn get_stats(&self) -> Result { + self.room.get_stats().await.map_err(anyhow::Error::from) + } + + /// Returns a `Task` that fetches room stats on the Tokio runtime. + /// + /// LiveKit's SDK is Tokio-based, so the stats fetch must run within + /// a Tokio context rather than on GPUI's smol-based background executor. + pub fn stats_task(&self, cx: &impl gpui::AppContext) -> Task> { + let inner = self.room.clone(); + Tokio::spawn_result(cx, async move { + inner.get_stats().await.map_err(anyhow::Error::from) + }) + } } impl LocalParticipant { + pub fn connection_quality(&self) -> ConnectionQuality { + connection_quality_from_livekit(self.0.connection_quality()) + } + + pub fn audio_level(&self) -> f32 { + self.0.audio_level() + } + pub async fn publish_screenshare_track( &self, source: &dyn ScreenCaptureSource, @@ -234,6 +260,14 @@ impl LocalTrackPublication { } impl RemoteParticipant { + pub fn connection_quality(&self) -> ConnectionQuality { + connection_quality_from_livekit(self.0.connection_quality()) + } + + pub fn audio_level(&self) -> f32 { + self.0.audio_level() + } + pub fn identity(&self) -> ParticipantIdentity { ParticipantIdentity(self.0.identity().0) } @@ -297,6 +331,31 @@ impl Participant { } } } + + pub fn connection_quality(&self) -> ConnectionQuality { + match self { + Participant::Local(local_participant) => local_participant.connection_quality(), + Participant::Remote(remote_participant) => remote_participant.connection_quality(), + } + } + + pub fn audio_level(&self) -> f32 { + match self { + Participant::Local(local_participant) => local_participant.audio_level(), + Participant::Remote(remote_participant) => remote_participant.audio_level(), + } + } +} + +fn connection_quality_from_livekit( + quality: livekit::prelude::ConnectionQuality, +) -> ConnectionQuality { + match quality { + livekit::prelude::ConnectionQuality::Excellent => ConnectionQuality::Excellent, + livekit::prelude::ConnectionQuality::Good => ConnectionQuality::Good, + livekit::prelude::ConnectionQuality::Poor => ConnectionQuality::Poor, + livekit::prelude::ConnectionQuality::Lost => ConnectionQuality::Lost, + } } fn participant_from_livekit(participant: livekit::participant::Participant) -> Participant { @@ -474,6 +533,13 @@ fn room_event_from_livekit(event: livekit::RoomEvent) -> Option { }, livekit::RoomEvent::Reconnecting => RoomEvent::Reconnecting, livekit::RoomEvent::Reconnected => RoomEvent::Reconnected, + livekit::RoomEvent::ConnectionQualityChanged { + quality, + participant, + } => RoomEvent::ConnectionQualityChanged { + participant: participant_from_livekit(participant), + quality: connection_quality_from_livekit(quality), + }, _ => { log::trace!("dropping livekit event: {:?}", event); return None; diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 4b3c55109a297c888ac64d5742a1df91163d77e0..4801981a1d9275e561a101c88b4204d91e09ed28 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -27,11 +27,16 @@ use serde::{Deserialize, Serialize}; use settings::Settings; use std::cell::RefCell; use std::sync::Weak; -use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; -use std::time::Duration; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; use std::{borrow::Cow, collections::VecDeque, sync::Arc}; use util::{ResultExt as _, maybe}; +struct TimestampedFrame { + frame: AudioFrame<'static>, + captured_at: Instant, +} + mod source; pub(crate) struct AudioStack { @@ -162,7 +167,7 @@ impl AudioStack { user_name: String, is_staff: bool, cx: &AsyncApp, - ) -> Result<(crate::LocalAudioTrack, AudioStream)> { + ) -> Result<(crate::LocalAudioTrack, AudioStream, Arc)> { let legacy_audio_compatible = AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible) .unwrap_or(true); @@ -202,11 +207,15 @@ impl AudioStack { let apm = self.apm.clone(); - let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel(1); + let input_lag_us = Arc::new(AtomicU64::new(0)); + let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel::(1); let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, { + let input_lag_us = input_lag_us.clone(); async move { - while let Some(frame) = frame_rx.next().await { - source.capture_frame(&frame).await.log_err(); + while let Some(timestamped) = frame_rx.next().await { + let lag = timestamped.captured_at.elapsed(); + input_lag_us.store(lag.as_micros() as u64, Ordering::Relaxed); + source.capture_frame(×tamped.frame).await.log_err(); } } }); @@ -251,6 +260,7 @@ impl AudioStack { AudioStream::Output { _drop: Box::new(on_drop), }, + input_lag_us, )) } @@ -345,7 +355,7 @@ impl AudioStack { async fn capture_input( executor: BackgroundExecutor, apm: Arc>, - frame_tx: Sender>, + frame_tx: Sender, sample_rate: u32, num_channels: u32, input_audio_device: Option, @@ -376,6 +386,7 @@ impl AudioStack { &config.config(), config.sample_format(), move |data, _: &_| { + let captured_at = Instant::now(); let data = crate::get_sample_data(config.sample_format(), data) .log_err(); let Some(data) = data else { @@ -409,11 +420,14 @@ impl AudioStack { .log_err(); buf.clear(); frame_tx - .try_send(AudioFrame { - data: Cow::Owned(sampled), - sample_rate, - num_channels, - samples_per_channel: sample_rate / 100, + .try_send(TimestampedFrame { + frame: AudioFrame { + data: Cow::Owned(sampled), + sample_rate, + num_channels, + samples_per_channel: sample_rate / 100, + }, + captured_at, }) .ok(); } @@ -446,7 +460,7 @@ pub struct Speaker { pub sends_legacy_audio: bool, } -fn send_to_livekit(mut frame_tx: Sender>, mut microphone: impl Source) { +fn send_to_livekit(mut frame_tx: Sender, mut microphone: impl Source) { use cpal::Sample; let sample_rate = microphone.sample_rate().get(); let num_channels = microphone.channels().get() as u32; @@ -459,11 +473,14 @@ fn send_to_livekit(mut frame_tx: Sender>, mut microphone: im .map(|s| s.to_sample()) .collect(); - match frame_tx.try_send(AudioFrame { - sample_rate, - num_channels, - samples_per_channel: sampled.len() as u32 / num_channels, - data: Cow::Owned(sampled), + match frame_tx.try_send(TimestampedFrame { + frame: AudioFrame { + sample_rate, + num_channels, + samples_per_channel: sampled.len() as u32 / num_channels, + data: Cow::Owned(sampled), + }, + captured_at: Instant::now(), }) { Ok(_) => {} Err(err) => { diff --git a/crates/livekit_client/src/mock_client.rs b/crates/livekit_client/src/mock_client.rs index 4c19cb4d57695f86b98c299646a376edb64414b7..d1cd399d256d6b826de349d9fe533f45990c5f04 100644 --- a/crates/livekit_client/src/mock_client.rs +++ b/crates/livekit_client/src/mock_client.rs @@ -15,7 +15,7 @@ pub type LocalTrackPublication = publication::LocalTrackPublication; pub type LocalParticipant = participant::LocalParticipant; pub type Room = test::Room; -pub use test::{ConnectionState, ParticipantIdentity, TrackSid}; +pub use test::{ConnectionState, ParticipantIdentity, RtcStats, SessionStats, TrackSid}; pub struct AudioStream {} diff --git a/crates/livekit_client/src/mock_client/participant.rs b/crates/livekit_client/src/mock_client/participant.rs index 033808cbb54189fa2a7841264097751da4deb027..be8cd7f2d38ebcda00dc58300ef98adb6b7340f9 100644 --- a/crates/livekit_client/src/mock_client/participant.rs +++ b/crates/livekit_client/src/mock_client/participant.rs @@ -1,6 +1,6 @@ use crate::{ - AudioStream, LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, Participant, - ParticipantIdentity, RemoteTrack, RemoteTrackPublication, TrackSid, + AudioStream, ConnectionQuality, LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, + Participant, ParticipantIdentity, RemoteTrack, RemoteTrackPublication, TrackSid, test::{Room, WeakRoom}, }; use anyhow::Result; @@ -8,6 +8,7 @@ use collections::HashMap; use gpui::{ AsyncApp, DevicePixels, ScreenCaptureSource, ScreenCaptureStream, SourceMetadata, size, }; +use std::sync::{Arc, atomic::AtomicU64}; #[derive(Clone, Debug)] pub struct LocalParticipant { @@ -28,9 +29,31 @@ impl Participant { Participant::Remote(participant) => participant.identity.clone(), } } + + pub fn connection_quality(&self) -> ConnectionQuality { + match self { + Participant::Local(p) => p.connection_quality(), + Participant::Remote(p) => p.connection_quality(), + } + } + + pub fn audio_level(&self) -> f32 { + match self { + Participant::Local(p) => p.audio_level(), + Participant::Remote(p) => p.audio_level(), + } + } } impl LocalParticipant { + pub fn connection_quality(&self) -> ConnectionQuality { + ConnectionQuality::Excellent + } + + pub fn audio_level(&self) -> f32 { + 0.0 + } + pub async fn unpublish_track(&self, track: TrackSid, _cx: &AsyncApp) -> Result<()> { self.room .test_server() @@ -41,7 +64,7 @@ impl LocalParticipant { pub(crate) async fn publish_microphone_track( &self, _cx: &AsyncApp, - ) -> Result<(LocalTrackPublication, AudioStream)> { + ) -> Result<(LocalTrackPublication, AudioStream, Arc)> { let this = self.clone(); let server = this.room.test_server(); let sid = server @@ -54,6 +77,7 @@ impl LocalParticipant { sid, }, AudioStream {}, + Arc::new(AtomicU64::new(0)), )) } @@ -78,6 +102,14 @@ impl LocalParticipant { } impl RemoteParticipant { + pub fn connection_quality(&self) -> ConnectionQuality { + ConnectionQuality::Excellent + } + + pub fn audio_level(&self) -> f32 { + 0.0 + } + pub fn track_publications(&self) -> HashMap { if let Some(room) = self.room.upgrade() { let server = room.test_server(); diff --git a/crates/livekit_client/src/test.rs b/crates/livekit_client/src/test.rs index a8222b9a18b719f59ccaebdff6e08b7ee4edef67..4b5efe0aafbe5c27be0de973bc05e9901dd032ae 100644 --- a/crates/livekit_client/src/test.rs +++ b/crates/livekit_client/src/test.rs @@ -10,7 +10,7 @@ use parking_lot::Mutex; use postage::{mpsc, sink::Sink}; use std::sync::{ Arc, Weak, - atomic::{AtomicBool, Ordering::SeqCst}, + atomic::{AtomicBool, AtomicU64, Ordering::SeqCst}, }; #[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] @@ -40,6 +40,15 @@ pub enum ConnectionState { Disconnected, } +#[derive(Clone, Debug, Default)] +pub struct SessionStats { + pub publisher_stats: Vec, + pub subscriber_stats: Vec, +} + +#[derive(Clone, Debug)] +pub enum RtcStats {} + static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); pub struct TestServer { @@ -739,9 +748,17 @@ impl Room { _track_name: String, _is_staff: bool, cx: &mut AsyncApp, - ) -> Result<(LocalTrackPublication, AudioStream)> { + ) -> Result<(LocalTrackPublication, AudioStream, Arc)> { self.local_participant().publish_microphone_track(cx).await } + + pub async fn get_stats(&self) -> Result { + Ok(SessionStats::default()) + } + + pub fn stats_task(&self, _cx: &impl gpui::AppContext) -> gpui::Task> { + gpui::Task::ready(Ok(SessionStats::default())) + } } impl Drop for RoomState { diff --git a/crates/title_bar/Cargo.toml b/crates/title_bar/Cargo.toml index b5c10835c6bf85ea24db1ff9bad5abbbf3b517ee..d290b4c767b7b012561ee0ec6c2e769b653436df 100644 --- a/crates/title_bar/Cargo.toml +++ b/crates/title_bar/Cargo.toml @@ -41,6 +41,8 @@ db.workspace = true feature_flags.workspace = true git_ui.workspace = true gpui = { workspace = true, features = ["screen-capture"] } +icons.workspace = true +livekit_client.workspace = true notifications.workspace = true project.workspace = true recent_projects.workspace = true diff --git a/crates/title_bar/src/collab.rs b/crates/title_bar/src/collab.rs index 0f4d5977947fa27cf3ca5811dbf883c4dbd9df94..027d53a3dff4fb285f1e6a4a745ecd93bd1a3857 100644 --- a/crates/title_bar/src/collab.rs +++ b/crates/title_bar/src/collab.rs @@ -9,6 +9,8 @@ use gpui::{ canvas, point, }; use gpui::{App, Task, Window}; +use icons::IconName; +use livekit_client::ConnectionQuality; use project::WorktreeSettings; use rpc::proto::{self}; use settings::{Settings as _, SettingsLocation}; @@ -19,9 +21,17 @@ use ui::{ }; use util::rel_path::RelPath; use workspace::{ParticipantLocation, notifications::DetachAndPromptErr}; +use zed_actions::ShowCallStats; use crate::TitleBar; +fn format_stat(value: Option, format: impl Fn(f64) -> String) -> String { + match value { + Some(v) => format(v), + None => "—".to_string(), + } +} + pub fn toggle_screen_sharing( screen: anyhow::Result>>, window: &mut Window, @@ -347,6 +357,11 @@ impl TitleBar { let can_share_projects = room.can_share_projects(); let screen_sharing_supported = cx.is_screen_capture_supported(); + let stats = room + .diagnostics() + .map(|d| d.read(cx).stats().clone()) + .unwrap_or_default(); + let channel_store = ChannelStore::global(cx); let channel = room .channel_id() @@ -354,6 +369,45 @@ impl TitleBar { let mut children = Vec::new(); + let effective_quality = stats.effective_quality.unwrap_or(ConnectionQuality::Lost); + let (signal_icon, signal_color, quality_label) = match effective_quality { + ConnectionQuality::Excellent => { + (IconName::SignalHigh, Some(Color::Success), "Excellent") + } + ConnectionQuality::Good => (IconName::SignalHigh, None, "Good"), + ConnectionQuality::Poor => (IconName::SignalMedium, Some(Color::Warning), "Poor"), + ConnectionQuality::Lost => (IconName::SignalLow, Some(Color::Error), "Lost"), + }; + let quality_label: SharedString = quality_label.into(); + children.push( + IconButton::new("call-quality", signal_icon) + .style(ButtonStyle::Subtle) + .icon_size(IconSize::Small) + .when_some(signal_color, |button, color| button.icon_color(color)) + .tooltip(move |_window, cx| { + let quality_label = quality_label.clone(); + let latency = format_stat(stats.latency_ms, |v| format!("{:.0}ms", v)); + let jitter = format_stat(stats.jitter_ms, |v| format!("{:.0}ms", v)); + let packet_loss = format_stat(stats.packet_loss_pct, |v| format!("{:.1}%", v)); + let input_lag = + format_stat(stats.input_lag.map(|d| d.as_secs_f64() * 1000.0), |v| { + format!("{:.1}ms", v) + }); + + Tooltip::with_meta( + format!("Connection: {quality_label}"), + Some(&ShowCallStats), + format!( + "Latency: {latency} · Jitter: {jitter} · Loss: {packet_loss} · Input lag: {input_lag}", + ), + cx, + ) + }) + .on_click(move |_, window, cx| { + window.dispatch_action(Box::new(ShowCallStats), cx); + }) + .into_any_element(), + ); children.push( h_flex() .gap_1() diff --git a/crates/title_bar/src/title_bar.rs b/crates/title_bar/src/title_bar.rs index 5622604aa5aea2c955be2773cb7b962b13fe3906..6e96b52d6b78bccea6e1f7e4825508f0aa25e60a 100644 --- a/crates/title_bar/src/title_bar.rs +++ b/crates/title_bar/src/title_bar.rs @@ -158,6 +158,7 @@ pub struct TitleBar { banner: Entity, update_version: Entity, screen_share_popover_handle: PopoverMenuHandle, + _diagnostics_subscription: Option, } impl Render for TitleBar { @@ -400,7 +401,7 @@ impl TitleBar { .detach(); } - Self { + let mut this = Self { platform_titlebar, application_menu, workspace: workspace.weak_handle(), @@ -412,7 +413,12 @@ impl TitleBar { banner, update_version, screen_share_popover_handle: PopoverMenuHandle::default(), - } + _diagnostics_subscription: None, + }; + + this.observe_diagnostics(cx); + + this } fn worktree_count(&self, cx: &App) -> usize { @@ -956,9 +962,23 @@ impl TitleBar { } fn active_call_changed(&mut self, cx: &mut Context) { + self.observe_diagnostics(cx); cx.notify(); } + fn observe_diagnostics(&mut self, cx: &mut Context) { + let diagnostics = ActiveCall::global(cx) + .read(cx) + .room() + .and_then(|room| room.read(cx).diagnostics().cloned()); + + if let Some(diagnostics) = diagnostics { + self._diagnostics_subscription = Some(cx.observe(&diagnostics, |_, _, cx| cx.notify())); + } else { + self._diagnostics_subscription = None; + } + } + fn share_project(&mut self, cx: &mut Context) { let active_call = ActiveCall::global(cx); let project = self.project.clone(); diff --git a/crates/zed_actions/src/lib.rs b/crates/zed_actions/src/lib.rs index 8edc80b4ec7816cd9e2ae2d7b995dd74b8128a9a..f01361ecea54561fd30e6dbe8aa01cc99b725a43 100644 --- a/crates/zed_actions/src/lib.rs +++ b/crates/zed_actions/src/lib.rs @@ -110,6 +110,12 @@ pub struct Extensions { #[serde(deny_unknown_fields)] pub struct AcpRegistry; +/// Show call diagnostics and connection quality statistics. +#[derive(PartialEq, Clone, Default, Debug, Deserialize, JsonSchema, Action)] +#[action(namespace = collab)] +#[serde(deny_unknown_fields)] +pub struct ShowCallStats; + /// Decreases the font size in the editor buffer. #[derive(PartialEq, Clone, Default, Debug, Deserialize, JsonSchema, Action)] #[action(namespace = zed)]