From 215ac1f1914c919c0636cdfe62a4a927e5b190ce Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Wed, 11 Mar 2026 11:26:04 +0100 Subject: [PATCH] audio(rodio): use ring buffer instead of a rodio::queue::queue Rodio's queue does not let us skip past frames when we're falling behind. --- .../src/livekit_client/playback/source.rs | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index b90c3613f8215481a4a535eb81c665fccae80e5c..a98223c3a0cccf121506978864b7caedcb17e197 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -1,28 +1,34 @@ -use std::num::NonZero; +use std::sync::Arc; use futures::StreamExt; use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame}; use livekit::track::RemoteAudioTrack; +use parking_lot::Mutex; +use ringbuffer::{ConstGenericRingBuffer, RingBuffer}; use rodio::{ - ChannelCount, SampleRate, Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, + ChannelCount, Sample, SampleRate, Source, buffer::SamplesBuffer, + conversions::SampleTypeConverter, }; use audio::{CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE}; +// 10ms frames; 10 frames = 100ms max buffered before we start dropping old frames. +const RING_BUFFER_CAPACITY: usize = 10; + fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer { let samples = frame.data.iter().copied(); let samples = SampleTypeConverter::<_, _>::new(samples); let samples: Vec = samples.collect(); SamplesBuffer::new( - NonZero::new(frame.num_channels as u16).expect("zero channels is nonsense"), - NonZero::new(frame.sample_rate).expect("samplerate zero is nonsense"), + std::num::NonZero::new(frame.num_channels as u16).expect("zero channels is nonsense"), + std::num::NonZero::new(frame.sample_rate).expect("samplerate zero is nonsense"), samples, ) } pub struct LiveKitStream { - // shared_buffer: SharedBuffer, - inner: rodio::queue::SourcesQueueOutput, + buffer: Arc>>, + current: Option, _receiver_task: gpui::Task<()>, channel_count: ChannelCount, sample_rate: SampleRate, @@ -45,20 +51,24 @@ impl LiveKitStream { sample_rate.get() as i32, channel_count.get().into(), ); - let (queue_input, queue_output) = rodio::queue::queue(true); - // spawn rtc stream + + let buffer: Arc>> = + Arc::default(); + let receiver_task = executor.spawn_with_priority(gpui::Priority::RealtimeAudio, { + let buffer = Arc::clone(&buffer); async move { while let Some(frame) = stream.next().await { let samples = frame_to_samplesbuffer(frame); - queue_input.append(samples); + buffer.lock().enqueue(samples); } } }); LiveKitStream { + buffer, + current: None, _receiver_task: receiver_task, - inner: queue_output, sample_rate, channel_count, } @@ -66,27 +76,38 @@ impl LiveKitStream { } impl Iterator for LiveKitStream { - type Item = rodio::Sample; + type Item = Sample; fn next(&mut self) -> Option { - self.inner.next() + loop { + if let Some(current) = &mut self.current { + if let Some(sample) = current.next() { + return Some(sample); + } + } + self.current = self.buffer.lock().dequeue(); + if self.current.is_none() { + // Underrun: emit silence rather than ending the stream. + return Some(0.0); + } + } } } impl Source for LiveKitStream { fn current_span_len(&self) -> Option { - self.inner.current_span_len() + self.current.as_ref().and_then(|s| s.current_span_len()) } - fn channels(&self) -> rodio::ChannelCount { + fn channels(&self) -> ChannelCount { self.channel_count } - fn sample_rate(&self) -> rodio::SampleRate { + fn sample_rate(&self) -> SampleRate { self.sample_rate } fn total_duration(&self) -> Option { - self.inner.total_duration() + None } }