1use anyhow::{Context as _, Result};
2use collections::HashMap;
3use gpui::{App, BackgroundExecutor, BorrowAppContext, Global};
4
5#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
6mod non_windows_and_freebsd_deps {
7 pub(super) use gpui::AsyncApp;
8 pub(super) use libwebrtc::native::apm;
9 pub(super) use log::info;
10 pub(super) use parking_lot::Mutex;
11 pub(super) use rodio::cpal::Sample;
12 pub(super) use rodio::source::{LimitSettings, UniformSourceIterator};
13 pub(super) use std::sync::Arc;
14}
15
16#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
17use non_windows_and_freebsd_deps::*;
18
19use rodio::{
20 Decoder, OutputStream, OutputStreamBuilder, Source, mixer::Mixer, nz, source::Buffered,
21};
22use settings::Settings;
23use std::{io::Cursor, num::NonZero, path::PathBuf, sync::atomic::Ordering, time::Duration};
24use util::ResultExt;
25
26mod audio_settings;
27mod replays;
28mod rodio_ext;
29pub use audio_settings::AudioSettings;
30pub use rodio_ext::RodioExt;
31
32use crate::audio_settings::LIVE_SETTINGS;
33
34// NOTE: We used to use WebRTC's mixer which only supported
35// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
36// for audio output devices like speakers/bluetooth, we just hard-code
37// this; and downsample when we need to.
38//
39// Since most noise cancelling requires 16kHz we will move to
40// that in the future.
41pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
42pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
43pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
44 (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
45
46pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
47
48pub fn init(cx: &mut App) {
49 AudioSettings::register(cx);
50 LIVE_SETTINGS.initialize(cx);
51}
52
/// A short UI sound effect bundled with the application.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Sound {
    /// Played when joining a call.
    Joined,
    /// Played when leaving a call.
    Leave,
    /// Played when muting.
    Mute,
    /// Played when unmuting.
    Unmute,
    /// Played when a screenshare starts.
    StartScreenshare,
    /// Played when a screenshare stops.
    StopScreenshare,
    /// Played when the agent finishes.
    AgentDone,
}
63
64impl Sound {
65 fn file(&self) -> &'static str {
66 match self {
67 Self::Joined => "joined_call",
68 Self::Leave => "leave_call",
69 Self::Mute => "mute",
70 Self::Unmute => "unmute",
71 Self::StartScreenshare => "start_screenshare",
72 Self::StopScreenshare => "stop_screenshare",
73 Self::AgentDone => "agent_done",
74 }
75 }
76}
77
/// App-global audio state.
///
/// Holds the lazily opened output stream and its mixer, the echo canceller
/// shared with microphone capture, decoded UI sounds, and the replay store
/// for voip streams.
pub struct Audio {
    /// Default output stream. `None` until something is played, and reset
    /// to `None` by `end_call`.
    output_handle: Option<OutputStream>,
    /// Mixer whose output is added to `output_handle`; new sources are
    /// added here.
    output_mixer: Option<Mixer>,
    /// WebRTC audio processing module. Mixed output is fed to it as the
    /// reverse stream and microphone audio as the forward stream.
    #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Decoded UI sounds, so each asset is only loaded and decoded once.
    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
    /// Replay buffers of voip streams, exported via `save_replays`.
    replays: replays::Replays,
}
86
87impl Default for Audio {
88 fn default() -> Self {
89 Self {
90 output_handle: Default::default(),
91 output_mixer: Default::default(),
92 #[cfg(not(any(
93 all(target_os = "windows", target_env = "gnu"),
94 target_os = "freebsd"
95 )))]
96 echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
97 true, false, false, false,
98 ))),
99 source_cache: Default::default(),
100 replays: Default::default(),
101 }
102 }
103}
104
// Lets `Audio` be stored as a gpui global. The associated functions on
// `Audio` reach it through `update_default_global` (presumably creating it
// via `Default` when absent).
impl Global for Audio {}
106
107impl Audio {
108 fn ensure_output_exists(&mut self) -> Result<&Mixer> {
109 if self.output_handle.is_none() {
110 self.output_handle = Some(
111 OutputStreamBuilder::open_default_stream()
112 .context("Could not open default output stream")?,
113 );
114 if let Some(output_handle) = &self.output_handle {
115 let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
116 // or the mixer will end immediately as its empty.
117 mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
118 self.output_mixer = Some(mixer);
119
120 // The webrtc apm is not yet compiling for windows & freebsd
121 #[cfg(not(any(
122 any(all(target_os = "windows", target_env = "gnu")),
123 target_os = "freebsd"
124 )))]
125 let echo_canceller = Arc::clone(&self.echo_canceller);
126 #[cfg(not(any(
127 any(all(target_os = "windows", target_env = "gnu")),
128 target_os = "freebsd"
129 )))]
130 let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
131 let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
132 echo_canceller
133 .lock()
134 .process_reverse_stream(
135 &mut buf,
136 SAMPLE_RATE.get() as i32,
137 CHANNEL_COUNT.get().into(),
138 )
139 .expect("Audio input and output threads should not panic");
140 });
141 output_handle.mixer().add(source);
142 }
143 }
144
145 Ok(self
146 .output_mixer
147 .as_ref()
148 .expect("we only get here if opening the outputstream succeeded"))
149 }
150
151 pub fn save_replays(
152 &self,
153 executor: BackgroundExecutor,
154 ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
155 self.replays.replays_to_tar(executor)
156 }
157
158 #[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
159 pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
160 let stream = rodio::microphone::MicrophoneBuilder::new()
161 .default_device()?
162 .default_config()?
163 .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
164 .prefer_channel_counts([nz!(1), nz!(2)])
165 .prefer_buffer_sizes(512..)
166 .open_stream()?;
167 info!("Opened microphone: {:?}", stream.config());
168
169 let (replay, stream) = stream
170 // .suspicious_stereo_to_mono()
171 .constant_params(CHANNEL_COUNT, SAMPLE_RATE)
172 .limit(LimitSettings::live_performance())
173 .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
174 let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
175 if voip_parts
176 .echo_canceller
177 .lock()
178 .process_stream(
179 &mut int_buffer,
180 SAMPLE_RATE.get() as i32,
181 CHANNEL_COUNT.get() as i32,
182 )
183 .context("livekit audio processor error")
184 .log_err()
185 .is_some()
186 {
187 for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
188 *sample = (*processed).to_sample();
189 }
190 }
191 })
192 .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
193 .periodic_access(Duration::from_millis(100), move |agc_source| {
194 agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
195 })
196 .replayable(REPLAY_DURATION)?;
197
198 voip_parts
199 .replays
200 .add_voip_stream("local microphone".to_string(), replay);
201 Ok(stream)
202 }
203
204 pub fn play_voip_stream(
205 source: impl rodio::Source + Send + 'static,
206 speaker_name: String,
207 is_staff: bool,
208 cx: &mut App,
209 ) -> anyhow::Result<()> {
210 let (replay_source, source) = source
211 .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
212 .periodic_access(Duration::from_millis(100), move |agc_source| {
213 agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
214 })
215 .replayable(REPLAY_DURATION)
216 .expect("REPLAY_DURATION is longer than 100ms");
217
218 cx.update_default_global(|this: &mut Self, _cx| {
219 let output_mixer = this
220 .ensure_output_exists()
221 .context("Could not get output mixer")?;
222 output_mixer.add(source);
223 if is_staff {
224 this.replays.add_voip_stream(speaker_name, replay_source);
225 }
226 Ok(())
227 })
228 }
229
230 pub fn play_sound(sound: Sound, cx: &mut App) {
231 cx.update_default_global(|this: &mut Self, cx| {
232 let source = this.sound_source(sound, cx).log_err()?;
233 let output_mixer = this
234 .ensure_output_exists()
235 .context("Could not get output mixer")
236 .log_err()?;
237
238 output_mixer.add(source);
239 Some(())
240 });
241 }
242
243 pub fn end_call(cx: &mut App) {
244 cx.update_default_global(|this: &mut Self, _cx| {
245 this.output_handle.take();
246 });
247 }
248
249 fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
250 if let Some(wav) = self.source_cache.get(&sound) {
251 return Ok(wav.clone());
252 }
253
254 let path = format!("sounds/{}.wav", sound.file());
255 let bytes = cx
256 .asset_source()
257 .load(&path)?
258 .map(anyhow::Ok)
259 .with_context(|| format!("No asset available for path {path}"))??
260 .into_owned();
261 let cursor = Cursor::new(bytes);
262 let source = Decoder::new(cursor)?.buffered();
263
264 self.source_cache.insert(sound, source.clone());
265
266 Ok(source)
267 }
268}
269
/// The parts of the global [`Audio`] state that microphone capture needs,
/// cloned out so they can be moved into the capture pipeline.
#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
pub struct VoipParts {
    /// Same echo canceller `Audio` feeds with the mixed output stream.
    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Clone of `Audio::replays`; the microphone replay is registered here
    /// (presumably sharing storage with the global — confirm in `replays`).
    replays: replays::Replays,
}
275
276#[cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))]
277impl VoipParts {
278 pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
279 let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
280 (Arc::clone(&audio.echo_canceller), audio.replays.clone())
281 })?;
282
283 Ok(Self {
284 echo_canceller: apm,
285 replays,
286 })
287 }
288}