audio.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
  4use libwebrtc::native::apm;
  5use log::info;
  6use parking_lot::Mutex;
  7use rodio::{
  8    Decoder, OutputStream, OutputStreamBuilder, Source,
  9    cpal::Sample,
 10    mixer::Mixer,
 11    nz,
 12    source::{Buffered, LimitSettings, UniformSourceIterator},
 13};
 14use settings::Settings;
 15use std::{
 16    io::Cursor,
 17    num::NonZero,
 18    path::PathBuf,
 19    sync::{Arc, atomic::Ordering},
 20    time::Duration,
 21};
 22use util::ResultExt;
 23
 24mod audio_settings;
 25mod replays;
 26mod rodio_ext;
 27pub use audio_settings::AudioSettings;
 28pub use rodio_ext::RodioExt;
 29
 30use crate::audio_settings::LIVE_SETTINGS;
 31
// NOTE: We used to use WebRTC's mixer, which only supported
// 16kHz, 32kHz and 48kHz. As 48kHz is the most common "next step up"
// for audio output devices like speakers/bluetooth, we just hard-code
// it and downsample when we need to.
//
// Since most noise cancelling requires 16kHz, we will move to
// that in the future.

/// Sample rate (in Hz) that every stream in this module is converted to.
pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
/// Number of interleaved channels used for every stream in this module.
pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
/// Number of interleaved samples in 10ms of audio at [`SAMPLE_RATE`] and
/// [`CHANNEL_COUNT`] (480 frames * 2 channels = 960 samples).
pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;

/// Duration passed to `replayable` for every stream that is made replayable.
pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 45
/// Sets up the audio crate's settings for `cx`.
///
/// Registers [`AudioSettings`] and then initializes `LIVE_SETTINGS`.
// NOTE(review): `LIVE_SETTINGS.initialize` presumably relies on the settings
// having been registered first — keep this order unless confirmed otherwise.
pub fn init(cx: &mut App) {
    AudioSettings::register(cx);
    LIVE_SETTINGS.initialize(cx);
}
 50
 51#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 52pub enum Sound {
 53    Joined,
 54    Leave,
 55    Mute,
 56    Unmute,
 57    StartScreenshare,
 58    StopScreenshare,
 59    AgentDone,
 60}
 61
 62impl Sound {
 63    fn file(&self) -> &'static str {
 64        match self {
 65            Self::Joined => "joined_call",
 66            Self::Leave => "leave_call",
 67            Self::Mute => "mute",
 68            Self::Unmute => "unmute",
 69            Self::StartScreenshare => "start_screenshare",
 70            Self::StopScreenshare => "stop_screenshare",
 71            Self::AgentDone => "agent_done",
 72        }
 73    }
 74}
 75
 76pub struct Audio {
 77    output_handle: Option<OutputStream>,
 78    output_mixer: Option<Mixer>,
 79    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
 80    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
 81    replays: replays::Replays,
 82}
 83
 84impl Default for Audio {
 85    fn default() -> Self {
 86        Self {
 87            output_handle: Default::default(),
 88            output_mixer: Default::default(),
 89            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
 90                true, false, false, false,
 91            ))),
 92            source_cache: Default::default(),
 93            replays: Default::default(),
 94        }
 95    }
 96}
 97
 98impl Global for Audio {}
 99
100impl Audio {
101    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
102        if self.output_handle.is_none() {
103            self.output_handle = Some(
104                OutputStreamBuilder::open_default_stream()
105                    .context("Could not open default output stream")?,
106            );
107            if let Some(output_handle) = &self.output_handle {
108                let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
109                // or the mixer will end immediately as its empty.
110                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
111                self.output_mixer = Some(mixer);
112
113                let echo_canceller = Arc::clone(&self.echo_canceller);
114                let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
115                    let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
116                    echo_canceller
117                        .lock()
118                        .process_reverse_stream(
119                            &mut buf,
120                            SAMPLE_RATE.get() as i32,
121                            CHANNEL_COUNT.get().into(),
122                        )
123                        .expect("Audio input and output threads should not panic");
124                });
125                output_handle.mixer().add(source);
126            }
127        }
128
129        Ok(self
130            .output_mixer
131            .as_ref()
132            .expect("we only get here if opening the outputstream succeeded"))
133    }
134
135    pub fn save_replays(
136        &self,
137        executor: BackgroundExecutor,
138    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
139        self.replays.replays_to_tar(executor)
140    }
141
142    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
143        let stream = rodio::microphone::MicrophoneBuilder::new()
144            .default_device()?
145            .default_config()?
146            .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
147            .prefer_channel_counts([nz!(1), nz!(2)])
148            .prefer_buffer_sizes(512..)
149            .open_stream()?;
150        info!("Opened microphone: {:?}", stream.config());
151
152        let (replay, stream) = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE)
153            .limit(LimitSettings::live_performance())
154            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
155                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
156                if voip_parts
157                    .echo_canceller
158                    .lock()
159                    .process_stream(
160                        &mut int_buffer,
161                        SAMPLE_RATE.get() as i32,
162                        CHANNEL_COUNT.get() as i32,
163                    )
164                    .context("livekit audio processor error")
165                    .log_err()
166                    .is_some()
167                {
168                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
169                        *sample = (*processed).to_sample();
170                    }
171                }
172            })
173            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
174            .periodic_access(Duration::from_millis(100), move |agc_source| {
175                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
176            })
177            .replayable(REPLAY_DURATION)
178            .expect("REPLAY_DURATION is longer then 100ms");
179
180        voip_parts
181            .replays
182            .add_voip_stream("local microphone".to_string(), replay);
183        Ok(stream)
184    }
185
186    pub fn play_voip_stream(
187        source: impl rodio::Source + Send + 'static,
188        speaker_name: String,
189        is_staff: bool,
190        cx: &mut App,
191    ) -> anyhow::Result<()> {
192        let (replay_source, source) = source
193            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
194            .periodic_access(Duration::from_millis(100), move |agc_source| {
195                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
196            })
197            .replayable(REPLAY_DURATION)
198            .expect("REPLAY_DURATION is longer then 100ms");
199
200        cx.update_default_global(|this: &mut Self, _cx| {
201            let output_mixer = this
202                .ensure_output_exists()
203                .context("Could not get output mixer")?;
204            output_mixer.add(source);
205            if is_staff {
206                this.replays.add_voip_stream(speaker_name, replay_source);
207            }
208            Ok(())
209        })
210    }
211
212    pub fn play_sound(sound: Sound, cx: &mut App) {
213        cx.update_default_global(|this: &mut Self, cx| {
214            let source = this.sound_source(sound, cx).log_err()?;
215            let output_mixer = this
216                .ensure_output_exists()
217                .context("Could not get output mixer")
218                .log_err()?;
219
220            output_mixer.add(source);
221            Some(())
222        });
223    }
224
225    pub fn end_call(cx: &mut App) {
226        cx.update_default_global(|this: &mut Self, _cx| {
227            this.output_handle.take();
228        });
229    }
230
231    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
232        if let Some(wav) = self.source_cache.get(&sound) {
233            return Ok(wav.clone());
234        }
235
236        let path = format!("sounds/{}.wav", sound.file());
237        let bytes = cx
238            .asset_source()
239            .load(&path)?
240            .map(anyhow::Ok)
241            .with_context(|| format!("No asset available for path {path}"))??
242            .into_owned();
243        let cursor = Cursor::new(bytes);
244        let source = Decoder::new(cursor)?.buffered();
245
246        self.source_cache.insert(sound, source.clone());
247
248        Ok(source)
249    }
250}
251
252pub struct VoipParts {
253    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
254    replays: replays::Replays,
255}
256
257impl VoipParts {
258    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
259        let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
260            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
261        })?;
262
263        Ok(Self {
264            echo_canceller: apm,
265            replays,
266        })
267    }
268}