playback.rs

   1use anyhow::{Context as _, Result};
   2
   3use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
   4use cpal::traits::{DeviceTrait, StreamTrait as _};
   5use futures::channel::mpsc::UnboundedSender;
   6use futures::{Stream, StreamExt as _};
   7use gpui::{
   8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
   9    Task,
  10};
  11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
  12use livekit::track;
  13
  14use livekit::webrtc::{
  15    audio_frame::AudioFrame,
  16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
  17    audio_stream::native::NativeAudioStream,
  18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
  19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
  20    video_stream::native::NativeVideoStream,
  21};
  22use log::info;
  23use parking_lot::Mutex;
  24use rodio::Source;
  25use serde::{Deserialize, Serialize};
  26use settings::Settings;
  27use std::cell::RefCell;
  28use std::sync::Weak;
  29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
  30use std::time::Duration;
  31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
  32use util::{ResultExt as _, maybe};
  33
  34mod source;
  35
  36pub(crate) struct AudioStack {
  37    executor: BackgroundExecutor,
  38    apm: Arc<Mutex<apm::AudioProcessingModule>>,
  39    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
  40    _output_task: RefCell<Weak<Task<()>>>,
  41    next_ssrc: AtomicI32,
  42}
  43
  44pub(crate) fn play_remote_audio_track(
  45    track: &livekit::track::RemoteAudioTrack,
  46    speaker: Speaker,
  47    cx: &mut gpui::App,
  48) -> Result<AudioStream> {
  49    let stream = source::LiveKitStream::new(
  50        cx.background_executor(),
  51        track,
  52        speaker.legacy_audio_compatible,
  53    );
  54
  55    let stop_handle = Arc::new(AtomicBool::new(false));
  56    let stop_handle_clone = stop_handle.clone();
  57    let stream = stream
  58        .stoppable()
  59        .periodic_access(Duration::from_millis(50), move |s| {
  60            if stop_handle.load(Ordering::Relaxed) {
  61                s.stop();
  62            }
  63        });
  64
  65    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
  66        .context("Could not play audio")?;
  67
  68    let on_drop = util::defer(move || {
  69        stop_handle_clone.store(true, Ordering::Relaxed);
  70    });
  71    Ok(AudioStream::Output {
  72        _drop: Box::new(on_drop),
  73    })
  74}
  75
  76impl AudioStack {
  77    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
  78        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
  79            true, true, true, true,
  80        )));
  81        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
  82        Self {
  83            executor,
  84            apm,
  85            mixer,
  86            _output_task: RefCell::new(Weak::new()),
  87            next_ssrc: AtomicI32::new(1),
  88        }
  89    }
  90
  91    pub(crate) fn play_remote_audio_track(
  92        &self,
  93        track: &livekit::track::RemoteAudioTrack,
  94    ) -> AudioStream {
  95        let output_task = self.start_output();
  96
  97        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
  98        let source = AudioMixerSource {
  99            ssrc: next_ssrc,
 100            sample_rate: SAMPLE_RATE.get(),
 101            num_channels: CHANNEL_COUNT.get() as u32,
 102            buffer: Arc::default(),
 103        };
 104        self.mixer.lock().add_source(source.clone());
 105
 106        let mut stream = NativeAudioStream::new(
 107            track.rtc_track(),
 108            source.sample_rate as i32,
 109            source.num_channels as i32,
 110        );
 111
 112        let receive_task = self.executor.spawn({
 113            let source = source.clone();
 114            async move {
 115                while let Some(frame) = stream.next().await {
 116                    source.receive(frame);
 117                }
 118            }
 119        });
 120
 121        let mixer = self.mixer.clone();
 122        let on_drop = util::defer(move || {
 123            mixer.lock().remove_source(source.ssrc);
 124            drop(receive_task);
 125            drop(output_task);
 126        });
 127
 128        AudioStream::Output {
 129            _drop: Box::new(on_drop),
 130        }
 131    }
 132
 133    fn start_output(&self) -> Arc<Task<()>> {
 134        if let Some(task) = self._output_task.borrow().upgrade() {
 135            return task;
 136        }
 137        let task = Arc::new(self.executor.spawn({
 138            let apm = self.apm.clone();
 139            let mixer = self.mixer.clone();
 140            async move {
 141                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
 142                    .await
 143                    .log_err();
 144            }
 145        }));
 146        *self._output_task.borrow_mut() = Arc::downgrade(&task);
 147        task
 148    }
 149
 150    pub(crate) fn capture_local_microphone_track(
 151        &self,
 152        user_name: String,
 153        is_staff: bool,
 154        cx: &AsyncApp,
 155    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
 156        let legacy_audio_compatible =
 157            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
 158                .unwrap_or_default();
 159
 160        let source = if legacy_audio_compatible {
 161            NativeAudioSource::new(
 162                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 163                AudioSourceOptions::default(),
 164                LEGACY_SAMPLE_RATE.get(),
 165                LEGACY_CHANNEL_COUNT.get().into(),
 166                10,
 167            )
 168        } else {
 169            NativeAudioSource::new(
 170                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 171                AudioSourceOptions::default(),
 172                SAMPLE_RATE.get(),
 173                CHANNEL_COUNT.get().into(),
 174                10,
 175            )
 176        };
 177
 178        let track_name = serde_urlencoded::to_string(Speaker {
 179            name: user_name,
 180            is_staff,
 181            legacy_audio_compatible,
 182        })
 183        .context("Could not encode user information in track name")?;
 184
 185        let track = track::LocalAudioTrack::create_audio_track(
 186            &track_name,
 187            RtcAudioSource::Native(source.clone()),
 188        );
 189
 190        let apm = self.apm.clone();
 191
 192        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
 193        let transmit_task = self.executor.spawn({
 194            async move {
 195                while let Some(frame) = frame_rx.next().await {
 196                    source.capture_frame(&frame).await.log_err();
 197                }
 198            }
 199        });
 200        let rodio_pipeline =
 201            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
 202        let capture_task = if rodio_pipeline {
 203            info!("Using experimental.rodio_audio audio pipeline");
 204            let voip_parts = audio::VoipParts::new(cx)?;
 205            // Audio needs to run real-time and should never be paused. That is
 206            // why we are using a normal std::thread and not a background task
 207            thread::Builder::new()
 208                .name("MicrophoneToLivekit".to_string())
 209                .spawn(move || {
 210                    // microphone is non send on mac
 211                    let microphone = match audio::Audio::open_microphone(voip_parts) {
 212                        Ok(m) => m,
 213                        Err(e) => {
 214                            log::error!("Could not open microphone: {e}");
 215                            return;
 216                        }
 217                    };
 218                    send_to_livekit(frame_tx, microphone);
 219                })
 220                .expect("should be able to spawn threads");
 221            Task::ready(Ok(()))
 222        } else {
 223            self.executor.spawn(async move {
 224                Self::capture_input(
 225                    apm,
 226                    frame_tx,
 227                    LEGACY_SAMPLE_RATE.get(),
 228                    LEGACY_CHANNEL_COUNT.get().into(),
 229                )
 230                .await
 231            })
 232        };
 233
 234        let on_drop = util::defer(|| {
 235            drop(transmit_task);
 236            drop(capture_task);
 237        });
 238        Ok((
 239            super::LocalAudioTrack(track),
 240            AudioStream::Output {
 241                _drop: Box::new(on_drop),
 242            },
 243        ))
 244    }
 245
 246    async fn play_output(
 247        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 248        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
 249        sample_rate: u32,
 250        num_channels: u32,
 251    ) -> Result<()> {
 252        loop {
 253            let mut device_change_listener = DeviceChangeListener::new(false)?;
 254            let (output_device, output_config) = crate::default_device(false)?;
 255            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 256            let mixer = mixer.clone();
 257            let apm = apm.clone();
 258            let mut resampler = audio_resampler::AudioResampler::default();
 259            let mut buf = Vec::new();
 260
 261            thread::Builder::new()
 262                .name("AudioPlayback".to_owned())
 263                .spawn(move || {
 264                    let output_stream = output_device.build_output_stream(
 265                        &output_config.config(),
 266                        {
 267                            move |mut data, _info| {
 268                                while data.len() > 0 {
 269                                    if data.len() <= buf.len() {
 270                                        let rest = buf.split_off(data.len());
 271                                        data.copy_from_slice(&buf);
 272                                        buf = rest;
 273                                        return;
 274                                    }
 275                                    if buf.len() > 0 {
 276                                        let (prefix, suffix) = data.split_at_mut(buf.len());
 277                                        prefix.copy_from_slice(&buf);
 278                                        data = suffix;
 279                                    }
 280
 281                                    let mut mixer = mixer.lock();
 282                                    let mixed = mixer.mix(output_config.channels() as usize);
 283                                    let sampled = resampler.remix_and_resample(
 284                                        mixed,
 285                                        sample_rate / 100,
 286                                        num_channels,
 287                                        sample_rate,
 288                                        output_config.channels() as u32,
 289                                        output_config.sample_rate().0,
 290                                    );
 291                                    buf = sampled.to_vec();
 292                                    apm.lock()
 293                                        .process_reverse_stream(
 294                                            &mut buf,
 295                                            output_config.sample_rate().0 as i32,
 296                                            output_config.channels() as i32,
 297                                        )
 298                                        .ok();
 299                                }
 300                            }
 301                        },
 302                        |error| log::error!("error playing audio track: {:?}", error),
 303                        Some(Duration::from_millis(100)),
 304                    );
 305
 306                    let Some(output_stream) = output_stream.log_err() else {
 307                        return;
 308                    };
 309
 310                    output_stream.play().log_err();
 311                    // Block forever to keep the output stream alive
 312                    end_on_drop_rx.recv().ok();
 313                })
 314                .unwrap();
 315
 316            device_change_listener.next().await;
 317            drop(end_on_drop_tx)
 318        }
 319    }
 320
 321    async fn capture_input(
 322        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 323        frame_tx: UnboundedSender<AudioFrame<'static>>,
 324        sample_rate: u32,
 325        num_channels: u32,
 326    ) -> Result<()> {
 327        loop {
 328            let mut device_change_listener = DeviceChangeListener::new(true)?;
 329            let (device, config) = crate::default_device(true)?;
 330            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 331            let apm = apm.clone();
 332            let frame_tx = frame_tx.clone();
 333            let mut resampler = audio_resampler::AudioResampler::default();
 334
 335            thread::Builder::new()
 336                .name("AudioCapture".to_owned())
 337                .spawn(move || {
 338                    maybe!({
 339                        if let Some(name) = device.name().ok() {
 340                            log::info!("Using microphone: {}", name)
 341                        } else {
 342                            log::info!("Using microphone: <unknown>");
 343                        }
 344
 345                        let ten_ms_buffer_size =
 346                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
 347                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
 348
 349                        let stream = device
 350                            .build_input_stream_raw(
 351                                &config.config(),
 352                                config.sample_format(),
 353                                move |data, _: &_| {
 354                                    let data = crate::get_sample_data(config.sample_format(), data)
 355                                        .log_err();
 356                                    let Some(data) = data else {
 357                                        return;
 358                                    };
 359                                    let mut data = data.as_slice();
 360
 361                                    while data.len() > 0 {
 362                                        let remainder =
 363                                            (buf.capacity() - buf.len()).min(data.len());
 364                                        buf.extend_from_slice(&data[..remainder]);
 365                                        data = &data[remainder..];
 366
 367                                        if buf.capacity() == buf.len() {
 368                                            let mut sampled = resampler
 369                                                .remix_and_resample(
 370                                                    buf.as_slice(),
 371                                                    config.sample_rate().0 / 100,
 372                                                    config.channels() as u32,
 373                                                    config.sample_rate().0,
 374                                                    num_channels,
 375                                                    sample_rate,
 376                                                )
 377                                                .to_owned();
 378                                            apm.lock()
 379                                                .process_stream(
 380                                                    &mut sampled,
 381                                                    sample_rate as i32,
 382                                                    num_channels as i32,
 383                                                )
 384                                                .log_err();
 385                                            buf.clear();
 386                                            frame_tx
 387                                                .unbounded_send(AudioFrame {
 388                                                    data: Cow::Owned(sampled),
 389                                                    sample_rate,
 390                                                    num_channels,
 391                                                    samples_per_channel: sample_rate / 100,
 392                                                })
 393                                                .ok();
 394                                        }
 395                                    }
 396                                },
 397                                |err| log::error!("error capturing audio track: {:?}", err),
 398                                Some(Duration::from_millis(100)),
 399                            )
 400                            .context("failed to build input stream")?;
 401
 402                        stream.play()?;
 403                        // Keep the thread alive and holding onto the `stream`
 404                        end_on_drop_rx.recv().ok();
 405                        anyhow::Ok(Some(()))
 406                    })
 407                    .log_err();
 408                })
 409                .unwrap();
 410
 411            device_change_listener.next().await;
 412            drop(end_on_drop_tx)
 413        }
 414    }
 415}
 416
 417#[derive(Serialize, Deserialize)]
 418pub struct Speaker {
 419    pub name: String,
 420    pub is_staff: bool,
 421    pub legacy_audio_compatible: bool,
 422}
 423
 424fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
 425    use cpal::Sample;
 426    let sample_rate = microphone.sample_rate().get();
 427    let num_channels = microphone.channels().get() as u32;
 428    let buffer_size = sample_rate / 100 * num_channels;
 429
 430    loop {
 431        let sampled: Vec<_> = microphone
 432            .by_ref()
 433            .take(buffer_size as usize)
 434            .map(|s| s.to_sample())
 435            .collect();
 436
 437        if frame_tx
 438            .unbounded_send(AudioFrame {
 439                sample_rate,
 440                num_channels,
 441                samples_per_channel: sampled.len() as u32 / num_channels,
 442                data: Cow::Owned(sampled),
 443            })
 444            .is_err()
 445        {
 446            // must rx has dropped or is not consuming
 447            break;
 448        }
 449    }
 450}
 451
 452use super::LocalVideoTrack;
 453
 454pub enum AudioStream {
 455    Input { _task: Task<()> },
 456    Output { _drop: Box<dyn std::any::Any> },
 457}
 458
 459pub(crate) async fn capture_local_video_track(
 460    capture_source: &dyn ScreenCaptureSource,
 461    cx: &mut gpui::AsyncApp,
 462) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
 463    let metadata = capture_source.metadata()?;
 464    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
 465        NativeVideoSource::new(VideoResolution {
 466            width: metadata.resolution.width.0 as u32,
 467            height: metadata.resolution.height.0 as u32,
 468        })
 469    })?
 470    .await?;
 471
 472    let capture_stream = capture_source
 473        .stream(cx.foreground_executor(), {
 474            let track_source = track_source.clone();
 475            Box::new(move |frame| {
 476                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
 477                    track_source.capture_frame(&VideoFrame {
 478                        rotation: VideoRotation::VideoRotation0,
 479                        timestamp_us: 0,
 480                        buffer,
 481                    });
 482                }
 483            })
 484        })
 485        .await??;
 486
 487    Ok((
 488        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
 489            "screen share",
 490            RtcVideoSource::Native(track_source),
 491        )),
 492        capture_stream,
 493    ))
 494}
 495
 496#[derive(Clone)]
 497struct AudioMixerSource {
 498    ssrc: i32,
 499    sample_rate: u32,
 500    num_channels: u32,
 501    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
 502}
 503
 504impl AudioMixerSource {
 505    fn receive(&self, frame: AudioFrame) {
 506        assert_eq!(
 507            frame.data.len() as u32,
 508            self.sample_rate * self.num_channels / 100
 509        );
 510
 511        let mut buffer = self.buffer.lock();
 512        buffer.push_back(frame.data.to_vec());
 513        while buffer.len() > 10 {
 514            buffer.pop_front();
 515        }
 516    }
 517}
 518
 519impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
 520    fn ssrc(&self) -> i32 {
 521        self.ssrc
 522    }
 523
 524    fn preferred_sample_rate(&self) -> u32 {
 525        self.sample_rate
 526    }
 527
 528    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
 529        assert_eq!(self.sample_rate, target_sample_rate);
 530        let buf = self.buffer.lock().pop_front()?;
 531        Some(AudioFrame {
 532            data: Cow::Owned(buf),
 533            sample_rate: self.sample_rate,
 534            num_channels: self.num_channels,
 535            samples_per_channel: self.sample_rate / 100,
 536        })
 537    }
 538}
 539
 540pub fn play_remote_video_track(
 541    track: &crate::RemoteVideoTrack,
 542) -> impl Stream<Item = RemoteVideoFrame> + use<> {
 543    #[cfg(target_os = "macos")]
 544    {
 545        let mut pool = None;
 546        let most_recent_frame_size = (0, 0);
 547        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 548            if pool == None
 549                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
 550            {
 551                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
 552            }
 553            let pool = pool.clone();
 554            async move {
 555                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
 556                    // when the remote stops sharing, we get an 8x8 black image.
 557                    // In a lil bit, the unpublish will come through and close the view,
 558                    // but until then, don't flash black.
 559                    return None;
 560                }
 561
 562                video_frame_buffer_from_webrtc(pool?, frame.buffer)
 563            }
 564        })
 565    }
 566    #[cfg(not(target_os = "macos"))]
 567    {
 568        NativeVideoStream::new(track.0.rtc_track())
 569            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
 570    }
 571}
 572
 573#[cfg(target_os = "macos")]
 574fn create_buffer_pool(
 575    width: u32,
 576    height: u32,
 577) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
 578    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
 579    use core_video::pixel_buffer;
 580    use core_video::{
 581        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
 582        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
 583        pixel_buffer_pool::{self},
 584    };
 585
 586    let width_key: CFString =
 587        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
 588    let height_key: CFString =
 589        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
 590    let animation_key: CFString = unsafe {
 591        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
 592    };
 593    let format_key: CFString =
 594        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };
 595
 596    let yes: CFNumber = 1.into();
 597    let width: CFNumber = (width as i32).into();
 598    let height: CFNumber = (height as i32).into();
 599    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();
 600
 601    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
 602        (width_key, width.into_CFType()),
 603        (height_key, height.into_CFType()),
 604        (animation_key, yes.into_CFType()),
 605        (format_key, format.into_CFType()),
 606    ]);
 607
 608    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
 609        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
 610    })
 611}
 612
 613#[cfg(target_os = "macos")]
 614pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
 615
 616#[cfg(target_os = "macos")]
 617fn video_frame_buffer_from_webrtc(
 618    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
 619    buffer: Box<dyn VideoBuffer>,
 620) -> Option<RemoteVideoFrame> {
 621    use core_foundation::base::TCFType;
 622    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
 623    use livekit::webrtc::native::yuv_helper::i420_to_nv12;
 624
 625    if let Some(native) = buffer.as_native() {
 626        let pixel_buffer = native.get_cv_pixel_buffer();
 627        if pixel_buffer.is_null() {
 628            return None;
 629        }
 630        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
 631    }
 632
 633    let i420_buffer = buffer.as_i420()?;
 634    let pixel_buffer = pool.create_pixel_buffer().log_err()?;
 635
 636    let image_buffer = unsafe {
 637        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
 638            return None;
 639        }
 640
 641        let dst_y = pixel_buffer.get_base_address_of_plane(0);
 642        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
 643        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
 644        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
 645        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
 646        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
 647        let width = pixel_buffer.get_width();
 648        let height = pixel_buffer.get_height();
 649        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
 650        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);
 651
 652        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
 653        let (src_y, src_u, src_v) = i420_buffer.data();
 654        i420_to_nv12(
 655            src_y,
 656            stride_y,
 657            src_u,
 658            stride_u,
 659            src_v,
 660            stride_v,
 661            dst_y_buffer,
 662            dst_y_stride as u32,
 663            dst_uv_buffer,
 664            dst_uv_stride as u32,
 665            width as i32,
 666            height as i32,
 667        );
 668
 669        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
 670            return None;
 671        }
 672
 673        pixel_buffer
 674    };
 675
 676    Some(image_buffer)
 677}
 678
 679#[cfg(not(target_os = "macos"))]
 680pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
 681
 682#[cfg(not(target_os = "macos"))]
 683fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
 684    use gpui::RenderImage;
 685    use image::{Frame, RgbaImage};
 686    use livekit::webrtc::prelude::VideoFormatType;
 687    use smallvec::SmallVec;
 688    use std::alloc::{Layout, alloc};
 689
 690    let width = buffer.width();
 691    let height = buffer.height();
 692    let stride = width * 4;
 693    let byte_len = (stride * height) as usize;
 694    let argb_image = unsafe {
 695        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
 696        // will write all bytes anyway.
 697        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
 698        if start_ptr.is_null() {
 699            return None;
 700        }
 701        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
 702        buffer.to_argb(
 703            VideoFormatType::ARGB,
 704            argb_frame_slice,
 705            stride,
 706            width as i32,
 707            height as i32,
 708        );
 709        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
 710    };
 711
 712    // TODO: Unclear why providing argb_image to RgbaImage works properly.
 713    let image = RgbaImage::from_raw(width, height, argb_image)
 714        .with_context(|| "Bug: not enough bytes allocated for image.")
 715        .log_err()?;
 716
 717    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
 718        Frame::new(image),
 719        1,
 720    ))))
 721}
 722
 723#[cfg(target_os = "macos")]
 724fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 725    use livekit::webrtc;
 726
 727    let pixel_buffer = frame.0.as_concrete_TypeRef();
 728    std::mem::forget(frame.0);
 729    unsafe {
 730        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
 731    }
 732}
 733
 734#[cfg(not(target_os = "macos"))]
 735fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 736    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
 737    use livekit::webrtc::prelude::NV12Buffer;
 738    match frame.0 {
 739        scap::frame::Frame::BGRx(frame) => {
 740            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 741            let (stride_y, stride_uv) = buffer.strides();
 742            let (data_y, data_uv) = buffer.data_mut();
 743            argb_to_nv12(
 744                &frame.data,
 745                frame.width as u32 * 4,
 746                data_y,
 747                stride_y,
 748                data_uv,
 749                stride_uv,
 750                frame.width,
 751                frame.height,
 752            );
 753            Some(buffer)
 754        }
 755        scap::frame::Frame::RGBx(frame) => {
 756            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 757            let (stride_y, stride_uv) = buffer.strides();
 758            let (data_y, data_uv) = buffer.data_mut();
 759            abgr_to_nv12(
 760                &frame.data,
 761                frame.width as u32 * 4,
 762                data_y,
 763                stride_y,
 764                data_uv,
 765                stride_uv,
 766                frame.width,
 767                frame.height,
 768            );
 769            Some(buffer)
 770        }
 771        scap::frame::Frame::YUVFrame(yuvframe) => {
 772            let mut buffer = NV12Buffer::with_strides(
 773                yuvframe.width as u32,
 774                yuvframe.height as u32,
 775                yuvframe.luminance_stride as u32,
 776                yuvframe.chrominance_stride as u32,
 777            );
 778            let (luminance, chrominance) = buffer.data_mut();
 779            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
 780            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
 781            Some(buffer)
 782        }
 783        _ => {
 784            log::error!(
 785                "Expected BGRx or YUV frame from scap screen capture but got some other format."
 786            );
 787            None
 788        }
 789    }
 790}
 791
 792trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
 793    fn new(input: bool) -> Result<Self>;
 794}
 795
 796#[cfg(target_os = "macos")]
 797mod macos {
 798
 799    use coreaudio::sys::{
 800        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
 801        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
 802        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
 803        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
 804    };
 805    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
 806
 807    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
 808    pub struct CoreAudioDefaultDeviceChangeListener {
 809        rx: UnboundedReceiver<()>,
 810        callback: Box<PropertyListenerCallbackWrapper>,
 811        input: bool,
 812        device_id: AudioObjectID, // Store the device ID to properly remove listeners
 813    }
 814
 815    trait _AssertSend: Send {}
 816    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
 817
 818    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
 819
 820    unsafe extern "C" fn property_listener_handler_shim(
 821        _: AudioObjectID,
 822        _: u32,
 823        _: *const AudioObjectPropertyAddress,
 824        callback: *mut ::std::os::raw::c_void,
 825    ) -> OSStatus {
 826        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
 827        unsafe { (*wrapper).0() };
 828        0
 829    }
 830
 831    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
 832        fn new(input: bool) -> anyhow::Result<Self> {
 833            let (tx, rx) = futures::channel::mpsc::unbounded();
 834
 835            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
 836                tx.unbounded_send(()).ok();
 837            })));
 838
 839            // Get the current default device ID
 840            let device_id = unsafe {
 841                // Listen for default device changes
 842                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 843                    kAudioObjectSystemObject,
 844                    &AudioObjectPropertyAddress {
 845                        mSelector: if input {
 846                            kAudioHardwarePropertyDefaultInputDevice
 847                        } else {
 848                            kAudioHardwarePropertyDefaultOutputDevice
 849                        },
 850                        mScope: kAudioObjectPropertyScopeGlobal,
 851                        mElement: kAudioObjectPropertyElementMaster,
 852                    },
 853                    Some(property_listener_handler_shim),
 854                    &*callback as *const _ as *mut _,
 855                ))?;
 856
 857                // Also listen for changes to the device configuration
 858                let device_id = if input {
 859                    let mut input_device: AudioObjectID = 0;
 860                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 861                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 862                        kAudioObjectSystemObject,
 863                        &AudioObjectPropertyAddress {
 864                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
 865                            mScope: kAudioObjectPropertyScopeGlobal,
 866                            mElement: kAudioObjectPropertyElementMaster,
 867                        },
 868                        0,
 869                        std::ptr::null(),
 870                        &mut prop_size as *mut _,
 871                        &mut input_device as *mut _ as *mut _,
 872                    );
 873                    if result != 0 {
 874                        log::warn!("Failed to get default input device ID");
 875                        0
 876                    } else {
 877                        input_device
 878                    }
 879                } else {
 880                    let mut output_device: AudioObjectID = 0;
 881                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 882                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 883                        kAudioObjectSystemObject,
 884                        &AudioObjectPropertyAddress {
 885                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
 886                            mScope: kAudioObjectPropertyScopeGlobal,
 887                            mElement: kAudioObjectPropertyElementMaster,
 888                        },
 889                        0,
 890                        std::ptr::null(),
 891                        &mut prop_size as *mut _,
 892                        &mut output_device as *mut _ as *mut _,
 893                    );
 894                    if result != 0 {
 895                        log::warn!("Failed to get default output device ID");
 896                        0
 897                    } else {
 898                        output_device
 899                    }
 900                };
 901
 902                if device_id != 0 {
 903                    // Listen for format changes on the device
 904                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 905                        device_id,
 906                        &AudioObjectPropertyAddress {
 907                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 908                            mScope: if input {
 909                                coreaudio::sys::kAudioObjectPropertyScopeInput
 910                            } else {
 911                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 912                            },
 913                            mElement: kAudioObjectPropertyElementMaster,
 914                        },
 915                        Some(property_listener_handler_shim),
 916                        &*callback as *const _ as *mut _,
 917                    ))?;
 918                }
 919
 920                device_id
 921            };
 922
 923            Ok(Self {
 924                rx,
 925                callback,
 926                input,
 927                device_id,
 928            })
 929        }
 930    }
 931
 932    impl Drop for CoreAudioDefaultDeviceChangeListener {
 933        fn drop(&mut self) {
 934            unsafe {
 935                // Remove the system-level property listener
 936                AudioObjectRemovePropertyListener(
 937                    kAudioObjectSystemObject,
 938                    &AudioObjectPropertyAddress {
 939                        mSelector: if self.input {
 940                            kAudioHardwarePropertyDefaultInputDevice
 941                        } else {
 942                            kAudioHardwarePropertyDefaultOutputDevice
 943                        },
 944                        mScope: kAudioObjectPropertyScopeGlobal,
 945                        mElement: kAudioObjectPropertyElementMaster,
 946                    },
 947                    Some(property_listener_handler_shim),
 948                    &*self.callback as *const _ as *mut _,
 949                );
 950
 951                // Remove the device-specific property listener if we have a valid device ID
 952                if self.device_id != 0 {
 953                    AudioObjectRemovePropertyListener(
 954                        self.device_id,
 955                        &AudioObjectPropertyAddress {
 956                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 957                            mScope: if self.input {
 958                                coreaudio::sys::kAudioObjectPropertyScopeInput
 959                            } else {
 960                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 961                            },
 962                            mElement: kAudioObjectPropertyElementMaster,
 963                        },
 964                        Some(property_listener_handler_shim),
 965                        &*self.callback as *const _ as *mut _,
 966                    );
 967                }
 968            }
 969        }
 970    }
 971
 972    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
 973        type Item = ();
 974
 975        fn poll_next(
 976            mut self: std::pin::Pin<&mut Self>,
 977            cx: &mut std::task::Context<'_>,
 978        ) -> std::task::Poll<Option<Self::Item>> {
 979            self.rx.poll_next_unpin(cx)
 980        }
 981    }
 982}
 983
/// Device change listener used on macOS: the Core Audio property-listener implementation.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
 986
 987#[cfg(not(target_os = "macos"))]
 988mod noop_change_listener {
 989    use std::task::Poll;
 990
 991    use super::DeviceChangeListenerApi;
 992
 993    pub struct NoopOutputDeviceChangelistener {}
 994
 995    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
 996        fn new(_input: bool) -> anyhow::Result<Self> {
 997            Ok(NoopOutputDeviceChangelistener {})
 998        }
 999    }
1000
1001    impl futures::Stream for NoopOutputDeviceChangelistener {
1002        type Item = ();
1003
1004        fn poll_next(
1005            self: std::pin::Pin<&mut Self>,
1006            _cx: &mut std::task::Context<'_>,
1007        ) -> Poll<Option<Self::Item>> {
1008            Poll::Pending
1009        }
1010    }
1011}
1012
/// Device change listener used on non-macOS targets: a stream that never yields.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;