Fix echo canceller not working (#51673)

Yara 🏳️‍⚧️ created

Release Notes:

- Fixed: echo's on experimental audio pipeline

Some more context:
- `EchoCanceller` was not implemented on the output side
- removes the planned migration to a different sample rate (44100) and
channel count (1)
- de-duplicate the interleaved `#cfg[]`'s and centralized them in
`echo_canceller.rs`

Change summary

crates/audio/src/audio.rs                                   | 437 ------
crates/audio/src/audio_pipeline.rs                          | 355 +++++
crates/audio/src/audio_pipeline/echo_canceller.rs           |  54 
crates/audio/src/audio_pipeline/replays.rs                  |   3 
crates/audio/src/audio_pipeline/rodio_ext.rs                |   0 
crates/livekit_client/src/livekit_client/playback.rs        |  19 
crates/livekit_client/src/livekit_client/playback/source.rs |   5 
crates/settings_ui/src/pages/audio_test_window.rs           |   2 
8 files changed, 438 insertions(+), 437 deletions(-)

Detailed changes

crates/audio/src/audio.rs 🔗

@@ -1,77 +1,22 @@
-use anyhow::{Context as _, Result};
-use collections::HashMap;
-use cpal::{
-    DeviceDescription, DeviceId, default_host,
-    traits::{DeviceTrait, HostTrait},
-};
-use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
+use std::time::Duration;
 
-#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-mod non_windows_and_freebsd_deps {
-    pub(super) use cpal::Sample;
-    pub(super) use libwebrtc::native::apm;
-    pub(super) use parking_lot::Mutex;
-    pub(super) use rodio::source::LimitSettings;
-    pub(super) use std::sync::Arc;
-}
-
-#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-use non_windows_and_freebsd_deps::*;
+use rodio::{ChannelCount, SampleRate, nz};
 
-use rodio::{
-    Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
-    mixer::Mixer,
-    nz,
-    source::{AutomaticGainControlSettings, Buffered},
-};
-use settings::Settings;
-use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
-use util::ResultExt;
+pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
+pub const SAMPLE_RATE: SampleRate = nz!(48000);
+pub const CHANNEL_COUNT: ChannelCount = nz!(2);
 
 mod audio_settings;
-mod replays;
-mod rodio_ext;
 pub use audio_settings::AudioSettings;
-pub use rodio_ext::RodioExt;
 
-use crate::audio_settings::LIVE_SETTINGS;
-
-// We are migrating to 16kHz sample rate from 48kHz. In the future
-// once we are reasonably sure most users have upgraded we will
-// remove the LEGACY parameters.
-//
-// We migrate to 16kHz because it is sufficient for speech and required
-// by the denoiser and future Speech to Text layers.
-pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
-pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
-pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
-    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
-
-pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
-pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
-
-pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
-
-pub fn init(cx: &mut App) {
-    LIVE_SETTINGS.initialize(cx);
-}
-
-// TODO(jk): this is currently cached only once - we should observe and react instead
-pub fn ensure_devices_initialized(cx: &mut App) {
-    if cx.has_global::<AvailableAudioDevices>() {
-        return;
-    }
-    cx.default_global::<AvailableAudioDevices>();
-    let task = cx
-        .background_executor()
-        .spawn(async move { get_available_audio_devices() });
-    cx.spawn(async move |cx: &mut AsyncApp| {
-        let devices = task.await;
-        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
-        cx.refresh();
-    })
-    .detach();
-}
+mod audio_pipeline;
+pub use audio_pipeline::{Audio, VoipParts};
+pub use audio_pipeline::{AudioDeviceInfo, AvailableAudioDevices};
+pub use audio_pipeline::{ensure_devices_initialized, resolve_device};
+// TODO(audio) replace with input test functionality in the audio crate
+pub use audio_pipeline::RodioExt;
+pub use audio_pipeline::init;
+pub use audio_pipeline::{open_input_stream, open_test_output};
 
 #[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 pub enum Sound {
@@ -99,359 +44,3 @@ impl Sound {
         }
     }
 }
-
-pub struct Audio {
-    output_handle: Option<MixerDeviceSink>,
-    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
-    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
-    replays: replays::Replays,
-}
-
-impl Default for Audio {
-    fn default() -> Self {
-        Self {
-            output_handle: Default::default(),
-            #[cfg(not(any(
-                all(target_os = "windows", target_env = "gnu"),
-                target_os = "freebsd"
-            )))]
-            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
-                true, false, false, false,
-            ))),
-            source_cache: Default::default(),
-            replays: Default::default(),
-        }
-    }
-}
-
-impl Global for Audio {}
-
-impl Audio {
-    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
-        #[cfg(debug_assertions)]
-        log::warn!(
-            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
-        );
-
-        if self.output_handle.is_none() {
-            let output_handle = open_output_stream(output_audio_device)?;
-
-            // The webrtc apm is not yet compiling for windows & freebsd
-            #[cfg(not(any(
-                any(all(target_os = "windows", target_env = "gnu")),
-                target_os = "freebsd"
-            )))]
-            let echo_canceller = Arc::clone(&self.echo_canceller);
-
-            #[cfg(not(any(
-                any(all(target_os = "windows", target_env = "gnu")),
-                target_os = "freebsd"
-            )))]
-            {
-                let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)
-                    .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
-                        let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
-                        echo_canceller
-                            .lock()
-                            .process_reverse_stream(
-                                &mut buf,
-                                SAMPLE_RATE.get() as i32,
-                                CHANNEL_COUNT.get().into(),
-                            )
-                            .expect("Audio input and output threads should not panic");
-                    });
-                output_handle.mixer().add(source);
-            }
-
-            #[cfg(any(
-                any(all(target_os = "windows", target_env = "gnu")),
-                target_os = "freebsd"
-            ))]
-            {
-                let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE);
-                output_handle.mixer().add(source);
-            }
-
-            self.output_handle = Some(output_handle);
-        }
-
-        Ok(self
-            .output_handle
-            .as_ref()
-            .map(|h| h.mixer())
-            .expect("we only get here if opening the outputstream succeeded"))
-    }
-
-    pub fn save_replays(
-        &self,
-        executor: BackgroundExecutor,
-    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
-        self.replays.replays_to_tar(executor)
-    }
-
-    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
-        let stream = open_input_stream(voip_parts.input_audio_device)?;
-        let stream = stream
-            .possibly_disconnected_channels_to_mono()
-            .constant_samplerate(SAMPLE_RATE)
-            .limit(LimitSettings::live_performance())
-            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
-                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
-                if voip_parts
-                    .echo_canceller
-                    .lock()
-                    .process_stream(
-                        &mut int_buffer,
-                        SAMPLE_RATE.get() as i32,
-                        CHANNEL_COUNT.get() as i32,
-                    )
-                    .context("livekit audio processor error")
-                    .log_err()
-                    .is_some()
-                {
-                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
-                        *sample = (*processed).to_sample();
-                    }
-                }
-            })
-            .denoise()
-            .context("Could not set up denoiser")?
-            .automatic_gain_control(AutomaticGainControlSettings {
-                target_level: 0.90,
-                attack_time: Duration::from_secs(1),
-                release_time: Duration::from_secs(0),
-                absolute_max_gain: 5.0,
-            })
-            .periodic_access(Duration::from_millis(100), move |agc_source| {
-                agc_source
-                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
-                let denoise = agc_source.inner_mut();
-                denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
-            });
-
-        let stream = if voip_parts.legacy_audio_compatible {
-            stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
-        } else {
-            stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
-        };
-
-        let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
-        voip_parts
-            .replays
-            .add_voip_stream("local microphone".to_string(), replay);
-
-        Ok(stream)
-    }
-
-    pub fn play_voip_stream(
-        source: impl rodio::Source + Send + 'static,
-        speaker_name: String,
-        is_staff: bool,
-        cx: &mut App,
-    ) -> anyhow::Result<()> {
-        let (replay_source, source) = source
-            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
-            .automatic_gain_control(AutomaticGainControlSettings {
-                target_level: 0.90,
-                attack_time: Duration::from_secs(1),
-                release_time: Duration::from_secs(0),
-                absolute_max_gain: 5.0,
-            })
-            .periodic_access(Duration::from_millis(100), move |agc_source| {
-                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
-            })
-            .replayable(REPLAY_DURATION)
-            .expect("REPLAY_DURATION is longer than 100ms");
-        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
-
-        cx.update_default_global(|this: &mut Self, _cx| {
-            let output_mixer = this
-                .ensure_output_exists(output_audio_device)
-                .context("Could not get output mixer")?;
-            output_mixer.add(source);
-            if is_staff {
-                this.replays.add_voip_stream(speaker_name, replay_source);
-            }
-            Ok(())
-        })
-    }
-
-    pub fn play_sound(sound: Sound, cx: &mut App) {
-        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
-        cx.update_default_global(|this: &mut Self, cx| {
-            let source = this.sound_source(sound, cx).log_err()?;
-            let output_mixer = this
-                .ensure_output_exists(output_audio_device)
-                .context("Could not get output mixer")
-                .log_err()?;
-
-            output_mixer.add(source);
-            Some(())
-        });
-    }
-
-    pub fn end_call(cx: &mut App) {
-        cx.update_default_global(|this: &mut Self, _cx| {
-            this.output_handle.take();
-        });
-    }
-
-    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
-        if let Some(wav) = self.source_cache.get(&sound) {
-            return Ok(wav.clone());
-        }
-
-        let path = format!("sounds/{}.wav", sound.file());
-        let bytes = cx
-            .asset_source()
-            .load(&path)?
-            .map(anyhow::Ok)
-            .with_context(|| format!("No asset available for path {path}"))??
-            .into_owned();
-        let cursor = Cursor::new(bytes);
-        let source = Decoder::new(cursor)?.buffered();
-
-        self.source_cache.insert(sound, source.clone());
-
-        Ok(source)
-    }
-}
-
-#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-pub struct VoipParts {
-    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
-    replays: replays::Replays,
-    legacy_audio_compatible: bool,
-    input_audio_device: Option<DeviceId>,
-}
-
-#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
-impl VoipParts {
-    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
-        let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
-            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
-        });
-        let legacy_audio_compatible =
-            AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
-                .unwrap_or(true);
-        let input_audio_device =
-            AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
-                .flatten();
-
-        Ok(Self {
-            legacy_audio_compatible,
-            echo_canceller: apm,
-            replays,
-            input_audio_device,
-        })
-    }
-}
-
-pub fn open_input_stream(
-    device_id: Option<DeviceId>,
-) -> anyhow::Result<rodio::microphone::Microphone> {
-    let builder = rodio::microphone::MicrophoneBuilder::new();
-    let builder = if let Some(id) = device_id {
-        // TODO(jk): upstream patch
-        // if let Some(input_device) = default_host().device_by_id(id) {
-        //     builder.device(input_device);
-        // }
-        let mut found = None;
-        for input in rodio::microphone::available_inputs()? {
-            if input.clone().into_inner().id()? == id {
-                found = Some(builder.device(input));
-                break;
-            }
-        }
-        found.unwrap_or_else(|| builder.default_device())?
-    } else {
-        builder.default_device()?
-    };
-    let stream = builder
-        .default_config()?
-        .prefer_sample_rates([
-            SAMPLE_RATE,
-            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
-            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
-            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
-        ])
-        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
-        .prefer_buffer_sizes(512..)
-        .open_stream()?;
-    log::info!("Opened microphone: {:?}", stream.config());
-    Ok(stream)
-}
-
-pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result<cpal::Device> {
-    if let Some(id) = device_id {
-        if let Some(device) = default_host().device_by_id(id) {
-            return Ok(device);
-        }
-        log::warn!("Selected audio device not found, falling back to default");
-    }
-    if input {
-        default_host()
-            .default_input_device()
-            .context("no audio input device available")
-    } else {
-        default_host()
-            .default_output_device()
-            .context("no audio output device available")
-    }
-}
-
-pub fn open_output_stream(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
-    let device = resolve_device(device_id.as_ref(), false)?;
-    let mut output_handle = DeviceSinkBuilder::from_device(device)?
-        .open_stream()
-        .context("Could not open output stream")?;
-    output_handle.log_on_drop(false);
-    log::info!("Output stream: {:?}", output_handle);
-    Ok(output_handle)
-}
-
-#[derive(Clone, Debug)]
-pub struct AudioDeviceInfo {
-    pub id: DeviceId,
-    pub desc: DeviceDescription,
-}
-
-impl AudioDeviceInfo {
-    pub fn matches_input(&self, is_input: bool) -> bool {
-        if is_input {
-            self.desc.supports_input()
-        } else {
-            self.desc.supports_output()
-        }
-    }
-
-    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
-        &self.id == id && self.matches_input(is_input)
-    }
-}
-
-impl std::fmt::Display for AudioDeviceInfo {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{} ({})", self.desc.name(), self.id)
-    }
-}
-
-fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
-    let Some(devices) = default_host().devices().ok() else {
-        return Vec::new();
-    };
-    devices
-        .filter_map(|device| {
-            let id = device.id().ok()?;
-            let desc = device.description().ok()?;
-            Some(AudioDeviceInfo { id, desc })
-        })
-        .collect()
-}
-
-#[derive(Default, Clone, Debug)]
-pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
-
-impl Global for AvailableAudioDevices {}

crates/audio/src/audio_pipeline.rs 🔗

@@ -0,0 +1,355 @@
+use anyhow::{Context as _, Result};
+use collections::HashMap;
+use cpal::{
+    DeviceDescription, DeviceId, default_host,
+    traits::{DeviceTrait, HostTrait},
+};
+use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
+
+pub(super) use cpal::Sample;
+pub(super) use rodio::source::LimitSettings;
+
+use rodio::{
+    Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
+    mixer::Mixer,
+    source::{AutomaticGainControlSettings, Buffered},
+};
+use settings::Settings;
+use std::{io::Cursor, path::PathBuf, sync::atomic::Ordering, time::Duration};
+use util::ResultExt;
+
+mod echo_canceller;
+use echo_canceller::EchoCanceller;
+mod replays;
+mod rodio_ext;
+pub use crate::audio_settings::AudioSettings;
+pub use rodio_ext::RodioExt;
+
+use crate::audio_settings::LIVE_SETTINGS;
+
+use crate::Sound;
+
+use super::{CHANNEL_COUNT, SAMPLE_RATE};
+pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
+    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
+
+pub fn init(cx: &mut App) {
+    LIVE_SETTINGS.initialize(cx);
+}
+
+// TODO(jk): this is currently cached only once - we should observe and react instead
+pub fn ensure_devices_initialized(cx: &mut App) {
+    if cx.has_global::<AvailableAudioDevices>() {
+        return;
+    }
+    cx.default_global::<AvailableAudioDevices>();
+    let task = cx
+        .background_executor()
+        .spawn(async move { get_available_audio_devices() });
+    cx.spawn(async move |cx: &mut AsyncApp| {
+        let devices = task.await;
+        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
+        cx.refresh();
+    })
+    .detach();
+}
+
+#[derive(Default)]
+pub struct Audio {
+    output: Option<(MixerDeviceSink, Mixer)>,
+    pub echo_canceller: EchoCanceller,
+    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
+    replays: replays::Replays,
+}
+
+impl Global for Audio {}
+
+impl Audio {
+    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
+        #[cfg(debug_assertions)]
+        log::warn!(
+            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
+        );
+
+        if self.output.is_none() {
+            let (output_handle, output_mixer) =
+                open_output_stream(output_audio_device, self.echo_canceller.clone())?;
+            self.output = Some((output_handle, output_mixer));
+        }
+
+        Ok(self
+            .output
+            .as_ref()
+            .map(|(_, mixer)| mixer)
+            .expect("we only get here if opening the outputstream succeeded"))
+    }
+
+    pub fn save_replays(
+        &self,
+        executor: BackgroundExecutor,
+    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
+        self.replays.replays_to_tar(executor)
+    }
+
+    pub fn open_microphone(mut voip_parts: VoipParts) -> anyhow::Result<impl Source> {
+        let stream = open_input_stream(voip_parts.input_audio_device)?;
+        let stream = stream
+            .possibly_disconnected_channels_to_mono()
+            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
+            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
+                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
+                if voip_parts
+                    .echo_canceller
+                    .process_stream(&mut int_buffer)
+                    .log_err()
+                    .is_some()
+                {
+                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
+                        *sample = (*processed).to_sample();
+                    }
+                }
+            })
+            .limit(LimitSettings::live_performance())
+            .automatic_gain_control(AutomaticGainControlSettings {
+                target_level: 0.90,
+                attack_time: Duration::from_secs(1),
+                release_time: Duration::from_secs(0),
+                absolute_max_gain: 5.0,
+            })
+            .periodic_access(Duration::from_millis(100), move |agc_source| {
+                agc_source
+                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
+                let _ = LIVE_SETTINGS.denoise; // TODO(audio: re-introduce de-noising
+            });
+
+        let (replay, stream) = stream.replayable(crate::REPLAY_DURATION)?;
+        voip_parts
+            .replays
+            .add_voip_stream("local microphone".to_string(), replay);
+
+        Ok(stream)
+    }
+
+    pub fn play_voip_stream(
+        source: impl rodio::Source + Send + 'static,
+        speaker_name: String,
+        is_staff: bool,
+        cx: &mut App,
+    ) -> anyhow::Result<()> {
+        let (replay_source, source) = source
+            .automatic_gain_control(AutomaticGainControlSettings {
+                target_level: 0.90,
+                attack_time: Duration::from_secs(1),
+                release_time: Duration::from_secs(0),
+                absolute_max_gain: 5.0,
+            })
+            .periodic_access(Duration::from_millis(100), move |agc_source| {
+                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
+            })
+            .replayable(crate::REPLAY_DURATION)
+            .expect("REPLAY_DURATION is longer than 100ms");
+        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
+
+        cx.update_default_global(|this: &mut Self, _cx| {
+            let output_mixer = this
+                .ensure_output_exists(output_audio_device)
+                .context("Could not get output mixer")?;
+            output_mixer.add(source);
+            if is_staff {
+                this.replays.add_voip_stream(speaker_name, replay_source);
+            }
+            Ok(())
+        })
+    }
+
+    pub fn play_sound(sound: Sound, cx: &mut App) {
+        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
+        cx.update_default_global(|this: &mut Self, cx| {
+            let source = this.sound_source(sound, cx).log_err()?;
+            let output_mixer = this
+                .ensure_output_exists(output_audio_device)
+                .context("Could not get output mixer")
+                .log_err()?;
+
+            output_mixer.add(source);
+            Some(())
+        });
+    }
+
+    pub fn end_call(cx: &mut App) {
+        cx.update_default_global(|this: &mut Self, _cx| {
+            this.output.take();
+        });
+    }
+
+    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
+        if let Some(wav) = self.source_cache.get(&sound) {
+            return Ok(wav.clone());
+        }
+
+        let path = format!("sounds/{}.wav", sound.file());
+        let bytes = cx
+            .asset_source()
+            .load(&path)?
+            .map(anyhow::Ok)
+            .with_context(|| format!("No asset available for path {path}"))??
+            .into_owned();
+        let cursor = Cursor::new(bytes);
+        let source = Decoder::new(cursor)?.buffered();
+
+        self.source_cache.insert(sound, source.clone());
+
+        Ok(source)
+    }
+}
+
+pub struct VoipParts {
+    echo_canceller: EchoCanceller,
+    replays: replays::Replays,
+    input_audio_device: Option<DeviceId>,
+}
+
+impl VoipParts {
+    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
+        let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
+            (audio.echo_canceller.clone(), audio.replays.clone())
+        });
+        let input_audio_device =
+            AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
+                .flatten();
+
+        Ok(Self {
+            echo_canceller: apm,
+            replays,
+            input_audio_device,
+        })
+    }
+}
+
+pub fn open_input_stream(
+    device_id: Option<DeviceId>,
+) -> anyhow::Result<rodio::microphone::Microphone> {
+    let builder = rodio::microphone::MicrophoneBuilder::new();
+    let builder = if let Some(id) = device_id {
+        // TODO(jk): upstream patch
+        // if let Some(input_device) = default_host().device_by_id(id) {
+        //     builder.device(input_device);
+        // }
+        let mut found = None;
+        for input in rodio::microphone::available_inputs()? {
+            if input.clone().into_inner().id()? == id {
+                found = Some(builder.device(input));
+                break;
+            }
+        }
+        found.unwrap_or_else(|| builder.default_device())?
+    } else {
+        builder.default_device()?
+    };
+    let stream = builder
+        .default_config()?
+        .prefer_sample_rates([
+            SAMPLE_RATE,
+            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
+            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
+            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
+        ])
+        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
+        .prefer_buffer_sizes(512..)
+        .open_stream()?;
+    log::info!("Opened microphone: {:?}", stream.config());
+    Ok(stream)
+}
+
+pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result<cpal::Device> {
+    if let Some(id) = device_id {
+        if let Some(device) = default_host().device_by_id(id) {
+            return Ok(device);
+        }
+        log::warn!("Selected audio device not found, falling back to default");
+    }
+    if input {
+        default_host()
+            .default_input_device()
+            .context("no audio input device available")
+    } else {
+        default_host()
+            .default_output_device()
+            .context("no audio output device available")
+    }
+}
+
+pub fn open_test_output(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
+    let device = resolve_device(device_id.as_ref(), false)?;
+    DeviceSinkBuilder::from_device(device)?
+        .open_stream()
+        .context("Could not open output stream")
+}
+
+pub fn open_output_stream(
+    device_id: Option<DeviceId>,
+    mut echo_canceller: EchoCanceller,
+) -> anyhow::Result<(MixerDeviceSink, Mixer)> {
+    let device = resolve_device(device_id.as_ref(), false)?;
+    let mut output_handle = DeviceSinkBuilder::from_device(device)?
+        .open_stream()
+        .context("Could not open output stream")?;
+    output_handle.log_on_drop(false);
+    log::info!("Output stream: {:?}", output_handle);
+
+    let (output_mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
+    // otherwise the mixer ends as it's empty
+    output_mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
+    let echo_cancelling_source = source // apply echo cancellation just before output
+        .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
+            let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
+            echo_canceller.process_reverse_stream(&mut buf)
+        });
+    output_handle.mixer().add(echo_cancelling_source);
+
+    Ok((output_handle, output_mixer))
+}
+
+#[derive(Clone, Debug)]
+pub struct AudioDeviceInfo {
+    pub id: DeviceId,
+    pub desc: DeviceDescription,
+}
+
+impl AudioDeviceInfo {
+    pub fn matches_input(&self, is_input: bool) -> bool {
+        if is_input {
+            self.desc.supports_input()
+        } else {
+            self.desc.supports_output()
+        }
+    }
+
+    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
+        &self.id == id && self.matches_input(is_input)
+    }
+}
+
+impl std::fmt::Display for AudioDeviceInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{} ({})", self.desc.name(), self.id)
+    }
+}
+
+fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
+    let Some(devices) = default_host().devices().ok() else {
+        return Vec::new();
+    };
+    devices
+        .filter_map(|device| {
+            let id = device.id().ok()?;
+            let desc = device.description().ok()?;
+            Some(AudioDeviceInfo { id, desc })
+        })
+        .collect()
+}
+
+#[derive(Default, Clone, Debug)]
+pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
+
+impl Global for AvailableAudioDevices {}

crates/audio/src/audio_pipeline/echo_canceller.rs 🔗

@@ -0,0 +1,54 @@
+#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
+mod real_implementation {
+    use anyhow::Context;
+    use libwebrtc::native::apm;
+    use parking_lot::Mutex;
+    use std::sync::Arc;
+
+    use crate::{CHANNEL_COUNT, SAMPLE_RATE};
+
+    #[derive(Clone)]
+    pub struct EchoCanceller(Arc<Mutex<apm::AudioProcessingModule>>);
+
+    impl Default for EchoCanceller {
+        fn default() -> Self {
+            Self(Arc::new(Mutex::new(apm::AudioProcessingModule::new(
+                true, false, false, false,
+            ))))
+        }
+    }
+
+    impl EchoCanceller {
+        pub fn process_reverse_stream(&mut self, buf: &mut [i16]) {
+            self.0
+                .lock()
+                .process_reverse_stream(buf, SAMPLE_RATE.get() as i32, CHANNEL_COUNT.get().into())
+                .expect("Audio input and output threads should not panic");
+        }
+
+        pub fn process_stream(&mut self, buf: &mut [i16]) -> anyhow::Result<()> {
+            self.0
+                .lock()
+                .process_stream(buf, SAMPLE_RATE.get() as i32, CHANNEL_COUNT.get() as i32)
+                .context("livekit audio processor error")
+        }
+    }
+}
+
+#[cfg(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd"))]
+mod fake_implementation {
+    #[derive(Clone, Default)]
+    pub struct EchoCanceller;
+
+    impl EchoCanceller {
+        pub fn process_reverse_stream(&mut self, _buf: &mut [i16]) {}
+        pub fn process_stream(&mut self, _buf: &mut [i16]) -> anyhow::Result<()> {
+            Ok(())
+        }
+    }
+}
+
+#[cfg(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd"))]
+pub use fake_implementation::EchoCanceller;
+#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
+pub use real_implementation::EchoCanceller;

crates/audio/src/replays.rs → crates/audio/src/audio_pipeline/replays.rs 🔗

@@ -8,7 +8,8 @@ use rodio::Source;
 use smol::fs::File;
 use std::{io, path::PathBuf, sync::Arc, time::Duration};
 
-use crate::{REPLAY_DURATION, rodio_ext::Replay};
+use crate::REPLAY_DURATION;
+use crate::audio_pipeline::rodio_ext::Replay;
 
 #[derive(Default, Clone)]
 pub(crate) struct Replays(Arc<Mutex<HashMap<String, Replay>>>);

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -1,6 +1,6 @@
 use anyhow::{Context as _, Result};
 
-use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
+use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
 use cpal::DeviceId;
 use cpal::traits::{DeviceTrait, StreamTrait as _};
 use futures::channel::mpsc::Sender;
@@ -99,8 +99,8 @@ impl AudioStack {
         let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
         let source = AudioMixerSource {
             ssrc: next_ssrc,
-            sample_rate: LEGACY_SAMPLE_RATE.get(),
-            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
+            sample_rate: SAMPLE_RATE.get(),
+            num_channels: CHANNEL_COUNT.get() as u32,
             buffer: Arc::default(),
         };
         self.mixer.lock().add_source(source.clone());
@@ -145,8 +145,8 @@ impl AudioStack {
                     executor,
                     apm,
                     mixer,
-                    LEGACY_SAMPLE_RATE.get(),
-                    LEGACY_CHANNEL_COUNT.get().into(),
+                    SAMPLE_RATE.get(),
+                    CHANNEL_COUNT.get().into(),
                     output_audio_device,
                 )
                 .await
@@ -171,8 +171,9 @@ impl AudioStack {
             NativeAudioSource::new(
                 // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
                 AudioSourceOptions::default(),
-                LEGACY_SAMPLE_RATE.get(),
-                LEGACY_CHANNEL_COUNT.get().into(),
+                SAMPLE_RATE.get(), // TODO(audio): this was legacy params,
+                // removed for now for simplicity
+                CHANNEL_COUNT.get().into(),
                 10,
             )
         } else {
@@ -233,8 +234,8 @@ impl AudioStack {
                     executor,
                     apm,
                     frame_tx,
-                    LEGACY_SAMPLE_RATE.get(),
-                    LEGACY_CHANNEL_COUNT.get().into(),
+                    SAMPLE_RATE.get(), // TODO(audio): was legacy removed for now
+                    CHANNEL_COUNT.get().into(),
                     input_audio_device,
                 )
                 .await

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -7,7 +7,7 @@ use rodio::{
     ChannelCount, SampleRate, Source, buffer::SamplesBuffer, conversions::SampleTypeConverter,
 };
 
-use audio::{CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
+use audio::{CHANNEL_COUNT, SAMPLE_RATE};
 
 fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
     let samples = frame.data.iter().copied();
@@ -35,7 +35,8 @@ impl LiveKitStream {
         legacy: bool,
     ) -> Self {
         let (channel_count, sample_rate) = if legacy {
-            (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
+            // (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE) TODO(audio): do this or remove
+            (CHANNEL_COUNT, SAMPLE_RATE)
         } else {
             (CHANNEL_COUNT, SAMPLE_RATE)
         };

crates/settings_ui/src/pages/audio_test_window.rs 🔗

@@ -88,7 +88,7 @@ fn start_test_playback(
                     }
                 };
 
-                let Ok(output) = audio::open_output_stream(output_device_id) else {
+                let Ok(output) = audio::open_test_output(output_device_id) else {
                     log::error!("Could not open output device for audio test");
                     return;
                 };