use anyhow::{Context as _, Result};

use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
use cpal::traits::{DeviceTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
    Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use log::info;
use parking_lot::Mutex;
use rodio::Source;
use serde::{Deserialize, Serialize};
use settings::Settings;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

mod source;

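/// State shared by the legacy (non-rodio) audio pipeline: a WebRTC audio
/// processing module (constructed with its processing stages enabled) and a
/// mixer that combines all remote tracks. `_output_task` holds the playback
/// task only weakly, so output shuts down once every remote stream has been
/// dropped.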
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

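/// Plays a remote audio track through the rodio-based pipeline. The stream is
/// wrapped with a stop flag that is polled every 50ms; dropping the returned
/// `AudioStream` sets the flag and ends playback.
///
/// A minimal usage sketch (the track, speaker, and `cx` are assumed to come
/// from the surrounding call site, e.g. a room's track-subscribed handler):
///
/// ```ignore
/// let stream = play_remote_audio_track(&track, speaker, cx)?;
/// // Keep `stream` alive for as long as playback should continue;
/// // dropping it stops the audio within ~50ms.
/// drop(stream);
/// ```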
pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    speaker: Speaker,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    info!("speaker: {speaker:?}");
    let stream =
        source::LiveKitStream::new(cx.background_executor(), track, speaker.sends_legacy_audio);

    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = stream
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });

    info!("sample_rate: {:?}", stream.sample_rate());
    info!("channel_count: {:?}", stream.channels());
    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
        .context("Could not play audio")?;

    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

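    /// Plays a remote track through the legacy mixer pipeline: the track is
    /// registered as a mixer source under a fresh SSRC, incoming frames are
    /// buffered, and the shared output task drains the mixer to the default
    /// output device. Dropping the returned `AudioStream` unregisters the
    /// source and releases the tasks.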
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: LEGACY_SAMPLE_RATE.get(),
            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

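        // NativeAudioStream delivers frames at the sample rate and channel
        // count requested here, so the mixer source always receives 10ms
        // frames in the legacy format (see the assert in `receive`).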
        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

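    /// Returns the shared output task, creating it if no remote stream is
    /// currently holding it. Only a `Weak` reference is stored, so the task
    /// (and the playback thread it spawns) is torn down once the last
    /// `AudioStream` drops its `Arc`.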
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(
                    apm,
                    mixer,
                    LEGACY_SAMPLE_RATE.get(),
                    LEGACY_CHANNEL_COUNT.get().into(),
                )
                .await
                .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

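    /// Captures the local microphone and publishes it as a LiveKit audio
    /// track. The `Speaker` metadata is urlencoded into the track name so
    /// remote peers can recover the sender's name, staff status, and audio
    /// format. Depending on settings, capture runs either on the experimental
    /// rodio pipeline (a dedicated real-time thread) or on the legacy cpal
    /// pipeline. A hypothetical call site:
    ///
    /// ```ignore
    /// let (track, stream) = audio_stack.capture_local_microphone_track(name, is_staff, &cx)?;
    /// // Publish `track` to the room and keep `stream` alive while capturing.
    /// ```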
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let legacy_audio_compatible =
            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
                .unwrap_or(true);

        let source = if legacy_audio_compatible {
            NativeAudioSource::new(
                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
                AudioSourceOptions::default(),
                LEGACY_SAMPLE_RATE.get(),
                LEGACY_CHANNEL_COUNT.get().into(),
                10,
            )
        } else {
            NativeAudioSource::new(
                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
                AudioSourceOptions::default(),
                SAMPLE_RATE.get(),
                CHANNEL_COUNT.get().into(),
                10,
            )
        };

        let speaker = Speaker {
            name: user_name,
            is_staff,
            sends_legacy_audio: legacy_audio_compatible,
        };
        log::info!("Microphone speaker: {speaker:?}");
        let track_name = serde_urlencoded::to_string(speaker)
            .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio must run in real time and must never be paused, which is
            // why this uses a plain std::thread rather than a background task.
            thread::Builder::new()
                .name("MicrophoneToLivekit".to_string())
                .spawn(move || {
                    // The microphone handle is not Send on macOS, so it must
                    // be opened on the thread that uses it.
                    let microphone = match audio::Audio::open_microphone(voip_parts) {
                        Ok(m) => m,
                        Err(e) => {
                            log::error!("Could not open microphone: {e}");
                            return;
                        }
                    };
                    send_to_livekit(frame_tx, microphone);
                })
                .expect("should be able to spawn threads");
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(
                    apm,
                    frame_tx,
                    LEGACY_SAMPLE_RATE.get(),
                    LEGACY_CHANNEL_COUNT.get().into(),
                )
                .await
            })
        };

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

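    /// Drains the shared mixer to the default output device. Each iteration
    /// of the loop builds a fresh cpal output stream on a dedicated playback
    /// thread; when the default device changes, the listener fires, the
    /// `end_on_drop` channel disconnects, and the stream is rebuilt.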
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        // Prevent App Nap from throttling audio playback on macOS.
        // This guard is held for the entire duration of audio output.
        #[cfg(target_os = "macos")]
        let _prevent_app_nap = PreventAppNapGuard::new();

        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::Builder::new()
                .name("AudioPlayback".to_owned())
                .spawn(move || {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while !data.is_empty() {
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    if !buf.is_empty() {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        num_channels,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate().0,
                                    );
                                    buf = sampled.to_vec();
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate().0 as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block forever to keep the output stream alive
                    end_on_drop_rx.recv().ok();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

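    /// Captures microphone input via cpal, chops it into 10ms chunks,
    /// resamples to the target format, runs it through the audio processing
    /// module, and forwards the resulting frames to LiveKit. Like
    /// `play_output`, the stream is rebuilt whenever the default input
    /// device changes.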
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::Builder::new()
                .name("AudioCapture".to_owned())
                .spawn(move || {
                    maybe!({
                        if let Ok(name) = device.name() {
                            log::info!("Using microphone: {}", name)
                        } else {
                            log::info!("Using microphone: <unknown>");
                        }

                        let ten_ms_buffer_size =
                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                        let stream = device
                            .build_input_stream_raw(
                                &config.config(),
                                config.sample_format(),
                                move |data, _: &_| {
                                    let data = crate::get_sample_data(config.sample_format(), data)
                                        .log_err();
                                    let Some(data) = data else {
                                        return;
                                    };
                                    let mut data = data.as_slice();

                                    while !data.is_empty() {
                                        let remainder =
                                            (buf.capacity() - buf.len()).min(data.len());
                                        buf.extend_from_slice(&data[..remainder]);
                                        data = &data[remainder..];

                                        if buf.capacity() == buf.len() {
                                            let mut sampled = resampler
                                                .remix_and_resample(
                                                    buf.as_slice(),
                                                    config.sample_rate().0 / 100,
                                                    config.channels() as u32,
                                                    config.sample_rate().0,
                                                    num_channels,
                                                    sample_rate,
                                                )
                                                .to_owned();
                                            apm.lock()
                                                .process_stream(
                                                    &mut sampled,
                                                    sample_rate as i32,
                                                    num_channels as i32,
                                                )
                                                .log_err();
                                            buf.clear();
                                            frame_tx
                                                .unbounded_send(AudioFrame {
                                                    data: Cow::Owned(sampled),
                                                    sample_rate,
                                                    num_channels,
                                                    samples_per_channel: sample_rate / 100,
                                                })
                                                .ok();
                                        }
                                    }
                                },
                                |err| log::error!("error capturing audio track: {:?}", err),
                                Some(Duration::from_millis(100)),
                            )
                            .context("failed to build input stream")?;

                        stream.play()?;
                        // Keep the thread alive and holding onto the `stream`
                        end_on_drop_rx.recv().ok();
                        anyhow::Ok(Some(()))
                    })
                    .log_err();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}

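/// Metadata about the local speaker, urlencoded into the LiveKit track name
/// so that remote peers can display the sender and pick the right decoding
/// path for legacy audio.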
#[derive(Serialize, Deserialize, Debug)]
pub struct Speaker {
    pub name: String,
    pub is_staff: bool,
    pub sends_legacy_audio: bool,
}

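/// Blocking loop that pulls samples from the rodio microphone source and
/// forwards them to LiveKit as 10ms frames (`sample_rate / 100` samples per
/// channel). Runs on the dedicated `MicrophoneToLivekit` thread.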
fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
    use cpal::Sample;
    let sample_rate = microphone.sample_rate().get();
    let num_channels = microphone.channels().get() as u32;
    let buffer_size = sample_rate / 100 * num_channels;

    loop {
        let sampled: Vec<_> = microphone
            .by_ref()
            .take(buffer_size as usize)
            .map(|s| s.to_sample())
            .collect();

        if frame_tx
            .unbounded_send(AudioFrame {
                sample_rate,
                num_channels,
                samples_per_channel: sampled.len() as u32 / num_channels,
                data: Cow::Owned(sampled),
            })
            .is_err()
        {
            // The receiver has been dropped or is no longer consuming;
            // stop capturing.
            break;
        }
    }
}

use super::LocalVideoTrack;

pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

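/// Creates a LiveKit video track fed by a screen-capture stream: each
/// captured frame is converted to a webrtc buffer and pushed into the native
/// video source.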
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

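/// A mixer input fed by a remote LiveKit track. Holds at most ten 10ms
/// frames (~100ms of audio); the oldest frames are discarded if the output
/// task falls behind.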
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

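/// Converts a remote video track into a stream of platform frames: on macOS
/// the webrtc buffers are copied into pooled `CVPixelBuffer`s, elsewhere they
/// are converted to `gpui::RenderImage`s on the background executor.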
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
    executor: &BackgroundExecutor,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        _ = executor;
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            let frame_size = (frame.buffer.width(), frame.buffer.height());
            if pool.is_none() || most_recent_frame_size != frame_size {
                pool = create_buffer_pool(frame_size.0, frame_size.1).log_err();
                most_recent_frame_size = frame_size;
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we receive an 8x8 black
                    // frame. Shortly afterwards the unpublish will come
                    // through and close the view; until then, don't flash
                    // black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        let executor = executor.clone();
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            executor.spawn(async move { video_frame_buffer_from_webrtc(frame.buffer) })
        })
    }
}

#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

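// On macOS, native webrtc buffers already wrap a CVPixelBuffer and can be
// returned directly; software (i420) buffers are converted to NV12 into a
// pixel buffer drawn from the pool.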
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    // `frame.0` is intentionally forgotten rather than dropped, so the
    // CVPixelBuffer is not released here while the webrtc buffer still
    // references it.
    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {
    use cocoa::{
        base::{id, nil},
        foundation::{NSProcessInfo, NSString},
    };
    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
    use objc::{msg_send, sel, sel_impl};

    /// A guard that prevents App Nap while held.
    ///
    /// On macOS, App Nap can throttle background apps to save power. This can cause
    /// audio artifacts when the app is not in the foreground. This guard tells macOS
    /// that we're doing latency-sensitive work and should not be throttled.
    ///
    /// See Apple's documentation on prioritizing work at the app level:
    /// https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheAppLevel.html
    pub struct PreventAppNapGuard {
        activity: id,
    }

    // The activity token returned by NSProcessInfo is thread-safe
    unsafe impl Send for PreventAppNapGuard {}

    // From NSProcessInfo.h
    const NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED: u64 = 1 << 20;
    const NS_ACTIVITY_USER_INITIATED: u64 = 0x00FFFFFF | NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
    const NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP: u64 =
        NS_ACTIVITY_USER_INITIATED & !NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;

    impl PreventAppNapGuard {
        pub fn new() -> Self {
            unsafe {
                let process_info = NSProcessInfo::processInfo(nil);
                #[allow(clippy::disallowed_methods)]
                let reason = NSString::alloc(nil).init_str("Audio playback in progress");
                let activity: id = msg_send![process_info, beginActivityWithOptions:NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP reason:reason];
                let _: () = msg_send![reason, release];
                let _: () = msg_send![activity, retain];
                Self { activity }
            }
        }
    }

    impl Drop for PreventAppNapGuard {
        fn drop(&mut self) {
            unsafe {
                let process_info = NSProcessInfo::processInfo(nil);
                let _: () = msg_send![process_info, endActivity:self.activity];
                let _: () = msg_send![self.activity, release];
            }
        }
    }

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            // Register the listeners and look up the current default device ID
            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Look up the current default device so we can also listen for
                // changes to its configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(target_os = "macos")]
use macos::PreventAppNapGuard;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangelistener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangelistener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangelistener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;