playback.rs

  1use anyhow::{Context as _, Result};
  2
  3use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
  4use cpal::{Data, FromSample, I24, SampleFormat, SizedSample};
  5use futures::channel::mpsc::UnboundedSender;
  6use futures::{Stream, StreamExt as _};
  7use gpui::{
  8    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
  9};
 10use libwebrtc::native::{apm, audio_mixer, audio_resampler};
 11use livekit::track;
 12
 13use livekit::webrtc::{
 14    audio_frame::AudioFrame,
 15    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
 16    audio_stream::native::NativeAudioStream,
 17    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
 18    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
 19    video_stream::native::NativeVideoStream,
 20};
 21use parking_lot::Mutex;
 22use std::cell::RefCell;
 23use std::sync::Weak;
 24use std::sync::atomic::{self, AtomicI32};
 25use std::time::Duration;
 26use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
 27use util::{ResultExt as _, maybe};
 28
 29pub(crate) struct AudioStack {
 30    executor: BackgroundExecutor,
 31    apm: Arc<Mutex<apm::AudioProcessingModule>>,
 32    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
 33    _output_task: RefCell<Weak<Task<()>>>,
 34    next_ssrc: AtomicI32,
 35}
 36
 37// NOTE: We use WebRTC's mixer which only supports
 38// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
 39// for audio output devices like speakers/bluetooth, we just hard-code
 40// this; and downsample when we need to.
 41const SAMPLE_RATE: u32 = 48000;
 42const NUM_CHANNELS: u32 = 2;
 43
 44impl AudioStack {
 45    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
 46        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
 47            true, true, true, true,
 48        )));
 49        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
 50        Self {
 51            executor,
 52            apm,
 53            mixer,
 54            _output_task: RefCell::new(Weak::new()),
 55            next_ssrc: AtomicI32::new(1),
 56        }
 57    }
 58
 59    pub(crate) fn play_remote_audio_track(
 60        &self,
 61        track: &livekit::track::RemoteAudioTrack,
 62    ) -> AudioStream {
 63        let output_task = self.start_output();
 64
 65        let next_ssrc = self.next_ssrc.fetch_add(1, atomic::Ordering::Relaxed);
 66        let source = AudioMixerSource {
 67            ssrc: next_ssrc,
 68            sample_rate: SAMPLE_RATE,
 69            num_channels: NUM_CHANNELS,
 70            buffer: Arc::default(),
 71        };
 72        self.mixer.lock().add_source(source.clone());
 73
 74        let mut stream = NativeAudioStream::new(
 75            track.rtc_track(),
 76            source.sample_rate as i32,
 77            source.num_channels as i32,
 78        );
 79
 80        let receive_task = self.executor.spawn({
 81            let source = source.clone();
 82            async move {
 83                while let Some(frame) = stream.next().await {
 84                    source.receive(frame);
 85                }
 86            }
 87        });
 88
 89        let mixer = self.mixer.clone();
 90        let on_drop = util::defer(move || {
 91            mixer.lock().remove_source(source.ssrc);
 92            drop(receive_task);
 93            drop(output_task);
 94        });
 95
 96        AudioStream::Output {
 97            _drop: Box::new(on_drop),
 98        }
 99    }
100
101    pub(crate) fn capture_local_microphone_track(
102        &self,
103    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
104        let source = NativeAudioSource::new(
105            // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
106            AudioSourceOptions::default(),
107            SAMPLE_RATE,
108            NUM_CHANNELS,
109            10,
110        );
111
112        let track = track::LocalAudioTrack::create_audio_track(
113            "microphone",
114            RtcAudioSource::Native(source.clone()),
115        );
116
117        let apm = self.apm.clone();
118
119        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
120        let transmit_task = self.executor.spawn({
121            let source = source.clone();
122            async move {
123                while let Some(frame) = frame_rx.next().await {
124                    source.capture_frame(&frame).await.log_err();
125                }
126            }
127        });
128        let capture_task = self.executor.spawn(async move {
129            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
130        });
131
132        let on_drop = util::defer(|| {
133            drop(transmit_task);
134            drop(capture_task);
135        });
136        return Ok((
137            super::LocalAudioTrack(track),
138            AudioStream::Output {
139                _drop: Box::new(on_drop),
140            },
141        ));
142    }
143
144    fn start_output(&self) -> Arc<Task<()>> {
145        if let Some(task) = self._output_task.borrow().upgrade() {
146            return task;
147        }
148        let task = Arc::new(self.executor.spawn({
149            let apm = self.apm.clone();
150            let mixer = self.mixer.clone();
151            async move {
152                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
153                    .await
154                    .log_err();
155            }
156        }));
157        *self._output_task.borrow_mut() = Arc::downgrade(&task);
158        task
159    }
160
161    async fn play_output(
162        apm: Arc<Mutex<apm::AudioProcessingModule>>,
163        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
164        sample_rate: u32,
165        num_channels: u32,
166    ) -> Result<()> {
167        loop {
168            let mut device_change_listener = DeviceChangeListener::new(false)?;
169            let (output_device, output_config) = default_device(false)?;
170            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
171            let mixer = mixer.clone();
172            let apm = apm.clone();
173            let mut resampler = audio_resampler::AudioResampler::default();
174            let mut buf = Vec::new();
175
176            thread::spawn(move || {
177                let output_stream = output_device.build_output_stream(
178                    &output_config.config(),
179                    {
180                        move |mut data, _info| {
181                            while data.len() > 0 {
182                                if data.len() <= buf.len() {
183                                    let rest = buf.split_off(data.len());
184                                    data.copy_from_slice(&buf);
185                                    buf = rest;
186                                    return;
187                                }
188                                if buf.len() > 0 {
189                                    let (prefix, suffix) = data.split_at_mut(buf.len());
190                                    prefix.copy_from_slice(&buf);
191                                    data = suffix;
192                                }
193
194                                let mut mixer = mixer.lock();
195                                let mixed = mixer.mix(output_config.channels() as usize);
196                                let sampled = resampler.remix_and_resample(
197                                    mixed,
198                                    sample_rate / 100,
199                                    num_channels,
200                                    sample_rate,
201                                    output_config.channels() as u32,
202                                    output_config.sample_rate().0,
203                                );
204                                buf = sampled.to_vec();
205                                apm.lock()
206                                    .process_reverse_stream(
207                                        &mut buf,
208                                        output_config.sample_rate().0 as i32,
209                                        output_config.channels() as i32,
210                                    )
211                                    .ok();
212                            }
213                        }
214                    },
215                    |error| log::error!("error playing audio track: {:?}", error),
216                    Some(Duration::from_millis(100)),
217                );
218
219                let Some(output_stream) = output_stream.log_err() else {
220                    return;
221                };
222
223                output_stream.play().log_err();
224                // Block forever to keep the output stream alive
225                end_on_drop_rx.recv().ok();
226            });
227
228            device_change_listener.next().await;
229            drop(end_on_drop_tx)
230        }
231    }
232
233    async fn capture_input(
234        apm: Arc<Mutex<apm::AudioProcessingModule>>,
235        frame_tx: UnboundedSender<AudioFrame<'static>>,
236        sample_rate: u32,
237        num_channels: u32,
238    ) -> Result<()> {
239        loop {
240            let mut device_change_listener = DeviceChangeListener::new(true)?;
241            let (device, config) = default_device(true)?;
242            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
243            let apm = apm.clone();
244            let frame_tx = frame_tx.clone();
245            let mut resampler = audio_resampler::AudioResampler::default();
246
247            thread::spawn(move || {
248                maybe!({
249                    if let Some(name) = device.name().ok() {
250                        log::info!("Using microphone: {}", name)
251                    } else {
252                        log::info!("Using microphone: <unknown>");
253                    }
254
255                    let ten_ms_buffer_size =
256                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
257                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
258
259                    let stream = device
260                        .build_input_stream_raw(
261                            &config.config(),
262                            config.sample_format(),
263                            move |data, _: &_| {
264                                let data =
265                                    Self::get_sample_data(config.sample_format(), data).log_err();
266                                let Some(data) = data else {
267                                    return;
268                                };
269                                let mut data = data.as_slice();
270
271                                while data.len() > 0 {
272                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
273                                    buf.extend_from_slice(&data[..remainder]);
274                                    data = &data[remainder..];
275
276                                    if buf.capacity() == buf.len() {
277                                        let mut sampled = resampler
278                                            .remix_and_resample(
279                                                buf.as_slice(),
280                                                config.sample_rate().0 / 100,
281                                                config.channels() as u32,
282                                                config.sample_rate().0,
283                                                num_channels,
284                                                sample_rate,
285                                            )
286                                            .to_owned();
287                                        apm.lock()
288                                            .process_stream(
289                                                &mut sampled,
290                                                sample_rate as i32,
291                                                num_channels as i32,
292                                            )
293                                            .log_err();
294                                        buf.clear();
295                                        frame_tx
296                                            .unbounded_send(AudioFrame {
297                                                data: Cow::Owned(sampled),
298                                                sample_rate,
299                                                num_channels,
300                                                samples_per_channel: sample_rate / 100,
301                                            })
302                                            .ok();
303                                    }
304                                }
305                            },
306                            |err| log::error!("error capturing audio track: {:?}", err),
307                            Some(Duration::from_millis(100)),
308                        )
309                        .context("failed to build input stream")?;
310
311                    stream.play()?;
312                    // Keep the thread alive and holding onto the `stream`
313                    end_on_drop_rx.recv().ok();
314                    anyhow::Ok(Some(()))
315                })
316                .log_err();
317            });
318
319            device_change_listener.next().await;
320            drop(end_on_drop_tx)
321        }
322    }
323
324    fn get_sample_data(sample_format: SampleFormat, data: &Data) -> Result<Vec<i16>> {
325        match sample_format {
326            SampleFormat::I8 => Ok(Self::convert_sample_data::<i8, i16>(data)),
327            SampleFormat::I16 => Ok(data.as_slice::<i16>().unwrap().to_vec()),
328            SampleFormat::I24 => Ok(Self::convert_sample_data::<I24, i16>(data)),
329            SampleFormat::I32 => Ok(Self::convert_sample_data::<i32, i16>(data)),
330            SampleFormat::I64 => Ok(Self::convert_sample_data::<i64, i16>(data)),
331            SampleFormat::U8 => Ok(Self::convert_sample_data::<u8, i16>(data)),
332            SampleFormat::U16 => Ok(Self::convert_sample_data::<u16, i16>(data)),
333            SampleFormat::U32 => Ok(Self::convert_sample_data::<u32, i16>(data)),
334            SampleFormat::U64 => Ok(Self::convert_sample_data::<u64, i16>(data)),
335            SampleFormat::F32 => Ok(Self::convert_sample_data::<f32, i16>(data)),
336            SampleFormat::F64 => Ok(Self::convert_sample_data::<f64, i16>(data)),
337            _ => anyhow::bail!("Unsupported sample format"),
338        }
339    }
340
341    fn convert_sample_data<TSource: SizedSample, TDest: SizedSample + FromSample<TSource>>(
342        data: &Data,
343    ) -> Vec<TDest> {
344        data.as_slice::<TSource>()
345            .unwrap()
346            .iter()
347            .map(|e| e.to_sample::<TDest>())
348            .collect()
349    }
350}
351
352use super::LocalVideoTrack;
353
354pub enum AudioStream {
355    Input { _task: Task<()> },
356    Output { _drop: Box<dyn std::any::Any> },
357}
358
359pub(crate) async fn capture_local_video_track(
360    capture_source: &dyn ScreenCaptureSource,
361    cx: &mut gpui::AsyncApp,
362) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
363    let metadata = capture_source.metadata()?;
364    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
365        NativeVideoSource::new(VideoResolution {
366            width: metadata.resolution.width.0 as u32,
367            height: metadata.resolution.height.0 as u32,
368        })
369    })?
370    .await?;
371
372    let capture_stream = capture_source
373        .stream(cx.foreground_executor(), {
374            let track_source = track_source.clone();
375            Box::new(move |frame| {
376                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
377                    track_source.capture_frame(&VideoFrame {
378                        rotation: VideoRotation::VideoRotation0,
379                        timestamp_us: 0,
380                        buffer,
381                    });
382                }
383            })
384        })
385        .await??;
386
387    Ok((
388        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
389            "screen share",
390            RtcVideoSource::Native(track_source),
391        )),
392        capture_stream,
393    ))
394}
395
396fn default_device(input: bool) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
397    let device;
398    let config;
399    if input {
400        device = cpal::default_host()
401            .default_input_device()
402            .context("no audio input device available")?;
403        config = device
404            .default_input_config()
405            .context("failed to get default input config")?;
406    } else {
407        device = cpal::default_host()
408            .default_output_device()
409            .context("no audio output device available")?;
410        config = device
411            .default_output_config()
412            .context("failed to get default output config")?;
413    }
414    Ok((device, config))
415}
416
417#[derive(Clone)]
418struct AudioMixerSource {
419    ssrc: i32,
420    sample_rate: u32,
421    num_channels: u32,
422    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
423}
424
425impl AudioMixerSource {
426    fn receive(&self, frame: AudioFrame) {
427        assert_eq!(
428            frame.data.len() as u32,
429            self.sample_rate * self.num_channels / 100
430        );
431
432        let mut buffer = self.buffer.lock();
433        buffer.push_back(frame.data.to_vec());
434        while buffer.len() > 10 {
435            buffer.pop_front();
436        }
437    }
438}
439
440impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
441    fn ssrc(&self) -> i32 {
442        self.ssrc
443    }
444
445    fn preferred_sample_rate(&self) -> u32 {
446        self.sample_rate
447    }
448
449    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
450        assert_eq!(self.sample_rate, target_sample_rate);
451        let buf = self.buffer.lock().pop_front()?;
452        Some(AudioFrame {
453            data: Cow::Owned(buf),
454            sample_rate: self.sample_rate,
455            num_channels: self.num_channels,
456            samples_per_channel: self.sample_rate / 100,
457        })
458    }
459}
460
461pub fn play_remote_video_track(
462    track: &crate::RemoteVideoTrack,
463) -> impl Stream<Item = RemoteVideoFrame> + use<> {
464    #[cfg(target_os = "macos")]
465    {
466        let mut pool = None;
467        let most_recent_frame_size = (0, 0);
468        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
469            if pool == None
470                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
471            {
472                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
473            }
474            let pool = pool.clone();
475            async move {
476                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
477                    // when the remote stops sharing, we get an 8x8 black image.
478                    // In a lil bit, the unpublish will come through and close the view,
479                    // but until then, don't flash black.
480                    return None;
481                }
482
483                video_frame_buffer_from_webrtc(pool?, frame.buffer)
484            }
485        })
486    }
487    #[cfg(not(target_os = "macos"))]
488    {
489        NativeVideoStream::new(track.0.rtc_track())
490            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
491    }
492}
493
/// Creates a CoreVideo pixel buffer pool for frames of `width` x `height`.
///
/// The pool's buffer attributes request the bi-planar full-range 4:2:0 pixel
/// format and set the IOSurface Core Animation compatibility key.
///
/// # Errors
///
/// Returns an error carrying the `CVReturn` code if the pool cannot be created.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // The attribute keys are framework-provided string constants; they are
    // wrapped under the "get" rule, exactly one wrapper per key.
    let (width_key, height_key, animation_key, format_key): (
        CFString,
        CFString,
        CFString,
        CFString,
    ) = unsafe {
        (
            CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey),
            CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey),
            CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey),
            CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey),
        )
    };

    let width_value: CFNumber = (width as i32).into();
    let height_value: CFNumber = (height as i32).into();
    let enabled: CFNumber = 1.into();
    let pixel_format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let attribute_pairs = [
        (width_key, width_value.into_CFType()),
        (height_key, height_value.into_CFType()),
        (animation_key, enabled.into_CFType()),
        (format_key, pixel_format.into_CFType()),
    ];
    let buffer_attributes =
        core_foundation::dictionary::CFDictionary::from_CFType_pairs(&attribute_pairs);

    match pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)) {
        Ok(pool) => Ok(pool),
        Err(cv_return) => Err(anyhow::anyhow!(
            "failed to create pixel buffer pool: CVReturn({cv_return})",
        )),
    }
}
533
/// Frame type yielded for remote video on macOS: a CoreVideo pixel buffer.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
536
/// Converts a WebRTC video buffer into a CoreVideo pixel buffer.
///
/// A native buffer is returned as-is by wrapping its existing pixel buffer.
/// Any other buffer is read as I420 and converted to NV12 into a fresh pixel
/// buffer taken from `pool`.
///
/// Returns `None` if the native pixel buffer is null, the buffer is not
/// available as I420, the pool cannot provide a buffer, or locking/unlocking
/// the pixel buffer fails.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the frame already lives in a CoreVideo pixel buffer.
    if let Some(native) = buffer.as_native() {
        let raw = native.get_cv_pixel_buffer();
        if raw.is_null() {
            return None;
        }
        let wrapped = unsafe { CVPixelBuffer::wrap_under_get_rule(raw as _) };
        return Some(wrapped);
    }

    let source = buffer.as_i420()?;
    let target = pool.create_pixel_buffer().log_err()?;

    unsafe {
        // The plane addresses below are only read while the base address is locked.
        if target.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        // Destination luma plane.
        let y_ptr = target.get_base_address_of_plane(0);
        let y_stride = target.get_bytes_per_row_of_plane(0);
        let y_len = target.get_height_of_plane(0) * y_stride;
        // Destination interleaved chroma plane.
        let uv_ptr = target.get_base_address_of_plane(1);
        let uv_stride = target.get_bytes_per_row_of_plane(1);
        let uv_len = target.get_height_of_plane(1) * uv_stride;

        let width = target.get_width();
        let height = target.get_height();
        let y_plane = std::slice::from_raw_parts_mut(y_ptr as *mut u8, y_len);
        let uv_plane = std::slice::from_raw_parts_mut(uv_ptr as *mut u8, uv_len);

        let (src_stride_y, src_stride_u, src_stride_v) = source.strides();
        let (src_y, src_u, src_v) = source.data();
        i420_to_nv12(
            src_y,
            src_stride_y,
            src_u,
            src_stride_u,
            src_v,
            src_stride_v,
            y_plane,
            y_stride as u32,
            uv_plane,
            uv_stride as u32,
            width as i32,
            height as i32,
        );

        if target.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }
    }

    Some(target)
}
599
600#[cfg(not(target_os = "macos"))]
601pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
602
603#[cfg(not(target_os = "macos"))]
604fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
605    use gpui::RenderImage;
606    use image::{Frame, RgbaImage};
607    use livekit::webrtc::prelude::VideoFormatType;
608    use smallvec::SmallVec;
609    use std::alloc::{Layout, alloc};
610
611    let width = buffer.width();
612    let height = buffer.height();
613    let stride = width * 4;
614    let byte_len = (stride * height) as usize;
615    let argb_image = unsafe {
616        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
617        // will write all bytes anyway.
618        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
619        if start_ptr.is_null() {
620            return None;
621        }
622        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
623        buffer.to_argb(
624            VideoFormatType::ARGB,
625            argb_frame_slice,
626            stride,
627            width as i32,
628            height as i32,
629        );
630        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
631    };
632
633    // TODO: Unclear why providing argb_image to RgbaImage works properly.
634    let image = RgbaImage::from_raw(width, height, argb_image)
635        .with_context(|| "Bug: not enough bytes allocated for image.")
636        .log_err()?;
637
638    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
639        Frame::new(image),
640        1,
641    ))))
642}
643
/// Wraps a captured macOS screen frame in a WebRTC native buffer.
///
/// The frame's pixel buffer is deliberately not released here
/// (`mem::forget`); its raw reference is handed to the native buffer instead.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc::video_frame::native::NativeBuffer;

    let raw = frame.0.as_concrete_TypeRef();
    // Skip the wrapper's destructor so the raw reference taken above stays valid.
    std::mem::forget(frame.0);
    let native = unsafe { NativeBuffer::from_cv_pixel_buffer(raw as _) };
    Some(native)
}
654
655#[cfg(not(target_os = "macos"))]
656fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
657    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
658    use livekit::webrtc::prelude::NV12Buffer;
659    match frame.0 {
660        scap::frame::Frame::BGRx(frame) => {
661            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
662            let (stride_y, stride_uv) = buffer.strides();
663            let (data_y, data_uv) = buffer.data_mut();
664            argb_to_nv12(
665                &frame.data,
666                frame.width as u32 * 4,
667                data_y,
668                stride_y,
669                data_uv,
670                stride_uv,
671                frame.width,
672                frame.height,
673            );
674            Some(buffer)
675        }
676        scap::frame::Frame::RGBx(frame) => {
677            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
678            let (stride_y, stride_uv) = buffer.strides();
679            let (data_y, data_uv) = buffer.data_mut();
680            abgr_to_nv12(
681                &frame.data,
682                frame.width as u32 * 4,
683                data_y,
684                stride_y,
685                data_uv,
686                stride_uv,
687                frame.width,
688                frame.height,
689            );
690            Some(buffer)
691        }
692        scap::frame::Frame::YUVFrame(yuvframe) => {
693            let mut buffer = NV12Buffer::with_strides(
694                yuvframe.width as u32,
695                yuvframe.height as u32,
696                yuvframe.luminance_stride as u32,
697                yuvframe.chrominance_stride as u32,
698            );
699            let (luminance, chrominance) = buffer.data_mut();
700            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
701            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
702            Some(buffer)
703        }
704        _ => {
705            log::error!(
706                "Expected BGRx or YUV frame from scap screen capture but got some other format."
707            );
708            None
709        }
710    }
711}
712
713trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
714    fn new(input: bool) -> Result<Self>;
715}
716
717#[cfg(target_os = "macos")]
718mod macos {
719
720    use coreaudio::sys::{
721        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
722        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
723        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
724        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
725    };
726    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
727
728    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
729    pub struct CoreAudioDefaultDeviceChangeListener {
730        rx: UnboundedReceiver<()>,
731        callback: Box<PropertyListenerCallbackWrapper>,
732        input: bool,
733        device_id: AudioObjectID, // Store the device ID to properly remove listeners
734    }
735
736    trait _AssertSend: Send {}
737    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
738
739    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
740
741    unsafe extern "C" fn property_listener_handler_shim(
742        _: AudioObjectID,
743        _: u32,
744        _: *const AudioObjectPropertyAddress,
745        callback: *mut ::std::os::raw::c_void,
746    ) -> OSStatus {
747        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
748        unsafe { (*wrapper).0() };
749        0
750    }
751
752    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
753        fn new(input: bool) -> anyhow::Result<Self> {
754            let (tx, rx) = futures::channel::mpsc::unbounded();
755
756            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
757                tx.unbounded_send(()).ok();
758            })));
759
760            // Get the current default device ID
761            let device_id = unsafe {
762                // Listen for default device changes
763                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
764                    kAudioObjectSystemObject,
765                    &AudioObjectPropertyAddress {
766                        mSelector: if input {
767                            kAudioHardwarePropertyDefaultInputDevice
768                        } else {
769                            kAudioHardwarePropertyDefaultOutputDevice
770                        },
771                        mScope: kAudioObjectPropertyScopeGlobal,
772                        mElement: kAudioObjectPropertyElementMaster,
773                    },
774                    Some(property_listener_handler_shim),
775                    &*callback as *const _ as *mut _,
776                ))?;
777
778                // Also listen for changes to the device configuration
779                let device_id = if input {
780                    let mut input_device: AudioObjectID = 0;
781                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
782                    let result = coreaudio::sys::AudioObjectGetPropertyData(
783                        kAudioObjectSystemObject,
784                        &AudioObjectPropertyAddress {
785                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
786                            mScope: kAudioObjectPropertyScopeGlobal,
787                            mElement: kAudioObjectPropertyElementMaster,
788                        },
789                        0,
790                        std::ptr::null(),
791                        &mut prop_size as *mut _,
792                        &mut input_device as *mut _ as *mut _,
793                    );
794                    if result != 0 {
795                        log::warn!("Failed to get default input device ID");
796                        0
797                    } else {
798                        input_device
799                    }
800                } else {
801                    let mut output_device: AudioObjectID = 0;
802                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
803                    let result = coreaudio::sys::AudioObjectGetPropertyData(
804                        kAudioObjectSystemObject,
805                        &AudioObjectPropertyAddress {
806                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
807                            mScope: kAudioObjectPropertyScopeGlobal,
808                            mElement: kAudioObjectPropertyElementMaster,
809                        },
810                        0,
811                        std::ptr::null(),
812                        &mut prop_size as *mut _,
813                        &mut output_device as *mut _ as *mut _,
814                    );
815                    if result != 0 {
816                        log::warn!("Failed to get default output device ID");
817                        0
818                    } else {
819                        output_device
820                    }
821                };
822
823                if device_id != 0 {
824                    // Listen for format changes on the device
825                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
826                        device_id,
827                        &AudioObjectPropertyAddress {
828                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
829                            mScope: if input {
830                                coreaudio::sys::kAudioObjectPropertyScopeInput
831                            } else {
832                                coreaudio::sys::kAudioObjectPropertyScopeOutput
833                            },
834                            mElement: kAudioObjectPropertyElementMaster,
835                        },
836                        Some(property_listener_handler_shim),
837                        &*callback as *const _ as *mut _,
838                    ))?;
839                }
840
841                device_id
842            };
843
844            Ok(Self {
845                rx,
846                callback,
847                input,
848                device_id,
849            })
850        }
851    }
852
853    impl Drop for CoreAudioDefaultDeviceChangeListener {
854        fn drop(&mut self) {
855            unsafe {
856                // Remove the system-level property listener
857                AudioObjectRemovePropertyListener(
858                    kAudioObjectSystemObject,
859                    &AudioObjectPropertyAddress {
860                        mSelector: if self.input {
861                            kAudioHardwarePropertyDefaultInputDevice
862                        } else {
863                            kAudioHardwarePropertyDefaultOutputDevice
864                        },
865                        mScope: kAudioObjectPropertyScopeGlobal,
866                        mElement: kAudioObjectPropertyElementMaster,
867                    },
868                    Some(property_listener_handler_shim),
869                    &*self.callback as *const _ as *mut _,
870                );
871
872                // Remove the device-specific property listener if we have a valid device ID
873                if self.device_id != 0 {
874                    AudioObjectRemovePropertyListener(
875                        self.device_id,
876                        &AudioObjectPropertyAddress {
877                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
878                            mScope: if self.input {
879                                coreaudio::sys::kAudioObjectPropertyScopeInput
880                            } else {
881                                coreaudio::sys::kAudioObjectPropertyScopeOutput
882                            },
883                            mElement: kAudioObjectPropertyElementMaster,
884                        },
885                        Some(property_listener_handler_shim),
886                        &*self.callback as *const _ as *mut _,
887                    );
888                }
889            }
890        }
891    }
892
893    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
894        type Item = ();
895
896        fn poll_next(
897            mut self: std::pin::Pin<&mut Self>,
898            cx: &mut std::task::Context<'_>,
899        ) -> std::task::Poll<Option<Self::Item>> {
900            self.rx.poll_next_unpin(cx)
901        }
902    }
903}
904
/// Device change listener used on macOS: the CoreAudio-backed implementation
/// from the `macos` module.
905#[cfg(target_os = "macos")]
906type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
907
908#[cfg(not(target_os = "macos"))]
909mod noop_change_listener {
910    use std::task::Poll;
911
912    use super::DeviceChangeListenerApi;
913
914    pub struct NoopOutputDeviceChangelistener {}
915
916    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
917        fn new(_input: bool) -> anyhow::Result<Self> {
918            Ok(NoopOutputDeviceChangelistener {})
919        }
920    }
921
922    impl futures::Stream for NoopOutputDeviceChangelistener {
923        type Item = ();
924
925        fn poll_next(
926            self: std::pin::Pin<&mut Self>,
927            _cx: &mut std::task::Context<'_>,
928        ) -> Poll<Option<Self::Item>> {
929            Poll::Pending
930        }
931    }
932}
933
/// Device change listener used on every platform other than macOS: the
/// placeholder from `noop_change_listener`, whose stream always returns
/// `Poll::Pending`.
934#[cfg(not(target_os = "macos"))]
935type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;