use anyhow::{Context as _, Result};

use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use cpal::{Data, FromSample, I24, SampleFormat, SizedSample};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use parking_lot::Mutex;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{self, AtomicI32};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

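/// Shared audio plumbing for a call: one WebRTC audio processing module (APM)
/// and one mixer, plus a lazily started, reference-counted output task that
/// pulls mixed audio to the default output device. `next_ssrc` hands out
/// unique ids for mixer sources.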
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

// NOTE: We use WebRTC's mixer, which only supports
// 16kHz, 32kHz, and 48kHz. As 48kHz is the most common "next step up"
// for audio output devices like speakers/bluetooth, we just hard-code
// this, and downsample when we need to.
const SAMPLE_RATE: u32 = 48000;
const NUM_CHANNELS: u32 = 2;
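// WebRTC audio moves in 10 ms frames. At 48 kHz stereo that is
// 48_000 / 100 = 480 samples per channel, i.e. 960 interleaved i16 samples
// per frame; `AudioMixerSource::receive` below asserts exactly this length.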

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        // All four APM stages enabled (echo cancellation, noise suppression,
        // high-pass filter, gain control; the exact flag order is defined by
        // the libwebrtc bindings).
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

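    /// Registers the remote track as a mixer source and makes sure the shared
    /// output task is running. The returned `AudioStream` is a drop guard:
    /// dropping it removes the source from the mixer and releases this handle
    /// on the output task.
    ///
    /// Illustrative usage, a sketch rather than a test from this crate:
    ///
    /// ```ignore
    /// let stack = AudioStack::new(executor.clone());
    /// let _stream = stack.play_remote_audio_track(&remote_track);
    /// // Keep `_stream` alive for as long as the track should be audible.
    /// ```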
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, atomic::Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE,
            num_channels: NUM_CHANNELS,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

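    /// Creates a local microphone track. Capture runs on a dedicated thread
    /// (see `capture_input`), which resamples to 48 kHz stereo, runs the APM,
    /// and forwards 10 ms frames over a channel to a task that pushes them
    /// into the WebRTC source. Dropping the returned `AudioStream` stops both
    /// tasks, and thereby the capture thread.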
    pub(crate) fn capture_local_microphone_track(
        &self,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // N.B. this struct's options are always ignored; noise cancellation is provided by the APM.
            AudioSourceOptions::default(),
            SAMPLE_RATE,
            NUM_CHANNELS,
            10, // queue size in ms
        );

        let track = track::LocalAudioTrack::create_audio_track(
            "microphone",
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let capture_task = self.executor.spawn(async move {
            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
        });

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        // The `Output` variant is used here purely for its drop guard.
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

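    /// Output loop: each iteration opens the current default output device and
    /// streams mixed audio to it from a dedicated thread, then waits for a
    /// device-change notification. When one arrives, `end_on_drop_tx` is
    /// dropped, the thread exits, and the loop rebuilds the stream against the
    /// new default device.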
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while !data.is_empty() {
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                if !buf.is_empty() {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Park this thread to keep `output_stream` alive; dropping
                // `end_on_drop_tx` (on device change) unblocks `recv` and lets
                // the thread exit.
                end_on_drop_rx.recv().ok();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }

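    /// Input loop, mirroring `play_output`: each iteration opens the current
    /// default input device on a dedicated thread, accumulates samples into
    /// 10 ms chunks, resamples them to the target rate/channel count, runs the
    /// APM, and sends the resulting frames down `frame_tx`. A device-change
    /// notification tears the thread down and restarts the capture.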
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Ok(name) = device.name() {
                        log::info!("Using microphone: {}", name)
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            config.sample_format(),
                            move |data, _: &_| {
                                let data =
                                    Self::get_sample_data(config.sample_format(), data).log_err();
                                let Some(data) = data else {
                                    return;
                                };
                                let mut data = data.as_slice();

                                while !data.is_empty() {
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive and holding onto the `stream`
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }

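    /// Normalizes whatever sample format the input device produces to the
    /// interleaved `i16` samples that the resampler and APM expect.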
    fn get_sample_data(sample_format: SampleFormat, data: &Data) -> Result<Vec<i16>> {
        match sample_format {
            SampleFormat::I8 => Ok(Self::convert_sample_data::<i8, i16>(data)),
            SampleFormat::I16 => Ok(data.as_slice::<i16>().unwrap().to_vec()),
            SampleFormat::I24 => Ok(Self::convert_sample_data::<I24, i16>(data)),
            SampleFormat::I32 => Ok(Self::convert_sample_data::<i32, i16>(data)),
            SampleFormat::I64 => Ok(Self::convert_sample_data::<i64, i16>(data)),
            SampleFormat::U8 => Ok(Self::convert_sample_data::<u8, i16>(data)),
            SampleFormat::U16 => Ok(Self::convert_sample_data::<u16, i16>(data)),
            SampleFormat::U32 => Ok(Self::convert_sample_data::<u32, i16>(data)),
            SampleFormat::U64 => Ok(Self::convert_sample_data::<u64, i16>(data)),
            SampleFormat::F32 => Ok(Self::convert_sample_data::<f32, i16>(data)),
            SampleFormat::F64 => Ok(Self::convert_sample_data::<f64, i16>(data)),
            _ => anyhow::bail!("Unsupported sample format"),
        }
    }

    fn convert_sample_data<TSource: SizedSample, TDest: SizedSample + FromSample<TSource>>(
        data: &Data,
    ) -> Vec<TDest> {
        data.as_slice::<TSource>()
            .unwrap()
            .iter()
            .map(|e| e.to_sample::<TDest>())
            .collect()
    }
}

use super::LocalVideoTrack;

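/// Handle that keeps an audio pipeline alive: either a task (for inputs) or
/// an opaque drop guard (for outputs). Dropping it tears the pipeline down.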
pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

fn default_device(input: bool) -> Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .context("no audio input device available")?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .context("no audio output device available")?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

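/// One remote track's contribution to the mixer. Incoming 10 ms frames are
/// queued in `buffer`; `receive` caps the queue at 10 frames (100 ms), so a
/// stalled consumer drops the oldest audio rather than growing without bound.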
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

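/// Converts a remote WebRTC video track into a stream of platform frames. On
/// macOS, frames are copied into `CVPixelBuffer`s drawn from a pool that is
/// rebuilt whenever the incoming frame size changes; elsewhere, frames are
/// converted to `gpui::RenderImage`s.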
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            // Rebuild the buffer pool only when the incoming frame size changes.
            let frame_size = (frame.buffer.width(), frame.buffer.height());
            if pool.is_none() || most_recent_frame_size != frame_size {
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
                most_recent_frame_size = frame_size;
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black image.
                    // In a lil bit, the unpublish will come through and close the view,
                    // but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

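/// Builds a `CVPixelBufferPool` that vends IOSurface-backed NV12
/// (420YpCbCr8BiPlanarFullRange) buffers of the given size, which is the
/// layout `video_frame_buffer_from_webrtc` converts I420 frames into.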
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .context("Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

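/// Hands a captured `CVPixelBuffer` to WebRTC. The `mem::forget` skips the
/// wrapper's release on drop, transferring the existing retain to the
/// `NativeBuffer` created from the raw pointer.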
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx, RGBx, or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

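/// A stream that yields `()` whenever the default audio device (input when
/// `input` is true, output otherwise) changes; used by the capture/playback
/// loops above to rebuild their streams.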
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {
    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            let device_id = unsafe {
                // Listen for changes to the default input/output device
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Get the current default device ID so we can also listen for
                // changes to its configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }


    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;