audio.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use cpal::{
  4    DeviceDescription, DeviceId, default_host,
  5    traits::{DeviceTrait, HostTrait},
  6};
  7use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
  8
  9#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 10mod non_windows_and_freebsd_deps {
 11    pub(super) use cpal::Sample;
 12    pub(super) use libwebrtc::native::apm;
 13    pub(super) use parking_lot::Mutex;
 14    pub(super) use rodio::source::LimitSettings;
 15    pub(super) use std::sync::Arc;
 16}
 17
 18#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 19use non_windows_and_freebsd_deps::*;
 20
 21use rodio::{
 22    Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
 23    mixer::Mixer,
 24    nz,
 25    source::{AutomaticGainControlSettings, Buffered},
 26};
 27use settings::Settings;
 28use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
 29use util::ResultExt;
 30
 31mod audio_settings;
 32mod replays;
 33mod rodio_ext;
 34pub use audio_settings::AudioSettings;
 35pub use rodio_ext::RodioExt;
 36
 37use crate::audio_settings::LIVE_SETTINGS;
 38
 39// We are migrating to 16kHz sample rate from 48kHz. In the future
 40// once we are reasonably sure most users have upgraded we will
 41// remove the LEGACY parameters.
 42//
 43// We migrate to 16kHz because it is sufficient for speech and required
 44// by the denoiser and future Speech to Text layers.
 45pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
 46pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
 47pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
 48    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
 49
 50pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
 51pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
 52
 53pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 54
 55pub fn init(cx: &mut App) {
 56    LIVE_SETTINGS.initialize(cx);
 57    // TODO(jk): this is currently cached only once at startup - we should observe and react instead
 58    let task = cx
 59        .background_executor()
 60        .spawn(async move { get_available_audio_devices() });
 61    cx.spawn(async move |cx: &mut AsyncApp| {
 62        let devices = task.await;
 63        cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)))
 64    })
 65    .detach();
 66}
 67
 68#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 69pub enum Sound {
 70    Joined,
 71    GuestJoined,
 72    Leave,
 73    Mute,
 74    Unmute,
 75    StartScreenshare,
 76    StopScreenshare,
 77    AgentDone,
 78}
 79
 80impl Sound {
 81    fn file(&self) -> &'static str {
 82        match self {
 83            Self::Joined => "joined_call",
 84            Self::GuestJoined => "guest_joined_call",
 85            Self::Leave => "leave_call",
 86            Self::Mute => "mute",
 87            Self::Unmute => "unmute",
 88            Self::StartScreenshare => "start_screenshare",
 89            Self::StopScreenshare => "stop_screenshare",
 90            Self::AgentDone => "agent_done",
 91        }
 92    }
 93}
 94
 95pub struct Audio {
 96    output_handle: Option<MixerDeviceSink>,
 97    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 98    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
 99    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
100    replays: replays::Replays,
101}
102
103impl Default for Audio {
104    fn default() -> Self {
105        Self {
106            output_handle: Default::default(),
107            #[cfg(not(any(
108                all(target_os = "windows", target_env = "gnu"),
109                target_os = "freebsd"
110            )))]
111            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
112                true, false, false, false,
113            ))),
114            source_cache: Default::default(),
115            replays: Default::default(),
116        }
117    }
118}
119
120impl Global for Audio {}
121
122impl Audio {
123    fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
124        #[cfg(debug_assertions)]
125        log::warn!(
126            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
127        );
128
129        if self.output_handle.is_none() {
130            let output_handle = open_output_stream(output_audio_device)?;
131
132            // The webrtc apm is not yet compiling for windows & freebsd
133            #[cfg(not(any(
134                any(all(target_os = "windows", target_env = "gnu")),
135                target_os = "freebsd"
136            )))]
137            let echo_canceller = Arc::clone(&self.echo_canceller);
138
139            #[cfg(not(any(
140                any(all(target_os = "windows", target_env = "gnu")),
141                target_os = "freebsd"
142            )))]
143            {
144                let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)
145                    .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
146                        let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
147                        echo_canceller
148                            .lock()
149                            .process_reverse_stream(
150                                &mut buf,
151                                SAMPLE_RATE.get() as i32,
152                                CHANNEL_COUNT.get().into(),
153                            )
154                            .expect("Audio input and output threads should not panic");
155                    });
156                output_handle.mixer().add(source);
157            }
158
159            #[cfg(any(
160                any(all(target_os = "windows", target_env = "gnu")),
161                target_os = "freebsd"
162            ))]
163            {
164                let source = rodio::source::Zero::<f32>::new(CHANNEL_COUNT, SAMPLE_RATE);
165                output_handle.mixer().add(source);
166            }
167
168            self.output_handle = Some(output_handle);
169        }
170
171        Ok(self
172            .output_handle
173            .as_ref()
174            .map(|h| h.mixer())
175            .expect("we only get here if opening the outputstream succeeded"))
176    }
177
178    pub fn save_replays(
179        &self,
180        executor: BackgroundExecutor,
181    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
182        self.replays.replays_to_tar(executor)
183    }
184
185    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
186    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
187        let stream = open_input_stream(voip_parts.input_audio_device)?;
188        let stream = stream
189            .possibly_disconnected_channels_to_mono()
190            .constant_samplerate(SAMPLE_RATE)
191            .limit(LimitSettings::live_performance())
192            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
193                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
194                if voip_parts
195                    .echo_canceller
196                    .lock()
197                    .process_stream(
198                        &mut int_buffer,
199                        SAMPLE_RATE.get() as i32,
200                        CHANNEL_COUNT.get() as i32,
201                    )
202                    .context("livekit audio processor error")
203                    .log_err()
204                    .is_some()
205                {
206                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
207                        *sample = (*processed).to_sample();
208                    }
209                }
210            })
211            .denoise()
212            .context("Could not set up denoiser")?
213            .automatic_gain_control(AutomaticGainControlSettings {
214                target_level: 0.90,
215                attack_time: Duration::from_secs(1),
216                release_time: Duration::from_secs(0),
217                absolute_max_gain: 5.0,
218            })
219            .periodic_access(Duration::from_millis(100), move |agc_source| {
220                agc_source
221                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
222                let denoise = agc_source.inner_mut();
223                denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
224            });
225
226        let stream = if voip_parts.legacy_audio_compatible {
227            stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
228        } else {
229            stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
230        };
231
232        let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
233        voip_parts
234            .replays
235            .add_voip_stream("local microphone".to_string(), replay);
236
237        Ok(stream)
238    }
239
240    pub fn play_voip_stream(
241        source: impl rodio::Source + Send + 'static,
242        speaker_name: String,
243        is_staff: bool,
244        cx: &mut App,
245    ) -> anyhow::Result<()> {
246        let (replay_source, source) = source
247            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
248            .automatic_gain_control(AutomaticGainControlSettings {
249                target_level: 0.90,
250                attack_time: Duration::from_secs(1),
251                release_time: Duration::from_secs(0),
252                absolute_max_gain: 5.0,
253            })
254            .periodic_access(Duration::from_millis(100), move |agc_source| {
255                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
256            })
257            .replayable(REPLAY_DURATION)
258            .expect("REPLAY_DURATION is longer than 100ms");
259        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
260
261        cx.update_default_global(|this: &mut Self, _cx| {
262            let output_mixer = this
263                .ensure_output_exists(output_audio_device)
264                .context("Could not get output mixer")?;
265            output_mixer.add(source);
266            if is_staff {
267                this.replays.add_voip_stream(speaker_name, replay_source);
268            }
269            Ok(())
270        })
271    }
272
273    pub fn play_sound(sound: Sound, cx: &mut App) {
274        let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
275        cx.update_default_global(|this: &mut Self, cx| {
276            let source = this.sound_source(sound, cx).log_err()?;
277            let output_mixer = this
278                .ensure_output_exists(output_audio_device)
279                .context("Could not get output mixer")
280                .log_err()?;
281
282            output_mixer.add(source);
283            Some(())
284        });
285    }
286
287    pub fn end_call(cx: &mut App) {
288        cx.update_default_global(|this: &mut Self, _cx| {
289            this.output_handle.take();
290        });
291    }
292
293    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
294        if let Some(wav) = self.source_cache.get(&sound) {
295            return Ok(wav.clone());
296        }
297
298        let path = format!("sounds/{}.wav", sound.file());
299        let bytes = cx
300            .asset_source()
301            .load(&path)?
302            .map(anyhow::Ok)
303            .with_context(|| format!("No asset available for path {path}"))??
304            .into_owned();
305        let cursor = Cursor::new(bytes);
306        let source = Decoder::new(cursor)?.buffered();
307
308        self.source_cache.insert(sound, source.clone());
309
310        Ok(source)
311    }
312}
313
314#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
315pub struct VoipParts {
316    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
317    replays: replays::Replays,
318    legacy_audio_compatible: bool,
319    input_audio_device: Option<DeviceId>,
320}
321
322#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
323impl VoipParts {
324    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
325        let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
326            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
327        });
328        let legacy_audio_compatible =
329            AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
330                .unwrap_or(true);
331        let input_audio_device =
332            AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
333                .flatten();
334
335        Ok(Self {
336            legacy_audio_compatible,
337            echo_canceller: apm,
338            replays,
339            input_audio_device,
340        })
341    }
342}
343
344pub fn open_input_stream(
345    device_id: Option<DeviceId>,
346) -> anyhow::Result<rodio::microphone::Microphone> {
347    let builder = rodio::microphone::MicrophoneBuilder::new();
348    let builder = if let Some(id) = device_id {
349        // TODO(jk): upstream patch
350        // if let Some(input_device) = default_host().device_by_id(id) {
351        //     builder.device(input_device);
352        // }
353        let mut found = None;
354        for input in rodio::microphone::available_inputs()? {
355            if input.clone().into_inner().id()? == id {
356                found = Some(builder.device(input));
357                break;
358            }
359        }
360        found.unwrap_or_else(|| builder.default_device())?
361    } else {
362        builder.default_device()?
363    };
364    let stream = builder
365        .default_config()?
366        .prefer_sample_rates([
367            SAMPLE_RATE,
368            SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
369            SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
370            SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
371        ])
372        .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
373        .prefer_buffer_sizes(512..)
374        .open_stream()?;
375    log::info!("Opened microphone: {:?}", stream.config());
376    Ok(stream)
377}
378
379pub fn open_output_stream(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
380    let output_handle = if let Some(id) = device_id {
381        if let Some(device) = default_host().device_by_id(&id) {
382            DeviceSinkBuilder::from_device(device)?.open_stream()
383        } else {
384            DeviceSinkBuilder::open_default_sink()
385        }
386    } else {
387        DeviceSinkBuilder::open_default_sink()
388    };
389    let mut output_handle = output_handle.context("Could not open output stream")?;
390    output_handle.log_on_drop(false);
391    log::info!("Output stream: {:?}", output_handle);
392    Ok(output_handle)
393}
394
395#[derive(Clone, Debug)]
396pub struct AudioDeviceInfo {
397    pub id: DeviceId,
398    pub desc: DeviceDescription,
399}
400
401impl AudioDeviceInfo {
402    pub fn matches_input(&self, is_input: bool) -> bool {
403        if is_input {
404            self.desc.supports_input()
405        } else {
406            self.desc.supports_output()
407        }
408    }
409
410    pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
411        &self.id == id && self.matches_input(is_input)
412    }
413}
414
415impl std::fmt::Display for AudioDeviceInfo {
416    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
417        write!(f, "{} ({})", self.desc.name(), self.id)
418    }
419}
420
421fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
422    let Some(devices) = default_host().devices().ok() else {
423        return Vec::new();
424    };
425    devices
426        .filter_map(|device| {
427            let id = device.id().ok()?;
428            let desc = device.description().ok()?;
429            Some(AudioDeviceInfo { id, desc })
430        })
431        .collect()
432}
433
434#[derive(Default, Clone, Debug)]
435pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
436
437impl Global for AvailableAudioDevices {}