playback.rs

  1use anyhow::{Context as _, Result};
  2
  3use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
  4use cpal::traits::{DeviceTrait, StreamTrait as _};
  5use futures::channel::mpsc::UnboundedSender;
  6use futures::{Stream, StreamExt as _};
  7use gpui::{
  8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
  9    Task,
 10};
 11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
 12use livekit::track;
 13
 14use livekit::webrtc::{
 15    audio_frame::AudioFrame,
 16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
 17    audio_stream::native::NativeAudioStream,
 18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
 19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
 20    video_stream::native::NativeVideoStream,
 21};
 22use log::info;
 23use parking_lot::Mutex;
 24use rodio::Source;
 25use serde::{Deserialize, Serialize};
 26use settings::Settings;
 27use std::cell::RefCell;
 28use std::sync::Weak;
 29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 30use std::time::Duration;
 31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
 32use util::{ResultExt as _, maybe};
 33
 34mod source;
 35
/// Shared audio machinery for remote-track playback: a WebRTC audio
/// processing module, a mixer combining all remote sources, and a lazily
/// started output task that drives the default output device.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    // WebRTC audio processing module; per the note in
    // `capture_local_microphone_track`, noise cancellation comes from here.
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    // Mixes every registered `AudioMixerSource` into one playback stream.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak handle to the shared output task: it stays alive only while some
    // playing `AudioStream` holds a strong `Arc` to it (see `start_output`).
    _output_task: RefCell<Weak<Task<()>>>,
    // Monotonically increasing synchronization-source id for mixer sources.
    next_ssrc: AtomicI32,
}
 43
 44pub(crate) fn play_remote_audio_track(
 45    track: &livekit::track::RemoteAudioTrack,
 46    cx: &mut gpui::App,
 47) -> Result<AudioStream> {
 48    let stop_handle = Arc::new(AtomicBool::new(false));
 49    let stop_handle_clone = stop_handle.clone();
 50    let stream = source::LiveKitStream::new(cx.background_executor(), track);
 51
 52    let stream = stream
 53        .stoppable()
 54        .periodic_access(Duration::from_millis(50), move |s| {
 55            if stop_handle.load(Ordering::Relaxed) {
 56                s.stop();
 57            }
 58        });
 59
 60    let speaker: Speaker = serde_urlencoded::from_str(&track.name()).unwrap_or_else(|_| Speaker {
 61        name: track.name(),
 62        is_staff: false,
 63    });
 64    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
 65        .context("Could not play audio")?;
 66
 67    let on_drop = util::defer(move || {
 68        stop_handle_clone.store(true, Ordering::Relaxed);
 69    });
 70    Ok(AudioStream::Output {
 71        _drop: Box::new(on_drop),
 72    })
 73}
 74
impl AudioStack {
    /// Creates the stack with a fresh APM and mixer; the output task is
    /// started lazily by `start_output` on first playback.
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        // NOTE(review): the four booleans presumably enable the APM's
        // processing stages — confirm the flag order against the libwebrtc
        // `AudioProcessingModule::new` binding.
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            // ssrc 0 is never handed out; ids start at 1.
            next_ssrc: AtomicI32::new(1),
        }
    }

    /// Plays a remote track by registering it as a mixer source and pumping
    /// its decoded frames from a background task.
    ///
    /// Dropping the returned stream removes the source from the mixer and
    /// drops both the pump task and this stream's strong reference to the
    /// shared output task.
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        // Allocate a unique synchronization-source id for this track.
        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        // Decode the remote track at the mixer's rate and channel count.
        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        // Pump decoded frames into the mixer source's bounded queue.
        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

    /// Returns the shared output task, starting it if it isn't running.
    ///
    /// Only a weak reference is cached, so the task (and playback) stops as
    /// soon as no `AudioStream` holds a strong `Arc` to it anymore.
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

    /// Captures the local microphone and publishes it as a LiveKit track.
    ///
    /// The user's name and staff status are url-encoded into the track name
    /// (see `Speaker`). Depending on the `experimental.rodio_audio` setting,
    /// capture runs either through the rodio pipeline on a dedicated OS
    /// thread or through the legacy cpal/APM path (`capture_input`).
    ///
    /// Dropping the returned `AudioStream` drops both the capture and the
    /// frame-forwarding tasks, ending the capture.
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
            AudioSourceOptions::default(),
            SAMPLE_RATE.get(),
            CHANNEL_COUNT.get().into(),
            10,
        );

        let track_name = serde_urlencoded::to_string(Speaker {
            name: user_name,
            is_staff,
        })
        .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        // Frames from either capture path flow through this channel into
        // the LiveKit source.
        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run real-time and should never be paused. That is why we are using a
            // normal std::thread and not a background task
            thread::spawn(move || {
                // microphone is non send on mac
                let microphone = audio::Audio::open_microphone(voip_parts)?;
                send_to_livekit(frame_tx, microphone);
                Ok::<(), anyhow::Error>(())
            });
            // The real work happens on the thread above; hand back a
            // completed task so both branches have the same type.
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
            })
        };

        // Dropping this guard cancels both tasks, ending the capture.
        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    /// Drives the default output device from the shared mixer.
    ///
    /// Loops forever: each iteration opens the current default output device
    /// and plays on a dedicated thread until the default device changes, at
    /// which point the stream thread is released (by dropping
    /// `end_on_drop_tx`) and the loop rebuilds against the new device.
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            // Samples left over from the previous cpal callback.
            let mut buf = Vec::new();

            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while data.len() > 0 {
                                // Enough buffered samples to fill the whole
                                // callback buffer: copy and keep the rest.
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                // Drain whatever is buffered, then mix more.
                                if buf.len() > 0 {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                // Mix one 10ms chunk (sample_rate / 100
                                // frames) and resample to the device format.
                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                // Feed the playback signal to the APM as the
                                // reverse stream (presumably for echo
                                // cancellation against the mic input).
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Block forever to keep the output stream alive
                end_on_drop_rx.recv().ok();
            });

            // Wait for the default output device to change, then tear down
            // this iteration's stream and rebuild.
            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

    /// Captures the default input device and forwards 10ms `AudioFrame`s to
    /// `frame_tx` (legacy, non-rodio path).
    ///
    /// Like `play_output`, loops forever and rebuilds the capture stream
    /// whenever the default input device changes.
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Some(name) = device.name().ok() {
                        log::info!("Using microphone: {}", name)
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    // Accumulate exactly 10ms of device-format samples before
                    // resampling and emitting a frame.
                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            config.sample_format(),
                            move |data, _: &_| {
                                let data =
                                    crate::get_sample_data(config.sample_format(), data).log_err();
                                let Some(data) = data else {
                                    return;
                                };
                                let mut data = data.as_slice();

                                while data.len() > 0 {
                                    // Top the buffer up to capacity (10ms).
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        // Full 10ms chunk: remix to the target
                                        // rate/channels, run the APM, ship it.
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive and holding onto the `stream`
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}
380
/// Speaker identity url-encoded into a LiveKit track name by the publisher
/// and decoded by receivers (see `play_remote_audio_track` and
/// `capture_local_microphone_track`).
#[derive(Serialize, Deserialize)]
struct Speaker {
    name: String,
    is_staff: bool,
}
386
/// Forwards audio from a rodio `Source` (the opened microphone) to LiveKit.
///
/// Blocks the calling thread and runs until the receiving side of
/// `frame_tx` is dropped; intended for a dedicated real-time thread (see
/// `capture_local_microphone_track`).
fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
    use cpal::Sample;
    loop {
        // Pull one buffer's worth of samples, converting each to the
        // sample type the frame expects.
        let sampled: Vec<_> = microphone
            .by_ref()
            .take(audio::BUFFER_SIZE)
            .map(|s| s.to_sample())
            .collect();

        if frame_tx
            .unbounded_send(AudioFrame {
                sample_rate: SAMPLE_RATE.get(),
                num_channels: CHANNEL_COUNT.get() as u32,
                samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
                data: Cow::Owned(sampled),
            })
            .is_err()
        {
            // The receiver was dropped or is no longer consuming frames;
            // stop capturing.
            break;
        }
    }
}
410
411use super::LocalVideoTrack;
412
/// Guard for a live audio stream; audio keeps flowing for as long as this
/// value is kept alive.
pub enum AudioStream {
    /// Capture stream kept alive by its background task.
    Input { _task: Task<()> },
    /// Playback stream stopped via an opaque drop guard.
    Output { _drop: Box<dyn std::any::Any> },
}
417
/// Creates a LiveKit video track publishing frames from the given
/// screen-capture source.
///
/// Returns the local track plus the capture stream; dropping the stream
/// ends the capture.
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    // Create the native source inside the tokio runtime — presumably
    // NativeVideoSource requires a tokio context; confirm against livekit.
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    // For every captured frame: convert it to a webrtc buffer and push it
    // into the track source (frames that fail conversion are dropped).
    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
454
/// One remote track's contribution to the shared `AudioMixer`.
///
/// The receive task pushes decoded 10ms frames into `buffer`; the mixer
/// pops them via the `AudioMixerSource` trait impl below.
#[derive(Clone)]
struct AudioMixerSource {
    // Synchronization-source id identifying this source within the mixer.
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // FIFO of buffered 10ms frames, shared between the receive task and the
    // output callback; capped at ten entries in `receive`.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
462
463impl AudioMixerSource {
464    fn receive(&self, frame: AudioFrame) {
465        assert_eq!(
466            frame.data.len() as u32,
467            self.sample_rate * self.num_channels / 100
468        );
469
470        let mut buffer = self.buffer.lock();
471        buffer.push_back(frame.data.to_vec());
472        while buffer.len() > 10 {
473            buffer.pop_front();
474        }
475    }
476}
477
478impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
479    fn ssrc(&self) -> i32 {
480        self.ssrc
481    }
482
483    fn preferred_sample_rate(&self) -> u32 {
484        self.sample_rate
485    }
486
487    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
488        assert_eq!(self.sample_rate, target_sample_rate);
489        let buf = self.buffer.lock().pop_front()?;
490        Some(AudioFrame {
491            data: Cow::Owned(buf),
492            sample_rate: self.sample_rate,
493            num_channels: self.num_channels,
494            samples_per_channel: self.sample_rate / 100,
495        })
496    }
497}
498
499pub fn play_remote_video_track(
500    track: &crate::RemoteVideoTrack,
501) -> impl Stream<Item = RemoteVideoFrame> + use<> {
502    #[cfg(target_os = "macos")]
503    {
504        let mut pool = None;
505        let most_recent_frame_size = (0, 0);
506        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
507            if pool == None
508                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
509            {
510                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
511            }
512            let pool = pool.clone();
513            async move {
514                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
515                    // when the remote stops sharing, we get an 8x8 black image.
516                    // In a lil bit, the unpublish will come through and close the view,
517                    // but until then, don't flash black.
518                    return None;
519                }
520
521                video_frame_buffer_from_webrtc(pool?, frame.buffer)
522            }
523        })
524    }
525    #[cfg(not(target_os = "macos"))]
526    {
527        NativeVideoStream::new(track.0.rtc_track())
528            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
529    }
530}
531
/// Creates a Core Video pixel-buffer pool that vends NV12
/// (`420YpCbCr8BiPlanarFullRange`) buffers of the given dimensions, used by
/// `video_frame_buffer_from_webrtc` to convert incoming I420 frames.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY: these are global CoreVideo constant CFStrings;
    // wrap_under_get_rule retains them, which is the correct rule for
    // borrowed CF globals.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    // Pool attributes: size, pixel format, and Core Animation compatibility
    // (so the buffers can be displayed directly).
    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
571
/// On macOS, remote video frames are delivered as Core Video pixel buffers.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
574
/// Converts a webrtc video buffer into a `CVPixelBuffer` for rendering.
///
/// Native buffers already wrap a pixel buffer and are returned without
/// copying; I420 buffers are converted to NV12 into a buffer drawn from
/// `pool`. Returns `None` on any failure.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the buffer already carries a CVPixelBuffer.
    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        // SAFETY: pointer checked non-null above; wrap_under_get_rule
        // retains it, so the returned value owns its own reference.
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // The base address must be locked while plane data is written.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        // SAFETY: plane pointers/lengths come from the locked pixel buffer,
        // so each slice spans exactly its plane (Y = plane 0, UV = plane 1).
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
637
/// Off macOS, remote video frames are decoded into gpui render images.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
640
/// Converts a webrtc video buffer into a gpui `RenderImage` by rendering it
/// to a 4-bytes-per-pixel ARGB frame. Returns `None` on allocation failure.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        // SAFETY: `start_ptr` was allocated with exactly `byte_len` bytes via
        // the global allocator, so handing ownership to Vec (len == cap ==
        // byte_len) is sound; Vec will free it with the matching layout.
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    // NOTE(review): ARGB word order is BGRA byte order on little-endian —
    // presumably the channel swap is tolerated or corrected downstream;
    // confirm before relying on exact colors.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}
681
/// Wraps a captured macOS frame (a `CVPixelBuffer`) in a webrtc native
/// buffer without copying the pixel data.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Intentionally leak our retain on the pixel buffer; the NativeBuffer
    // presumably assumes ownership of that reference — TODO confirm against
    // the livekit `from_cv_pixel_buffer` binding.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
692
/// Converts a scap screen-capture frame into an NV12 webrtc buffer.
///
/// BGRx/RGBx frames are color-converted; YUV frames are copied plane by
/// plane. Unsupported formats are logged and dropped.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            // NOTE(review): BGRx bytes read as ARGB words on little-endian,
            // hence argb_to_nv12 here (and abgr_to_nv12 for RGBx below) —
            // confirm against yuv_helper's byte-order convention.
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4, // source stride: 4 bytes per pixel
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4, // source stride: 4 bytes per pixel
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            // Already planar YUV: copy luminance and chrominance planes
            // directly, preserving the source strides.
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}
750
/// A stream that yields `()` each time the system's default audio device
/// changes; used by `play_output`/`capture_input` to rebuild their streams.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    /// Creates a listener for the default input (`true`) or output
    /// (`false`) device.
    fn new(input: bool) -> Result<Self>;
}
754
755#[cfg(target_os = "macos")]
756mod macos {
757
758    use coreaudio::sys::{
759        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
760        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
761        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
762        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
763    };
764    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
765
766    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
767    pub struct CoreAudioDefaultDeviceChangeListener {
768        rx: UnboundedReceiver<()>,
769        callback: Box<PropertyListenerCallbackWrapper>,
770        input: bool,
771        device_id: AudioObjectID, // Store the device ID to properly remove listeners
772    }
773
774    trait _AssertSend: Send {}
775    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
776
777    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
778
779    unsafe extern "C" fn property_listener_handler_shim(
780        _: AudioObjectID,
781        _: u32,
782        _: *const AudioObjectPropertyAddress,
783        callback: *mut ::std::os::raw::c_void,
784    ) -> OSStatus {
785        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
786        unsafe { (*wrapper).0() };
787        0
788    }
789
790    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
791        fn new(input: bool) -> anyhow::Result<Self> {
792            let (tx, rx) = futures::channel::mpsc::unbounded();
793
794            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
795                tx.unbounded_send(()).ok();
796            })));
797
798            // Get the current default device ID
799            let device_id = unsafe {
800                // Listen for default device changes
801                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
802                    kAudioObjectSystemObject,
803                    &AudioObjectPropertyAddress {
804                        mSelector: if input {
805                            kAudioHardwarePropertyDefaultInputDevice
806                        } else {
807                            kAudioHardwarePropertyDefaultOutputDevice
808                        },
809                        mScope: kAudioObjectPropertyScopeGlobal,
810                        mElement: kAudioObjectPropertyElementMaster,
811                    },
812                    Some(property_listener_handler_shim),
813                    &*callback as *const _ as *mut _,
814                ))?;
815
816                // Also listen for changes to the device configuration
817                let device_id = if input {
818                    let mut input_device: AudioObjectID = 0;
819                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
820                    let result = coreaudio::sys::AudioObjectGetPropertyData(
821                        kAudioObjectSystemObject,
822                        &AudioObjectPropertyAddress {
823                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
824                            mScope: kAudioObjectPropertyScopeGlobal,
825                            mElement: kAudioObjectPropertyElementMaster,
826                        },
827                        0,
828                        std::ptr::null(),
829                        &mut prop_size as *mut _,
830                        &mut input_device as *mut _ as *mut _,
831                    );
832                    if result != 0 {
833                        log::warn!("Failed to get default input device ID");
834                        0
835                    } else {
836                        input_device
837                    }
838                } else {
839                    let mut output_device: AudioObjectID = 0;
840                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
841                    let result = coreaudio::sys::AudioObjectGetPropertyData(
842                        kAudioObjectSystemObject,
843                        &AudioObjectPropertyAddress {
844                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
845                            mScope: kAudioObjectPropertyScopeGlobal,
846                            mElement: kAudioObjectPropertyElementMaster,
847                        },
848                        0,
849                        std::ptr::null(),
850                        &mut prop_size as *mut _,
851                        &mut output_device as *mut _ as *mut _,
852                    );
853                    if result != 0 {
854                        log::warn!("Failed to get default output device ID");
855                        0
856                    } else {
857                        output_device
858                    }
859                };
860
861                if device_id != 0 {
862                    // Listen for format changes on the device
863                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
864                        device_id,
865                        &AudioObjectPropertyAddress {
866                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
867                            mScope: if input {
868                                coreaudio::sys::kAudioObjectPropertyScopeInput
869                            } else {
870                                coreaudio::sys::kAudioObjectPropertyScopeOutput
871                            },
872                            mElement: kAudioObjectPropertyElementMaster,
873                        },
874                        Some(property_listener_handler_shim),
875                        &*callback as *const _ as *mut _,
876                    ))?;
877                }
878
879                device_id
880            };
881
882            Ok(Self {
883                rx,
884                callback,
885                input,
886                device_id,
887            })
888        }
889    }
890
    impl Drop for CoreAudioDefaultDeviceChangeListener {
        /// Unregisters the listeners added in `new`. The property addresses
        /// and the client-data pointer must match the registration calls
        /// exactly, otherwise CoreAudio will not remove the listener and the
        /// shim could later be invoked with a dangling pointer.
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }
930
931    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
932        type Item = ();
933
934        fn poll_next(
935            mut self: std::pin::Pin<&mut Self>,
936            cx: &mut std::task::Context<'_>,
937        ) -> std::task::Poll<Option<Self::Item>> {
938            self.rx.poll_next_unpin(cx)
939        }
940    }
941}
942
// On macOS, device changes are observed via CoreAudio property listeners.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
945
946#[cfg(not(target_os = "macos"))]
947mod noop_change_listener {
948    use std::task::Poll;
949
950    use super::DeviceChangeListenerApi;
951
952    pub struct NoopOutputDeviceChangelistener {}
953
954    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
955        fn new(_input: bool) -> anyhow::Result<Self> {
956            Ok(NoopOutputDeviceChangelistener {})
957        }
958    }
959
960    impl futures::Stream for NoopOutputDeviceChangelistener {
961        type Item = ();
962
963        fn poll_next(
964            self: std::pin::Pin<&mut Self>,
965            _cx: &mut std::task::Context<'_>,
966        ) -> Poll<Option<Self::Item>> {
967            Poll::Pending
968        }
969    }
970}
971
// Elsewhere, fall back to the listener that never reports a change.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;