playback.rs

   1use anyhow::{Context as _, Result};
   2
   3use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
   4use cpal::traits::{DeviceTrait, StreamTrait as _};
   5use futures::channel::mpsc::UnboundedSender;
   6use futures::{Stream, StreamExt as _};
   7use gpui::{
   8    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
   9    Task,
  10};
  11use libwebrtc::native::{apm, audio_mixer, audio_resampler};
  12use livekit::track;
  13
  14use livekit::webrtc::{
  15    audio_frame::AudioFrame,
  16    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
  17    audio_stream::native::NativeAudioStream,
  18    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
  19    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
  20    video_stream::native::NativeVideoStream,
  21};
  22use log::info;
  23use parking_lot::Mutex;
  24use rodio::Source;
  25use serde::{Deserialize, Serialize};
  26use settings::Settings;
  27use std::cell::RefCell;
  28use std::sync::Weak;
  29use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
  30use std::time::Duration;
  31use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
  32use util::{ResultExt as _, maybe};
  33
  34mod source;
  35
/// Shared capture/playback pipeline used for all LiveKit audio tracks.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    // WebRTC audio processing module (echo cancellation etc.); constructed
    // in `new` with all four of its boolean feature flags enabled.
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    // Mixes every registered `AudioMixerSource` into the single device
    // output stream driven by `play_output`.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak handle to the shared output task: `start_output` hands out strong
    // `Arc`s to each playing stream, so output stops automatically once the
    // last `AudioStream` is dropped.
    _output_task: RefCell<Weak<Task<()>>>,
    // Monotonically increasing id assigned to each mixer source.
    next_ssrc: AtomicI32,
}
  43
  44pub(crate) fn play_remote_audio_track(
  45    track: &livekit::track::RemoteAudioTrack,
  46    speaker: Speaker,
  47    cx: &mut gpui::App,
  48) -> Result<AudioStream> {
  49    info!("speaker: {speaker:?}");
  50    let stream =
  51        source::LiveKitStream::new(cx.background_executor(), track, speaker.sends_legacy_audio);
  52
  53    let stop_handle = Arc::new(AtomicBool::new(false));
  54    let stop_handle_clone = stop_handle.clone();
  55    let stream = stream
  56        .stoppable()
  57        .periodic_access(Duration::from_millis(50), move |s| {
  58            if stop_handle.load(Ordering::Relaxed) {
  59                s.stop();
  60            }
  61        });
  62
  63    info!("sample_rate: {:?}", stream.sample_rate());
  64    info!("channel_count: {:?}", stream.channels());
  65    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
  66        .context("Could not play audio")?;
  67
  68    let on_drop = util::defer(move || {
  69        stop_handle_clone.store(true, Ordering::Relaxed);
  70    });
  71    Ok(AudioStream::Output {
  72        _drop: Box::new(on_drop),
  73    })
  74}
  75
impl AudioStack {
    /// Creates the stack. No audio is captured or played until one of the
    /// `play_*` / `capture_*` methods is called.
    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
        // The four booleans enable the APM's processing features
        // (see `apm::AudioProcessingModule::new`) -- TODO confirm which flag
        // maps to which feature.
        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
            true, true, true, true,
        )));
        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
        Self {
            executor,
            apm,
            mixer,
            _output_task: RefCell::new(Weak::new()),
            next_ssrc: AtomicI32::new(1),
        }
    }

    /// Plays a remote audio track by registering it as a source on the
    /// shared mixer. The returned `AudioStream` removes the source and
    /// releases its handle on the output task when dropped.
    pub(crate) fn play_remote_audio_track(
        &self,
        track: &livekit::track::RemoteAudioTrack,
    ) -> AudioStream {
        // Ensure the shared output task is running; the strong Arc keeps it
        // alive for as long as this stream exists.
        let output_task = self.start_output();

        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
        let source = AudioMixerSource {
            ssrc: next_ssrc,
            sample_rate: LEGACY_SAMPLE_RATE.get(),
            num_channels: LEGACY_CHANNEL_COUNT.get() as u32,
            buffer: Arc::default(),
        };
        self.mixer.lock().add_source(source.clone());

        // Decode the RTC track at the rate/channel count the mixer source
        // advertises.
        let mut stream = NativeAudioStream::new(
            track.rtc_track(),
            source.sample_rate as i32,
            source.num_channels as i32,
        );

        // Pump decoded frames into the source's bounded frame queue.
        let receive_task = self.executor.spawn({
            let source = source.clone();
            async move {
                while let Some(frame) = stream.next().await {
                    source.receive(frame);
                }
            }
        });

        let mixer = self.mixer.clone();
        let on_drop = util::defer(move || {
            mixer.lock().remove_source(source.ssrc);
            drop(receive_task);
            drop(output_task);
        });

        AudioStream::Output {
            _drop: Box::new(on_drop),
        }
    }

    /// Returns the shared output task, starting it if no strong handle is
    /// currently alive. Only a `Weak` is stored here, so device output stops
    /// once every playing stream has been dropped.
    fn start_output(&self) -> Arc<Task<()>> {
        if let Some(task) = self._output_task.borrow().upgrade() {
            return task;
        }
        let task = Arc::new(self.executor.spawn({
            let apm = self.apm.clone();
            let mixer = self.mixer.clone();
            async move {
                Self::play_output(
                    apm,
                    mixer,
                    LEGACY_SAMPLE_RATE.get(),
                    LEGACY_CHANNEL_COUNT.get().into(),
                )
                .await
                .log_err();
            }
        }));
        *self._output_task.borrow_mut() = Arc::downgrade(&task);
        task
    }

    /// Captures the local microphone and publishes it as a LiveKit audio
    /// track.
    ///
    /// The speaker metadata (`user_name`, `is_staff`, legacy flag) is
    /// urlencoded into the track name so the receiving side can decode it.
    /// Returns the track plus an `AudioStream` guard that stops capture
    /// when dropped.
    pub(crate) fn capture_local_microphone_track(
        &self,
        user_name: String,
        is_staff: bool,
        cx: &AsyncApp,
    ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
        let legacy_audio_compatible =
            AudioSettings::try_read_global(cx, |setting| setting.legacy_audio_compatible)
                .unwrap_or(true);

        let source = if legacy_audio_compatible {
            NativeAudioSource::new(
                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
                AudioSourceOptions::default(),
                LEGACY_SAMPLE_RATE.get(),
                LEGACY_CHANNEL_COUNT.get().into(),
                10,
            )
        } else {
            NativeAudioSource::new(
                // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
                AudioSourceOptions::default(),
                SAMPLE_RATE.get(),
                CHANNEL_COUNT.get().into(),
                10,
            )
        };

        let speaker = Speaker {
            name: user_name,
            is_staff,
            sends_legacy_audio: legacy_audio_compatible,
        };
        log::info!("Microphone speaker: {speaker:?}");
        let track_name = serde_urlencoded::to_string(speaker)
            .context("Could not encode user information in track name")?;

        let track = track::LocalAudioTrack::create_audio_track(
            &track_name,
            RtcAudioSource::Native(source.clone()),
        );

        let apm = self.apm.clone();

        // Frames produced by either capture path below are forwarded to the
        // LiveKit source on a background task.
        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
        let transmit_task = self.executor.spawn({
            async move {
                while let Some(frame) = frame_rx.next().await {
                    source.capture_frame(&frame).await.log_err();
                }
            }
        });
        let rodio_pipeline =
            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
        let capture_task = if rodio_pipeline {
            info!("Using experimental.rodio_audio audio pipeline");
            let voip_parts = audio::VoipParts::new(cx)?;
            // Audio needs to run real-time and should never be paused. That is
            // why we are using a normal std::thread and not a background task
            thread::Builder::new()
                .name("MicrophoneToLivekit".to_string())
                .spawn(move || {
                    // microphone is non send on mac
                    let microphone = match audio::Audio::open_microphone(voip_parts) {
                        Ok(m) => m,
                        Err(e) => {
                            log::error!("Could not open microphone: {e}");
                            return;
                        }
                    };
                    send_to_livekit(frame_tx, microphone);
                })
                .expect("should be able to spawn threads");
            Task::ready(Ok(()))
        } else {
            // NOTE(review): this path always captures at the legacy
            // rate/channel count even when `legacy_audio_compatible` is false
            // and `source` above was created with SAMPLE_RATE/CHANNEL_COUNT --
            // confirm the non-legacy configuration is only reachable with the
            // rodio pipeline enabled.
            self.executor.spawn(async move {
                Self::capture_input(
                    apm,
                    frame_tx,
                    LEGACY_SAMPLE_RATE.get(),
                    LEGACY_CHANNEL_COUNT.get().into(),
                )
                .await
            })
        };

        // Dropping the guard cancels both tasks, ending capture.
        let on_drop = util::defer(|| {
            drop(transmit_task);
            drop(capture_task);
        });
        Ok((
            super::LocalAudioTrack(track),
            AudioStream::Output {
                _drop: Box::new(on_drop),
            },
        ))
    }

    /// Drives the default output device: mixes all registered sources,
    /// resamples to the device format, and plays the result via cpal.
    ///
    /// Loops forever; each iteration spawns a playback thread and waits for
    /// a default-device change, at which point the thread is torn down (by
    /// dropping `end_on_drop_tx`) and a new one is created for the new
    /// device.
    async fn play_output(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        // Prevent App Nap from throttling audio playback on macOS.
        // This guard is held for the entire duration of audio output.
        #[cfg(target_os = "macos")]
        let _prevent_app_nap = PreventAppNapGuard::new();

        loop {
            let mut device_change_listener = DeviceChangeListener::new(false)?;
            let (output_device, output_config) = crate::default_device(false)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let mixer = mixer.clone();
            let apm = apm.clone();
            let mut resampler = audio_resampler::AudioResampler::default();
            // Carries leftover resampled samples between cpal callbacks,
            // since the device buffer size need not align with 10ms chunks.
            let mut buf = Vec::new();

            thread::Builder::new()
                .name("AudioPlayback".to_owned())
                .spawn(move || {
                    let output_stream = output_device.build_output_stream(
                        &output_config.config(),
                        {
                            move |mut data, _info| {
                                while data.len() > 0 {
                                    // Leftover covers the whole request:
                                    // copy it out and keep the remainder.
                                    if data.len() <= buf.len() {
                                        let rest = buf.split_off(data.len());
                                        data.copy_from_slice(&buf);
                                        buf = rest;
                                        return;
                                    }
                                    // Drain any partial leftover first, then
                                    // mix/resample another 10ms chunk below.
                                    if buf.len() > 0 {
                                        let (prefix, suffix) = data.split_at_mut(buf.len());
                                        prefix.copy_from_slice(&buf);
                                        data = suffix;
                                    }

                                    let mut mixer = mixer.lock();
                                    let mixed = mixer.mix(output_config.channels() as usize);
                                    let sampled = resampler.remix_and_resample(
                                        mixed,
                                        sample_rate / 100,
                                        num_channels,
                                        sample_rate,
                                        output_config.channels() as u32,
                                        output_config.sample_rate().0,
                                    );
                                    buf = sampled.to_vec();
                                    // Feed the playback signal back into the
                                    // APM (reverse stream) so echo
                                    // cancellation can subtract it from the
                                    // microphone input.
                                    apm.lock()
                                        .process_reverse_stream(
                                            &mut buf,
                                            output_config.sample_rate().0 as i32,
                                            output_config.channels() as i32,
                                        )
                                        .ok();
                                }
                            }
                        },
                        |error| log::error!("error playing audio track: {:?}", error),
                        Some(Duration::from_millis(100)),
                    );

                    let Some(output_stream) = output_stream.log_err() else {
                        return;
                    };

                    output_stream.play().log_err();
                    // Block forever to keep the output stream alive
                    end_on_drop_rx.recv().ok();
                })
                .unwrap();

            // Wait until the default output device changes, then drop the
            // sender so the playback thread exits and we restart the loop.
            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }

    /// Captures from the default input device, runs the APM over each 10ms
    /// chunk, and forwards the processed frames over `frame_tx`.
    ///
    /// Like `play_output`, loops forever and recreates the capture thread
    /// whenever the default input device changes.
    async fn capture_input(
        apm: Arc<Mutex<apm::AudioProcessingModule>>,
        frame_tx: UnboundedSender<AudioFrame<'static>>,
        sample_rate: u32,
        num_channels: u32,
    ) -> Result<()> {
        loop {
            let mut device_change_listener = DeviceChangeListener::new(true)?;
            let (device, config) = crate::default_device(true)?;
            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
            let apm = apm.clone();
            let frame_tx = frame_tx.clone();
            let mut resampler = audio_resampler::AudioResampler::default();

            thread::Builder::new()
                .name("AudioCapture".to_owned())
                .spawn(move || {
                    maybe!({
                        if let Some(name) = device.name().ok() {
                            log::info!("Using microphone: {}", name)
                        } else {
                            log::info!("Using microphone: <unknown>");
                        }

                        // Accumulate device samples into exactly 10ms
                        // (device-rate) chunks before resampling.
                        let ten_ms_buffer_size =
                            (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);

                        let stream = device
                            .build_input_stream_raw(
                                &config.config(),
                                config.sample_format(),
                                move |data, _: &_| {
                                    // Convert raw device samples to i16.
                                    let data = crate::get_sample_data(config.sample_format(), data)
                                        .log_err();
                                    let Some(data) = data else {
                                        return;
                                    };
                                    let mut data = data.as_slice();

                                    while data.len() > 0 {
                                        // Fill `buf` up to one 10ms chunk.
                                        let remainder =
                                            (buf.capacity() - buf.len()).min(data.len());
                                        buf.extend_from_slice(&data[..remainder]);
                                        data = &data[remainder..];

                                        // Only process complete chunks;
                                        // partial chunks wait for more data.
                                        if buf.capacity() == buf.len() {
                                            let mut sampled = resampler
                                                .remix_and_resample(
                                                    buf.as_slice(),
                                                    config.sample_rate().0 / 100,
                                                    config.channels() as u32,
                                                    config.sample_rate().0,
                                                    num_channels,
                                                    sample_rate,
                                                )
                                                .to_owned();
                                            // Noise suppression / echo
                                            // cancellation on the forward
                                            // (microphone) stream.
                                            apm.lock()
                                                .process_stream(
                                                    &mut sampled,
                                                    sample_rate as i32,
                                                    num_channels as i32,
                                                )
                                                .log_err();
                                            buf.clear();
                                            frame_tx
                                                .unbounded_send(AudioFrame {
                                                    data: Cow::Owned(sampled),
                                                    sample_rate,
                                                    num_channels,
                                                    samples_per_channel: sample_rate / 100,
                                                })
                                                .ok();
                                        }
                                    }
                                },
                                |err| log::error!("error capturing audio track: {:?}", err),
                                Some(Duration::from_millis(100)),
                            )
                            .context("failed to build input stream")?;

                        stream.play()?;
                        // Keep the thread alive and holding onto the `stream`
                        end_on_drop_rx.recv().ok();
                        anyhow::Ok(Some(()))
                    })
                    .log_err();
                })
                .unwrap();

            // Restart capture on the new default device.
            device_change_listener.next().await;
            drop(end_on_drop_tx)
        }
    }
}
 428
/// Per-speaker metadata that is urlencoded into the LiveKit track name by
/// `capture_local_microphone_track` and decoded on the receiving side.
#[derive(Serialize, Deserialize, Debug)]
pub struct Speaker {
    pub name: String,
    pub is_staff: bool,
    // True when the track was captured at the legacy sample rate / channel
    // count (see `capture_local_microphone_track`).
    pub sends_legacy_audio: bool,
}
 435
 436fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
 437    use cpal::Sample;
 438    let sample_rate = microphone.sample_rate().get();
 439    let num_channels = microphone.channels().get() as u32;
 440    let buffer_size = sample_rate / 100 * num_channels;
 441
 442    loop {
 443        let sampled: Vec<_> = microphone
 444            .by_ref()
 445            .take(buffer_size as usize)
 446            .map(|s| s.to_sample())
 447            .collect();
 448
 449        if frame_tx
 450            .unbounded_send(AudioFrame {
 451                sample_rate,
 452                num_channels,
 453                samples_per_channel: sampled.len() as u32 / num_channels,
 454                data: Cow::Owned(sampled),
 455            })
 456            .is_err()
 457        {
 458            // must rx has dropped or is not consuming
 459            break;
 460        }
 461    }
 462}
 463
 464use super::LocalVideoTrack;
 465
/// Keeps an audio capture or playback pipeline alive; dropping the value
/// tears the pipeline down.
pub enum AudioStream {
    /// Holds the background task driving an input pipeline.
    Input { _task: Task<()> },
    /// Holds an opaque guard (typically a `util::defer`) that runs cleanup
    /// on drop.
    Output { _drop: Box<dyn std::any::Any> },
}
 470
/// Publishes a screen-capture source as a LiveKit video track.
///
/// Creates a `NativeVideoSource` sized to the capture source's reported
/// resolution (inside the tokio runtime via `gpui_tokio`), then feeds each
/// captured frame, converted to a WebRTC buffer, into it. Returns the track
/// along with the capture stream that must be kept alive.
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(VideoResolution {
            width: metadata.resolution.width.0 as u32,
            height: metadata.resolution.height.0 as u32,
        })
    })
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                // Frames that fail conversion are silently skipped.
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
 507
/// A mixer input fed by one remote audio track: a small queue of decoded
/// 10ms frames shared between the receiving task and the playback thread.
#[derive(Clone)]
struct AudioMixerSource {
    // Synchronization source id; unique per track (see `AudioStack::next_ssrc`).
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // Pending 10ms frames; capped at 10 entries (100ms) in `receive`.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
 515
 516impl AudioMixerSource {
 517    fn receive(&self, frame: AudioFrame) {
 518        assert_eq!(
 519            frame.data.len() as u32,
 520            self.sample_rate * self.num_channels / 100
 521        );
 522
 523        let mut buffer = self.buffer.lock();
 524        buffer.push_back(frame.data.to_vec());
 525        while buffer.len() > 10 {
 526            buffer.pop_front();
 527        }
 528    }
 529}
 530
impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Pops the oldest queued 10ms frame, or `None` when the queue is
    /// empty (starved). The mixer is expected to request frames at
    /// `preferred_sample_rate`, hence the assertion.
    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            // One 10ms frame per call.
            samples_per_channel: self.sample_rate / 100,
        })
    }
}
 551
 552pub fn play_remote_video_track(
 553    track: &crate::RemoteVideoTrack,
 554    executor: &BackgroundExecutor,
 555) -> impl Stream<Item = RemoteVideoFrame> + use<> {
 556    #[cfg(target_os = "macos")]
 557    {
 558        _ = executor;
 559        let mut pool = None;
 560        let most_recent_frame_size = (0, 0);
 561        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 562            if pool == None
 563                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
 564            {
 565                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
 566            }
 567            let pool = pool.clone();
 568            async move {
 569                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
 570                    // when the remote stops sharing, we get an 8x8 black image.
 571                    // In a lil bit, the unpublish will come through and close the view,
 572                    // but until then, don't flash black.
 573                    return None;
 574                }
 575
 576                video_frame_buffer_from_webrtc(pool?, frame.buffer)
 577            }
 578        })
 579    }
 580    #[cfg(not(target_os = "macos"))]
 581    {
 582        let executor = executor.clone();
 583        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 584            executor.spawn(async move { video_frame_buffer_from_webrtc(frame.buffer) })
 585        })
 586    }
 587}
 588
/// Creates a CoreVideo pixel-buffer pool producing NV12
/// (`420YpCbCr8BiPlanarFullRange`) buffers of the given size, flagged as
/// IOSurface Core-Animation compatible.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY-style note: `wrap_under_get_rule` is used on the CF constant
    // keys because they are static values we do not own.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    // CF booleans are passed as the number 1 here.
    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
 628
/// On macOS, remote video frames are delivered as CoreVideo pixel buffers.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
 631
/// Converts a WebRTC video buffer into a `CVPixelBuffer` for rendering.
///
/// Native buffers that already wrap a pixel buffer are returned directly
/// (retained); I420 buffers are converted to NV12 into a buffer allocated
/// from `pool`. Returns `None` on any conversion failure.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    // Fast path: the buffer already wraps a CVPixelBuffer; retain and
    // return it without copying pixels.
    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    let image_buffer = unsafe {
        // The base address must be locked before the plane pointers below
        // are valid to read/write.
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        // Plane 0 is Y, plane 1 is interleaved UV (NV12 layout of the pool's
        // 420YpCbCr8BiPlanarFullRange format).
        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
 694
/// Off macOS, remote video frames are decoded into gpui render images.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
 697
/// Converts a WebRTC video buffer into a gpui `RenderImage`.
///
/// Converts the frame to 4-bytes-per-pixel via `to_argb` into an
/// uninitialized buffer, then wraps it as an image. Returns `None` if
/// allocation fails or the byte count doesn't match the dimensions.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    // 4 bytes per pixel, tightly packed rows.
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        // Take ownership of the raw allocation as a fully-initialized Vec.
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}
 738
/// Wraps a captured macOS frame's `CVPixelBuffer` as a WebRTC-native buffer
/// without copying pixel data.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Forget the Rust wrapper so its Drop doesn't release the CF reference
    // that the webrtc buffer below will use -- NOTE(review): this assumes
    // `from_cv_pixel_buffer` takes over (or retains) the reference; confirm
    // against the livekit bindings.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
 749
/// Converts a scap screen-capture frame into a WebRTC `NV12Buffer`.
///
/// BGRx/RGBx frames are color-converted; YUV frames are copied plane by
/// plane. Returns `None` (and logs) for unsupported frame formats.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
    use livekit::webrtc::prelude::NV12Buffer;
    match frame.0 {
        scap::frame::Frame::BGRx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            argb_to_nv12(
                &frame.data,
                // Source stride: 4 bytes per pixel, tightly packed.
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::RGBx(frame) => {
            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
            let (stride_y, stride_uv) = buffer.strides();
            let (data_y, data_uv) = buffer.data_mut();
            abgr_to_nv12(
                &frame.data,
                frame.width as u32 * 4,
                data_y,
                stride_y,
                data_uv,
                stride_uv,
                frame.width,
                frame.height,
            );
            Some(buffer)
        }
        scap::frame::Frame::YUVFrame(yuvframe) => {
            // Already planar YUV: allocate with the source strides and copy
            // the planes directly.
            let mut buffer = NV12Buffer::with_strides(
                yuvframe.width as u32,
                yuvframe.height as u32,
                yuvframe.luminance_stride as u32,
                yuvframe.chrominance_stride as u32,
            );
            let (luminance, chrominance) = buffer.data_mut();
            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
            Some(buffer)
        }
        _ => {
            log::error!(
                "Expected BGRx or YUV frame from scap screen capture but got some other format."
            );
            None
        }
    }
}
 807
/// A stream that yields `()` each time the platform's default audio device
/// (input or output, chosen at construction) changes.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    /// Creates a listener for the default input device when `input` is true,
    /// otherwise for the default output device.
    fn new(input: bool) -> Result<Self>;
}
 811
 812#[cfg(target_os = "macos")]
 813mod macos {
 814    use cocoa::{
 815        base::{id, nil},
 816        foundation::{NSProcessInfo, NSString},
 817    };
 818    use coreaudio::sys::{
 819        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
 820        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
 821        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
 822        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
 823    };
 824    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
 825    use objc::{msg_send, sel, sel_impl};
 826
 827    /// A guard that prevents App Nap while held.
 828    ///
 829    /// On macOS, App Nap can throttle background apps to save power. This can cause
 830    /// audio artifacts when the app is not in the foreground. This guard tells macOS
 831    /// that we're doing latency-sensitive work and should not be throttled.
 832    ///
 833    /// See Apple's documentation on prioritizing work at the app level:
 834    /// https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheAppLevel.html
 835    pub struct PreventAppNapGuard {
 836        activity: id,
 837    }
 838
 839    // The activity token returned by NSProcessInfo is thread-safe
 840    unsafe impl Send for PreventAppNapGuard {}
 841
 842    // From NSProcessInfo.h
 843    const NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED: u64 = 1 << 20;
 844    const NS_ACTIVITY_USER_INITIATED: u64 = 0x00FFFFFF | NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
 845    const NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP: u64 =
 846        NS_ACTIVITY_USER_INITIATED & !NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
 847
 848    impl PreventAppNapGuard {
 849        pub fn new() -> Self {
 850            unsafe {
 851                let process_info = NSProcessInfo::processInfo(nil);
 852                #[allow(clippy::disallowed_methods)]
 853                let reason = NSString::alloc(nil).init_str("Audio playback in progress");
 854                let activity: id = msg_send![process_info, beginActivityWithOptions:NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP reason:reason];
 855                let _: () = msg_send![reason, release];
 856                let _: () = msg_send![activity, retain];
 857                Self { activity }
 858            }
 859        }
 860    }
 861
 862    impl Drop for PreventAppNapGuard {
 863        fn drop(&mut self) {
 864            unsafe {
 865                let process_info = NSProcessInfo::processInfo(nil);
 866                let _: () = msg_send![process_info, endActivity:self.activity];
 867                let _: () = msg_send![self.activity, release];
 868            }
 869        }
 870    }
 871
 872    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
 873    pub struct CoreAudioDefaultDeviceChangeListener {
 874        rx: UnboundedReceiver<()>,
 875        callback: Box<PropertyListenerCallbackWrapper>,
 876        input: bool,
 877        device_id: AudioObjectID, // Store the device ID to properly remove listeners
 878    }
 879
 880    trait _AssertSend: Send {}
 881    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}
 882
 883    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);
 884
 885    unsafe extern "C" fn property_listener_handler_shim(
 886        _: AudioObjectID,
 887        _: u32,
 888        _: *const AudioObjectPropertyAddress,
 889        callback: *mut ::std::os::raw::c_void,
 890    ) -> OSStatus {
 891        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
 892        unsafe { (*wrapper).0() };
 893        0
 894    }
 895
 896    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
 897        fn new(input: bool) -> anyhow::Result<Self> {
 898            let (tx, rx) = futures::channel::mpsc::unbounded();
 899
 900            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
 901                tx.unbounded_send(()).ok();
 902            })));
 903
 904            // Get the current default device ID
 905            let device_id = unsafe {
 906                // Listen for default device changes
 907                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 908                    kAudioObjectSystemObject,
 909                    &AudioObjectPropertyAddress {
 910                        mSelector: if input {
 911                            kAudioHardwarePropertyDefaultInputDevice
 912                        } else {
 913                            kAudioHardwarePropertyDefaultOutputDevice
 914                        },
 915                        mScope: kAudioObjectPropertyScopeGlobal,
 916                        mElement: kAudioObjectPropertyElementMaster,
 917                    },
 918                    Some(property_listener_handler_shim),
 919                    &*callback as *const _ as *mut _,
 920                ))?;
 921
 922                // Also listen for changes to the device configuration
 923                let device_id = if input {
 924                    let mut input_device: AudioObjectID = 0;
 925                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 926                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 927                        kAudioObjectSystemObject,
 928                        &AudioObjectPropertyAddress {
 929                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
 930                            mScope: kAudioObjectPropertyScopeGlobal,
 931                            mElement: kAudioObjectPropertyElementMaster,
 932                        },
 933                        0,
 934                        std::ptr::null(),
 935                        &mut prop_size as *mut _,
 936                        &mut input_device as *mut _ as *mut _,
 937                    );
 938                    if result != 0 {
 939                        log::warn!("Failed to get default input device ID");
 940                        0
 941                    } else {
 942                        input_device
 943                    }
 944                } else {
 945                    let mut output_device: AudioObjectID = 0;
 946                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
 947                    let result = coreaudio::sys::AudioObjectGetPropertyData(
 948                        kAudioObjectSystemObject,
 949                        &AudioObjectPropertyAddress {
 950                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
 951                            mScope: kAudioObjectPropertyScopeGlobal,
 952                            mElement: kAudioObjectPropertyElementMaster,
 953                        },
 954                        0,
 955                        std::ptr::null(),
 956                        &mut prop_size as *mut _,
 957                        &mut output_device as *mut _ as *mut _,
 958                    );
 959                    if result != 0 {
 960                        log::warn!("Failed to get default output device ID");
 961                        0
 962                    } else {
 963                        output_device
 964                    }
 965                };
 966
 967                if device_id != 0 {
 968                    // Listen for format changes on the device
 969                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
 970                        device_id,
 971                        &AudioObjectPropertyAddress {
 972                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
 973                            mScope: if input {
 974                                coreaudio::sys::kAudioObjectPropertyScopeInput
 975                            } else {
 976                                coreaudio::sys::kAudioObjectPropertyScopeOutput
 977                            },
 978                            mElement: kAudioObjectPropertyElementMaster,
 979                        },
 980                        Some(property_listener_handler_shim),
 981                        &*callback as *const _ as *mut _,
 982                    ))?;
 983                }
 984
 985                device_id
 986            };
 987
 988            Ok(Self {
 989                rx,
 990                callback,
 991                input,
 992                device_id,
 993            })
 994        }
 995    }
 996
 997    impl Drop for CoreAudioDefaultDeviceChangeListener {
 998        fn drop(&mut self) {
 999            unsafe {
1000                // Remove the system-level property listener
1001                AudioObjectRemovePropertyListener(
1002                    kAudioObjectSystemObject,
1003                    &AudioObjectPropertyAddress {
1004                        mSelector: if self.input {
1005                            kAudioHardwarePropertyDefaultInputDevice
1006                        } else {
1007                            kAudioHardwarePropertyDefaultOutputDevice
1008                        },
1009                        mScope: kAudioObjectPropertyScopeGlobal,
1010                        mElement: kAudioObjectPropertyElementMaster,
1011                    },
1012                    Some(property_listener_handler_shim),
1013                    &*self.callback as *const _ as *mut _,
1014                );
1015
1016                // Remove the device-specific property listener if we have a valid device ID
1017                if self.device_id != 0 {
1018                    AudioObjectRemovePropertyListener(
1019                        self.device_id,
1020                        &AudioObjectPropertyAddress {
1021                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
1022                            mScope: if self.input {
1023                                coreaudio::sys::kAudioObjectPropertyScopeInput
1024                            } else {
1025                                coreaudio::sys::kAudioObjectPropertyScopeOutput
1026                            },
1027                            mElement: kAudioObjectPropertyElementMaster,
1028                        },
1029                        Some(property_listener_handler_shim),
1030                        &*self.callback as *const _ as *mut _,
1031                    );
1032                }
1033            }
1034        }
1035    }
1036
1037    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
1038        type Item = ();
1039
1040        fn poll_next(
1041            mut self: std::pin::Pin<&mut Self>,
1042            cx: &mut std::task::Context<'_>,
1043        ) -> std::task::Poll<Option<Self::Item>> {
1044            self.rx.poll_next_unpin(cx)
1045        }
1046    }
1047}
1048
// On macOS, default-device changes are observed via CoreAudio, and an App Nap
// guard is available to keep audio work from being throttled.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(target_os = "macos")]
use macos::PreventAppNapGuard;
1053
1054#[cfg(not(target_os = "macos"))]
1055mod noop_change_listener {
1056    use std::task::Poll;
1057
1058    use super::DeviceChangeListenerApi;
1059
1060    pub struct NoopOutputDeviceChangelistener {}
1061
1062    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
1063        fn new(_input: bool) -> anyhow::Result<Self> {
1064            Ok(NoopOutputDeviceChangelistener {})
1065        }
1066    }
1067
1068    impl futures::Stream for NoopOutputDeviceChangelistener {
1069        type Item = ();
1070
1071        fn poll_next(
1072            self: std::pin::Pin<&mut Self>,
1073            _cx: &mut std::task::Context<'_>,
1074        ) -> Poll<Option<Self::Item>> {
1075            Poll::Pending
1076        }
1077    }
1078}
1079
// Non-macOS platforms get the no-op stream: no device-change notifications.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;