use anyhow::{Context as _, Result};

use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
use cpal::traits::{DeviceTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
    Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use log::info;
use parking_lot::Mutex;
use rodio::Source;
use serde::{Deserialize, Serialize};
use settings::Settings;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

mod source;

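/// Shared audio state for all LiveKit tracks: one WebRTC audio processing
/// module (echo cancellation, noise suppression) and one mixer that combines
/// every remote track into a single output stream. The output task that
/// drains the mixer is started lazily and shared through a `Weak`, so it only
/// runs while at least one remote audio stream is alive.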
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

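/// Plays a remote audio track through the rodio-based audio pipeline.
///
/// Speaker metadata is decoded from the track name (see [`Speaker`]); the
/// returned [`AudioStream`] stops playback when dropped, via a stop flag that
/// the stream polls every 50 ms.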
pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = source::LiveKitStream::new(cx.background_executor(), track);

    let stream = stream
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });

    let speaker: Speaker = serde_urlencoded::from_str(&track.name()).unwrap_or_else(|_| Speaker {
        name: track.name(),
        is_staff: false,
    });
    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
        .context("Could not play audio")?;

    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

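    /// Plays a remote audio track through the mixer-based pipeline: frames
    /// from the track are pumped into an [`AudioMixerSource`], and the shared
    /// output task drains the mixer to the default output device. Dropping
    /// the returned [`AudioStream`] removes the source from the mixer.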
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

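    /// Returns the shared output task, starting it if necessary. The task is
    /// cached through a `Weak`, so once the last [`AudioStream`] holding the
    /// `Arc` is dropped the task stops, and the next call starts a new one.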
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

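    /// Captures the default microphone and publishes it as a LiveKit audio
    /// track. The speaker's name and staff status are urlencoded into the
    /// track name. Depending on the `rodio_audio` setting, capture goes
    /// through either the experimental rodio pipeline or the cpal + APM loop.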
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // N.B.: these options are always ignored; noise cancellation is provided by the APM.
            AudioSourceOptions::default(),
            SAMPLE_RATE.get(),
            CHANNEL_COUNT.get().into(),
            10,
        );

        let track_name = serde_urlencoded::to_string(Speaker {
            name: user_name,
            is_staff,
        })
        .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run in real time and must never be paused, which is
            // why this uses a plain std::thread rather than a background task.
            thread::spawn(move || {
                // The microphone handle is not `Send` on macOS, so open it on this thread.
                let microphone = audio::Audio::open_microphone(voip_parts)?;
                send_to_livekit(frame_tx, microphone);
                Ok::<(), anyhow::Error>(())
            });
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
            })
        };

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

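    /// Drives the default output device from the mixer. Each loop iteration
    /// builds a fresh cpal output stream on a dedicated thread, then tears it
    /// down (by dropping `end_on_drop_tx`) whenever the default output device
    /// changes.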
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::spawn(move || {
                let output_stream = output_device.build_output_stream(
                    &output_config.config(),
                    {
                        move |mut data, _info| {
                            while !data.is_empty() {
                                // If samples left over from the previous mix cover the
                                // whole request, serve it from `buf` and return.
                                if data.len() <= buf.len() {
                                    let rest = buf.split_off(data.len());
                                    data.copy_from_slice(&buf);
                                    buf = rest;
                                    return;
                                }
                                // Otherwise flush the leftovers first...
                                if !buf.is_empty() {
                                    let (prefix, suffix) = data.split_at_mut(buf.len());
                                    prefix.copy_from_slice(&buf);
                                    data = suffix;
                                }

                                // ...then mix the next 10 ms, resample it to the device's
                                // rate, and feed it to the APM as the reverse (playback)
                                // stream for echo cancellation.
                                let mut mixer = mixer.lock();
                                let mixed = mixer.mix(output_config.channels() as usize);
                                let sampled = resampler.remix_and_resample(
                                    mixed,
                                    sample_rate / 100,
                                    num_channels,
                                    sample_rate,
                                    output_config.channels() as u32,
                                    output_config.sample_rate().0,
                                );
                                buf = sampled.to_vec();
                                apm.lock()
                                    .process_reverse_stream(
                                        &mut buf,
                                        output_config.sample_rate().0 as i32,
                                        output_config.channels() as i32,
                                    )
                                    .ok();
                            }
                        }
                    },
                    |error| log::error!("error playing audio track: {:?}", error),
                    Some(Duration::from_millis(100)),
                );

                let Some(output_stream) = output_stream.log_err() else {
                    return;
                };

                output_stream.play().log_err();
                // Block until `end_on_drop_tx` is dropped (on device change),
                // keeping the output stream alive until then.
                end_on_drop_rx.recv().ok();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

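    /// Capture loop for the cpal pipeline: accumulates microphone input in
    /// 10 ms chunks, remixes and resamples them to the LiveKit rate, runs
    /// them through the APM, and forwards the frames over `frame_tx`. Like
    /// [`Self::play_output`], the stream is rebuilt whenever the default
    /// input device changes.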
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::spawn(move || {
                maybe!({
                    if let Ok(name) = device.name() {
                        log::info!("Using microphone: {}", name)
                    } else {
                        log::info!("Using microphone: <unknown>");
                    }

                    let ten_ms_buffer_size =
                        (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                    let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                    let stream = device
                        .build_input_stream_raw(
                            &config.config(),
                            config.sample_format(),
                            move |data, _: &_| {
                                let data =
                                    crate::get_sample_data(config.sample_format(), data).log_err();
                                let Some(data) = data else {
                                    return;
                                };
                                let mut data = data.as_slice();

                                while !data.is_empty() {
                                    // Fill `buf` up to exactly 10 ms of device-rate audio.
                                    let remainder = (buf.capacity() - buf.len()).min(data.len());
                                    buf.extend_from_slice(&data[..remainder]);
                                    data = &data[remainder..];

                                    if buf.capacity() == buf.len() {
                                        let mut sampled = resampler
                                            .remix_and_resample(
                                                buf.as_slice(),
                                                config.sample_rate().0 / 100,
                                                config.channels() as u32,
                                                config.sample_rate().0,
                                                num_channels,
                                                sample_rate,
                                            )
                                            .to_owned();
                                        apm.lock()
                                            .process_stream(
                                                &mut sampled,
                                                sample_rate as i32,
                                                num_channels as i32,
                                            )
                                            .log_err();
                                        buf.clear();
                                        frame_tx
                                            .unbounded_send(AudioFrame {
                                                data: Cow::Owned(sampled),
                                                sample_rate,
                                                num_channels,
                                                samples_per_channel: sample_rate / 100,
                                            })
                                            .ok();
                                    }
                                }
                            },
                            |err| log::error!("error capturing audio track: {:?}", err),
                            Some(Duration::from_millis(100)),
                        )
                        .context("failed to build input stream")?;

                    stream.play()?;
                    // Keep the thread alive, holding onto the `stream`, until
                    // `end_on_drop_tx` is dropped on device change.
                    end_on_drop_rx.recv().ok();
                    anyhow::Ok(Some(()))
                })
                .log_err();
            });

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}

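/// Speaker metadata, urlencoded into the LiveKit track name so remote peers
/// can recover it in [`play_remote_audio_track`].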
#[derive(Serialize, Deserialize)]
struct Speaker {
    name: String,
    is_staff: bool,
}

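/// Forwards microphone samples from a rodio [`Source`] to LiveKit, one
/// `audio::BUFFER_SIZE`-sample frame at a time, until the receiving end of
/// `frame_tx` goes away.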
fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
    use cpal::Sample;
    loop {
        let sampled: Vec<_> = microphone
            .by_ref()
            .take(audio::BUFFER_SIZE)
            .map(|s| s.to_sample())
            .collect();

        if frame_tx
            .unbounded_send(AudioFrame {
                sample_rate: SAMPLE_RATE.get(),
                num_channels: CHANNEL_COUNT.get() as u32,
                samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
                data: Cow::Owned(sampled),
            })
            .is_err()
        {
            // The receiver has been dropped or is no longer consuming; stop capturing.
            break;
        }
    }
}

use super::LocalVideoTrack;

pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

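/// Captures a screen-share source and publishes it as a LiveKit video track.
/// Each captured frame is converted to a WebRTC buffer before being handed to
/// the native video source.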
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

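/// One remote track's contribution to the shared [`audio_mixer::AudioMixer`]:
/// a bounded queue of 10 ms frames, keyed by ssrc.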
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        // Cap the queue at 10 frames (~100 ms), dropping the oldest audio if
        // the mixer falls behind.
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

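/// Converts a remote video track into a stream of frames ready for rendering:
/// `CVPixelBuffer`s drawn from a buffer pool on macOS, and
/// `gpui::RenderImage`s elsewhere.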
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            // Recreate the buffer pool only when the incoming frame size changes.
            let frame_size = (frame.buffer.width(), frame.buffer.height());
            if pool.is_none() || most_recent_frame_size != frame_size {
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
                most_recent_frame_size = frame_size;
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black image.
                    // In a little bit, the unpublish will come through and close the view,
                    // but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})")
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

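/// A stream that yields `()` whenever the default input (`input == true`) or
/// output device changes; used to rebuild the audio streams above.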
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Look up the current default device so we can also listen for
                // changes to its configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}
942
943#[cfg(target_os = "macos")]
944type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
945
946#[cfg(not(target_os = "macos"))]
947mod noop_change_listener {
948 use std::task::Poll;
949
950 use super::DeviceChangeListenerApi;
951
952 pub struct NoopOutputDeviceChangelistener {}
953
954 impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
955 fn new(_input: bool) -> anyhow::Result<Self> {
956 Ok(NoopOutputDeviceChangelistener {})
957 }
958 }
959
960 impl futures::Stream for NoopOutputDeviceChangelistener {
961 type Item = ();
962
963 fn poll_next(
964 self: std::pin::Pin<&mut Self>,
965 _cx: &mut std::task::Context<'_>,
966 ) -> Poll<Option<Self::Item>> {
967 Poll::Pending
968 }
969 }
970}
971
972#[cfg(not(target_os = "macos"))]
973type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;