mod remote_video_track_view;
#[cfg(any(test, feature = "test-support"))]
pub mod test;

use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
use webrtc::{
    audio_frame::AudioFrame,
    audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
    video_stream::native::NativeVideoStream,
};

#[cfg(not(any(test, feature = "test-support")))]
use livekit::track::RemoteAudioTrack;
#[cfg(not(any(test, feature = "test-support")))]
pub use livekit::*;
#[cfg(any(test, feature = "test-support"))]
use test::track::RemoteAudioTrack;
#[cfg(any(test, feature = "test-support"))]
pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

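/// A handle to a live audio stream. Dropping the handle tears the stream
/// down: for `Input`, the capture thread exits when the channel sender is
/// dropped and the transmit task is cancelled; for `Output`, the playback
/// task is cancelled.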
pub enum AudioStream {
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    Output {
        _task: Task<()>,
    },
}

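/// Adapts gpui's platform dispatcher to LiveKit's `Dispatcher` trait, so that
/// LiveKit can schedule its work on gpui's executor.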
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

impl livekit::dispatcher::Dispatcher for Dispatcher {
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}

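/// Adapts Zed's `http_client::HttpClient` to the HTTP client interface that
/// LiveKit expects.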
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);

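// LiveKit's interface and Zed's `http_client` crate use different versions of
// the `http` crate, so status codes are converted by their numeric value.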
fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    http_2::StatusCode::from_u16(status.as_u16())
        .expect("valid status code to status code conversion")
}

impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        if !request.extensions().is_empty() {
            debug_panic!(
                "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}

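/// Installs gpui-backed implementations of LiveKit's dispatcher and HTTP
/// client. This should run before any other LiveKit functionality is used.
///
/// A minimal sketch of a call site (the `dispatcher` and `http_client` values
/// are assumed to come from the application's gpui setup):
///
/// ```ignore
/// init(dispatcher, http_client);
/// ```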
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}

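/// Creates a LiveKit video track that publishes frames from the given screen
/// capture source. The returned capture stream must be kept alive for frames
/// to continue flowing into the track.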
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}

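/// Captures audio from the default input device via cpal, returning a task
/// that resolves to a LiveKit audio track plus an [`AudioStream::Input`]
/// handle that keeps the capture alive. Under `test-support`, no real device
/// is opened and a dummy mono configuration is used instead.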
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

    if cfg!(any(test, feature = "test-support")) {
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        thread::spawn(move || {
            maybe!({
                if let Ok(name) = device.name() {
                    log::info!("Using microphone: {}", name);
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive and holding onto the `stream`
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}

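/// Plays a remote participant's audio track on the default output device,
/// restarting the output stream whenever the system's default device changes.
/// The returned [`AudioStream::Output`] must be kept alive for playback to
/// continue.
///
/// A minimal usage sketch (assuming a remote track and a background executor
/// are in scope):
///
/// ```ignore
/// let _stream = play_remote_audio_track(&remote_track, &background_executor)?;
/// ```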
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We track device changes in our output because Livekit has a resampler built in,
    // and it's easy to create a new native audio stream when the device changes.
    if cfg!(any(test, feature = "test-support")) {
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                while default_change_listener.next().await.is_some() {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name);
                    } else {
                        log::info!("Using speaker: <unknown>");
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}

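/// Returns the default input device (when `input` is true) or the default
/// output device, along with that device's default stream configuration.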
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

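/// Returns the default output device and its default configuration.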
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}

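/// Starts playback of a remote audio track on the given output device.
/// Returns the task that pulls decoded frames into a shared buffer, and a
/// channel sender whose drop stops the playback thread. At most ~100ms of
/// audio is buffered; the oldest samples are dropped on overflow.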
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert!(frame.data.len() == frame_size as usize);

                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The `output_stream` needs to be on its own thread because it's `!Send`,
    // and we experienced a deadlock when it was created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling the buffer, output silence. If a partial
                        // buffer were output, it could lead to a perpetual state of outputting
                        // partial buffers, as the buffer would never fill enough for a full
                        // frame.
                        data.fill(0);
                    } else {
                        // SAFETY: We know that `buffer` has at least `data.len()`
                        // values in it, because we just checked.
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block forever to keep the output stream alive
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}

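/// Converts a remote video track into a stream of frames that are ready to
/// render. Frames that cannot be converted into a [`RemoteVideoFrame`] are
/// skipped.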
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}

#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

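// On macOS, the WebRTC buffer wraps a CVPixelBuffer that can be handed to the
// renderer directly, without copying the pixel data.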
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

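// On other platforms, the frame is converted to a BGRA `RenderImage` by
// copying the pixel data out of the WebRTC buffer.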
#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .context("Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

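// On macOS, screen capture frames already wrap a CVPixelBuffer, which the
// native WebRTC buffer consumes directly; `mem::forget` deliberately skips
// releasing our reference so the pixel buffer stays alive.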
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(target_os = "macos"))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

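/// A stream that yields `()` whenever the system's default audio device
/// changes: implemented with CoreAudio property listeners on macOS, and as a
/// no-op stream elsewhere.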
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;