1use anyhow::{Context as _, Result};
2
3use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
4use cpal::DeviceId;
5use cpal::traits::{DeviceTrait, StreamTrait as _};
6use futures::channel::mpsc::Sender;
7use futures::{Stream, StreamExt as _};
8use gpui::{
9 AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource,
10 ScreenCaptureStream, Task,
11};
12use libwebrtc::native::{apm, audio_mixer, audio_resampler};
13use livekit::track;
14
15use livekit::webrtc::{
16 audio_frame::AudioFrame,
17 audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
18 audio_stream::native::NativeAudioStream,
19 video_frame::{VideoBuffer, VideoFrame, VideoRotation},
20 video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
21 video_stream::native::NativeVideoStream,
22};
23use log::info;
24use parking_lot::Mutex;
25use rodio::Source;
26use rodio::conversions::SampleTypeConverter;
27use rodio::source::{AutomaticGainControlSettings, LimitSettings};
28use serde::{Deserialize, Serialize};
29use settings::Settings;
30use std::cell::RefCell;
31use std::sync::Weak;
32use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
33use std::time::{Duration, Instant};
34use std::{borrow::Cow, collections::VecDeque, sync::Arc};
35use util::{ResultExt as _, maybe};
36
/// An audio frame paired with the wall-clock time it was captured, used to
/// measure capture-to-transmit latency in the microphone pipeline.
struct TimestampedFrame {
    // 10 ms of audio destined for the LiveKit audio source.
    frame: AudioFrame<'static>,
    // When the device input callback produced this frame.
    captured_at: Instant,
}
41
/// Shared audio pipeline for a call: one audio processing module, one mixer
/// for all remote tracks, and at most one output task driving the playback
/// device.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    // Shared between capture and playback so echo cancellation can correlate
    // microphone input with speaker output.
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak so playback stops once the last `AudioStream` holding the Arc drops.
    _output_task: RefCell<Weak<Task<()>>>,
    // Generator for unique mixer source ids.
    next_ssrc: AtomicI32,
}
49
50impl AudioStack {
51 pub(crate) fn new(executor: BackgroundExecutor) -> Self {
52 let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
53 true, true, true, true,
54 )));
55 let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
56 Self {
57 executor,
58 apm,
59 mixer,
60 _output_task: RefCell::new(Weak::new()),
61 next_ssrc: AtomicI32::new(1),
62 }
63 }
64
    /// Plays a remote participant's audio track through the shared mixer.
    ///
    /// Returns a guard stream; dropping it removes the track from the mixer
    /// and releases this stream's reference to the shared output task.
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
        output_audio_device: Option<DeviceId>,
    ) -> AudioStream {
        // Ensure the (shared, refcounted) playback task is running.
        let output_task = self.start_output(output_audio_device);

        // Give this track a unique mixer source id.
        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        // Forward decoded frames from the webrtc stream into the mixer source.
        let receive_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        // On drop: deregister from the mixer, stop forwarding, and release our
        // share of the output task.
        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }
107
    /// Returns the shared playback task, starting it if it isn't running.
    ///
    /// Only a `Weak` handle is stored, so playback stops once every caller's
    /// returned `Arc` has been dropped.
    fn start_output(&self, output_audio_device: Option<DeviceId>) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            let executor = self.executor.clone();
            async move {
                Self::play_output(
                    executor,
                    apm,
                    mixer,
                    SAMPLE_RATE.get(),
                    CHANNEL_COUNT.get().into(),
                    output_audio_device,
                )
                .await
                .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }
132
133 pub(crate) fn capture_local_microphone_track(
134 &self,
135 user_name: String,
136 is_staff: bool,
137 cx: &AsyncApp,
138 ) -> Result<(crate::LocalAudioTrack, AudioStream, Arc<AtomicU64>)> {
139 let source = NativeAudioSource::new(
140 // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
141 AudioSourceOptions::default(),
142 SAMPLE_RATE.get(),
143 CHANNEL_COUNT.get().into(),
144 10,
145 );
146
147 let speaker = Speaker {
148 name: user_name,
149 is_staff,
150 };
151 log::info!("Microphone speaker: {speaker:?}");
152 let track_name = serde_urlencoded::to_string(speaker)
153 .context("Could not encode user information in track name")?;
154
155 let track = track::LocalAudioTrack::create_audio_track(
156 &track_name,
157 RtcAudioSource::Native(source.clone()),
158 );
159
160 let apm = self.apm.clone();
161
162 let input_lag_us = Arc::new(AtomicU64::new(0));
163 let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel::<TimestampedFrame>(1);
164 let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
165 let input_lag_us = input_lag_us.clone();
166 async move {
167 while let Some(timestamped) = frame_rx.next().await {
168 let lag = timestamped.captured_at.elapsed();
169 input_lag_us.store(lag.as_micros() as u64, Ordering::Relaxed);
170 source.capture_frame(×tamped.frame).await.log_err();
171 }
172 }
173 });
174 let capture_task = {
175 let input_audio_device =
176 AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
177 .flatten();
178 let executor = self.executor.clone();
179 self.executor.spawn(async move {
180 Self::capture_input(
181 executor,
182 apm,
183 frame_tx,
184 SAMPLE_RATE.get(), // TODO(audio): was legacy removed for now
185 CHANNEL_COUNT.get().into(),
186 input_audio_device,
187 )
188 .await
189 })
190 };
191
192 let on_drop = util::defer(|| {
193 drop(transmit_task);
194 drop(capture_task);
195 });
196 Ok((
197 super::LocalAudioTrack(track),
198 AudioStream::Output {
199 _drop: Box::new(on_drop),
200 },
201 input_lag_us,
202 ))
203 }
204
    /// Drives the playback device: builds an output stream for the current
    /// default (or explicitly selected) device, fills its callbacks with mixed
    /// remote audio, and rebuilds the stream whenever the device changes.
    async fn play_output(
        executor: BackgroundExecutor,
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        _num_channels: u32,
        output_audio_device: Option<DeviceId>,
    ) -> Result<()> {
        // Prevent App Nap from throttling audio playback on macOS.
        // This guard is held for the entire duration of audio output.
        #[cfg(target_os = "macos")]
        let _prevent_app_nap = PreventAppNapGuard::new();

        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) =
                crate::default_device(false, output_audio_device.as_ref())?;
            info!("Output config: {output_config:?}");
            // Dropping the sender unblocks `recv` below, ending the stream task.
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            // Leftover resampled samples carried across device callbacks.
            let mut buf = Vec::new();

            executor
                .spawn_with_priority(Priority::RealtimeAudio, async move {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while data.len() > 0 {
                                    // If leftovers cover the whole request, serve from them.
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    // Otherwise drain the leftovers first…
                                    if buf.len() > 0 {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    // …then mix 10 ms of remote audio and resample it to
                                    // the device's output rate.
                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        // We need to assume output number of channels as otherwise we will
                                        // crash in process_reverse_stream otherwise as livekit's audio resampler
                                        // does not seem to support non-matching channel counts.
                                        // NOTE: you can verify this by debug printing buf.len() after this stage.
                                        // For 2->4 channel upmix, we should see buf.len=1920, but we get only 960.
                                        output_config.channels() as u32,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate(),
                                    );
                                    buf = sampled.to_vec();
                                    // Feed playback audio into the APM's reverse stream so
                                    // echo cancellation can subtract it from the mic input.
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate() as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block forever to keep the output stream alive
                    end_on_drop_rx.recv().ok();
                })
                .detach();

            // Wait for a device change, then drop the sender to end the old
            // stream task and loop around to rebuild on the new device.
            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
292
293 async fn capture_input(
294 executor: BackgroundExecutor,
295 apm: Arc<Mutex<apm::AudioProcessingModule>>,
296 frame_tx: Sender<TimestampedFrame>,
297 sample_rate: u32,
298 num_channels: u32,
299 input_audio_device: Option<DeviceId>,
300 ) -> Result<()> {
301 loop {
302 let mut device_change_listener = DeviceChangeListener::new(true)?;
303 let (device, config) = crate::default_device(true, input_audio_device.as_ref())?;
304 let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
305 let apm = apm.clone();
306 let mut frame_tx = frame_tx.clone();
307 let mut resampler = audio_resampler::AudioResampler::default();
308
309 executor
310 .spawn_with_priority(Priority::RealtimeAudio, async move {
311 maybe!({
312 if let Some(desc) = device.description().ok() {
313 log::info!("Using microphone: {}", desc.name())
314 } else {
315 log::info!("Using microphone: <unknown>");
316 }
317
318 let ten_ms_buffer_size =
319 (config.channels() as u32 * config.sample_rate() / 100) as usize;
320 let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
321 let mut rodio_effects = RodioEffectsAdaptor::new(buf.len())
322 .automatic_gain_control(AutomaticGainControlSettings {
323 target_level: 0.50,
324 attack_time: Duration::from_secs(1),
325 release_time: Duration::from_secs(0),
326 absolute_max_gain: 5.0,
327 })
328 .limit(LimitSettings::live_performance());
329
330 let stream = device
331 .build_input_stream_raw(
332 &config.config(),
333 config.sample_format(),
334 move |data, _: &_| {
335 let captured_at = Instant::now();
336 let data = crate::get_sample_data(config.sample_format(), data)
337 .log_err();
338 let Some(data) = data else {
339 return;
340 };
341 let mut data = data.as_slice();
342
343 while data.len() > 0 {
344 let remainder =
345 (buf.capacity() - buf.len()).min(data.len());
346 buf.extend_from_slice(&data[..remainder]);
347 data = &data[remainder..];
348
349 if buf.capacity() == buf.len() {
350 let mut sampled = resampler
351 .remix_and_resample(
352 buf.as_slice(),
353 config.sample_rate() / 100,
354 config.channels() as u32,
355 config.sample_rate(),
356 num_channels,
357 sample_rate,
358 )
359 .to_owned();
360
361 if audio::LIVE_SETTINGS
362 .auto_microphone_volume
363 .load(Ordering::Relaxed)
364 {
365 rodio_effects
366 .inner_mut()
367 .inner_mut()
368 .fill_buffer_with(&sampled);
369 sampled.clear();
370 sampled.extend(SampleTypeConverter::<_, i16>::new(
371 rodio_effects.by_ref(),
372 ));
373 }
374
375 apm.lock()
376 .process_stream(
377 &mut sampled,
378 sample_rate as i32,
379 num_channels as i32,
380 )
381 .log_err();
382 buf.clear();
383
384 frame_tx
385 .try_send(TimestampedFrame {
386 frame: AudioFrame {
387 data: Cow::Owned(sampled),
388 sample_rate,
389 num_channels,
390 samples_per_channel: sample_rate / 100,
391 },
392 captured_at,
393 })
394 .ok();
395 }
396 }
397 },
398 |err| log::error!("error capturing audio track: {:?}", err),
399 Some(Duration::from_millis(100)),
400 )
401 .context("failed to build input stream")?;
402
403 stream.play()?;
404 // Keep the thread alive and holding onto the `stream`
405 end_on_drop_rx.recv().ok();
406 anyhow::Ok(Some(()))
407 })
408 .log_err();
409 })
410 .detach();
411
412 device_change_listener.next().await;
413 drop(end_on_drop_tx)
414 }
415 }
416}
417
/// This allows using Rodio's effects library within our home-brewed audio
/// pipeline. The alternative would be inlining Rodio's effects, which is
/// problematic from a legal stance. We would then have to make clear that code
/// is not owned by zed-industries while the code would be surrounded by
/// zed-industries owned code.
///
/// This adaptor does incur a slight performance penalty (copying into a
/// pre-allocated vec and back), however the impact will be immeasurably low.
///
/// There is no latency impact.
pub struct RodioEffectsAdaptor {
    // Float samples for one 10 ms frame, converted from the i16 capture buffer.
    input: Vec<rodio::Sample>,
    // Read cursor into `input`; reset by `fill_buffer_with`.
    pos: usize,
}
432
433impl RodioEffectsAdaptor {
434 // This implementation incorrect terminology confusing everyone. A normal
435 // audio frame consists of all samples for one moment in time (one for mono,
436 // two for stereo). Here a frame of audio refers to a 10ms buffer of samples.
437 fn new(samples_per_frame: usize) -> Self {
438 Self {
439 input: Vec::with_capacity(samples_per_frame),
440 pos: 0,
441 }
442 }
443
444 fn fill_buffer_with(&mut self, integer_samples: &[i16]) {
445 self.input.clear();
446 self.input.extend(SampleTypeConverter::<_, f32>::new(
447 integer_samples.iter().copied(),
448 ));
449 self.pos = 0;
450 }
451}
452
453impl Iterator for RodioEffectsAdaptor {
454 type Item = rodio::Sample;
455
456 fn next(&mut self) -> Option<Self::Item> {
457 let sample = self.input.get(self.pos)?;
458 self.pos += 1;
459 Some(*sample)
460 }
461}
462
impl rodio::Source for RodioEffectsAdaptor {
    // One continuous span: the format never changes mid-stream.
    fn current_span_len(&self) -> Option<usize> {
        None
    }

    // NOTE(review): hard-coded stereo @ 48 kHz — assumes the capture pipeline
    // always feeds this adaptor 2-channel, 48 kHz audio; confirm against
    // CHANNEL_COUNT / SAMPLE_RATE.
    fn channels(&self) -> rodio::ChannelCount {
        rodio::nz!(2)
    }

    fn sample_rate(&self) -> rodio::SampleRate {
        rodio::nz!(48000)
    }

    // Unbounded: the adaptor is refilled indefinitely.
    fn total_duration(&self) -> Option<Duration> {
        None
    }
}
480
/// Speaker metadata urlencoded into the microphone track name (see
/// `capture_local_microphone_track`) so other participants can identify who
/// a track belongs to.
#[derive(Serialize, Deserialize, Debug)]
pub struct Speaker {
    pub name: String,
    pub is_staff: bool,
}
486
487use super::LocalVideoTrack;
488
/// Keeps an audio pipeline alive; dropping the value stops capture/playback.
pub enum AudioStream {
    Input { _task: Task<()> },
    // Holds an opaque drop guard (e.g. a `util::defer`) that tears down tasks.
    Output { _drop: Box<dyn std::any::Any> },
}
493
/// Publishes a screen-capture source as a LiveKit video track.
///
/// Returns the local track plus the capture stream handle; dropping the
/// handle stops the capture.
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    // The native video source is created on the tokio runtime.
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(
            VideoResolution {
                width: metadata.resolution.width.0 as u32,
                height: metadata.resolution.height.0 as u32,
            },
            true,
        )
    })
    .await?;

    // Convert each captured frame to a webrtc buffer and push it to the source.
    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
533
/// A per-remote-track source registered with the shared `AudioMixer`.
/// Cloned handles share the same bounded frame queue.
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // Queue of pending 10 ms frames; capped at 10 entries (~100 ms) in
    // `receive` to bound latency.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
541
542impl AudioMixerSource {
543 fn receive(&self, frame: AudioFrame) {
544 assert_eq!(
545 frame.data.len() as u32,
546 self.sample_rate * self.num_channels / 100
547 );
548
549 let mut buffer = self.buffer.lock();
550 buffer.push_back(frame.data.to_vec());
551 while buffer.len() > 10 {
552 buffer.pop_front();
553 }
554 }
555}
556
557impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
558 fn ssrc(&self) -> i32 {
559 self.ssrc
560 }
561
562 fn preferred_sample_rate(&self) -> u32 {
563 self.sample_rate
564 }
565
566 fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
567 assert_eq!(self.sample_rate, target_sample_rate);
568 let buf = self.buffer.lock().pop_front()?;
569 Some(AudioFrame {
570 data: Cow::Owned(buf),
571 sample_rate: self.sample_rate,
572 num_channels: self.num_channels,
573 samples_per_channel: self.sample_rate / 100,
574 })
575 }
576}
577
578pub fn play_remote_video_track(
579 track: &crate::RemoteVideoTrack,
580 executor: &BackgroundExecutor,
581) -> impl Stream<Item = RemoteVideoFrame> + use<> {
582 #[cfg(target_os = "macos")]
583 {
584 _ = executor;
585 let mut pool = None;
586 let most_recent_frame_size = (0, 0);
587 NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
588 if pool == None
589 || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
590 {
591 pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
592 }
593 let pool = pool.clone();
594 async move {
595 if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
596 // when the remote stops sharing, we get an 8x8 black image.
597 // In a lil bit, the unpublish will come through and close the view,
598 // but until then, don't flash black.
599 return None;
600 }
601
602 video_frame_buffer_from_webrtc(pool?, frame.buffer)
603 }
604 })
605 }
606 #[cfg(not(target_os = "macos"))]
607 {
608 let executor = executor.clone();
609 NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
610 executor.spawn(async move { video_frame_buffer_from_webrtc(frame.buffer) })
611 })
612 }
613}
614
/// Creates a CoreVideo pixel-buffer pool producing NV12
/// (420YpCbCr8BiPlanarFullRange) buffers of the given dimensions, marked
/// Core-Animation compatible so they can be displayed without conversion.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // Wrap the CF constant keys; `wrap_under_get_rule` retains them, which is
    // correct for these process-lifetime constants.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
654
// On macOS a remote frame is a CoreVideo pixel buffer, renderable directly.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
657
/// Converts a webrtc video buffer into a `CVPixelBuffer`.
///
/// Native buffers already wrap a pixel buffer and are returned directly;
/// otherwise the I420 planes are converted to NV12 into a buffer drawn from
/// `pool`. Returns `None` on any CoreVideo failure.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the buffer already holds a CVPixelBuffer — no copy needed.
    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // Base address must be locked while CPU code writes the planes.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        // Plane 0 is Y, plane 1 is interleaved UV (NV12 layout).
        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
720
// On other platforms a remote frame is decoded into a gpui render image.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
723
/// Converts a webrtc video buffer to a `RenderImage` by asking webrtc to
/// write 4-byte-per-pixel data into a freshly allocated buffer.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        // SAFETY: ptr/len/capacity come from the matching `alloc` above.
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}
764
/// Wraps a captured screen frame's CVPixelBuffer as a webrtc native buffer.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Don't run `frame.0`'s destructor: the raw pixel buffer is handed to
    // NativeBuffer below (presumably taking over the reference — confirm
    // against `from_cv_pixel_buffer`'s retain semantics).
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
775
/// Converts a scap screen-capture frame to a webrtc NV12 buffer.
///
/// BGRx/RGBx frames are color-converted; YUV frames are copied plane-by-plane.
/// Returns `None` (with an error log) for unsupported formats.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            // Preserve the source strides so the plane copies line up exactly.
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}
833
/// A stream that emits `()` whenever the relevant default audio device
/// (input when `input` is true, otherwise output) changes or is reconfigured.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}
837
838#[cfg(target_os = "macos")]
839mod macos {
840 use cocoa::{
841 base::{id, nil},
842 foundation::{NSProcessInfo, NSString},
843 };
844 use coreaudio::sys::{
845 AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
846 AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
847 kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
848 kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
849 };
850 use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
851 use objc::{msg_send, sel, sel_impl};
852
    /// A guard that prevents App Nap while held.
    ///
    /// On macOS, App Nap can throttle background apps to save power. This can cause
    /// audio artifacts when the app is not in the foreground. This guard tells macOS
    /// that we're doing latency-sensitive work and should not be throttled.
    ///
    /// See Apple's documentation on prioritizing work at the app level:
    /// https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheAppLevel.html
    pub struct PreventAppNapGuard {
        // Token from `beginActivityWithOptions:reason:`; ended and released on drop.
        activity: id,
    }
864
    // SAFETY: The activity token returned by NSProcessInfo is thread-safe
    unsafe impl Send for PreventAppNapGuard {}

    // Option bit flags mirrored from NSProcessInfo.h
    const NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED: u64 = 1 << 20;
    const NS_ACTIVITY_USER_INITIATED: u64 = 0x00FFFFFF | NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
    const NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP: u64 =
        NS_ACTIVITY_USER_INITIATED & !NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
873
    impl PreventAppNapGuard {
        /// Begins a user-initiated NSActivity (idle system sleep still allowed)
        /// and retains the returned token for the lifetime of the guard.
        pub fn new() -> Self {
            unsafe {
                let process_info = NSProcessInfo::processInfo(nil);
                #[allow(clippy::disallowed_methods)]
                let reason = NSString::alloc(nil).init_str("Audio playback in progress");
                let activity: id = msg_send![process_info, beginActivityWithOptions:NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP reason:reason];
                // The reason string is no longer needed; the activity token is
                // retained so it remains valid until `Drop`.
                let _: () = msg_send![reason, release];
                let _: () = msg_send![activity, retain];
                Self { activity }
            }
        }
    }
887
    impl Drop for PreventAppNapGuard {
        fn drop(&mut self) {
            unsafe {
                // End the activity, then balance the `retain` from `new`.
                let process_info = NSProcessInfo::processInfo(nil);
                let _: () = msg_send![process_info, endActivity:self.activity];
                let _: () = msg_send![self.activity, release];
            }
        }
    }
897
    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        // Receives a `()` each time CoreAudio fires the property listener.
        rx: UnboundedReceiver<()>,
        // Boxed so the raw pointer registered with CoreAudio stays stable.
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }
905
    // Compile-time assertion that the listener is Send.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    // Owns the Rust closure that CoreAudio invokes via the C shim below.
    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
910
    // C-ABI trampoline: CoreAudio calls this on property changes and we forward
    // to the Rust closure inside the `PropertyListenerCallbackWrapper`.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        // SAFETY: `callback` is the pointer registered in `new`; the boxed
        // wrapper stays alive until `Drop` removes the listener.
        unsafe { (*wrapper).0() };
        0
    }
921
    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        /// Registers CoreAudio property listeners for (1) the system default
        /// input/output device changing and (2) that device's stream format
        /// changing, forwarding each notification into the channel polled by
        /// this stream.
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            // Get the current default device ID
            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Also listen for changes to the device configuration
                let device_id = if input {
                    // Query the current default input device id (0 on failure).
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    // Query the current default output device id (0 on failure).
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }
1022
    impl Drop for CoreAudioDefaultDeviceChangeListener {
        // Unregisters both listeners added in `new` so CoreAudio stops calling
        // into the (about to be freed) callback wrapper.
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }
1062
    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        // Delegates to the channel fed by the CoreAudio property listener.
        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
1073}
1074
// Platform selection: on macOS use the CoreAudio-backed listener and guard.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(target_os = "macos")]
use macos::PreventAppNapGuard;
1079
1080#[cfg(not(target_os = "macos"))]
1081mod noop_change_listener {
1082 use std::task::Poll;
1083
1084 use super::DeviceChangeListenerApi;
1085
1086 pub struct NoopOutputDeviceChangelistener {}
1087
1088 impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
1089 fn new(_input: bool) -> anyhow::Result<Self> {
1090 Ok(NoopOutputDeviceChangelistener {})
1091 }
1092 }
1093
1094 impl futures::Stream for NoopOutputDeviceChangelistener {
1095 type Item = ();
1096
1097 fn poll_next(
1098 self: std::pin::Pin<&mut Self>,
1099 _cx: &mut std::task::Context<'_>,
1100 ) -> Poll<Option<Self::Item>> {
1101 Poll::Pending
1102 }
1103 }
1104}
1105
// Elsewhere fall back to the no-op listener (streams never rebuild).
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;