use anyhow::{Context as _, Result};

use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
use cpal::traits::{DeviceTrait, StreamTrait as _};
use futures::channel::mpsc::UnboundedSender;
use futures::{Stream, StreamExt as _};
use gpui::{
    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
    Task,
};
use libwebrtc::native::{apm, audio_mixer, audio_resampler};
use livekit::track;

use livekit::webrtc::{
    audio_frame::AudioFrame,
    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
    video_stream::native::NativeVideoStream,
};
use log::info;
use parking_lot::Mutex;
use rodio::Source;
use serde::{Deserialize, Serialize};
use settings::Settings;
use std::cell::RefCell;
use std::sync::Weak;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::time::Duration;
use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
use util::{ResultExt as _, maybe};

mod source;

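/// Shared state for the legacy (non-rodio) audio path: one WebRTC audio
/// processing module (APM) and one mixer are shared by all remote tracks,
/// and a single output task is started lazily and kept alive only while at
/// least one stream holds a strong reference to it.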
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    _output_task: RefCell<Weak<Task<()>>>,
    next_ssrc: AtomicI32,
}

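/// Plays a remote audio track through the rodio-based output path.
///
/// The returned [`AudioStream`] owns a drop guard; dropping it sets a stop
/// flag that the stream observes within ~50ms. A minimal usage sketch
/// (`remote_track` and `speaker` are assumed to come from the caller):
///
/// ```ignore
/// let stream = play_remote_audio_track(&remote_track, speaker, cx)?;
/// // ... later, stop playback by dropping the handle:
/// drop(stream);
/// ```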
pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    speaker: Speaker,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    let stream = source::LiveKitStream::new(
        cx.background_executor(),
        track,
        speaker.legacy_audio_compatible,
    );

    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = stream
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });

    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
        .context("Could not play audio")?;

    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}

impl AudioStack {
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
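        // The four flags are assumed to enable echo cancellation, gain
        // control, high-pass filtering, and noise suppression; check the
        // `apm` binding's signature for the exact parameter order.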
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

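    /// Plays a remote track through the legacy mixer path: the track is
    /// registered as a mixer source under a fresh SSRC, a task forwards
    /// decoded frames into the source's buffer, and the shared output task
    /// drains the mixer to the default output device.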
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

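    /// Returns the shared output task, starting it if no stream currently
    /// holds one. Only a `Weak` reference is retained here, so the task (and
    /// with it the output device) shuts down once the last `AudioStream` is
    /// dropped.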
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

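    /// Captures the local microphone and publishes it as a LiveKit audio
    /// track. Speaker metadata is urlencoded into the track name so remote
    /// peers can decode it (see [`Speaker`]). Depending on settings, capture
    /// runs either on the experimental rodio pipeline (a dedicated real-time
    /// thread) or on the legacy cpal path.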
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let legacy_audio_compatible =
            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
                .unwrap_or_default();

        let source = if legacy_audio_compatible {
            NativeAudioSource::new(
                // N.B. this struct's options are always ignored; noise cancellation is provided by the APM.
                AudioSourceOptions::default(),
                LEGACY_SAMPLE_RATE.get(),
                LEGACY_CHANNEL_COUNT.get().into(),
                10,
            )
        } else {
            NativeAudioSource::new(
                // N.B. this struct's options are always ignored; noise cancellation is provided by the APM.
                AudioSourceOptions::default(),
                SAMPLE_RATE.get(),
                CHANNEL_COUNT.get().into(),
                10,
            )
        };

        let track_name = serde_urlencoded::to_string(Speaker {
            name: user_name,
            is_staff,
            legacy_audio_compatible,
        })
        .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run in real time and should never be paused, which
            // is why this uses a plain std::thread rather than a background task.
            thread::Builder::new()
                .name("MicrophoneToLivekit".to_string())
                .spawn(move || {
                    // The microphone handle is not `Send` on macOS, so it must be
                    // opened on the thread that uses it.
                    let microphone = match audio::Audio::open_microphone(voip_parts) {
                        Ok(m) => m,
                        Err(e) => {
                            log::error!("Could not open microphone: {e}");
                            return;
                        }
                    };
                    send_to_livekit(frame_tx, microphone);
                })
                .expect("should be able to spawn threads");
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(
                    apm,
                    frame_tx,
                    LEGACY_SAMPLE_RATE.get(),
                    LEGACY_CHANNEL_COUNT.get().into(),
                )
                .await
            })
        };

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

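    /// Drives the default output device in a loop: each iteration spawns a
    /// playback thread that pulls 10ms chunks from the mixer, remixes and
    /// resamples them to the device's format, and feeds the result to the
    /// APM's reverse stream (used for echo cancellation). When the default
    /// device changes, the sender side of `end_on_drop_tx` is dropped, the
    /// thread exits, and the loop restarts against the new device.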
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            let mut buf = Vec::new();

            thread::Builder::new()
                .name("AudioPlayback".to_owned())
                .spawn(move || {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while !data.is_empty() {
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    if !buf.is_empty() {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        num_channels,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate().0,
                                    );
                                    buf = sampled.to_vec();
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate().0 as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block forever to keep the output stream alive
                    end_on_drop_rx.recv().ok();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }

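    /// Legacy cpal capture loop: reads raw input from the default
    /// microphone, accumulates exactly 10ms of samples, remixes/resamples to
    /// the target rate, runs the APM forward stream (echo cancellation,
    /// noise suppression), and forwards the result over `frame_tx`. Restarts
    /// when the default input device changes.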
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::Builder::new()
                .name("AudioCapture".to_owned())
                .spawn(move || {
                    maybe!({
                        if let Ok(name) = device.name() {
                            log::info!("Using microphone: {}", name);
                        } else {
                            log::info!("Using microphone: <unknown>");
                        }

                        let ten_ms_buffer_size =
                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                        let stream = device
                            .build_input_stream_raw(
                                &config.config(),
                                config.sample_format(),
                                move |data, _: &_| {
                                    let data = crate::get_sample_data(config.sample_format(), data)
                                        .log_err();
                                    let Some(data) = data else {
                                        return;
                                    };
                                    let mut data = data.as_slice();

                                    while !data.is_empty() {
                                        let remainder =
                                            (buf.capacity() - buf.len()).min(data.len());
                                        buf.extend_from_slice(&data[..remainder]);
                                        data = &data[remainder..];

                                        if buf.capacity() == buf.len() {
                                            let mut sampled = resampler
                                                .remix_and_resample(
                                                    buf.as_slice(),
                                                    config.sample_rate().0 / 100,
                                                    config.channels() as u32,
                                                    config.sample_rate().0,
                                                    num_channels,
                                                    sample_rate,
                                                )
                                                .to_owned();
                                            apm.lock()
                                                .process_stream(
                                                    &mut sampled,
                                                    sample_rate as i32,
                                                    num_channels as i32,
                                                )
                                                .log_err();
                                            buf.clear();
                                            frame_tx
                                                .unbounded_send(AudioFrame {
                                                    data: Cow::Owned(sampled),
                                                    sample_rate,
                                                    num_channels,
                                                    samples_per_channel: sample_rate / 100,
                                                })
                                                .ok();
                                        }
                                    }
                                },
                                |err| log::error!("error capturing audio track: {:?}", err),
                                Some(Duration::from_millis(100)),
                            )
                            .context("failed to build input stream")?;

                        stream.play()?;
                        // Keep the thread alive and holding onto the `stream`
                        end_on_drop_rx.recv().ok();
                        anyhow::Ok(Some(()))
                    })
                    .log_err();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx);
        }
    }
}

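/// Speaker metadata that travels with a published track, urlencoded into the
/// track name (see `capture_local_microphone_track`). A hypothetical
/// round-trip sketch:
///
/// ```ignore
/// let name = serde_urlencoded::to_string(&speaker)?; // e.g. "name=ada&is_staff=true&..."
/// let decoded: Speaker = serde_urlencoded::from_str(&name)?;
/// ```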
#[derive(Serialize, Deserialize)]
pub struct Speaker {
    pub name: String,
    pub is_staff: bool,
    pub legacy_audio_compatible: bool,
}

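/// Blocking loop on the capture thread: repeatedly pulls 10ms worth of
/// samples from the rodio microphone source and ships them to the LiveKit
/// transmit task as `AudioFrame`s. Exits once the receiving side goes away.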
fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
    use cpal::Sample;
    let sample_rate = microphone.sample_rate().get();
    let num_channels = microphone.channels().get() as u32;
    let buffer_size = sample_rate / 100 * num_channels;

    loop {
        let sampled: Vec<_> = microphone
            .by_ref()
            .take(buffer_size as usize)
            .map(|s| s.to_sample())
            .collect();

        if frame_tx
            .unbounded_send(AudioFrame {
                sample_rate,
                num_channels,
                samples_per_channel: sampled.len() as u32 / num_channels,
                data: Cow::Owned(sampled),
            })
            .is_err()
        {
            // The receiver must have been dropped or stopped consuming.
            break;
        }
    }
}

use super::LocalVideoTrack;

pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}

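/// Publishes a screen-capture source as a LiveKit video track. Frames from
/// the capture stream are wrapped or converted into WebRTC video buffers and
/// pushed into a `NativeVideoSource` sized to the source's resolution.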
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}

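/// One remote track's contribution to the shared mixer: a bounded queue of
/// 10ms frames keyed by SSRC. The queue keeps at most ten frames (100ms), so
/// a stalled output drops the oldest audio rather than accumulating latency.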
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}

impl AudioMixerSource {
    fn receive(&self, frame: AudioFrame) {
        assert_eq!(
            frame.data.len() as u32,
            self.sample_rate * self.num_channels / 100
        );

        let mut buffer = self.buffer.lock();
        buffer.push_back(frame.data.to_vec());
        while buffer.len() > 10 {
            buffer.pop_front();
        }
    }
}

impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}

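/// Converts a remote video track into a stream of renderable frames. On
/// macOS, frames are copied into CVPixelBuffers drawn from a pool that is
/// recreated whenever the incoming frame size changes; elsewhere the frames
/// are converted to `gpui::RenderImage`s.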
pub fn play_remote_video_track(
    track: &crate::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> + use<> {
    #[cfg(target_os = "macos")]
    {
        let mut pool = None;
        let mut most_recent_frame_size = (0, 0);
        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
            if pool.is_none()
                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
            {
                most_recent_frame_size = (frame.buffer.width(), frame.buffer.height());
                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
            }
            let pool = pool.clone();
            async move {
                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
                    // When the remote stops sharing, we get an 8x8 black image.
                    // Shortly afterwards the unpublish will come through and
                    // close the view, but until then, don't flash black.
                    return None;
                }

                video_frame_buffer_from_webrtc(pool?, frame.buffer)
            }
        })
    }
    #[cfg(not(target_os = "macos"))]
    {
        NativeVideoStream::new(track.0.rtc_track())
            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
    }
}

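/// Builds a CVPixelBufferPool that vends IOSurface-backed, Core
/// Animation-compatible NV12 (`420YpCbCr8BiPlanarFullRange`) buffers of the
/// given dimensions, so each remote frame can be copied into a reusable
/// pixel buffer.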
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})")
    })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;

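/// Converts a WebRTC video buffer into a CVPixelBuffer. Native buffers are
/// passed through directly; I420 buffers are copied into an NV12 pixel
/// buffer from the pool, converting the planar U/V data to interleaved UV
/// via `i420_to_nv12`.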
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data,
        // since to_argb will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx, RGBx, or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}

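/// A stream that yields `()` each time the default input or output device
/// (or its stream format) changes; the capture and playback loops await it
/// to tear down and rebuild their device streams.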
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Get the current default device ID so we can also listen for
                // changes to that device's configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use super::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;