1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use gpui::{App, BackgroundExecutor, BorrowAppContext, Global};
4
5#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
6mod non_windows_and_freebsd_deps {
7 pub(super) use gpui::AsyncApp;
8 pub(super) use libwebrtc::native::apm;
9 pub(super) use log::info;
10 pub(super) use parking_lot::Mutex;
11 pub(super) use rodio::cpal::Sample;
12 pub(super) use rodio::source::LimitSettings;
13 pub(super) use std::sync::Arc;
14}
15
16#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
17use non_windows_and_freebsd_deps::*;
18
19use rodio::{
20 Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, nz, source::Buffered,
21};
22use settings::Settings;
23use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
24use util::ResultExt;
25
26mod audio_settings;
27mod replays;
28mod rodio_ext;
29pub use audio_settings::AudioSettings;
30pub use rodio_ext::RodioExt;
31
32use crate::audio_settings::LIVE_SETTINGS;
33
34// We are migrating to 16kHz sample rate from 48kHz. In the future
35// once we are reasonably sure most users have upgraded we will
36// remove the LEGACY parameters.
37//
38// We migrate to 16kHz because it is sufficient for speech and required
39// by the denoiser and future Speech to Text layers.
40pub const SAMPLE_RATE: NonZero<u32> = nz!(16000);
41pub const CHANNEL_COUNT: NonZero<u16> = nz!(1);
42pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
43 (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
44
45pub const LEGACY_SAMPLE_RATE: NonZero<u32> = nz!(48000);
46pub const LEGACY_CHANNEL_COUNT: NonZero<u16> = nz!(2);
47
48pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
49
/// Sets up the audio settings for the application.
///
/// Registers [`AudioSettings`] with `cx` first and then initializes
/// [`LIVE_SETTINGS`], the values the audio pipeline polls while running.
pub fn init(cx: &mut App) {
    AudioSettings::register(cx);
    LIVE_SETTINGS.initialize(cx);
}
54
/// Notification sounds that can be played through [`Audio::play_sound`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Sound {
    /// Backed by the `joined_call` asset.
    Joined,
    /// Backed by the `leave_call` asset.
    Leave,
    /// Backed by the `mute` asset.
    Mute,
    /// Backed by the `unmute` asset.
    Unmute,
    /// Backed by the `start_screenshare` asset.
    StartScreenshare,
    /// Backed by the `stop_screenshare` asset.
    StopScreenshare,
    /// Backed by the `agent_done` asset.
    AgentDone,
}
65
66impl Sound {
67 fn file(&self) -> &'static str {
68 match self {
69 Self::Joined => "joined_call",
70 Self::Leave => "leave_call",
71 Self::Mute => "mute",
72 Self::Unmute => "unmute",
73 Self::StartScreenshare => "start_screenshare",
74 Self::StopScreenshare => "stop_screenshare",
75 Self::AgentDone => "agent_done",
76 }
77 }
78}
79
80pub struct Audio {
81 output_handle: Option<OutputStream>,
82 output_mixer: Option<Mixer>,
83 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
84 pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
85 source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
86 replays: replays::Replays,
87}
88
89impl Default for Audio {
90 fn default() -> Self {
91 Self {
92 output_handle: Default::default(),
93 output_mixer: Default::default(),
94 #[cfg(not(any(
95 all(target_os = "windows", target_env = "gnu"),
96 target_os = "freebsd"
97 )))]
98 echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
99 true, false, false, false,
100 ))),
101 source_cache: Default::default(),
102 replays: Default::default(),
103 }
104 }
105}
106
// Marks `Audio` as a gpui global; the associated functions on `Audio` reach
// the shared instance through `update_default_global`.
impl Global for Audio {}
108
109impl Audio {
110 fn ensure_output_exists(&mut self) -> Result<&Mixer> {
111 if self.output_handle.is_none() {
112 self.output_handle = Some(
113 OutputStreamBuilder::open_default_stream()
114 .context("Could not open default output stream")?,
115 );
116 if let Some(output_handle) = &self.output_handle {
117 let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
118 // or the mixer will end immediately as its empty.
119 mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
120 self.output_mixer = Some(mixer);
121
122 // The webrtc apm is not yet compiling for windows & freebsd
123 #[cfg(not(any(
124 any(all(target_os = "windows", target_env = "gnu")),
125 target_os = "freebsd"
126 )))]
127 let echo_canceller = Arc::clone(&self.echo_canceller);
128 #[cfg(not(any(
129 any(all(target_os = "windows", target_env = "gnu")),
130 target_os = "freebsd"
131 )))]
132 let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
133 let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
134 echo_canceller
135 .lock()
136 .process_reverse_stream(
137 &mut buf,
138 SAMPLE_RATE.get() as i32,
139 CHANNEL_COUNT.get().into(),
140 )
141 .expect("Audio input and output threads should not panic");
142 });
143 output_handle.mixer().add(source);
144 }
145 }
146
147 Ok(self
148 .output_mixer
149 .as_ref()
150 .expect("we only get here if opening the outputstream succeeded"))
151 }
152
153 pub fn save_replays(
154 &self,
155 executor: BackgroundExecutor,
156 ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
157 self.replays.replays_to_tar(executor)
158 }
159
160 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
161 pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
162 let stream = rodio::microphone::MicrophoneBuilder::new()
163 .default_device()?
164 .default_config()?
165 .prefer_sample_rates([
166 SAMPLE_RATE, // sample rates trivially resamplable to `SAMPLE_RATE`
167 SAMPLE_RATE.saturating_mul(nz!(2)),
168 SAMPLE_RATE.saturating_mul(nz!(3)),
169 SAMPLE_RATE.saturating_mul(nz!(4)),
170 ])
171 .prefer_channel_counts([CHANNEL_COUNT, CHANNEL_COUNT.saturating_mul(nz!(2))])
172 .prefer_buffer_sizes(512..)
173 .open_stream()?;
174 info!("Opened microphone: {:?}", stream.config());
175
176 let (replay, stream) = stream
177 // .suspicious_stereo_to_mono()
178 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
179 .limit(LimitSettings::live_performance())
180 .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
181 let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
182 if voip_parts
183 .echo_canceller
184 .lock()
185 .process_stream(
186 &mut int_buffer,
187 SAMPLE_RATE.get() as i32,
188 CHANNEL_COUNT.get() as i32,
189 )
190 .context("livekit audio processor error")
191 .log_err()
192 .is_some()
193 {
194 for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
195 *sample = (*processed).to_sample();
196 }
197 }
198 })
199 .denoise()
200 .context("Could not set up denoiser")?
201 .periodic_access(Duration::from_millis(100), move |denoise| {
202 denoise.set_enabled(LIVE_SETTINGS.denoise.load(Ordering::Relaxed));
203 })
204 .automatic_gain_control(1.0, 2.0, 0.0, 5.0)
205 .periodic_access(Duration::from_millis(100), move |agc_source| {
206 agc_source
207 .set_enabled(LIVE_SETTINGS.auto_microphone_volume.load(Ordering::Relaxed));
208 })
209 .replayable(REPLAY_DURATION)?;
210
211 voip_parts
212 .replays
213 .add_voip_stream("local microphone".to_string(), replay);
214
215 let stream = stream.constant_params(LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE);
216
217 Ok(stream)
218 }
219
220 pub fn play_voip_stream(
221 source: impl rodio::Source + Send + 'static,
222 speaker_name: String,
223 is_staff: bool,
224 cx: &mut App,
225 ) -> anyhow::Result<()> {
226 let (replay_source, source) = source
227 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
228 .automatic_gain_control(1.0, 2.0, 0.0, 5.0)
229 .periodic_access(Duration::from_millis(100), move |agc_source| {
230 agc_source.set_enabled(LIVE_SETTINGS.auto_speaker_volume.load(Ordering::Relaxed));
231 })
232 .replayable(REPLAY_DURATION)
233 .expect("REPLAY_DURATION is longer then 100ms");
234
235 cx.update_default_global(|this: &mut Self, _cx| {
236 let output_mixer = this
237 .ensure_output_exists()
238 .context("Could not get output mixer")?;
239 output_mixer.add(source);
240 if is_staff {
241 this.replays.add_voip_stream(speaker_name, replay_source);
242 }
243 Ok(())
244 })
245 }
246
247 pub fn play_sound(sound: Sound, cx: &mut App) {
248 cx.update_default_global(|this: &mut Self, cx| {
249 let source = this.sound_source(sound, cx).log_err()?;
250 let output_mixer = this
251 .ensure_output_exists()
252 .context("Could not get output mixer")
253 .log_err()?;
254
255 output_mixer.add(source);
256 Some(())
257 });
258 }
259
260 pub fn end_call(cx: &mut App) {
261 cx.update_default_global(|this: &mut Self, _cx| {
262 this.output_handle.take();
263 });
264 }
265
266 fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
267 if let Some(wav) = self.source_cache.get(&sound) {
268 return Ok(wav.clone());
269 }
270
271 let path = format!("sounds/{}.wav", sound.file());
272 let bytes = cx
273 .asset_source()
274 .load(&path)?
275 .map(anyhow::Ok)
276 .with_context(|| format!("No asset available for path {path}"))??
277 .into_owned();
278 let cursor = Cursor::new(bytes);
279 let source = Decoder::new(cursor)?.buffered();
280
281 self.source_cache.insert(sound, source.clone());
282
283 Ok(source)
284 }
285}
286
287#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
288pub struct VoipParts {
289 echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
290 replays: replays::Replays,
291}
292
293#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
294impl VoipParts {
295 pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
296 let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
297 (Arc::clone(&audio.echo_canceller), audio.replays.clone())
298 })?;
299
300 Ok(Self {
301 echo_canceller: apm,
302 replays,
303 })
304 }
305}