1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use cpal::{
4 DeviceDescription, DeviceId, default_host,
5 traits::{DeviceTrait, HostTrait},
6};
7use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
8
// Items that this file only uses in code paths depending on the WebRTC audio
// processing module (`apm`). The APM does not compile for these targets yet,
// so the imports are gated behind the same cfg used throughout this file.
// Despite the module name, only the GNU Windows target (`windows` +
// `target_env = "gnu"`) is excluded, not Windows as a whole.
#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
mod non_windows_and_freebsd_deps {
    // `to_sample()` conversions between f32 audio and the i16 buffers the APM takes.
    pub(super) use cpal::Sample;
    pub(super) use libwebrtc::native::apm;
    pub(super) use parking_lot::Mutex;
    pub(super) use rodio::source::LimitSettings;
    pub(super) use std::sync::Arc;
}

// Glob-import the gated items so the rest of the file can name them directly.
#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
use non_windows_and_freebsd_deps::*;
20
21use rodio::{
22 Decoder, DeviceSinkBuilder, MixerDeviceSink, Source,
23 mixer::Mixer,
24 nz,
25 source::{AutomaticGainControlSettings, Buffered},
26};
27use settings::Settings;
28use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
29use util::ResultExt;
30
31mod audio_settings;
32mod replays;
33mod rodio_ext;
34pub use audio_settings::AudioSettings;
35pub use rodio_ext::RodioExt;
36
37use crate::audio_settings::LIVE_SETTINGS;
38
39// We are migrating to 16kHz sample rate from 48kHz. In the future
40// once we are reasonably sure most users have upgraded we will
41// remove the LEGACY parameters.
42//
43// We migrate to 16kHz because it is sufficient for speech and required
44// by the denoiser and future Speech to Text layers.
45pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
46pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
47pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
48 (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
49
50pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
51pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
52
53pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
54
55pub fn init(cx: &mut App) {
56 LIVE_SETTINGS.initialize(cx);
57}
58
59// TODO(jk): this is currently cached only once - we should observe and react instead
60pub fn ensure_devices_initialized(cx: &mut App) {
61 if cx.has_global::<AvailableAudioDevices>() {
62 return;
63 }
64 cx.default_global::<AvailableAudioDevices>();
65 let task = cx
66 .background_executor()
67 .spawn(async move { get_available_audio_devices() });
68 cx.spawn(async move |cx: &mut AsyncApp| {
69 let devices = task.await;
70 cx.update(|cx| cx.set_global(AvailableAudioDevices(devices)));
71 cx.refresh();
72 })
73 .detach();
74}
75
/// A sound effect that can be played with `Audio::play_sound`.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Sound {
    Joined,
    GuestJoined,
    Leave,
    Mute,
    Unmute,
    StartScreenshare,
    StopScreenshare,
    AgentDone,
}

impl Sound {
    /// File stem of the bundled asset for this sound.
    ///
    /// The asset is loaded from `sounds/<stem>.wav`.
    fn file(&self) -> &'static str {
        match *self {
            Sound::Joined => "joined_call",
            Sound::GuestJoined => "guest_joined_call",
            Sound::Leave => "leave_call",
            Sound::Mute => "mute",
            Sound::Unmute => "unmute",
            Sound::StartScreenshare => "start_screenshare",
            Sound::StopScreenshare => "stop_screenshare",
            Sound::AgentDone => "agent_done",
        }
    }
}
102
/// Global audio state.
///
/// Holds the lazily opened output device, the echo canceller shared by output and
/// microphone processing, a cache of decoded sound effects, and the replay registry.
pub struct Audio {
    /// Output device sink. Opened on demand by `ensure_output_exists` and dropped by
    /// `end_call`.
    output_handle: Option<MixerDeviceSink>,
    /// WebRTC audio processing module. It receives the reverse stream from the output
    /// side and processes microphone buffers in `open_microphone`. Absent on
    /// windows-gnu and freebsd, where the APM does not compile.
    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Decoded, buffered sound effects keyed by [`Sound`], filled on first use.
    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
    /// Recorded streams that `save_replays` exports.
    replays: replays::Replays,
}
110
111impl Default for Audio {
112 fn default() -> Self {
113 Self {
114 output_handle: Default::default(),
115 #[cfg(not(any(
116 all(target_os = "windows", target_env = "gnu"),
117 target_os = "freebsd"
118 )))]
119 echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
120 true, false, false, false,
121 ))),
122 source_cache: Default::default(),
123 replays: Default::default(),
124 }
125 }
126}
127
128impl Global for Audio {}
129
130impl Audio {
131 fn ensure_output_exists(&mut self, output_audio_device: Option<DeviceId>) -> Result<&Mixer> {
132 #[cfg(debug_assertions)]
133 log::warn!(
134 "Audio does not sound correct without optimizations. Use a release build to debug audio issues"
135 );
136
137 if self.output_handle.is_none() {
138 let output_handle = open_output_stream(output_audio_device)?;
139
140 // The webrtc apm is not yet compiling for windows & freebsd
141 #[cfg(not(any(
142 any(all(target_os = "windows", target_env = "gnu")),
143 target_os = "freebsd"
144 )))]
145 let echo_canceller = Arc::clone(&self.echo_canceller);
146
147 #[cfg(not(any(
148 any(all(target_os = "windows", target_env = "gnu")),
149 target_os = "freebsd"
150 )))]
151 {
152 let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)
153 .inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
154 let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
155 echo_canceller
156 .lock()
157 .process_reverse_stream(
158 &mut buf,
159 SAMPLE_RATE.get() as i32,
160 CHANNEL_COUNT.get().into(),
161 )
162 .expect("Audio input and output threads should not panic");
163 });
164 output_handle.mixer().add(source);
165 }
166
167 #[cfg(any(
168 any(all(target_os = "windows", target_env = "gnu")),
169 target_os = "freebsd"
170 ))]
171 {
172 let source = rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE);
173 output_handle.mixer().add(source);
174 }
175
176 self.output_handle = Some(output_handle);
177 }
178
179 Ok(self
180 .output_handle
181 .as_ref()
182 .map(|h| h.mixer())
183 .expect("we only get here if opening the outputstream succeeded"))
184 }
185
186 pub fn save_replays(
187 &self,
188 executor: BackgroundExecutor,
189 ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
190 self.replays.replays_to_tar(executor)
191 }
192
    /// Opens the microphone and builds the outgoing voice pipeline.
    ///
    /// Stages, in order:
    /// 1. `possibly_disconnected_channels_to_mono`, then resample to [`SAMPLE_RATE`].
    /// 2. Limit with `LimitSettings::live_performance()`.
    /// 3. Run [`BUFFER_SIZE`] chunks through the shared WebRTC echo canceller
    ///    (`process_stream`). On error, the error is logged and the chunk passes
    ///    through unprocessed.
    /// 4. Denoise, then apply automatic gain control.
    /// 5. Every 100ms, enable or disable the AGC and the denoiser from
    ///    `LIVE_SETTINGS.auto_microphone_volume` and `LIVE_SETTINGS.denoise`.
    /// 6. Convert to the legacy parameters ([`LEGACY_CHANNEL_COUNT`] /
    ///    [`LEGACY_SAMPLE_RATE`]) when `legacy_audio_compatible` is set, otherwise to
    ///    [`CHANNEL_COUNT`] / [`SAMPLE_RATE`].
    ///
    /// A replay of the final stream is registered as "local microphone".
    ///
    /// # Errors
    /// Fails if the input stream cannot be opened, if the denoiser cannot be set up, or if
    /// `replayable` fails.
    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
        let stream = open_input_stream(voip_parts.input_audio_device)?;
        let stream = stream
            .possibly_disconnected_channels_to_mono()
            .constant_samplerate(SAMPLE_RATE)
            .limit(LimitSettings::live_performance())
            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
                // The APM works on i16 samples; convert, process in place, convert back.
                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
                if voip_parts
                    .echo_canceller
                    .lock()
                    .process_stream(
                        &mut int_buffer,
                        SAMPLE_RATE.get() as i32,
                        CHANNEL_COUNT.get() as i32,
                    )
                    .context("livekit audio processor error")
                    .log_err()
                    .is_some()
                {
                    // Only overwrite the buffer if processing succeeded.
                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
                        *sample = (*processed).to_sample();
                    }
                }
            })
            .denoise()
            .context("Could not set up denoiser")?
            .automatic_gain_control(AutomaticGainControlSettings {
                target_level: 0.90,
                attack_time: Duration::from_secs(1),
                release_time: Duration::from_secs(0),
                absolute_max_gain: 5.0,
            })
            // Pick up settings changes while the call is running.
            .periodic_access(Duration::from_millis(100), move |agc_source| {
                agc_source
                    .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
                let denoise = agc_source.inner_mut();
                denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
            });

        let stream = if voip_parts.legacy_audio_compatible {
            stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
        } else {
            stream.constant_params(CHANNEL_COUNT, SAMPLE_RATE)
        };

        let (replay, stream) = stream.replayable(REPLAY_DURATION)?;
        voip_parts
            .replays
            .add_voip_stream("local microphone".to_string(), replay);

        Ok(stream)
    }
247
248 pub fn play_voip_stream(
249 source: impl rodio::Source + Send + 'static,
250 speaker_name: String,
251 is_staff: bool,
252 cx: &mut App,
253 ) -> anyhow::Result<()> {
254 let (replay_source, source) = source
255 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
256 .automatic_gain_control(AutomaticGainControlSettings {
257 target_level: 0.90,
258 attack_time: Duration::from_secs(1),
259 release_time: Duration::from_secs(0),
260 absolute_max_gain: 5.0,
261 })
262 .periodic_access(Duration::from_millis(100), move |agc_source| {
263 agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
264 })
265 .replayable(REPLAY_DURATION)
266 .expect("REPLAY_DURATION is longer than 100ms");
267 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
268
269 cx.update_default_global(|this: &mut Self, _cx| {
270 let output_mixer = this
271 .ensure_output_exists(output_audio_device)
272 .context("Could not get output mixer")?;
273 output_mixer.add(source);
274 if is_staff {
275 this.replays.add_voip_stream(speaker_name, replay_source);
276 }
277 Ok(())
278 })
279 }
280
281 pub fn play_sound(sound: Sound, cx: &mut App) {
282 let output_audio_device = AudioSettings::get_global(cx).output_audio_device.clone();
283 cx.update_default_global(|this: &mut Self, cx| {
284 let source = this.sound_source(sound, cx).log_err()?;
285 let output_mixer = this
286 .ensure_output_exists(output_audio_device)
287 .context("Could not get output mixer")
288 .log_err()?;
289
290 output_mixer.add(source);
291 Some(())
292 });
293 }
294
295 pub fn end_call(cx: &mut App) {
296 cx.update_default_global(|this: &mut Self, _cx| {
297 this.output_handle.take();
298 });
299 }
300
301 fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
302 if let Some(wav) = self.source_cache.get(&sound) {
303 return Ok(wav.clone());
304 }
305
306 let path = format!("sounds/{}.wav", sound.file());
307 let bytes = cx
308 .asset_source()
309 .load(&path)?
310 .map(anyhow::Ok)
311 .with_context(|| format!("No asset available for path {path}"))??
312 .into_owned();
313 let cursor = Cursor::new(bytes);
314 let source = Decoder::new(cursor)?.buffered();
315
316 self.source_cache.insert(sound, source.clone());
317
318 Ok(source)
319 }
320}
321
/// Inputs for `Audio::open_microphone`, collected up front by `VoipParts::new` from the
/// global [`Audio`] state and the audio settings.
#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
pub struct VoipParts {
    /// Clone of the `Arc` in `Audio::echo_canceller`, i.e. the same APM instance.
    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Clone of `Audio`'s replays; the microphone stream is registered here.
    /// NOTE(review): presumably clones share state with the global — confirm in `replays`.
    replays: replays::Replays,
    /// When true, the microphone stream is converted to the LEGACY_* parameters.
    legacy_audio_compatible: bool,
    /// Input device to open; `None` uses the host's default input device.
    input_audio_device: Option<DeviceId>,
}
329
330#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
331impl VoipParts {
332 pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
333 let (apm, replays) = cx.read_default_global::<Audio, _>(|audio, _| {
334 (Arc::clone(&audio.echo_canceller), audio.replays.clone())
335 });
336 let legacy_audio_compatible =
337 AudioSettings::try_read_global(cx, |settings| settings.legacy_audio_compatible)
338 .unwrap_or(true);
339 let input_audio_device =
340 AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
341 .flatten();
342
343 Ok(Self {
344 legacy_audio_compatible,
345 echo_canceller: apm,
346 replays,
347 input_audio_device,
348 })
349 }
350}
351
352pub fn open_input_stream(
353 device_id: Option<DeviceId>,
354) -> anyhow::Result<rodio::microphone::Microphone> {
355 let builder = rodio::microphone::MicrophoneBuilder::new();
356 let builder = if let Some(id) = device_id {
357 // TODO(jk): upstream patch
358 // if let Some(input_device) = default_host().device_by_id(id) {
359 // builder.device(input_device);
360 // }
361 let mut found = None;
362 for input in rodio::microphone::available_inputs()? {
363 if input.clone().into_inner().id()? == id {
364 found = Some(builder.device(input));
365 break;
366 }
367 }
368 found.unwrap_or_else(|| builder.default_device())?
369 } else {
370 builder.default_device()?
371 };
372 let stream = builder
373 .default_config()?
374 .prefer_sample_rates([
375 SAMPLE_RATE,
376 SAMPLE_RATE.saturating_mul(rodio::nz!(2)),
377 SAMPLE_RATE.saturating_mul(rodio::nz!(3)),
378 SAMPLE_RATE.saturating_mul(rodio::nz!(4)),
379 ])
380 .prefer_channel_counts([rodio::nz!(1), rodio::nz!(2), rodio::nz!(3), rodio::nz!(4)])
381 .prefer_buffer_sizes(512..)
382 .open_stream()?;
383 log::info!("Opened microphone: {:?}", stream.config());
384 Ok(stream)
385}
386
387pub fn open_output_stream(device_id: Option<DeviceId>) -> anyhow::Result<MixerDeviceSink> {
388 let output_handle = if let Some(id) = device_id {
389 if let Some(device) = default_host().device_by_id(&id) {
390 DeviceSinkBuilder::from_device(device)?.open_stream()
391 } else {
392 DeviceSinkBuilder::open_default_sink()
393 }
394 } else {
395 DeviceSinkBuilder::open_default_sink()
396 };
397 let mut output_handle = output_handle.context("Could not open output stream")?;
398 output_handle.log_on_drop(false);
399 log::info!("Output stream: {:?}", output_handle);
400 Ok(output_handle)
401}
402
403#[derive(Clone, Debug)]
404pub struct AudioDeviceInfo {
405 pub id: DeviceId,
406 pub desc: DeviceDescription,
407}
408
409impl AudioDeviceInfo {
410 pub fn matches_input(&self, is_input: bool) -> bool {
411 if is_input {
412 self.desc.supports_input()
413 } else {
414 self.desc.supports_output()
415 }
416 }
417
418 pub fn matches(&self, id: &DeviceId, is_input: bool) -> bool {
419 &self.id == id && self.matches_input(is_input)
420 }
421}
422
423impl std::fmt::Display for AudioDeviceInfo {
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 write!(f, "{} ({})", self.desc.name(), self.id)
426 }
427}
428
429fn get_available_audio_devices() -> Vec<AudioDeviceInfo> {
430 let Some(devices) = default_host().devices().ok() else {
431 return Vec::new();
432 };
433 devices
434 .filter_map(|device| {
435 let id = device.id().ok()?;
436 let desc = device.description().ok()?;
437 Some(AudioDeviceInfo { id, desc })
438 })
439 .collect()
440}
441
442#[derive(Default, Clone, Debug)]
443pub struct AvailableAudioDevices(pub Vec<AudioDeviceInfo>);
444
445impl Global for AvailableAudioDevices {}