playback.rs

   1use anyhow::{Context as _, Result};
   2
   3use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
   4use cpal::traits::{DeviceTrait, StreamTrait as _};
   5use futures::channel::mpsc::UnboundedSender;
   6use futures::{Stream, StreamExt as _};
   7use gpui::{
   8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
   9    Task,
  10};
  11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
  12use livekit::track;
  13
  14use livekit::webrtc::{
  15    audio_frame::AudioFrame,
  16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
  17    audio_stream::native::NativeAudioStream,
  18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
  19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
  20    video_stream::native::NativeVideoStream,
  21};
  22use log::info;
  23use parking_lot::Mutex;
  24use rodio::Source;
  25use serde::{Deserialize, Serialize};
  26use settings::Settings;
  27use std::cell::RefCell;
  28use std::sync::Weak;
  29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
  30use std::time::Duration;
  31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
  32use util::{ResultExt as _, maybe};
  33
  34mod source;
  35
/// Legacy (libwebrtc-based) audio pipeline.
///
/// All remote tracks are fed into a single [`audio_mixer::AudioMixer`] and played on one
/// shared output task. The same [`apm::AudioProcessingModule`] receives the played-back
/// audio as its reverse stream and processes the captured microphone audio.
pub(crate) struct AudioStack {
    /// Executor used for the receive, transmit and capture tasks.
    executor: BackgroundExecutor,
    /// Shared audio processing module (noise cancellation is provided here rather than by
    /// `AudioSourceOptions`, per the note in `capture_local_microphone_track`).
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    /// Mixer that combines all remote audio sources before playback.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    /// Weak handle to the running output task. `start_output` upgrades it so every remote
    /// track shares one task; the task is dropped once no track holds a strong `Arc`.
    _output_task: RefCell<Weak<Task<()>>>,
    /// Next SSRC to hand out to a mixer source; incremented once per played remote track.
    next_ssrc: AtomicI32,
}
  43
  44pub(crate) fn play_remote_audio_track(
  45    track: &livekit::track::RemoteAudioTrack,
  46    speaker: Speaker,
  47    cx: &mut gpui::App,
  48) -> Result<AudioStream> {
  49    info!("speaker: {speaker:?}");
  50    let stream =
  51        source::LiveKitStream::new(cx.background_executor(), track, speaker.sends_legacy_audio);
  52
  53    let stop_handle = Arc::new(AtomicBool::new(false));
  54    let stop_handle_clone = stop_handle.clone();
  55    let stream = stream
  56        .stoppable()
  57        .periodic_access(Duration::from_millis(50), move |s| {
  58            if stop_handle.load(Ordering::Relaxed) {
  59                s.stop();
  60            }
  61        });
  62
  63    info!("sample_rate: {:?}", stream.sample_rate());
  64    info!("channel_count: {:?}", stream.channels());
  65    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
  66        .context("Could not play audio")?;
  67
  68    let on_drop = util::defer(move || {
  69        stop_handle_clone.store(true, Ordering::Relaxed);
  70    });
  71    Ok(AudioStream::Output {
  72        _drop: Box::new(on_drop),
  73    })
  74}
  75
  76impl AudioStack {
  77    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
  78        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
  79            true, true, true, true,
  80        )));
  81        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
  82        Self {
  83            executor,
  84            apm,
  85            mixer,
  86            _output_task: RefCell::new(Weak::new()),
  87            next_ssrc: AtomicI32::new(1),
  88        }
  89    }
  90
  91    pub(crate) fn play_remote_audio_track(
  92        &self,
  93        track: &livekit::track::RemoteAudioTrack,
  94    ) -> AudioStream {
  95        let output_task = self.start_output();
  96
  97        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
  98        let source = AudioMixerSource {
  99            ssrc: next_ssrc,
 100            sample_rate: LEGACY_SAMPLE_RATE.get(),
 101            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
 102            buffer: Arc::default(),
 103        };
 104        self.mixer.lock().add_source(source.clone());
 105
 106        let mut stream = NativeAudioStream::new(
 107            track.rtc_track(),
 108            source.sample_rate as i32,
 109            source.num_channels as i32,
 110        );
 111
 112        let receive_task = self.executor.spawn({
 113            let source = source.clone();
 114            async move {
 115                while let Some(frame) = stream.next().await {
 116                    source.receive(frame);
 117                }
 118            }
 119        });
 120
 121        let mixer = self.mixer.clone();
 122        let on_drop = util::defer(move || {
 123            mixer.lock().remove_source(source.ssrc);
 124            drop(receive_task);
 125            drop(output_task);
 126        });
 127
 128        AudioStream::Output {
 129            _drop: Box::new(on_drop),
 130        }
 131    }
 132
 133    fn start_output(&self) -> Arc<Task<()>> {
 134        if let Some(task) = self._output_task.borrow().upgrade() {
 135            return task;
 136        }
 137        let task = Arc::new(self.executor.spawn({
 138            let apm = self.apm.clone();
 139            let mixer = self.mixer.clone();
 140            async move {
 141                Self::play_output(
 142                    apm,
 143                    mixer,
 144                    LEGACY_SAMPLE_RATE.get(),
 145                    LEGACY_CHANNEL_COUNT.get().into(),
 146                )
 147                .await
 148                .log_err();
 149            }
 150        }));
 151        *self._output_task.borrow_mut() = Arc::downgrade(&task);
 152        task
 153    }
 154
 155    pub(crate) fn capture_local_microphone_track(
 156        &self,
 157        user_name: String,
 158        is_staff: bool,
 159        cx: &AsyncApp,
 160    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
 161        let legacy_audio_compatible =
 162            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
 163                .unwrap_or(true);
 164
 165        let source = if legacy_audio_compatible {
 166            NativeAudioSource::new(
 167                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 168                AudioSourceOptions::default(),
 169                LEGACY_SAMPLE_RATE.get(),
 170                LEGACY_CHANNEL_COUNT.get().into(),
 171                10,
 172            )
 173        } else {
 174            NativeAudioSource::new(
 175                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 176                AudioSourceOptions::default(),
 177                SAMPLE_RATE.get(),
 178                CHANNEL_COUNT.get().into(),
 179                10,
 180            )
 181        };
 182
 183        let speaker = Speaker {
 184            name: user_name,
 185            is_staff,
 186            sends_legacy_audio: legacy_audio_compatible,
 187        };
 188        log::info!("Microphone speaker: {speaker:?}");
 189        let track_name = serde_urlencoded::to_string(speaker)
 190            .context("Could not encode user information in track name")?;
 191
 192        let track = track::LocalAudioTrack::create_audio_track(
 193            &track_name,
 194            RtcAudioSource::Native(source.clone()),
 195        );
 196
 197        let apm = self.apm.clone();
 198
 199        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
 200        let transmit_task = self.executor.spawn({
 201            async move {
 202                while let Some(frame) = frame_rx.next().await {
 203                    source.capture_frame(&frame).await.log_err();
 204                }
 205            }
 206        });
 207        let rodio_pipeline =
 208            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
 209        let capture_task = if rodio_pipeline {
 210            info!("Using experimental.rodio_audio audio pipeline");
 211            let voip_parts = audio::VoipParts::new(cx)?;
 212            // Audio needs to run real-time and should never be paused. That is
 213            // why we are using a normal std::thread and not a background task
 214            thread::Builder::new()
 215                .name("MicrophoneToLivekit".to_string())
 216                .spawn(move || {
 217                    // microphone is non send on mac
 218                    let microphone = match audio::Audio::open_microphone(voip_parts) {
 219                        Ok(m) => m,
 220                        Err(e) => {
 221                            log::error!("Could not open microphone: {e}");
 222                            return;
 223                        }
 224                    };
 225                    send_to_livekit(frame_tx, microphone);
 226                })
 227                .expect("should be able to spawn threads");
 228            Task::ready(Ok(()))
 229        } else {
 230            self.executor.spawn(async move {
 231                Self::capture_input(
 232                    apm,
 233                    frame_tx,
 234                    LEGACY_SAMPLE_RATE.get(),
 235                    LEGACY_CHANNEL_COUNT.get().into(),
 236                )
 237                .await
 238            })
 239        };
 240
 241        let on_drop = util::defer(|| {
 242            drop(transmit_task);
 243            drop(capture_task);
 244        });
 245        Ok((
 246            super::LocalAudioTrack(track),
 247            AudioStream::Output {
 248                _drop: Box::new(on_drop),
 249            },
 250        ))
 251    }
 252
 253    async fn play_output(
 254        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 255        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
 256        sample_rate: u32,
 257        num_channels: u32,
 258    ) -> Result<()> {
 259        // Prevent App Nap from throttling audio playback on macOS.
 260        // This guard is held for the entire duration of audio output.
 261        #[cfg(target_os = "macos")]
 262        let _prevent_app_nap = PreventAppNapGuard::new();
 263
 264        loop {
 265            let mut device_change_listener = DeviceChangeListener::new(false)?;
 266            let (output_device, output_config) = crate::default_device(false)?;
 267            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 268            let mixer = mixer.clone();
 269            let apm = apm.clone();
 270            let mut resampler = audio_resampler::AudioResampler::default();
 271            let mut buf = Vec::new();
 272
 273            thread::Builder::new()
 274                .name("AudioPlayback".to_owned())
 275                .spawn(move || {
 276                    let output_stream = output_device.build_output_stream(
 277                        &output_config.config(),
 278                        {
 279                            move |mut data, _info| {
 280                                while data.len() > 0 {
 281                                    if data.len() <= buf.len() {
 282                                        let rest = buf.split_off(data.len());
 283                                        data.copy_from_slice(&buf);
 284                                        buf = rest;
 285                                        return;
 286                                    }
 287                                    if buf.len() > 0 {
 288                                        let (prefix, suffix) = data.split_at_mut(buf.len());
 289                                        prefix.copy_from_slice(&buf);
 290                                        data = suffix;
 291                                    }
 292
 293                                    let mut mixer = mixer.lock();
 294                                    let mixed = mixer.mix(output_config.channels() as usize);
 295                                    let sampled = resampler.remix_and_resample(
 296                                        mixed,
 297                                        sample_rate / 100,
 298                                        num_channels,
 299                                        sample_rate,
 300                                        output_config.channels() as u32,
 301                                        output_config.sample_rate().0,
 302                                    );
 303                                    buf = sampled.to_vec();
 304                                    apm.lock()
 305                                        .process_reverse_stream(
 306                                            &mut buf,
 307                                            output_config.sample_rate().0 as i32,
 308                                            output_config.channels() as i32,
 309                                        )
 310                                        .ok();
 311                                }
 312                            }
 313                        },
 314                        |error| log::error!("error playing audio track: {:?}", error),
 315                        Some(Duration::from_millis(100)),
 316                    );
 317
 318                    let Some(output_stream) = output_stream.log_err() else {
 319                        return;
 320                    };
 321
 322                    output_stream.play().log_err();
 323                    // Block forever to keep the output stream alive
 324                    end_on_drop_rx.recv().ok();
 325                })
 326                .unwrap();
 327
 328            device_change_listener.next().await;
 329            drop(end_on_drop_tx)
 330        }
 331    }
 332
 333    async fn capture_input(
 334        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 335        frame_tx: UnboundedSender<AudioFrame<'static>>,
 336        sample_rate: u32,
 337        num_channels: u32,
 338    ) -> Result<()> {
 339        loop {
 340            let mut device_change_listener = DeviceChangeListener::new(true)?;
 341            let (device, config) = crate::default_device(true)?;
 342            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 343            let apm = apm.clone();
 344            let frame_tx = frame_tx.clone();
 345            let mut resampler = audio_resampler::AudioResampler::default();
 346
 347            thread::Builder::new()
 348                .name("AudioCapture".to_owned())
 349                .spawn(move || {
 350                    maybe!({
 351                        if let Some(name) = device.name().ok() {
 352                            log::info!("Using microphone: {}", name)
 353                        } else {
 354                            log::info!("Using microphone: <unknown>");
 355                        }
 356
 357                        let ten_ms_buffer_size =
 358                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
 359                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
 360
 361                        let stream = device
 362                            .build_input_stream_raw(
 363                                &config.config(),
 364                                config.sample_format(),
 365                                move |data, _: &_| {
 366                                    let data = crate::get_sample_data(config.sample_format(), data)
 367                                        .log_err();
 368                                    let Some(data) = data else {
 369                                        return;
 370                                    };
 371                                    let mut data = data.as_slice();
 372
 373                                    while data.len() > 0 {
 374                                        let remainder =
 375                                            (buf.capacity() - buf.len()).min(data.len());
 376                                        buf.extend_from_slice(&data[..remainder]);
 377                                        data = &data[remainder..];
 378
 379                                        if buf.capacity() == buf.len() {
 380                                            let mut sampled = resampler
 381                                                .remix_and_resample(
 382                                                    buf.as_slice(),
 383                                                    config.sample_rate().0 / 100,
 384                                                    config.channels() as u32,
 385                                                    config.sample_rate().0,
 386                                                    num_channels,
 387                                                    sample_rate,
 388                                                )
 389                                                .to_owned();
 390                                            apm.lock()
 391                                                .process_stream(
 392                                                    &mut sampled,
 393                                                    sample_rate as i32,
 394                                                    num_channels as i32,
 395                                                )
 396                                                .log_err();
 397                                            buf.clear();
 398                                            frame_tx
 399                                                .unbounded_send(AudioFrame {
 400                                                    data: Cow::Owned(sampled),
 401                                                    sample_rate,
 402                                                    num_channels,
 403                                                    samples_per_channel: sample_rate / 100,
 404                                                })
 405                                                .ok();
 406                                        }
 407                                    }
 408                                },
 409                                |err| log::error!("error capturing audio track: {:?}", err),
 410                                Some(Duration::from_millis(100)),
 411                            )
 412                            .context("failed to build input stream")?;
 413
 414                        stream.play()?;
 415                        // Keep the thread alive and holding onto the `stream`
 416                        end_on_drop_rx.recv().ok();
 417                        anyhow::Ok(Some(()))
 418                    })
 419                    .log_err();
 420                })
 421                .unwrap();
 422
 423            device_change_listener.next().await;
 424            drop(end_on_drop_tx)
 425        }
 426    }
 427}
 428
/// Identity of a microphone track's publisher, encoded into the LiveKit track name.
///
/// `capture_local_microphone_track` serializes this with `serde_urlencoded`. Serde derives
/// serialize fields in declaration order, so reordering fields changes the encoded name.
#[derive(Serialize, Deserialize, Debug)]
pub struct Speaker {
    /// Name of the user publishing the track.
    pub name: String,
    /// Whether the user is staff; forwarded to `audio::Audio::play_voip_stream` on playback.
    pub is_staff: bool,
    /// Whether the sender publishes legacy-format audio. Receivers pass this to
    /// `source::LiveKitStream::new` to pick the matching stream format.
    pub sends_legacy_audio: bool,
}
 435
 436fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
 437    use cpal::Sample;
 438    let sample_rate = microphone.sample_rate().get();
 439    let num_channels = microphone.channels().get() as u32;
 440    let buffer_size = sample_rate / 100 * num_channels;
 441
 442    loop {
 443        let sampled: Vec<_> = microphone
 444            .by_ref()
 445            .take(buffer_size as usize)
 446            .map(|s| s.to_sample())
 447            .collect();
 448
 449        if frame_tx
 450            .unbounded_send(AudioFrame {
 451                sample_rate,
 452                num_channels,
 453                samples_per_channel: sampled.len() as u32 / num_channels,
 454                data: Cow::Owned(sampled),
 455            })
 456            .is_err()
 457        {
 458            // must rx has dropped or is not consuming
 459            break;
 460        }
 461    }
 462}
 463
 464use super::LocalVideoTrack;
 465
/// Handle that keeps an audio pipeline running for as long as it is held.
pub enum AudioStream {
    /// Holds a task; presumably the task is cancelled when the handle is dropped (gpui
    /// `Task` semantics) — TODO confirm.
    Input { _task: Task<()> },
    /// Holds an arbitrary value whose drop tears the pipeline down. In this file it is
    /// always a `util::defer` guard that stops playback/capture and drops the related tasks.
    Output { _drop: Box<dyn std::any::Any> },
}
 470
 471pub(crate) async fn capture_local_video_track(
 472    capture_source: &dyn ScreenCaptureSource,
 473    cx: &mut gpui::AsyncApp,
 474) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
 475    let metadata = capture_source.metadata()?;
 476    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
 477        NativeVideoSource::new(VideoResolution {
 478            width: metadata.resolution.width.0 as u32,
 479            height: metadata.resolution.height.0 as u32,
 480        })
 481    })?
 482    .await?;
 483
 484    let capture_stream = capture_source
 485        .stream(cx.foreground_executor(), {
 486            let track_source = track_source.clone();
 487            Box::new(move |frame| {
 488                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
 489                    track_source.capture_frame(&VideoFrame {
 490                        rotation: VideoRotation::VideoRotation0,
 491                        timestamp_us: 0,
 492                        buffer,
 493                    });
 494                }
 495            })
 496        })
 497        .await??;
 498
 499    Ok((
 500        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
 501            "screen share",
 502            RtcVideoSource::Native(track_source),
 503        )),
 504        capture_stream,
 505    ))
 506}
 507
/// A remote track's queue of received audio, registered as a source with the shared mixer.
///
/// Clones share the same `buffer`, so the receive task and the mixer see a single queue.
#[derive(Clone)]
struct AudioMixerSource {
    /// Identifier used to add this source to, and remove it from, the mixer.
    ssrc: i32,
    /// Sample rate of every queued frame.
    sample_rate: u32,
    /// Interleaved channel count of every queued frame.
    num_channels: u32,
    /// Pending 10ms frames, oldest first; `receive` caps the queue length.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
 515
 516impl AudioMixerSource {
 517    fn receive(&self, frame: AudioFrame) {
 518        assert_eq!(
 519            frame.data.len() as u32,
 520            self.sample_rate * self.num_channels / 100
 521        );
 522
 523        let mut buffer = self.buffer.lock();
 524        buffer.push_back(frame.data.to_vec());
 525        while buffer.len() > 10 {
 526            buffer.pop_front();
 527        }
 528    }
 529}
 530
 531impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
 532    fn ssrc(&self) -> i32 {
 533        self.ssrc
 534    }
 535
 536    fn preferred_sample_rate(&self) -> u32 {
 537        self.sample_rate
 538    }
 539
 540    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
 541        assert_eq!(self.sample_rate, target_sample_rate);
 542        let buf = self.buffer.lock().pop_front()?;
 543        Some(AudioFrame {
 544            data: Cow::Owned(buf),
 545            sample_rate: self.sample_rate,
 546            num_channels: self.num_channels,
 547            samples_per_channel: self.sample_rate / 100,
 548        })
 549    }
 550}
 551
 552pub fn play_remote_video_track(
 553    track: &crate::RemoteVideoTrack,
 554) -> impl Stream<Item = RemoteVideoFrame> + use<> {
 555    #[cfg(target_os = "macos")]
 556    {
 557        let mut pool = None;
 558        let most_recent_frame_size = (0, 0);
 559        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 560            if pool == None
 561                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
 562            {
 563                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
 564            }
 565            let pool = pool.clone();
 566            async move {
 567                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
 568                    // when the remote stops sharing, we get an 8x8 black image.
 569                    // In a lil bit, the unpublish will come through and close the view,
 570                    // but until then, don't flash black.
 571                    return None;
 572                }
 573
 574                video_frame_buffer_from_webrtc(pool?, frame.buffer)
 575            }
 576        })
 577    }
 578    #[cfg(not(target_os = "macos"))]
 579    {
 580        NativeVideoStream::new(track.0.rtc_track())
 581            .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
 582    }
 583}
 584
/// Creates a `CVPixelBufferPool` of `width`x`height` bi-planar 4:2:0 full-range (NV12-layout)
/// pixel buffers, flagged as Core Animation compatible via the IOSurface key.
///
/// # Errors
/// Returns an error carrying the `CVReturn` code if Core Video fails to create the pool.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY (review): the kCV* key constants are assumed to be valid, process-lifetime
    // CFStrings; `wrap_under_get_rule` retains them instead of taking ownership.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    // Attributes applied to every pixel buffer the pool hands out.
    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
 624
/// Frame type yielded by `play_remote_video_track` on macOS.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
 627
/// Converts a decoded WebRTC buffer into a `CVPixelBuffer` for rendering on macOS.
///
/// Buffers are handled by kind:
/// - Native buffers backed by a `CVPixelBuffer` are returned directly; the pixel buffer is
///   retained, not copied.
/// - I420 buffers are converted to NV12 into a buffer taken from `pool`.
///
/// Returns `None` for any other kind of buffer, or if a Core Video call fails.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        // Get rule: the native buffer keeps its reference, so we take our own retain.
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // The base address must be locked while the planes are written through raw pointers.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        // Plane 0 holds luma (Y); plane 1 holds the interleaved chroma (UV) of the NV12 layout.
        // SAFETY (review): each slice spans exactly rows * bytes-per-row of its plane, and is
        // only used while the base address is locked.
        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
 690
/// Frame type yielded by `play_remote_video_track` on non-macOS platforms.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
 693
 694#[cfg(not(target_os = "macos"))]
 695fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
 696    use gpui::RenderImage;
 697    use image::{Frame, RgbaImage};
 698    use livekit::webrtc::prelude::VideoFormatType;
 699    use smallvec::SmallVec;
 700    use std::alloc::{Layout, alloc};
 701
 702    let width = buffer.width();
 703    let height = buffer.height();
 704    let stride = width * 4;
 705    let byte_len = (stride * height) as usize;
 706    let argb_image = unsafe {
 707        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
 708        // will write all bytes anyway.
 709        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
 710        if start_ptr.is_null() {
 711            return None;
 712        }
 713        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
 714        buffer.to_argb(
 715            VideoFormatType::ARGB,
 716            argb_frame_slice,
 717            stride,
 718            width as i32,
 719            height as i32,
 720        );
 721        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
 722    };
 723
 724    // TODO: Unclear why providing argb_image to RgbaImage works properly.
 725    let image = RgbaImage::from_raw(width, height, argb_image)
 726        .with_context(|| "Bug: not enough bytes allocated for image.")
 727        .log_err()?;
 728
 729    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
 730        Frame::new(image),
 731        1,
 732    ))))
 733}
 734
/// Wraps a captured screen frame's `CVPixelBuffer` as a WebRTC native buffer, without copying.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Skip the wrapper's destructor so it does not release this reference. Ownership is
    // presumably transferred to the NativeBuffer below — TODO confirm that
    // `from_cv_pixel_buffer` adopts the reference rather than retaining its own.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
 745
 746#[cfg(not(target_os = "macos"))]
 747fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 748    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
 749    use livekit::webrtc::prelude::NV12Buffer;
 750    match frame.0 {
 751        scap::frame::Frame::BGRx(frame) => {
 752            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 753            let (stride_y, stride_uv) = buffer.strides();
 754            let (data_y, data_uv) = buffer.data_mut();
 755            argb_to_nv12(
 756                &frame.data,
 757                frame.width as u32 * 4,
 758                data_y,
 759                stride_y,
 760                data_uv,
 761                stride_uv,
 762                frame.width,
 763                frame.height,
 764            );
 765            Some(buffer)
 766        }
 767        scap::frame::Frame::RGBx(frame) => {
 768            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 769            let (stride_y, stride_uv) = buffer.strides();
 770            let (data_y, data_uv) = buffer.data_mut();
 771            abgr_to_nv12(
 772                &frame.data,
 773                frame.width as u32 * 4,
 774                data_y,
 775                stride_y,
 776                data_uv,
 777                stride_uv,
 778                frame.width,
 779                frame.height,
 780            );
 781            Some(buffer)
 782        }
 783        scap::frame::Frame::YUVFrame(yuvframe) => {
 784            let mut buffer = NV12Buffer::with_strides(
 785                yuvframe.width as u32,
 786                yuvframe.height as u32,
 787                yuvframe.luminance_stride as u32,
 788                yuvframe.chrominance_stride as u32,
 789            );
 790            let (luminance, chrominance) = buffer.data_mut();
 791            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
 792            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
 793            Some(buffer)
 794        }
 795        _ => {
 796            log::error!(
 797                "Expected BGRx or YUV frame from scap screen capture but got some other format."
 798            );
 799            None
 800        }
 801    }
 802}
 803
 804trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
 805    fn new(input: bool) -> Result<Self>;
 806}
 807
 808#[cfg(target_os = "macos")]
 809mod macos {
 810    use cocoa::{
 811        base::{id, nil},
 812        foundation::{NSProcessInfo, NSString},
 813    };
 814    use coreaudio::sys::{
 815        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
 816        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
 817        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
 818        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
 819    };
 820    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
 821    use objc::{msg_send, sel, sel_impl};
 822
 823    /// A guard that prevents App Nap while held.
 824    ///
 825    /// On macOS, App Nap can throttle background apps to save power. This can cause
 826    /// audio artifacts when the app is not in the foreground. This guard tells macOS
 827    /// that we're doing latency-sensitive work and should not be throttled.
 828    ///
 829    /// See Apple's documentation on prioritizing work at the app level:
 830    /// https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheAppLevel.html
 831    pub struct PreventAppNapGuard {
 832        activity: id,
 833    }
 834
 835    // The activity token returned by NSProcessInfo is thread-safe
 836    unsafe impl Send for PreventAppNapGuard {}
 837
 838    // From NSProcessInfo.h
 839    const NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED: u64 = 1 << 20;
 840    const NS_ACTIVITY_USER_INITIATED: u64 = 0x00FFFFFF | NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
 841    const NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP: u64 =
 842        NS_ACTIVITY_USER_INITIATED & !NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
 843
 844    impl PreventAppNapGuard {
 845        pub fn new() -> Self {
 846            unsafe {
 847                let process_info = NSProcessInfo::processInfo(nil);
 848                let reason = NSString::alloc(nil).init_str("Audio playback in progress");
 849                let activity: id = msg_send![process_info, beginActivityWithOptions:NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP reason:reason];
 850                let _: () = msg_send![activity, retain];
 851                Self { activity }
 852            }
 853        }
 854    }
 855
 856    impl Drop for PreventAppNapGuard {
 857        fn drop(&mut self) {
 858            unsafe {
 859                let process_info = NSProcessInfo::processInfo(nil);
 860                let _: () = msg_send![process_info, endActivity:self.activity];
 861                let _: () = msg_send![self.activity, release];
 862            }
 863        }
 864    }
 865
 866    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
 867    pub struct CoreAudioDefaultDeviceChangeListener {
 868        rx: UnboundedReceiver<()>,
 869        callback: Box<PropertyListenerCallbackWrapper>,
 870        input: bool,
 871        device_id: AudioObjectID, // Store the device ID to properly remove listeners
 872    }
 873
 874    trait _AssertSend: Send {}
 875    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
 876
 877    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
 878
 879    unsafe extern "C" fn property_listener_handler_shim(
 880        _: AudioObjectID,
 881        _: u32,
 882        _: *const AudioObjectPropertyAddress,
 883        callback: *mut ::std::os::raw::c_void,
 884    ) -> OSStatus {
 885        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
 886        unsafe { (*wrapper).0() };
 887        0
 888    }
 889
 890    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
 891        fn new(input: bool) -> anyhow::Result<Self> {
 892            let (tx, rx) = futures::channel::mpsc::unbounded();
 893
 894            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
 895                tx.unbounded_send(()).ok();
 896            })));
 897
 898            // Get the current default device ID
 899            let device_id = unsafe {
 900                // Listen for default device changes
 901                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 902                    kAudioObjectSystemObject,
 903                    &AudioObjectPropertyAddress {
 904                        mSelector: if input {
 905                            kAudioHardwarePropertyDefaultInputDevice
 906                        } else {
 907                            kAudioHardwarePropertyDefaultOutputDevice
 908                        },
 909                        mScope: kAudioObjectPropertyScopeGlobal,
 910                        mElement: kAudioObjectPropertyElementMaster,
 911                    },
 912                    Some(property_listener_handler_shim),
 913                    &*callback as *const _ as *mut _,
 914                ))?;
 915
 916                // Also listen for changes to the device configuration
 917                let device_id = if input {
 918                    let mut input_device: AudioObjectID = 0;
 919                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 920                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 921                        kAudioObjectSystemObject,
 922                        &AudioObjectPropertyAddress {
 923                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
 924                            mScope: kAudioObjectPropertyScopeGlobal,
 925                            mElement: kAudioObjectPropertyElementMaster,
 926                        },
 927                        0,
 928                        std::ptr::null(),
 929                        &mut prop_size as *mut _,
 930                        &mut input_device as *mut _ as *mut _,
 931                    );
 932                    if result != 0 {
 933                        log::warn!("Failed to get default input device ID");
 934                        0
 935                    } else {
 936                        input_device
 937                    }
 938                } else {
 939                    let mut output_device: AudioObjectID = 0;
 940                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 941                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 942                        kAudioObjectSystemObject,
 943                        &AudioObjectPropertyAddress {
 944                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
 945                            mScope: kAudioObjectPropertyScopeGlobal,
 946                            mElement: kAudioObjectPropertyElementMaster,
 947                        },
 948                        0,
 949                        std::ptr::null(),
 950                        &mut prop_size as *mut _,
 951                        &mut output_device as *mut _ as *mut _,
 952                    );
 953                    if result != 0 {
 954                        log::warn!("Failed to get default output device ID");
 955                        0
 956                    } else {
 957                        output_device
 958                    }
 959                };
 960
 961                if device_id != 0 {
 962                    // Listen for format changes on the device
 963                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 964                        device_id,
 965                        &AudioObjectPropertyAddress {
 966                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 967                            mScope: if input {
 968                                coreaudio::sys::kAudioObjectPropertyScopeInput
 969                            } else {
 970                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 971                            },
 972                            mElement: kAudioObjectPropertyElementMaster,
 973                        },
 974                        Some(property_listener_handler_shim),
 975                        &*callback as *const _ as *mut _,
 976                    ))?;
 977                }
 978
 979                device_id
 980            };
 981
 982            Ok(Self {
 983                rx,
 984                callback,
 985                input,
 986                device_id,
 987            })
 988        }
 989    }
 990
 991    impl Drop for CoreAudioDefaultDeviceChangeListener {
 992        fn drop(&mut self) {
 993            unsafe {
 994                // Remove the system-level property listener
 995                AudioObjectRemovePropertyListener(
 996                    kAudioObjectSystemObject,
 997                    &AudioObjectPropertyAddress {
 998                        mSelector: if self.input {
 999                            kAudioHardwarePropertyDefaultInputDevice
1000                        } else {
1001                            kAudioHardwarePropertyDefaultOutputDevice
1002                        },
1003                        mScope: kAudioObjectPropertyScopeGlobal,
1004                        mElement: kAudioObjectPropertyElementMaster,
1005                    },
1006                    Some(property_listener_handler_shim),
1007                    &*self.callback as *const _ as *mut _,
1008                );
1009
1010                // Remove the device-specific property listener if we have a valid device ID
1011                if self.device_id != 0 {
1012                    AudioObjectRemovePropertyListener(
1013                        self.device_id,
1014                        &AudioObjectPropertyAddress {
1015                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
1016                            mScope: if self.input {
1017                                coreaudio::sys::kAudioObjectPropertyScopeInput
1018                            } else {
1019                                coreaudio::sys::kAudioObjectPropertyScopeOutput
1020                            },
1021                            mElement: kAudioObjectPropertyElementMaster,
1022                        },
1023                        Some(property_listener_handler_shim),
1024                        &*self.callback as *const _ as *mut _,
1025                    );
1026                }
1027            }
1028        }
1029    }
1030
1031    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
1032        type Item = ();
1033
1034        fn poll_next(
1035            mut self: std::pin::Pin<&mut Self>,
1036            cx: &mut std::task::Context<'_>,
1037        ) -> std::task::Poll<Option<Self::Item>> {
1038            self.rx.poll_next_unpin(cx)
1039        }
1040    }
1041}
1042
/// On macOS, default-device changes are observed through CoreAudio property listeners.
#[cfg(target_os = "macos")]
type DeviceChangeListener = self::macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(target_os = "macos")]
use self::macos::PreventAppNapGuard;
1047
1048#[cfg(not(target_os = "macos"))]
1049mod noop_change_listener {
1050    use std::task::Poll;
1051
1052    use super::DeviceChangeListenerApi;
1053
1054    pub struct NoopOutputDeviceChangelistener {}
1055
1056    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
1057        fn new(_input: bool) -> anyhow::Result<Self> {
1058            Ok(NoopOutputDeviceChangelistener {})
1059        }
1060    }
1061
1062    impl futures::Stream for NoopOutputDeviceChangelistener {
1063        type Item = ();
1064
1065        fn poll_next(
1066            self: std::pin::Pin<&mut Self>,
1067            _cx: &mut std::task::Context<'_>,
1068        ) -> Poll<Option<Self::Item>> {
1069            Poll::Pending
1070        }
1071    }
1072}
1073
1074#[cfg(not(target_os = "macos"))]
1075type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;