use anyhow::{Context as _, Result};
use collections::HashMap;
use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
use libwebrtc::native::apm;
use log::info;
use parking_lot::Mutex;
use rodio::{
    Decoder, OutputStream, OutputStreamBuilder, Source,
    cpal::Sample,
    mixer::Mixer,
    nz,
    source::{Buffered, LimitSettings, UniformSourceIterator},
};
use settings::Settings;
use std::{
    io::Cursor,
    num::NonZero,
    path::PathBuf,
    sync::{Arc, atomic::Ordering},
    time::Duration,
};
use util::ResultExt;

mod audio_settings;
mod replays;
mod rodio_ext;
pub use audio_settings::AudioSettings;
pub use rodio_ext::RodioExt;

use crate::audio_settings::LIVE_SETTINGS;

// NOTE: We used to use WebRTC's mixer, which only supported 16 kHz, 32 kHz
// and 48 kHz. As 48 kHz is the most common "next step up" for audio output
// devices like speakers/bluetooth, we hard-code it here and downsample when
// we need to.
//
// Since most noise cancelling requires 16 kHz, we will move to that in the
// future.
pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;

pub const REPLAY_DURATION: Duration = Duration::from_secs(30);

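/// Registers [`AudioSettings`] and initializes the live settings cache
/// that the audio streams poll at runtime.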
pub fn init(cx: &mut App) {
    AudioSettings::register(cx);
    LIVE_SETTINGS.initialize(cx);
}

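/// UI sounds that can be played via [`Audio::play_sound`]. Each variant
/// maps to a wav asset under `sounds/` (see [`Sound::file`]).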
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Sound {
    Joined,
    Leave,
    Mute,
    Unmute,
    StartScreenshare,
    StopScreenshare,
    AgentDone,
}

impl Sound {
    fn file(&self) -> &'static str {
        match self {
            Self::Joined => "joined_call",
            Self::Leave => "leave_call",
            Self::Mute => "mute",
            Self::Unmute => "unmute",
            Self::StartScreenshare => "start_screenshare",
            Self::StopScreenshare => "stop_screenshare",
            Self::AgentDone => "agent_done",
        }
    }
}

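/// Global audio state: the lazily created output stream and mixer, the
/// shared echo canceller, the cache of decoded UI sounds, and the VoIP
/// replay buffers.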
pub struct Audio {
    output_handle: Option<OutputStream>,
    output_mixer: Option<Mixer>,
    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
    replays: replays::Replays,
}

impl Default for Audio {
    fn default() -> Self {
        Self {
            output_handle: Default::default(),
            output_mixer: Default::default(),
            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
                true, false, false, false,
            ))),
            source_cache: Default::default(),
            replays: Default::default(),
        }
    }
}

impl Global for Audio {}

impl Audio {
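    /// Lazily opens the default output device and wires up the shared mixer.
    /// The mixed output is also fed to the echo canceller as the reverse
    /// (playback) stream so it can be removed from the microphone input.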
    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
        if self.output_handle.is_none() {
            self.output_handle = Some(
                OutputStreamBuilder::open_default_stream()
                    .context("Could not open default output stream")?,
            );
            if let Some(output_handle) = &self.output_handle {
                let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
                // Add a silent source, or the mixer will end immediately as it's empty.
                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
                self.output_mixer = Some(mixer);

                let echo_canceller = Arc::clone(&self.echo_canceller);
                let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
                    let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
                    echo_canceller
                        .lock()
                        .process_reverse_stream(
                            &mut buf,
                            SAMPLE_RATE.get() as i32,
                            CHANNEL_COUNT.get().into(),
                        )
                        .expect("Audio input and output threads should not panic");
                });
                output_handle.mixer().add(source);
            }
        }

        Ok(self
            .output_mixer
            .as_ref()
            .expect("we only get here if opening the output stream succeeded"))
    }

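    /// Packages the recorded VoIP replay buffers into a tar archive on the
    /// background executor, resolving to the archive's path and duration.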
    pub fn save_replays(
        &self,
        executor: BackgroundExecutor,
    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
        self.replays.replays_to_tar(executor)
    }

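    /// Opens the default microphone and returns it as a source resampled to
    /// [`SAMPLE_RATE`] / [`CHANNEL_COUNT`], with limiting, echo cancellation,
    /// and settings-controlled automatic gain control applied. A replay copy
    /// of the stream is registered as "local microphone".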
    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
        let stream = rodio::microphone::MicrophoneBuilder::new()
            .default_device()?
            .default_config()?
            .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
            .prefer_channel_counts([nz!(1), nz!(2)])
            .prefer_buffer_sizes(512..)
            .open_stream()?;
        info!("Opened microphone: {:?}", stream.config());

        let (replay, stream) = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE)
            .limit(LimitSettings::live_performance())
            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
                // The echo canceller operates on i16 samples: convert, let it
                // process in place, then copy the processed samples back.
                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
                if voip_parts
                    .echo_canceller
                    .lock()
                    .process_stream(
                        &mut int_buffer,
                        SAMPLE_RATE.get() as i32,
                        CHANNEL_COUNT.get() as i32,
                    )
                    .context("livekit audio processor error")
                    .log_err()
                    .is_some()
                {
                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
                        *sample = (*processed).to_sample();
                    }
                }
            })
            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
            .periodic_access(Duration::from_millis(100), move |agc_source| {
                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
            })
            .replayable(REPLAY_DURATION)
            .expect("REPLAY_DURATION is longer than 100ms");

        voip_parts
            .replays
            .add_voip_stream("local microphone".to_string(), replay);
        Ok(stream)
    }

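    /// Plays a remote participant's stream through the shared output mixer,
    /// with settings-controlled automatic gain control. For staff, the stream
    /// is also captured into the replay buffers under `speaker_name`.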
    pub fn play_voip_stream(
        source: impl rodio::Source + Send + 'static,
        speaker_name: String,
        is_staff: bool,
        cx: &mut App,
    ) -> anyhow::Result<()> {
        let (replay_source, source) = source
            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
            .periodic_access(Duration::from_millis(100), move |agc_source| {
                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
            })
            .replayable(REPLAY_DURATION)
            .expect("REPLAY_DURATION is longer than 100ms");

        cx.update_default_global(|this: &mut Self, _cx| {
            let output_mixer = this
                .ensure_output_exists()
                .context("Could not get output mixer")?;
            output_mixer.add(source);
            if is_staff {
                this.replays.add_voip_stream(speaker_name, replay_source);
            }
            Ok(())
        })
    }

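    /// Plays one of the bundled UI sounds, creating the output stream on
    /// demand. Errors are logged rather than returned.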
    pub fn play_sound(sound: Sound, cx: &mut App) {
        cx.update_default_global(|this: &mut Self, cx| {
            let source = this.sound_source(sound, cx).log_err()?;
            let output_mixer = this
                .ensure_output_exists()
                .context("Could not get output mixer")
                .log_err()?;

            output_mixer.add(source);
            Some(())
        });
    }

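    /// Tears down the output stream when a call ends; it will be reopened on
    /// demand by [`Self::ensure_output_exists`].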
    pub fn end_call(cx: &mut App) {
        cx.update_default_global(|this: &mut Self, _cx| {
            this.output_handle.take();
        });
    }

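    /// Returns a cached decoded source for `sound`, loading and decoding the
    /// wav asset on first use.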
    fn sound_source(&mut self, sound: Sound, cx: &App) -> Result<impl Source + use<>> {
        if let Some(wav) = self.source_cache.get(&sound) {
            return Ok(wav.clone());
        }

        let path = format!("sounds/{}.wav", sound.file());
        let bytes = cx
            .asset_source()
            .load(&path)?
            .map(anyhow::Ok)
            .with_context(|| format!("No asset available for path {path}"))??
            .into_owned();
        let cursor = Cursor::new(bytes);
        let source = Decoder::new(cursor)?.buffered();

        self.source_cache.insert(sound, source.clone());

        Ok(source)
    }
}

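/// Handles extracted from the global [`Audio`] state (the shared echo
/// canceller and replay store) so that [`Audio::open_microphone`] can use
/// them without accessing the global.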
pub struct VoipParts {
    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
    replays: replays::Replays,
}

impl VoipParts {
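    /// Clones the echo canceller and replay handles out of the global
    /// [`Audio`] state.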
    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
        let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
        })?;

        Ok(Self {
            echo_canceller: apm,
            replays,
        })
    }
}