From 702b5ddf4b9921c2572b857ffdc697652295a09e Mon Sep 17 00:00:00 2001 From: Jakub Konka Date: Fri, 20 Mar 2026 15:04:21 +0100 Subject: [PATCH] livekit: Remove obsolete separate thread for frame feeder (#51993) Release Notes: - N/A --- Cargo.lock | 1 + crates/livekit_client/Cargo.toml | 3 + .../src/livekit_client/linux.rs | 217 ++++++++---------- 3 files changed, 104 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8724ee07361e095605297ebcac7bc4133102b8c..21893b57542098c6166cc4a822429eb4df902702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10001,6 +10001,7 @@ dependencies = [ "settings", "simplelog", "smallvec", + "tokio", "ui", "util", "zed-scap", diff --git a/crates/livekit_client/Cargo.toml b/crates/livekit_client/Cargo.toml index df1024aa99e15e322c7dff5ee7933db2a9df80b4..d4a238fc15997d833df65ac1be459763be6ec782 100644 --- a/crates/livekit_client/Cargo.toml +++ b/crates/livekit_client/Cargo.toml @@ -47,6 +47,9 @@ util.workspace = true libwebrtc.workspace = true livekit.workspace = true +[target.'cfg(target_os = "linux")'.dependencies] +tokio = { workspace = true, features = ["time"] } + [target.'cfg(any(target_os = "linux", target_os = "freebsd", target_os = "windows"))'.dependencies] scap.workspace = true diff --git a/crates/livekit_client/src/livekit_client/linux.rs b/crates/livekit_client/src/livekit_client/linux.rs index 6c6768980181c3abb2137417e94a64f4c8e2efc1..e7bfa7b2ca631636233586cb902b36bac93c9be1 100644 --- a/crates/livekit_client/src/livekit_client/linux.rs +++ b/crates/livekit_client/src/livekit_client/linux.rs @@ -4,15 +4,13 @@ use futures::channel::oneshot; use gpui::{AsyncApp, ScreenCaptureStream}; use livekit::track; use livekit::webrtc::{ + prelude::NV12Buffer, video_frame::{VideoFrame, VideoRotation}, video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource}, }; -use std::{ - sync::{ - Arc, - atomic::{AtomicBool, AtomicU64, Ordering}, - }, - time::Duration, +use std::sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, }; static NEXT_WAYLAND_SHARE_ID: AtomicU64 = AtomicU64::new(1); @@ -20,15 +18,15 @@ static NEXT_WAYLAND_SHARE_ID: AtomicU64 = AtomicU64::new(1); pub struct WaylandScreenCaptureStream { id: u64, stop_flag: Arc, - _feed_task: gpui::Task<()>, + _capture_task: gpui::Task<()>, } impl WaylandScreenCaptureStream { - pub fn new(stop_flag: Arc, feed_task: gpui::Task<()>) -> Self { + pub fn new(stop_flag: Arc, capture_task: gpui::Task<()>) -> Self { Self { id: NEXT_WAYLAND_SHARE_ID.fetch_add(1, Ordering::Relaxed), stop_flag, - _feed_task: feed_task, + _capture_task: capture_task, } } } @@ -46,37 +44,10 @@ impl ScreenCaptureStream for WaylandScreenCaptureStream { impl Drop for WaylandScreenCaptureStream { fn drop(&mut self) { - self.stop_flag.store(true, Ordering::Relaxed); + self.stop_flag.store(true, Ordering::Release); } } -struct CapturedFrame { - width: u32, - height: u32, - stride: u32, - data: Vec, -} - -fn desktop_frame_to_nv12(frame: &CapturedFrame) -> livekit::webrtc::prelude::NV12Buffer { - use libwebrtc::native::yuv_helper::argb_to_nv12; - use livekit::webrtc::prelude::NV12Buffer; - - let mut buffer = NV12Buffer::new(frame.width, frame.height); - let (stride_y, stride_uv) = buffer.strides(); - let (data_y, data_uv) = buffer.data_mut(); - argb_to_nv12( - &frame.data, - frame.stride, - data_y, - stride_y, - data_uv, - stride_uv, - frame.width as i32, - frame.height as i32, - ); - buffer -} - pub(crate) async fn start_wayland_desktop_capture( cx: &mut AsyncApp, ) -> Result<( @@ -89,62 +60,107 @@ pub(crate) async fn start_wayland_desktop_capture( use gpui::FutureExt as _; use libwebrtc::desktop_capturer::{ CaptureError, DesktopCaptureSourceType, DesktopCapturer, DesktopCapturerOptions, + DesktopFrame, }; + use libwebrtc::native::yuv_helper::argb_to_nv12; + use std::time::Duration; - let (frame_tx, mut frame_rx) = mpsc::channel::(2); let stop_flag = Arc::new(AtomicBool::new(false)); - let stop = stop_flag.clone(); - - let permanent_error = Arc::new(AtomicBool::new(false)); - let permanent_error_cb = permanent_error.clone(); + let (mut video_source_tx, mut video_source_rx) = mpsc::channel::(1); + let (failure_tx, failure_rx) = oneshot::channel::<()>(); - let executor = cx.background_executor().clone(); + let mut options = DesktopCapturerOptions::new(DesktopCaptureSourceType::Generic); + options.set_include_cursor(true); + let mut capturer = DesktopCapturer::new(options).ok_or_else(|| { + anyhow::anyhow!( + "Failed to create desktop capturer. \ + Check that xdg-desktop-portal is installed and running." + ) + })?; - let capture_executor = executor.clone(); - executor - .spawn(async move { - let mut options = DesktopCapturerOptions::new(DesktopCaptureSourceType::Generic); - options.set_include_cursor(true); - - let Some(mut capturer) = DesktopCapturer::new(options) else { - log::error!( - "Failed to create Wayland desktop capturer. Is xdg-desktop-portal running?" - ); - return; - }; + let permanent_error = Arc::new(AtomicBool::new(false)); - let frame_tx_cb = parking_lot::Mutex::new(frame_tx.clone()); - capturer.start_capture(None, move |result| match result { - Ok(frame) => { - let captured = CapturedFrame { - width: frame.width() as u32, - height: frame.height() as u32, - stride: frame.stride(), - data: frame.data().to_vec(), - }; - frame_tx_cb.lock().try_send(captured).ok(); - } - Err(CaptureError::Temporary) => { - // Expected before the portal picker completes - } + let stop_cb = stop_flag.clone(); + let permanent_error_cb = permanent_error.clone(); + capturer.start_capture(None, { + let mut video_source: Option = None; + let mut current_width: u32 = 0; + let mut current_height: u32 = 0; + let mut video_frame = VideoFrame { + rotation: VideoRotation::VideoRotation0, + buffer: NV12Buffer::new(1, 1), + timestamp_us: 0, + }; + + move |result: Result| { + let frame = match result { + Ok(frame) => frame, + Err(CaptureError::Temporary) => return, Err(CaptureError::Permanent) => { - permanent_error_cb.store(true, Ordering::Relaxed); log::error!("Wayland desktop capture encountered a permanent error"); + permanent_error_cb.store(true, Ordering::Release); + stop_cb.store(true, Ordering::Release); + return; } - }); + }; - while !stop.load(Ordering::Relaxed) { - capturer.capture_frame(); - if permanent_error.load(Ordering::Relaxed) { - break; - } - capture_executor.timer(Duration::from_millis(33)).await; + let width = frame.width() as u32; + let height = frame.height() as u32; + if width != current_width || height != current_height { + current_width = width; + current_height = height; + video_frame.buffer = NV12Buffer::new(width, height); } - drop(frame_tx); - }) - .detach(); - let first_frame = frame_rx + let (stride_y, stride_uv) = video_frame.buffer.strides(); + let (data_y, data_uv) = video_frame.buffer.data_mut(); + argb_to_nv12( + frame.data(), + frame.stride(), + data_y, + stride_y, + data_uv, + stride_uv, + width as i32, + height as i32, + ); + + if let Some(source) = &video_source { + source.capture_frame(&video_frame); + } else { + let source = NativeVideoSource::new(VideoResolution { width, height }, true); + source.capture_frame(&video_frame); + video_source_tx.try_send(source.clone()).ok(); + video_source = Some(source); + } + } + }); + + let stop = stop_flag.clone(); + let tokio_task = gpui_tokio::Tokio::spawn(cx, async move { + loop { + if stop.load(Ordering::Acquire) { + break; + } + capturer.capture_frame(); + tokio::time::sleep(Duration::from_millis(33)).await; + } + drop(capturer); + + if permanent_error.load(Ordering::Acquire) { + log::error!("Wayland screen capture ended due to a permanent capture error"); + let _ = failure_tx.send(()); + } + }); + + let capture_task = cx.background_executor().spawn(async move { + if let Err(error) = tokio_task.await { + log::error!("Wayland capture task failed: {error}"); + } + }); + + let executor = cx.background_executor().clone(); + let video_source = video_source_rx .next() .with_timeout(Duration::from_secs(15), &executor) .await @@ -157,50 +173,17 @@ pub(crate) async fn start_wayland_desktop_capture( ) })? .ok_or_else(|| { + stop_flag.store(true, Ordering::Relaxed); anyhow::anyhow!( "Screen sharing was canceled or the portal denied permission. \ You can try again from the screen share button." ) })?; - let width = first_frame.width; - let height = first_frame.height; - let video_source = gpui_tokio::Tokio::spawn(cx, async move { - NativeVideoSource::new(VideoResolution { width, height }, true) - }) - .await?; - - let nv12 = desktop_frame_to_nv12(&first_frame); - video_source.capture_frame(&VideoFrame { - rotation: VideoRotation::VideoRotation0, - timestamp_us: 0, - buffer: nv12, - }); - let track = super::LocalVideoTrack(track::LocalVideoTrack::create_video_track( "screen share", - RtcVideoSource::Native(video_source.clone()), + RtcVideoSource::Native(video_source), )); - let (failure_tx, failure_rx) = oneshot::channel::<()>(); - let feed_stop = stop_flag.clone(); - let feed_task = cx.background_executor().spawn(async move { - while let Some(frame) = frame_rx.next().await { - if feed_stop.load(Ordering::Relaxed) { - break; - } - let nv12 = desktop_frame_to_nv12(&frame); - video_source.capture_frame(&VideoFrame { - rotation: VideoRotation::VideoRotation0, - timestamp_us: 0, - buffer: nv12, - }); - } - if !feed_stop.load(Ordering::Relaxed) { - log::error!("Wayland screen capture ended unexpectedly"); - let _ = failure_tx.send(()); - } - }); - - Ok((track, stop_flag, feed_task, failure_rx)) + Ok((track, stop_flag, capture_task, failure_rx)) }