1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use gpui::{App, BackgroundExecutor, BorrowAppContext, Global};
4use log::info;
5
6#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
7mod non_windows_and_freebsd_deps {
8 pub(super) use gpui::AsyncApp;
9 pub(super) use libwebrtc::native::apm;
10 pub(super) use parking_lot::Mutex;
11 pub(super) use rodio::cpal::Sample;
12 pub(super) use rodio::source::LimitSettings;
13 pub(super) use std::sync::Arc;
14}
15
16#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
17use non_windows_and_freebsd_deps::*;
18
19use rodio::{
20 Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, nz, source::Buffered,
21};
22use settings::Settings;
23use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
24use util::ResultExt;
25
26mod audio_settings;
27mod replays;
28mod rodio_ext;
29pub use audio_settings::AudioSettings;
30pub use rodio_ext::RodioExt;
31
32use crate::audio_settings::LIVE_SETTINGS;
33
34// We are migrating to 16kHz sample rate from 48kHz. In the future
35// once we are reasonably sure most users have upgraded we will
36// remove the LEGACY parameters.
37//
38// We migrate to 16kHz because it is sufficient for speech and required
39// by the denoiser and future Speech to Text layers.
40pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
41pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
42pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
43 (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
44
45pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
46pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
47
48pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
49
50pub fn init(cx: &mut App) {
51 AudioSettings::register(cx);
52 LIVE_SETTINGS.initialize(cx);
53}
54
55#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
56pub enum Sound {
57 Joined,
58 GuestJoined,
59 Leave,
60 Mute,
61 Unmute,
62 StartScreenshare,
63 StopScreenshare,
64 AgentDone,
65}
66
67impl Sound {
68 fn file(&self) -> &'static str {
69 match self {
70 Self::Joined => "joined_call",
71 Self::GuestJoined => "guest_joined_call",
72 Self::Leave => "leave_call",
73 Self::Mute => "mute",
74 Self::Unmute => "unmute",
75 Self::StartScreenshare => "start_screenshare",
76 Self::StopScreenshare => "stop_screenshare",
77 Self::AgentDone => "agent_done",
78 }
79 }
80}
81
82pub struct Audio {
83 output_handle: Option<OutputStream>,
84 output_mixer: Option<Mixer>,
85 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
86 pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
87 source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
88 replays: replays::Replays,
89}
90
91impl Default for Audio {
92 fn default() -> Self {
93 Self {
94 output_handle: Default::default(),
95 output_mixer: Default::default(),
96 #[cfg(not(any(
97 all(target_os = "windows", target_env = "gnu"),
98 target_os = "freebsd"
99 )))]
100 echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
101 true, false, false, false,
102 ))),
103 source_cache: Default::default(),
104 replays: Default::default(),
105 }
106 }
107}
108
109impl Global for Audio {}
110
111impl Audio {
112 fn ensure_output_exists(&mut self) -> Result<&Mixer> {
113 #[cfg(debug_assertions)]
114 log::warn!(
115 "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
116 );
117
118 if self.output_handle.is_none() {
119 let output_handle = OutputStreamBuilder::open_default_stream()
120 .context("Could not open default output stream")?;
121 info!("Output stream: {:?}", output_handle);
122 self.output_handle = Some(output_handle);
123 if let Some(output_handle) = &self.output_handle {
124 let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
125 // or the mixer will end immediately as its empty.
126 mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
127 self.output_mixer = Some(mixer);
128
129 // The webrtc apm is not yet compiling for windows & freebsd
130 #[cfg(not(any(
131 any(all(target_os = "windows", target_env = "gnu")),
132 target_os = "freebsd"
133 )))]
134 let echo_canceller = Arc::clone(&self.echo_canceller);
135 #[cfg(not(any(
136 any(all(target_os = "windows", target_env = "gnu")),
137 target_os = "freebsd"
138 )))]
139 let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
140 let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
141 echo_canceller
142 .lock()
143 .process_reverse_stream(
144 &mut buf,
145 SAMPLE_RATE.get() as i32,
146 CHANNEL_COUNT.get().into(),
147 )
148 .expect("Audio input and output threads should not panic");
149 });
150 output_handle.mixer().add(source);
151 }
152 }
153
154 Ok(self
155 .output_mixer
156 .as_ref()
157 .expect("we only get here if opening the outputstream succeeded"))
158 }
159
160 pub fn save_replays(
161 &self,
162 executor: BackgroundExecutor,
163 ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
164 self.replays.replays_to_tar(executor)
165 }
166
167 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
168 pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
169 let stream = rodio::microphone::MicrophoneBuilder::new()
170 .default_device()?
171 .default_config()?
172 .prefer_sample_rates([
173 SAMPLE_RATE, // sample rates trivially resamplable to `SAMPLE_RATE`
174 SAMPLE_RATE.saturating_mul(nz!(2)),
175 SAMPLE_RATE.saturating_mul(nz!(3)),
176 SAMPLE_RATE.saturating_mul(nz!(4)),
177 ])
178 .prefer_channel_counts([nz!(1), nz!(2), nz!(3), nz!(4)])
179 .prefer_buffer_sizes(512..)
180 .open_stream()?;
181 info!("Opened microphone: {:?}", stream.config());
182
183 let stream = stream
184 .possibly_disconnected_channels_to_mono()
185 .constant_samplerate(SAMPLE_RATE)
186 .limit(LimitSettings::live_performance())
187 .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
188 let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
189 if voip_parts
190 .echo_canceller
191 .lock()
192 .process_stream(
193 &mut int_buffer,
194 SAMPLE_RATE.get() as i32,
195 CHANNEL_COUNT.get() as i32,
196 )
197 .context("livekit audio processor error")
198 .log_err()
199 .is_some()
200 {
201 for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
202 *sample = (*processed).to_sample();
203 }
204 }
205 })
206 .denoise()
207 .context("Could not set up denoiser")?
208 .automatic_gain_control(0.90, 1.0, 0.0, 5.0)
209 .periodic_access(Duration::from_millis(100), move |agc_source| {
210 agc_source
211 .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
212 let denoise = agc_source.inner_mut();
213 denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
214 });
215
216 let stream = if voip_parts.legacy_audio_compatible {
217 stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
218 } else {
219 stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
220 };
221
222 let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
223 voip_parts
224 .replays
225 .add_voip_stream("local microphone".to_string(), replay);
226
227 Ok(stream)
228 }
229
230 pub fn play_voip_stream(
231 source: impl rodio::Source + Send + 'static,
232 speaker_name: String,
233 is_staff: bool,
234 cx: &mut App,
235 ) -> anyhow::Result<()> {
236 let (replay_source, source) = source
237 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
238 .automatic_gain_control(0.90, 1.0, 0.0, 5.0)
239 .periodic_access(Duration::from_millis(100), move |agc_source| {
240 agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
241 })
242 .replayable(REPLAY_DURATION)
243 .expect("REPLAY_DURATION is longer than 100ms");
244
245 cx.update_default_global(|this: &mut Self, _cx| {
246 let output_mixer = this
247 .ensure_output_exists()
248 .context("Could not get output mixer")?;
249 output_mixer.add(source);
250 if is_staff {
251 this.replays.add_voip_stream(speaker_name, replay_source);
252 }
253 Ok(())
254 })
255 }
256
257 pub fn play_sound(sound: Sound, cx: &mut App) {
258 cx.update_default_global(|this: &mut Self, cx| {
259 let source = this.sound_source(sound, cx).log_err()?;
260 let output_mixer = this
261 .ensure_output_exists()
262 .context("Could not get output mixer")
263 .log_err()?;
264
265 output_mixer.add(source);
266 Some(())
267 });
268 }
269
270 pub fn end_call(cx: &mut App) {
271 cx.update_default_global(|this: &mut Self, _cx| {
272 this.output_handle.take();
273 });
274 }
275
276 fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
277 if let Some(wav) = self.source_cache.get(&sound) {
278 return Ok(wav.clone());
279 }
280
281 let path = format!("sounds/{}.wav", sound.file());
282 let bytes = cx
283 .asset_source()
284 .load(&path)?
285 .map(anyhow::Ok)
286 .with_context(|| format!("No asset available for path {path}"))??
287 .into_owned();
288 let cursor = Cursor::new(bytes);
289 let source = Decoder::new(cursor)?.buffered();
290
291 self.source_cache.insert(sound, source.clone());
292
293 Ok(source)
294 }
295}
296
297#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
298pub struct VoipParts {
299 echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
300 replays: replays::Replays,
301 legacy_audio_compatible: bool,
302}
303
304#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
305impl VoipParts {
306 pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
307 let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
308 (Arc::clone(&audio.echo_canceller), audio.replays.clone())
309 })?;
310 let legacy_audio_compatible =
311 AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
312 .unwrap_or(true);
313
314 Ok(Self {
315 legacy_audio_compatible,
316 echo_canceller: apm,
317 replays,
318 })
319 }
320}