1use anyhow::{Context as _, Result, anyhow};
2
3use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
4use futures::channel::mpsc::UnboundedSender;
5use futures::{Stream, StreamExt as _};
6use gpui::{
7 BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
8};
9use libwebrtc::native::{apm, audio_mixer, audio_resampler};
10use livekit::track;
11
12use livekit::webrtc::{
13 audio_frame::AudioFrame,
14 audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
15 audio_stream::native::NativeAudioStream,
16 video_frame::{VideoBuffer, VideoFrame, VideoRotation},
17 video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
18 video_stream::native::NativeVideoStream,
19};
20use parking_lot::Mutex;
21use std::cell::RefCell;
22use std::sync::Weak;
23use std::sync::atomic::{self, AtomicI32};
24use std::time::Duration;
25use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
26use util::{ResultExt as _, maybe};
27
/// Shared state for remote-audio playback and local microphone capture.
pub(crate) struct AudioStack {
    /// Executor on which the receive, transmit, capture and output tasks are spawned.
    executor: BackgroundExecutor,
    /// Audio processing module shared by the capture path (`process_stream`) and the
    /// playback path (`process_reverse_stream`).
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Mixer that every playing remote track registers a source with.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    /// Weak handle to the shared output task; `start_output` upgrades it to reuse a
    /// running task and replaces it when a new task has to be spawned.
    _output_task: RefCell<Weak<Task<()>>>,
    /// Next `ssrc` to hand to a mixer source; starts at 1 and is incremented per track.
    next_ssrc: AtomicI32,
}
35
// NOTE: We use WebRTC's mixer which only supports
// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
// for audio output devices like speakers/bluetooth, we just hard-code
// this; and downsample when we need to.
/// Sample rate (Hz) used for mixer sources, the remote audio stream and the
/// microphone audio source.
const SAMPLE_RATE: u32 = 48000;
/// Channel count used for mixer sources, the remote audio stream and the
/// microphone audio source.
const NUM_CHANNELS: u32 = 2;
42
43impl AudioStack {
44 pub(crate) fn new(executor: BackgroundExecutor) -> Self {
45 let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
46 true, true, true, true,
47 )));
48 let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
49 Self {
50 executor,
51 apm,
52 mixer,
53 _output_task: RefCell::new(Weak::new()),
54 next_ssrc: AtomicI32::new(1),
55 }
56 }
57
58 pub(crate) fn play_remote_audio_track(
59 &self,
60 track: &livekit::track::RemoteAudioTrack,
61 ) -> AudioStream {
62 let output_task = self.start_output();
63
64 let next_ssrc = self.next_ssrc.fetch_add(1, atomic::Ordering::Relaxed);
65 let source = AudioMixerSource {
66 ssrc: next_ssrc,
67 sample_rate: SAMPLE_RATE,
68 num_channels: NUM_CHANNELS,
69 buffer: Arc::default(),
70 };
71 self.mixer.lock().add_source(source.clone());
72
73 let mut stream = NativeAudioStream::new(
74 track.rtc_track(),
75 source.sample_rate as i32,
76 source.num_channels as i32,
77 );
78
79 let receive_task = self.executor.spawn({
80 let source = source.clone();
81 async move {
82 while let Some(frame) = stream.next().await {
83 source.receive(frame);
84 }
85 }
86 });
87
88 let mixer = self.mixer.clone();
89 let on_drop = util::defer(move || {
90 mixer.lock().remove_source(source.ssrc);
91 drop(receive_task);
92 drop(output_task);
93 });
94
95 AudioStream::Output {
96 _drop: Box::new(on_drop),
97 }
98 }
99
100 pub(crate) fn capture_local_microphone_track(
101 &self,
102 ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
103 let source = NativeAudioSource::new(
104 // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
105 AudioSourceOptions::default(),
106 SAMPLE_RATE,
107 NUM_CHANNELS,
108 10,
109 );
110
111 let track = track::LocalAudioTrack::create_audio_track(
112 "microphone",
113 RtcAudioSource::Native(source.clone()),
114 );
115
116 let apm = self.apm.clone();
117
118 let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
119 let transmit_task = self.executor.spawn({
120 let source = source.clone();
121 async move {
122 while let Some(frame) = frame_rx.next().await {
123 source.capture_frame(&frame).await.log_err();
124 }
125 }
126 });
127 let capture_task = self.executor.spawn(async move {
128 Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
129 });
130
131 let on_drop = util::defer(|| {
132 drop(transmit_task);
133 drop(capture_task);
134 });
135 return Ok((
136 super::LocalAudioTrack(track),
137 AudioStream::Output {
138 _drop: Box::new(on_drop),
139 },
140 ));
141 }
142
143 fn start_output(&self) -> Arc<Task<()>> {
144 if let Some(task) = self._output_task.borrow().upgrade() {
145 return task;
146 }
147 let task = Arc::new(self.executor.spawn({
148 let apm = self.apm.clone();
149 let mixer = self.mixer.clone();
150 async move {
151 Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
152 .await
153 .log_err();
154 }
155 }));
156 *self._output_task.borrow_mut() = Arc::downgrade(&task);
157 task
158 }
159
160 async fn play_output(
161 apm: Arc<Mutex<apm::AudioProcessingModule>>,
162 mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
163 sample_rate: u32,
164 num_channels: u32,
165 ) -> Result<()> {
166 let mut default_change_listener = DeviceChangeListener::new(false)?;
167
168 loop {
169 let (output_device, output_config) = default_device(false)?;
170 let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
171 let mixer = mixer.clone();
172 let apm = apm.clone();
173 let mut resampler = audio_resampler::AudioResampler::default();
174 let mut buf = Vec::new();
175
176 thread::spawn(move || {
177 let output_stream = output_device.build_output_stream(
178 &output_config.config(),
179 {
180 move |mut data, _info| {
181 while data.len() > 0 {
182 if data.len() <= buf.len() {
183 let rest = buf.split_off(data.len());
184 data.copy_from_slice(&buf);
185 buf = rest;
186 return;
187 }
188 if buf.len() > 0 {
189 let (prefix, suffix) = data.split_at_mut(buf.len());
190 prefix.copy_from_slice(&buf);
191 data = suffix;
192 }
193
194 let mut mixer = mixer.lock();
195 let mixed = mixer.mix(output_config.channels() as usize);
196 let sampled = resampler.remix_and_resample(
197 mixed,
198 sample_rate / 100,
199 num_channels,
200 sample_rate,
201 output_config.channels() as u32,
202 output_config.sample_rate().0,
203 );
204 buf = sampled.to_vec();
205 apm.lock()
206 .process_reverse_stream(
207 &mut buf,
208 output_config.sample_rate().0 as i32,
209 output_config.channels() as i32,
210 )
211 .ok();
212 }
213 }
214 },
215 |error| log::error!("error playing audio track: {:?}", error),
216 Some(Duration::from_millis(100)),
217 );
218
219 let Some(output_stream) = output_stream.log_err() else {
220 return;
221 };
222
223 output_stream.play().log_err();
224 // Block forever to keep the output stream alive
225 end_on_drop_rx.recv().ok();
226 });
227
228 default_change_listener.next().await;
229 drop(end_on_drop_tx)
230 }
231 }
232
233 async fn capture_input(
234 apm: Arc<Mutex<apm::AudioProcessingModule>>,
235 frame_tx: UnboundedSender<AudioFrame<'static>>,
236 sample_rate: u32,
237 num_channels: u32,
238 ) -> Result<()> {
239 let mut default_change_listener = DeviceChangeListener::new(true)?;
240 loop {
241 let (device, config) = default_device(true)?;
242 let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
243 let apm = apm.clone();
244 let frame_tx = frame_tx.clone();
245 let mut resampler = audio_resampler::AudioResampler::default();
246
247 thread::spawn(move || {
248 maybe!({
249 if let Some(name) = device.name().ok() {
250 log::info!("Using microphone: {}", name)
251 } else {
252 log::info!("Using microphone: <unknown>");
253 }
254
255 let ten_ms_buffer_size =
256 (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
257 let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
258
259 let stream = device
260 .build_input_stream_raw(
261 &config.config(),
262 cpal::SampleFormat::I16,
263 move |data, _: &_| {
264 let mut data = data.as_slice::<i16>().unwrap();
265 while data.len() > 0 {
266 let remainder = (buf.capacity() - buf.len()).min(data.len());
267 buf.extend_from_slice(&data[..remainder]);
268 data = &data[remainder..];
269
270 if buf.capacity() == buf.len() {
271 let mut sampled = resampler
272 .remix_and_resample(
273 buf.as_slice(),
274 config.sample_rate().0 / 100,
275 config.channels() as u32,
276 config.sample_rate().0,
277 num_channels,
278 sample_rate,
279 )
280 .to_owned();
281 apm.lock()
282 .process_stream(
283 &mut sampled,
284 sample_rate as i32,
285 num_channels as i32,
286 )
287 .log_err();
288 buf.clear();
289 frame_tx
290 .unbounded_send(AudioFrame {
291 data: Cow::Owned(sampled),
292 sample_rate,
293 num_channels,
294 samples_per_channel: sample_rate / 100,
295 })
296 .ok();
297 }
298 }
299 },
300 |err| log::error!("error capturing audio track: {:?}", err),
301 Some(Duration::from_millis(100)),
302 )
303 .context("failed to build input stream")?;
304
305 stream.play()?;
306 // Keep the thread alive and holding onto the `stream`
307 end_on_drop_rx.recv().ok();
308 anyhow::Ok(Some(()))
309 })
310 .log_err();
311 });
312
313 default_change_listener.next().await;
314 drop(end_on_drop_tx)
315 }
316 }
317}
318
319use super::LocalVideoTrack;
320
/// Handle that keeps an audio stream alive; dropping it releases what it holds.
pub enum AudioStream {
    /// Holds the task backing the stream.
    Input { _task: Task<()> },
    /// Holds an opaque guard whose drop performs the stream's cleanup.
    Output { _drop: Box<dyn std::any::Any> },
}
325
326pub(crate) async fn capture_local_video_track(
327 capture_source: &dyn ScreenCaptureSource,
328 cx: &mut gpui::AsyncApp,
329) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
330 let resolution = capture_source.resolution()?;
331 let track_source = gpui_tokio::Tokio::spawn(cx, async move {
332 NativeVideoSource::new(VideoResolution {
333 width: resolution.width.0 as u32,
334 height: resolution.height.0 as u32,
335 })
336 })?
337 .await?;
338
339 let capture_stream = capture_source
340 .stream({
341 let track_source = track_source.clone();
342 Box::new(move |frame| {
343 if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
344 track_source.capture_frame(&VideoFrame {
345 rotation: VideoRotation::VideoRotation0,
346 timestamp_us: 0,
347 buffer,
348 });
349 }
350 })
351 })
352 .await??;
353
354 Ok((
355 LocalVideoTrack(track::LocalVideoTrack::create_video_track(
356 "screen share",
357 RtcVideoSource::Native(track_source),
358 )),
359 capture_stream,
360 ))
361}
362
363fn default_device(input: bool) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
364 let device;
365 let config;
366 if input {
367 device = cpal::default_host()
368 .default_input_device()
369 .ok_or_else(|| anyhow!("no audio input device available"))?;
370 config = device
371 .default_input_config()
372 .context("failed to get default input config")?;
373 } else {
374 device = cpal::default_host()
375 .default_output_device()
376 .ok_or_else(|| anyhow!("no audio output device available"))?;
377 config = device
378 .default_output_config()
379 .context("failed to get default output config")?;
380 }
381 Ok((device, config))
382}
383
/// A mixer input backed by a shared queue of received audio frames.
///
/// Clones share the same `buffer`, so one clone can be registered with the mixer
/// while another is fed by the receive task.
#[derive(Clone)]
struct AudioMixerSource {
    /// Identifier reported to the mixer and used to remove the source again.
    ssrc: i32,
    /// Sample rate (Hz) every queued frame is expected to have.
    sample_rate: u32,
    /// Channel count every queued frame is expected to have.
    num_channels: u32,
    /// Queue of 10 ms sample buffers: pushed by `receive`, popped by the mixer.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
391
392impl AudioMixerSource {
393 fn receive(&self, frame: AudioFrame) {
394 assert_eq!(
395 frame.data.len() as u32,
396 self.sample_rate * self.num_channels / 100
397 );
398
399 let mut buffer = self.buffer.lock();
400 buffer.push_back(frame.data.to_vec());
401 while buffer.len() > 10 {
402 buffer.pop_front();
403 }
404 }
405}
406
407impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
408 fn ssrc(&self) -> i32 {
409 self.ssrc
410 }
411
412 fn preferred_sample_rate(&self) -> u32 {
413 self.sample_rate
414 }
415
416 fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame> {
417 assert_eq!(self.sample_rate, target_sample_rate);
418 let buf = self.buffer.lock().pop_front()?;
419 Some(AudioFrame {
420 data: Cow::Owned(buf),
421 sample_rate: self.sample_rate,
422 num_channels: self.num_channels,
423 samples_per_channel: self.sample_rate / 100,
424 })
425 }
426}
427
428pub fn play_remote_video_track(
429 track: &crate::RemoteVideoTrack,
430) -> impl Stream<Item = RemoteVideoFrame> + use<> {
431 #[cfg(target_os = "macos")]
432 {
433 let mut pool = None;
434 let most_recent_frame_size = (0, 0);
435 NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
436 if pool == None
437 || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
438 {
439 pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
440 }
441 let pool = pool.clone();
442 async move {
443 if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
444 // when the remote stops sharing, we get an 8x8 black image.
445 // In a lil bit, the unpublish will come through and close the view,
446 // but until then, don't flash black.
447 return None;
448 }
449
450 video_frame_buffer_from_webrtc(pool?, frame.buffer)
451 }
452 })
453 }
454 #[cfg(not(target_os = "macos"))]
455 {
456 NativeVideoStream::new(track.0.rtc_track())
457 .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
458 }
459}
460
/// Creates a `CVPixelBufferPool` whose buffers are `width` x `height`, use the
/// bi-planar full-range 4:2:0 YpCbCr pixel format, and have the IOSurface
/// Core Animation compatibility attribute set.
///
/// # Errors
///
/// Returns an error carrying the `CVReturn` code if the pool cannot be created.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // Wrap the CoreVideo attribute-key constants as `CFString`s so they can be
    // used as dictionary keys.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    // Attribute values; `yes` is the number 1, used as the value for the
    // compatibility key.
    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    // No pool-level attributes are passed; only the per-buffer attributes above.
    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow!(
            "failed to create pixel buffer pool: CVReturn({})",
            cv_return
        )
    })
}
503
/// Frame type yielded by `play_remote_video_track` on macOS: a CoreVideo pixel buffer.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
506
/// Converts a WebRTC video buffer into a `CVPixelBuffer` (macOS).
///
/// Native buffers are returned by wrapping their existing pixel buffer. Any
/// other buffer is read as I420 and converted to NV12 into a buffer taken from
/// `pool`.
///
/// Returns `None` if the native pixel buffer pointer is null, the buffer cannot
/// be viewed as I420, the pool cannot produce a buffer, or locking/unlocking the
/// destination buffer fails.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the frame already wraps a CoreVideo pixel buffer.
    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // The base address must be locked while the planes are written.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        // Plane 0 receives luma (Y), plane 1 the interleaved chroma (UV).
        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        // The conversion is sized by the destination pixel buffer's dimensions.
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
569
/// Frame type yielded by `play_remote_video_track` on non-macOS platforms: a
/// shared `gpui::RenderImage`.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
572
573#[cfg(not(target_os = "macos"))]
574fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
575 use gpui::RenderImage;
576 use image::{Frame, RgbaImage};
577 use livekit::webrtc::prelude::VideoFormatType;
578 use smallvec::SmallVec;
579 use std::alloc::{Layout, alloc};
580
581 let width = buffer.width();
582 let height = buffer.height();
583 let stride = width * 4;
584 let byte_len = (stride * height) as usize;
585 let argb_image = unsafe {
586 // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
587 // will write all bytes anyway.
588 let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
589 if start_ptr.is_null() {
590 return None;
591 }
592 let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
593 buffer.to_argb(
594 VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
595 bgra_frame_slice,
596 stride,
597 width as i32,
598 height as i32,
599 );
600 Vec::from_raw_parts(start_ptr, byte_len, byte_len)
601 };
602
603 Some(Arc::new(RenderImage::new(SmallVec::from_elem(
604 Frame::new(
605 RgbaImage::from_raw(width, height, argb_image)
606 .with_context(|| "Bug: not enough bytes allocated for image.")
607 .log_err()?,
608 ),
609 1,
610 ))))
611}
612
/// Wraps a captured screen frame's pixel buffer as a WebRTC native buffer (macOS).
///
/// Always returns `Some`.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Skip the wrapper's destructor so the raw reference taken above is not
    // released here; presumably ownership passes to the WebRTC buffer created
    // below — confirm against `from_cv_pixel_buffer`.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
623
/// Non-macOS stand-in: no conversion is performed, so this always returns
/// `None` and the captured frame is dropped.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    // The cast only gives the `impl AsRef<dyn VideoBuffer>` return type a concrete type.
    None as Option<Box<dyn VideoBuffer>>
}
628
/// A stream that yields `()` to signal that the default audio device changed.
///
/// Implemented per platform and used through the `DeviceChangeListener` alias.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    /// Creates a listener for the default input device when `input` is `true`,
    /// or the default output device when it is `false`.
    fn new(input: bool) -> Result<Self>;
}
632
/// CoreAudio-backed default-device change notifications (macOS only).
#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    ///
    /// Yields `()` each time the registered property listener fires for the
    /// default input or output device property.
    pub struct CoreAudioDefaultDeviceChangeListener {
        /// Receives one `()` per listener invocation.
        rx: UnboundedReceiver<()>,
        /// Boxed so its address stays stable; that address is the client-data
        /// pointer registered with CoreAudio and is needed again to unregister.
        callback: Box<PropertyListenerCallbackWrapper>,
        /// Whether the input (`true`) or output (`false`) device is being watched.
        input: bool,
    }

    // Compile-time check that the listener is `Send`.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    /// Thin wrapper so the boxed closure can be reached through a single thin pointer.
    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    /// C-ABI trampoline registered with CoreAudio: treats `callback` as a
    /// pointer to a `PropertyListenerCallbackWrapper`, invokes it, and returns 0.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        /// Registers a property listener on the system audio object for the
        /// default input (`input == true`) or output device.
        ///
        /// Returns an error if CoreAudio reports a failing status.
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            // Each notification just pushes a unit value; send errors are ignored.
            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    // NOTE(review): the shim calls an `FnMut` through a pointer
                    // derived from a shared reference — confirm this is sound.
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        /// Unregisters the listener using the same property address, shim and
        /// callback pointer that `new` registered. The returned status is ignored.
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        /// Delegates to the receiving end of the notification channel.
        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}
732
/// Device-change listener used on macOS: the CoreAudio-backed implementation.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
735
736#[cfg(not(target_os = "macos"))]
737mod noop_change_listener {
738 use std::task::Poll;
739
740 use super::DeviceChangeListenerApi;
741
742 pub struct NoopOutputDeviceChangelistener {}
743
744 impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
745 fn new(_input: bool) -> anyhow::Result<Self> {
746 Ok(NoopOutputDeviceChangelistener {})
747 }
748 }
749
750 impl futures::Stream for NoopOutputDeviceChangelistener {
751 type Item = ();
752
753 fn poll_next(
754 self: std::pin::Pin<&mut Self>,
755 _cx: &mut std::task::Context<'_>,
756 ) -> Poll<Option<Self::Item>> {
757 Poll::Pending
758 }
759 }
760}
761
/// Device-change listener used on non-macOS platforms: the no-op implementation.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;