audio_pipeline.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use cpal::{
  4    DeviceDescription, DeviceId, default_host,
  5    traits::{DeviceTrait, HostTrait},
  6};
  7use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
  8
  9pub(super) use cpal::Sample;
 10pub(super) use rodio::source::LimitSettings;
 11
 12use rodio::{
 13    Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
 14    mixer::Mixer,
 15    source::{AutomaticGainControlSettings, Buffered},
 16};
 17use settings::Settings;
 18use std::{io::Cursor, path::PathBuf, sync::atomic::Ordering, time::Duration};
 19use util::ResultExt;
 20
 21mod echo_canceller;
 22use echo_canceller::EchoCanceller;
 23mod replays;
 24mod rodio_ext;
 25pub use crate::audio_settings::AudioSettings;
 26pub use rodio_ext::RodioExt;
 27
 28use crate::audio_settings::LIVE_SETTINGS;
 29
 30use crate::Sound;
 31
 32use super::{CHANNEL_COUNT, SAMPLE_RATE};
 33pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
 34    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
 35
 36pub fn init(cx: &mut App) {
 37    LIVE_SETTINGS.initialize(cx);
 38}
 39
 40// TODO(jk): this is currently cached only once - we should observe and react instead
 41pub fn ensure_devices_initialized(cx: &mut App) {
 42    if cx.has_global::<AvailableAudioDevices>() {
 43        return;
 44    }
 45    cx.default_global::<AvailableAudioDevices>();
 46    let task = cx
 47        .background_executor()
 48        .spawn(async move { get_available_audio_devices() });
 49    cx.spawn(async move |cx: &mut AsyncApp| {
 50        let devices = task.await;
 51        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
 52        cx.refresh();
 53    })
 54    .detach();
 55}
 56
 57#[derive(Default)]
 58pub struct Audio {
 59    output: Option<(MixerDeviceSink, Mixer)>,
 60    pub echo_canceller: EchoCanceller,
 61    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
 62    replays: replays::Replays,
 63}
 64
 65impl Global for Audio {}
 66
 67impl Audio {
 68    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
 69        #[cfg(debug_assertions)]
 70        log::warn!(
 71            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
 72        );
 73
 74        if self.output.is_none() {
 75            let (output_handle, output_mixer) =
 76                open_output_stream(output_audio_device, self.echo_canceller.clone())?;
 77            self.output = Some((output_handle, output_mixer));
 78        }
 79
 80        Ok(self
 81            .output
 82            .as_ref()
 83            .map(|(_, mixer)| mixer)
 84            .expect("we only get here if opening the outputstream succeeded"))
 85    }
 86
 87    pub fn save_replays(
 88        &self,
 89        executor: BackgroundExecutor,
 90    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
 91        self.replays.replays_to_tar(executor)
 92    }
 93
 94    pub fn open_microphone(mut voip_parts: VoipParts) -> anyhow::Result<impl Source> {
 95        let stream = open_input_stream(voip_parts.input_audio_device)?;
 96        let stream = stream
 97            .possibly_disconnected_channels_to_mono()
 98            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
 99            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
100                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
101                if voip_parts
102                    .echo_canceller
103                    .process_stream(&mut int_buffer)
104                    .log_err()
105                    .is_some()
106                {
107                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
108                        *sample = (*processed).to_sample();
109                    }
110                }
111            })
112            .limit(LimitSettings::live_performance())
113            .automatic_gain_control(AutomaticGainControlSettings {
114                target_level: 0.90,
115                attack_time: Duration::from_secs(1),
116                release_time: Duration::from_secs(0),
117                absolute_max_gain: 5.0,
118            })
119            .periodic_access(Duration::from_millis(100), move |agc_source| {
120                agc_source
121                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
122                let _ = LIVE_SETTINGS.denoise; // TODO(audio: re-introduce de-noising
123            });
124
125        let (replay, stream) = stream.replayable(crate::REPLAY_DURATION)?;
126        voip_parts
127            .replays
128            .add_voip_stream("local microphone".to_string(), replay);
129
130        Ok(stream)
131    }
132
133    pub fn play_voip_stream(
134        source: impl rodio::Source + Send + 'static,
135        speaker_name: String,
136        is_staff: bool,
137        cx: &mut App,
138    ) -> anyhow::Result<()> {
139        let (replay_source, source) = source
140            .automatic_gain_control(AutomaticGainControlSettings {
141                target_level: 0.90,
142                attack_time: Duration::from_secs(1),
143                release_time: Duration::from_secs(0),
144                absolute_max_gain: 5.0,
145            })
146            .periodic_access(Duration::from_millis(100), move |agc_source| {
147                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
148            })
149            .replayable(crate::REPLAY_DURATION)
150            .expect("REPLAY_DURATION is longer than 100ms");
151        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
152
153        cx.update_default_global(|this: &mut Self, _cx| {
154            let output_mixer = this
155                .ensure_output_exists(output_audio_device)
156                .context("Could not get output mixer")?;
157            output_mixer.add(source);
158            if is_staff {
159                this.replays.add_voip_stream(speaker_name, replay_source);
160            }
161            Ok(())
162        })
163    }
164
165    pub fn play_sound(sound: Sound, cx: &mut App) {
166        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
167        cx.update_default_global(|this: &mut Self, cx| {
168            let source = this.sound_source(sound, cx).log_err()?;
169            let output_mixer = this
170                .ensure_output_exists(output_audio_device)
171                .context("Could not get output mixer")
172                .log_err()?;
173
174            output_mixer.add(source);
175            Some(())
176        });
177    }
178
179    pub fn end_call(cx: &mut App) {
180        cx.update_default_global(|this: &mut Self, _cx| {
181            this.output.take();
182        });
183    }
184
185    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
186        if let Some(wav) = self.source_cache.get(&sound) {
187            return Ok(wav.clone());
188        }
189
190        let path = format!("sounds/{}.wav", sound.file());
191        let bytes = cx
192            .asset_source()
193            .load(&path)?
194            .map(anyhow::Ok)
195            .with_context(|| format!("No asset available for path {path}"))??
196            .into_owned();
197        let cursor = Cursor::new(bytes);
198        let source = Decoder::new(cursor)?.buffered();
199
200        self.source_cache.insert(sound, source.clone());
201
202        Ok(source)
203    }
204}
205
206pub struct VoipParts {
207    echo_canceller: EchoCanceller,
208    replays: replays::Replays,
209    input_audio_device: Option<DeviceId>,
210}
211
212impl VoipParts {
213    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
214        let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
215            (audio.echo_canceller.clone(), audio.replays.clone())
216        });
217        let input_audio_device =
218            AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
219                .flatten();
220
221        Ok(Self {
222            echo_canceller: apm,
223            replays,
224            input_audio_device,
225        })
226    }
227}
228
229pub fn open_input_stream(
230    device_id: Option<DeviceId>,
231) -> anyhow::Result<rodio::microphone::Microphone> {
232    let builder = rodio::microphone::MicrophoneBuilder::new();
233    let builder = if let Some(id) = device_id {
234        // TODO(jk): upstream patch
235        // if let Some(input_device) = default_host().device_by_id(id) {
236        //     builder.device(input_device);
237        // }
238        let mut found = None;
239        for input in rodio::microphone::available_inputs()? {
240            if input.clone().into_inner().id()? == id {
241                found = Some(builder.device(input));
242                break;
243            }
244        }
245        found.unwrap_or_else(|| builder.default_device())?
246    } else {
247        builder.default_device()?
248    };
249    let stream = builder
250        .default_config()?
251        .prefer_sample_rates([
252            SAMPLE_RATE,
253            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
254            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
255            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
256        ])
257        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
258        .prefer_buffer_sizes(512..)
259        .open_stream()?;
260    log::info!("Opened microphone: {:?}", stream.config());
261    Ok(stream)
262}
263
264pub fn resolve_device(device_id: Option<&DeviceId>, input: bool) -> anyhow::Result<cpal::Device> {
265    if let Some(id) = device_id {
266        if let Some(device) = default_host().device_by_id(id) {
267            return Ok(device);
268        }
269        log::warn!("Selected audio device not found, falling back to default");
270    }
271    if input {
272        default_host()
273            .default_input_device()
274            .context("no audio input device available")
275    } else {
276        default_host()
277            .default_output_device()
278            .context("no audio output device available")
279    }
280}
281
282pub fn open_test_output(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
283    let device = resolve_device(device_id.as_ref(), false)?;
284    DeviceSinkBuilder::from_device(device)?
285        .open_stream()
286        .context("Could not open output stream")
287}
288
289pub fn open_output_stream(
290    device_id: Option<DeviceId>,
291    mut echo_canceller: EchoCanceller,
292) -> anyhow::Result<(MixerDeviceSink, Mixer)> {
293    let device = resolve_device(device_id.as_ref(), false)?;
294    let mut output_handle = DeviceSinkBuilder::from_device(device)?
295        .open_stream()
296        .context("Could not open output stream")?;
297    output_handle.log_on_drop(false);
298    log::info!("Output stream: {:?}", output_handle);
299
300    let (output_mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
301    // otherwise the mixer ends as it's empty
302    output_mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
303    let echo_cancelling_source = source // apply echo cancellation just before output
304        .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
305            let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
306            echo_canceller.process_reverse_stream(&mut buf)
307        });
308    output_handle.mixer().add(echo_cancelling_source);
309
310    Ok((output_handle, output_mixer))
311}
312
313#[derive(Clone, Debug)]
314pub struct AudioDeviceInfo {
315    pub id: DeviceId,
316    pub desc: DeviceDescription,
317}
318
319impl AudioDeviceInfo {
320    pub fn matches_input(&self, is_input: bool) -> bool {
321        if is_input {
322            self.desc.supports_input()
323        } else {
324            self.desc.supports_output()
325        }
326    }
327
328    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
329        &self.id == id && self.matches_input(is_input)
330    }
331}
332
333impl std::fmt::Display for AudioDeviceInfo {
334    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335        write!(f, "{} ({})", self.desc.name(), self.id)
336    }
337}
338
339fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
340    let Some(devices) = default_host().devices().ok() else {
341        return Vec::new();
342    };
343    devices
344        .filter_map(|device| {
345            let id = device.id().ok()?;
346            let desc = device.description().ok()?;
347            Some(AudioDeviceInfo { id, desc })
348        })
349        .collect()
350}
351
352#[derive(Default, Clone, Debug)]
353pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
354
355impl Global for AvailableAudioDevices {}