WIP

Nathan Sobo created

Change summary

Cargo.lock                                                                     |  14 
crates/call/src/room.rs                                                        |  42 
crates/live_kit_client/.cargo/config.toml                                      |   2 
crates/live_kit_client/Cargo.toml                                              |  27 
crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift |  33 
crates/live_kit_client/build.rs                                                |   5 
crates/live_kit_client/examples/test_app.rs                                    | 160 
crates/live_kit_client/src/live_kit_client.rs                                  |  88 
crates/live_kit_server/src/token.rs                                            |  12 
9 files changed, 336 insertions(+), 47 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3166,13 +3166,27 @@ name = "live_kit_client"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "block",
+ "byteorder",
+ "bytes 1.2.1",
+ "cocoa",
  "core-foundation",
  "core-graphics",
+ "foreign-types",
  "futures 0.3.24",
+ "gpui",
+ "hmac 0.12.1",
+ "jwt",
+ "live_kit_server",
+ "log",
  "media",
+ "objc",
  "parking_lot 0.11.2",
+ "postage",
  "serde",
  "serde_json",
+ "sha2 0.10.6",
+ "simplelog",
 ]
 
 [[package]]

crates/call/src/room.rs 🔗

@@ -7,7 +7,7 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
 use collections::{BTreeMap, HashSet};
 use futures::StreamExt;
 use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
-use live_kit_client::{LocalVideoTrack, RemoteVideoTrackChange};
+use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate};
 use postage::watch;
 use project::Project;
 use std::sync::Arc;
@@ -75,9 +75,9 @@ impl Room {
 
         let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
             let room = live_kit_client::Room::new();
-            let mut tracks = room.remote_video_tracks();
+            let mut track_changes = room.remote_video_track_updates();
             let maintain_room = cx.spawn_weak(|this, mut cx| async move {
-                while let Some(track_change) = tracks.next().await {
+                while let Some(track_change) = track_changes.next().await {
                     let this = if let Some(this) = this.upgrade(&cx) {
                         this
                     } else {
@@ -85,7 +85,7 @@ impl Room {
                     };
 
                     this.update(&mut cx, |this, cx| {
-                        this.remote_video_track_changed(track_change, cx).log_err()
+                        this.remote_video_track_updated(track_change, cx).log_err()
                     });
                 }
             });
@@ -330,13 +330,16 @@ impl Room {
                         );
 
                         if let Some((room, _)) = this.live_kit_room.as_ref() {
-                            for track in
-                                room.video_tracks_for_remote_participant(peer_id.0.to_string())
-                            {
-                                this.remote_video_track_changed(
-                                    RemoteVideoTrackChange::Subscribed(track),
+                            println!("getting video tracks for peer id {}", peer_id.0.to_string());
+                            let tracks = room.remote_video_tracks(&peer_id.0.to_string());
+                            dbg!(tracks.len());
+                            for track in tracks {
+                                dbg!(track.id(), track.publisher_id());
+                                this.remote_video_track_updated(
+                                    RemoteVideoTrackUpdate::Subscribed(track),
                                     cx,
-                                );
+                                )
+                                .log_err();
                             }
                         }
                     }
@@ -376,13 +379,14 @@ impl Room {
         Ok(())
     }
 
-    fn remote_video_track_changed(
+    fn remote_video_track_updated(
         &mut self,
-        change: RemoteVideoTrackChange,
+        change: RemoteVideoTrackUpdate,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         match change {
-            RemoteVideoTrackChange::Subscribed(track) => {
+            RemoteVideoTrackUpdate::Subscribed(track) => {
+                dbg!(track.publisher_id(), track.id());
                 let peer_id = PeerId(track.publisher_id().parse()?);
                 let track_id = track.id().to_string();
                 let participant = self
@@ -427,7 +431,7 @@ impl Room {
                     },
                 );
             }
-            RemoteVideoTrackChange::Unsubscribed {
+            RemoteVideoTrackUpdate::Unsubscribed {
                 publisher_id,
                 track_id,
             } => {
@@ -596,11 +600,11 @@ impl Room {
         };
 
         cx.foreground().spawn(async move {
-            let displays = live_kit_client::display_sources().await?;
-            let display = displays
-                .first()
-                .ok_or_else(|| anyhow!("no display found"))?;
-            let track = LocalVideoTrack::screen_share_for_display(display);
+            let display = live_kit_client::display_source().await?;
+            // let display = displays
+            //     .first()
+            //     .ok_or_else(|| anyhow!("no display found"))?;
+            let track = LocalVideoTrack::screen_share_for_display(&display);
             room.publish_video_track(&track).await?;
             Ok(())
         })

crates/live_kit_client/Cargo.toml 🔗

@@ -8,6 +8,9 @@ description = "Bindings to LiveKit Swift client SDK"
 path = "src/live_kit_client.rs"
 doctest = false
 
+[[example]]
+name = "test_app"
+
 [dependencies]
 media = { path = "../media" }
 
@@ -17,6 +20,30 @@ core-graphics = "0.22.3"
 futures = "0.3"
 parking_lot = "0.11.1"
 
+[dev-dependencies]
+gpui = { path = "../gpui" }
+live_kit_server = { path = "../live_kit_server" }
+media = { path = "../media" }
+
+anyhow = "1.0.38"
+block = "0.1"
+bytes = "1.2"
+byteorder = "1.4"
+cocoa = "0.24"
+core-foundation = "0.9.3"
+core-graphics = "0.22.3"
+foreign-types = "0.3"
+futures = "0.3"
+hmac = "0.12"
+jwt = "0.16"
+log = { version = "0.4.16", features = ["kv_unstable_serde"] }
+objc = "0.2"
+parking_lot = "0.11.1"
+postage = { version = "0.4.1", features = ["futures-traits"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
+sha2 = "0.10"
+simplelog = "0.9"
+
 [build-dependencies]
 serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }

crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -15,7 +15,7 @@ class LKRoomDelegate: RoomDelegate {
 
     func room(_ room: Room, participant: RemoteParticipant, didSubscribe publication: RemoteTrackPublication, track: Track) {
         if track.kind == .video {
-            self.onDidSubscribeToRemoteVideoTrack(self.data, participant.sid as CFString, track.id as CFString, Unmanaged.passRetained(track).toOpaque())
+            self.onDidSubscribeToRemoteVideoTrack(self.data, participant.identity as CFString, track.id as CFString, Unmanaged.passRetained(track).toOpaque())
         }
     }
     
@@ -97,8 +97,22 @@ public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPoin
 @_cdecl("LKRoomVideoTracksForRemoteParticipant")
 public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? {
     let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
-    let tracks = room.remoteParticipants[participantId as Sid]?.videoTracks.compactMap { $0.track as? RemoteVideoTrack }
-    return tracks as CFArray?
+    
+    for (_, participant) in room.remoteParticipants {
+        if participant.identity == participantId as String {
+            var tracks = [UnsafeMutableRawPointer]()
+            for publication in participant.videoTracks {
+                let track = publication.track as? RemoteVideoTrack
+                if track != nil {
+                    tracks.append(Unmanaged.passRetained(track!).toOpaque())
+                }
+                
+            }
+            return tracks as CFArray?
+        }
+    }
+    
+    return nil;
 }
 
 @_cdecl("LKCreateScreenShareTrackForDisplay")
@@ -120,10 +134,17 @@ public func LKVideoTrackAddRenderer(track: UnsafeRawPointer, renderer: UnsafeRaw
     track.add(videoRenderer: renderer)
 }
 
-@_cdecl("LKDisplaySources")
-public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFArray?, CFString?) -> Void) {
+@_cdecl("LKRemoteVideoTrackGetSid")
+public func LKRemoteVideoTrackGetSid(track: UnsafeRawPointer) -> CFString {
+    let track = Unmanaged<RemoteVideoTrack>.fromOpaque(track).takeUnretainedValue()
+    return track.sid! as CFString
+}
+
+@_cdecl("LKDisplaySource")
+public func LKDisplaySource(data: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer?, CFString?) -> Void) {
     MacOSScreenCapturer.sources(for: .display, includeCurrentApplication: false, preferredMethod: .legacy).then { displaySources in
-        callback(data, displaySources as CFArray, nil)
+        let displaySource = displaySources.first.map { Unmanaged.passRetained($0).toOpaque() }
+        callback(data, displaySource, nil)
     }.catch { error in
         callback(data, nil, error.localizedDescription as CFString)
     }

crates/live_kit_client/build.rs 🔗

@@ -40,6 +40,9 @@ fn main() {
     build_bridge(&swift_target);
     link_swift_stdlib(&swift_target);
     link_webrtc_framework(&swift_target);
+
+    // Register exported Objective-C selectors, protocols, etc when building example binaries.
+    println!("cargo:rustc-link-arg=-Wl,-ObjC");
 }
 
 fn build_bridge(swift_target: &SwiftTarget) {
@@ -94,6 +97,8 @@ fn link_webrtc_framework(swift_target: &SwiftTarget) {
     );
     // Find WebRTC.framework as a sibling of the executable when running tests.
     println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path");
+    // Find WebRTC.framework in parent directory of the executable when running examples.
+    println!("cargo:rustc-link-arg=-Wl,-rpath,@executable_path/..");
 
     let source_path = swift_out_dir_path.join("WebRTC.framework");
     let deps_dir_path =

crates/live_kit_client/examples/test_app.rs 🔗

@@ -0,0 +1,160 @@
+use core_foundation::base::CFRetain;
+use futures::StreamExt;
+use gpui::{
+    actions,
+    elements::{Canvas, *},
+    keymap::Binding,
+    platform::current::Surface,
+    Menu, MenuItem, ViewContext,
+};
+use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room};
+use live_kit_server::token::{self, VideoGrant};
+use log::LevelFilter;
+use media::core_video::CVImageBuffer;
+use postage::watch;
+use simplelog::SimpleLogger;
+use std::sync::Arc;
+
+actions!(capture, [Quit]);
+
+fn main() {
+    SimpleLogger::init(LevelFilter::Info, Default::default()).expect("could not initialize logger");
+
+    gpui::App::new(()).unwrap().run(|cx| {
+        cx.platform().activate(true);
+        cx.add_global_action(quit);
+
+        cx.add_bindings([Binding::new("cmd-q", Quit, None)]);
+        cx.set_menus(vec![Menu {
+            name: "Zed",
+            items: vec![MenuItem::Action {
+                name: "Quit",
+                action: Box::new(Quit),
+            }],
+        }]);
+
+        let live_kit_url = std::env::var("LIVE_KIT_URL").unwrap_or("http://localhost:7880".into());
+        let live_kit_key = std::env::var("LIVE_KIT_KEY").unwrap_or("devkey".into());
+        let live_kit_secret = std::env::var("LIVE_KIT_SECRET").unwrap_or("secret".into());
+
+        cx.spawn(|cx| async move {
+            let user_a_token = token::create(
+                &live_kit_key,
+                &live_kit_secret,
+                Some("test-participant-1"),
+                VideoGrant::to_join("test-room"),
+            )
+            .unwrap();
+            let room_a = Room::new();
+            room_a.connect(&live_kit_url, &user_a_token).await.unwrap();
+
+            let user2_token = token::create(
+                &live_kit_key,
+                &live_kit_secret,
+                Some("test-participant-2"),
+                VideoGrant::to_join("test-room"),
+            )
+            .unwrap();
+            let room_b = Room::new();
+            room_b.connect(&live_kit_url, &user2_token).await.unwrap();
+
+            let mut track_changes = room_b.remote_video_track_updates();
+
+            let display = live_kit_client::display_source().await.unwrap();
+
+            let track_a = LocalVideoTrack::screen_share_for_display(&display);
+            room_a.publish_video_track(&track_a).await.unwrap();
+
+            let next_update = track_changes.next().await.unwrap();
+
+            if let RemoteVideoTrackUpdate::Subscribed(track) = next_update {
+                println!("A !!!!!!!!!!!!");
+                let remote_tracks = room_b.remote_video_tracks("test-participant-1");
+                println!("B !!!!!!!!!!!!");
+                assert_eq!(remote_tracks.len(), 1);
+                println!("C !!!!!!!!!!!!");
+                assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
+                println!("D !!!!!!!!!!!!");
+                // dbg!(track.id());
+                // assert_eq!(track.id(), "test-participant-1");
+            } else {
+                panic!("unexpected message")
+            }
+            println!("E !!!!!!!!!!!!");
+
+            cx.platform().quit();
+        })
+        .detach();
+    });
+}
+
+struct ScreenCaptureView {
+    image_buffer: Option<CVImageBuffer>,
+    _room: Arc<Room>,
+}
+
+impl gpui::Entity for ScreenCaptureView {
+    type Event = ();
+}
+
+impl ScreenCaptureView {
+    pub fn new(room: Arc<Room>, cx: &mut ViewContext<Self>) -> Self {
+        let mut remote_video_tracks = room.remote_video_track_updates();
+        cx.spawn_weak(|this, mut cx| async move {
+            if let Some(video_track) = remote_video_tracks.next().await {
+                let (mut frames_tx, mut frames_rx) = watch::channel_with(None);
+                // video_track.add_renderer(move |frame| *frames_tx.borrow_mut() = Some(frame));
+
+                while let Some(frame) = frames_rx.next().await {
+                    if let Some(this) = this.upgrade(&cx) {
+                        this.update(&mut cx, |this, cx| {
+                            this.image_buffer = frame;
+                            cx.notify();
+                        });
+                    } else {
+                        break;
+                    }
+                }
+            }
+        })
+        .detach();
+
+        Self {
+            image_buffer: None,
+            _room: room,
+        }
+    }
+}
+
+impl gpui::View for ScreenCaptureView {
+    fn ui_name() -> &'static str {
+        "View"
+    }
+
+    fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
+        let image_buffer = self.image_buffer.clone();
+        let canvas = Canvas::new(move |bounds, _, cx| {
+            if let Some(image_buffer) = image_buffer.clone() {
+                cx.scene.push_surface(Surface {
+                    bounds,
+                    image_buffer,
+                });
+            }
+        });
+
+        if let Some(image_buffer) = self.image_buffer.as_ref() {
+            canvas
+                .constrained()
+                .with_width(image_buffer.width() as f32)
+                .with_height(image_buffer.height() as f32)
+                .aligned()
+                .boxed()
+        } else {
+            canvas.boxed()
+        }
+    }
+}
+
+fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) {
+    cx.platform().quit();
+}

crates/live_kit_client/src/live_kit_client.rs 🔗

@@ -15,6 +15,10 @@ use std::{
     sync::{Arc, Weak},
 };
 
+pub type Sid = String;
+#[allow(non_camel_case_types)]
+pub type sid = str;
+
 extern "C" {
     fn LKRelease(object: *const c_void);
 
@@ -47,6 +51,10 @@ extern "C" {
         callback: extern "C" fn(*mut c_void, CFStringRef),
         callback_data: *mut c_void,
     );
+    fn LKRoomVideoTracksForRemoteParticipant(
+        room: *const c_void,
+        participant_id: CFStringRef,
+    ) -> CFArrayRef;
 
     fn LKVideoRendererCreate(
         callback_data: *mut c_void,
@@ -55,12 +63,13 @@ extern "C" {
     ) -> *const c_void;
 
     fn LKVideoTrackAddRenderer(track: *const c_void, renderer: *const c_void);
+    fn LKRemoteVideoTrackGetSid(track: *const c_void) -> CFStringRef;
 
-    fn LKDisplaySources(
+    fn LKDisplaySource(
         callback_data: *mut c_void,
         callback: extern "C" fn(
             callback_data: *mut c_void,
-            sources: CFArrayRef,
+            source: *mut c_void,
             error: CFStringRef,
         ),
     );
@@ -69,7 +78,7 @@ extern "C" {
 
 pub struct Room {
     native_room: *const c_void,
-    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackChange>>>,
+    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
     _delegate: RoomDelegate,
 }
 
@@ -110,7 +119,40 @@ impl Room {
         async { rx.await.unwrap().context("error publishing video track") }
     }
 
-    pub fn remote_video_tracks(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackChange> {
+    pub fn remote_video_tracks(&self, participant_sid: &sid) -> Vec<Arc<RemoteVideoTrack>> {
+        unsafe {
+            let tracks = LKRoomVideoTracksForRemoteParticipant(
+                self.native_room,
+                CFString::new(participant_sid).as_concrete_TypeRef(),
+            );
+
+            if tracks.is_null() {
+                Vec::new()
+            } else {
+                println!("aaaa >>>>>>>>>>>>>>>");
+                let tracks = CFArray::wrap_under_get_rule(tracks);
+                println!("bbbb >>>>>>>>>>>>>>>");
+                tracks
+                    .into_iter()
+                    .map(|native_track| {
+                        let native_track = *native_track;
+                        println!("cccc >>>>>>>>>>>>>>>");
+                        let id =
+                            CFString::wrap_under_get_rule(LKRemoteVideoTrackGetSid(native_track))
+                                .to_string();
+                        println!("dddd >>>>>>>>>>>>>>>");
+                        Arc::new(RemoteVideoTrack {
+                            native_track,
+                            publisher_id: participant_sid.into(),
+                            id,
+                        })
+                    })
+                    .collect()
+            }
+        }
+    }
+
+    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
         let (tx, rx) = mpsc::unbounded();
         self.remote_video_track_subscribers.lock().push(tx);
         rx
@@ -119,14 +161,14 @@ impl Room {
     fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
         let track = Arc::new(track);
         self.remote_video_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteVideoTrackChange::Subscribed(track.clone()))
+            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
                 .is_ok()
         });
     }
 
     fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
         self.remote_video_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteVideoTrackChange::Unsubscribed {
+            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
                 publisher_id: publisher_id.clone(),
                 track_id: track_id.clone(),
             })
@@ -201,7 +243,7 @@ impl RoomDelegate {
         if let Some(room) = room.upgrade() {
             room.did_subscribe_to_remote_video_track(track);
         }
-        let _ = Weak::into_raw(room);
+        // let _ = Weak::into_raw(room);
     }
 
     extern "C" fn on_did_unsubscribe_from_remote_video_track(
@@ -244,9 +286,9 @@ impl Drop for LocalVideoTrack {
 
 #[derive(Debug)]
 pub struct RemoteVideoTrack {
-    id: String,
+    id: Sid,
     native_track: *const c_void,
-    publisher_id: String,
+    publisher_id: Sid,
 }
 
 impl RemoteVideoTrack {
@@ -294,12 +336,9 @@ impl Drop for RemoteVideoTrack {
     }
 }
 
-pub enum RemoteVideoTrackChange {
+pub enum RemoteVideoTrackUpdate {
     Subscribed(Arc<RemoteVideoTrack>),
-    Unsubscribed {
-        publisher_id: String,
-        track_id: String,
-    },
+    Unsubscribed { publisher_id: Sid, track_id: Sid },
 }
 
 pub struct MacOSDisplay(*const c_void);
@@ -310,17 +349,16 @@ impl Drop for MacOSDisplay {
     }
 }
 
-pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
-    extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
+pub fn display_source() -> impl Future<Output = Result<MacOSDisplay>> {
+    extern "C" fn callback(tx: *mut c_void, source: *mut c_void, error: CFStringRef) {
         unsafe {
-            let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
+            let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<MacOSDisplay>>);
 
-            if sources.is_null() {
+            if source.is_null() {
                 let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
             } else {
-                let sources = CFArray::wrap_under_get_rule(sources);
-                let sources_vec = sources.iter().map(|source| MacOSDisplay(*source)).collect();
-                let _ = tx.send(Ok(sources_vec));
+                let source = MacOSDisplay(source);
+                let _ = tx.send(Ok(source));
             }
         }
     }
@@ -328,8 +366,14 @@ pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
     let (tx, rx) = oneshot::channel();
 
     unsafe {
-        LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
+        LKDisplaySource(Box::into_raw(Box::new(tx)) as *mut _, callback);
     }
 
     async move { rx.await.unwrap() }
 }
+
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn test_client() {}
+}

crates/live_kit_server/src/token.rs 🔗

@@ -38,6 +38,18 @@ pub struct VideoGrant<'a> {
     pub recorder: Option<bool>,
 }
 
+impl<'a> VideoGrant<'a> {
+    pub fn to_join(room: &'a str) -> Self {
+        Self {
+            room: Some(room),
+            room_join: Some(true),
+            can_publish: Some(true),
+            can_subscribe: Some(true),
+            ..Default::default()
+        }
+    }
+}
+
 pub fn create(
     api_key: &str,
     secret_key: &str,