audio.rs

  1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use gpui::{App, BackgroundExecutor, BorrowAppContext, Global};
  4
  5#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
  6mod non_windows_and_freebsd_deps {
  7    pub(super) use gpui::AsyncApp;
  8    pub(super) use libwebrtc::native::apm;
  9    pub(super) use log::info;
 10    pub(super) use parking_lot::Mutex;
 11    pub(super) use rodio::cpal::Sample;
 12    pub(super) use rodio::source::{LimitSettings, UniformSourceIterator};
 13    pub(super) use std::sync::Arc;
 14}
 15
 16#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 17use non_windows_and_freebsd_deps::*;
 18
 19use rodio::{
 20    Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, nz, source::Buffered,
 21};
 22use settings::Settings;
 23use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
 24use util::ResultExt;
 25
 26mod audio_settings;
 27mod replays;
 28mod rodio_ext;
 29pub use audio_settings::AudioSettings;
 30pub use rodio_ext::RodioExt;
 31
 32use crate::audio_settings::LIVE_SETTINGS;
 33
// NOTE: We used to use WebRTC's mixer, which only supported
// 16kHz, 32kHz and 48kHz. As 48kHz is the most common "next step up"
// for audio output devices like speakers/bluetooth, we just hard-code
// this and downsample when we need to.
//
// Since most noise cancelling requires 16kHz, we will move to
// that in the future.
/// Sample rate, in Hz, used for the output mixer and for microphone input.
pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
/// Number of audio channels used together with [`SAMPLE_RATE`].
pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
/// Number of samples, summed over all channels, in 10ms of audio at
/// [`SAMPLE_RATE`] (one hundredth of a second per channel times
/// [`CHANNEL_COUNT`]).
pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;

/// Duration handed to `replayable` for every voip stream created in this module.
pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 47
/// Registers [`AudioSettings`] with the app and initializes the live audio
/// settings (`LIVE_SETTINGS`).
///
/// NOTE(review): presumably meant to be called once during startup, before
/// any audio is played — confirm against the callers.
pub fn init(cx: &mut App) {
    AudioSettings::register(cx);
    LIVE_SETTINGS.initialize(cx);
}
 52
 53#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 54pub enum Sound {
 55    Joined,
 56    Leave,
 57    Mute,
 58    Unmute,
 59    StartScreenshare,
 60    StopScreenshare,
 61    AgentDone,
 62}
 63
 64impl Sound {
 65    fn file(&self) -> &'static str {
 66        match self {
 67            Self::Joined => "joined_call",
 68            Self::Leave => "leave_call",
 69            Self::Mute => "mute",
 70            Self::Unmute => "unmute",
 71            Self::StartScreenshare => "start_screenshare",
 72            Self::StopScreenshare => "stop_screenshare",
 73            Self::AgentDone => "agent_done",
 74        }
 75    }
 76}
 77
/// Global audio state: the output stream and its mixer, the echo canceller,
/// the cache of decoded sound effects and the voip replay handles.
pub struct Audio {
    /// Output stream; `None` until `ensure_output_exists` opens it, and
    /// `None` again after `end_call`.
    output_handle: Option<OutputStream>,
    /// Mixer that sounds and voip streams are added to; set whenever the
    /// output stream is opened.
    output_mixer: Option<Mixer>,
    /// Audio processing module shared between the output path (reverse
    /// stream) and the microphone path (via `VoipParts`).
    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Decoded sound effects keyed by `Sound`, filled on first use by
    /// `sound_source`.
    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
    /// Replay handles for voip streams; exported by `save_replays`.
    replays: replays::Replays,
}
 86
 87impl Default for Audio {
 88    fn default() -> Self {
 89        Self {
 90            output_handle: Default::default(),
 91            output_mixer: Default::default(),
 92            #[cfg(not(any(
 93                all(target_os = "windows", target_env = "gnu"),
 94                target_os = "freebsd"
 95            )))]
 96            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
 97                true, false, false, false,
 98            ))),
 99            source_cache: Default::default(),
100            replays: Default::default(),
101        }
102    }
103}
104
// Marks `Audio` as a gpui global, which is what lets the methods below reach
// it through `update_default_global` / `try_read_default_global`.
impl Global for Audio {}
106
107impl Audio {
108    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
109        if self.output_handle.is_none() {
110            self.output_handle = Some(
111                OutputStreamBuilder::open_default_stream()
112                    .context("Could not open default output stream")?,
113            );
114            if let Some(output_handle) = &self.output_handle {
115                let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
116                // or the mixer will end immediately as its empty.
117                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
118                self.output_mixer = Some(mixer);
119
120                // The webrtc apm is not yet compiling for windows & freebsd
121                #[cfg(not(any(
122                    any(all(target_os = "windows", target_env = "gnu")),
123                    target_os = "freebsd"
124                )))]
125                let echo_canceller = Arc::clone(&self.echo_canceller);
126                #[cfg(not(any(
127                    any(all(target_os = "windows", target_env = "gnu")),
128                    target_os = "freebsd"
129                )))]
130                let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
131                    let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
132                    echo_canceller
133                        .lock()
134                        .process_reverse_stream(
135                            &mut buf,
136                            SAMPLE_RATE.get() as i32,
137                            CHANNEL_COUNT.get().into(),
138                        )
139                        .expect("Audio input and output threads should not panic");
140                });
141                output_handle.mixer().add(source);
142            }
143        }
144
145        Ok(self
146            .output_mixer
147            .as_ref()
148            .expect("we only get here if opening the outputstream succeeded"))
149    }
150
    /// Hands the stored replays to `replays_to_tar`, passing along `executor`,
    /// and returns the resulting task.
    ///
    /// NOTE(review): judging by the return type, the task presumably resolves
    /// to the path of the written archive and the duration of audio it
    /// contains — confirm in the `replays` module.
    pub fn save_replays(
        &self,
        executor: BackgroundExecutor,
    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
        self.replays.replays_to_tar(executor)
    }
157
158    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
159    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
160        let stream = rodio::microphone::MicrophoneBuilder::new()
161            .default_device()?
162            .default_config()?
163            .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
164            .prefer_channel_counts([nz!(1), nz!(2)])
165            .prefer_buffer_sizes(512..)
166            .open_stream()?;
167        info!("Opened microphone: {:?}", stream.config());
168
169        let (replay, stream) = stream
170            // .suspicious_stereo_to_mono()
171            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
172            .limit(LimitSettings::live_performance())
173            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
174                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
175                if voip_parts
176                    .echo_canceller
177                    .lock()
178                    .process_stream(
179                        &mut int_buffer,
180                        SAMPLE_RATE.get() as i32,
181                        CHANNEL_COUNT.get() as i32,
182                    )
183                    .context("livekit audio processor error")
184                    .log_err()
185                    .is_some()
186                {
187                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
188                        *sample = (*processed).to_sample();
189                    }
190                }
191            })
192            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
193            .periodic_access(Duration::from_millis(100), move |agc_source| {
194                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
195            })
196            .replayable(REPLAY_DURATION)?;
197
198        voip_parts
199            .replays
200            .add_voip_stream("local microphone".to_string(), replay);
201        Ok(stream)
202    }
203
204    pub fn play_voip_stream(
205        source: impl rodio::Source + Send + 'static,
206        speaker_name: String,
207        is_staff: bool,
208        cx: &mut App,
209    ) -> anyhow::Result<()> {
210        let (replay_source, source) = source
211            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
212            .periodic_access(Duration::from_millis(100), move |agc_source| {
213                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
214            })
215            .replayable(REPLAY_DURATION)
216            .expect("REPLAY_DURATION is longer than 100ms");
217
218        cx.update_default_global(|this: &mut Self, _cx| {
219            let output_mixer = this
220                .ensure_output_exists()
221                .context("Could not get output mixer")?;
222            output_mixer.add(source);
223            if is_staff {
224                this.replays.add_voip_stream(speaker_name, replay_source);
225            }
226            Ok(())
227        })
228    }
229
230    pub fn play_sound(sound: Sound, cx: &mut App) {
231        cx.update_default_global(|this: &mut Self, cx| {
232            let source = this.sound_source(sound, cx).log_err()?;
233            let output_mixer = this
234                .ensure_output_exists()
235                .context("Could not get output mixer")
236                .log_err()?;
237
238            output_mixer.add(source);
239            Some(())
240        });
241    }
242
243    pub fn end_call(cx: &mut App) {
244        cx.update_default_global(|this: &mut Self, _cx| {
245            this.output_handle.take();
246        });
247    }
248
249    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
250        if let Some(wav) = self.source_cache.get(&sound) {
251            return Ok(wav.clone());
252        }
253
254        let path = format!("sounds/{}.wav", sound.file());
255        let bytes = cx
256            .asset_source()
257            .load(&path)?
258            .map(anyhow::Ok)
259            .with_context(|| format!("No asset available for path {path}"))??
260            .into_owned();
261        let cursor = Cursor::new(bytes);
262        let source = Decoder::new(cursor)?.buffered();
263
264        self.source_cache.insert(sound, source.clone());
265
266        Ok(source)
267    }
268}
269
270#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
271pub struct VoipParts {
272    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
273    replays: replays::Replays,
274}
275
276#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
277impl VoipParts {
278    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
279        let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
280            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
281        })?;
282
283        Ok(Self {
284            echo_canceller: apm,
285            replays,
286        })
287    }
288}