audio.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use cpal::{
  4    DeviceDescription, DeviceId, default_host,
  5    traits::{DeviceTrait, HostTrait},
  6};
  7use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
  8
  9#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 10mod non_windows_and_freebsd_deps {
 11    pub(super) use cpal::Sample;
 12    pub(super) use libwebrtc::native::apm;
 13    pub(super) use parking_lot::Mutex;
 14    pub(super) use rodio::source::LimitSettings;
 15    pub(super) use std::sync::Arc;
 16}
 17
 18#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 19use non_windows_and_freebsd_deps::*;
 20
 21use rodio::{
 22    Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
 23    mixer::Mixer,
 24    nz,
 25    source::{AutomaticGainControlSettings, Buffered},
 26};
 27use settings::Settings;
 28use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
 29use util::ResultExt;
 30
 31mod audio_settings;
 32mod replays;
 33mod rodio_ext;
 34pub use audio_settings::AudioSettings;
 35pub use rodio_ext::RodioExt;
 36
 37use crate::audio_settings::LIVE_SETTINGS;
 38
// We are migrating to 16kHz sample rate from 48kHz. In the future
// once we are reasonably sure most users have upgraded we will
// remove the LEGACY parameters.
//
// We migrate to 16kHz because it is sufficient for speech and required
// by the denoiser and future Speech to Text layers.

/// Sample rate, in Hz, of the current (non-legacy) audio format.
pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
/// Channel count of the current (non-legacy) audio format (mono).
pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
/// Number of samples in 10ms of audio at [`SAMPLE_RATE`] and [`CHANNEL_COUNT`]
/// (160 with the current values). The echo canceller and livekit want audio in
/// 10ms buffers.
pub const BUFFER_SIZE: usize =
    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;

/// Sample rate, in Hz, of the legacy audio format that is being migrated away from.
pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
/// Channel count of the legacy audio format (stereo).
pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);

/// Duration handed to `replayable` for every voip stream; presumably the amount
/// of recent audio kept available for replays (see the `replays` module).
pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 54
/// Initializes the crate's `LIVE_SETTINGS` with the application context.
///
/// NOTE(review): what `initialize` registers is defined in `audio_settings`;
/// presumably it keeps the atomics read by the audio threads in sync with the
/// user's settings — confirm there.
pub fn init(cx: &mut App) {
    LIVE_SETTINGS.initialize(cx);
}
 58
 59// TODO(jk): this is currently cached only once - we should observe and react instead
 60pub fn ensure_devices_initialized(cx: &mut App) {
 61    if cx.has_global::<AvailableAudioDevices>() {
 62        return;
 63    }
 64    cx.default_global::<AvailableAudioDevices>();
 65    let task = cx
 66        .background_executor()
 67        .spawn(async move { get_available_audio_devices() });
 68    cx.spawn(async move |cx: &mut AsyncApp| {
 69        let devices = task.await;
 70        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
 71        cx.refresh();
 72    })
 73    .detach();
 74}
 75
 76#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 77pub enum Sound {
 78    Joined,
 79    GuestJoined,
 80    Leave,
 81    Mute,
 82    Unmute,
 83    StartScreenshare,
 84    StopScreenshare,
 85    AgentDone,
 86}
 87
 88impl Sound {
 89    fn file(&self) -> &'static str {
 90        match self {
 91            Self::Joined => "joined_call",
 92            Self::GuestJoined => "guest_joined_call",
 93            Self::Leave => "leave_call",
 94            Self::Mute => "mute",
 95            Self::Unmute => "unmute",
 96            Self::StartScreenshare => "start_screenshare",
 97            Self::StopScreenshare => "stop_screenshare",
 98            Self::AgentDone => "agent_done",
 99        }
100    }
101}
102
/// Global audio state: the shared output stream, the echo canceller, cached
/// notification sounds and the replay recorder.
pub struct Audio {
    /// Handle to the open output stream; `None` until something is played and
    /// again after [`Audio::end_call`].
    output_handle: Option<MixerDeviceSink>,
    /// WebRTC audio processing module shared between the output path (reverse
    /// stream) and the microphone path. Not available on windows-gnu and freebsd.
    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Decoded and buffered notification sounds, filled lazily per [`Sound`].
    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
    /// Recorder that voip streams are registered with for later export.
    replays: replays::Replays,
}
110
111impl Default for Audio {
112    fn default() -> Self {
113        Self {
114            output_handle: Default::default(),
115            #[cfg(not(any(
116                all(target_os = "windows", target_env = "gnu"),
117                target_os = "freebsd"
118            )))]
119            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
120                true, false, false, false,
121            ))),
122            source_cache: Default::default(),
123            replays: Default::default(),
124        }
125    }
126}
127
128impl Global for Audio {}
129
130impl Audio {
131    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
132        #[cfg(debug_assertions)]
133        log::warn!(
134            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
135        );
136
137        if self.output_handle.is_none() {
138            let output_handle = open_output_stream(output_audio_device)?;
139
140            // The webrtc apm is not yet compiling for windows & freebsd
141            #[cfg(not(any(
142                any(all(target_os = "windows", target_env = "gnu")),
143                target_os = "freebsd"
144            )))]
145            let echo_canceller = Arc::clone(&self.echo_canceller);
146
147            #[cfg(not(any(
148                any(all(target_os = "windows", target_env = "gnu")),
149                target_os = "freebsd"
150            )))]
151            {
152                let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)
153                    .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
154                        let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
155                        echo_canceller
156                            .lock()
157                            .process_reverse_stream(
158                                &mut buf,
159                                SAMPLE_RATE.get() as i32,
160                                CHANNEL_COUNT.get().into(),
161                            )
162                            .expect("Audio input and output threads should not panic");
163                    });
164                output_handle.mixer().add(source);
165            }
166
167            #[cfg(any(
168                any(all(target_os = "windows", target_env = "gnu")),
169                target_os = "freebsd"
170            ))]
171            {
172                let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE);
173                output_handle.mixer().add(source);
174            }
175
176            self.output_handle = Some(output_handle);
177        }
178
179        Ok(self
180            .output_handle
181            .as_ref()
182            .map(|h| h.mixer())
183            .expect("we only get here if opening the outputstream succeeded"))
184    }
185
186    pub fn save_replays(
187        &self,
188        executor: BackgroundExecutor,
189    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
190        self.replays.replays_to_tar(executor)
191    }
192
193    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
194    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
195        let stream = open_input_stream(voip_parts.input_audio_device)?;
196        let stream = stream
197            .possibly_disconnected_channels_to_mono()
198            .constant_samplerate(SAMPLE_RATE)
199            .limit(LimitSettings::live_performance())
200            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
201                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
202                if voip_parts
203                    .echo_canceller
204                    .lock()
205                    .process_stream(
206                        &mut int_buffer,
207                        SAMPLE_RATE.get() as i32,
208                        CHANNEL_COUNT.get() as i32,
209                    )
210                    .context("livekit audio processor error")
211                    .log_err()
212                    .is_some()
213                {
214                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
215                        *sample = (*processed).to_sample();
216                    }
217                }
218            })
219            .denoise()
220            .context("Could not set up denoiser")?
221            .automatic_gain_control(AutomaticGainControlSettings {
222                target_level: 0.90,
223                attack_time: Duration::from_secs(1),
224                release_time: Duration::from_secs(0),
225                absolute_max_gain: 5.0,
226            })
227            .periodic_access(Duration::from_millis(100), move |agc_source| {
228                agc_source
229                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
230                let denoise = agc_source.inner_mut();
231                denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
232            });
233
234        let stream = if voip_parts.legacy_audio_compatible {
235            stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
236        } else {
237            stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
238        };
239
240        let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
241        voip_parts
242            .replays
243            .add_voip_stream("local microphone".to_string(), replay);
244
245        Ok(stream)
246    }
247
248    pub fn play_voip_stream(
249        source: impl rodio::Source + Send + 'static,
250        speaker_name: String,
251        is_staff: bool,
252        cx: &mut App,
253    ) -> anyhow::Result<()> {
254        let (replay_source, source) = source
255            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
256            .automatic_gain_control(AutomaticGainControlSettings {
257                target_level: 0.90,
258                attack_time: Duration::from_secs(1),
259                release_time: Duration::from_secs(0),
260                absolute_max_gain: 5.0,
261            })
262            .periodic_access(Duration::from_millis(100), move |agc_source| {
263                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
264            })
265            .replayable(REPLAY_DURATION)
266            .expect("REPLAY_DURATION is longer than 100ms");
267        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
268
269        cx.update_default_global(|this: &mut Self, _cx| {
270            let output_mixer = this
271                .ensure_output_exists(output_audio_device)
272                .context("Could not get output mixer")?;
273            output_mixer.add(source);
274            if is_staff {
275                this.replays.add_voip_stream(speaker_name, replay_source);
276            }
277            Ok(())
278        })
279    }
280
281    pub fn play_sound(sound: Sound, cx: &mut App) {
282        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
283        cx.update_default_global(|this: &mut Self, cx| {
284            let source = this.sound_source(sound, cx).log_err()?;
285            let output_mixer = this
286                .ensure_output_exists(output_audio_device)
287                .context("Could not get output mixer")
288                .log_err()?;
289
290            output_mixer.add(source);
291            Some(())
292        });
293    }
294
295    pub fn end_call(cx: &mut App) {
296        cx.update_default_global(|this: &mut Self, _cx| {
297            this.output_handle.take();
298        });
299    }
300
301    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
302        if let Some(wav) = self.source_cache.get(&sound) {
303            return Ok(wav.clone());
304        }
305
306        let path = format!("sounds/{}.wav", sound.file());
307        let bytes = cx
308            .asset_source()
309            .load(&path)?
310            .map(anyhow::Ok)
311            .with_context(|| format!("No asset available for path {path}"))??
312            .into_owned();
313        let cursor = Cursor::new(bytes);
314        let source = Decoder::new(cursor)?.buffered();
315
316        self.source_cache.insert(sound, source.clone());
317
318        Ok(source)
319    }
320}
321
/// Everything [`Audio::open_microphone`] needs, captured up front from the app
/// context by [`VoipParts::new`].
#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
pub struct VoipParts {
    /// Shared handle to the echo canceller owned by the [`Audio`] global.
    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Clone of the replay recorder the microphone stream is registered with.
    replays: replays::Replays,
    /// Whether the microphone stream is emitted with the legacy channel count
    /// and sample rate instead of the current ones.
    legacy_audio_compatible: bool,
    /// Configured input device; `None` selects the default input.
    input_audio_device: Option<DeviceId>,
}
329
330#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
331impl VoipParts {
332    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
333        let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
334            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
335        });
336        let legacy_audio_compatible =
337            AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
338                .unwrap_or(true);
339        let input_audio_device =
340            AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
341                .flatten();
342
343        Ok(Self {
344            legacy_audio_compatible,
345            echo_canceller: apm,
346            replays,
347            input_audio_device,
348        })
349    }
350}
351
352pub fn open_input_stream(
353    device_id: Option<DeviceId>,
354) -> anyhow::Result<rodio::microphone::Microphone> {
355    let builder = rodio::microphone::MicrophoneBuilder::new();
356    let builder = if let Some(id) = device_id {
357        // TODO(jk): upstream patch
358        // if let Some(input_device) = default_host().device_by_id(id) {
359        //     builder.device(input_device);
360        // }
361        let mut found = None;
362        for input in rodio::microphone::available_inputs()? {
363            if input.clone().into_inner().id()? == id {
364                found = Some(builder.device(input));
365                break;
366            }
367        }
368        found.unwrap_or_else(|| builder.default_device())?
369    } else {
370        builder.default_device()?
371    };
372    let stream = builder
373        .default_config()?
374        .prefer_sample_rates([
375            SAMPLE_RATE,
376            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
377            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
378            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
379        ])
380        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
381        .prefer_buffer_sizes(512..)
382        .open_stream()?;
383    log::info!("Opened microphone: {:?}", stream.config());
384    Ok(stream)
385}
386
387pub fn open_output_stream(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
388    let output_handle = if let Some(id) = device_id {
389        if let Some(device) = default_host().device_by_id(&id) {
390            DeviceSinkBuilder::from_device(device)?.open_stream()
391        } else {
392            DeviceSinkBuilder::open_default_sink()
393        }
394    } else {
395        DeviceSinkBuilder::open_default_sink()
396    };
397    let mut output_handle = output_handle.context("Could not open output stream")?;
398    output_handle.log_on_drop(false);
399    log::info!("Output stream: {:?}", output_handle);
400    Ok(output_handle)
401}
402
/// Identifier and description of one audio device reported by the host.
#[derive(Clone, Debug)]
pub struct AudioDeviceInfo {
    /// Identifier of the device as reported by `cpal`.
    pub id: DeviceId,
    /// Device description; provides the name and whether the device supports
    /// input and/or output.
    pub desc: DeviceDescription,
}
408
409impl AudioDeviceInfo {
410    pub fn matches_input(&self, is_input: bool) -> bool {
411        if is_input {
412            self.desc.supports_input()
413        } else {
414            self.desc.supports_output()
415        }
416    }
417
418    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
419        &self.id == id && self.matches_input(is_input)
420    }
421}
422
423impl std::fmt::Display for AudioDeviceInfo {
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        write!(f, "{} ({})", self.desc.name(), self.id)
426    }
427}
428
429fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
430    let Some(devices) = default_host().devices().ok() else {
431        return Vec::new();
432    };
433    devices
434        .filter_map(|device| {
435            let id = device.id().ok()?;
436            let desc = device.description().ok()?;
437            Some(AudioDeviceInfo { id, desc })
438        })
439        .collect()
440}
441
/// Global holding the audio devices found by [`ensure_devices_initialized`].
///
/// The list is empty until the background enumeration has finished.
#[derive(Default, Clone, Debug)]
pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);

impl Global for AvailableAudioDevices {}