diff --git a/crates/audio/src/audio.rs b/crates/audio/src/audio.rs index 2165cf39136a1ed7268fbf6ea670d825b2b50bcc..650285aa654ac02ae03f41d0af66b33f086a106e 100644 --- a/crates/audio/src/audio.rs +++ b/crates/audio/src/audio.rs @@ -1,77 +1,22 @@ -use anyhow::{Context as _, Result}; -use collections::HashMap; -use cpal::{ - DeviceDescription, DeviceId, default_host, - traits::{DeviceTrait, HostTrait}, -}; -use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global}; +use std::time::Duration; -#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] -mod non_windows_and_freebsd_deps { - pub(super) use cpal::Sample; - pub(super) use libwebrtc::native::apm; - pub(super) use parking_lot::Mutex; - pub(super) use rodio::source::LimitSettings; - pub(super) use std::sync::Arc; -} - -#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] -use non_windows_and_freebsd_deps::*; +use rodio::{ChannelCount, SampleRate, nz}; -use rodio::{ - Decoder, DeviceSinkBuilder, MixerDeviceSink, Source, - mixer::Mixer, - nz, - source::{AutomaticGainControlSettings, Buffered}, -}; -use settings::Settings; -use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration}; -use util::ResultExt; +pub const REPLAY_DURATION: Duration = Duration::from_secs(30); +pub const SAMPLE_RATE: SampleRate = nz!(48000); +pub const CHANNEL_COUNT: ChannelCount = nz!(2); mod audio_settings; -mod replays; -mod rodio_ext; pub use audio_settings::AudioSettings; -pub use rodio_ext::RodioExt; -use crate::audio_settings::LIVE_SETTINGS; - -// We are migrating to 16kHz sample rate from 48kHz. In the future -// once we are reasonably sure most users have upgraded we will -// remove the LEGACY parameters. -// -// We migrate to 16kHz because it is sufficient for speech and required -// by the denoiser and future Speech to Text layers. -pub const SAMPLE_RATE: NonZero = nz!(16000); -pub const CHANNEL_COUNT: NonZero = nz!(1); -pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio - (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; - -pub const LEGACY_SAMPLE_RATE: NonZero = nz!(48000); -pub const LEGACY_CHANNEL_COUNT: NonZero = nz!(2); - -pub const REPLAY_DURATION: Duration = Duration::from_secs(30); - -pub fn init(cx: &mut App) { - LIVE_SETTINGS.initialize(cx); -} - -// TODO(jk): this is currently cached only once - we should observe and react instead -pub fn ensure_devices_initialized(cx: &mut App) { - if cx.has_global::() { - return; - } - cx.default_global::(); - let task = cx - .background_executor() - .spawn(async move { get_available_audio_devices() }); - cx.spawn(async move |cx: &mut AsyncApp| { - let devices = task.await; - cx.update(|cx| cx.set_global(AvailableAudioDevices(devices))); - cx.refresh(); - }) - .detach(); -} +mod audio_pipeline; +pub use audio_pipeline::{Audio, VoipParts}; +pub use audio_pipeline::{AudioDeviceInfo, AvailableAudioDevices}; +pub use audio_pipeline::{ensure_devices_initialized, resolve_device}; +// TODO(audio) replace with input test functionality in the audio crate +pub use audio_pipeline::RodioExt; +pub use audio_pipeline::init; +pub use audio_pipeline::{open_input_stream, open_test_output}; #[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] pub enum Sound { @@ -99,359 +44,3 @@ impl Sound { } } } - -pub struct Audio { - output_handle: Option, - #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] - pub echo_canceller: Arc>, - source_cache: HashMap>>>>, - replays: replays::Replays, -} - -impl Default for Audio { - fn default() -> Self { - Self { - output_handle: Default::default(), - #[cfg(not(any( - all(target_os = "windows", target_env = "gnu"), - target_os = "freebsd" - )))] - echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new( - true, false, false, false, - ))), - source_cache: Default::default(), - replays: Default::default(), - } - } -} - -impl Global for Audio {} - -impl Audio { - fn ensure_output_exists(&mut self, output_audio_device: Option) -> Result<&Mixer> { - #[cfg(debug_assertions)] - log::warn!( - "Audio does not sound correct without optimizations. Use a release build to debug audio issues" - ); - - if self.output_handle.is_none() { - let output_handle = open_output_stream(output_audio_device)?; - - // The webrtc apm is not yet compiling for windows & freebsd - #[cfg(not(any( - any(all(target_os = "windows", target_env = "gnu")), - target_os = "freebsd" - )))] - let echo_canceller = Arc::clone(&self.echo_canceller); - - #[cfg(not(any( - any(all(target_os = "windows", target_env = "gnu")), - target_os = "freebsd" - )))] - { - let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE) - .inspect_buffer::(move |buffer| { - let mut buf: [i16; _] = buffer.map(|s| s.to_sample()); - echo_canceller - .lock() - .process_reverse_stream( - &mut buf, - SAMPLE_RATE.get() as i32, - CHANNEL_COUNT.get().into(), - ) - .expect("Audio input and output threads should not panic"); - }); - output_handle.mixer().add(source); - } - - #[cfg(any( - any(all(target_os = "windows", target_env = "gnu")), - target_os = "freebsd" - ))] - { - let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE); - output_handle.mixer().add(source); - } - - self.output_handle = Some(output_handle); - } - - Ok(self - .output_handle - .as_ref() - .map(|h| h.mixer()) - .expect("we only get here if opening the outputstream succeeded")) - } - - pub fn save_replays( - &self, - executor: BackgroundExecutor, - ) -> gpui::Task> { - self.replays.replays_to_tar(executor) - } - - #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] - pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result { - let stream = open_input_stream(voip_parts.input_audio_device)?; - let stream = stream - .possibly_disconnected_channels_to_mono() - .constant_samplerate(SAMPLE_RATE) - .limit(LimitSettings::live_performance()) - .process_buffer::(move |buffer| { - let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); - if voip_parts - .echo_canceller - .lock() - .process_stream( - &mut int_buffer, - SAMPLE_RATE.get() as i32, - CHANNEL_COUNT.get() as i32, - ) - .context("livekit audio processor error") - .log_err() - .is_some() - { - for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { - *sample = (*processed).to_sample(); - } - } - }) - .denoise() - .context("Could not set up denoiser")? - .automatic_gain_control(AutomaticGainControlSettings { - target_level: 0.90, - attack_time: Duration::from_secs(1), - release_time: Duration::from_secs(0), - absolute_max_gain: 5.0, - }) - .periodic_access(Duration::from_millis(100), move |agc_source| { - agc_source - .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed)); - let denoise = agc_source.inner_mut(); - denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed)); - }); - - let stream = if voip_parts.legacy_audio_compatible { - stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE) - } else { - stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE) - }; - - let (replay, stream) = stream.replayable(REPLAY_DURATION)?; - voip_parts - .replays - .add_voip_stream("local microphone".to_string(), replay); - - Ok(stream) - } - - pub fn play_voip_stream( - source: impl rodio::Source + Send + 'static, - speaker_name: String, - is_staff: bool, - cx: &mut App, - ) -> anyhow::Result<()> { - let (replay_source, source) = source - .constant_params(CHANNEL_COUNT, SAMPLE_RATE) - .automatic_gain_control(AutomaticGainControlSettings { - target_level: 0.90, - attack_time: Duration::from_secs(1), - release_time: Duration::from_secs(0), - absolute_max_gain: 5.0, - }) - .periodic_access(Duration::from_millis(100), move |agc_source| { - agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed)); - }) - .replayable(REPLAY_DURATION) - .expect("REPLAY_DURATION is longer than 100ms"); - let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone(); - - cx.update_default_global(|this: &mut Self, _cx| { - let output_mixer = this - .ensure_output_exists(output_audio_device) - .context("Could not get output mixer")?; - output_mixer.add(source); - if is_staff { - this.replays.add_voip_stream(speaker_name, replay_source); - } - Ok(()) - }) - } - - pub fn play_sound(sound: Sound, cx: &mut App) { - let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone(); - cx.update_default_global(|this: &mut Self, cx| { - let source = this.sound_source(sound, cx).log_err()?; - let output_mixer = this - .ensure_output_exists(output_audio_device) - .context("Could not get output mixer") - .log_err()?; - - output_mixer.add(source); - Some(()) - }); - } - - pub fn end_call(cx: &mut App) { - cx.update_default_global(|this: &mut Self, _cx| { - this.output_handle.take(); - }); - } - - fn sound_source(&mut self, sound: Sound, cx: &App) -> Result> { - if let Some(wav) = self.source_cache.get(&sound) { - return Ok(wav.clone()); - } - - let path = format!("sounds/{}.wav", sound.file()); - let bytes = cx - .asset_source() - .load(&path)? - .map(anyhow::Ok) - .with_context(|| format!("No asset available for path {path}"))?? - .into_owned(); - let cursor = Cursor::new(bytes); - let source = Decoder::new(cursor)?.buffered(); - - self.source_cache.insert(sound, source.clone()); - - Ok(source) - } -} - -#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] -pub struct VoipParts { - echo_canceller: Arc>, - replays: replays::Replays, - legacy_audio_compatible: bool, - input_audio_device: Option, -} - -#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] -impl VoipParts { - pub fn new(cx: &AsyncApp) -> anyhow::Result { - let (apm, replays) = cx.read_default_global::(|audio, _| { - (Arc::clone(&audio.echo_canceller), audio.replays.clone()) - }); - let legacy_audio_compatible = - AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible) - .unwrap_or(true); - let input_audio_device = - AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone()) - .flatten(); - - Ok(Self { - legacy_audio_compatible, - echo_canceller: apm, - replays, - input_audio_device, - }) - } -} - -pub fn open_input_stream( - device_id: Option, -) -> anyhow::Result { - let builder = rodio::microphone::MicrophoneBuilder::new(); - let builder = if let Some(id) = device_id { - // TODO(jk): upstream patch - // if let Some(input_device) = default_host().device_by_id(id) { - // builder.device(input_device); - // } - let mut found = None; - for input in rodio::microphone::available_inputs()? { - if input.clone().into_inner().id()? == id { - found = Some(builder.device(input)); - break; - } - } - found.unwrap_or_else(|| builder.default_device())? - } else { - builder.default_device()? - }; - let stream = builder - .default_config()? - .prefer_sample_rates([ - SAMPLE_RATE, - SAMPLE_RATE.saturating_mul(rodio::nz!(2)), - SAMPLE_RATE.saturating_mul(rodio::nz!(3)), - SAMPLE_RATE.saturating_mul(rodio::nz!(4)), - ]) - .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)]) - .prefer_buffer_sizes(512..) - .open_stream()?; - log::info!("Opened microphone: {:?}", stream.config()); - Ok(stream) -} - -pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result { - if let Some(id) = device_id { - if let Some(device) = default_host().device_by_id(id) { - return Ok(device); - } - log::warn!("Selected audio device not found, falling back to default"); - } - if input { - default_host() - .default_input_device() - .context("no audio input device available") - } else { - default_host() - .default_output_device() - .context("no audio output device available") - } -} - -pub fn open_output_stream(device_id: Option) -> anyhow::Result { - let device = resolve_device(device_id.as_ref(), false)?; - let mut output_handle = DeviceSinkBuilder::from_device(device)? - .open_stream() - .context("Could not open output stream")?; - output_handle.log_on_drop(false); - log::info!("Output stream: {:?}", output_handle); - Ok(output_handle) -} - -#[derive(Clone, Debug)] -pub struct AudioDeviceInfo { - pub id: DeviceId, - pub desc: DeviceDescription, -} - -impl AudioDeviceInfo { - pub fn matches_input(&self, is_input: bool) -> bool { - if is_input { - self.desc.supports_input() - } else { - self.desc.supports_output() - } - } - - pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool { - &self.id == id && self.matches_input(is_input) - } -} - -impl std::fmt::Display for AudioDeviceInfo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({})", self.desc.name(), self.id) - } -} - -fn get_available_audio_devices() -> Vec { - let Some(devices) = default_host().devices().ok() else { - return Vec::new(); - }; - devices - .filter_map(|device| { - let id = device.id().ok()?; - let desc = device.description().ok()?; - Some(AudioDeviceInfo { id, desc }) - }) - .collect() -} - -#[derive(Default, Clone, Debug)] -pub struct AvailableAudioDevices(pub Vec); - -impl Global for AvailableAudioDevices {} diff --git a/crates/audio/src/audio_pipeline.rs b/crates/audio/src/audio_pipeline.rs new file mode 100644 index 0000000000000000000000000000000000000000..3d2a6ae32c381b1cab590946c35fbb68325af5db --- /dev/null +++ b/crates/audio/src/audio_pipeline.rs @@ -0,0 +1,355 @@ +use anyhow::{Context as _, Result}; +use collections::HashMap; +use cpal::{ + DeviceDescription, DeviceId, default_host, + traits::{DeviceTrait, HostTrait}, +}; +use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global}; + +pub(super) use cpal::Sample; +pub(super) use rodio::source::LimitSettings; + +use rodio::{ + Decoder, DeviceSinkBuilder, MixerDeviceSink, Source, + mixer::Mixer, + source::{AutomaticGainControlSettings, Buffered}, +}; +use settings::Settings; +use std::{io::Cursor, path::PathBuf, sync::atomic::Ordering, time::Duration}; +use util::ResultExt; + +mod echo_canceller; +use echo_canceller::EchoCanceller; +mod replays; +mod rodio_ext; +pub use crate::audio_settings::AudioSettings; +pub use rodio_ext::RodioExt; + +use crate::audio_settings::LIVE_SETTINGS; + +use crate::Sound; + +use super::{CHANNEL_COUNT, SAMPLE_RATE}; +pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio + (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; + +pub fn init(cx: &mut App) { + LIVE_SETTINGS.initialize(cx); +} + +// TODO(jk): this is currently cached only once - we should observe and react instead +pub fn ensure_devices_initialized(cx: &mut App) { + if cx.has_global::() { + return; + } + cx.default_global::(); + let task = cx + .background_executor() + .spawn(async move { get_available_audio_devices() }); + cx.spawn(async move |cx: &mut AsyncApp| { + let devices = task.await; + cx.update(|cx| cx.set_global(AvailableAudioDevices(devices))); + cx.refresh(); + }) + .detach(); +} + +#[derive(Default)] +pub struct Audio { + output: Option<(MixerDeviceSink, Mixer)>, + pub echo_canceller: EchoCanceller, + source_cache: HashMap>>>>, + replays: replays::Replays, +} + +impl Global for Audio {} + +impl Audio { + fn ensure_output_exists(&mut self, output_audio_device: Option) -> Result<&Mixer> { + #[cfg(debug_assertions)] + log::warn!( + "Audio does not sound correct without optimizations. Use a release build to debug audio issues" + ); + + if self.output.is_none() { + let (output_handle, output_mixer) = + open_output_stream(output_audio_device, self.echo_canceller.clone())?; + self.output = Some((output_handle, output_mixer)); + } + + Ok(self + .output + .as_ref() + .map(|(_, mixer)| mixer) + .expect("we only get here if opening the outputstream succeeded")) + } + + pub fn save_replays( + &self, + executor: BackgroundExecutor, + ) -> gpui::Task> { + self.replays.replays_to_tar(executor) + } + + pub fn open_microphone(mut voip_parts: VoipParts) -> anyhow::Result { + let stream = open_input_stream(voip_parts.input_audio_device)?; + let stream = stream + .possibly_disconnected_channels_to_mono() + .constant_params(CHANNEL_COUNT, SAMPLE_RATE) + .process_buffer::(move |buffer| { + let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); + if voip_parts + .echo_canceller + .process_stream(&mut int_buffer) + .log_err() + .is_some() + { + for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { + *sample = (*processed).to_sample(); + } + } + }) + .limit(LimitSettings::live_performance()) + .automatic_gain_control(AutomaticGainControlSettings { + target_level: 0.90, + attack_time: Duration::from_secs(1), + release_time: Duration::from_secs(0), + absolute_max_gain: 5.0, + }) + .periodic_access(Duration::from_millis(100), move |agc_source| { + agc_source + .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed)); + let _ = LIVE_SETTINGS.denoise; // TODO(audio: re-introduce de-noising + }); + + let (replay, stream) = stream.replayable(crate::REPLAY_DURATION)?; + voip_parts + .replays + .add_voip_stream("local microphone".to_string(), replay); + + Ok(stream) + } + + pub fn play_voip_stream( + source: impl rodio::Source + Send + 'static, + speaker_name: String, + is_staff: bool, + cx: &mut App, + ) -> anyhow::Result<()> { + let (replay_source, source) = source + .automatic_gain_control(AutomaticGainControlSettings { + target_level: 0.90, + attack_time: Duration::from_secs(1), + release_time: Duration::from_secs(0), + absolute_max_gain: 5.0, + }) + .periodic_access(Duration::from_millis(100), move |agc_source| { + agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed)); + }) + .replayable(crate::REPLAY_DURATION) + .expect("REPLAY_DURATION is longer than 100ms"); + let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone(); + + cx.update_default_global(|this: &mut Self, _cx| { + let output_mixer = this + .ensure_output_exists(output_audio_device) + .context("Could not get output mixer")?; + output_mixer.add(source); + if is_staff { + this.replays.add_voip_stream(speaker_name, replay_source); + } + Ok(()) + }) + } + + pub fn play_sound(sound: Sound, cx: &mut App) { + let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone(); + cx.update_default_global(|this: &mut Self, cx| { + let source = this.sound_source(sound, cx).log_err()?; + let output_mixer = this + .ensure_output_exists(output_audio_device) + .context("Could not get output mixer") + .log_err()?; + + output_mixer.add(source); + Some(()) + }); + } + + pub fn end_call(cx: &mut App) { + cx.update_default_global(|this: &mut Self, _cx| { + this.output.take(); + }); + } + + fn sound_source(&mut self, sound: Sound, cx: &App) -> Result> { + if let Some(wav) = self.source_cache.get(&sound) { + return Ok(wav.clone()); + } + + let path = format!("sounds/{}.wav", sound.file()); + let bytes = cx + .asset_source() + .load(&path)? + .map(anyhow::Ok) + .with_context(|| format!("No asset available for path {path}"))?? + .into_owned(); + let cursor = Cursor::new(bytes); + let source = Decoder::new(cursor)?.buffered(); + + self.source_cache.insert(sound, source.clone()); + + Ok(source) + } +} + +pub struct VoipParts { + echo_canceller: EchoCanceller, + replays: replays::Replays, + input_audio_device: Option, +} + +impl VoipParts { + pub fn new(cx: &AsyncApp) -> anyhow::Result { + let (apm, replays) = cx.read_default_global::(|audio, _| { + (audio.echo_canceller.clone(), audio.replays.clone()) + }); + let input_audio_device = + AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone()) + .flatten(); + + Ok(Self { + echo_canceller: apm, + replays, + input_audio_device, + }) + } +} + +pub fn open_input_stream( + device_id: Option, +) -> anyhow::Result { + let builder = rodio::microphone::MicrophoneBuilder::new(); + let builder = if let Some(id) = device_id { + // TODO(jk): upstream patch + // if let Some(input_device) = default_host().device_by_id(id) { + // builder.device(input_device); + // } + let mut found = None; + for input in rodio::microphone::available_inputs()? { + if input.clone().into_inner().id()? == id { + found = Some(builder.device(input)); + break; + } + } + found.unwrap_or_else(|| builder.default_device())? + } else { + builder.default_device()? + }; + let stream = builder + .default_config()? + .prefer_sample_rates([ + SAMPLE_RATE, + SAMPLE_RATE.saturating_mul(rodio::nz!(2)), + SAMPLE_RATE.saturating_mul(rodio::nz!(3)), + SAMPLE_RATE.saturating_mul(rodio::nz!(4)), + ]) + .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)]) + .prefer_buffer_sizes(512..) + .open_stream()?; + log::info!("Opened microphone: {:?}", stream.config()); + Ok(stream) +} + +pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result { + if let Some(id) = device_id { + if let Some(device) = default_host().device_by_id(id) { + return Ok(device); + } + log::warn!("Selected audio device not found, falling back to default"); + } + if input { + default_host() + .default_input_device() + .context("no audio input device available") + } else { + default_host() + .default_output_device() + .context("no audio output device available") + } +} + +pub fn open_test_output(device_id: Option) -> anyhow::Result { + let device = resolve_device(device_id.as_ref(), false)?; + DeviceSinkBuilder::from_device(device)? + .open_stream() + .context("Could not open output stream") +} + +pub fn open_output_stream( + device_id: Option, + mut echo_canceller: EchoCanceller, +) -> anyhow::Result<(MixerDeviceSink, Mixer)> { + let device = resolve_device(device_id.as_ref(), false)?; + let mut output_handle = DeviceSinkBuilder::from_device(device)? + .open_stream() + .context("Could not open output stream")?; + output_handle.log_on_drop(false); + log::info!("Output stream: {:?}", output_handle); + + let (output_mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE); + // otherwise the mixer ends as it's empty + output_mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)); + let echo_cancelling_source = source // apply echo cancellation just before output + .inspect_buffer::(move |buffer| { + let mut buf: [i16; _] = buffer.map(|s| s.to_sample()); + echo_canceller.process_reverse_stream(&mut buf) + }); + output_handle.mixer().add(echo_cancelling_source); + + Ok((output_handle, output_mixer)) +} + +#[derive(Clone, Debug)] +pub struct AudioDeviceInfo { + pub id: DeviceId, + pub desc: DeviceDescription, +} + +impl AudioDeviceInfo { + pub fn matches_input(&self, is_input: bool) -> bool { + if is_input { + self.desc.supports_input() + } else { + self.desc.supports_output() + } + } + + pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool { + &self.id == id && self.matches_input(is_input) + } +} + +impl std::fmt::Display for AudioDeviceInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.desc.name(), self.id) + } +} + +fn get_available_audio_devices() -> Vec { + let Some(devices) = default_host().devices().ok() else { + return Vec::new(); + }; + devices + .filter_map(|device| { + let id = device.id().ok()?; + let desc = device.description().ok()?; + Some(AudioDeviceInfo { id, desc }) + }) + .collect() +} + +#[derive(Default, Clone, Debug)] +pub struct AvailableAudioDevices(pub Vec); + +impl Global for AvailableAudioDevices {} diff --git a/crates/audio/src/audio_pipeline/echo_canceller.rs b/crates/audio/src/audio_pipeline/echo_canceller.rs new file mode 100644 index 0000000000000000000000000000000000000000..ec612b1b448bd33871b33638468747b765fc3c1a --- /dev/null +++ b/crates/audio/src/audio_pipeline/echo_canceller.rs @@ -0,0 +1,54 @@ +#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] +mod real_implementation { + use anyhow::Context; + use libwebrtc::native::apm; + use parking_lot::Mutex; + use std::sync::Arc; + + use crate::{CHANNEL_COUNT, SAMPLE_RATE}; + + #[derive(Clone)] + pub struct EchoCanceller(Arc>); + + impl Default for EchoCanceller { + fn default() -> Self { + Self(Arc::new(Mutex::new(apm::AudioProcessingModule::new( + true, false, false, false, + )))) + } + } + + impl EchoCanceller { + pub fn process_reverse_stream(&mut self, buf: &mut [i16]) { + self.0 + .lock() + .process_reverse_stream(buf, SAMPLE_RATE.get() as i32, CHANNEL_COUNT.get().into()) + .expect("Audio input and output threads should not panic"); + } + + pub fn process_stream(&mut self, buf: &mut [i16]) -> anyhow::Result<()> { + self.0 + .lock() + .process_stream(buf, SAMPLE_RATE.get() as i32, CHANNEL_COUNT.get() as i32) + .context("livekit audio processor error") + } + } +} + +#[cfg(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd"))] +mod fake_implementation { + #[derive(Clone, Default)] + pub struct EchoCanceller; + + impl EchoCanceller { + pub fn process_reverse_stream(&mut self, _buf: &mut [i16]) {} + pub fn process_stream(&mut self, _buf: &mut [i16]) -> anyhow::Result<()> { + Ok(()) + } + } +} + +#[cfg(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd"))] +pub use fake_implementation::EchoCanceller; +#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))] +pub use real_implementation::EchoCanceller; diff --git a/crates/audio/src/replays.rs b/crates/audio/src/audio_pipeline/replays.rs similarity index 97% rename from crates/audio/src/replays.rs rename to crates/audio/src/audio_pipeline/replays.rs index bb21df51e5642bf633d068d544690cb26a239151..3228700b2df5581e862da6ec71787704985386a2 100644 --- a/crates/audio/src/replays.rs +++ b/crates/audio/src/audio_pipeline/replays.rs @@ -8,7 +8,8 @@ use rodio::Source; use smol::fs::File; use std::{io, path::PathBuf, sync::Arc, time::Duration}; -use crate::{REPLAY_DURATION, rodio_ext::Replay}; +use crate::REPLAY_DURATION; +use crate::audio_pipeline::rodio_ext::Replay; #[derive(Default, Clone)] pub(crate) struct Replays(Arc>>); diff --git a/crates/audio/src/rodio_ext.rs b/crates/audio/src/audio_pipeline/rodio_ext.rs similarity index 100% rename from crates/audio/src/rodio_ext.rs rename to crates/audio/src/audio_pipeline/rodio_ext.rs diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index d6fc061321acd8d40a7df0e615bad0b8ecbb1f26..4b3c55109a297c888ac64d5742a1df91163d77e0 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -1,6 +1,6 @@ use anyhow::{Context as _, Result}; -use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE}; +use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE}; use cpal::DeviceId; use cpal::traits::{DeviceTrait, StreamTrait as _}; use futures::channel::mpsc::Sender; @@ -99,8 +99,8 @@ impl AudioStack { let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed); let source = AudioMixerSource { ssrc: next_ssrc, - sample_rate: LEGACY_SAMPLE_RATE.get(), - num_channels: LEGACY_CHANNEL_COUNT.get() as u32, + sample_rate: SAMPLE_RATE.get(), + num_channels: CHANNEL_COUNT.get() as u32, buffer: Arc::default(), }; self.mixer.lock().add_source(source.clone()); @@ -145,8 +145,8 @@ impl AudioStack { executor, apm, mixer, - LEGACY_SAMPLE_RATE.get(), - LEGACY_CHANNEL_COUNT.get().into(), + SAMPLE_RATE.get(), + CHANNEL_COUNT.get().into(), output_audio_device, ) .await @@ -171,8 +171,9 @@ impl AudioStack { NativeAudioSource::new( // n.b. this struct's options are always ignored, noise cancellation is provided by apm. AudioSourceOptions::default(), - LEGACY_SAMPLE_RATE.get(), - LEGACY_CHANNEL_COUNT.get().into(), + SAMPLE_RATE.get(), // TODO(audio): this was legacy params, + // removed for now for simplicity + CHANNEL_COUNT.get().into(), 10, ) } else { @@ -233,8 +234,8 @@ impl AudioStack { executor, apm, frame_tx, - LEGACY_SAMPLE_RATE.get(), - LEGACY_CHANNEL_COUNT.get().into(), + SAMPLE_RATE.get(), // TODO(audio): was legacy removed for now + CHANNEL_COUNT.get().into(), input_audio_device, ) .await diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index b90c3613f8215481a4a535eb81c665fccae80e5c..2738109ff8fc972e9ab53768fd212d6f5ff5f194 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -7,7 +7,7 @@ use rodio::{ ChannelCount, SampleRate, Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, }; -use audio::{CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE}; +use audio::{CHANNEL_COUNT, SAMPLE_RATE}; fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer { let samples = frame.data.iter().copied(); @@ -35,7 +35,8 @@ impl LiveKitStream { legacy: bool, ) -> Self { let (channel_count, sample_rate) = if legacy { - (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE) + // (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE) TODO(audio): do this or remove + (CHANNEL_COUNT, SAMPLE_RATE) } else { (CHANNEL_COUNT, SAMPLE_RATE) }; diff --git a/crates/settings_ui/src/pages/audio_test_window.rs b/crates/settings_ui/src/pages/audio_test_window.rs index 63bd1d14ffb3ad9c7d1b2d176d9de58aa762ec25..d50d017d7abde836fb2945baf2f1434472281005 100644 --- a/crates/settings_ui/src/pages/audio_test_window.rs +++ b/crates/settings_ui/src/pages/audio_test_window.rs @@ -88,7 +88,7 @@ fn start_test_playback( } }; - let Ok(output) = audio::open_output_stream(output_device_id) else { + let Ok(output) = audio::open_test_output(output_device_id) else { log::error!("Could not open output device for audio test"); return; };