1use anyhow::{Context as _, Result};
  2use collections::HashMap;
  3use gpui::{App, BackgroundExecutor, BorrowAppContext, Global};
  4use log::info;
  5
  6#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
  7mod non_windows_and_freebsd_deps {
  8    pub(super) use gpui::AsyncApp;
  9    pub(super) use libwebrtc::native::apm;
 10    pub(super) use parking_lot::Mutex;
 11    pub(super) use rodio::cpal::Sample;
 12    pub(super) use rodio::source::LimitSettings;
 13    pub(super) use std::sync::Arc;
 14}
 15
 16#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 17use non_windows_and_freebsd_deps::*;
 18
 19use rodio::{
 20    Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, nz, source::Buffered,
 21};
 22use settings::Settings;
 23use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
 24use util::ResultExt;
 25
 26mod audio_settings;
 27mod replays;
 28mod rodio_ext;
 29pub use audio_settings::AudioSettings;
 30pub use rodio_ext::RodioExt;
 31
 32use crate::audio_settings::LIVE_SETTINGS;
 33
 34// We are migrating to 16kHz sample rate from 48kHz. In the future
 35// once we are reasonably sure most users have upgraded we will
 36// remove the LEGACY parameters.
 37//
 38// We migrate to 16kHz because it is sufficient for speech and required
 39// by the denoiser and future Speech to Text layers.
 40pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
 41pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
 42pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
 43    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
 44
 45pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
 46pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
 47
 48pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 49
 50pub fn init(cx: &mut App) {
 51    AudioSettings::register(cx);
 52    LIVE_SETTINGS.initialize(cx);
 53}
 54
 55#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 56pub enum Sound {
 57    Joined,
 58    GuestJoined,
 59    Leave,
 60    Mute,
 61    Unmute,
 62    StartScreenshare,
 63    StopScreenshare,
 64    AgentDone,
 65}
 66
 67impl Sound {
 68    const fn file(&self) -> &'static str {
 69        match self {
 70            Self::Joined => "joined_call",
 71            Self::GuestJoined => "guest_joined_call",
 72            Self::Leave => "leave_call",
 73            Self::Mute => "mute",
 74            Self::Unmute => "unmute",
 75            Self::StartScreenshare => "start_screenshare",
 76            Self::StopScreenshare => "stop_screenshare",
 77            Self::AgentDone => "agent_done",
 78        }
 79    }
 80}
 81
 82pub struct Audio {
 83    output_handle: Option<OutputStream>,
 84    output_mixer: Option<Mixer>,
 85    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
 86    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
 87    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
 88    replays: replays::Replays,
 89}
 90
 91impl Default for Audio {
 92    fn default() -> Self {
 93        Self {
 94            output_handle: Default::default(),
 95            output_mixer: Default::default(),
 96            #[cfg(not(any(
 97                all(target_os = "windows", target_env = "gnu"),
 98                target_os = "freebsd"
 99            )))]
100            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
101                true, false, false, false,
102            ))),
103            source_cache: Default::default(),
104            replays: Default::default(),
105        }
106    }
107}
108
109impl Global for Audio {}
110
111impl Audio {
112    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
113        #[cfg(debug_assertions)]
114        log::warn!(
115            "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
116        );
117
118        if self.output_handle.is_none() {
119            let output_handle = OutputStreamBuilder::open_default_stream()
120                .context("Could not open default output stream")?;
121            info!("Output stream: {:?}", output_handle);
122            self.output_handle = Some(output_handle);
123            if let Some(output_handle) = &self.output_handle {
124                let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
125                // or the mixer will end immediately as its empty.
126                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
127                self.output_mixer = Some(mixer);
128
129                // The webrtc apm is not yet compiling for windows & freebsd
130                #[cfg(not(any(
131                    any(all(target_os = "windows", target_env = "gnu")),
132                    target_os = "freebsd"
133                )))]
134                let echo_canceller = Arc::clone(&self.echo_canceller);
135                #[cfg(not(any(
136                    any(all(target_os = "windows", target_env = "gnu")),
137                    target_os = "freebsd"
138                )))]
139                let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
140                    let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
141                    echo_canceller
142                        .lock()
143                        .process_reverse_stream(
144                            &mut buf,
145                            SAMPLE_RATE.get() as i32,
146                            CHANNEL_COUNT.get().into(),
147                        )
148                        .expect("Audio input and output threads should not panic");
149                });
150                output_handle.mixer().add(source);
151            }
152        }
153
154        Ok(self
155            .output_mixer
156            .as_ref()
157            .expect("we only get here if opening the outputstream succeeded"))
158    }
159
160    pub fn save_replays(
161        &self,
162        executor: BackgroundExecutor,
163    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
164        self.replays.replays_to_tar(executor)
165    }
166
167    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
168    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
169        let stream = rodio::microphone::MicrophoneBuilder::new()
170            .default_device()?
171            .default_config()?
172            .prefer_sample_rates([
173                SAMPLE_RATE, // sample rates trivially resamplable to `SAMPLE_RATE`
174                SAMPLE_RATE.saturating_mul(nz!(2)),
175                SAMPLE_RATE.saturating_mul(nz!(3)),
176                SAMPLE_RATE.saturating_mul(nz!(4)),
177            ])
178            .prefer_channel_counts([nz!(1), nz!(2), nz!(3), nz!(4)])
179            .prefer_buffer_sizes(512..)
180            .open_stream()?;
181        info!("Opened microphone: {:?}", stream.config());
182
183        let stream = stream
184            .possibly_disconnected_channels_to_mono()
185            .constant_samplerate(SAMPLE_RATE)
186            .limit(LimitSettings::live_performance())
187            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
188                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
189                if voip_parts
190                    .echo_canceller
191                    .lock()
192                    .process_stream(
193                        &mut int_buffer,
194                        SAMPLE_RATE.get() as i32,
195                        CHANNEL_COUNT.get() as i32,
196                    )
197                    .context("livekit audio processor error")
198                    .log_err()
199                    .is_some()
200                {
201                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
202                        *sample = (*processed).to_sample();
203                    }
204                }
205            })
206            .denoise()
207            .context("Could not set up denoiser")?
208            .automatic_gain_control(0.90, 1.0, 0.0, 5.0)
209            .periodic_access(Duration::from_millis(100), move |agc_source| {
210                agc_source
211                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
212                let denoise = agc_source.inner_mut();
213                denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
214            });
215
216        let stream = if voip_parts.legacy_audio_compatible {
217            stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
218        } else {
219            stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
220        };
221
222        let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
223        voip_parts
224            .replays
225            .add_voip_stream("local microphone".to_string(), replay);
226
227        Ok(stream)
228    }
229
230    pub fn play_voip_stream(
231        source: impl rodio::Source + Send + 'static,
232        speaker_name: String,
233        is_staff: bool,
234        cx: &mut App,
235    ) -> anyhow::Result<()> {
236        let (replay_source, source) = source
237            .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
238            .automatic_gain_control(0.90, 1.0, 0.0, 5.0)
239            .periodic_access(Duration::from_millis(100), move |agc_source| {
240                agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
241            })
242            .replayable(REPLAY_DURATION)
243            .expect("REPLAY_DURATION is longer than 100ms");
244
245        cx.update_default_global(|this: &mut Self, _cx| {
246            let output_mixer = this
247                .ensure_output_exists()
248                .context("Could not get output mixer")?;
249            output_mixer.add(source);
250            if is_staff {
251                this.replays.add_voip_stream(speaker_name, replay_source);
252            }
253            Ok(())
254        })
255    }
256
257    pub fn play_sound(sound: Sound, cx: &mut App) {
258        cx.update_default_global(|this: &mut Self, cx| {
259            let source = this.sound_source(sound, cx).log_err()?;
260            let output_mixer = this
261                .ensure_output_exists()
262                .context("Could not get output mixer")
263                .log_err()?;
264
265            output_mixer.add(source);
266            Some(())
267        });
268    }
269
270    pub fn end_call(cx: &mut App) {
271        cx.update_default_global(|this: &mut Self, _cx| {
272            this.output_handle.take();
273        });
274    }
275
276    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
277        if let Some(wav) = self.source_cache.get(&sound) {
278            return Ok(wav.clone());
279        }
280
281        let path = format!("sounds/{}.wav", sound.file());
282        let bytes = cx
283            .asset_source()
284            .load(&path)?
285            .map(anyhow::Ok)
286            .with_context(|| format!("No asset available for path {path}"))??
287            .into_owned();
288        let cursor = Cursor::new(bytes);
289        let source = Decoder::new(cursor)?.buffered();
290
291        self.source_cache.insert(sound, source.clone());
292
293        Ok(source)
294    }
295}
296
297#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
298pub struct VoipParts {
299    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
300    replays: replays::Replays,
301    legacy_audio_compatible: bool,
302}
303
304#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
305impl VoipParts {
306    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
307        let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
308            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
309        })?;
310        let legacy_audio_compatible =
311            AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
312                .unwrap_or(true);
313
314        Ok(Self {
315            legacy_audio_compatible,
316            echo_canceller: apm,
317            replays,
318        })
319    }
320}