Add Rodio audio pipeline as alternative to current LiveKit pipeline (#36607)

David Kleingeld , Mikayla , and Antonio Scandurra created

Rodio parts are well tested and need less configuration then the livekit
parts. I suspect there is a bug in the livekit configuration regarding
resampling. Rather then investigate that it seemed faster & easier to
swap in Rodio.

This opens the door to using other Rodio parts like:
 - Decibel based volume control
 - Limiter (prevents sound from becoming too loud)
 - Automatic gain control

To use this add to settings:
```
  "audio": {
    "experimental.rodio_audio": true
  }
```

Release Notes:

- N/A

Co-authored-by: Mikayla <mikayla@zed.dev>
Co-authored-by: Antonio Scandurra <me@as-cii.com>

Change summary

Cargo.lock                                                  |  7 
crates/audio/Cargo.toml                                     |  5 
crates/audio/src/assets.rs                                  | 54 ----
crates/audio/src/audio.rs                                   | 76 ++++--
crates/audio/src/audio_settings.rs                          | 33 +++
crates/livekit_client/Cargo.toml                            |  2 
crates/livekit_client/src/lib.rs                            |  7 
crates/livekit_client/src/livekit_client.rs                 | 14 
crates/livekit_client/src/livekit_client/playback.rs        | 64 ++++-
crates/livekit_client/src/livekit_client/playback/source.rs | 67 ++++++
crates/settings/src/settings_store.rs                       |  5 
crates/zed/src/main.rs                                      |  2 
crates/zed/src/zed.rs                                       |  2 
13 files changed, 226 insertions(+), 112 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1379,10 +1379,11 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "collections",
- "derive_more 0.99.19",
  "gpui",
- "parking_lot",
  "rodio",
+ "schemars",
+ "serde",
+ "settings",
  "util",
  "workspace-hack",
 ]
@@ -9621,6 +9622,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "audio",
  "collections",
  "core-foundation 0.10.0",
  "core-video",
@@ -9643,6 +9645,7 @@ dependencies = [
  "scap",
  "serde",
  "serde_json",
+ "settings",
  "sha2",
  "simplelog",
  "smallvec",

crates/audio/Cargo.toml 🔗

@@ -15,9 +15,10 @@ doctest = false
 [dependencies]
 anyhow.workspace = true
 collections.workspace = true
-derive_more.workspace = true
 gpui.workspace = true
-parking_lot.workspace = true
+settings.workspace = true
+schemars.workspace = true
+serde.workspace = true
 rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] }
 util.workspace = true
 workspace-hack.workspace = true

crates/audio/src/assets.rs 🔗

@@ -1,54 +0,0 @@
-use std::{io::Cursor, sync::Arc};
-
-use anyhow::{Context as _, Result};
-use collections::HashMap;
-use gpui::{App, AssetSource, Global};
-use rodio::{Decoder, Source, source::Buffered};
-
-type Sound = Buffered<Decoder<Cursor<Vec<u8>>>>;
-
-pub struct SoundRegistry {
-    cache: Arc<parking_lot::Mutex<HashMap<String, Sound>>>,
-    assets: Box<dyn AssetSource>,
-}
-
-struct GlobalSoundRegistry(Arc<SoundRegistry>);
-
-impl Global for GlobalSoundRegistry {}
-
-impl SoundRegistry {
-    pub fn new(source: impl AssetSource) -> Arc<Self> {
-        Arc::new(Self {
-            cache: Default::default(),
-            assets: Box::new(source),
-        })
-    }
-
-    pub fn global(cx: &App) -> Arc<Self> {
-        cx.global::<GlobalSoundRegistry>().0.clone()
-    }
-
-    pub(crate) fn set_global(source: impl AssetSource, cx: &mut App) {
-        cx.set_global(GlobalSoundRegistry(SoundRegistry::new(source)));
-    }
-
-    pub fn get(&self, name: &str) -> Result<impl Source<Item = f32> + use<>> {
-        if let Some(wav) = self.cache.lock().get(name) {
-            return Ok(wav.clone());
-        }
-
-        let path = format!("sounds/{}.wav", name);
-        let bytes = self
-            .assets
-            .load(&path)?
-            .map(anyhow::Ok)
-            .with_context(|| format!("No asset available for path {path}"))??
-            .into_owned();
-        let cursor = Cursor::new(bytes);
-        let source = Decoder::new(cursor)?.buffered();
-
-        self.cache.lock().insert(name.to_string(), source.clone());
-
-        Ok(source)
-    }
-}

crates/audio/src/audio.rs 🔗

@@ -1,16 +1,19 @@
-use assets::SoundRegistry;
-use derive_more::{Deref, DerefMut};
-use gpui::{App, AssetSource, BorrowAppContext, Global};
-use rodio::{OutputStream, OutputStreamBuilder};
+use anyhow::{Context as _, Result, anyhow};
+use collections::HashMap;
+use gpui::{App, BorrowAppContext, Global};
+use rodio::{Decoder, OutputStream, OutputStreamBuilder, Source, source::Buffered};
+use settings::Settings;
+use std::io::Cursor;
 use util::ResultExt;
 
-mod assets;
+mod audio_settings;
+pub use audio_settings::AudioSettings;
 
-pub fn init(source: impl AssetSource, cx: &mut App) {
-    SoundRegistry::set_global(source, cx);
-    cx.set_global(GlobalAudio(Audio::new()));
+pub fn init(cx: &mut App) {
+    AudioSettings::register(cx);
 }
 
+#[derive(Copy, Clone, Eq, Hash, PartialEq)]
 pub enum Sound {
     Joined,
     Leave,
@@ -38,18 +41,12 @@ impl Sound {
 #[derive(Default)]
 pub struct Audio {
     output_handle: Option<OutputStream>,
+    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
 }
 
-#[derive(Deref, DerefMut)]
-struct GlobalAudio(Audio);
-
-impl Global for GlobalAudio {}
+impl Global for Audio {}
 
 impl Audio {
-    pub fn new() -> Self {
-        Self::default()
-    }
-
     fn ensure_output_exists(&mut self) -> Option<&OutputStream> {
         if self.output_handle.is_none() {
             self.output_handle = OutputStreamBuilder::open_default_stream().log_err();
@@ -58,26 +55,51 @@ impl Audio {
         self.output_handle.as_ref()
     }
 
-    pub fn play_sound(sound: Sound, cx: &mut App) {
-        if !cx.has_global::<GlobalAudio>() {
-            return;
-        }
+    pub fn play_source(
+        source: impl rodio::Source + Send + 'static,
+        cx: &mut App,
+    ) -> anyhow::Result<()> {
+        cx.update_default_global(|this: &mut Self, _cx| {
+            let output_handle = this
+                .ensure_output_exists()
+                .ok_or_else(|| anyhow!("Could not open audio output"))?;
+            output_handle.mixer().add(source);
+            Ok(())
+        })
+    }
 
-        cx.update_global::<GlobalAudio, _>(|this, cx| {
+    pub fn play_sound(sound: Sound, cx: &mut App) {
+        cx.update_default_global(|this: &mut Self, cx| {
+            let source = this.sound_source(sound, cx).log_err()?;
             let output_handle = this.ensure_output_exists()?;
-            let source = SoundRegistry::global(cx).get(sound.file()).log_err()?;
             output_handle.mixer().add(source);
             Some(())
         });
     }
 
     pub fn end_call(cx: &mut App) {
-        if !cx.has_global::<GlobalAudio>() {
-            return;
-        }
-
-        cx.update_global::<GlobalAudio, _>(|this, _| {
+        cx.update_default_global(|this: &mut Self, _cx| {
             this.output_handle.take();
         });
     }
+
+    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
+        if let Some(wav) = self.source_cache.get(&sound) {
+            return Ok(wav.clone());
+        }
+
+        let path = format!("sounds/{}.wav", sound.file());
+        let bytes = cx
+            .asset_source()
+            .load(&path)?
+            .map(anyhow::Ok)
+            .with_context(|| format!("No asset available for path {path}"))??
+            .into_owned();
+        let cursor = Cursor::new(bytes);
+        let source = Decoder::new(cursor)?.buffered();
+
+        self.source_cache.insert(sound, source.clone());
+
+        Ok(source)
+    }
 }

crates/audio/src/audio_settings.rs 🔗

@@ -0,0 +1,33 @@
+use anyhow::Result;
+use gpui::App;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use settings::{Settings, SettingsSources};
+
+#[derive(Deserialize, Debug)]
+pub struct AudioSettings {
+    /// Opt into the new audio system.
+    #[serde(rename = "experimental.rodio_audio", default)]
+    pub rodio_audio: bool, // default is false
+}
+
+/// Configuration of audio in Zed.
+#[derive(Clone, Default, Serialize, Deserialize, JsonSchema, Debug)]
+#[serde(default)]
+pub struct AudioSettingsContent {
+    /// Whether to use the experimental audio system
+    #[serde(rename = "experimental.rodio_audio", default)]
+    pub rodio_audio: bool,
+}
+
+impl Settings for AudioSettings {
+    const KEY: Option<&'static str> = Some("audio");
+
+    type FileContent = AudioSettingsContent;
+
+    fn load(sources: SettingsSources<Self::FileContent>, _cx: &mut App) -> Result<Self> {
+        sources.json_merge()
+    }
+
+    fn import_from_vscode(_vscode: &settings::VsCodeSettings, _current: &mut Self::FileContent) {}
+}

crates/livekit_client/Cargo.toml 🔗

@@ -25,6 +25,7 @@ async-trait.workspace = true
 collections.workspace = true
 cpal.workspace = true
 futures.workspace = true
+audio.workspace = true
 gpui = { workspace = true, features = ["screen-capture", "x11", "wayland", "windows-manifest"] }
 gpui_tokio.workspace = true
 http_client_tls.workspace = true
@@ -35,6 +36,7 @@ nanoid.workspace = true
 parking_lot.workspace = true
 postage.workspace = true
 smallvec.workspace = true
+settings.workspace = true
 tokio-tungstenite.workspace = true
 util.workspace = true
 workspace-hack.workspace = true

crates/livekit_client/src/lib.rs 🔗

@@ -24,8 +24,11 @@ mod livekit_client;
 )))]
 pub use livekit_client::*;
 
-// If you need proper LSP in livekit_client you've got to comment out
-// the mocks and test
+// If you need proper LSP in livekit_client you've got to comment
+// - the cfg blocks above
+// - the mods: mock_client & test and their conditional blocks
+// - the pub use mock_client::* and their conditional blocks
+
 #[cfg(any(
     test,
     feature = "test-support",

crates/livekit_client/src/livekit_client.rs 🔗

@@ -1,15 +1,16 @@
 use std::sync::Arc;
 
 use anyhow::{Context as _, Result};
+use audio::AudioSettings;
 use collections::HashMap;
 use futures::{SinkExt, channel::mpsc};
 use gpui::{App, AsyncApp, ScreenCaptureSource, ScreenCaptureStream, Task};
 use gpui_tokio::Tokio;
+use log::info;
 use playback::capture_local_video_track;
+use settings::Settings;
 
 mod playback;
-#[cfg(feature = "record-microphone")]
-mod record;
 
 use crate::{LocalTrack, Participant, RemoteTrack, RoomEvent, TrackPublication};
 pub use playback::AudioStream;
@@ -125,9 +126,14 @@ impl Room {
     pub fn play_remote_audio_track(
         &self,
         track: &RemoteAudioTrack,
-        _cx: &App,
+        cx: &mut App,
     ) -> Result<playback::AudioStream> {
-        Ok(self.playback.play_remote_audio_track(&track.0))
+        if AudioSettings::get_global(cx).rodio_audio {
+            info!("Using experimental.rodio_audio audio pipeline");
+            playback::play_remote_audio_track(&track.0, cx)
+        } else {
+            Ok(self.playback.play_remote_audio_track(&track.0))
+        }
     }
 }
 

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -18,13 +18,16 @@ use livekit::webrtc::{
     video_stream::native::NativeVideoStream,
 };
 use parking_lot::Mutex;
+use rodio::Source;
 use std::cell::RefCell;
 use std::sync::Weak;
-use std::sync::atomic::{self, AtomicI32};
+use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 use std::time::Duration;
 use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
 use util::{ResultExt as _, maybe};
 
+mod source;
+
 pub(crate) struct AudioStack {
     executor: BackgroundExecutor,
     apm: Arc<Mutex<apm::AudioProcessingModule>>,
@@ -40,6 +43,29 @@ pub(crate) struct AudioStack {
 const SAMPLE_RATE: u32 = 48000;
 const NUM_CHANNELS: u32 = 2;
 
+pub(crate) fn play_remote_audio_track(
+    track: &livekit::track::RemoteAudioTrack,
+    cx: &mut gpui::App,
+) -> Result<AudioStream> {
+    let stop_handle = Arc::new(AtomicBool::new(false));
+    let stop_handle_clone = stop_handle.clone();
+    let stream = source::LiveKitStream::new(cx.background_executor(), track)
+        .stoppable()
+        .periodic_access(Duration::from_millis(50), move |s| {
+            if stop_handle.load(Ordering::Relaxed) {
+                s.stop();
+            }
+        });
+    audio::Audio::play_source(stream, cx).context("Could not play audio")?;
+
+    let on_drop = util::defer(move || {
+        stop_handle_clone.store(true, Ordering::Relaxed);
+    });
+    Ok(AudioStream::Output {
+        _drop: Box::new(on_drop),
+    })
+}
+
 impl AudioStack {
     pub(crate) fn new(executor: BackgroundExecutor) -> Self {
         let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
@@ -61,7 +87,7 @@ impl AudioStack {
     ) -> AudioStream {
         let output_task = self.start_output();
 
-        let next_ssrc = self.next_ssrc.fetch_add(1, atomic::Ordering::Relaxed);
+        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
         let source = AudioMixerSource {
             ssrc: next_ssrc,
             sample_rate: SAMPLE_RATE,
@@ -97,6 +123,23 @@ impl AudioStack {
         }
     }
 
+    fn start_output(&self) -> Arc<Task<()>> {
+        if let Some(task) = self._output_task.borrow().upgrade() {
+            return task;
+        }
+        let task = Arc::new(self.executor.spawn({
+            let apm = self.apm.clone();
+            let mixer = self.mixer.clone();
+            async move {
+                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
+                    .await
+                    .log_err();
+            }
+        }));
+        *self._output_task.borrow_mut() = Arc::downgrade(&task);
+        task
+    }
+
     pub(crate) fn capture_local_microphone_track(
         &self,
     ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
@@ -139,23 +182,6 @@ impl AudioStack {
         ))
     }
 
-    fn start_output(&self) -> Arc<Task<()>> {
-        if let Some(task) = self._output_task.borrow().upgrade() {
-            return task;
-        }
-        let task = Arc::new(self.executor.spawn({
-            let apm = self.apm.clone();
-            let mixer = self.mixer.clone();
-            async move {
-                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
-                    .await
-                    .log_err();
-            }
-        }));
-        *self._output_task.borrow_mut() = Arc::downgrade(&task);
-        task
-    }
-
     async fn play_output(
         apm: Arc<Mutex<apm::AudioProcessingModule>>,
         mixer: Arc<Mutex<audio_mixer::AudioMixer>>,

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -0,0 +1,67 @@
+use futures::StreamExt;
+use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
+use livekit::track::RemoteAudioTrack;
+use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter};
+
+use crate::livekit_client::playback::{NUM_CHANNELS, SAMPLE_RATE};
+
+fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
+    let samples = frame.data.iter().copied();
+    let samples = SampleTypeConverter::<_, _>::new(samples);
+    let samples: Vec<f32> = samples.collect();
+    SamplesBuffer::new(frame.num_channels as u16, frame.sample_rate, samples)
+}
+
+pub struct LiveKitStream {
+    // shared_buffer: SharedBuffer,
+    inner: rodio::queue::SourcesQueueOutput,
+    _receiver_task: gpui::Task<()>,
+}
+
+impl LiveKitStream {
+    pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self {
+        let mut stream =
+            NativeAudioStream::new(track.rtc_track(), SAMPLE_RATE as i32, NUM_CHANNELS as i32);
+        let (queue_input, queue_output) = rodio::queue::queue(true);
+        // spawn rtc stream
+        let receiver_task = executor.spawn({
+            async move {
+                while let Some(frame) = stream.next().await {
+                    let samples = frame_to_samplesbuffer(frame);
+                    queue_input.append(samples);
+                }
+            }
+        });
+
+        LiveKitStream {
+            _receiver_task: receiver_task,
+            inner: queue_output,
+        }
+    }
+}
+
+impl Iterator for LiveKitStream {
+    type Item = rodio::Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.inner.next()
+    }
+}
+
+impl Source for LiveKitStream {
+    fn current_span_len(&self) -> Option<usize> {
+        self.inner.current_span_len()
+    }
+
+    fn channels(&self) -> rodio::ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> rodio::SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<std::time::Duration> {
+        self.inner.total_duration()
+    }
+}

crates/settings/src/settings_store.rs 🔗

@@ -60,6 +60,11 @@ pub trait Settings: 'static + Send + Sync {
 
     /// The logic for combining together values from one or more JSON files into the
     /// final value for this setting.
+    ///
+    /// # Warning
+    /// `Self::FileContent` deserialized field names should match with `Self` deserialized field names
+    /// otherwise the field won't be deserialized properly and you will get the error:
+    /// "A default setting must be added to the `default.json` file"
     fn load(sources: SettingsSources<Self::FileContent>, cx: &mut App) -> Result<Self>
     where
         Self: Sized;

crates/zed/src/main.rs 🔗

@@ -598,7 +598,7 @@ pub fn main() {
         repl::notebook::init(cx);
         diagnostics::init(cx);
 
-        audio::init(Assets, cx);
+        audio::init(cx);
         workspace::init(app_state.clone(), cx);
         ui_prompt::init(cx);
 

crates/zed/src/zed.rs 🔗

@@ -4614,7 +4614,7 @@ mod tests {
             gpui_tokio::init(cx);
             vim_mode_setting::init(cx);
             theme::init(theme::LoadThemes::JustBase, cx);
-            audio::init((), cx);
+            audio::init(cx);
             channel::init(&app_state.client, app_state.user_store.clone(), cx);
             call::init(app_state.client.clone(), app_state.user_store.clone(), cx);
             notifications::init(app_state.client.clone(), app_state.user_store.clone(), cx);