playback.rs

use anyhow::{Context as _, Result, anyhow};

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use parking_lot::Mutex;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{self, AtomicI32};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};
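
/// Shared audio infrastructure for a call: a WebRTC audio processing module
/// (APM) for echo cancellation and noise suppression, plus a mixer that
/// combines all remote audio sources into a single output stream. The output
/// task is held weakly so it shuts down once no remote track is playing.
///
/// A minimal usage sketch (the surrounding wiring is illustrative, not part
/// of this file):
///
/// ```ignore
/// let stack = AudioStack::new(executor);
/// // Publish the local microphone:
/// let (track, _mic_stream) = stack.capture_local_microphone_track()?;
/// // Play a remote participant's track; dropping the stream stops playback:
/// let _playback = stack.play_remote_audio_track(&remote_track);
/// ```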
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

// NOTE: We use WebRTC's mixer, which only supports 16 kHz, 32 kHz, and
// 48 kHz. As 48 kHz is the most common "next step up" for audio output
// devices like speakers and Bluetooth headsets, we hard-code it here and
// resample when we need to.
const SAMPLE_RATE: u32 = 48000;
const NUM_CHANNELS: u32 = 2;
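
// WebRTC processes audio in 10 ms frames, hence the `sample_rate / 100`
// that appears throughout this file. At the rates above:
//
//     48_000 / 100 = 480 samples per channel per 10 ms frame
//     480 * 2 channels = 960 interleaved i16 samples per frame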

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        // All four APM features (echo cancellation, noise suppression,
        // high-pass filter, gain control) are enabled.
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, atomic::Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE,
            num_channels: NUM_CHANNELS,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

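    /// Creates a LiveKit microphone track backed by the default input device.
    /// `capture_input` reads 10 ms buffers from the device, resamples them to
    /// 48 kHz stereo, and runs them through the APM; a second task forwards
    /// each processed frame to the track's `NativeAudioSource`. Dropping the
    /// returned `AudioStream` cancels both tasks.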
    pub(crate) fn capture_local_microphone_track(
        &self,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // N.B. this struct's options are always ignored; noise cancellation
            // is provided by the APM instead.
            AudioSourceOptions::default(),
            SAMPLE_RATE,
            NUM_CHANNELS,
            10,
        );

        let track = track::LocalAudioTrack::create_audio_track(
            "microphone",
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let capture_task = self.executor.spawn(async move {
            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
        });

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        let mut default_change_listener = DeviceChangeListener::new(false)?;

        loop {
            let (output_device, output_config) = default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while !data.is_empty() {
                                // Use any leftover mixed samples from the previous
                                // callback to satisfy this request first.
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                if !buf.is_empty() {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                // Mix the next 10 ms of remote audio and resample it
                                // to the device's channel count and sample rate.
                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Block forever to keep the output stream alive
                end_on_drop_rx.recv().ok();
            });

            // When the default output device changes, drop the sender so the
            // playback thread exits, then rebuild the stream on the new device.
            default_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }

    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        let mut default_change_listener = DeviceChangeListener::new(true)?;
        loop {
            let (device, config) = default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Ok(name) = device.name() {
                        log::info!("Using microphone: {}", name);
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            cpal::SampleFormat::I16,
                            move |data, _: &_| {
                                let mut data = data.as_slice::<i16>().unwrap();
                                while !data.is_empty() {
                                    // Accumulate device samples until we have a full
                                    // 10 ms buffer at the device's native rate.
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        // Resample to the fixed track rate, run the APM,
                                        // and hand the frame off to the transmit task.
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive and holding onto the `stream`
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            default_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }
}

use super::LocalVideoTrack;

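/// A handle that keeps an audio pipeline alive. Dropping it cancels the
/// underlying tasks; for playback, this also releases the shared output
/// stream once the last remote track stops playing.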
pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

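/// Creates a LiveKit video track that mirrors a screen-capture source. Frames
/// from `capture_source` are converted into WebRTC buffers and pushed into a
/// `NativeVideoSource` sized to the source's resolution. The returned capture
/// stream must be kept alive for the duration of the share.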
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: resolution.width.0 as u32,
            height: resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

fn default_device(input: bool) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

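/// One remote participant's audio, buffered as 10 ms frames for the shared
/// mixer. At most ten frames (100 ms) are held; older frames are dropped to
/// bound latency when playback falls behind.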
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

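/// Converts a remote LiveKit video track into a stream of renderable frames.
/// On macOS, frames are copied into `CVPixelBuffer`s drawn from a pool that
/// is rebuilt whenever the incoming frame size changes; on other platforms
/// they are converted into `gpui::RenderImage`s.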
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            if pool.is_none()
                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
            {
                most_recent_frame_size = (frame.buffer.width(), frame.buffer.height());
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black image.
                    // The unpublish event will arrive shortly and close the view,
                    // but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow!(
            "failed to create pixel buffer pool: CVReturn({})",
            cv_return
        )
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

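// The function below converts WebRTC's I420 layout (three planes: Y, U, V)
// into the pool's NV12 layout (two planes: Y, then interleaved UV). For a
// w x h frame, the Y plane holds w * h luma bytes and the UV plane holds
// (w/2) * (h/2) interleaved chroma pairs, so both formats average 1.5 bytes
// per pixel; only the packing differs.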
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .with_context(|| "Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Forget the frame so the CVPixelBuffer isn't released here; the WebRTC
    // native buffer takes over this reference.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

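/// A stream that yields `()` whenever the platform's default audio device
/// changes. `play_output` and `capture_input` await it to tear down their
/// cpal stream and rebuild it against the new default device.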
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            // Box the callback so it has a stable address that we can hand to
            // CoreAudio as the listener's context pointer.
            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // No device-change notifications on this platform: the stream
            // never yields, so the audio loops keep their initial device.
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;