use anyhow::{Context as _, Result};

use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
use cpal::traits::{DeviceTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
    Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use log::info;
use parking_lot::Mutex;
use rodio::Source;
use serde::{Deserialize, Serialize};
use settings::Settings;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

mod source;

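/// Mixes incoming remote audio tracks into a single cpal output stream and
/// captures microphone input for publishing to LiveKit.
///
/// A minimal usage sketch (assuming a `BackgroundExecutor` from a gpui app;
/// not compiled here):
///
/// ```ignore
/// let stack = AudioStack::new(cx.background_executor().clone());
/// // Keep the returned stream alive for as long as playback should continue;
/// // dropping it detaches the track from the mixer.
/// let _stream = stack.play_remote_audio_track(&remote_track);
/// ```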
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

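/// Plays a remote audio track through the rodio-based pipeline, decoding the
/// speaker's name and staff status from the track name (see [`Speaker`]).
/// Dropping the returned [`AudioStream`] stops playback within ~50ms via the
/// `periodic_access` stop check below.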
pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = source::LiveKitStream::new(cx.background_executor(), track);

    let stream = stream
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });

    let speaker: Speaker = serde_urlencoded::from_str(&track.name()).unwrap_or_else(|_| Speaker {
        name: track.name(),
        is_staff: false,
    });
    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
        .context("Could not play audio")?;

    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

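    /// Captures the local microphone and publishes it as a LiveKit audio
    /// track. The user's name and staff status are urlencoded into the track
    /// name so remote peers can recover them (see [`Speaker`]). Capture runs
    /// on the experimental rodio pipeline when the `experimental.rodio_audio`
    /// setting is enabled, and otherwise on the cpal + WebRTC APM path in
    /// [`Self::capture_input`].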
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // N.B.: these options are ignored; noise cancellation is provided by the APM.
            AudioSourceOptions::default(),
            SAMPLE_RATE.get(),
            CHANNEL_COUNT.get().into(),
            10,
        );

        let track_name = serde_urlencoded::to_string(Speaker {
            name: user_name,
            is_staff,
        })
        .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run in real time and must never be paused, which is
            // why this uses a plain std::thread rather than a background task.
            thread::Builder::new()
                .name("AudioCapture".to_string())
                .spawn(move || {
                    // The microphone handle is not Send on macOS, so it must be
                    // opened on the thread that uses it.
                    let microphone = audio::Audio::open_microphone(voip_parts)?;
                    send_to_livekit(frame_tx, microphone);
                    Ok::<(), anyhow::Error>(())
                })
                .unwrap();
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
            })
        };

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

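    /// Pulls 10 ms chunks from the shared mixer, resamples them to the output
    /// device's native format, and feeds them to a cpal output stream. At the
    /// internal format each chunk is `sample_rate / 100` samples per channel
    /// (e.g. 480 at 48 kHz). The outer loop rebuilds the stream whenever the
    /// default output device changes.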
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::Builder::new()
                .name("AudioPlayback".to_owned())
                .spawn(move || {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while !data.is_empty() {
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    if !buf.is_empty() {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        num_channels,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate().0,
                                    );
                                    buf = sampled.to_vec();
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate().0 as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block until `end_on_drop_tx` is dropped, keeping the
                    // output stream alive until the device changes.
                    end_on_drop_rx.recv().ok();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

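    /// Captures microphone input via cpal, accumulates 10 ms buffers
    /// (`channels * sample_rate / 100` samples; e.g. 960 for 48 kHz stereo),
    /// resamples them to the internal format, runs the WebRTC audio
    /// processing module over each chunk, and forwards the result to LiveKit
    /// through `frame_tx`. Like [`Self::play_output`], it rebuilds the stream
    /// when the default input device changes.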
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::Builder::new()
                .name("AudioCapture".to_owned())
                .spawn(move || {
                    maybe!({
                        if let Ok(name) = device.name() {
                            log::info!("Using microphone: {}", name)
                        } else {
                            log::info!("Using microphone: <unknown>");
                        }

                        let ten_ms_buffer_size =
                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                        let stream = device
                            .build_input_stream_raw(
                                &config.config(),
                                config.sample_format(),
                                move |data, _: &_| {
                                    let data = crate::get_sample_data(config.sample_format(), data)
                                        .log_err();
                                    let Some(data) = data else {
                                        return;
                                    };
                                    let mut data = data.as_slice();

                                    while !data.is_empty() {
                                        let remainder =
                                            (buf.capacity() - buf.len()).min(data.len());
                                        buf.extend_from_slice(&data[..remainder]);
                                        data = &data[remainder..];

                                        if buf.capacity() == buf.len() {
                                            let mut sampled = resampler
                                                .remix_and_resample(
                                                    buf.as_slice(),
                                                    config.sample_rate().0 / 100,
                                                    config.channels() as u32,
                                                    config.sample_rate().0,
                                                    num_channels,
                                                    sample_rate,
                                                )
                                                .to_owned();
                                            apm.lock()
                                                .process_stream(
                                                    &mut sampled,
                                                    sample_rate as i32,
                                                    num_channels as i32,
                                                )
                                                .log_err();
                                            buf.clear();
                                            frame_tx
                                                .unbounded_send(AudioFrame {
                                                    data: Cow::Owned(sampled),
                                                    sample_rate,
                                                    num_channels,
                                                    samples_per_channel: sample_rate / 100,
                                                })
                                                .ok();
                                        }
                                    }
                                },
                                |err| log::error!("error capturing audio track: {:?}", err),
                                Some(Duration::from_millis(100)),
                            )
                            .context("failed to build input stream")?;

                        stream.play()?;
                        // Keep the thread alive and holding onto the `stream`
                        end_on_drop_rx.recv().ok();
                        anyhow::Ok(Some(()))
                    })
                    .log_err();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}

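/// Metadata encoded into a LiveKit track name via `serde_urlencoded`, e.g.
/// `name=Alice&is_staff=true`. A hedged round-trip sketch (field values are
/// illustrative):
///
/// ```ignore
/// let encoded = serde_urlencoded::to_string(Speaker {
///     name: "Alice".into(),
///     is_staff: true,
/// })?;
/// let decoded: Speaker = serde_urlencoded::from_str(&encoded)?;
/// ```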
#[derive(Serialize, Deserialize)]
struct Speaker {
    name: String,
    is_staff: bool,
}

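/// Drains the rodio microphone source in `audio::BUFFER_SIZE`-sample chunks
/// and forwards each chunk to LiveKit as an [`AudioFrame`]. The per-channel
/// sample count is the chunk length divided by the channel count, so e.g. a
/// 960-sample stereo chunk yields 480 samples per channel.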
fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
    use cpal::Sample;
    loop {
        let sampled: Vec<_> = microphone
            .by_ref()
            .take(audio::BUFFER_SIZE)
            .map(|s| s.to_sample())
            .collect();

        if frame_tx
            .unbounded_send(AudioFrame {
                sample_rate: SAMPLE_RATE.get(),
                num_channels: CHANNEL_COUNT.get() as u32,
                samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
                data: Cow::Owned(sampled),
            })
            .is_err()
        {
            // The receiver has been dropped or is no longer consuming frames.
            break;
        }
    }
}

use super::LocalVideoTrack;

pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

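/// Starts a screen-share: creates a native video source sized to the capture
/// source's resolution and copies each captured frame into it. A minimal
/// usage sketch (assuming a `ScreenCaptureSource` and an `AsyncApp` handle;
/// not compiled here):
///
/// ```ignore
/// let (track, stream) = capture_local_video_track(source.as_ref(), &mut cx).await?;
/// // Publish `track` to the room, and keep `stream` alive for the duration
/// // of the share; dropping it ends the capture.
/// ```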
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

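/// A per-track source feeding the shared [`audio_mixer::AudioMixer`]. Each
/// queued frame holds 10 ms of audio (`sample_rate * num_channels / 100`
/// interleaved samples, e.g. 960 for 48 kHz stereo), and the queue is capped
/// at 10 frames, so at most ~100 ms of audio is buffered before old frames
/// are dropped.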
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

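/// Converts a remote video track into a stream of displayable frames. On
/// macOS the WebRTC buffers are copied into a reusable `CVPixelBuffer` pool
/// (recreated when the frame size changes), and tiny frames are filtered out
/// because an 8x8 black frame is sent when the remote stops sharing.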
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            let frame_size = (frame.buffer.width(), frame.buffer.height());
            if pool.is_none() || most_recent_frame_size != frame_size {
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
                most_recent_frame_size = frame_size;
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black frame.
                    // Shortly after, the unpublish will come through and close
                    // the view, but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

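/// Builds a `CVPixelBufferPool` that vends NV12
/// (`kCVPixelFormatType_420YpCbCr8BiPlanarFullRange`) buffers of the given
/// size, flagged as IOSurface Core Animation compatible so the frames can be
/// rendered without an extra copy.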
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})")
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

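/// Converts a WebRTC video buffer into a `CVPixelBuffer` for display. Native
/// buffers already wrap a pixel buffer and are passed through; I420 buffers
/// are converted to NV12 (one luma plane plus one interleaved chroma plane)
/// into a buffer drawn from the pool.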
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .context("Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

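/// A stream that yields `()` whenever the default audio device (input when
/// `input` is true, output otherwise) changes, prompting the capture and
/// playback loops above to rebuild their cpal streams. A hedged consumption
/// sketch, matching how `play_output` and `capture_input` use it:
///
/// ```ignore
/// let mut listener = DeviceChangeListener::new(/* input */ false)?;
/// loop {
///     // (re)build the cpal stream for the current default device ...
///     listener.next().await; // wait for the next device change
/// }
/// ```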
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID so we can remove its listener on drop.
    }

    // Compile-time assertion that the listener can be sent across threads.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Look up the current default device so we can also listen for
                // changes to its configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;