livekit_client.rs

#![cfg_attr(all(target_os = "windows", target_env = "gnu"), allow(unused))]

mod remote_video_track_view;
#[cfg(any(
    test,
    feature = "test-support",
    all(target_os = "windows", target_env = "gnu")
))]
pub mod test;

use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
use webrtc::{
    audio_frame::AudioFrame,
    audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
    video_stream::native::NativeVideoStream,
};

#[cfg(all(
    not(any(test, feature = "test-support")),
    not(all(target_os = "windows", target_env = "gnu"))
))]
use livekit::track::RemoteAudioTrack;
#[cfg(all(
    not(any(test, feature = "test-support")),
    not(all(target_os = "windows", target_env = "gnu"))
))]
pub use livekit::*;
#[cfg(any(
    test,
    feature = "test-support",
    all(target_os = "windows", target_env = "gnu")
))]
use test::track::RemoteAudioTrack;
#[cfg(any(
    test,
    feature = "test-support",
    all(target_os = "windows", target_env = "gnu")
))]
pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

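/// A handle to a live audio pipeline. Dropping a value stops the
/// corresponding capture or playback and shuts down its background work.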
pub enum AudioStream {
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    Output {
        _task: Task<()>,
    },
}

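/// Adapts a gpui [`PlatformDispatcher`](gpui::PlatformDispatcher) so that
/// LiveKit's async runtime can schedule work on gpui's executor.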
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
impl livekit::dispatcher::Dispatcher for Dispatcher {
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}

/// Adapts Zed's [`http_client::HttpClient`] to the HTTP interface LiveKit
/// expects.
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);

fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    http_2::StatusCode::from_u16(status.as_u16())
        .expect("status codes should convert between http crate versions")
}

#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        if !request.extensions().is_empty() {
            debug_panic!(
                "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}

#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
}

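/// Registers the gpui dispatcher and Zed's HTTP client with LiveKit, so that
/// LiveKit's futures and HTTP requests run on the application's own
/// infrastructure. Call this once at startup, before using anything else in
/// this crate.
///
/// A minimal sketch; where the two handles come from depends on the embedding
/// application:
///
/// ```ignore
/// livekit_client::init(platform_dispatcher, http_client);
/// ```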
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}

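/// Creates a local video track that forwards frames from the given screen
/// capture source to a native LiveKit video source. The returned
/// [`ScreenCaptureStream`] must be kept alive for capture to continue.
///
/// A usage sketch (the `source` value is hypothetical, obtained from gpui's
/// screen-capture API):
///
/// ```ignore
/// let (track, stream) = capture_local_video_track(source.as_ref()).await?;
/// // Publish `track` via the room's local participant; hold on to `stream`,
/// // since dropping it ends the screen share.
/// ```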
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}

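/// Captures the default microphone into a local audio track. Samples are read
/// on a dedicated cpal thread and forwarded to a native LiveKit audio source
/// with echo cancellation, noise suppression, and auto gain control enabled.
///
/// A usage sketch (`executor` is a hypothetical gpui [`BackgroundExecutor`]):
///
/// ```ignore
/// let (track, audio_stream) = capture_local_audio_track(&executor)?.await;
/// // Publish `track`; keep `audio_stream` alive while the microphone
/// // should stay open.
/// ```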
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

    if cfg!(any(test, feature = "test-support")) {
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        thread::spawn(move || {
            maybe!({
                if let Ok(name) = device.name() {
                    log::info!("Using microphone: {}", name)
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive, holding onto the `stream`, until the
                // kill channel's sender is dropped.
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}

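/// Plays a remote participant's audio track on the default output device,
/// restarting the output stream whenever the default device changes.
///
/// A usage sketch (`remote_track` and `executor` are hypothetical):
///
/// ```ignore
/// let stream = play_remote_audio_track(&remote_track, &executor)?;
/// // Keep `stream` alive for as long as playback should continue.
/// ```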
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We watch for default-device changes on the output side: LiveKit has a
    // resampler built in, so it's cheap to create a new native audio stream
    // whenever the device changes.
    if cfg!(any(test, feature = "test-support")) {
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                while default_change_listener.next().await.is_some() {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name)
                    } else {
                        log::info!("Using speaker: <unknown>")
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}

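/// Returns the default input (`input == true`) or output device, along with
/// its default stream configuration.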
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}

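/// Pipes a remote audio track into a cpal output device.
///
/// A background task copies decoded frames into a shared ring buffer capped
/// at roughly 100ms of audio, while the cpal output callback drains it,
/// substituting silence when the buffer can't fill a whole output frame.
/// Dropping the returned task stops frame delivery; dropping the returned
/// sender shuts down the playback thread.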
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert!(frame.data.len() == frame_size as usize);

                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The output stream needs to be on its own thread because it's !Send, and
    // we experienced a deadlock when it was created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling a buffer, output silence. If we
                        // output a partial buffer, we could end up perpetually outputting
                        // partial buffers, since the buffer would never fill far enough
                        // ahead for a full frame.
                        data.fill(0);
                    } else {
                        // SAFETY: the buffer has at least `data.len()` values in it,
                        // because we just checked above.
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block until the sender side is dropped, keeping the output stream alive.
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}

#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    futures::stream::empty()
}

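/// Converts a remote video track into a stream of frames ready for rendering.
///
/// A usage sketch (assumes an async context; `remote_track` is hypothetical):
///
/// ```ignore
/// let mut frames = play_remote_video_track(&remote_track);
/// while let Some(frame) = frames.next().await {
///     // Hand `frame` off to the renderer, e.g. via `RemoteVideoTrackView`.
/// }
/// ```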
#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .with_context(|| "Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Forget the Rust-side reference so ownership of the pixel buffer passes
    // to the native buffer created below.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

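/// A stream that yields `()` whenever the platform's default audio device
/// changes. The `input` flag selects whether input or output devices are
/// watched.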
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;