use anyhow::{Context as _, Result};

use cpal::traits::{DeviceTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use parking_lot::Mutex;
use rodio::Source;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

mod source;

pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

// NOTE: We use WebRTC's mixer, which only supports 16kHz, 32kHz, and 48kHz.
// As 48kHz is the most common "next step up" for audio output devices like
// speakers and Bluetooth headsets, we hard-code it here and resample when we
// need to.
const SAMPLE_RATE: u32 = 48000;
const NUM_CHANNELS: u32 = 2;
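// At these settings, a 10ms WebRTC frame is 48000 / 100 = 480 samples per
// channel, or 960 interleaved i16 samples in stereo; the mixer and the
// resamplers below always operate on frames of that size.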

pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = source::LiveKitStream::new(cx.background_executor(), track)
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });
    audio::Audio::play_source(stream, cx).context("Could not play audio")?;

    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

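    // Each remote track is registered with the shared mixer under a fresh
    // ssrc; the returned guard unregisters it (and releases the output task)
    // when the `AudioStream` is dropped.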
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE,
            num_channels: NUM_CHANNELS,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

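    // All playback shares a single device output stream. We hold only a
    // `Weak` to the task, so the stream is torn down when the last
    // `AudioStream` holding the `Arc` is dropped, and rebuilt on demand.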
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

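    // Microphone capture runs in two stages: a dedicated thread reads 10ms
    // chunks from the input device and sends them over a channel, and a
    // background task forwards each processed frame to the LiveKit source.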
    pub(crate) fn capture_local_microphone_track(
        &self,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // N.B. this struct's options are always ignored; noise cancellation is provided by the apm.
            AudioSourceOptions::default(),
            SAMPLE_RATE,
            NUM_CHANNELS,
            10,
        );

        let track = track::LocalAudioTrack::create_audio_track(
            "microphone",
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let capture_task = self.executor.spawn(async move {
            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
        });

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

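    // The output loop also feeds the APM's "reverse stream": WebRTC uses the
    // audio we play back as the reference signal for echo cancellation on the
    // captured microphone input.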
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while !data.is_empty() {
                                // If the device buffer can be filled entirely
                                // from leftover samples, drain them and return.
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                if !buf.is_empty() {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                // Mix the next 10ms frame and resample it to the
                                // device's channel count and sample rate.
                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Block forever to keep the output stream alive
                end_on_drop_rx.recv().ok();
            });

            // When the default device or its format changes, drop the sender
            // to end the audio thread and rebuild the stream on the new device.
            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }

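    // Input is accumulated into exactly one 10ms chunk at the device's native
    // rate and channel count, then remixed and resampled to the fixed 48kHz
    // stereo format that WebRTC expects before being handed off.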
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Ok(name) = device.name() {
                        log::info!("Using microphone: {}", name);
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    // One 10ms chunk at the device's native rate and channel count.
                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            config.sample_format(),
                            move |data, _: &_| {
                                let data =
                                    crate::get_sample_data(config.sample_format(), data).log_err();
                                let Some(data) = data else {
                                    return;
                                };
                                let mut data = data.as_slice();

                                while !data.is_empty() {
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive and holding onto the `stream`
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }
}

use super::LocalVideoTrack;

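// `Input` keeps the capture task alive; `Output` holds an opaque guard whose
// `Drop` implementation stops playback and releases the underlying stream.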
pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

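// Screen-share pipeline: every frame delivered by the capture source is
// converted to a WebRTC buffer and pushed into the `NativeVideoSource` that
// backs the published track.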
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

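// Frames are buffered at most 10 deep (100ms at 10ms per frame); if the
// consumer falls behind, the oldest frames are dropped to bound latency.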
impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

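// On macOS, decoded frames are copied into `CVPixelBuffer`s drawn from a
// pool; the pool is recreated whenever the incoming frame size changes.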
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            let frame_size = (frame.buffer.width(), frame.buffer.height());
            if pool.is_none() || most_recent_frame_size != frame_size {
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
                most_recent_frame_size = frame_size;
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black image.
                    // In a little bit, the unpublish will come through and close the view,
                    // but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})")
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    // Copy the I420 source planes into the pooled NV12 buffer: plane 0 holds
    // Y, plane 1 holds interleaved UV.
    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .context("Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Hand the pixel buffer over to WebRTC; `forget` skips the CFRelease that
    // dropping `frame.0` would otherwise perform.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected a BGRx, RGBx, or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

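// Emits `()` whenever the default audio device (or its stream format)
// changes; `play_output` and `capture_input` rebuild their streams on each
// notification.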
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    // Compile-time assertion that the listener can be moved across threads.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            // Register the system-level listener and look up the current default device
            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Look up the current default device so we can also watch its configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;