1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use cpal::{
4 DeviceDescription, DeviceId, default_host,
5 traits::{DeviceTrait, HostTrait},
6};
7use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
8
9#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
10mod non_windows_and_freebsd_deps {
11 pub(super) use cpal::Sample;
12 pub(super) use libwebrtc::native::apm;
13 pub(super) use parking_lot::Mutex;
14 pub(super) use rodio::source::LimitSettings;
15 pub(super) use std::sync::Arc;
16}
17
18#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
19use non_windows_and_freebsd_deps::*;
20
21use rodio::{
22 Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
23 mixer::Mixer,
24 nz,
25 source::{AutomaticGainControlSettings, Buffered},
26};
27use settings::Settings;
28use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
29use util::ResultExt;
30
31mod audio_settings;
32mod replays;
33mod rodio_ext;
34pub use audio_settings::AudioSettings;
35pub use rodio_ext::RodioExt;
36
37use crate::audio_settings::LIVE_SETTINGS;
38
39// We are migrating to 16kHz sample rate from 48kHz. In the future
40// once we are reasonably sure most users have upgraded we will
41// remove the LEGACY parameters.
42//
43// We migrate to 16kHz because it is sufficient for speech and required
44// by the denoiser and future Speech to Text layers.
45pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
46pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
47pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
48 (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
49
50pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
51pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
52
53pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
54
55pub fn init(cx: &mut App) {
56 LIVE_SETTINGS.initialize(cx);
57 // TODO(jk): this is currently cached only once at startup - we should observe and react instead
58 let task = cx
59 .background_executor()
60 .spawn(async move { get_available_audio_devices() });
61 cx.spawn(async move |cx: &mut AsyncApp| {
62 let devices = task.await;
63 cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)))
64 })
65 .detach();
66}
67
/// Notification sounds that can be played with `Audio::play_sound`.
///
/// Each variant corresponds to an asset named `sounds/<file>.wav`, where
/// `<file>` is the value returned by [`Sound::file`].
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Sound {
    Joined,
    GuestJoined,
    Leave,
    Mute,
    Unmute,
    StartScreenshare,
    StopScreenshare,
    AgentDone,
}

impl Sound {
    /// Returns the asset file stem (without directory or extension) for this sound.
    fn file(&self) -> &'static str {
        match *self {
            Sound::Joined => "joined_call",
            Sound::GuestJoined => "guest_joined_call",
            Sound::Leave => "leave_call",
            Sound::Mute => "mute",
            Sound::Unmute => "unmute",
            Sound::StartScreenshare => "start_screenshare",
            Sound::StopScreenshare => "stop_screenshare",
            Sound::AgentDone => "agent_done",
        }
    }
}
94
95pub struct Audio {
96 output_handle: Option<MixerDeviceSink>,
97 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
98 pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
99 source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
100 replays: replays::Replays,
101}
102
103impl Default for Audio {
104 fn default() -> Self {
105 Self {
106 output_handle: Default::default(),
107 #[cfg(not(any(
108 all(target_os = "windows", target_env = "gnu"),
109 target_os = "freebsd"
110 )))]
111 echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
112 true, false, false, false,
113 ))),
114 source_cache: Default::default(),
115 replays: Default::default(),
116 }
117 }
118}
119
120impl Global for Audio {}
121
122impl Audio {
123 fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
124 #[cfg(debug_assertions)]
125 log::warn!(
126 "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
127 );
128
129 if self.output_handle.is_none() {
130 let output_handle = open_output_stream(output_audio_device)?;
131
132 // The webrtc apm is not yet compiling for windows & freebsd
133 #[cfg(not(any(
134 any(all(target_os = "windows", target_env = "gnu")),
135 target_os = "freebsd"
136 )))]
137 let echo_canceller = Arc::clone(&self.echo_canceller);
138
139 #[cfg(not(any(
140 any(all(target_os = "windows", target_env = "gnu")),
141 target_os = "freebsd"
142 )))]
143 {
144 let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)
145 .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
146 let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
147 echo_canceller
148 .lock()
149 .process_reverse_stream(
150 &mut buf,
151 SAMPLE_RATE.get() as i32,
152 CHANNEL_COUNT.get().into(),
153 )
154 .expect("Audio input and output threads should not panic");
155 });
156 output_handle.mixer().add(source);
157 }
158
159 #[cfg(any(
160 any(all(target_os = "windows", target_env = "gnu")),
161 target_os = "freebsd"
162 ))]
163 {
164 let source = rodio::source::Zero::<f32>::new(CHANNEL_COUNT, SAMPLE_RATE);
165 output_handle.mixer().add(source);
166 }
167
168 self.output_handle = Some(output_handle);
169 }
170
171 Ok(self
172 .output_handle
173 .as_ref()
174 .map(|h| h.mixer())
175 .expect("we only get here if opening the outputstream succeeded"))
176 }
177
178 pub fn save_replays(
179 &self,
180 executor: BackgroundExecutor,
181 ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
182 self.replays.replays_to_tar(executor)
183 }
184
185 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
186 pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
187 let stream = open_input_stream(voip_parts.input_audio_device)?;
188 let stream = stream
189 .possibly_disconnected_channels_to_mono()
190 .constant_samplerate(SAMPLE_RATE)
191 .limit(LimitSettings::live_performance())
192 .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
193 let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
194 if voip_parts
195 .echo_canceller
196 .lock()
197 .process_stream(
198 &mut int_buffer,
199 SAMPLE_RATE.get() as i32,
200 CHANNEL_COUNT.get() as i32,
201 )
202 .context("livekit audio processor error")
203 .log_err()
204 .is_some()
205 {
206 for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
207 *sample = (*processed).to_sample();
208 }
209 }
210 })
211 .denoise()
212 .context("Could not set up denoiser")?
213 .automatic_gain_control(AutomaticGainControlSettings {
214 target_level: 0.90,
215 attack_time: Duration::from_secs(1),
216 release_time: Duration::from_secs(0),
217 absolute_max_gain: 5.0,
218 })
219 .periodic_access(Duration::from_millis(100), move |agc_source| {
220 agc_source
221 .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
222 let denoise = agc_source.inner_mut();
223 denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
224 });
225
226 let stream = if voip_parts.legacy_audio_compatible {
227 stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
228 } else {
229 stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
230 };
231
232 let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
233 voip_parts
234 .replays
235 .add_voip_stream("local microphone".to_string(), replay);
236
237 Ok(stream)
238 }
239
240 pub fn play_voip_stream(
241 source: impl rodio::Source + Send + 'static,
242 speaker_name: String,
243 is_staff: bool,
244 cx: &mut App,
245 ) -> anyhow::Result<()> {
246 let (replay_source, source) = source
247 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
248 .automatic_gain_control(AutomaticGainControlSettings {
249 target_level: 0.90,
250 attack_time: Duration::from_secs(1),
251 release_time: Duration::from_secs(0),
252 absolute_max_gain: 5.0,
253 })
254 .periodic_access(Duration::from_millis(100), move |agc_source| {
255 agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
256 })
257 .replayable(REPLAY_DURATION)
258 .expect("REPLAY_DURATION is longer than 100ms");
259 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
260
261 cx.update_default_global(|this: &mut Self, _cx| {
262 let output_mixer = this
263 .ensure_output_exists(output_audio_device)
264 .context("Could not get output mixer")?;
265 output_mixer.add(source);
266 if is_staff {
267 this.replays.add_voip_stream(speaker_name, replay_source);
268 }
269 Ok(())
270 })
271 }
272
273 pub fn play_sound(sound: Sound, cx: &mut App) {
274 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
275 cx.update_default_global(|this: &mut Self, cx| {
276 let source = this.sound_source(sound, cx).log_err()?;
277 let output_mixer = this
278 .ensure_output_exists(output_audio_device)
279 .context("Could not get output mixer")
280 .log_err()?;
281
282 output_mixer.add(source);
283 Some(())
284 });
285 }
286
287 pub fn end_call(cx: &mut App) {
288 cx.update_default_global(|this: &mut Self, _cx| {
289 this.output_handle.take();
290 });
291 }
292
293 fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
294 if let Some(wav) = self.source_cache.get(&sound) {
295 return Ok(wav.clone());
296 }
297
298 let path = format!("sounds/{}.wav", sound.file());
299 let bytes = cx
300 .asset_source()
301 .load(&path)?
302 .map(anyhow::Ok)
303 .with_context(|| format!("No asset available for path {path}"))??
304 .into_owned();
305 let cursor = Cursor::new(bytes);
306 let source = Decoder::new(cursor)?.buffered();
307
308 self.source_cache.insert(sound, source.clone());
309
310 Ok(source)
311 }
312}
313
314#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
315pub struct VoipParts {
316 echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
317 replays: replays::Replays,
318 legacy_audio_compatible: bool,
319 input_audio_device: Option<DeviceId>,
320}
321
322#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
323impl VoipParts {
324 pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
325 let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
326 (Arc::clone(&audio.echo_canceller), audio.replays.clone())
327 });
328 let legacy_audio_compatible =
329 AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
330 .unwrap_or(true);
331 let input_audio_device =
332 AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
333 .flatten();
334
335 Ok(Self {
336 legacy_audio_compatible,
337 echo_canceller: apm,
338 replays,
339 input_audio_device,
340 })
341 }
342}
343
344pub fn open_input_stream(
345 device_id: Option<DeviceId>,
346) -> anyhow::Result<rodio::microphone::Microphone> {
347 let builder = rodio::microphone::MicrophoneBuilder::new();
348 let builder = if let Some(id) = device_id {
349 // TODO(jk): upstream patch
350 // if let Some(input_device) = default_host().device_by_id(id) {
351 // builder.device(input_device);
352 // }
353 let mut found = None;
354 for input in rodio::microphone::available_inputs()? {
355 if input.clone().into_inner().id()? == id {
356 found = Some(builder.device(input));
357 break;
358 }
359 }
360 found.unwrap_or_else(|| builder.default_device())?
361 } else {
362 builder.default_device()?
363 };
364 let stream = builder
365 .default_config()?
366 .prefer_sample_rates([
367 SAMPLE_RATE,
368 SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
369 SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
370 SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
371 ])
372 .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
373 .prefer_buffer_sizes(512..)
374 .open_stream()?;
375 log::info!("Opened microphone: {:?}", stream.config());
376 Ok(stream)
377}
378
379pub fn open_output_stream(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
380 let output_handle = if let Some(id) = device_id {
381 if let Some(device) = default_host().device_by_id(&id) {
382 DeviceSinkBuilder::from_device(device)?.open_stream()
383 } else {
384 DeviceSinkBuilder::open_default_sink()
385 }
386 } else {
387 DeviceSinkBuilder::open_default_sink()
388 };
389 let mut output_handle = output_handle.context("Could not open output stream")?;
390 output_handle.log_on_drop(false);
391 log::info!("Output stream: {:?}", output_handle);
392 Ok(output_handle)
393}
394
395#[derive(Clone, Debug)]
396pub struct AudioDeviceInfo {
397 pub id: DeviceId,
398 pub desc: DeviceDescription,
399}
400
401impl AudioDeviceInfo {
402 pub fn matches_input(&self, is_input: bool) -> bool {
403 if is_input {
404 self.desc.supports_input()
405 } else {
406 self.desc.supports_output()
407 }
408 }
409
410 pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
411 &self.id == id && self.matches_input(is_input)
412 }
413}
414
415impl std::fmt::Display for AudioDeviceInfo {
416 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
417 write!(f, "{} ({})", self.desc.name(), self.id)
418 }
419}
420
421fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
422 let Some(devices) = default_host().devices().ok() else {
423 return Vec::new();
424 };
425 devices
426 .filter_map(|device| {
427 let id = device.id().ok()?;
428 let desc = device.description().ok()?;
429 Some(AudioDeviceInfo { id, desc })
430 })
431 .collect()
432}
433
434#[derive(Default, Clone, Debug)]
435pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
436
437impl Global for AvailableAudioDevices {}