livekit_client: Add graceful recovery for host audio latency

Jakub Konka and Piotr Osiewicz created

Co-authored-by: Piotr Osiewicz <piotr@zed.dev>

Change summary

crates/livekit_client/src/livekit_client/playback.rs | 87 +++++++++++--
1 file changed, 72 insertions(+), 15 deletions(-)

Detailed changes

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -97,11 +97,13 @@ impl AudioStack {
         let output_task = self.start_output(output_audio_device);
 
         let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
+        let sample_rate = LEGACY_SAMPLE_RATE.get();
+        let num_channels = LEGACY_CHANNEL_COUNT.get() as u32;
         let source = AudioMixerSource {
             ssrc: next_ssrc,
-            sample_rate: LEGACY_SAMPLE_RATE.get(),
-            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
-            buffer: Arc::default(),
+            sample_rate,
+            num_channels,
+            buffer: Arc::new(Mutex::new(AudioRingBuffer::new(sample_rate, num_channels))),
         };
         self.mixer.lock().add_source(source.clone());
 
@@ -514,26 +516,81 @@ pub(crate) async fn capture_local_video_track(
     ))
 }
 
+struct AudioRingBuffer { // Bounded queue of audio samples: `push` appends at the back, `pop_frame` removes fixed-size frames from the front.
+    data: VecDeque<i16>, // Queued i16 samples, oldest at the front.
+    capacity: usize, // Maximum number of samples kept; `push` drops the oldest beyond this (`new` sets it to 10 frames).
+    target_fill: usize, // Backlog level, in samples, that the trimming logic drains back toward (`new` sets it to 2 frames).
+    frame_size: usize, // Number of samples returned by each `pop_frame` call (`new` sets it to sample_rate * num_channels / 100).
+    consecutive_full_ticks: u32, // How many `push` calls in a row have left the buffer at capacity.
+}
+
+impl AudioRingBuffer {
+    fn new(sample_rate: u32, num_channels: u32) -> Self { // Builds an empty buffer whose sizes are derived from the stream format.
+        let frame_size = (sample_rate * num_channels / 100) as usize; // Samples per frame; that is 1/100 s of audio if sample_rate is in Hz (assumed — TODO confirm).
+        let capacity = frame_size * 10; // Hard cap: ten frames' worth of samples.
+        let target_fill = frame_size * 2; // Backlog goal after trimming: two frames' worth of samples.
+        Self {
+            data: VecDeque::with_capacity(capacity), // Preallocate room for `capacity` samples.
+            capacity,
+            target_fill,
+            frame_size,
+            consecutive_full_ticks: 0,
+        }
+    }
+
+    fn push(&mut self, samples: &[i16]) { // Appends `samples`, discarding the oldest data when the buffer overflows or stays full for too long.
+        self.data.extend(samples.iter().copied());
+
+        if self.data.len() > self.capacity { // Over the cap: drop exactly enough of the oldest samples to get back to `capacity`.
+            let overflow = self.data.len() - self.capacity;
+            self.data.drain(..overflow);
+        }
+
+        if self.data.len() >= self.capacity { // After the trim above the length never exceeds `capacity`, so this means "buffer is full".
+            self.consecutive_full_ticks += 1;
+        } else {
+            self.consecutive_full_ticks = 0; // Any push that leaves headroom resets the streak.
+        }
+
+        if self.consecutive_full_ticks >= 10 { // Full on ten pushes in a row: discard the backlog down to `target_fill`.
+            let drain_to = self.data.len().saturating_sub(self.target_fill); // Count of oldest samples to discard (a count, despite the name).
+            if drain_to > 0 {
+                self.data.drain(..drain_to);
+            }
+            self.consecutive_full_ticks = 0; // Start a fresh streak after the bulk discard.
+        }
+    }
+
+    fn pop_frame(&mut self) -> Option<Vec<i16>> { // Removes and returns the oldest `frame_size` samples, or `None` when fewer than that are queued.
+        if self.data.len() < self.frame_size {
+            return None;
+        }
+
+        let frame: Vec<i16> = self.data.drain(..self.frame_size).collect();
+
+        if self.data.len() > self.target_fill { // Backlog still above target: additionally discard a few of the oldest remaining samples.
+            let excess_samples = self.data.len() - self.target_fill;
+            let excess_frames = excess_samples / self.frame_size; // Whole frames of surplus above `target_fill`.
+            let extra_drain = excess_frames.clamp(1, self.frame_size / 10); // Used below as a SAMPLE count in 1..=frame_size/10. NOTE(review): `clamp` panics if max < min, i.e. when frame_size < 10.
+            let drain_amount = extra_drain.min(self.data.len()); // Never drain more than is queued.
+            self.data.drain(..drain_amount); // NOTE(review): the count need not be a multiple of num_channels — confirm channel alignment of later frames is not affected.
+        }
+
+        Some(frame)
+    }
+}
+
 #[derive(Clone)]
 struct AudioMixerSource {
     ssrc: i32,
     sample_rate: u32,
     num_channels: u32,
-    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
+    buffer: Arc<Mutex<AudioRingBuffer>>, // Sample queue behind an Arc, so clones of this source share it; `receive` pushes into it and `get_audio_frame_with_info` pops frames from it.
 }
 
 impl AudioMixerSource {
     fn receive(&self, frame: AudioFrame) {
-        assert_eq!(
-            frame.data.len() as u32,
-            self.sample_rate * self.num_channels / 100
-        );
-
-        let mut buffer = self.buffer.lock();
-        buffer.push_back(frame.data.to_vec());
-        while buffer.len() > 10 {
-            buffer.pop_front();
-        }
+        self.buffer.lock().push(&frame.data); // Lock the shared buffer and append this frame's samples; the frame length is no longer asserted, and bounding is left to `AudioRingBuffer::push`.
     }
 }
 
@@ -548,7 +605,7 @@ impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
 
     fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
         assert_eq!(self.sample_rate, target_sample_rate);
-        let buf = self.buffer.lock().pop_front()?;
+        let buf = self.buffer.lock().pop_frame()?;
         Some(AudioFrame {
             data: Cow::Owned(buf),
             sample_rate: self.sample_rate,