playback.rs

   1use anyhow::{Context as _, Result};
   2
   3use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
   4use cpal::traits::{DeviceTrait, StreamTrait as _};
   5use futures::channel::mpsc::UnboundedSender;
   6use futures::{Stream, StreamExt as _};
   7use gpui::{
   8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
   9    Task,
  10};
  11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
  12use livekit::track;
  13
  14use livekit::webrtc::{
  15    audio_frame::AudioFrame,
  16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
  17    audio_stream::native::NativeAudioStream,
  18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
  19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
  20    video_stream::native::NativeVideoStream,
  21};
  22use log::info;
  23use parking_lot::Mutex;
  24use rodio::Source;
  25use serde::{Deserialize, Serialize};
  26use settings::Settings;
  27use std::cell::RefCell;
  28use std::sync::Weak;
  29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
  30use std::time::Duration;
  31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
  32use util::{ResultExt as _, maybe};
  33
  34mod source;
  35
  36pub(crate) struct AudioStack {
  37    executor: BackgroundExecutor,
  38    apm: Arc<Mutex<apm::AudioProcessingModule>>,
  39    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
  40    _output_task: RefCell<Weak<Task<()>>>,
  41    next_ssrc: AtomicI32,
  42}
  43
  44pub(crate) fn play_remote_audio_track(
  45    track: &livekit::track::RemoteAudioTrack,
  46    speaker: Speaker,
  47    cx: &mut gpui::App,
  48) -> Result<AudioStream> {
  49    info!("speaker: {speaker:?}");
  50    let stream =
  51        source::LiveKitStream::new(cx.background_executor(), track, speaker.sends_legacy_audio);
  52
  53    let stop_handle = Arc::new(AtomicBool::new(false));
  54    let stop_handle_clone = stop_handle.clone();
  55    let stream = stream
  56        .stoppable()
  57        .periodic_access(Duration::from_millis(50), move |s| {
  58            if stop_handle.load(Ordering::Relaxed) {
  59                s.stop();
  60            }
  61        });
  62
  63    info!("sample_rate: {:?}", stream.sample_rate());
  64    info!("channel_count: {:?}", stream.channels());
  65    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
  66        .context("Could not play audio")?;
  67
  68    let on_drop = util::defer(move || {
  69        stop_handle_clone.store(true, Ordering::Relaxed);
  70    });
  71    Ok(AudioStream::Output {
  72        _drop: Box::new(on_drop),
  73    })
  74}
  75
  76impl AudioStack {
  77    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
  78        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
  79            true, true, true, true,
  80        )));
  81        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
  82        Self {
  83            executor,
  84            apm,
  85            mixer,
  86            _output_task: RefCell::new(Weak::new()),
  87            next_ssrc: AtomicI32::new(1),
  88        }
  89    }
  90
  91    pub(crate) fn play_remote_audio_track(
  92        &self,
  93        track: &livekit::track::RemoteAudioTrack,
  94    ) -> AudioStream {
  95        let output_task = self.start_output();
  96
  97        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
  98        let source = AudioMixerSource {
  99            ssrc: next_ssrc,
 100            sample_rate: LEGACY_SAMPLE_RATE.get(),
 101            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
 102            buffer: Arc::default(),
 103        };
 104        self.mixer.lock().add_source(source.clone());
 105
 106        let mut stream = NativeAudioStream::new(
 107            track.rtc_track(),
 108            source.sample_rate as i32,
 109            source.num_channels as i32,
 110        );
 111
 112        let receive_task = self.executor.spawn({
 113            let source = source.clone();
 114            async move {
 115                while let Some(frame) = stream.next().await {
 116                    source.receive(frame);
 117                }
 118            }
 119        });
 120
 121        let mixer = self.mixer.clone();
 122        let on_drop = util::defer(move || {
 123            mixer.lock().remove_source(source.ssrc);
 124            drop(receive_task);
 125            drop(output_task);
 126        });
 127
 128        AudioStream::Output {
 129            _drop: Box::new(on_drop),
 130        }
 131    }
 132
 133    fn start_output(&self) -> Arc<Task<()>> {
 134        if let Some(task) = self._output_task.borrow().upgrade() {
 135            return task;
 136        }
 137        let task = Arc::new(self.executor.spawn({
 138            let apm = self.apm.clone();
 139            let mixer = self.mixer.clone();
 140            async move {
 141                Self::play_output(
 142                    apm,
 143                    mixer,
 144                    LEGACY_SAMPLE_RATE.get(),
 145                    LEGACY_CHANNEL_COUNT.get().into(),
 146                )
 147                .await
 148                .log_err();
 149            }
 150        }));
 151        *self._output_task.borrow_mut() = Arc::downgrade(&task);
 152        task
 153    }
 154
 155    pub(crate) fn capture_local_microphone_track(
 156        &self,
 157        user_name: String,
 158        is_staff: bool,
 159        cx: &AsyncApp,
 160    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
 161        let legacy_audio_compatible =
 162            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
 163                .unwrap_or(true);
 164
 165        let source = if legacy_audio_compatible {
 166            NativeAudioSource::new(
 167                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 168                AudioSourceOptions::default(),
 169                LEGACY_SAMPLE_RATE.get(),
 170                LEGACY_CHANNEL_COUNT.get().into(),
 171                10,
 172            )
 173        } else {
 174            NativeAudioSource::new(
 175                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 176                AudioSourceOptions::default(),
 177                SAMPLE_RATE.get(),
 178                CHANNEL_COUNT.get().into(),
 179                10,
 180            )
 181        };
 182
 183        let speaker = Speaker {
 184            name: user_name,
 185            is_staff,
 186            sends_legacy_audio: legacy_audio_compatible,
 187        };
 188        log::info!("Microphone speaker: {speaker:?}");
 189        let track_name = serde_urlencoded::to_string(speaker)
 190            .context("Could not encode user information in track name")?;
 191
 192        let track = track::LocalAudioTrack::create_audio_track(
 193            &track_name,
 194            RtcAudioSource::Native(source.clone()),
 195        );
 196
 197        let apm = self.apm.clone();
 198
 199        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
 200        let transmit_task = self.executor.spawn({
 201            async move {
 202                while let Some(frame) = frame_rx.next().await {
 203                    source.capture_frame(&frame).await.log_err();
 204                }
 205            }
 206        });
 207        let rodio_pipeline =
 208            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
 209        let capture_task = if rodio_pipeline {
 210            info!("Using experimental.rodio_audio audio pipeline");
 211            let voip_parts = audio::VoipParts::new(cx)?;
 212            // Audio needs to run real-time and should never be paused. That is
 213            // why we are using a normal std::thread and not a background task
 214            thread::Builder::new()
 215                .name("MicrophoneToLivekit".to_string())
 216                .spawn(move || {
 217                    // microphone is non send on mac
 218                    let microphone = match audio::Audio::open_microphone(voip_parts) {
 219                        Ok(m) => m,
 220                        Err(e) => {
 221                            log::error!("Could not open microphone: {e}");
 222                            return;
 223                        }
 224                    };
 225                    send_to_livekit(frame_tx, microphone);
 226                })
 227                .expect("should be able to spawn threads");
 228            Task::ready(Ok(()))
 229        } else {
 230            self.executor.spawn(async move {
 231                Self::capture_input(
 232                    apm,
 233                    frame_tx,
 234                    LEGACY_SAMPLE_RATE.get(),
 235                    LEGACY_CHANNEL_COUNT.get().into(),
 236                )
 237                .await
 238            })
 239        };
 240
 241        let on_drop = util::defer(|| {
 242            drop(transmit_task);
 243            drop(capture_task);
 244        });
 245        Ok((
 246            super::LocalAudioTrack(track),
 247            AudioStream::Output {
 248                _drop: Box::new(on_drop),
 249            },
 250        ))
 251    }
 252
 253    async fn play_output(
 254        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 255        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
 256        sample_rate: u32,
 257        num_channels: u32,
 258    ) -> Result<()> {
 259        loop {
 260            let mut device_change_listener = DeviceChangeListener::new(false)?;
 261            let (output_device, output_config) = crate::default_device(false)?;
 262            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 263            let mixer = mixer.clone();
 264            let apm = apm.clone();
 265            let mut resampler = audio_resampler::AudioResampler::default();
 266            let mut buf = Vec::new();
 267
 268            thread::Builder::new()
 269                .name("AudioPlayback".to_owned())
 270                .spawn(move || {
 271                    let output_stream = output_device.build_output_stream(
 272                        &output_config.config(),
 273                        {
 274                            move |mut data, _info| {
 275                                while data.len() > 0 {
 276                                    if data.len() <= buf.len() {
 277                                        let rest = buf.split_off(data.len());
 278                                        data.copy_from_slice(&buf);
 279                                        buf = rest;
 280                                        return;
 281                                    }
 282                                    if buf.len() > 0 {
 283                                        let (prefix, suffix) = data.split_at_mut(buf.len());
 284                                        prefix.copy_from_slice(&buf);
 285                                        data = suffix;
 286                                    }
 287
 288                                    let mut mixer = mixer.lock();
 289                                    let mixed = mixer.mix(output_config.channels() as usize);
 290                                    let sampled = resampler.remix_and_resample(
 291                                        mixed,
 292                                        sample_rate / 100,
 293                                        num_channels,
 294                                        sample_rate,
 295                                        output_config.channels() as u32,
 296                                        output_config.sample_rate().0,
 297                                    );
 298                                    buf = sampled.to_vec();
 299                                    apm.lock()
 300                                        .process_reverse_stream(
 301                                            &mut buf,
 302                                            output_config.sample_rate().0 as i32,
 303                                            output_config.channels() as i32,
 304                                        )
 305                                        .ok();
 306                                }
 307                            }
 308                        },
 309                        |error| log::error!("error playing audio track: {:?}", error),
 310                        Some(Duration::from_millis(100)),
 311                    );
 312
 313                    let Some(output_stream) = output_stream.log_err() else {
 314                        return;
 315                    };
 316
 317                    output_stream.play().log_err();
 318                    // Block forever to keep the output stream alive
 319                    end_on_drop_rx.recv().ok();
 320                })
 321                .unwrap();
 322
 323            device_change_listener.next().await;
 324            drop(end_on_drop_tx)
 325        }
 326    }
 327
 328    async fn capture_input(
 329        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 330        frame_tx: UnboundedSender<AudioFrame<'static>>,
 331        sample_rate: u32,
 332        num_channels: u32,
 333    ) -> Result<()> {
 334        loop {
 335            let mut device_change_listener = DeviceChangeListener::new(true)?;
 336            let (device, config) = crate::default_device(true)?;
 337            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 338            let apm = apm.clone();
 339            let frame_tx = frame_tx.clone();
 340            let mut resampler = audio_resampler::AudioResampler::default();
 341
 342            thread::Builder::new()
 343                .name("AudioCapture".to_owned())
 344                .spawn(move || {
 345                    maybe!({
 346                        if let Some(name) = device.name().ok() {
 347                            log::info!("Using microphone: {}", name)
 348                        } else {
 349                            log::info!("Using microphone: <unknown>");
 350                        }
 351
 352                        let ten_ms_buffer_size =
 353                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
 354                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
 355
 356                        let stream = device
 357                            .build_input_stream_raw(
 358                                &config.config(),
 359                                config.sample_format(),
 360                                move |data, _: &_| {
 361                                    let data = crate::get_sample_data(config.sample_format(), data)
 362                                        .log_err();
 363                                    let Some(data) = data else {
 364                                        return;
 365                                    };
 366                                    let mut data = data.as_slice();
 367
 368                                    while data.len() > 0 {
 369                                        let remainder =
 370                                            (buf.capacity() - buf.len()).min(data.len());
 371                                        buf.extend_from_slice(&data[..remainder]);
 372                                        data = &data[remainder..];
 373
 374                                        if buf.capacity() == buf.len() {
 375                                            let mut sampled = resampler
 376                                                .remix_and_resample(
 377                                                    buf.as_slice(),
 378                                                    config.sample_rate().0 / 100,
 379                                                    config.channels() as u32,
 380                                                    config.sample_rate().0,
 381                                                    num_channels,
 382                                                    sample_rate,
 383                                                )
 384                                                .to_owned();
 385                                            apm.lock()
 386                                                .process_stream(
 387                                                    &mut sampled,
 388                                                    sample_rate as i32,
 389                                                    num_channels as i32,
 390                                                )
 391                                                .log_err();
 392                                            buf.clear();
 393                                            frame_tx
 394                                                .unbounded_send(AudioFrame {
 395                                                    data: Cow::Owned(sampled),
 396                                                    sample_rate,
 397                                                    num_channels,
 398                                                    samples_per_channel: sample_rate / 100,
 399                                                })
 400                                                .ok();
 401                                        }
 402                                    }
 403                                },
 404                                |err| log::error!("error capturing audio track: {:?}", err),
 405                                Some(Duration::from_millis(100)),
 406                            )
 407                            .context("failed to build input stream")?;
 408
 409                        stream.play()?;
 410                        // Keep the thread alive and holding onto the `stream`
 411                        end_on_drop_rx.recv().ok();
 412                        anyhow::Ok(Some(()))
 413                    })
 414                    .log_err();
 415                })
 416                .unwrap();
 417
 418            device_change_listener.next().await;
 419            drop(end_on_drop_tx)
 420        }
 421    }
 422}
 423
 424#[derive(Serialize, Deserialize, Debug)]
 425pub struct Speaker {
 426    pub name: String,
 427    pub is_staff: bool,
 428    pub sends_legacy_audio: bool,
 429}
 430
 431fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
 432    use cpal::Sample;
 433    let sample_rate = microphone.sample_rate().get();
 434    let num_channels = microphone.channels().get() as u32;
 435    let buffer_size = sample_rate / 100 * num_channels;
 436
 437    loop {
 438        let sampled: Vec<_> = microphone
 439            .by_ref()
 440            .take(buffer_size as usize)
 441            .map(|s| s.to_sample())
 442            .collect();
 443
 444        if frame_tx
 445            .unbounded_send(AudioFrame {
 446                sample_rate,
 447                num_channels,
 448                samples_per_channel: sampled.len() as u32 / num_channels,
 449                data: Cow::Owned(sampled),
 450            })
 451            .is_err()
 452        {
 453            // must rx has dropped or is not consuming
 454            break;
 455        }
 456    }
 457}
 458
 459use super::LocalVideoTrack;
 460
 461pub enum AudioStream {
 462    Input { _task: Task<()> },
 463    Output { _drop: Box<dyn std::any::Any> },
 464}
 465
 466pub(crate) async fn capture_local_video_track(
 467    capture_source: &dyn ScreenCaptureSource,
 468    cx: &mut gpui::AsyncApp,
 469) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
 470    let metadata = capture_source.metadata()?;
 471    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
 472        NativeVideoSource::new(VideoResolution {
 473            width: metadata.resolution.width.0 as u32,
 474            height: metadata.resolution.height.0 as u32,
 475        })
 476    })?
 477    .await?;
 478
 479    let capture_stream = capture_source
 480        .stream(cx.foreground_executor(), {
 481            let track_source = track_source.clone();
 482            Box::new(move |frame| {
 483                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
 484                    track_source.capture_frame(&VideoFrame {
 485                        rotation: VideoRotation::VideoRotation0,
 486                        timestamp_us: 0,
 487                        buffer,
 488                    });
 489                }
 490            })
 491        })
 492        .await??;
 493
 494    Ok((
 495        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
 496            "screen share",
 497            RtcVideoSource::Native(track_source),
 498        )),
 499        capture_stream,
 500    ))
 501}
 502
 503#[derive(Clone)]
 504struct AudioMixerSource {
 505    ssrc: i32,
 506    sample_rate: u32,
 507    num_channels: u32,
 508    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
 509}
 510
 511impl AudioMixerSource {
 512    fn receive(&self, frame: AudioFrame) {
 513        assert_eq!(
 514            frame.data.len() as u32,
 515            self.sample_rate * self.num_channels / 100
 516        );
 517
 518        let mut buffer = self.buffer.lock();
 519        buffer.push_back(frame.data.to_vec());
 520        while buffer.len() > 10 {
 521            buffer.pop_front();
 522        }
 523    }
 524}
 525
 526impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
 527    fn ssrc(&self) -> i32 {
 528        self.ssrc
 529    }
 530
 531    fn preferred_sample_rate(&self) -> u32 {
 532        self.sample_rate
 533    }
 534
 535    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
 536        assert_eq!(self.sample_rate, target_sample_rate);
 537        let buf = self.buffer.lock().pop_front()?;
 538        Some(AudioFrame {
 539            data: Cow::Owned(buf),
 540            sample_rate: self.sample_rate,
 541            num_channels: self.num_channels,
 542            samples_per_channel: self.sample_rate / 100,
 543        })
 544    }
 545}
 546
 547pub fn play_remote_video_track(
 548    track: &crate::RemoteVideoTrack,
 549) -> impl Stream<Item = RemoteVideoFrame> + use<> {
 550    #[cfg(target_os = "macos")]
 551    {
 552        let mut pool = None;
 553        let most_recent_frame_size = (0, 0);
 554        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 555            if pool == None
 556                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
 557            {
 558                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
 559            }
 560            let pool = pool.clone();
 561            async move {
 562                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
 563                    // when the remote stops sharing, we get an 8x8 black image.
 564                    // In a lil bit, the unpublish will come through and close the view,
 565                    // but until then, don't flash black.
 566                    return None;
 567                }
 568
 569                video_frame_buffer_from_webrtc(pool?, frame.buffer)
 570            }
 571        })
 572    }
 573    #[cfg(not(target_os = "macos"))]
 574    {
 575        NativeVideoStream::new(track.0.rtc_track())
 576            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
 577    }
 578}
 579
 580#[cfg(target_os = "macos")]
 581fn create_buffer_pool(
 582    width: u32,
 583    height: u32,
 584) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
 585    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
 586    use core_video::pixel_buffer;
 587    use core_video::{
 588        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
 589        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
 590        pixel_buffer_pool::{self},
 591    };
 592
 593    let width_key: CFString =
 594        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
 595    let height_key: CFString =
 596        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
 597    let animation_key: CFString = unsafe {
 598        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
 599    };
 600    let format_key: CFString =
 601        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };
 602
 603    let yes: CFNumber = 1.into();
 604    let width: CFNumber = (width as i32).into();
 605    let height: CFNumber = (height as i32).into();
 606    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();
 607
 608    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
 609        (width_key, width.into_CFType()),
 610        (height_key, height.into_CFType()),
 611        (animation_key, yes.into_CFType()),
 612        (format_key, format.into_CFType()),
 613    ]);
 614
 615    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
 616        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
 617    })
 618}
 619
 620#[cfg(target_os = "macos")]
 621pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
 622
 623#[cfg(target_os = "macos")]
 624fn video_frame_buffer_from_webrtc(
 625    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
 626    buffer: Box<dyn VideoBuffer>,
 627) -> Option<RemoteVideoFrame> {
 628    use core_foundation::base::TCFType;
 629    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
 630    use livekit::webrtc::native::yuv_helper::i420_to_nv12;
 631
 632    if let Some(native) = buffer.as_native() {
 633        let pixel_buffer = native.get_cv_pixel_buffer();
 634        if pixel_buffer.is_null() {
 635            return None;
 636        }
 637        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
 638    }
 639
 640    let i420_buffer = buffer.as_i420()?;
 641    let pixel_buffer = pool.create_pixel_buffer().log_err()?;
 642
 643    let image_buffer = unsafe {
 644        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
 645            return None;
 646        }
 647
 648        let dst_y = pixel_buffer.get_base_address_of_plane(0);
 649        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
 650        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
 651        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
 652        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
 653        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
 654        let width = pixel_buffer.get_width();
 655        let height = pixel_buffer.get_height();
 656        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
 657        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);
 658
 659        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
 660        let (src_y, src_u, src_v) = i420_buffer.data();
 661        i420_to_nv12(
 662            src_y,
 663            stride_y,
 664            src_u,
 665            stride_u,
 666            src_v,
 667            stride_v,
 668            dst_y_buffer,
 669            dst_y_stride as u32,
 670            dst_uv_buffer,
 671            dst_uv_stride as u32,
 672            width as i32,
 673            height as i32,
 674        );
 675
 676        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
 677            return None;
 678        }
 679
 680        pixel_buffer
 681    };
 682
 683    Some(image_buffer)
 684}
 685
 686#[cfg(not(target_os = "macos"))]
 687pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
 688
 689#[cfg(not(target_os = "macos"))]
 690fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
 691    use gpui::RenderImage;
 692    use image::{Frame, RgbaImage};
 693    use livekit::webrtc::prelude::VideoFormatType;
 694    use smallvec::SmallVec;
 695    use std::alloc::{Layout, alloc};
 696
 697    let width = buffer.width();
 698    let height = buffer.height();
 699    let stride = width * 4;
 700    let byte_len = (stride * height) as usize;
 701    let argb_image = unsafe {
 702        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
 703        // will write all bytes anyway.
 704        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
 705        if start_ptr.is_null() {
 706            return None;
 707        }
 708        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
 709        buffer.to_argb(
 710            VideoFormatType::ARGB,
 711            argb_frame_slice,
 712            stride,
 713            width as i32,
 714            height as i32,
 715        );
 716        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
 717    };
 718
 719    // TODO: Unclear why providing argb_image to RgbaImage works properly.
 720    let image = RgbaImage::from_raw(width, height, argb_image)
 721        .with_context(|| "Bug: not enough bytes allocated for image.")
 722        .log_err()?;
 723
 724    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
 725        Frame::new(image),
 726        1,
 727    ))))
 728}
 729
 730#[cfg(target_os = "macos")]
 731fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 732    use livekit::webrtc;
 733
 734    let pixel_buffer = frame.0.as_concrete_TypeRef();
 735    std::mem::forget(frame.0);
 736    unsafe {
 737        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
 738    }
 739}
 740
 741#[cfg(not(target_os = "macos"))]
 742fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 743    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
 744    use livekit::webrtc::prelude::NV12Buffer;
 745    match frame.0 {
 746        scap::frame::Frame::BGRx(frame) => {
 747            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 748            let (stride_y, stride_uv) = buffer.strides();
 749            let (data_y, data_uv) = buffer.data_mut();
 750            argb_to_nv12(
 751                &frame.data,
 752                frame.width as u32 * 4,
 753                data_y,
 754                stride_y,
 755                data_uv,
 756                stride_uv,
 757                frame.width,
 758                frame.height,
 759            );
 760            Some(buffer)
 761        }
 762        scap::frame::Frame::RGBx(frame) => {
 763            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 764            let (stride_y, stride_uv) = buffer.strides();
 765            let (data_y, data_uv) = buffer.data_mut();
 766            abgr_to_nv12(
 767                &frame.data,
 768                frame.width as u32 * 4,
 769                data_y,
 770                stride_y,
 771                data_uv,
 772                stride_uv,
 773                frame.width,
 774                frame.height,
 775            );
 776            Some(buffer)
 777        }
 778        scap::frame::Frame::YUVFrame(yuvframe) => {
 779            let mut buffer = NV12Buffer::with_strides(
 780                yuvframe.width as u32,
 781                yuvframe.height as u32,
 782                yuvframe.luminance_stride as u32,
 783                yuvframe.chrominance_stride as u32,
 784            );
 785            let (luminance, chrominance) = buffer.data_mut();
 786            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
 787            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
 788            Some(buffer)
 789        }
 790        _ => {
 791            log::error!(
 792                "Expected BGRx or YUV frame from scap screen capture but got some other format."
 793            );
 794            None
 795        }
 796    }
 797}
 798
 799trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
 800    fn new(input: bool) -> Result<Self>;
 801}
 802
 803#[cfg(target_os = "macos")]
 804mod macos {
 805
 806    use coreaudio::sys::{
 807        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
 808        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
 809        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
 810        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
 811    };
 812    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
 813
 814    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
 815    pub struct CoreAudioDefaultDeviceChangeListener {
 816        rx: UnboundedReceiver<()>,
 817        callback: Box<PropertyListenerCallbackWrapper>,
 818        input: bool,
 819        device_id: AudioObjectID, // Store the device ID to properly remove listeners
 820    }
 821
 822    trait _AssertSend: Send {}
 823    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
 824
 825    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
 826
 827    unsafe extern "C" fn property_listener_handler_shim(
 828        _: AudioObjectID,
 829        _: u32,
 830        _: *const AudioObjectPropertyAddress,
 831        callback: *mut ::std::os::raw::c_void,
 832    ) -> OSStatus {
 833        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
 834        unsafe { (*wrapper).0() };
 835        0
 836    }
 837
 838    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
 839        fn new(input: bool) -> anyhow::Result<Self> {
 840            let (tx, rx) = futures::channel::mpsc::unbounded();
 841
 842            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
 843                tx.unbounded_send(()).ok();
 844            })));
 845
 846            // Get the current default device ID
 847            let device_id = unsafe {
 848                // Listen for default device changes
 849                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 850                    kAudioObjectSystemObject,
 851                    &AudioObjectPropertyAddress {
 852                        mSelector: if input {
 853                            kAudioHardwarePropertyDefaultInputDevice
 854                        } else {
 855                            kAudioHardwarePropertyDefaultOutputDevice
 856                        },
 857                        mScope: kAudioObjectPropertyScopeGlobal,
 858                        mElement: kAudioObjectPropertyElementMaster,
 859                    },
 860                    Some(property_listener_handler_shim),
 861                    &*callback as *const _ as *mut _,
 862                ))?;
 863
 864                // Also listen for changes to the device configuration
 865                let device_id = if input {
 866                    let mut input_device: AudioObjectID = 0;
 867                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 868                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 869                        kAudioObjectSystemObject,
 870                        &AudioObjectPropertyAddress {
 871                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
 872                            mScope: kAudioObjectPropertyScopeGlobal,
 873                            mElement: kAudioObjectPropertyElementMaster,
 874                        },
 875                        0,
 876                        std::ptr::null(),
 877                        &mut prop_size as *mut _,
 878                        &mut input_device as *mut _ as *mut _,
 879                    );
 880                    if result != 0 {
 881                        log::warn!("Failed to get default input device ID");
 882                        0
 883                    } else {
 884                        input_device
 885                    }
 886                } else {
 887                    let mut output_device: AudioObjectID = 0;
 888                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 889                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 890                        kAudioObjectSystemObject,
 891                        &AudioObjectPropertyAddress {
 892                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
 893                            mScope: kAudioObjectPropertyScopeGlobal,
 894                            mElement: kAudioObjectPropertyElementMaster,
 895                        },
 896                        0,
 897                        std::ptr::null(),
 898                        &mut prop_size as *mut _,
 899                        &mut output_device as *mut _ as *mut _,
 900                    );
 901                    if result != 0 {
 902                        log::warn!("Failed to get default output device ID");
 903                        0
 904                    } else {
 905                        output_device
 906                    }
 907                };
 908
 909                if device_id != 0 {
 910                    // Listen for format changes on the device
 911                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 912                        device_id,
 913                        &AudioObjectPropertyAddress {
 914                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 915                            mScope: if input {
 916                                coreaudio::sys::kAudioObjectPropertyScopeInput
 917                            } else {
 918                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 919                            },
 920                            mElement: kAudioObjectPropertyElementMaster,
 921                        },
 922                        Some(property_listener_handler_shim),
 923                        &*callback as *const _ as *mut _,
 924                    ))?;
 925                }
 926
 927                device_id
 928            };
 929
 930            Ok(Self {
 931                rx,
 932                callback,
 933                input,
 934                device_id,
 935            })
 936        }
 937    }
 938
 939    impl Drop for CoreAudioDefaultDeviceChangeListener {
 940        fn drop(&mut self) {
 941            unsafe {
 942                // Remove the system-level property listener
 943                AudioObjectRemovePropertyListener(
 944                    kAudioObjectSystemObject,
 945                    &AudioObjectPropertyAddress {
 946                        mSelector: if self.input {
 947                            kAudioHardwarePropertyDefaultInputDevice
 948                        } else {
 949                            kAudioHardwarePropertyDefaultOutputDevice
 950                        },
 951                        mScope: kAudioObjectPropertyScopeGlobal,
 952                        mElement: kAudioObjectPropertyElementMaster,
 953                    },
 954                    Some(property_listener_handler_shim),
 955                    &*self.callback as *const _ as *mut _,
 956                );
 957
 958                // Remove the device-specific property listener if we have a valid device ID
 959                if self.device_id != 0 {
 960                    AudioObjectRemovePropertyListener(
 961                        self.device_id,
 962                        &AudioObjectPropertyAddress {
 963                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 964                            mScope: if self.input {
 965                                coreaudio::sys::kAudioObjectPropertyScopeInput
 966                            } else {
 967                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 968                            },
 969                            mElement: kAudioObjectPropertyElementMaster,
 970                        },
 971                        Some(property_listener_handler_shim),
 972                        &*self.callback as *const _ as *mut _,
 973                    );
 974                }
 975            }
 976        }
 977    }
 978
 979    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
 980        type Item = ();
 981
 982        fn poll_next(
 983            mut self: std::pin::Pin<&mut Self>,
 984            cx: &mut std::task::Context<'_>,
 985        ) -> std::task::Poll<Option<Self::Item>> {
 986            self.rx.poll_next_unpin(cx)
 987        }
 988    }
 989}
 990
 991#[cfg(target_os = "macos")]
 992type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
 993
 994#[cfg(not(target_os = "macos"))]
 995mod noop_change_listener {
 996    use std::task::Poll;
 997
 998    use super::DeviceChangeListenerApi;
 999
1000    pub struct NoopOutputDeviceChangelistener {}
1001
1002    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
1003        fn new(_input: bool) -> anyhow::Result<Self> {
1004            Ok(NoopOutputDeviceChangelistener {})
1005        }
1006    }
1007
1008    impl futures::Stream for NoopOutputDeviceChangelistener {
1009        type Item = ();
1010
1011        fn poll_next(
1012            self: std::pin::Pin<&mut Self>,
1013            _cx: &mut std::task::Context<'_>,
1014        ) -> Poll<Option<Self::Item>> {
1015            Poll::Pending
1016        }
1017    }
1018}
1019
1020#[cfg(not(target_os = "macos"))]
1021type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;