audio_pipeline.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use cpal::{
  4    DeviceDescription, DeviceId, default_host,
  5    traits::{DeviceTrait, HostTrait},
  6};
  7use gpui::{App, AsyncApp, BorrowAppContext, Global};
  8
  9pub(super) use cpal::Sample;
 10
 11use rodio::{Decoder, DeviceSinkBuilder, MixerDeviceSink, Source, mixer::Mixer, source::Buffered};
 12use settings::Settings;
 13use std::io::Cursor;
 14use util::ResultExt;
 15
 16mod echo_canceller;
 17use echo_canceller::EchoCanceller;
 18mod rodio_ext;
 19pub use crate::audio_settings::AudioSettings;
 20pub use rodio_ext::RodioExt;
 21
 22use crate::audio_settings::LIVE_SETTINGS;
 23
 24use crate::Sound;
 25
 26use super::{CHANNEL_COUNT, SAMPLE_RATE};
 27pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
 28    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
 29
 30pub fn init(cx: &mut App) {
 31    LIVE_SETTINGS.initialize(cx);
 32}
 33
 34// TODO(jk): this is currently cached only once - we should observe and react instead
 35pub fn ensure_devices_initialized(cx: &mut App) {
 36    if cx.has_global::<AvailableAudioDevices>() {
 37        return;
 38    }
 39    cx.default_global::<AvailableAudioDevices>();
 40    let task = cx
 41        .background_executor()
 42        .spawn(async move { get_available_audio_devices() });
 43    cx.spawn(async move |cx: &mut AsyncApp| {
 44        let devices = task.await;
 45        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
 46        cx.refresh();
 47    })
 48    .detach();
 49}
 50
 51#[derive(Default)]
 52pub struct Audio {
 53    output: Option<(MixerDeviceSink, Mixer)>,
 54    pub echo_canceller: EchoCanceller,
 55    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
 56}
 57
 58impl Global for Audio {}
 59
 60impl Audio {
 61    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
 62        #[cfg(debug_assertions)]
 63        log::warn!(
 64            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
 65        );
 66
 67        if self.output.is_none() {
 68            let (output_handle, output_mixer) =
 69                open_output_stream(output_audio_device, self.echo_canceller.clone())?;
 70            self.output = Some((output_handle, output_mixer));
 71        }
 72
 73        Ok(self
 74            .output
 75            .as_ref()
 76            .map(|(_, mixer)| mixer)
 77            .expect("we only get here if opening the outputstream succeeded"))
 78    }
 79
 80    pub fn play_sound(sound: Sound, cx: &mut App) {
 81        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
 82        cx.update_default_global(|this: &mut Self, cx| {
 83            let source = this.sound_source(sound, cx).log_err()?;
 84            let output_mixer = this
 85                .ensure_output_exists(output_audio_device)
 86                .context("Could not get output mixer")
 87                .log_err()?;
 88
 89            output_mixer.add(source);
 90            Some(())
 91        });
 92    }
 93
 94    pub fn end_call(cx: &mut App) {
 95        cx.update_default_global(|this: &mut Self, _cx| {
 96            this.output.take();
 97        });
 98    }
 99
100    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
101        if let Some(wav) = self.source_cache.get(&sound) {
102            return Ok(wav.clone());
103        }
104
105        let path = format!("sounds/{}.wav", sound.file());
106        let bytes = cx
107            .asset_source()
108            .load(&path)?
109            .map(anyhow::Ok)
110            .with_context(|| format!("No asset available for path {path}"))??
111            .into_owned();
112        let cursor = Cursor::new(bytes);
113        let source = Decoder::new(cursor)?.buffered();
114
115        self.source_cache.insert(sound, source.clone());
116
117        Ok(source)
118    }
119}
120
121pub fn open_input_stream(
122    device_id: Option<DeviceId>,
123) -> anyhow::Result<rodio::microphone::Microphone> {
124    let builder = rodio::microphone::MicrophoneBuilder::new();
125    let builder = if let Some(id) = device_id {
126        // TODO(jk): upstream patch
127        // if let Some(input_device) = default_host().device_by_id(id) {
128        //     builder.device(input_device);
129        // }
130        let mut found = None;
131        for input in rodio::microphone::available_inputs()? {
132            if input.clone().into_inner().id()? == id {
133                found = Some(builder.device(input));
134                break;
135            }
136        }
137        found.unwrap_or_else(|| builder.default_device())?
138    } else {
139        builder.default_device()?
140    };
141    let stream = builder
142        .default_config()?
143        .prefer_sample_rates([
144            SAMPLE_RATE,
145            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
146            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
147            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
148        ])
149        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
150        .prefer_buffer_sizes(512..)
151        .open_stream()?;
152    log::info!("Opened microphone: {:?}", stream.config());
153    Ok(stream)
154}
155
156pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result<cpal::Device> {
157    if let Some(id) = device_id {
158        if let Some(device) = default_host().device_by_id(id) {
159            return Ok(device);
160        }
161        log::warn!("Selected audio device not found, falling back to default");
162    }
163    if input {
164        default_host()
165            .default_input_device()
166            .context("no audio input device available")
167    } else {
168        default_host()
169            .default_output_device()
170            .context("no audio output device available")
171    }
172}
173
174pub fn open_test_output(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
175    let device = resolve_device(device_id.as_ref(), false)?;
176    DeviceSinkBuilder::from_device(device)?
177        .open_stream()
178        .context("Could not open output stream")
179}
180
181pub fn open_output_stream(
182    device_id: Option<DeviceId>,
183    mut echo_canceller: EchoCanceller,
184) -> anyhow::Result<(MixerDeviceSink, Mixer)> {
185    let device = resolve_device(device_id.as_ref(), false)?;
186    let mut output_handle = DeviceSinkBuilder::from_device(device)?
187        .open_stream()
188        .context("Could not open output stream")?;
189    output_handle.log_on_drop(false);
190    log::info!("Output stream: {:?}", output_handle);
191
192    let (output_mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
193    // otherwise the mixer ends as it's empty
194    output_mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
195    let echo_cancelling_source = source // apply echo cancellation just before output
196        .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
197            let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
198            echo_canceller.process_reverse_stream(&mut buf)
199        });
200    output_handle.mixer().add(echo_cancelling_source);
201
202    Ok((output_handle, output_mixer))
203}
204
205#[derive(Clone, Debug)]
206pub struct AudioDeviceInfo {
207    pub id: DeviceId,
208    pub desc: DeviceDescription,
209}
210
211impl AudioDeviceInfo {
212    pub fn matches_input(&self, is_input: bool) -> bool {
213        if is_input {
214            self.desc.supports_input()
215        } else {
216            self.desc.supports_output()
217        }
218    }
219
220    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
221        &self.id == id && self.matches_input(is_input)
222    }
223}
224
225impl std::fmt::Display for AudioDeviceInfo {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        write!(f, "{} ({})", self.desc.name(), self.id)
228    }
229}
230
231fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
232    let Some(devices) = default_host().devices().ok() else {
233        return Vec::new();
234    };
235    devices
236        .filter_map(|device| {
237            let id = device.id().ok()?;
238            let desc = device.description().ok()?;
239            Some(AudioDeviceInfo { id, desc })
240        })
241        .collect()
242}
243
244#[derive(Default, Clone, Debug)]
245pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
246
247impl Global for AvailableAudioDevices {}