From 5713dbfd39a200cd30400091e37717476468dd63 Mon Sep 17 00:00:00 2001 From: Jakub Konka Date: Tue, 10 Mar 2026 12:23:08 +0100 Subject: [PATCH] livekit_client: Add graceful recovery for host audio latency Co-authored-by: Piotr Osiewicz --- .../src/livekit_client/playback.rs | 87 +++++++++++++++---- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 4933b05fc51592535c1f729ae8038a62103511ba..07ccc9ac625d35f488c13f4e5bd09d3d89cb5b06 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -97,11 +97,13 @@ impl AudioStack { let output_task = self.start_output(output_audio_device); let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed); + let sample_rate = LEGACY_SAMPLE_RATE.get(); + let num_channels = LEGACY_CHANNEL_COUNT.get() as u32; let source = AudioMixerSource { ssrc: next_ssrc, - sample_rate: LEGACY_SAMPLE_RATE.get(), - num_channels: LEGACY_CHANNEL_COUNT.get() as u32, - buffer: Arc::default(), + sample_rate, + num_channels, + buffer: Arc::new(Mutex::new(AudioRingBuffer::new(sample_rate, num_channels))), }; self.mixer.lock().add_source(source.clone()); @@ -514,26 +516,81 @@ pub(crate) async fn capture_local_video_track( )) } +struct AudioRingBuffer { + data: VecDeque, + capacity: usize, + target_fill: usize, + frame_size: usize, + consecutive_full_ticks: u32, +} + +impl AudioRingBuffer { + fn new(sample_rate: u32, num_channels: u32) -> Self { + let frame_size = (sample_rate * num_channels / 100) as usize; + let capacity = frame_size * 10; + let target_fill = frame_size * 2; + Self { + data: VecDeque::with_capacity(capacity), + capacity, + target_fill, + frame_size, + consecutive_full_ticks: 0, + } + } + + fn push(&mut self, samples: &[i16]) { + self.data.extend(samples.iter().copied()); + + if self.data.len() > self.capacity { + let overflow = self.data.len() - self.capacity; + self.data.drain(..overflow); + } + + if self.data.len() >= self.capacity { + self.consecutive_full_ticks += 1; + } else { + self.consecutive_full_ticks = 0; + } + + if self.consecutive_full_ticks >= 10 { + let drain_to = self.data.len().saturating_sub(self.target_fill); + if drain_to > 0 { + self.data.drain(..drain_to); + } + self.consecutive_full_ticks = 0; + } + } + + fn pop_frame(&mut self) -> Option> { + if self.data.len() < self.frame_size { + return None; + } + + let frame: Vec = self.data.drain(..self.frame_size).collect(); + + if self.data.len() > self.target_fill { + let excess_samples = self.data.len() - self.target_fill; + let excess_frames = excess_samples / self.frame_size; + let extra_drain = excess_frames.clamp(1, self.frame_size / 10); + let drain_amount = extra_drain.min(self.data.len()); + self.data.drain(..drain_amount); + } + + Some(frame) + } +} + #[derive(Clone)] struct AudioMixerSource { ssrc: i32, sample_rate: u32, num_channels: u32, - buffer: Arc>>>, + buffer: Arc>, } impl AudioMixerSource { fn receive(&self, frame: AudioFrame) { - assert_eq!( - frame.data.len() as u32, - self.sample_rate * self.num_channels / 100 - ); - - let mut buffer = self.buffer.lock(); - buffer.push_back(frame.data.to_vec()); - while buffer.len() > 10 { - buffer.pop_front(); - } + self.buffer.lock().push(&frame.data); } } @@ -548,7 +605,7 @@ impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource { fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option> { assert_eq!(self.sample_rate, target_sample_rate); - let buf = self.buffer.lock().pop_front()?; + let buf = self.buffer.lock().pop_frame()?; Some(AudioFrame { data: Cow::Owned(buf), sample_rate: self.sample_rate,