playback.rs

  1use anyhow::{Context as _, Result};
  2
  3use cpal::traits::{DeviceTrait, StreamTrait as _};
  4use futures::channel::mpsc::UnboundedSender;
  5use futures::{Stream, StreamExt as _};
  6use gpui::{
  7    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
  8};
  9use libwebrtc::native::{apm, audio_mixer, audio_resampler};
 10use livekit::track;
 11
 12use livekit::webrtc::{
 13    audio_frame::AudioFrame,
 14    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
 15    audio_stream::native::NativeAudioStream,
 16    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
 17    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
 18    video_stream::native::NativeVideoStream,
 19};
 20use parking_lot::Mutex;
 21use rodio::Source;
 22use std::cell::RefCell;
 23use std::sync::Weak;
 24use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 25use std::time::Duration;
 26use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
 27use util::{ResultExt as _, maybe};
 28
 29mod source;
 30
/// Shared audio state for a call: a single WebRTC audio-processing module
/// (APM) and a single mixer feed one OS output stream that all remote audio
/// tracks are mixed into.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak so the output task stops once no `AudioStream` holds the strong
    // `Arc` handed out by `start_output`. RefCell (not a lock): this struct is
    // only touched from one thread.
    _output_task: RefCell<Weak<Task<()>>>,
    // Monotonically increasing source id registered with the mixer for each
    // remote track played.
    next_ssrc: AtomicI32,
}
 38
// NOTE: We use WebRTC's mixer which only supports
// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
// for audio output devices like speakers/bluetooth, we just hard-code
// this; and downsample when we need to.
const SAMPLE_RATE: u32 = 48000;
// Interleaved stereo; both the mixer and the resampler calls below assume
// frames of `SAMPLE_RATE * NUM_CHANNELS / 100` samples (10 ms).
const NUM_CHANNELS: u32 = 2;
 45
 46pub(crate) fn play_remote_audio_track(
 47    track: &livekit::track::RemoteAudioTrack,
 48    cx: &mut gpui::App,
 49) -> Result<AudioStream> {
 50    let stop_handle = Arc::new(AtomicBool::new(false));
 51    let stop_handle_clone = stop_handle.clone();
 52    let stream = source::LiveKitStream::new(cx.background_executor(), track)
 53        .stoppable()
 54        .periodic_access(Duration::from_millis(50), move |s| {
 55            if stop_handle.load(Ordering::Relaxed) {
 56                s.stop();
 57            }
 58        });
 59    audio::Audio::play_source(stream, cx).context("Could not play audio")?;
 60
 61    let on_drop = util::defer(move || {
 62        stop_handle_clone.store(true, Ordering::Relaxed);
 63    });
 64    Ok(AudioStream::Output {
 65        _drop: Box::new(on_drop),
 66    })
 67}
 68
impl AudioStack {
    /// Builds a new stack with an empty mixer and a fresh APM.
    /// The four `true` flags enable the APM's processing features
    /// (echo cancellation, noise suppression, etc. — NOTE(review): confirm the
    /// exact flag order against the libwebrtc `apm` bindings).
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            // Starts empty; lazily populated by `start_output`.
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

    /// Registers `track` as a source in the shared mixer and ensures the
    /// output task is running. Dropping the returned `AudioStream` removes the
    /// source and releases the stream's references to the receive/output tasks.
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        // Unique mixer id for this track.
        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE,
            num_channels: NUM_CHANNELS,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        // Ask libwebrtc to deliver frames already at the mixer's rate/layout.
        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        // Pump decoded frames from the track into the mixer source's queue.
        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        // On drop: unregister from the mixer, then drop both task handles so
        // playback fully stops once the last stream goes away.
        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

    /// Returns a strong handle to the (singleton) output task, spawning it if
    /// no stream currently keeps it alive. The struct only stores a `Weak`, so
    /// the task ends when every returned `Arc` has been dropped.
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

    /// Creates a local microphone track backed by the default input device.
    /// Captured audio is APM-processed and forwarded to libwebrtc via an
    /// unbounded channel. Dropping the returned `AudioStream` stops capture.
    pub(crate) fn capture_local_microphone_track(
        &self,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
            AudioSourceOptions::default(),
            SAMPLE_RATE,
            NUM_CHANNELS,
            10,
        );

        let track = track::LocalAudioTrack::create_audio_track(
            "microphone",
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        // capture_input (blocking cpal thread) -> frame_tx -> transmit_task -> webrtc.
        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let capture_task = self.executor.spawn(async move {
            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
        });

        // Dropping the stream drops both tasks, which tears down the capture
        // thread (its end_on_drop channel sender disappears).
        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            // NOTE(review): uses the `Output` variant purely for its drop-guard
            // shape even though this is a capture stream.
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    /// Drives the OS output device: mixes all registered sources, resamples to
    /// the device's native rate/channel count, and feeds the cpal callback.
    /// Loops forever, rebuilding the stream whenever the default output device
    /// changes; only returns on setup errors.
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            // Dropping the tx end below unblocks `recv` on the audio thread,
            // letting it exit and release the old stream.
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            // Leftover resampled samples carried between cpal callbacks, since
            // the device's buffer size need not align with 10 ms mix chunks.
            let mut buf = Vec::new();

            // cpal streams are !Send, so the stream lives on its own thread.
            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while data.len() > 0 {
                                // Carried samples fully satisfy the request.
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                // Drain the carry-over first, then mix more.
                                if buf.len() > 0 {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                // Mix one 10 ms chunk and convert it to the
                                // device's channel count and sample rate.
                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                // Feed playback audio to the APM so echo
                                // cancellation knows what the speakers emit.
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Block forever to keep the output stream alive
                end_on_drop_rx.recv().ok();
            });

            // Wait for a default-device change, then tear down and rebuild.
            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

    /// Captures from the default input device, batching samples into 10 ms
    /// chunks, resampling to `sample_rate`/`num_channels`, running the APM,
    /// and sending the resulting frames over `frame_tx`. Loops to rebuild the
    /// stream on device changes; only returns on setup errors.
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            // Same shutdown trick as `play_output`: dropping the tx frees the
            // capture thread from `recv`.
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Some(name) = device.name().ok() {
                        log::info!("Using microphone: {}", name)
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    // Accumulate exactly 10 ms of device-rate samples before
                    // each resample/process/send step.
                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            config.sample_format(),
                            move |data, _: &_| {
                                // Normalize whatever sample format cpal gives
                                // us into i16.
                                let data =
                                    crate::get_sample_data(config.sample_format(), data).log_err();
                                let Some(data) = data else {
                                    return;
                                };
                                let mut data = data.as_slice();

                                while data.len() > 0 {
                                    // Fill `buf` up to its 10 ms capacity.
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        // Device rate/layout -> mixer rate/layout.
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        // APM (echo cancellation / noise
                                        // suppression) mutates in place.
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive and holding onto the `stream`
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}
348
349use super::LocalVideoTrack;
350
/// Handle that keeps an audio stream running; dropping it stops the stream.
pub enum AudioStream {
    /// Stream driven by a task whose drop cancels it.
    Input { _task: Task<()> },
    /// Stream torn down by an opaque drop guard (a `util::defer` closure).
    Output { _drop: Box<dyn std::any::Any> },
}
355
/// Starts screen capture from `capture_source` into a new LiveKit local video
/// track.
///
/// Returns the track plus the capture stream handle; dropping the handle
/// stops capture. Each captured frame is converted to a webrtc buffer and
/// pushed into the track's `NativeVideoSource`.
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    // NOTE(review): the source is constructed on the tokio runtime via
    // gpui_tokio — presumably libwebrtc requires a tokio context here; confirm.
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                // Frames that can't be converted are silently dropped.
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        // NOTE(review): timestamp is always 0 — verify the
                        // sink derives its own timing.
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        // Double `?`: stream() returns a Result wrapped in a future's Result.
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
392
/// One remote track's contribution to the shared mixer: a bounded queue of
/// decoded 10 ms frames identified by `ssrc`. Cloning shares the queue.
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // FIFO of interleaved 10 ms frames; capped at 10 entries in `receive`.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
400
401impl AudioMixerSource {
402    fn receive(&self, frame: AudioFrame) {
403        assert_eq!(
404            frame.data.len() as u32,
405            self.sample_rate * self.num_channels / 100
406        );
407
408        let mut buffer = self.buffer.lock();
409        buffer.push_back(frame.data.to_vec());
410        while buffer.len() > 10 {
411            buffer.pop_front();
412        }
413    }
414}
415
impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Hands the oldest queued frame to the mixer, or `None` when the queue is
    /// empty (the mixer then treats this source as silent for the tick).
    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        // The mixer is expected to honor `preferred_sample_rate`, so a
        // mismatch here is a programming error.
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            // Frames are always 10 ms long (see `receive`'s assertion).
            samples_per_channel: self.sample_rate / 100,
        })
    }
}
436
437pub fn play_remote_video_track(
438    track: &crate::RemoteVideoTrack,
439) -> impl Stream<Item = RemoteVideoFrame> + use<> {
440    #[cfg(target_os = "macos")]
441    {
442        let mut pool = None;
443        let most_recent_frame_size = (0, 0);
444        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
445            if pool == None
446                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
447            {
448                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
449            }
450            let pool = pool.clone();
451            async move {
452                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
453                    // when the remote stops sharing, we get an 8x8 black image.
454                    // In a lil bit, the unpublish will come through and close the view,
455                    // but until then, don't flash black.
456                    return None;
457                }
458
459                video_frame_buffer_from_webrtc(pool?, frame.buffer)
460            }
461        })
462    }
463    #[cfg(not(target_os = "macos"))]
464    {
465        NativeVideoStream::new(track.0.rtc_track())
466            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
467    }
468}
469
/// Builds a CoreVideo pixel-buffer pool producing IOSurface-backed NV12
/// (`420YpCbCr8BiPlanarFullRange`) buffers of the given size, suitable for
/// direct CoreAnimation display.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY: the kCVPixelBuffer* keys are static CFStrings owned by
    // CoreVideo; `wrap_under_get_rule` retains them, which is correct for
    // borrowed ("get rule") CF objects.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    // CVReturn error codes are plain ints; surface them in the error message.
    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
509
#[cfg(target_os = "macos")]
/// On macOS a remote frame is a CoreVideo pixel buffer (zero-copy when the
/// decoder already produced one).
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
512
/// Converts a webrtc video buffer into a `CVPixelBuffer`.
///
/// Fast path: if the buffer is already backed by a native CVPixelBuffer,
/// re-wrap it without copying. Slow path: copy the I420 planes into an NV12
/// pixel buffer allocated from `pool`. Returns `None` on any CoreVideo
/// failure or if the native pixel buffer pointer is null.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        // SAFETY: non-null pointer from the native buffer; the get rule
        // retains it so the returned CVPixelBuffer owns a reference.
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    // SAFETY: the base address is locked for the whole plane access below and
    // unlocked before returning; plane slices are sized from the buffer's own
    // stride/height, so they stay in bounds.
    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        // libyuv conversion: planar I420 -> bi-planar NV12, honoring both
        // sides' row strides.
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        // NOTE(review): if unlock fails the (locked) buffer is discarded here;
        // presumably dropping it is safe — confirm against CoreVideo docs.
        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
575
#[cfg(not(target_os = "macos"))]
/// Off macOS a remote frame is a CPU-side RGBA image wrapped for gpui.
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
578
579#[cfg(not(target_os = "macos"))]
580fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
581    use gpui::RenderImage;
582    use image::{Frame, RgbaImage};
583    use livekit::webrtc::prelude::VideoFormatType;
584    use smallvec::SmallVec;
585    use std::alloc::{Layout, alloc};
586
587    let width = buffer.width();
588    let height = buffer.height();
589    let stride = width * 4;
590    let byte_len = (stride * height) as usize;
591    let argb_image = unsafe {
592        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
593        // will write all bytes anyway.
594        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
595        if start_ptr.is_null() {
596            return None;
597        }
598        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
599        buffer.to_argb(
600            VideoFormatType::ARGB,
601            argb_frame_slice,
602            stride,
603            width as i32,
604            height as i32,
605        );
606        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
607    };
608
609    // TODO: Unclear why providing argb_image to RgbaImage works properly.
610    let image = RgbaImage::from_raw(width, height, argb_image)
611        .with_context(|| "Bug: not enough bytes allocated for image.")
612        .log_err()?;
613
614    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
615        Frame::new(image),
616        1,
617    ))))
618}
619
/// Hands a captured macOS frame's CVPixelBuffer to webrtc without copying.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Skip the CFRelease that dropping `frame.0` would perform — the reference
    // is being handed to webrtc instead.
    std::mem::forget(frame.0);
    unsafe {
        // SAFETY: `pixel_buffer` is a valid CVPixelBuffer whose reference we
        // deliberately leaked above. NOTE(review): assumes
        // `from_cv_pixel_buffer` takes over (or retains) that reference —
        // otherwise this leaks one refcount per frame; confirm in livekit.
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
630
/// Converts a scap capture frame into a webrtc NV12 buffer.
///
/// BGRx / RGBx frames are color-converted via libyuv; YUV frames are copied
/// plane-by-plane. Other formats are logged and dropped.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            // NOTE(review): libyuv's "ARGB" is byte order B,G,R,A in memory,
            // which is why BGRx maps to argb_to_nv12 — confirm against libyuv.
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4, // source stride: 4 bytes per pixel
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            // Allocate with the source's strides so the planes can be copied
            // verbatim. NOTE(review): copy_from_slice panics if scap's byte
            // lengths ever disagree with its reported strides — verify.
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}
688
/// Platform hook: a stream that yields `()` whenever the default audio device
/// changes (`input` selects the capture vs. playback device).
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}
692
693#[cfg(target_os = "macos")]
694mod macos {
695
696    use coreaudio::sys::{
697        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
698        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
699        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
700        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
701    };
702    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
703
704    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        // Receives a unit message each time a registered property listener fires.
        rx: UnboundedReceiver<()>,
        // Boxed so the C callback's context pointer stays stable; must outlive
        // the registered listeners.
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    // Compile-time proof the listener can cross threads despite the raw
    // pointers handed to CoreAudio.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
714
    // Owns the Rust closure invoked from the C property-listener callback.
    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    /// C-ABI trampoline CoreAudio invokes on property changes; forwards to the
    /// wrapped Rust closure.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        // SAFETY: `callback` is the `&*callback as *const _ as *mut _` pointer
        // registered in `new`, which points at a live boxed
        // PropertyListenerCallbackWrapper for as long as the listener exists.
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }
727
    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        /// Registers two CoreAudio listeners: one on the system object for
        /// default-device changes, and (best effort) one on the current default
        /// device for stream-format changes. Either firing sends `()` on `rx`.
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            // Boxed so its address is stable for the C callback context.
            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            // Get the current default device ID
            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Also listen for changes to the device configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    // 0 is used as a "no device" sentinel below.
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                // Keep the wrapper alive for as long as listeners are
                // registered; Drop uses `device_id`/`input` to remove them.
                callback,
                input,
                device_id,
            })
        }
    }
828
    impl Drop for CoreAudioDefaultDeviceChangeListener {
        /// Unregisters both CoreAudio property listeners that were installed at
        /// construction time, so the OS never invokes the callback shim with a
        /// dangling `callback` pointer after this listener is dropped.
        fn drop(&mut self) {
            // SAFETY: each removal mirrors a registration made in `new` with an
            // identical property address and the same `callback` pointer, and
            // `self.callback` is still alive here (its field is dropped only
            // after `drop` returns). The returned OSStatus is ignored — a
            // failure to deregister is unactionable during drop.
            unsafe {
                // Remove the system-level default-device listener. The selector
                // must match the one used at registration (input vs. output).
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific stream-format listener, but only if
                // one was registered (`device_id == 0` means the default-device
                // lookup failed in `new` and no second listener was installed).
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }
868
869    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
870        type Item = ();
871
872        fn poll_next(
873            mut self: std::pin::Pin<&mut Self>,
874            cx: &mut std::task::Context<'_>,
875        ) -> std::task::Poll<Option<Self::Item>> {
876            self.rx.poll_next_unpin(cx)
877        }
878    }
879}
880
// On macOS, device changes are observed natively via CoreAudio property
// listeners (see the `macos` module above).
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
883
884#[cfg(not(target_os = "macos"))]
885mod noop_change_listener {
886    use std::task::Poll;
887
888    use super::DeviceChangeListenerApi;
889
890    pub struct NoopOutputDeviceChangelistener {}
891
892    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
893        fn new(_input: bool) -> anyhow::Result<Self> {
894            Ok(NoopOutputDeviceChangelistener {})
895        }
896    }
897
898    impl futures::Stream for NoopOutputDeviceChangelistener {
899        type Item = ();
900
901        fn poll_next(
902            self: std::pin::Pin<&mut Self>,
903            _cx: &mut std::task::Context<'_>,
904        ) -> Poll<Option<Self::Item>> {
905            Poll::Pending
906        }
907    }
908}
909
// All other platforms fall back to the no-op listener, whose stream never
// yields — so no device-change notifications are delivered there.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;