livekit_client.rs

#![cfg_attr(target_os = "windows", allow(unused))]

mod remote_video_track_view;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
pub mod test;

use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
#[cfg(not(target_os = "windows"))]
use webrtc::{
    audio_frame::AudioFrame,
    audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
    video_stream::native::NativeVideoStream,
};

#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
use livekit::track::RemoteAudioTrack;
#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
pub use livekit::*;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
use test::track::RemoteAudioTrack;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

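/// Handles that keep an audio stream alive; dropping an `AudioStream` stops
/// the underlying capture or playback.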
pub enum AudioStream {
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    Output {
        _task: Task<()>,
    },
}

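/// Bridges LiveKit's dispatcher interface onto GPUI's platform dispatcher, so
/// LiveKit runnables execute on Zed's executor.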
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::Dispatcher for Dispatcher {
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}

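/// Adapts Zed's `http_client::HttpClient` to the HTTP interface LiveKit's
/// dispatcher expects.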
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);

fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    http_2::StatusCode::from_u16(status.as_u16())
        .expect("valid status code to status code conversion")
}

#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        if !request.extensions().is_empty() {
            debug_panic!(
                "LiveKit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}

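// On Windows the test stubs are used in place of the native LiveKit bindings,
// so there is nothing to initialize.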
#[cfg(target_os = "windows")]
pub fn init(
    _dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    _http_client: Arc<dyn http_client::HttpClient>,
) {
}

#[cfg(not(target_os = "windows"))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}

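/// Captures the given screen-capture source as a LiveKit video track,
/// forwarding each captured frame to a native WebRTC video source.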
#[cfg(not(target_os = "windows"))]
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}

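/// Captures audio from the default input device via cpal on a dedicated
/// thread and forwards the frames to a native LiveKit audio source. In tests
/// a dummy configuration is used instead of a real device.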
#[cfg(not(target_os = "windows"))]
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

    if cfg!(any(test, feature = "test-support")) {
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        thread::spawn(move || {
            maybe!({
                if let Ok(name) = device.name() {
                    log::info!("Using microphone: {}", name)
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive, holding onto the `stream`.
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}

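/// Plays a remote LiveKit audio track through the system's default output
/// device, restarting the output stream whenever the default device changes.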
#[cfg(not(target_os = "windows"))]
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We follow default output device changes because LiveKit has a resampler
    // built in, and it's easy to create a new native audio stream when the
    // device changes.
    if cfg!(any(test, feature = "test-support")) {
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                while default_change_listener.next().await.is_some() {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name)
                    } else {
                        log::info!("Using speaker: <unknown>")
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}

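/// Returns the default input (if `input` is true) or output device along with
/// its default stream configuration.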
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

#[cfg(not(target_os = "windows"))]
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}

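/// Starts playback of a remote audio track: a background task drains decoded
/// frames into a shared ring buffer, and a dedicated thread feeds that buffer
/// to the cpal output stream. Dropping the returned sender stops the thread.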
#[cfg(not(target_os = "windows"))]
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert_eq!(frame.data.len(), frame_size as usize);

                // Cap the buffer at 100ms of audio, dropping the oldest
                // samples on overflow.
                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The `output_stream` needs to live on its own thread because it's `!Send`,
    // and we experienced a deadlock when it was created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests.
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling the output, emit silence. If we
                        // output a partial buffer, we could end up in a perpetual state
                        // of emitting partial buffers because the ring buffer never
                        // fills up enough for a full frame.
                        data.fill(0);
                    } else {
                        // SAFETY: The buffer has at least `data.len()` values in it,
                        // because we just checked.
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block forever to keep the output stream alive.
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}

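// Stub for Windows, where the native video stream isn't available; the
// returned stream never yields a frame.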
#[cfg(target_os = "windows")]
pub fn play_remote_video_track(
    _track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    futures::stream::empty()
}

#[cfg(not(target_os = "windows"))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // This unsafe block avoids initializing the frame data up front, since
        // `to_argb` will overwrite every byte anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .context("Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    // Forget `frame.0` so it isn't released here; the WebRTC buffer takes over
    // the pixel buffer reference.
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

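/// A stream that emits `()` each time the default audio device changes.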
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            // Box the callback so it has a stable address to hand to CoreAudio.
            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    /// A no-op listener for platforms where we don't watch for default-device
    /// changes; it never yields an item.
    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;