livekit: Remove obsolete separate thread for frame feeder (#51993)

Jakub Konka created

Release Notes:

- N/A

Change summary

Cargo.lock                                        |   1 
crates/livekit_client/Cargo.toml                  |   3 
crates/livekit_client/src/livekit_client/linux.rs | 217 +++++++---------
3 files changed, 104 insertions(+), 117 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -10001,6 +10001,7 @@ dependencies = [
  "settings",
  "simplelog",
  "smallvec",
+ "tokio",
  "ui",
  "util",
  "zed-scap",

crates/livekit_client/Cargo.toml 🔗

@@ -47,6 +47,9 @@ util.workspace = true
 libwebrtc.workspace = true
 livekit.workspace = true
 
+[target.'cfg(target_os = "linux")'.dependencies]
+tokio = { workspace = true, features = ["time"] }
+
 [target.'cfg(any(target_os = "linux", target_os = "freebsd", target_os = "windows"))'.dependencies]
 scap.workspace = true
 

crates/livekit_client/src/livekit_client/linux.rs 🔗

@@ -4,15 +4,13 @@ use futures::channel::oneshot;
 use gpui::{AsyncApp, ScreenCaptureStream};
 use livekit::track;
 use livekit::webrtc::{
+    prelude::NV12Buffer,
     video_frame::{VideoFrame, VideoRotation},
     video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
 };
-use std::{
-    sync::{
-        Arc,
-        atomic::{AtomicBool, AtomicU64, Ordering},
-    },
-    time::Duration,
+use std::sync::{
+    Arc,
+    atomic::{AtomicBool, AtomicU64, Ordering},
 };
 
 static NEXT_WAYLAND_SHARE_ID: AtomicU64 = AtomicU64::new(1);
@@ -20,15 +18,15 @@ static NEXT_WAYLAND_SHARE_ID: AtomicU64 = AtomicU64::new(1);
 pub struct WaylandScreenCaptureStream {
     id: u64,
     stop_flag: Arc<AtomicBool>,
-    _feed_task: gpui::Task<()>,
+    _capture_task: gpui::Task<()>,
 }
 
 impl WaylandScreenCaptureStream {
-    pub fn new(stop_flag: Arc<AtomicBool>, feed_task: gpui::Task<()>) -> Self {
+    pub fn new(stop_flag: Arc<AtomicBool>, capture_task: gpui::Task<()>) -> Self {
         Self {
             id: NEXT_WAYLAND_SHARE_ID.fetch_add(1, Ordering::Relaxed),
             stop_flag,
-            _feed_task: feed_task,
+            _capture_task: capture_task,
         }
     }
 }
@@ -46,37 +44,10 @@ impl ScreenCaptureStream for WaylandScreenCaptureStream {
 
 impl Drop for WaylandScreenCaptureStream {
     fn drop(&mut self) {
-        self.stop_flag.store(true, Ordering::Relaxed);
+        self.stop_flag.store(true, Ordering::Release);
     }
 }
 
-struct CapturedFrame {
-    width: u32,
-    height: u32,
-    stride: u32,
-    data: Vec<u8>,
-}
-
-fn desktop_frame_to_nv12(frame: &CapturedFrame) -> livekit::webrtc::prelude::NV12Buffer {
-    use libwebrtc::native::yuv_helper::argb_to_nv12;
-    use livekit::webrtc::prelude::NV12Buffer;
-
-    let mut buffer = NV12Buffer::new(frame.width, frame.height);
-    let (stride_y, stride_uv) = buffer.strides();
-    let (data_y, data_uv) = buffer.data_mut();
-    argb_to_nv12(
-        &frame.data,
-        frame.stride,
-        data_y,
-        stride_y,
-        data_uv,
-        stride_uv,
-        frame.width as i32,
-        frame.height as i32,
-    );
-    buffer
-}
-
 pub(crate) async fn start_wayland_desktop_capture(
     cx: &mut AsyncApp,
 ) -> Result<(
@@ -89,62 +60,107 @@ pub(crate) async fn start_wayland_desktop_capture(
     use gpui::FutureExt as _;
     use libwebrtc::desktop_capturer::{
         CaptureError, DesktopCaptureSourceType, DesktopCapturer, DesktopCapturerOptions,
+        DesktopFrame,
     };
+    use libwebrtc::native::yuv_helper::argb_to_nv12;
+    use std::time::Duration;
 
-    let (frame_tx, mut frame_rx) = mpsc::channel::<CapturedFrame>(2);
     let stop_flag = Arc::new(AtomicBool::new(false));
-    let stop = stop_flag.clone();
-
-    let permanent_error = Arc::new(AtomicBool::new(false));
-    let permanent_error_cb = permanent_error.clone();
+    let (mut video_source_tx, mut video_source_rx) = mpsc::channel::<NativeVideoSource>(1);
+    let (failure_tx, failure_rx) = oneshot::channel::<()>();
 
-    let executor = cx.background_executor().clone();
+    let mut options = DesktopCapturerOptions::new(DesktopCaptureSourceType::Generic);
+    options.set_include_cursor(true);
+    let mut capturer = DesktopCapturer::new(options).ok_or_else(|| {
+        anyhow::anyhow!(
+            "Failed to create desktop capturer. \
+             Check that xdg-desktop-portal is installed and running."
+        )
+    })?;
 
-    let capture_executor = executor.clone();
-    executor
-        .spawn(async move {
-            let mut options = DesktopCapturerOptions::new(DesktopCaptureSourceType::Generic);
-            options.set_include_cursor(true);
-
-            let Some(mut capturer) = DesktopCapturer::new(options) else {
-                log::error!(
-                    "Failed to create Wayland desktop capturer. Is xdg-desktop-portal running?"
-                );
-                return;
-            };
+    let permanent_error = Arc::new(AtomicBool::new(false));
 
-            let frame_tx_cb = parking_lot::Mutex::new(frame_tx.clone());
-            capturer.start_capture(None, move |result| match result {
-                Ok(frame) => {
-                    let captured = CapturedFrame {
-                        width: frame.width() as u32,
-                        height: frame.height() as u32,
-                        stride: frame.stride(),
-                        data: frame.data().to_vec(),
-                    };
-                    frame_tx_cb.lock().try_send(captured).ok();
-                }
-                Err(CaptureError::Temporary) => {
-                    // Expected before the portal picker completes
-                }
+    let stop_cb = stop_flag.clone();
+    let permanent_error_cb = permanent_error.clone();
+    capturer.start_capture(None, {
+        let mut video_source: Option<NativeVideoSource> = None;
+        let mut current_width: u32 = 0;
+        let mut current_height: u32 = 0;
+        let mut video_frame = VideoFrame {
+            rotation: VideoRotation::VideoRotation0,
+            buffer: NV12Buffer::new(1, 1),
+            timestamp_us: 0,
+        };
+
+        move |result: Result<DesktopFrame, CaptureError>| {
+            let frame = match result {
+                Ok(frame) => frame,
+                Err(CaptureError::Temporary) => return,
                 Err(CaptureError::Permanent) => {
-                    permanent_error_cb.store(true, Ordering::Relaxed);
                     log::error!("Wayland desktop capture encountered a permanent error");
+                    permanent_error_cb.store(true, Ordering::Release);
+                    stop_cb.store(true, Ordering::Release);
+                    return;
                 }
-            });
+            };
 
-            while !stop.load(Ordering::Relaxed) {
-                capturer.capture_frame();
-                if permanent_error.load(Ordering::Relaxed) {
-                    break;
-                }
-                capture_executor.timer(Duration::from_millis(33)).await;
+            let width = frame.width() as u32;
+            let height = frame.height() as u32;
+            if width != current_width || height != current_height {
+                current_width = width;
+                current_height = height;
+                video_frame.buffer = NV12Buffer::new(width, height);
             }
 
-            drop(frame_tx);
-        })
-        .detach();
-    let first_frame = frame_rx
+            let (stride_y, stride_uv) = video_frame.buffer.strides();
+            let (data_y, data_uv) = video_frame.buffer.data_mut();
+            argb_to_nv12(
+                frame.data(),
+                frame.stride(),
+                data_y,
+                stride_y,
+                data_uv,
+                stride_uv,
+                width as i32,
+                height as i32,
+            );
+
+            if let Some(source) = &video_source {
+                source.capture_frame(&video_frame);
+            } else {
+                let source = NativeVideoSource::new(VideoResolution { width, height }, true);
+                source.capture_frame(&video_frame);
+                video_source_tx.try_send(source.clone()).ok();
+                video_source = Some(source);
+            }
+        }
+    });
+
+    let stop = stop_flag.clone();
+    let tokio_task = gpui_tokio::Tokio::spawn(cx, async move {
+        loop {
+            if stop.load(Ordering::Acquire) {
+                break;
+            }
+            capturer.capture_frame();
+            tokio::time::sleep(Duration::from_millis(33)).await;
+        }
+        drop(capturer);
+
+        if permanent_error.load(Ordering::Acquire) {
+            log::error!("Wayland screen capture ended due to a permanent capture error");
+            let _ = failure_tx.send(());
+        }
+    });
+
+    let capture_task = cx.background_executor().spawn(async move {
+        if let Err(error) = tokio_task.await {
+            log::error!("Wayland capture task failed: {error}");
+        }
+    });
+
+    let executor = cx.background_executor().clone();
+    let video_source = video_source_rx
         .next()
         .with_timeout(Duration::from_secs(15), &executor)
         .await
@@ -157,50 +173,17 @@ pub(crate) async fn start_wayland_desktop_capture(
             )
         })?
         .ok_or_else(|| {
+            stop_flag.store(true, Ordering::Relaxed);
             anyhow::anyhow!(
                 "Screen sharing was canceled or the portal denied permission. \
                  You can try again from the screen share button."
             )
         })?;
 
-    let width = first_frame.width;
-    let height = first_frame.height;
-    let video_source = gpui_tokio::Tokio::spawn(cx, async move {
-        NativeVideoSource::new(VideoResolution { width, height }, true)
-    })
-    .await?;
-
-    let nv12 = desktop_frame_to_nv12(&first_frame);
-    video_source.capture_frame(&VideoFrame {
-        rotation: VideoRotation::VideoRotation0,
-        timestamp_us: 0,
-        buffer: nv12,
-    });
-
     let track = super::LocalVideoTrack(track::LocalVideoTrack::create_video_track(
         "screen share",
-        RtcVideoSource::Native(video_source.clone()),
+        RtcVideoSource::Native(video_source),
     ));
 
-    let (failure_tx, failure_rx) = oneshot::channel::<()>();
-    let feed_stop = stop_flag.clone();
-    let feed_task = cx.background_executor().spawn(async move {
-        while let Some(frame) = frame_rx.next().await {
-            if feed_stop.load(Ordering::Relaxed) {
-                break;
-            }
-            let nv12 = desktop_frame_to_nv12(&frame);
-            video_source.capture_frame(&VideoFrame {
-                rotation: VideoRotation::VideoRotation0,
-                timestamp_us: 0,
-                buffer: nv12,
-            });
-        }
-        if !feed_stop.load(Ordering::Relaxed) {
-            log::error!("Wayland screen capture ended unexpectedly");
-            let _ = failure_tx.send(());
-        }
-    });
-
-    Ok((track, stop_flag, feed_task, failure_rx))
+    Ok((track, stop_flag, capture_task, failure_rx))
 }