livekit_client.rs

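//! Zed's LiveKit client: capture of local audio/video tracks, playback of
//! remote ones, and adapters that run LiveKit's runtime on GPUI's dispatcher
//! and Zed's HTTP client.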
mod remote_video_track_view;
#[cfg(any(test, feature = "test-support"))]
pub mod test;

use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
use webrtc::{
    audio_frame::AudioFrame,
    audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
    video_stream::native::NativeVideoStream,
};

#[cfg(not(any(test, feature = "test-support")))]
use livekit::track::RemoteAudioTrack;
#[cfg(not(any(test, feature = "test-support")))]
pub use livekit::*;
#[cfg(any(test, feature = "test-support"))]
use test::track::RemoteAudioTrack;
#[cfg(any(test, feature = "test-support"))]
pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

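/// A handle that keeps an audio stream alive; audio stops when it is dropped.
/// For input, dropping it unparks the capture thread (letting it exit) and
/// cancels the task forwarding frames to LiveKit; for output, it cancels the
/// playback task.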
pub enum AudioStream {
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    Output {
        _task: Task<()>,
    },
}

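/// Adapter that lets LiveKit schedule its runnables on GPUI's platform
/// dispatcher.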
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

impl livekit::dispatcher::Dispatcher for Dispatcher {
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}

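/// Adapter that serves LiveKit's HTTP requests through Zed's `http_client`
/// crate.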
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);

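// LiveKit's dispatcher API uses its own version of the `http` crate (imported
// here as `http_2`), so status codes have to be converted between the two
// types.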
fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    http_2::StatusCode::from_u16(status.as_u16())
        .expect("valid status code to status code conversion")
}

impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        if !request.extensions().is_empty() {
            debug_panic!(
                "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}

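/// Installs the GPUI dispatcher and Zed's HTTP client as LiveKit's global
/// runtime hooks. Intended to be called once at startup, before any LiveKit
/// functionality is used.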
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}

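/// Creates a local video track that publishes frames from the given
/// screen-capture source. The returned capture stream must be kept alive for
/// frames to keep flowing into the track.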
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}

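/// Captures audio from the default input device on a dedicated thread and
/// returns a task resolving to the microphone track plus the [`AudioStream`]
/// handle that keeps capture running. Tests use a dummy one-channel source
/// instead of a real device.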
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

    if cfg!(any(test, feature = "test-support")) {
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        thread::spawn(move || {
            maybe!({
                if let Ok(name) = device.name() {
                    log::info!("Using microphone: {}", name)
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive and holding onto the `stream`.
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}

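/// Plays a remote audio track on the default output device, restarting the
/// output stream whenever the system default device changes.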
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We track default-device changes on the output side because LiveKit has a
    // resampler built in, which makes it cheap to create a new native audio
    // stream whenever the device changes.
    if cfg!(any(test, feature = "test-support")) {
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                while default_change_listener.next().await.is_some() {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name)
                    } else {
                        log::info!("Using speaker: <unknown>")
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}

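/// Returns the default input (if `input` is true) or output device along with
/// its default stream configuration.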
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}

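/// Starts playing `track` on `output_device`. Decoded frames accumulate in a
/// shared buffer capped at roughly 100ms of audio; the cpal callback drains
/// it, outputting silence on underrun. Returns the frame-receiving task and a
/// sender whose drop shuts down the playback thread.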
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert!(frame.data.len() == frame_size as usize);

                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The output stream needs to live on its own thread because it is `!Send`,
    // and we experienced a deadlock when it was created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests.
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling the output, emit silence: once a
                        // partial buffer is output, the queue may never fill far enough
                        // ahead to produce a full frame again, leaving playback in a
                        // perpetual state of partial buffers.
                        data.fill(0);
                    } else {
                        // SAFETY: we just checked that `buffer` holds at least
                        // `data.len()` samples, so the iterator cannot run dry.
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block until the handle returned to the caller is dropped, keeping the
        // output stream alive.
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}

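/// Converts a remote video track into a stream of frames ready for rendering,
/// skipping frames whose buffers cannot be converted.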
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}

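// On macOS, remote frames are passed through as CoreVideo buffers without
// copying; on other platforms they are converted into RGBA render images.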
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Avoid zero-initializing the frame data here, since `to_argb` will
        // overwrite every byte anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .context("Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

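/// A stream that emits `()` each time the system's default input or output
/// device changes.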
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {
    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

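    // Compile-time assertion that the listener is `Send`, since it is polled
    // from a background task.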
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

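// Platform selection: the CoreAudio listener on macOS, a no-op stream that
// never fires elsewhere.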
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;