WIP: render screen-sharing frames

Antonio Scandurra created

Change summary

Cargo.lock                                                              |  2 
crates/capture/src/main.rs                                              | 92 
crates/gpui/src/platform/mac/renderer.rs                                |  2 
crates/live_kit/Cargo.toml                                              |  3 
crates/live_kit/LiveKitBridge/Package.resolved                          |  3 
crates/live_kit/LiveKitBridge/Package.swift                             |  2 
crates/live_kit/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift | 56 
crates/live_kit/src/live_kit.rs                                         | 79 
8 files changed, 178 insertions(+), 61 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -2944,6 +2944,8 @@ dependencies = [
  "core-foundation",
  "core-graphics",
  "futures",
+ "media",
+ "parking_lot 0.11.2",
  "serde",
  "serde_json",
 ]

crates/capture/src/main.rs 🔗

@@ -1,7 +1,6 @@
 mod live_kit_token;
 
-use std::time::Duration;
-
+use futures::StreamExt;
 use gpui::{
     actions,
     elements::{Canvas, *},
@@ -13,6 +12,7 @@ use live_kit::{LocalVideoTrack, Room};
 use log::LevelFilter;
 use media::core_video::CVImageBuffer;
 use simplelog::SimpleLogger;
+use std::sync::Arc;
 
 actions!(capture, [Quit]);
 
@@ -36,47 +36,46 @@ fn main() {
         let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap();
         let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap();
 
-        let background = cx.background().clone();
-        cx.foreground()
-            .spawn(async move {
-                println!("connecting...");
-                let user1_token = live_kit_token::create_token(
-                    &live_kit_key,
-                    &live_kit_secret,
-                    "test-room",
-                    "test-participant-1",
-                )
-                .unwrap();
-                let room1 = Room::new("user-1 room");
-                room1.connect(&live_kit_url, &user1_token).await.unwrap();
-
-                let user2_token = live_kit_token::create_token(
-                    &live_kit_key,
-                    &live_kit_secret,
-                    "test-room",
-                    "test-participant-2",
-                )
+        cx.spawn(|mut cx| async move {
+            let user1_token = live_kit_token::create_token(
+                &live_kit_key,
+                &live_kit_secret,
+                "test-room",
+                "test-participant-1",
+            )
+            .unwrap();
+            let room1 = Room::new();
+            room1.connect(&live_kit_url, &user1_token).await.unwrap();
+
+            let user2_token = live_kit_token::create_token(
+                &live_kit_key,
+                &live_kit_secret,
+                "test-room",
+                "test-participant-2",
+            )
+            .unwrap();
+            let room2 = Room::new();
+            room2.connect(&live_kit_url, &user2_token).await.unwrap();
+            cx.add_window(Default::default(), |cx| ScreenCaptureView::new(room2, cx));
+
+            let windows = live_kit::list_windows();
+            let window = windows
+                .iter()
+                .find(|w| w.owner_name.as_deref() == Some("Safari"))
                 .unwrap();
-                let room2 = Room::new("user-2 room");
-                room2.connect(&live_kit_url, &user2_token).await.unwrap();
-
-                let windows = live_kit::list_windows();
-                println!("connected! {:?}", windows);
-
-                let window_id = windows.iter().next().unwrap().id;
-                let track = LocalVideoTrack::screen_share_for_window(window_id);
-                room1.publish_video_track(&track).await.unwrap();
-
-                background.timer(Duration::from_secs(120)).await;
-            })
-            .detach();
+            let track = LocalVideoTrack::screen_share_for_window(window.id);
+            room1.publish_video_track(&track).await.unwrap();
 
-        // cx.add_window(Default::default(), |cx| ScreenCaptureView::new(cx));
+            std::mem::forget(track);
+            std::mem::forget(room1);
+        })
+        .detach();
     });
 }
 
 struct ScreenCaptureView {
     image_buffer: Option<CVImageBuffer>,
+    _room: Arc<Room>,
 }
 
 impl gpui::Entity for ScreenCaptureView {
@@ -84,8 +83,25 @@ impl gpui::Entity for ScreenCaptureView {
 }
 
 impl ScreenCaptureView {
-    pub fn new(_: &mut ViewContext<Self>) -> Self {
-        Self { image_buffer: None }
+    pub fn new(room: Arc<Room>, cx: &mut ViewContext<Self>) -> Self {
+        let mut remote_video_tracks = room.remote_video_tracks();
+        cx.spawn_weak(|this, mut cx| async move {
+            if let Some(video_track) = remote_video_tracks.next().await {
+                video_track.add_renderer(move |frame| {
+                    if let Some(this) = this.upgrade(&cx) {
+                        this.update(&mut cx, |this, cx| {
+                            this.image_buffer = Some(frame);
+                            cx.notify();
+                        });
+                    }
+                });
+            }
+        })
+        .detach();
+        Self {
+            image_buffer: None,
+            _room: room,
+        }
     }
 }
 

crates/live_kit/Cargo.toml 🔗

@@ -9,10 +9,13 @@ path = "src/live_kit.rs"
 doctest = false
 
 [dependencies]
+media = { path = "../media" }
+
 anyhow = "1.0.38"
 core-foundation = "0.9.3"
 core-graphics = "0.22.3"
 futures = "0.3"
+parking_lot = "0.11.1"
 
 [build-dependencies]
 serde = { version = "1.0", features = ["derive", "rc"] }

crates/live_kit/LiveKitBridge/Package.resolved 🔗

@@ -5,8 +5,7 @@
       "kind" : "remoteSourceControl",
       "location" : "https://github.com/livekit/client-sdk-swift.git",
       "state" : {
-        "revision" : "7e7decf3a09de4a169dfc0445a14d9fd2d8db58d",
-        "version" : "1.0.4"
+        "revision" : "5cc3c001779ab147199ce3ea0dce465b846368b4"
       }
     },
     {

crates/live_kit/LiveKitBridge/Package.swift 🔗

@@ -15,7 +15,7 @@ let package = Package(
             targets: ["LiveKitBridge"]),
     ],
     dependencies: [
-        .package(url: "https://github.com/livekit/client-sdk-swift.git", from: "1.0.0"),
+        .package(url: "https://github.com/livekit/client-sdk-swift.git", revision: "5cc3c001779ab147199ce3ea0dce465b846368b4"),
     ],
     targets: [
         // Targets are the basic building blocks of a package. A target can define a module or a test suite.

crates/live_kit/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -1,17 +1,49 @@
 import Foundation
 import LiveKit
+import WebRTC
 
 class LKRoomDelegate: RoomDelegate {
     var data: UnsafeRawPointer
-    var onDidSubscribeToRemoteTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void
+    var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void
     
-    init(data: UnsafeRawPointer, onDidSubscribeToRemoteTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) {
+    init(data: UnsafeRawPointer, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) {
         self.data = data
-        self.onDidSubscribeToRemoteTrack = onDidSubscribeToRemoteTrack
+        self.onDidSubscribeToRemoteVideoTrack = onDidSubscribeToRemoteVideoTrack
     }
     
     func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
-        self.onDidSubscribeToRemoteTrack(self.data, Unmanaged.passRetained(track).toOpaque())
+        if track.kind == .video {
+            self.onDidSubscribeToRemoteVideoTrack(self.data, Unmanaged.passRetained(track).toOpaque())
+        }
+    }
+}
+
+class LKVideoRenderer: NSObject, VideoRenderer {
+    var data: UnsafeRawPointer
+    var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void
+    var onDrop: @convention(c) (UnsafeRawPointer) -> Void
+    var adaptiveStreamIsEnabled: Bool = false
+    var adaptiveStreamSize: CGSize = .zero
+
+    init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
+        self.data = data
+        self.onFrame = onFrame
+        self.onDrop = onDrop
+    }
+
+    deinit {
+        self.onDrop(self.data)
+    }
+
+    func setSize(_ size: CGSize) {
+        print("Called setSize", size);
+    }
+
+    func renderFrame(_ frame: RTCVideoFrame?) {
+        let buffer = frame?.buffer as? RTCCVPixelBuffer
+        if let pixelBuffer = buffer?.pixelBuffer {
+            self.onFrame(self.data, pixelBuffer)
+        }
     }
 }
 
@@ -21,8 +53,8 @@ public func LKRelease(ptr: UnsafeRawPointer)  {
 }
 
 @_cdecl("LKRoomDelegateCreate")
-public func LKRoomDelegateCreate(data: UnsafeRawPointer, onDidSubscribeToRemoteTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
-    let delegate = LKRoomDelegate(data: data, onDidSubscribeToRemoteTrack: onDidSubscribeToRemoteTrack)
+public func LKRoomDelegateCreate(data: UnsafeRawPointer, onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
+    let delegate = LKRoomDelegate(data: data, onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack)
     return Unmanaged.passRetained(delegate).toOpaque()
 }
 
@@ -59,3 +91,15 @@ public func LKCreateScreenShareTrackForWindow(windowId: uint32) -> UnsafeMutable
     let track = LocalVideoTrack.createMacOSScreenShareTrack(source: .window(id: windowId))
     return Unmanaged.passRetained(track).toOpaque()
 }
+
+@_cdecl("LKVideoRendererCreate")
+public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
+    Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
+}
+
+@_cdecl("LKVideoTrackAddRenderer")
+public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
+    let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
+    let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
+    track.add(videoRenderer: renderer)
+}

crates/live_kit/src/live_kit.rs 🔗

@@ -10,7 +10,12 @@ use core_graphics::window::{
     kCGNullWindowID, kCGWindowListOptionExcludeDesktopElements, kCGWindowListOptionOnScreenOnly,
     kCGWindowNumber, kCGWindowOwnerName, kCGWindowOwnerPID, CGWindowListCopyWindowInfo,
 };
-use futures::{channel::oneshot, Future};
+use futures::{
+    channel::{mpsc, oneshot},
+    Future,
+};
+use media::core_video::{CVImageBuffer, CVImageBufferRef};
+use parking_lot::Mutex;
 use std::{
     ffi::c_void,
     sync::{Arc, Weak},
@@ -20,8 +25,8 @@ extern "C" {
     fn LKRelease(object: *const c_void);
 
     fn LKRoomDelegateCreate(
-        callback_data: *const c_void,
-        on_did_subscribe_to_remote_track: extern "C" fn(
+        callback_data: *mut c_void,
+        on_did_subscribe_to_remote_video_track: extern "C" fn(
             callback_data: *mut c_void,
             remote_track: *const c_void,
         ),
@@ -42,22 +47,30 @@ extern "C" {
         callback_data: *mut c_void,
     );
 
+    fn LKVideoRendererCreate(
+        callback_data: *mut c_void,
+        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
+        on_drop: extern "C" fn(callback_data: *mut c_void),
+    ) -> *const c_void;
+
+    fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
+
     fn LKCreateScreenShareTrackForWindow(windowId: u32) -> *const c_void;
 }
 
 pub struct Room {
-    debug_name: &'static str,
     native_room: *const c_void,
+    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<Arc<RemoteVideoTrack>>>>,
     _delegate: RoomDelegate,
 }
 
 impl Room {
-    pub fn new(debug_name: &'static str) -> Arc<Self> {
+    pub fn new() -> Arc<Self> {
         Arc::new_cyclic(|weak_room| {
             let delegate = RoomDelegate::new(weak_room.clone());
             Self {
-                debug_name,
                 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
+                remote_video_track_subscribers: Default::default(),
                 _delegate: delegate,
             }
         })
@@ -88,8 +101,17 @@ impl Room {
         async { rx.await.unwrap().context("error publishing video track") }
     }
 
-    fn did_subscribe_to_remote_track(&self, track: RemoteVideoTrack) {
-        println!("{}: !!!!!!!!!!!!!!!!!!", self.debug_name);
+    pub fn remote_video_tracks(&self) -> mpsc::UnboundedReceiver<Arc<RemoteVideoTrack>> {
+        let (tx, rx) = mpsc::unbounded();
+        self.remote_video_track_subscribers.lock().push(tx);
+        rx
+    }
+
+    fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
+        let track = Arc::new(track);
+        self.remote_video_track_subscribers
+            .lock()
+            .retain(|tx| tx.unbounded_send(track.clone()).is_ok());
     }
 
     fn build_done_callback() -> (
@@ -131,8 +153,8 @@ impl RoomDelegate {
         let weak_room = Weak::into_raw(weak_room);
         let native_delegate = unsafe {
             LKRoomDelegateCreate(
-                weak_room as *const c_void,
-                Self::on_did_subscribe_to_remote_track,
+                weak_room as *mut c_void,
+                Self::on_did_subscribe_to_remote_video_track,
             )
         };
         Self {
@@ -141,11 +163,11 @@ impl RoomDelegate {
         }
     }
 
-    extern "C" fn on_did_subscribe_to_remote_track(room: *mut c_void, track: *const c_void) {
+    extern "C" fn on_did_subscribe_to_remote_video_track(room: *mut c_void, track: *const c_void) {
         let room = unsafe { Weak::from_raw(room as *mut Room) };
-        let track = unsafe { RemoteVideoTrack(track) };
+        let track = RemoteVideoTrack(track);
         if let Some(room) = room.upgrade() {
-            room.did_subscribe_to_remote_track(track);
+            room.did_subscribe_to_remote_video_track(track);
         }
         let _ = Weak::into_raw(room);
     }
@@ -176,6 +198,37 @@ impl Drop for LocalVideoTrack {
 
 pub struct RemoteVideoTrack(*const c_void);
 
+impl RemoteVideoTrack {
+    pub fn add_renderer<F>(&self, callback: F)
+    where
+        F: 'static + FnMut(CVImageBuffer),
+    {
+        extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
+        where
+            F: FnMut(CVImageBuffer),
+        {
+            unsafe {
+                let buffer = CVImageBuffer::wrap_under_get_rule(frame);
+                let callback = &mut *(callback_data as *mut F);
+                callback(buffer);
+            }
+        }
+
+        extern "C" fn on_drop<F>(callback_data: *mut c_void) {
+            unsafe {
+                let _ = Box::from_raw(callback_data as *mut F);
+            }
+        }
+
+        let callback_data = Box::into_raw(Box::new(callback));
+        unsafe {
+            let renderer =
+                LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
+            LKVideoTrackAddRenderer(self.0, renderer);
+        }
+    }
+}
+
 impl Drop for RemoteVideoTrack {
     fn drop(&mut self) {
         unsafe { LKRelease(self.0) }