// playback.rs

  1use anyhow::{Context as _, Result};
  2
  3use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
  4use cpal::traits::{DeviceTrait, StreamTrait as _};
  5use futures::channel::mpsc::UnboundedSender;
  6use futures::{Stream, StreamExt as _};
  7use gpui::{
  8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
  9    Task,
 10};
 11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
 12use livekit::track;
 13
 14use livekit::webrtc::{
 15    audio_frame::AudioFrame,
 16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
 17    audio_stream::native::NativeAudioStream,
 18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
 19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
 20    video_stream::native::NativeVideoStream,
 21};
 22use log::info;
 23use parking_lot::Mutex;
 24use rodio::Source;
 25use serde::{Deserialize, Serialize};
 26use settings::Settings;
 27use std::cell::RefCell;
 28use std::sync::Weak;
 29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
 30use std::time::Duration;
 31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
 32use util::{ResultExt as _, maybe};
 33
 34mod source;
 35
/// Owns the legacy (non-rodio) audio pipeline: a shared webrtc audio
/// processing module, a mixer combining all remote audio sources, and a
/// lazily-started task that drives the output device.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    // Shared audio processing module applied to both capture and playback.
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    // Mixes every registered remote track into one output stream.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak handle to the single shared output task; `start_output` upgrades
    // it, so output stops once every stream holding the `Arc` is dropped.
    _output_task: RefCell<Weak<Task<()>>>,
    // Monotonically increasing id handed to each new mixer source.
    next_ssrc: AtomicI32,
}
 43
/// Plays a remote LiveKit audio track through the rodio-based audio output.
///
/// The track name may carry url-encoded [`Speaker`] metadata; when decoding
/// fails, the raw track name is used and the speaker is assumed not to be
/// staff. Returns an [`AudioStream`] whose drop guard stops playback.
pub(crate) fn play_remote_audio_track(
    track: &livekit::track::RemoteAudioTrack,
    cx: &mut gpui::App,
) -> Result<AudioStream> {
    let stop_handle = Arc::new(AtomicBool::new(false));
    let stop_handle_clone = stop_handle.clone();
    let stream = source::LiveKitStream::new(cx.background_executor(), track);

    // Make the stream externally stoppable: every 50 ms the playback side
    // checks the stop flag and halts the source once it is set.
    let stream = stream
        .stoppable()
        .periodic_access(Duration::from_millis(50), move |s| {
            if stop_handle.load(Ordering::Relaxed) {
                s.stop();
            }
        });

    let speaker: Speaker = serde_urlencoded::from_str(&track.name()).unwrap_or_else(|_| Speaker {
        name: track.name(),
        is_staff: false,
    });
    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
        .context("Could not play audio")?;

    // Dropping the returned stream flips the stop flag, which the periodic
    // access callback above observes.
    let on_drop = util::defer(move || {
        stop_handle_clone.store(true, Ordering::Relaxed);
    });
    Ok(AudioStream::Output {
        _drop: Box::new(on_drop),
    })
}
 74
impl AudioStack {
    /// Creates an audio stack backed by `executor`.
    ///
    /// NOTE(review): the four `true` flags enable features of the webrtc
    /// audio processing module (order defined by the libwebrtc binding's
    /// `AudioProcessingModule::new`) — confirm which flag is which against
    /// that API before changing them.
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            // No output task yet; it is started lazily by `start_output`.
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

    /// Registers `track` with the shared mixer and ensures the output task is
    /// running. The returned [`AudioStream`] removes the source from the mixer
    /// (and releases its handle on the shared output task) when dropped.
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        let output_task = self.start_output();

        // Allocate a unique ssrc for this source.
        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: SAMPLE_RATE.get(),
            num_channels: CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        // Decode the remote track at our fixed sample rate / channel count.
        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        // Forward every decoded frame into the mixer source's queue.
        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

    /// Returns the shared output task, starting it on first use. Only a weak
    /// reference is retained here, so output shuts down once the last stream
    /// holding the `Arc` is dropped.
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
                    .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

    /// Captures the local microphone into a new LiveKit audio track.
    ///
    /// `user_name` and `is_staff` are url-encoded into the track name (see
    /// [`Speaker`]). Depending on the `rodio_audio` setting, capture runs
    /// either on a dedicated real-time thread (rodio pipeline) or as a
    /// background task using the legacy cpal/apm path (`capture_input`).
    ///
    /// Returns the track plus an [`AudioStream`] guard; dropping the guard
    /// ends the transmit and capture tasks.
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let source = NativeAudioSource::new(
            // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
            AudioSourceOptions::default(),
            SAMPLE_RATE.get(),
            CHANNEL_COUNT.get().into(),
            10,
        );

        let track_name = serde_urlencoded::to_string(Speaker {
            name: user_name,
            is_staff,
        })
        .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        // Frames from either capture path are funneled through this channel
        // and pushed into the webrtc source by `transmit_task`.
        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run real-time and should never be paused. That is why we are using a
            // normal std::thread and not a background task
            thread::Builder::new()
                .name("AudioCapture".to_string())
                .spawn(move || {
                    // microphone is non send on mac
                    let microphone = audio::Audio::open_microphone(voip_parts)?;
                    send_to_livekit(frame_tx, microphone);
                    Ok::<(), anyhow::Error>(())
                })
                .unwrap();
            // The thread above owns capture; the thread exits when the
            // receiving side of `frame_tx` is dropped. Nothing to poll here.
            Task::ready(Ok(()))
        } else {
            self.executor.spawn(async move {
                Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                    .await
            })
        };

        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            // NOTE(review): the `Output` variant is used even though this is an
            // input stream; it only carries the drop guard, so behavior is
            // unaffected.
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    /// Drives the default output device, remixing mixer output into the
    /// device's format. Loops forever: whenever the default device changes,
    /// the current playback thread is torn down (by dropping `end_on_drop_tx`)
    /// and a new stream is built for the new device.
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            // Dropping the sender unblocks the playback thread's `recv`,
            // ending it when the device changes.
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            // Resampled samples left over between cpal callbacks.
            let mut buf = Vec::new();

            thread::Builder::new()
                .name("AudioPlayback".to_owned())
                .spawn(move || {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while data.len() > 0 {
                                    // If the leftover buffer covers the rest of this
                                    // callback, copy it out and keep the remainder.
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    // Drain the leftover buffer first, then refill.
                                    if buf.len() > 0 {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    // Mix the next 10 ms chunk (sample_rate / 100
                                    // frames per channel) and resample it to the
                                    // device's channel count and rate.
                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        num_channels,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate().0,
                                    );
                                    buf = sampled.to_vec();
                                    // Feed the played-back audio to the APM as the
                                    // reverse stream (reference signal, e.g. for echo
                                    // cancellation — see the webrtc APM docs).
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate().0 as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block forever to keep the output stream alive
                    end_on_drop_rx.recv().ok();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

    /// Captures the default input device, resamples to `sample_rate` /
    /// `num_channels`, runs the APM forward stream over each 10 ms chunk, and
    /// sends the result through `frame_tx`. Like `play_output`, it loops to
    /// rebuild the stream whenever the default input device changes.
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            // Dropping the sender unblocks the capture thread's `recv`.
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::Builder::new()
                .name("AudioCapture".to_owned())
                .spawn(move || {
                    maybe!({
                        if let Some(name) = device.name().ok() {
                            log::info!("Using microphone: {}", name)
                        } else {
                            log::info!("Using microphone: <unknown>");
                        }

                        // 10 ms of interleaved samples at the device's native
                        // channel count and sample rate.
                        let ten_ms_buffer_size =
                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                        let stream = device
                            .build_input_stream_raw(
                                &config.config(),
                                config.sample_format(),
                                move |data, _: &_| {
                                    let data = crate::get_sample_data(config.sample_format(), data)
                                        .log_err();
                                    let Some(data) = data else {
                                        return;
                                    };
                                    let mut data = data.as_slice();

                                    // Accumulate into exact 10 ms chunks; a single
                                    // cpal callback may deliver more or less than one
                                    // chunk's worth of samples.
                                    while data.len() > 0 {
                                        let remainder =
                                            (buf.capacity() - buf.len()).min(data.len());
                                        buf.extend_from_slice(&data[..remainder]);
                                        data = &data[remainder..];

                                        if buf.capacity() == buf.len() {
                                            let mut sampled = resampler
                                                .remix_and_resample(
                                                    buf.as_slice(),
                                                    config.sample_rate().0 / 100,
                                                    config.channels() as u32,
                                                    config.sample_rate().0,
                                                    num_channels,
                                                    sample_rate,
                                                )
                                                .to_owned();
                                            apm.lock()
                                                .process_stream(
                                                    &mut sampled,
                                                    sample_rate as i32,
                                                    num_channels as i32,
                                                )
                                                .log_err();
                                            buf.clear();
                                            // Best-effort send; the frame is dropped
                                            // if the receiver has gone away.
                                            frame_tx
                                                .unbounded_send(AudioFrame {
                                                    data: Cow::Owned(sampled),
                                                    sample_rate,
                                                    num_channels,
                                                    samples_per_channel: sample_rate / 100,
                                                })
                                                .ok();
                                        }
                                    }
                                },
                                |err| log::error!("error capturing audio track: {:?}", err),
                                Some(Duration::from_millis(100)),
                            )
                            .context("failed to build input stream")?;

                        stream.play()?;
                        // Keep the thread alive and holding onto the `stream`
                        end_on_drop_rx.recv().ok();
                        anyhow::Ok(Some(()))
                    })
                    .log_err();
                })
                .unwrap();

            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}
390
/// User metadata url-encoded into a LiveKit audio track's name so the playback
/// side can label the voip stream (see `play_remote_audio_track` /
/// `capture_local_microphone_track`).
#[derive(Serialize, Deserialize)]
struct Speaker {
    // Display name of the speaker.
    name: String,
    // Whether the speaker is a staff member.
    is_staff: bool,
}
396
397fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
398    use cpal::Sample;
399    loop {
400        let sampled: Vec<_> = microphone
401            .by_ref()
402            .take(audio::BUFFER_SIZE)
403            .map(|s| s.to_sample())
404            .collect();
405
406        if frame_tx
407            .unbounded_send(AudioFrame {
408                sample_rate: SAMPLE_RATE.get(),
409                num_channels: CHANNEL_COUNT.get() as u32,
410                samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
411                data: Cow::Owned(sampled),
412            })
413            .is_err()
414        {
415            // must rx has dropped or is not consuming
416            break;
417        }
418    }
419}
420
421use super::LocalVideoTrack;
422
/// Handle that keeps an audio capture or playback pipeline alive; dropping it
/// tears the pipeline down.
pub enum AudioStream {
    // Owns the task driving an input stream.
    Input { _task: Task<()> },
    // Owns an opaque guard whose `Drop` impl stops the stream.
    Output { _drop: Box<dyn std::any::Any> },
}
427
/// Starts streaming `capture_source` (a screen) into a new LiveKit video
/// track sized from the source's reported resolution.
///
/// The native video source is created on the tokio runtime via
/// `gpui_tokio::Tokio::spawn`; each captured frame is then converted to a
/// webrtc buffer and pushed into the track source. Returns the local track
/// together with the capture stream handle that keeps capture running.
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })?
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                // Frames that fail conversion are silently skipped.
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
464
/// One remote audio track registered with the shared [`audio_mixer::AudioMixer`].
/// Cloned handles share the same frame queue via the inner `Arc`.
#[derive(Clone)]
struct AudioMixerSource {
    // Synchronization source id, allocated from `AudioStack::next_ssrc`.
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // Queue of pending 10 ms frames; capped at 10 entries by `receive`.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
472
473impl AudioMixerSource {
474    fn receive(&self, frame: AudioFrame) {
475        assert_eq!(
476            frame.data.len() as u32,
477            self.sample_rate * self.num_channels / 100
478        );
479
480        let mut buffer = self.buffer.lock();
481        buffer.push_back(frame.data.to_vec());
482        while buffer.len() > 10 {
483            buffer.pop_front();
484        }
485    }
486}
487
488impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
489    fn ssrc(&self) -> i32 {
490        self.ssrc
491    }
492
493    fn preferred_sample_rate(&self) -> u32 {
494        self.sample_rate
495    }
496
497    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
498        assert_eq!(self.sample_rate, target_sample_rate);
499        let buf = self.buffer.lock().pop_front()?;
500        Some(AudioFrame {
501            data: Cow::Owned(buf),
502            sample_rate: self.sample_rate,
503            num_channels: self.num_channels,
504            samples_per_channel: self.sample_rate / 100,
505        })
506    }
507}
508
509pub fn play_remote_video_track(
510    track: &crate::RemoteVideoTrack,
511) -> impl Stream<Item = RemoteVideoFrame> + use<> {
512    #[cfg(target_os = "macos")]
513    {
514        let mut pool = None;
515        let most_recent_frame_size = (0, 0);
516        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
517            if pool == None
518                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
519            {
520                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
521            }
522            let pool = pool.clone();
523            async move {
524                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
525                    // when the remote stops sharing, we get an 8x8 black image.
526                    // In a lil bit, the unpublish will come through and close the view,
527                    // but until then, don't flash black.
528                    return None;
529                }
530
531                video_frame_buffer_from_webrtc(pool?, frame.buffer)
532            }
533        })
534    }
535    #[cfg(not(target_os = "macos"))]
536    {
537        NativeVideoStream::new(track.0.rtc_track())
538            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
539    }
540}
541
/// Creates a Core Video pixel buffer pool that vends biplanar NV12
/// (`420YpCbCr8BiPlanarFullRange`) buffers of the given size, flagged as
/// IOSurface Core Animation compatible.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY: these are process-lifetime constant CFStringRefs provided by
    // Core Video; wrapping under the get rule retains them correctly.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    // CFBoolean-style "true" expressed as the number 1.
    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
581
/// On macOS, remote video frames are Core Video pixel buffers (zero-copy
/// where the decoder already produces one).
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
584
/// Converts a webrtc video buffer into a Core Video pixel buffer.
///
/// Native buffers already wrapping a `CVPixelBuffer` are returned directly
/// (zero-copy); otherwise the I420 planes are converted to NV12 into a buffer
/// drawn from `pool`. Returns `None` on any Core Video failure.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the native buffer already holds a CVPixelBuffer.
    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        // SAFETY: the pointer is non-null; wrapping under the get rule
        // retains our own reference without consuming the caller's.
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // The base address must be locked before plane pointers are read.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        // SAFETY: the plane pointers and lengths come from the locked pixel
        // buffer, so the slices cover exactly the planes' backing memory.
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        // NOTE(review): if this unlock fails the frame is dropped and the
        // buffer remains locked; there is no retry path.
        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
647
/// On non-macOS platforms, remote video frames are decoded into gpui render
/// images.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
650
/// Converts a webrtc video buffer into a gpui render image by asking webrtc
/// to write ARGB pixel data into a freshly allocated (deliberately
/// uninitialized) byte buffer. Returns `None` if allocation fails.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4; // 4 bytes per pixel
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        // SAFETY: `start_ptr` is a fresh allocation of `byte_len` bytes, and
        // `to_argb` below fills the entire slice.
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        // SAFETY: the allocation was made with the global allocator for
        // exactly `byte_len` bytes, now fully initialized.
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}
691
/// Wraps a macOS screen-capture frame (a `CVPixelBuffer`) as a webrtc native
/// buffer without copying pixel data.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Suppress the Rust wrapper's release so the reference passes to webrtc.
    // NOTE(review): this assumes `from_cv_pixel_buffer` takes over the
    // retained reference — confirm against the livekit binding's docs.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
702
/// Converts a scap screen-capture frame into an NV12 webrtc buffer.
///
/// Handles BGRx, RGBx, and pre-planar YUV frames; any other scap format is
/// logged and dropped. (The `argb_to_nv12` / `abgr_to_nv12` helper names
/// follow libyuv's byte-order naming, which is why BGRx maps to the "argb"
/// helper — see the yuv_helper docs.)
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                frame.width as u32 * 4, // source stride: 4 bytes per pixel
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            // Already planar YUV: copy the planes straight into an NV12 buffer
            // allocated with the source's strides.
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}
760
/// Stream yielding `()` whenever the default audio device changes.
/// `input` selects the capture device (`true`) vs. the playback device
/// (`false`).
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}
764
765#[cfg(target_os = "macos")]
766mod macos {
767
768    use coreaudio::sys::{
769        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
770        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
771        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
772        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
773    };
774    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
775
776    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        // Receives a `()` each time the registered property listener fires.
        rx: UnboundedReceiver<()>,
        // Boxed so its address stays stable: Core Audio holds a raw pointer
        // to it as the listener's client data.
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    // Compile-time assertion that the listener is Send.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    // Owns the user callback invoked from Core Audio's listener thread.
    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    // C-ABI trampoline Core Audio calls on property changes; `callback` is
    // the raw pointer to our `PropertyListenerCallbackWrapper`.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        // SAFETY: `callback` was registered as a pointer to a live, boxed
        // `PropertyListenerCallbackWrapper` that outlives the registration.
        unsafe { (*wrapper).0() };
        0
    }
799
800    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
801        fn new(input: bool) -> anyhow::Result<Self> {
802            let (tx, rx) = futures::channel::mpsc::unbounded();
803
804            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
805                tx.unbounded_send(()).ok();
806            })));
807
808            // Get the current default device ID
809            let device_id = unsafe {
810                // Listen for default device changes
811                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
812                    kAudioObjectSystemObject,
813                    &AudioObjectPropertyAddress {
814                        mSelector: if input {
815                            kAudioHardwarePropertyDefaultInputDevice
816                        } else {
817                            kAudioHardwarePropertyDefaultOutputDevice
818                        },
819                        mScope: kAudioObjectPropertyScopeGlobal,
820                        mElement: kAudioObjectPropertyElementMaster,
821                    },
822                    Some(property_listener_handler_shim),
823                    &*callback as *const _ as *mut _,
824                ))?;
825
826                // Also listen for changes to the device configuration
827                let device_id = if input {
828                    let mut input_device: AudioObjectID = 0;
829                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
830                    let result = coreaudio::sys::AudioObjectGetPropertyData(
831                        kAudioObjectSystemObject,
832                        &AudioObjectPropertyAddress {
833                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
834                            mScope: kAudioObjectPropertyScopeGlobal,
835                            mElement: kAudioObjectPropertyElementMaster,
836                        },
837                        0,
838                        std::ptr::null(),
839                        &mut prop_size as *mut _,
840                        &mut input_device as *mut _ as *mut _,
841                    );
842                    if result != 0 {
843                        log::warn!("Failed to get default input device ID");
844                        0
845                    } else {
846                        input_device
847                    }
848                } else {
849                    let mut output_device: AudioObjectID = 0;
850                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
851                    let result = coreaudio::sys::AudioObjectGetPropertyData(
852                        kAudioObjectSystemObject,
853                        &AudioObjectPropertyAddress {
854                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
855                            mScope: kAudioObjectPropertyScopeGlobal,
856                            mElement: kAudioObjectPropertyElementMaster,
857                        },
858                        0,
859                        std::ptr::null(),
860                        &mut prop_size as *mut _,
861                        &mut output_device as *mut _ as *mut _,
862                    );
863                    if result != 0 {
864                        log::warn!("Failed to get default output device ID");
865                        0
866                    } else {
867                        output_device
868                    }
869                };
870
871                if device_id != 0 {
872                    // Listen for format changes on the device
873                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
874                        device_id,
875                        &AudioObjectPropertyAddress {
876                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
877                            mScope: if input {
878                                coreaudio::sys::kAudioObjectPropertyScopeInput
879                            } else {
880                                coreaudio::sys::kAudioObjectPropertyScopeOutput
881                            },
882                            mElement: kAudioObjectPropertyElementMaster,
883                        },
884                        Some(property_listener_handler_shim),
885                        &*callback as *const _ as *mut _,
886                    ))?;
887                }
888
889                device_id
890            };
891
892            Ok(Self {
893                rx,
894                callback,
895                input,
896                device_id,
897            })
898        }
899    }
900
901    impl Drop for CoreAudioDefaultDeviceChangeListener {
902        fn drop(&mut self) {
903            unsafe {
904                // Remove the system-level property listener
905                AudioObjectRemovePropertyListener(
906                    kAudioObjectSystemObject,
907                    &AudioObjectPropertyAddress {
908                        mSelector: if self.input {
909                            kAudioHardwarePropertyDefaultInputDevice
910                        } else {
911                            kAudioHardwarePropertyDefaultOutputDevice
912                        },
913                        mScope: kAudioObjectPropertyScopeGlobal,
914                        mElement: kAudioObjectPropertyElementMaster,
915                    },
916                    Some(property_listener_handler_shim),
917                    &*self.callback as *const _ as *mut _,
918                );
919
920                // Remove the device-specific property listener if we have a valid device ID
921                if self.device_id != 0 {
922                    AudioObjectRemovePropertyListener(
923                        self.device_id,
924                        &AudioObjectPropertyAddress {
925                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
926                            mScope: if self.input {
927                                coreaudio::sys::kAudioObjectPropertyScopeInput
928                            } else {
929                                coreaudio::sys::kAudioObjectPropertyScopeOutput
930                            },
931                            mElement: kAudioObjectPropertyElementMaster,
932                        },
933                        Some(property_listener_handler_shim),
934                        &*self.callback as *const _ as *mut _,
935                    );
936                }
937            }
938        }
939    }
940
941    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
942        type Item = ();
943
944        fn poll_next(
945            mut self: std::pin::Pin<&mut Self>,
946            cx: &mut std::task::Context<'_>,
947        ) -> std::task::Poll<Option<Self::Item>> {
948            self.rx.poll_next_unpin(cx)
949        }
950    }
951}
952
/// Platform implementation of [`DeviceChangeListenerApi`] used on macOS,
/// backed by CoreAudio HAL property listeners.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
955
956#[cfg(not(target_os = "macos"))]
957mod noop_change_listener {
958    use std::task::Poll;
959
960    use super::DeviceChangeListenerApi;
961
962    pub struct NoopOutputDeviceChangelistener {}
963
964    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
965        fn new(_input: bool) -> anyhow::Result<Self> {
966            Ok(NoopOutputDeviceChangelistener {})
967        }
968    }
969
970    impl futures::Stream for NoopOutputDeviceChangelistener {
971        type Item = ();
972
973        fn poll_next(
974            self: std::pin::Pin<&mut Self>,
975            _cx: &mut std::task::Context<'_>,
976        ) -> Poll<Option<Self::Item>> {
977            Poll::Pending
978        }
979    }
980}
981
/// No-op [`DeviceChangeListenerApi`] implementation for every platform other
/// than macOS; its stream stays pending forever, so device changes are never
/// observed there.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;