#![cfg_attr(target_os = "windows", allow(unused))]

mod remote_video_track_view;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
pub mod test;

use anyhow::{anyhow, Context as _, Result};
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
use futures::{io, Stream, StreamExt as _};
use gpui::{
    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
};
use parking_lot::Mutex;
use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
use util::{debug_panic, ResultExt as _};
#[cfg(not(target_os = "windows"))]
use webrtc::{
    audio_frame::AudioFrame,
    audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
    audio_stream::native::NativeAudioStream,
    video_frame::{VideoBuffer, VideoFrame, VideoRotation},
    video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
    video_stream::native::NativeVideoStream,
};

#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
use livekit::track::RemoteAudioTrack;
#[cfg(all(not(any(test, feature = "test-support")), not(target_os = "windows")))]
pub use livekit::*;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
use test::track::RemoteAudioTrack;
#[cfg(any(test, feature = "test-support", target_os = "windows"))]
pub use test::*;

pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};

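/// Handles that keep an audio pipeline alive. Dropping a variant tears the
/// pipeline down: the `Sender` lets the capture thread exit, and the `Task`s
/// stop being polled.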
pub enum AudioStream {
    Input {
        _thread_handle: std::sync::mpsc::Sender<()>,
        _transmit_task: Task<()>,
    },
    Output {
        _task: Task<()>,
    },
}

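/// Adapts gpui's platform dispatcher to the dispatcher interface livekit expects.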
struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);

#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::Dispatcher for Dispatcher {
    fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
        self.0.dispatch(runnable, None);
    }

    fn dispatch_after(
        &self,
        duration: std::time::Duration,
        runnable: livekit::dispatcher::Runnable,
    ) {
        self.0.dispatch_after(duration, runnable);
    }
}

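/// Adapts Zed's `http_client` to the HTTP client interface livekit expects.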
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);

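// livekit's dispatcher API uses the `http_2` crate's types, while `http_client`
// uses its own `http` types, so status codes are converted between the two by
// round-tripping through `u16`.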
fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
    http_2::StatusCode::from_u16(status.as_u16())
        .expect("valid status code to status code conversion")
}

#[cfg(not(target_os = "windows"))]
impl livekit::dispatcher::HttpClient for HttpClientAdapter {
    fn get(
        &self,
        url: &str,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let url = url.to_string();
        Box::pin(async move {
            let response = http_client
                .get(&url, http_client::AsyncBody::empty(), false)
                .await
                .map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }

    fn send_async(
        &self,
        request: http_2::Request<Vec<u8>>,
    ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
        let http_client = self.0.clone();
        let mut builder = http_client::http::Request::builder()
            .method(request.method().as_str())
            .uri(request.uri().to_string());

        for (key, value) in request.headers().iter() {
            builder = builder.header(key.as_str(), value.as_bytes());
        }

        if !request.extensions().is_empty() {
            debug_panic!(
                "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
            );
        }

        let request = builder
            .body(http_client::AsyncBody::from_bytes(
                request.into_body().into(),
            ))
            .unwrap();

        Box::pin(async move {
            let response = http_client.send(request).await.map_err(io::Error::other)?;
            Ok(livekit::dispatcher::Response {
                status: http_2_status(response.status()),
                body: Box::pin(response.into_body()),
            })
        })
    }
}

#[cfg(target_os = "windows")]
pub fn init(
    _dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    _http_client: Arc<dyn http_client::HttpClient>,
) {
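    // No-op on Windows, where the native livekit integration is compiled out.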
}

#[cfg(not(target_os = "windows"))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
    livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
}

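/// Creates a local video track that forwards frames from the given screen
/// capture source to a native webrtc video source.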
#[cfg(not(target_os = "windows"))]
pub async fn capture_local_video_track(
    capture_source: &dyn ScreenCaptureSource,
) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
    let resolution = capture_source.resolution()?;
    let track_source = NativeVideoSource::new(VideoResolution {
        width: resolution.width.0 as u32,
        height: resolution.height.0 as u32,
    });

    let capture_stream = capture_source
        .stream({
            let track_source = track_source.clone();
            Box::new(move |frame| {
                if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
                    track_source.capture_frame(&VideoFrame {
                        rotation: VideoRotation::VideoRotation0,
                        timestamp_us: 0,
                        buffer,
                    });
                }
            })
        })
        .await??;

    Ok((
        track::LocalVideoTrack::create_video_track(
            "screen share",
            RtcVideoSource::Native(track_source),
        ),
        capture_stream,
    ))
}

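/// Captures audio from the default input device on a dedicated thread and
/// returns a local audio track fed by that capture stream.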
#[cfg(not(target_os = "windows"))]
pub fn capture_local_audio_track(
    background_executor: &BackgroundExecutor,
) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
    use util::maybe;

    let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
    let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
    let sample_rate;
    let channels;

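    // Tests have no real audio device, so use dummy parameters and skip
    // spawning the capture thread.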
    if cfg!(any(test, feature = "test-support")) {
        sample_rate = 2;
        channels = 1;
    } else {
        let (device, config) = default_device(true)?;
        sample_rate = config.sample_rate().0;
        channels = config.channels() as u32;
        thread::spawn(move || {
            maybe!({
                if let Ok(name) = device.name() {
                    log::info!("Using microphone: {}", name)
                } else {
                    log::info!("Using microphone: <unknown>");
                }

                let stream = device
                    .build_input_stream_raw(
                        &config.config(),
                        cpal::SampleFormat::I16,
                        move |data, _: &_| {
                            frame_tx
                                .unbounded_send(AudioFrame {
                                    data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
                                    sample_rate,
                                    num_channels: channels,
                                    samples_per_channel: data.len() as u32 / channels,
                                })
                                .ok();
                        },
                        |err| log::error!("error capturing audio track: {:?}", err),
                        None,
                    )
                    .context("failed to build input stream")?;

                stream.play()?;
                // Keep the thread alive and holding onto the `stream`
                thread_kill_rx.recv().ok();
                anyhow::Ok(Some(()))
            })
            .log_err();
        });
    }

    Ok(background_executor.spawn({
        let background_executor = background_executor.clone();
        async move {
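            // The trailing `100` below is the audio source's queue size in
            // milliseconds (assuming livekit's `NativeAudioSource::new(options,
            // sample_rate, num_channels, queue_size_ms)` signature).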
            let source = NativeAudioSource::new(
                AudioSourceOptions {
                    echo_cancellation: true,
                    noise_suppression: true,
                    auto_gain_control: true,
                },
                sample_rate,
                channels,
                100,
            );
            let transmit_task = background_executor.spawn({
                let source = source.clone();
                async move {
                    while let Some(frame) = frame_rx.next().await {
                        source.capture_frame(&frame).await.log_err();
                    }
                }
            });

            let track = track::LocalAudioTrack::create_audio_track(
                "microphone",
                RtcAudioSource::Native(source),
            );

            (
                track,
                AudioStream::Input {
                    _thread_handle: thread_handle,
                    _transmit_task: transmit_task,
                },
            )
        }
    }))
}

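/// Plays a remote audio track through the default output device, restarting
/// the output stream whenever the default device changes.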
#[cfg(not(target_os = "windows"))]
pub fn play_remote_audio_track(
    track: &RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> Result<AudioStream> {
    let track = track.clone();
    // We track device changes in our output because Livekit has a resampler built in,
    // and it's easy to create a new native audio stream when the device changes.
    if cfg!(any(test, feature = "test-support")) {
        Ok(AudioStream::Output {
            _task: background_executor.spawn(async {}),
        })
    } else {
        let mut default_change_listener = DeviceChangeListener::new(false)?;
        let (output_device, output_config) = default_device(false)?;

        let _task = background_executor.spawn({
            let background_executor = background_executor.clone();
            async move {
                let (mut _receive_task, mut _thread) =
                    start_output_stream(output_config, output_device, &track, &background_executor);

                while default_change_listener.next().await.is_some() {
                    let Some((output_device, output_config)) = get_default_output().log_err()
                    else {
                        continue;
                    };

                    if let Ok(name) = output_device.name() {
                        log::info!("Using speaker: {}", name)
                    } else {
                        log::info!("Using speaker: <unknown>")
                    }

                    (_receive_task, _thread) = start_output_stream(
                        output_config,
                        output_device,
                        &track,
                        &background_executor,
                    );
                }

                futures::future::pending::<()>().await;
            }
        });

        Ok(AudioStream::Output { _task })
    }
}

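/// Returns the default input or output device together with its default stream config.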
fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let device;
    let config;
    if input {
        device = cpal::default_host()
            .default_input_device()
            .ok_or_else(|| anyhow!("no audio input device available"))?;
        config = device
            .default_input_config()
            .context("failed to get default input config")?;
    } else {
        device = cpal::default_host()
            .default_output_device()
            .ok_or_else(|| anyhow!("no audio output device available"))?;
        config = device
            .default_output_config()
            .context("failed to get default output config")?;
    }
    Ok((device, config))
}

#[cfg(not(target_os = "windows"))]
fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
    let host = cpal::default_host();
    let output_device = host
        .default_output_device()
        .context("failed to read default output device")?;
    let output_config = output_device.default_output_config()?;
    Ok((output_device, output_config))
}

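/// Spawns the buffering task and playback thread for a remote audio track.
/// Dropping the returned `Sender` lets the playback thread exit; dropping the
/// `Task` stops pulling frames from the track.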
#[cfg(not(target_os = "windows"))]
fn start_output_stream(
    output_config: cpal::SupportedStreamConfig,
    output_device: cpal::Device,
    track: &track::RemoteAudioTrack,
    background_executor: &BackgroundExecutor,
) -> (Task<()>, std::sync::mpsc::Sender<()>) {
    let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
    let sample_rate = output_config.sample_rate();

    let mut stream = NativeAudioStream::new(
        track.rtc_track(),
        sample_rate.0 as i32,
        output_config.channels() as i32,
    );

    let receive_task = background_executor.spawn({
        let buffer = buffer.clone();
        async move {
            const MS_OF_BUFFER: u32 = 100;
            const MS_IN_SEC: u32 = 1000;
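            // Cap the shared buffer at MS_OF_BUFFER milliseconds of audio,
            // dropping the oldest samples on overflow so playback latency
            // stays bounded.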
            while let Some(frame) = stream.next().await {
                let frame_size = frame.samples_per_channel * frame.num_channels;
                debug_assert!(frame.data.len() == frame_size as usize);

                let buffer_size =
                    ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;

                let mut buffer = buffer.lock();
                let new_size = buffer.len() + frame.data.len();
                if new_size > buffer_size {
                    let overflow = new_size - buffer_size;
                    buffer.drain(0..overflow);
                }

                buffer.extend(frame.data.iter());
            }
        }
    });

    // The _output_stream needs to be on its own thread because it's !Send,
    // and we experienced a deadlock when it was created on the main thread.
    let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
    thread::spawn(move || {
        if cfg!(any(test, feature = "test-support")) {
            // Can't play audio in tests
            return;
        }

        let output_stream = output_device.build_output_stream(
            &output_config.config(),
            {
                let buffer = buffer.clone();
                move |data, _info| {
                    let mut buffer = buffer.lock();
                    if buffer.len() < data.len() {
                        // Instead of partially filling a buffer, output silence. If a partial
                        // buffer were output, it could lead to a perpetual state of
                        // outputting partial buffers, as the buffer would never get full
                        // enough for a complete frame.
                        data.fill(0);
                    } else {
                        // SAFETY: We know that the buffer has at least `data.len()` values
                        // in it, because we just checked.
                        let mut drain = buffer.drain(..data.len());
                        data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
                    }
                }
            },
            |error| log::error!("error playing audio track: {:?}", error),
            None,
        );

        let Some(output_stream) = output_stream.log_err() else {
            return;
        };

        output_stream.play().log_err();
        // Block forever to keep the output stream alive
        end_on_drop_rx.recv().ok();
    });

    (receive_task, thread)
}

#[cfg(target_os = "windows")]
pub fn play_remote_video_track(
    _track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    futures::stream::empty()
}

#[cfg(not(target_os = "windows"))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    NativeVideoStream::new(track.rtc_track())
        .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
}

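// On macOS, remote frames are passed through as CoreVideo image buffers;
// on other platforms they are converted into a gpui `RenderImage`.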
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let buffer = buffer.as_native()?;
    let pixel_buffer = buffer.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        return None;
    }

    unsafe { Some(CVImageBuffer::wrap_under_get_rule(pixel_buffer as _)) }
}

#[cfg(not(target_os = "macos"))]
pub type RemoteVideoFrame = Arc<gpui::RenderImage>;

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use gpui::RenderImage;
    use image::{Frame, RgbaImage};
    use livekit::webrtc::prelude::VideoFormatType;
    use smallvec::SmallVec;
    use std::alloc::{alloc, Layout};

    let width = buffer.width();
    let height = buffer.height();
    let stride = width * 4;
    let byte_len = (stride * height) as usize;
    let argb_image = unsafe {
        // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
        // will write all bytes anyway.
        let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
        if start_ptr.is_null() {
            return None;
        }
        let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
        buffer.to_argb(
            VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
            bgra_frame_slice,
            stride,
            width as i32,
            height as i32,
        );
        Vec::from_raw_parts(start_ptr, byte_len, byte_len)
    };

    Some(Arc::new(RenderImage::new(SmallVec::from_elem(
        Frame::new(
            RgbaImage::from_raw(width, height, argb_image)
                .with_context(|| "Bug: not enough bytes allocated for image.")
                .log_err()?,
        ),
        1,
    ))))
}

#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let pixel_buffer = frame.0.as_concrete_TypeRef();
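    // Forget `frame.0` so its drop doesn't release the pixel buffer; the
    // reference it held is effectively handed over to the native webrtc
    // buffer created below.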
    std::mem::forget(frame.0);
    unsafe {
        Some(webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _))
    }
}

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    None as Option<Box<dyn VideoBuffer>>
}

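/// A stream that emits `()` whenever the platform's default audio device changes.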
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    fn new(input: bool) -> Result<Self>;
}

#[cfg(target_os = "macos")]
mod macos {
    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

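    // SAFETY: CoreAudio invokes this shim with the raw pointer registered via
    // `AudioObjectAddPropertyListener` below, which points at the boxed
    // `PropertyListenerCallbackWrapper` owned by the listener struct.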
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback as *mut PropertyListenerCallbackWrapper;
        (*wrapper).0();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let callback = Box::new(PropertyListenerCallbackWrapper(Box::new(move || {
                tx.unbounded_send(()).ok();
            })));

            unsafe {
                coreaudio::Error::from_os_status(AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                ))?;
            }

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &AudioObjectPropertyAddress {
                        mSelector: if self.input {
                            kAudioHardwarePropertyDefaultInputDevice
                        } else {
                            kAudioHardwarePropertyDefaultOutputDevice
                        },
                        mScope: kAudioObjectPropertyScopeGlobal,
                        mElement: kAudioObjectPropertyElementMaster,
                    },
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            self.rx.poll_next_unpin(cx)
        }
    }
}

#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;

#[cfg(not(target_os = "macos"))]
mod noop_change_listener {
    use std::task::Poll;

    use crate::DeviceChangeListenerApi;

    pub struct NoopOutputDeviceChangeListener {}

    impl DeviceChangeListenerApi for NoopOutputDeviceChangeListener {
        fn new(_input: bool) -> anyhow::Result<Self> {
            Ok(NoopOutputDeviceChangeListener {})
        }
    }

    impl futures::Stream for NoopOutputDeviceChangeListener {
        type Item = ();

        fn poll_next(
            self: std::pin::Pin<&mut Self>,
            _cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            Poll::Pending
        }
    }
}

#[cfg(not(target_os = "macos"))]
type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangeListener;