Simplify renderer interface for live-kit-client

Antonio Scandurra created

Change summary

crates/call/src/room.rs                                                        | 54 
crates/live_kit_client/Cargo.toml                                              |  5 
crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift | 16 
crates/live_kit_client/examples/test_app.rs                                    |  1 
crates/live_kit_client/src/prod.rs                                             | 44 
crates/live_kit_client/src/test.rs                                             | 20 
6 files changed, 70 insertions(+), 70 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -8,7 +8,6 @@ use collections::{BTreeMap, HashSet};
 use futures::StreamExt;
 use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
 use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
-use postage::watch;
 use project::Project;
 use std::{mem, os::unix::prelude::OsStrExt, sync::Arc};
 use util::{post_inc, ResultExt};
@@ -409,42 +408,39 @@ impl Room {
                     .remote_participants
                     .get_mut(&peer_id)
                     .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
-                let (mut tx, mut rx) = watch::channel();
-                track.add_renderer(move |frame| *tx.borrow_mut() = Some(frame));
+                let mut frames = track.frames();
                 participant.tracks.insert(
                     track_id.clone(),
                     RemoteVideoTrack {
                         frame: None,
                         _live_kit_track: track,
                         _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
-                            while let Some(frame) = rx.next().await {
-                                if let Some(frame) = frame {
-                                    let this = if let Some(this) = this.upgrade(&cx) {
-                                        this
+                            while let Some(frame) = frames.next().await {
+                                let this = if let Some(this) = this.upgrade(&cx) {
+                                    this
+                                } else {
+                                    break;
+                                };
+
+                                let done = this.update(&mut cx, |this, cx| {
+                                    if let Some(track) =
+                                        this.remote_participants.get_mut(&peer_id).and_then(
+                                            |participant| participant.tracks.get_mut(&track_id),
+                                        )
+                                    {
+                                        track.frame = Some(frame);
+                                        cx.emit(Event::Frame {
+                                            participant_id: peer_id,
+                                            track_id: track_id.clone(),
+                                        });
+                                        false
                                     } else {
-                                        break;
-                                    };
-
-                                    let done = this.update(&mut cx, |this, cx| {
-                                        if let Some(track) =
-                                            this.remote_participants.get_mut(&peer_id).and_then(
-                                                |participant| participant.tracks.get_mut(&track_id),
-                                            )
-                                        {
-                                            track.frame = Some(frame);
-                                            cx.emit(Event::Frame {
-                                                participant_id: peer_id,
-                                                track_id: track_id.clone(),
-                                            });
-                                            false
-                                        } else {
-                                            true
-                                        }
-                                    });
-
-                                    if done {
-                                        break;
+                                        true
                                     }
+                                });
+
+                                if done {
+                                    break;
                                 }
                             }
                         })),

crates/live_kit_client/Cargo.toml 🔗

@@ -13,7 +13,6 @@ name = "test_app"
 
 [features]
 test-support = [
-    "async-broadcast",
     "async-trait", 
     "collections/test-support",
     "gpui/test-support",
@@ -29,12 +28,13 @@ live_kit_server = { path = "../live_kit_server", optional = true }
 media = { path = "../media" }
 
 anyhow = "1.0.38"
+async-broadcast = "0.4"
 core-foundation = "0.9.3"
 core-graphics = "0.22.3"
 futures = "0.3"
+log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 parking_lot = "0.11.1"
 
-async-broadcast = { version = "0.4", optional = true }
 async-trait = { version = "0.1", optional = true }
 lazy_static = { version = "1.4", optional = true }
 nanoid = { version ="0.4", optional = true}
@@ -58,7 +58,6 @@ futures = "0.3"
 hmac = "0.12"
 jwt = "0.16"
 lazy_static = "1.4"
-log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 objc = "0.2"
 parking_lot = "0.11.1"
 postage = { version = "0.4.1", features = ["futures-traits"] }

crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -28,12 +28,13 @@ class LKRoomDelegate: RoomDelegate {
 
 class LKVideoRenderer: NSObject, VideoRenderer {
     var data: UnsafeRawPointer
-    var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void
+    var onFrame: @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool
     var onDrop: @convention(c) (UnsafeRawPointer) -> Void
     var adaptiveStreamIsEnabled: Bool = false
     var adaptiveStreamSize: CGSize = .zero
+    weak var track: VideoTrack?
 
-    init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
+    init(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) {
         self.data = data
         self.onFrame = onFrame
         self.onDrop = onDrop
@@ -50,7 +51,11 @@ class LKVideoRenderer: NSObject, VideoRenderer {
     func renderFrame(_ frame: RTCVideoFrame?) {
         let buffer = frame?.buffer as? RTCCVPixelBuffer
         if let pixelBuffer = buffer?.pixelBuffer {
-            self.onFrame(self.data, pixelBuffer)
+            if !self.onFrame(self.data, pixelBuffer) {
+                DispatchQueue.main.async {
+                    self.track?.remove(videoRenderer: self)
+                }
+            }
         }
     }
 }
@@ -99,7 +104,7 @@ public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPoin
 public func LKRoomUnpublishTrack(room: UnsafeRawPointer, publication: UnsafeRawPointer) {
     let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
     let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
-    room.localParticipant?.unpublish(publication: publication)
+    let _ = room.localParticipant?.unpublish(publication: publication)
 }
 
 @_cdecl("LKRoomVideoTracksForRemoteParticipant")
@@ -123,7 +128,7 @@ public func LKCreateScreenShareTrackForDisplay(display: UnsafeMutableRawPointer)
 }
 
 @_cdecl("LKVideoRendererCreate")
-public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Void, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
+public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @convention(c) (UnsafeRawPointer, CVPixelBuffer) -> Bool, onDrop: @escaping @convention(c) (UnsafeRawPointer) -> Void) -> UnsafeMutableRawPointer {
     Unmanaged.passRetained(LKVideoRenderer(data: data, onFrame: onFrame, onDrop: onDrop)).toOpaque()
 }
 
@@ -131,6 +136,7 @@ public func LKVideoRendererCreate(data: UnsafeRawPointer, onFrame: @escaping @co
 public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRawPointer) {
     let track = Unmanaged<Track>.fromOpaque(track).takeUnretainedValue() as! VideoTrack
     let renderer = Unmanaged<LKVideoRenderer>.fromOpaque(renderer).takeRetainedValue()
+    renderer.track = track
     track.add(videoRenderer: renderer)
 }
 

crates/live_kit_client/examples/test_app.rs 🔗

@@ -60,7 +60,6 @@ fn main() {
                 let remote_tracks = room_b.remote_video_tracks("test-participant-1");
                 assert_eq!(remote_tracks.len(), 1);
                 assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
-                dbg!(track.sid());
                 assert_eq!(track.publisher_id(), "test-participant-1");
             } else {
                 panic!("unexpected message");

crates/live_kit_client/src/prod.rs 🔗

@@ -55,7 +55,7 @@ extern "C" {
 
     fn LKVideoRendererCreate(
         callback_data: *mut c_void,
-        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef),
+        on_frame: extern "C" fn(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool,
         on_drop: extern "C" fn(callback_data: *mut c_void),
     ) -> *const c_void;
 
@@ -364,32 +364,43 @@ impl RemoteVideoTrack {
         &self.publisher_id
     }
 
-    pub fn add_renderer<F>(&self, callback: F)
-    where
-        F: 'static + Send + Sync + FnMut(Frame),
-    {
-        extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
-        where
-            F: FnMut(Frame),
-        {
+    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
+        extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) -> bool {
             unsafe {
+                let tx = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
                 let buffer = CVImageBuffer::wrap_under_get_rule(frame);
-                let callback = &mut *(callback_data as *mut F);
-                callback(Frame(buffer));
+                let result = tx.try_broadcast(Frame(buffer));
+                let _ = Box::into_raw(tx);
+                match result {
+                    Ok(_) => true,
+                    Err(async_broadcast::TrySendError::Closed(_))
+                    | Err(async_broadcast::TrySendError::Inactive(_)) => {
+                        log::warn!("no active receiver for frame");
+                        false
+                    }
+                    Err(async_broadcast::TrySendError::Full(_)) => {
+                        log::warn!("skipping frame as receiver is not keeping up");
+                        true
+                    }
+                }
             }
         }
 
-        extern "C" fn on_drop<F>(callback_data: *mut c_void) {
+        extern "C" fn on_drop(callback_data: *mut c_void) {
             unsafe {
-                let _ = Box::from_raw(callback_data as *mut F);
+                let _ = Box::from_raw(callback_data as *mut async_broadcast::Sender<Frame>);
             }
         }
 
-        let callback_data = Box::into_raw(Box::new(callback));
+        let (tx, rx) = async_broadcast::broadcast(64);
         unsafe {
-            let renderer =
-                LKVideoRendererCreate(callback_data as *mut c_void, on_frame::<F>, on_drop::<F>);
+            let renderer = LKVideoRendererCreate(
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+                on_frame,
+                on_drop,
+            );
             LKVideoTrackAddRenderer(self.native_track, renderer);
+            rx
         }
     }
 }
@@ -422,6 +433,7 @@ impl Drop for MacOSDisplay {
     }
 }
 
+#[derive(Clone)]
 pub struct Frame(CVImageBuffer);
 
 impl Frame {

crates/live_kit_client/src/test.rs 🔗

@@ -1,8 +1,8 @@
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use collections::HashMap;
-use futures::{Stream, StreamExt};
-use gpui::executor::{self, Background};
+use futures::Stream;
+use gpui::executor::Background;
 use lazy_static::lazy_static;
 use live_kit_server::token;
 use media::core_video::CVImageBuffer;
@@ -160,7 +160,6 @@ impl TestServer {
             sid: nanoid::nanoid!(17),
             publisher_id: identity.clone(),
             frames_rx: local_track.frames_rx.clone(),
-            background: self.background.clone(),
         }));
 
         for (id, client_room) in &room.client_rooms {
@@ -353,7 +352,6 @@ pub struct RemoteVideoTrack {
     sid: Sid,
     publisher_id: Sid,
     frames_rx: async_broadcast::Receiver<Frame>,
-    background: Arc<executor::Background>,
 }
 
 impl RemoteVideoTrack {
@@ -365,18 +363,8 @@ impl RemoteVideoTrack {
         &self.publisher_id
     }
 
-    pub fn add_renderer<F>(&self, mut callback: F)
-    where
-        F: 'static + Send + Sync + FnMut(Frame),
-    {
-        let mut frames_rx = self.frames_rx.clone();
-        self.background
-            .spawn(async move {
-                while let Some(frame) = frames_rx.next().await {
-                    callback(frame)
-                }
-            })
-            .detach();
+    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
+        self.frames_rx.clone()
     }
 }