1#![cfg_attr(all(target_os = "windows", target_env = "gnu"), allow(unused))]
2
3mod remote_video_track_view;
4#[cfg(any(
5 test,
6 feature = "test-support",
7 all(target_os = "windows", target_env = "gnu")
8))]
9pub mod test;
10
11use anyhow::{anyhow, Context as _, Result};
12use cpal::traits::{DeviceTrait, HostTrait, StreamTrait as _};
13use futures::{io, Stream, StreamExt as _};
14use gpui::{
15 BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
16};
17use parking_lot::Mutex;
18use std::{borrow::Cow, collections::VecDeque, future::Future, pin::Pin, sync::Arc, thread};
19use util::{debug_panic, ResultExt as _};
20#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
21use webrtc::{
22 audio_frame::AudioFrame,
23 audio_source::{native::NativeAudioSource, AudioSourceOptions, RtcAudioSource},
24 audio_stream::native::NativeAudioStream,
25 video_frame::{VideoBuffer, VideoFrame, VideoRotation},
26 video_source::{native::NativeVideoSource, RtcVideoSource, VideoResolution},
27 video_stream::native::NativeVideoStream,
28};
29
30#[cfg(all(
31 not(any(test, feature = "test-support")),
32 not(all(target_os = "windows", target_env = "gnu"))
33))]
34use livekit::track::RemoteAudioTrack;
35#[cfg(all(
36 not(any(test, feature = "test-support")),
37 not(all(target_os = "windows", target_env = "gnu"))
38))]
39pub use livekit::*;
40#[cfg(any(
41 test,
42 feature = "test-support",
43 all(target_os = "windows", target_env = "gnu")
44))]
45use test::track::RemoteAudioTrack;
46#[cfg(any(
47 test,
48 feature = "test-support",
49 all(target_os = "windows", target_env = "gnu")
50))]
51pub use test::*;
52
53pub use remote_video_track_view::{RemoteVideoTrackView, RemoteVideoTrackViewEvent};
54
/// Handle that keeps an audio capture or playback pipeline alive while it is held.
///
/// The fields are underscore-prefixed because nothing in this file reads them; they are
/// stored only so that their owners control how long the underlying resources live.
pub enum AudioStream {
    /// Microphone capture, as produced by `capture_local_audio_track`.
    Input {
        // Sender whose receiving half the capture thread blocks on (`recv`); once this
        // sender is dropped that `recv` returns and the thread can finish.
        _thread_handle: std::sync::mpsc::Sender<()>,
        // Task that forwards captured frames into the native audio source.
        _transmit_task: Task<()>,
    },
    /// Remote-track playback, as produced by `play_remote_audio_track`.
    Output {
        // Task that owns the output stream and restarts it when the default device changes.
        _task: Task<()>,
    },
}
64
65struct Dispatcher(Arc<dyn gpui::PlatformDispatcher>);
66
67#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
68impl livekit::dispatcher::Dispatcher for Dispatcher {
69 fn dispatch(&self, runnable: livekit::dispatcher::Runnable) {
70 self.0.dispatch(runnable, None);
71 }
72
73 fn dispatch_after(
74 &self,
75 duration: std::time::Duration,
76 runnable: livekit::dispatcher::Runnable,
77 ) {
78 self.0.dispatch_after(duration, runnable);
79 }
80}
81
/// Newtype around the application's HTTP client so that it can implement LiveKit's
/// `HttpClient` trait (it is installed by `init`).
struct HttpClientAdapter(Arc<dyn http_client::HttpClient>);
83
84fn http_2_status(status: http_client::http::StatusCode) -> http_2::StatusCode {
85 http_2::StatusCode::from_u16(status.as_u16())
86 .expect("valid status code to status code conversion")
87}
88
89#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
90impl livekit::dispatcher::HttpClient for HttpClientAdapter {
91 fn get(
92 &self,
93 url: &str,
94 ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
95 let http_client = self.0.clone();
96 let url = url.to_string();
97 Box::pin(async move {
98 let response = http_client
99 .get(&url, http_client::AsyncBody::empty(), false)
100 .await
101 .map_err(io::Error::other)?;
102 Ok(livekit::dispatcher::Response {
103 status: http_2_status(response.status()),
104 body: Box::pin(response.into_body()),
105 })
106 })
107 }
108
109 fn send_async(
110 &self,
111 request: http_2::Request<Vec<u8>>,
112 ) -> Pin<Box<dyn Future<Output = io::Result<livekit::dispatcher::Response>> + Send>> {
113 let http_client = self.0.clone();
114 let mut builder = http_client::http::Request::builder()
115 .method(request.method().as_str())
116 .uri(request.uri().to_string());
117
118 for (key, value) in request.headers().iter() {
119 builder = builder.header(key.as_str(), value.as_bytes());
120 }
121
122 if !request.extensions().is_empty() {
123 debug_panic!(
124 "Livekit sent an HTTP request with a protocol extension that Zed doesn't support!"
125 );
126 }
127
128 let request = builder
129 .body(http_client::AsyncBody::from_bytes(
130 request.into_body().into(),
131 ))
132 .unwrap();
133
134 Box::pin(async move {
135 let response = http_client.send(request).await.map_err(io::Error::other)?;
136 Ok(livekit::dispatcher::Response {
137 status: http_2_status(response.status()),
138 body: Box::pin(response.into_body()),
139 })
140 })
141 }
142}
143
/// No-op initializer for the Windows GNU target, where the LiveKit integration is
/// compiled out. Both arguments are accepted only to keep the signature identical to
/// the real `init`.
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn init(
    dispatcher: Arc<dyn gpui::PlatformDispatcher>,
    http_client: Arc<dyn http_client::HttpClient>,
) {
    drop(dispatcher);
    drop(http_client);
}
150
151#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
152pub fn init(
153 dispatcher: Arc<dyn gpui::PlatformDispatcher>,
154 http_client: Arc<dyn http_client::HttpClient>,
155) {
156 livekit::dispatcher::set_dispatcher(Dispatcher(dispatcher));
157 livekit::dispatcher::set_http_client(HttpClientAdapter(http_client));
158}
159
160#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
161pub async fn capture_local_video_track(
162 capture_source: &dyn ScreenCaptureSource,
163) -> Result<(track::LocalVideoTrack, Box<dyn ScreenCaptureStream>)> {
164 let resolution = capture_source.resolution()?;
165 let track_source = NativeVideoSource::new(VideoResolution {
166 width: resolution.width.0 as u32,
167 height: resolution.height.0 as u32,
168 });
169
170 let capture_stream = capture_source
171 .stream({
172 let track_source = track_source.clone();
173 Box::new(move |frame| {
174 if let Some(buffer) = video_frame_buffer_to_webrtc(frame) {
175 track_source.capture_frame(&VideoFrame {
176 rotation: VideoRotation::VideoRotation0,
177 timestamp_us: 0,
178 buffer,
179 });
180 }
181 })
182 })
183 .await??;
184
185 Ok((
186 track::LocalVideoTrack::create_video_track(
187 "screen share",
188 RtcVideoSource::Native(track_source),
189 ),
190 capture_stream,
191 ))
192}
193
194#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
195pub fn capture_local_audio_track(
196 background_executor: &BackgroundExecutor,
197) -> Result<Task<(track::LocalAudioTrack, AudioStream)>> {
198 use util::maybe;
199
200 let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
201 let (thread_handle, thread_kill_rx) = std::sync::mpsc::channel::<()>();
202 let sample_rate;
203 let channels;
204
205 if cfg!(any(test, feature = "test-support")) {
206 sample_rate = 2;
207 channels = 1;
208 } else {
209 let (device, config) = default_device(true)?;
210 sample_rate = config.sample_rate().0;
211 channels = config.channels() as u32;
212 thread::spawn(move || {
213 maybe!({
214 if let Some(name) = device.name().ok() {
215 log::info!("Using microphone: {}", name)
216 } else {
217 log::info!("Using microphone: <unknown>");
218 }
219
220 let stream = device
221 .build_input_stream_raw(
222 &config.config(),
223 cpal::SampleFormat::I16,
224 move |data, _: &_| {
225 frame_tx
226 .unbounded_send(AudioFrame {
227 data: Cow::Owned(data.as_slice::<i16>().unwrap().to_vec()),
228 sample_rate,
229 num_channels: channels,
230 samples_per_channel: data.len() as u32 / channels,
231 })
232 .ok();
233 },
234 |err| log::error!("error capturing audio track: {:?}", err),
235 None,
236 )
237 .context("failed to build input stream")?;
238
239 stream.play()?;
240 // Keep the thread alive and holding onto the `stream`
241 thread_kill_rx.recv().ok();
242 anyhow::Ok(Some(()))
243 })
244 .log_err();
245 });
246 }
247
248 Ok(background_executor.spawn({
249 let background_executor = background_executor.clone();
250 async move {
251 let source = NativeAudioSource::new(
252 AudioSourceOptions {
253 echo_cancellation: true,
254 noise_suppression: true,
255 auto_gain_control: true,
256 },
257 sample_rate,
258 channels,
259 100,
260 );
261 let transmit_task = background_executor.spawn({
262 let source = source.clone();
263 async move {
264 while let Some(frame) = frame_rx.next().await {
265 source.capture_frame(&frame).await.log_err();
266 }
267 }
268 });
269
270 let track = track::LocalAudioTrack::create_audio_track(
271 "microphone",
272 RtcAudioSource::Native(source),
273 );
274
275 (
276 track,
277 AudioStream::Input {
278 _thread_handle: thread_handle,
279 _transmit_task: transmit_task,
280 },
281 )
282 }
283 }))
284}
285
286#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
287pub fn play_remote_audio_track(
288 track: &RemoteAudioTrack,
289 background_executor: &BackgroundExecutor,
290) -> Result<AudioStream> {
291 let track = track.clone();
292 // We track device changes in our output because Livekit has a resampler built in,
293 // and it's easy to create a new native audio stream when the device changes.
294 if cfg!(any(test, feature = "test-support")) {
295 Ok(AudioStream::Output {
296 _task: background_executor.spawn(async {}),
297 })
298 } else {
299 let mut default_change_listener = DeviceChangeListener::new(false)?;
300 let (output_device, output_config) = default_device(false)?;
301
302 let _task = background_executor.spawn({
303 let background_executor = background_executor.clone();
304 async move {
305 let (mut _receive_task, mut _thread) =
306 start_output_stream(output_config, output_device, &track, &background_executor);
307
308 while let Some(_) = default_change_listener.next().await {
309 let Some((output_device, output_config)) = get_default_output().log_err()
310 else {
311 continue;
312 };
313
314 if let Ok(name) = output_device.name() {
315 log::info!("Using speaker: {}", name)
316 } else {
317 log::info!("Using speaker: <unknown>")
318 }
319
320 (_receive_task, _thread) = start_output_stream(
321 output_config,
322 output_device,
323 &track,
324 &background_executor,
325 );
326 }
327
328 futures::future::pending::<()>().await;
329 }
330 });
331
332 Ok(AudioStream::Output { _task })
333 }
334}
335
336fn default_device(input: bool) -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
337 let device;
338 let config;
339 if input {
340 device = cpal::default_host()
341 .default_input_device()
342 .ok_or_else(|| anyhow!("no audio input device available"))?;
343 config = device
344 .default_input_config()
345 .context("failed to get default input config")?;
346 } else {
347 device = cpal::default_host()
348 .default_output_device()
349 .ok_or_else(|| anyhow!("no audio output device available"))?;
350 config = device
351 .default_output_config()
352 .context("failed to get default output config")?;
353 }
354 Ok((device, config))
355}
356
357#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
358fn get_default_output() -> anyhow::Result<(cpal::Device, cpal::SupportedStreamConfig)> {
359 let host = cpal::default_host();
360 let output_device = host
361 .default_output_device()
362 .context("failed to read default output device")?;
363 let output_config = output_device.default_output_config()?;
364 Ok((output_device, output_config))
365}
366
367#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
368fn start_output_stream(
369 output_config: cpal::SupportedStreamConfig,
370 output_device: cpal::Device,
371 track: &track::RemoteAudioTrack,
372 background_executor: &BackgroundExecutor,
373) -> (Task<()>, std::sync::mpsc::Sender<()>) {
374 let buffer = Arc::new(Mutex::new(VecDeque::<i16>::new()));
375 let sample_rate = output_config.sample_rate();
376
377 let mut stream = NativeAudioStream::new(
378 track.rtc_track(),
379 sample_rate.0 as i32,
380 output_config.channels() as i32,
381 );
382
383 let receive_task = background_executor.spawn({
384 let buffer = buffer.clone();
385 async move {
386 const MS_OF_BUFFER: u32 = 100;
387 const MS_IN_SEC: u32 = 1000;
388 while let Some(frame) = stream.next().await {
389 let frame_size = frame.samples_per_channel * frame.num_channels;
390 debug_assert!(frame.data.len() == frame_size as usize);
391
392 let buffer_size =
393 ((frame.sample_rate * frame.num_channels) / MS_IN_SEC * MS_OF_BUFFER) as usize;
394
395 let mut buffer = buffer.lock();
396 let new_size = buffer.len() + frame.data.len();
397 if new_size > buffer_size {
398 let overflow = new_size - buffer_size;
399 buffer.drain(0..overflow);
400 }
401
402 buffer.extend(frame.data.iter());
403 }
404 }
405 });
406
407 // The _output_stream needs to be on it's own thread because it's !Send
408 // and we experienced a deadlock when it's created on the main thread.
409 let (thread, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
410 thread::spawn(move || {
411 if cfg!(any(test, feature = "test-support")) {
412 // Can't play audio in tests
413 return;
414 }
415
416 let output_stream = output_device.build_output_stream(
417 &output_config.config(),
418 {
419 let buffer = buffer.clone();
420 move |data, _info| {
421 let mut buffer = buffer.lock();
422 if buffer.len() < data.len() {
423 // Instead of partially filling a buffer, output silence. If a partial
424 // buffer was outputted then this could lead to a perpetual state of
425 // outputting partial buffers as it never gets filled enough for a full
426 // frame.
427 data.fill(0);
428 } else {
429 // SAFETY: We know that buffer has at least data.len() values in it.
430 // because we just checked
431 let mut drain = buffer.drain(..data.len());
432 data.fill_with(|| unsafe { drain.next().unwrap_unchecked() });
433 }
434 }
435 },
436 |error| log::error!("error playing audio track: {:?}", error),
437 None,
438 );
439
440 let Some(output_stream) = output_stream.log_err() else {
441 return;
442 };
443
444 output_stream.play().log_err();
445 // Block forever to keep the output stream alive
446 end_on_drop_rx.recv().ok();
447 });
448
449 (receive_task, thread)
450}
451
/// Stub for the Windows GNU target: ignores `track` and yields no frames.
#[cfg(all(target_os = "windows", target_env = "gnu"))]
pub fn play_remote_video_track(
    track: &track::RemoteVideoTrack,
) -> impl Stream<Item = RemoteVideoFrame> {
    futures::stream::empty::<RemoteVideoFrame>()
}
458
459#[cfg(not(all(target_os = "windows", target_env = "gnu")))]
460pub fn play_remote_video_track(
461 track: &track::RemoteVideoTrack,
462) -> impl Stream<Item = RemoteVideoFrame> {
463 NativeVideoStream::new(track.rtc_track())
464 .filter_map(|frame| async move { video_frame_buffer_from_webrtc(frame.buffer) })
465}
466
/// Frame type handed to the UI on macOS: a Core Video image buffer.
#[cfg(target_os = "macos")]
pub type RemoteVideoFrame = media::core_video::CVImageBuffer;

/// Extracts the Core Video pixel buffer backing a WebRTC video buffer.
///
/// Returns `None` if the buffer is not a native buffer or if it has no pixel buffer.
#[cfg(target_os = "macos")]
fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
    use core_foundation::base::TCFType as _;
    use media::core_video::CVImageBuffer;

    let native = buffer.as_native()?;
    let pixel_buffer = native.get_cv_pixel_buffer();
    if pixel_buffer.is_null() {
        None
    } else {
        // SAFETY: `pixel_buffer` was just checked to be non-null. NOTE(review): this
        // relies on it pointing at a live CVPixelBuffer still owned by `native`;
        // confirm against the webrtc bindings.
        Some(unsafe { CVImageBuffer::wrap_under_get_rule(pixel_buffer as _) })
    }
}
483
484#[cfg(not(target_os = "macos"))]
485pub type RemoteVideoFrame = Arc<gpui::RenderImage>;
486
487#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
488fn video_frame_buffer_from_webrtc(buffer: Box<dyn VideoBuffer>) -> Option<RemoteVideoFrame> {
489 use gpui::RenderImage;
490 use image::{Frame, RgbaImage};
491 use livekit::webrtc::prelude::VideoFormatType;
492 use smallvec::SmallVec;
493 use std::alloc::{alloc, Layout};
494
495 let width = buffer.width();
496 let height = buffer.height();
497 let stride = width * 4;
498 let byte_len = (stride * height) as usize;
499 let argb_image = unsafe {
500 // Motivation for this unsafe code is to avoid initializing the frame data, since to_argb
501 // will write all bytes anyway.
502 let start_ptr = alloc(Layout::array::<u8>(byte_len).log_err()?);
503 if start_ptr.is_null() {
504 return None;
505 }
506 let bgra_frame_slice = std::slice::from_raw_parts_mut(start_ptr, byte_len);
507 buffer.to_argb(
508 VideoFormatType::ARGB, // For some reason, this displays correctly while RGBA (the correct format) does not
509 bgra_frame_slice,
510 stride,
511 width as i32,
512 height as i32,
513 );
514 Vec::from_raw_parts(start_ptr, byte_len, byte_len)
515 };
516
517 Some(Arc::new(RenderImage::new(SmallVec::from_elem(
518 Frame::new(
519 RgbaImage::from_raw(width, height, argb_image)
520 .with_context(|| "Bug: not enough bytes allocated for image.")
521 .log_err()?,
522 ),
523 1,
524 ))))
525}
526
/// Wraps a captured screen frame's Core Video pixel buffer in a WebRTC native buffer.
///
/// Always returns `Some` on macOS.
#[cfg(target_os = "macos")]
fn video_frame_buffer_to_webrtc(frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
    use core_foundation::base::TCFType as _;

    let image_buffer = frame.0;
    let pixel_buffer = image_buffer.as_concrete_TypeRef();
    // Deliberately skip the wrapper's destructor so the raw reference taken above is
    // not released here. NOTE(review): presumably `from_cv_pixel_buffer` takes over
    // that reference; confirm against the webrtc bindings.
    std::mem::forget(image_buffer);

    // SAFETY: `pixel_buffer` comes from a live image buffer whose destructor was
    // skipped above, so it has not been released by this function.
    let native = unsafe {
        webrtc::video_frame::native::NativeBuffer::from_cv_pixel_buffer(pixel_buffer as _)
    };
    Some(native)
}
537
538#[cfg(not(any(target_os = "macos", all(target_os = "windows", target_env = "gnu"))))]
539fn video_frame_buffer_to_webrtc(_frame: ScreenCaptureFrame) -> Option<impl AsRef<dyn VideoBuffer>> {
540 None as Option<Box<dyn VideoBuffer>>
541}
542
/// Common interface of the platform-specific default-device change listeners.
///
/// An implementor is a stream that yields `()` for each change notification.
trait DeviceChangeListenerApi: Stream<Item = ()> + Sized {
    /// Creates a listener; `input` selects whether the default input (`true`) or the
    /// default output (`false`) device is watched.
    fn new(input: bool) -> Result<Self>;
}
546
#[cfg(target_os = "macos")]
mod macos {

    use coreaudio::sys::{
        kAudioHardwarePropertyDefaultInputDevice, kAudioHardwarePropertyDefaultOutputDevice,
        kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
        kAudioObjectSystemObject, AudioObjectAddPropertyListener, AudioObjectID,
        AudioObjectPropertyAddress, AudioObjectRemovePropertyListener, OSStatus,
    };
    use futures::{channel::mpsc::UnboundedReceiver, StreamExt};

    use crate::DeviceChangeListenerApi;

    /// Stream that yields `()` whenever Core Audio reports a change of the default
    /// input or output device.
    ///
    /// Implementation from: https://github.com/zed-industries/cpal/blob/fd8bc2fd39f1f5fdee5a0690656caff9a26d9d50/src/host/coreaudio/macos/property_listener.rs#L15
    pub struct CoreAudioDefaultDeviceChangeListener {
        rx: UnboundedReceiver<()>,
        // Boxed so the address registered with Core Audio stays stable while the
        // listener itself is moved around.
        callback: Box<PropertyListenerCallbackWrapper>,
        input: bool,
    }

    // Compile-time check that the listener can be sent across threads.
    trait _AssertSend: Send {}
    impl _AssertSend for CoreAudioDefaultDeviceChangeListener {}

    struct PropertyListenerCallbackWrapper(Box<dyn FnMut() + Send>);

    /// Builds the property address for the default input (`input == true`) or default
    /// output device, in the global scope.
    fn default_device_address(input: bool) -> AudioObjectPropertyAddress {
        let selector = if input {
            kAudioHardwarePropertyDefaultInputDevice
        } else {
            kAudioHardwarePropertyDefaultOutputDevice
        };
        AudioObjectPropertyAddress {
            mSelector: selector,
            mScope: kAudioObjectPropertyScopeGlobal,
            mElement: kAudioObjectPropertyElementMaster,
        }
    }

    /// C entry point registered with Core Audio; `callback` is the pointer to the
    /// `PropertyListenerCallbackWrapper` that was passed at registration.
    unsafe extern "C" fn property_listener_handler_shim(
        _: AudioObjectID,
        _: u32,
        _: *const AudioObjectPropertyAddress,
        callback: *mut ::std::os::raw::c_void,
    ) -> OSStatus {
        let wrapper = callback.cast::<PropertyListenerCallbackWrapper>();
        ((*wrapper).0)();
        0
    }

    impl DeviceChangeListenerApi for CoreAudioDefaultDeviceChangeListener {
        fn new(input: bool) -> gpui::Result<Self> {
            let (tx, rx) = futures::channel::mpsc::unbounded();

            let notify: Box<dyn FnMut() + Send> = Box::new(move || {
                // A send error means the receiving listener is gone; ignore it.
                tx.unbounded_send(()).ok();
            });
            let callback = Box::new(PropertyListenerCallbackWrapper(notify));

            let address = default_device_address(input);
            // SAFETY: `address` outlives the call, and `callback` is heap-allocated and
            // stored in the returned listener, whose `Drop` unregisters it.
            let status = unsafe {
                AudioObjectAddPropertyListener(
                    kAudioObjectSystemObject,
                    &address,
                    Some(property_listener_handler_shim),
                    &*callback as *const _ as *mut _,
                )
            };
            coreaudio::Error::from_os_status(status)?;

            Ok(Self {
                rx,
                callback,
                input,
            })
        }
    }

    impl Drop for CoreAudioDefaultDeviceChangeListener {
        fn drop(&mut self) {
            let address = default_device_address(self.input);
            // SAFETY: passes the same address, shim and callback pointer that were
            // used at registration. The returned status is intentionally ignored.
            unsafe {
                AudioObjectRemovePropertyListener(
                    kAudioObjectSystemObject,
                    &address,
                    Some(property_listener_handler_shim),
                    &*self.callback as *const _ as *mut _,
                );
            }
        }
    }

    impl futures::Stream for CoreAudioDefaultDeviceChangeListener {
        type Item = ();

        fn poll_next(
            mut self: std::pin::Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Option<Self::Item>> {
            let receiver = &mut self.rx;
            receiver.poll_next_unpin(cx)
        }
    }
}

/// Device-change listener used on macOS.
#[cfg(target_os = "macos")]
type DeviceChangeListener = macos::CoreAudioDefaultDeviceChangeListener;
651
652#[cfg(not(target_os = "macos"))]
653mod noop_change_listener {
654 use std::task::Poll;
655
656 use crate::DeviceChangeListenerApi;
657
658 pub struct NoopOutputDeviceChangelistener {}
659
660 impl DeviceChangeListenerApi for NoopOutputDeviceChangelistener {
661 fn new(_input: bool) -> anyhow::Result<Self> {
662 Ok(NoopOutputDeviceChangelistener {})
663 }
664 }
665
666 impl futures::Stream for NoopOutputDeviceChangelistener {
667 type Item = ();
668
669 fn poll_next(
670 self: std::pin::Pin<&mut Self>,
671 _cx: &mut std::task::Context<'_>,
672 ) -> Poll<Option<Self::Item>> {
673 Poll::Pending
674 }
675 }
676}
677
678#[cfg(not(target_os = "macos"))]
679type DeviceChangeListener = noop_change_listener::NoopOutputDeviceChangelistener;