playback.rs

   1use anyhow::{Context as _, Result};
   2
   3use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
   4use cpal::DeviceId;
   5use cpal::traits::{DeviceTrait, StreamTrait as _};
   6use futures::channel::mpsc::Sender;
   7use futures::{Stream, StreamExt as _};
   8use gpui::{
   9    AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource,
  10    ScreenCaptureStream, Task,
  11};
  12use libwebrtc::native::{apm, audio_mixer, audio_resampler};
  13use livekit::track;
  14
  15use livekit::webrtc::{
  16    audio_frame::AudioFrame,
  17    audio_source::{AudioSourceOptions, RtcAudioSource, native::NativeAudioSource},
  18    audio_stream::native::NativeAudioStream,
  19    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
  20    video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
  21    video_stream::native::NativeVideoStream,
  22};
  23use log::info;
  24use parking_lot::Mutex;
  25use rodio::Source;
  26use rodio::conversions::SampleTypeConverter;
  27use rodio::source::{AutomaticGainControlSettings, LimitSettings};
  28use serde::{Deserialize, Serialize};
  29use settings::Settings;
  30use std::cell::RefCell;
  31use std::sync::Weak;
  32use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
  33use std::time::{Duration, Instant};
  34use std::{borrow::Cow, collections::VecDeque, sync::Arc};
  35use util::{ResultExt as _, maybe};
  36
/// An audio frame paired with the instant it was captured from the device,
/// used by the transmit task to measure capture-to-send lag.
struct TimestampedFrame {
    frame: AudioFrame<'static>,
    captured_at: Instant,
}
  41
/// Shared state for all call audio input and output.
pub(crate) struct AudioStack {
    executor: BackgroundExecutor,
    // Audio processing module shared between capture (process_stream) and
    // playback (process_reverse_stream) so echo cancellation can correlate them.
    apm: Arc<Mutex<apm::AudioProcessingModule>>,
    // Mixes every remote track into the single device output stream.
    mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
    // Weak handle to the running output task; strong references are held by
    // the `AudioStream`s returned from `play_remote_audio_track`, so output
    // stops when the last one is dropped.
    _output_task: RefCell<Weak<Task<()>>>,
    // Monotonic source-id generator for mixer sources.
    next_ssrc: AtomicI32,
}
  49
  50impl AudioStack {
  51    pub(crate) fn new(executor: BackgroundExecutor) -> Self {
  52        let apm = Arc::new(Mutex::new(apm::AudioProcessingModule::new(
  53            true, true, true, true,
  54        )));
  55        let mixer = Arc::new(Mutex::new(audio_mixer::AudioMixer::new()));
  56        Self {
  57            executor,
  58            apm,
  59            mixer,
  60            _output_task: RefCell::new(Weak::new()),
  61            next_ssrc: AtomicI32::new(1),
  62        }
  63    }
  64
  65    pub(crate) fn play_remote_audio_track(
  66        &self,
  67        track: &livekit::track::RemoteAudioTrack,
  68        output_audio_device: Option<DeviceId>,
  69    ) -> AudioStream {
  70        let output_task = self.start_output(output_audio_device);
  71
  72        let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
  73        let source = AudioMixerSource {
  74            ssrc: next_ssrc,
  75            sample_rate: SAMPLE_RATE.get(),
  76            num_channels: CHANNEL_COUNT.get() as u32,
  77            buffer: Arc::default(),
  78        };
  79        self.mixer.lock().add_source(source.clone());
  80
  81        let mut stream = NativeAudioStream::new(
  82            track.rtc_track(),
  83            source.sample_rate as i32,
  84            source.num_channels as i32,
  85        );
  86
  87        let receive_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
  88            let source = source.clone();
  89            async move {
  90                while let Some(frame) = stream.next().await {
  91                    source.receive(frame);
  92                }
  93            }
  94        });
  95
  96        let mixer = self.mixer.clone();
  97        let on_drop = util::defer(move || {
  98            mixer.lock().remove_source(source.ssrc);
  99            drop(receive_task);
 100            drop(output_task);
 101        });
 102
 103        AudioStream::Output {
 104            _drop: Box::new(on_drop),
 105        }
 106    }
 107
 108    fn start_output(&self, output_audio_device: Option<DeviceId>) -> Arc<Task<()>> {
 109        if let Some(task) = self._output_task.borrow().upgrade() {
 110            return task;
 111        }
 112        let task = Arc::new(self.executor.spawn({
 113            let apm = self.apm.clone();
 114            let mixer = self.mixer.clone();
 115            let executor = self.executor.clone();
 116            async move {
 117                Self::play_output(
 118                    executor,
 119                    apm,
 120                    mixer,
 121                    SAMPLE_RATE.get(),
 122                    CHANNEL_COUNT.get().into(),
 123                    output_audio_device,
 124                )
 125                .await
 126                .log_err();
 127            }
 128        }));
 129        *self._output_task.borrow_mut() = Arc::downgrade(&task);
 130        task
 131    }
 132
 133    pub(crate) fn capture_local_microphone_track(
 134        &self,
 135        user_name: String,
 136        is_staff: bool,
 137        cx: &AsyncApp,
 138    ) -> Result<(crate::LocalAudioTrack, AudioStream, Arc<AtomicU64>)> {
 139        let source = NativeAudioSource::new(
 140            // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
 141            AudioSourceOptions::default(),
 142            SAMPLE_RATE.get(),
 143            CHANNEL_COUNT.get().into(),
 144            10,
 145        );
 146
 147        let speaker = Speaker {
 148            name: user_name,
 149            is_staff,
 150        };
 151        log::info!("Microphone speaker: {speaker:?}");
 152        let track_name = serde_urlencoded::to_string(speaker)
 153            .context("Could not encode user information in track name")?;
 154
 155        let track = track::LocalAudioTrack::create_audio_track(
 156            &track_name,
 157            RtcAudioSource::Native(source.clone()),
 158        );
 159
 160        let apm = self.apm.clone();
 161
 162        let input_lag_us = Arc::new(AtomicU64::new(0));
 163        let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel::<TimestampedFrame>(1);
 164        let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
 165            let input_lag_us = input_lag_us.clone();
 166            async move {
 167                while let Some(timestamped) = frame_rx.next().await {
 168                    let lag = timestamped.captured_at.elapsed();
 169                    input_lag_us.store(lag.as_micros() as u64, Ordering::Relaxed);
 170                    source.capture_frame(&timestamped.frame).await.log_err();
 171                }
 172            }
 173        });
 174        let capture_task = {
 175            let input_audio_device =
 176                AudioSettings::try_read_global(cx, |settings| settings.input_audio_device.clone())
 177                    .flatten();
 178            let executor = self.executor.clone();
 179            self.executor.spawn(async move {
 180                Self::capture_input(
 181                    executor,
 182                    apm,
 183                    frame_tx,
 184                    SAMPLE_RATE.get(), // TODO(audio): was legacy removed for now
 185                    CHANNEL_COUNT.get().into(),
 186                    input_audio_device,
 187                )
 188                .await
 189            })
 190        };
 191
 192        let on_drop = util::defer(|| {
 193            drop(transmit_task);
 194            drop(capture_task);
 195        });
 196        Ok((
 197            super::LocalAudioTrack(track),
 198            AudioStream::Output {
 199                _drop: Box::new(on_drop),
 200            },
 201            input_lag_us,
 202        ))
 203    }
 204
 205    async fn play_output(
 206        executor: BackgroundExecutor,
 207        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 208        mixer: Arc<Mutex<audio_mixer::AudioMixer>>,
 209        sample_rate: u32,
 210        _num_channels: u32,
 211        output_audio_device: Option<DeviceId>,
 212    ) -> Result<()> {
 213        // Prevent App Nap from throttling audio playback on macOS.
 214        // This guard is held for the entire duration of audio output.
 215        #[cfg(target_os = "macos")]
 216        let _prevent_app_nap = PreventAppNapGuard::new();
 217
 218        loop {
 219            let mut device_change_listener = DeviceChangeListener::new(false)?;
 220            let (output_device, output_config) =
 221                crate::default_device(false, output_audio_device.as_ref())?;
 222            info!("Output config: {output_config:?}");
 223            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 224            let mixer = mixer.clone();
 225            let apm = apm.clone();
 226            let mut resampler = audio_resampler::AudioResampler::default();
 227            let mut buf = Vec::new();
 228
 229            executor
 230                .spawn_with_priority(Priority::RealtimeAudio, async move {
 231                    let output_stream = output_device.build_output_stream(
 232                        &output_config.config(),
 233                        {
 234                            move |mut data, _info| {
 235                                while data.len() > 0 {
 236                                    if data.len() <= buf.len() {
 237                                        let rest = buf.split_off(data.len());
 238                                        data.copy_from_slice(&buf);
 239                                        buf = rest;
 240                                        return;
 241                                    }
 242                                    if buf.len() > 0 {
 243                                        let (prefix, suffix) = data.split_at_mut(buf.len());
 244                                        prefix.copy_from_slice(&buf);
 245                                        data = suffix;
 246                                    }
 247
 248                                    let mut mixer = mixer.lock();
 249                                    let mixed = mixer.mix(output_config.channels() as usize);
 250                                    let sampled = resampler.remix_and_resample(
 251                                        mixed,
 252                                        sample_rate / 100,
 253                                        // We need to assume output number of channels as otherwise we will
 254                                        // crash in process_reverse_stream otherwise as livekit's audio resampler
 255                                        // does not seem to support non-matching channel counts.
 256                                        // NOTE: you can verify this by debug printing buf.len() after this stage.
 257                                        // For 2->4 channel upmix, we should see buf.len=1920, buf we get only 960.
 258                                        output_config.channels() as u32,
 259                                        sample_rate,
 260                                        output_config.channels() as u32,
 261                                        output_config.sample_rate(),
 262                                    );
 263                                    buf = sampled.to_vec();
 264                                    apm.lock()
 265                                        .process_reverse_stream(
 266                                            &mut buf,
 267                                            output_config.sample_rate() as i32,
 268                                            output_config.channels() as i32,
 269                                        )
 270                                        .ok();
 271                                }
 272                            }
 273                        },
 274                        |error| log::error!("error playing audio track: {:?}", error),
 275                        Some(Duration::from_millis(100)),
 276                    );
 277
 278                    let Some(output_stream) = output_stream.log_err() else {
 279                        return;
 280                    };
 281
 282                    output_stream.play().log_err();
 283                    // Block forever to keep the output stream alive
 284                    end_on_drop_rx.recv().ok();
 285                })
 286                .detach();
 287
 288            device_change_listener.next().await;
 289            drop(end_on_drop_tx)
 290        }
 291    }
 292
 293    async fn capture_input(
 294        executor: BackgroundExecutor,
 295        apm: Arc<Mutex<apm::AudioProcessingModule>>,
 296        frame_tx: Sender<TimestampedFrame>,
 297        sample_rate: u32,
 298        num_channels: u32,
 299        input_audio_device: Option<DeviceId>,
 300    ) -> Result<()> {
 301        loop {
 302            let mut device_change_listener = DeviceChangeListener::new(true)?;
 303            let (device, config) = crate::default_device(true, input_audio_device.as_ref())?;
 304            let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
 305            let apm = apm.clone();
 306            let mut frame_tx = frame_tx.clone();
 307            let mut resampler = audio_resampler::AudioResampler::default();
 308
 309            executor
 310                .spawn_with_priority(Priority::RealtimeAudio, async move {
 311                    maybe!({
 312                        if let Some(desc) = device.description().ok() {
 313                            log::info!("Using microphone: {}", desc.name())
 314                        } else {
 315                            log::info!("Using microphone: <unknown>");
 316                        }
 317
 318                        let ten_ms_buffer_size =
 319                            (config.channels() as u32 * config.sample_rate() / 100) as usize;
 320                        let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
 321                        let mut rodio_effects = RodioEffectsAdaptor::new(buf.len())
 322                            .automatic_gain_control(AutomaticGainControlSettings {
 323                                target_level: 0.50,
 324                                attack_time: Duration::from_secs(1),
 325                                release_time: Duration::from_secs(0),
 326                                absolute_max_gain: 5.0,
 327                            })
 328                            .limit(LimitSettings::live_performance());
 329
 330                        let stream = device
 331                            .build_input_stream_raw(
 332                                &config.config(),
 333                                config.sample_format(),
 334                                move |data, _: &_| {
 335                                    let captured_at = Instant::now();
 336                                    let data = crate::get_sample_data(config.sample_format(), data)
 337                                        .log_err();
 338                                    let Some(data) = data else {
 339                                        return;
 340                                    };
 341                                    let mut data = data.as_slice();
 342
 343                                    while data.len() > 0 {
 344                                        let remainder =
 345                                            (buf.capacity() - buf.len()).min(data.len());
 346                                        buf.extend_from_slice(&data[..remainder]);
 347                                        data = &data[remainder..];
 348
 349                                        if buf.capacity() == buf.len() {
 350                                            let mut sampled = resampler
 351                                                .remix_and_resample(
 352                                                    buf.as_slice(),
 353                                                    config.sample_rate() / 100,
 354                                                    config.channels() as u32,
 355                                                    config.sample_rate(),
 356                                                    num_channels,
 357                                                    sample_rate,
 358                                                )
 359                                                .to_owned();
 360
 361                                            if audio::LIVE_SETTINGS
 362                                                .auto_microphone_volume
 363                                                .load(Ordering::Relaxed)
 364                                            {
 365                                                rodio_effects
 366                                                    .inner_mut()
 367                                                    .inner_mut()
 368                                                    .fill_buffer_with(&sampled);
 369                                                sampled.clear();
 370                                                sampled.extend(SampleTypeConverter::<_, i16>::new(
 371                                                    rodio_effects.by_ref(),
 372                                                ));
 373                                            }
 374
 375                                            apm.lock()
 376                                                .process_stream(
 377                                                    &mut sampled,
 378                                                    sample_rate as i32,
 379                                                    num_channels as i32,
 380                                                )
 381                                                .log_err();
 382                                            buf.clear();
 383
 384                                            frame_tx
 385                                                .try_send(TimestampedFrame {
 386                                                    frame: AudioFrame {
 387                                                        data: Cow::Owned(sampled),
 388                                                        sample_rate,
 389                                                        num_channels,
 390                                                        samples_per_channel: sample_rate / 100,
 391                                                    },
 392                                                    captured_at,
 393                                                })
 394                                                .ok();
 395                                        }
 396                                    }
 397                                },
 398                                |err| log::error!("error capturing audio track: {:?}", err),
 399                                Some(Duration::from_millis(100)),
 400                            )
 401                            .context("failed to build input stream")?;
 402
 403                        stream.play()?;
 404                        // Keep the thread alive and holding onto the `stream`
 405                        end_on_drop_rx.recv().ok();
 406                        anyhow::Ok(Some(()))
 407                    })
 408                    .log_err();
 409                })
 410                .detach();
 411
 412            device_change_listener.next().await;
 413            drop(end_on_drop_tx)
 414        }
 415    }
 416}
 417
/// Allows using Rodio's effects library within our home-grown audio
/// pipeline. The alternative would be inlining Rodio's effects, which is
/// problematic from a legal stance: we would then have to make clear that
/// code is not owned by zed-industries while it is surrounded by
/// zed-industries owned code.
///
/// This adaptor does incur a slight performance penalty (copying into a
/// pre-allocated vec and back), however the impact will be immeasurably low.
///
/// There is no latency impact.
pub struct RodioEffectsAdaptor {
    // One 10ms chunk of samples converted to f32; refilled via `fill_buffer_with`.
    input: Vec<rodio::Sample>,
    // Read cursor into `input` for the `Iterator` impl; reset on every refill.
    pos: usize,
}
 432
impl RodioEffectsAdaptor {
    // NOTE: the terminology here is non-standard and confusing. A normal
    // audio frame consists of all samples for one moment in time (one for
    // mono, two for stereo). Here a "frame" of audio refers to a 10ms buffer
    // of samples.
    /// Creates an adaptor whose scratch buffer is pre-allocated for
    /// `samples_per_frame` samples (one 10ms chunk).
    fn new(samples_per_frame: usize) -> Self {
        Self {
            input: Vec::with_capacity(samples_per_frame),
            pos: 0,
        }
    }

    /// Replaces the buffered samples with `integer_samples` converted to
    /// floats and rewinds the read cursor to the start.
    fn fill_buffer_with(&mut self, integer_samples: &[i16]) {
        self.input.clear();
        self.input.extend(SampleTypeConverter::<_, f32>::new(
            integer_samples.iter().copied(),
        ));
        self.pos = 0;
    }
}
 452
 453impl Iterator for RodioEffectsAdaptor {
 454    type Item = rodio::Sample;
 455
 456    fn next(&mut self) -> Option<Self::Item> {
 457        let sample = self.input.get(self.pos)?;
 458        self.pos += 1;
 459        Some(*sample)
 460    }
 461}
 462
impl rodio::Source for RodioEffectsAdaptor {
    // The adaptor is treated as one endless span.
    fn current_span_len(&self) -> Option<usize> {
        None
    }

    // Hardcoded stereo — presumably matches audio::CHANNEL_COUNT; confirm
    // if that constant ever changes.
    fn channels(&self) -> rodio::ChannelCount {
        rodio::nz!(2)
    }

    // Hardcoded 48kHz — presumably matches audio::SAMPLE_RATE; confirm
    // if that constant ever changes.
    fn sample_rate(&self) -> rodio::SampleRate {
        rodio::nz!(48000)
    }

    // Duration is unknown/unbounded.
    fn total_duration(&self) -> Option<Duration> {
        None
    }
}
 480
/// User metadata that is urlencoded into the LiveKit audio track name by
/// `capture_local_microphone_track`.
#[derive(Serialize, Deserialize, Debug)]
pub struct Speaker {
    pub name: String,
    pub is_staff: bool,
}
 486
 487use super::LocalVideoTrack;
 488
/// Handle that keeps an audio input or output alive; dropping it tears the
/// associated stream and tasks down.
pub enum AudioStream {
    Input { _task: Task<()> },
    Output { _drop: Box<dyn std::any::Any> },
}
 493
/// Starts a screen share: streams frames from `capture_source` into a newly
/// created native video source and returns the LiveKit track plus the
/// capture-stream handle (dropping the handle stops capturing).
pub(crate) async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
    cx: &mut gpui::AsyncApp,
) -> Result<(crate::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let metadata = capture_source.metadata()?;
    // Created on the tokio runtime — presumably NativeVideoSource requires a
    // tokio context; confirm against gpui_tokio usage elsewhere.
    let track_source = gpui_tokio::Tokio::spawn(cx, async move {
        NativeVideoSource::new(
            VideoResolution {
                width: metadata.resolution.width.0 as u32,
                height: metadata.resolution.height.0 as u32,
            },
            true,
        )
    })
    .await?;

    let capture_stream = capture_source
        .stream(cx.foreground_executor(), {
            let track_source = track_source.clone();
            Box::new(move |frame| {
                // Frames that fail conversion to a webrtc buffer are skipped.
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        LocalVideoTrack(track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        )),
        capture_stream,
    ))
}
 533
/// A mixer input fed by one remote track: decoded frames are queued in
/// `buffer` and drained by the mixer from the output stream's callback.
#[derive(Clone)]
struct AudioMixerSource {
    ssrc: i32,
    sample_rate: u32,
    num_channels: u32,
    // FIFO of 10ms frames; capped at 10 entries (~100ms) in `receive`.
    buffer: Arc<Mutex<VecDeque<Vec<i16>>>>,
}
 541
 542impl AudioMixerSource {
 543    fn receive(&self, frame: AudioFrame) {
 544        assert_eq!(
 545            frame.data.len() as u32,
 546            self.sample_rate * self.num_channels / 100
 547        );
 548
 549        let mut buffer = self.buffer.lock();
 550        buffer.push_back(frame.data.to_vec());
 551        while buffer.len() > 10 {
 552            buffer.pop_front();
 553        }
 554    }
 555}
 556
impl libwebrtc::native::audio_mixer::AudioMixerSource for AudioMixerSource {
    fn ssrc(&self) -> i32 {
        self.ssrc
    }

    fn preferred_sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Pops the next queued 10ms frame, if any.
    fn get_audio_frame_with_info<'a>(&self, target_sample_rate: u32) -> Option<AudioFrame<'_>> {
        // The source was registered at its native rate, so the mixer should
        // never ask for anything else.
        assert_eq!(self.sample_rate, target_sample_rate);
        let buf = self.buffer.lock().pop_front()?;
        Some(AudioFrame {
            data: Cow::Owned(buf),
            sample_rate: self.sample_rate,
            num_channels: self.num_channels,
            samples_per_channel: self.sample_rate / 100,
        })
    }
}
 577
 578pub fn play_remote_video_track(
 579    track: &crate::RemoteVideoTrack,
 580    executor: &BackgroundExecutor,
 581) -> impl Stream<Item = RemoteVideoFrame> + use<> {
 582    #[cfg(target_os = "macos")]
 583    {
 584        _ = executor;
 585        let mut pool = None;
 586        let most_recent_frame_size = (0, 0);
 587        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 588            if pool == None
 589                || most_recent_frame_size != (frame.buffer.width(), frame.buffer.height())
 590            {
 591                pool = create_buffer_pool(frame.buffer.width(), frame.buffer.height()).log_err();
 592            }
 593            let pool = pool.clone();
 594            async move {
 595                if frame.buffer.width() < 10 && frame.buffer.height() < 10 {
 596                    // when the remote stops sharing, we get an 8x8 black image.
 597                    // In a lil bit, the unpublish will come through and close the view,
 598                    // but until then, don't flash black.
 599                    return None;
 600                }
 601
 602                video_frame_buffer_from_webrtc(pool?, frame.buffer)
 603            }
 604        })
 605    }
 606    #[cfg(not(target_os = "macos"))]
 607    {
 608        let executor = executor.clone();
 609        NativeVideoStream::new(track.0.rtc_track()).filter_map(move |frame| {
 610            executor.spawn(async move { video_frame_buffer_from_webrtc(frame.buffer) })
 611        })
 612    }
 613}
 614
/// Creates a CoreVideo pixel-buffer pool producing NV12
/// (`420YpCbCr8BiPlanarFullRange`) buffers of the given size, marked
/// IOSurface/Core-Animation compatible.
#[cfg(target_os = "macos")]
fn create_buffer_pool(
    width: u32,
    height: u32,
) -> Result<core_video::pixel_buffer_pool::CVPixelBufferPool> {
    use core_foundation::{base::TCFType, number::CFNumber, string::CFString};
    use core_video::pixel_buffer;
    use core_video::{
        pixel_buffer::kCVPixelFormatType_420YpCbCr8BiPlanarFullRange,
        pixel_buffer_io_surface::kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey,
        pixel_buffer_pool::{self},
    };

    // SAFETY: these are system-owned constant CFStringRefs; get-rule wrapping
    // retains them for the lifetime of the CFString values.
    let width_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferWidthKey) };
    let height_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferHeightKey) };
    let animation_key: CFString = unsafe {
        CFString::wrap_under_get_rule(kCVPixelBufferIOSurfaceCoreAnimationCompatibilityKey)
    };
    let format_key: CFString =
        unsafe { CFString::wrap_under_get_rule(pixel_buffer::kCVPixelBufferPixelFormatTypeKey) };

    let yes: CFNumber = 1.into();
    let width: CFNumber = (width as i32).into();
    let height: CFNumber = (height as i32).into();
    let format: CFNumber = (kCVPixelFormatType_420YpCbCr8BiPlanarFullRange as i64).into();

    let buffer_attributes = core_foundation::dictionary::CFDictionary::from_CFType_pairs(&[
        (width_key, width.into_CFType()),
        (height_key, height.into_CFType()),
        (animation_key, yes.into_CFType()),
        (format_key, format.into_CFType()),
    ]);

    pixel_buffer_pool::CVPixelBufferPool::new(None, Some(&buffer_attributes)).map_err(|cv_return| {
        anyhow::anyhow!("failed to create pixel buffer pool: CVReturn({cv_return})",)
    })
}
 654
/// On macOS, remote frames are delivered as CoreVideo pixel buffers.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = core_video::pixel_buffer::CVPixelBuffer;
 657
/// Converts a webrtc video buffer into a CoreVideo pixel buffer.
///
/// Native buffers that already wrap a CVPixelBuffer are returned directly
/// (retained). I420 buffers are converted to NV12 into a buffer taken from
/// `pool`. Returns `None` on any failure.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(
    pool: core_video::pixel_buffer_pool::CVPixelBufferPool,
    buffer: Box<dyn VideoBuffer>,
) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType;
    use core_video::{pixel_buffer::CVPixelBuffer, r#return::kCVReturnSuccess};
    use livekit::webrtc::native::yuv_helper::i420_to_nv12;

    if let Some(native) = buffer.as_native() {
        let pixel_buffer = native.get_cv_pixel_buffer();
        if pixel_buffer.is_null() {
            return None;
        }
        // SAFETY: pointer was checked non-null above; wrap_under_get_rule
        // retains it, leaving the original reference with webrtc.
        return unsafe { Some(CVPixelBuffer::wrap_under_get_rule(pixel_buffer as _)) };
    }

    let i420_buffer = buffer.as_i420()?;
    let pixel_buffer = pool.create_pixel_buffer().log_err()?;

    // SAFETY: plane base addresses/strides are valid while the base address
    // is locked; the slices cover exactly plane_height * stride bytes.
    let image_buffer = unsafe {
        if pixel_buffer.lock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        let dst_y = pixel_buffer.get_base_address_of_plane(0);
        let dst_y_stride = pixel_buffer.get_bytes_per_row_of_plane(0);
        let dst_y_len = pixel_buffer.get_height_of_plane(0) * dst_y_stride;
        let dst_uv = pixel_buffer.get_base_address_of_plane(1);
        let dst_uv_stride = pixel_buffer.get_bytes_per_row_of_plane(1);
        let dst_uv_len = pixel_buffer.get_height_of_plane(1) * dst_uv_stride;
        let width = pixel_buffer.get_width();
        let height = pixel_buffer.get_height();
        let dst_y_buffer = std::slice::from_raw_parts_mut(dst_y as *mut u8, dst_y_len);
        let dst_uv_buffer = std::slice::from_raw_parts_mut(dst_uv as *mut u8, dst_uv_len);

        let (stride_y, stride_u, stride_v) = i420_buffer.strides();
        let (src_y, src_u, src_v) = i420_buffer.data();
        i420_to_nv12(
            src_y,
            stride_y,
            src_u,
            stride_u,
            src_v,
            stride_v,
            dst_y_buffer,
            dst_y_stride as u32,
            dst_uv_buffer,
            dst_uv_stride as u32,
            width as i32,
            height as i32,
        );

        if pixel_buffer.unlock_base_address(0) != kCVReturnSuccess {
            return None;
        }

        pixel_buffer
    };

    Some(image_buffer)
}
 720
/// On non-macOS platforms, remote frames are converted to gpui render images.
#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
 723
/// Converts a webrtc video buffer into a gpui `RenderImage`, returning
/// `None` if allocation or image construction fails.
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{Layout, alloc};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    // SAFETY: `to_argb` writes every byte of the allocation, and
    // `Vec::from_raw_parts` uses the same length/capacity as the `u8` array
    // layout it was allocated with, so the Vec frees it correctly.
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let argb_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB,
            argb_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    // TODO: Unclear why providing argb_image to RgbaImage works properly.
    let image = RgbaImage::from_raw(width, height, argb_image)
        .with_context(|| "Bug: not enough bytes allocated for image.")
        .log_err()?;

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(image),
        1,
    ))))
}
 764
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use livekit::webrtc;

    // Take the raw CVPixelBuffer pointer out of the captured frame, then
    // `forget` the wrapper so its Drop impl does not release the buffer we
    // are handing off to WebRTC.
    // NOTE(review): this assumes `from_cv_pixel_buffer` takes ownership of
    // (or retains) the pixel buffer reference; if it does not, this leaks one
    // reference per frame — confirm against the livekit binding.
    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}
 775
 776#[cfg(not(target_os = "macos"))]
 777fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
 778    use libwebrtc::native::yuv_helper::{abgr_to_nv12, argb_to_nv12};
 779    use livekit::webrtc::prelude::NV12Buffer;
 780    match frame.0 {
 781        scap::frame::Frame::BGRx(frame) => {
 782            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 783            let (stride_y, stride_uv) = buffer.strides();
 784            let (data_y, data_uv) = buffer.data_mut();
 785            argb_to_nv12(
 786                &frame.data,
 787                frame.width as u32 * 4,
 788                data_y,
 789                stride_y,
 790                data_uv,
 791                stride_uv,
 792                frame.width,
 793                frame.height,
 794            );
 795            Some(buffer)
 796        }
 797        scap::frame::Frame::RGBx(frame) => {
 798            let mut buffer = NV12Buffer::new(frame.width as u32, frame.height as u32);
 799            let (stride_y, stride_uv) = buffer.strides();
 800            let (data_y, data_uv) = buffer.data_mut();
 801            abgr_to_nv12(
 802                &frame.data,
 803                frame.width as u32 * 4,
 804                data_y,
 805                stride_y,
 806                data_uv,
 807                stride_uv,
 808                frame.width,
 809                frame.height,
 810            );
 811            Some(buffer)
 812        }
 813        scap::frame::Frame::YUVFrame(yuvframe) => {
 814            let mut buffer = NV12Buffer::with_strides(
 815                yuvframe.width as u32,
 816                yuvframe.height as u32,
 817                yuvframe.luminance_stride as u32,
 818                yuvframe.chrominance_stride as u32,
 819            );
 820            let (luminance, chrominance) = buffer.data_mut();
 821            luminance.copy_from_slice(yuvframe.luminance_bytes.as_slice());
 822            chrominance.copy_from_slice(yuvframe.chrominance_bytes.as_slice());
 823            Some(buffer)
 824        }
 825        _ => {
 826            log::error!(
 827                "Expected BGRx or YUV frame from scap screen capture but got some other format."
 828            );
 829            None
 830        }
 831    }
 832}
 833
/// Platform hook for "the default audio device changed" notifications.
///
/// `new(true)` watches the default *input* device, `new(false)` the default
/// *output* device; implementors yield `()` on the stream for each change.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}
 837
#[cfg(target_os = "macos")]
mod macos {
    use cocoa::{
        base::{id, nil},
        foundation::{NSProcessInfo, NSString},
    };
    use coreaudio::sys::{
        AudioObjectAddPropertyListener, AudioObjectID, AudioObjectPropertyAddress,
        AudioObjectRemovePropertyListener, OSStatus, kAudioHardwarePropertyDefaultInputDevice,
        kAudioHardwarePropertyDefaultOutputDevice, kAudioObjectPropertyElementMaster,
        kAudioObjectPropertyScopeGlobal, kAudioObjectSystemObject,
    };
    use futures::{StreamExt, channel::mpsc::UnboundedReceiver};
    use objc::{msg_send, sel, sel_impl};

    /// A guard that prevents App Nap while held.
    ///
    /// On macOS, App Nap can throttle background apps to save power. This can cause
    /// audio artifacts when the app is not in the foreground. This guard tells macOS
    /// that we're doing latency-sensitive work and should not be throttled.
    ///
    /// See Apple's documentation on prioritizing work at the app level:
    /// https://developer.apple.com/library/archive/documentation/Performance/Conceptual/power_efficiency_guidelines_osx/PrioritizeWorkAtTheAppLevel.html
    pub struct PreventAppNapGuard {
        // Retained NSProcessInfo activity token; passed to `endActivity:` on drop.
        activity: id,
    }

    // The activity token returned by NSProcessInfo is thread-safe
    unsafe impl Send for PreventAppNapGuard {}

    // From NSProcessInfo.h. `0x00FFFFFF` is NSActivityUserInitiated, which
    // already includes the idle-system-sleep-disabled bit; the third constant
    // masks that bit back out so holding the guard does not keep the whole
    // machine awake — only the process un-napped.
    const NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED: u64 = 1 << 20;
    const NS_ACTIVITY_USER_INITIATED: u64 = 0x00FFFFFF | NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;
    const NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP: u64 =
        NS_ACTIVITY_USER_INITIATED & !NS_ACTIVITY_IDLE_SYSTEM_SLEEP_DISABLED;

    impl PreventAppNapGuard {
        /// Begins a `beginActivityWithOptions:reason:` activity; the activity
        /// ends when the returned guard is dropped.
        pub fn new() -> Self {
            unsafe {
                let process_info = NSProcessInfo::processInfo(nil);
                #[allow(clippy::disallowed_methods)]
                let reason = NSString::alloc(nil).init_str("Audio playback in progress");
                let activity: id = msg_send![process_info, beginActivityWithOptions:NS_ACTIVITY_USER_INITIATED_ALLOWING_IDLE_SYSTEM_SLEEP reason:reason];
                // Balance the alloc/init of the reason string above.
                let _: () = msg_send![reason, release];
                // Retain the token so it stays valid beyond the current
                // autorelease pool; released again in Drop after `endActivity:`.
                let _: () = msg_send![activity, retain];
                Self { activity }
            }
        }
    }

    impl Drop for PreventAppNapGuard {
        fn drop(&mut self) {
            unsafe {
                let process_info = NSProcessInfo::processInfo(nil);
                let _: () = msg_send![process_info, endActivity:self.activity];
                // Matches the explicit retain in `new`.
                let _: () = msg_send![self.activity, release];
            }
        }
    }

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        // Receives a `()` each time one of the registered CoreAudio property
        // listeners fires.
        rx: UnboundedReceiver<()>,
        // Boxed so its address is stable: that raw address is what we register
        // with CoreAudio, so the box must stay alive until Drop unregisters.
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
        device_id: AudioObjectID, // Store the device ID to properly remove listeners
    }

    // Compile-time check that the listener can be sent across threads.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    // Type-erased closure invoked from the C trampoline below.
    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    /// C-ABI trampoline CoreAudio invokes on a property change; `callback` is
    /// the raw pointer to our boxed `PropertyListenerCallbackWrapper`.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        // SAFETY relies on the wrapper box outliving the registration; it is
        // stored in the listener struct and unregistered in Drop.
        unsafe { (*wrapper).0() };
        0
    }

    impl super::DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        /// Registers CoreAudio listeners for (a) the system default
        /// input/output device changing and (b) stream-format changes on the
        /// current default device, forwarding each event into `rx`.
        fn new(input: bool) -> anyhow::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            // Get the current default device ID
            let device_id = unsafe {
                // Listen for default device changes
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;

                // Also listen for changes to the device configuration
                let device_id = if input {
                    let mut input_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultInputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut input_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        // Non-fatal: 0 means "skip the per-device listener below".
                        log::warn!("Failed to get default input device ID");
                        0
                    } else {
                        input_device
                    }
                } else {
                    let mut output_device: AudioObjectID = 0;
                    let mut prop_size = std::mem::size_of::<AudioObjectID>() as u32;
                    let result = coreaudio::sys::AudioObjectGetPropertyData(
                        kAudioObjectSystemObject,
                        &AudioObjectPropertyAddress {
                            mSelector: kAudioHardwarePropertyDefaultOutputDevice,
                            mScope: kAudioObjectPropertyScopeGlobal,
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        0,
                        std::ptr::null(),
                        &mut prop_size as *mut _,
                        &mut output_device as *mut _ as *mut _,
                    );
                    if result != 0 {
                        log::warn!("Failed to get default output device ID");
                        0
                    } else {
                        output_device
                    }
                };

                if device_id != 0 {
                    // Listen for format changes on the device
                    // NOTE(review): if this registration fails, `?` returns early
                    // without removing the system-level listener registered above,
                    // leaving CoreAudio holding a pointer to the then-dropped
                    // `callback` box — worth confirming and fixing upstream.
                    coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                        device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*callback as *const _ as *mut _,
                    ))?;
                }

                device_id
            };

            Ok(Self {
                rx,
                callback,
                input,
                device_id,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                // Remove the system-level property listener
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );

                // Remove the device-specific property listener if we have a valid device ID
                if self.device_id != 0 {
                    AudioObjectRemovePropertyListener(
                        self.device_id,
                        &AudioObjectPropertyAddress {
                            mSelector: coreaudio::sys::kAudioDevicePropertyStreamFormat,
                            mScope: if self.input {
                                coreaudio::sys::kAudioObjectPropertyScopeInput
                            } else {
                                coreaudio::sys::kAudioObjectPropertyScopeOutput
                            },
                            mElement: kAudioObjectPropertyElementMaster,
                        },
                        Some(property_listener_handler_shim),
                        &*self.callback as *const _ as *mut _,
                    );
                }
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            // Forward notifications produced by the CoreAudio callbacks.
            self.rx.poll_next_unpin(cx)
        }
    }
}
1074
// macOS: CoreAudio-backed notifications for default-device changes.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
#[cfg(target_os = "macos")]
use macos::PreventAppNapGuard;
1079
1080#[cfg(not(target_os = "macos"))]
1081mod noop_change_listener {
1082    use std::task::Poll;
1083
1084    use super::DeviceChangeListenerApi;
1085
1086    pub struct NoopOutputDeviceChangelistener {}
1087
1088    impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
1089        fn new(_input: bool) -> anyhow::Result<Self> {
1090            Ok(NoopOutputDeviceChangelistener {})
1091        }
1092    }
1093
1094    impl futures::Stream for NoopOutputDeviceChangelistener {
1095        type Item = ();
1096
1097        fn poll_next(
1098            self: std::pin::Pin<&mut Self>,
1099            _cx: &mut std::task::Context<'_>,
1100        ) -> Poll<Option<Self::Item>> {
1101            Poll::Pending
1102        }
1103    }
1104}
1105
// Other platforms: no notifications available; the noop listener never fires.
#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;