WIP

Antonio Scandurra created

Change summary

crates/call/src/room.rs                                                        | 51 
crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift |  8 
crates/live_kit_client/src/live_kit_client.rs                                  | 34 
3 files changed, 68 insertions(+), 25 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -7,7 +7,7 @@ use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
 use collections::{BTreeMap, HashSet};
 use futures::StreamExt;
 use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
-use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate};
+use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 use postage::watch;
 use project::Project;
 use std::sync::Arc;
@@ -32,7 +32,7 @@ pub enum Event {
 
 pub struct Room {
     id: u64,
-    live_kit_room: Option<(Arc<live_kit_client::Room>, Task<()>)>,
+    live_kit: Option<LiveKitRoom>,
     status: RoomStatus,
     local_participant: LocalParticipant,
     remote_participants: BTreeMap<PeerId, RemoteParticipant>,
@@ -82,7 +82,7 @@ impl Room {
         let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
             let room = live_kit_client::Room::new();
             let mut track_changes = room.remote_video_track_updates();
-            let maintain_room = cx.spawn_weak(|this, mut cx| async move {
+            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                 while let Some(track_change) = track_changes.next().await {
                     let this = if let Some(this) = this.upgrade(&cx) {
                         this
@@ -98,14 +98,18 @@ impl Room {
             cx.foreground()
                 .spawn(room.connect(&connection_info.server_url, &connection_info.token))
                 .detach_and_log_err(cx);
-            Some((room, maintain_room))
+            Some(LiveKitRoom {
+                room,
+                screen_track: None,
+                _maintain_room,
+            })
         } else {
             None
         };
 
         Self {
             id,
-            live_kit_room,
+            live_kit: live_kit_room,
             status: RoomStatus::Online,
             participant_user_ids: Default::default(),
             local_participant: Default::default(),
@@ -212,7 +216,7 @@ impl Room {
         self.pending_participants.clear();
         self.participant_user_ids.clear();
         self.subscriptions.clear();
-        self.live_kit_room.take();
+        self.live_kit.take();
         self.client.send(proto::LeaveRoom { id: self.id })?;
         Ok(())
     }
@@ -342,8 +346,9 @@ impl Room {
                                 },
                             );
 
-                            if let Some((room, _)) = this.live_kit_room.as_ref() {
-                                let tracks = room.remote_video_tracks(&peer_id.0.to_string());
+                            if let Some(live_kit) = this.live_kit.as_ref() {
+                                let tracks =
+                                    live_kit.room.remote_video_tracks(&peer_id.0.to_string());
                                 for track in tracks {
                                     this.remote_video_track_updated(
                                         RemoteVideoTrackUpdate::Subscribed(track),
@@ -605,24 +610,36 @@ impl Room {
             return Task::ready(Err(anyhow!("room is offline")));
         }
 
-        let room = if let Some((room, _)) = self.live_kit_room.as_ref() {
-            room.clone()
-        } else {
-            return Task::ready(Err(anyhow!("not connected to LiveKit")));
-        };
-
-        cx.foreground().spawn(async move {
+        cx.spawn_weak(|this, mut cx| async move {
             let displays = live_kit_client::display_sources().await?;
             let display = displays
                 .first()
                 .ok_or_else(|| anyhow!("no display found"))?;
             let track = LocalVideoTrack::screen_share_for_display(&display);
-            room.publish_video_track(&track).await?;
-            Ok(())
+
+            let publication = this
+                .upgrade(&cx)?
+                .read_with(&cx, |this, _| {
+                    this.live_kit
+                        .as_ref()
+                        .map(|live_kit| live_kit.room.publish_video_track(&track))
+                })?
+                .await?;
+
+            this.upgrade(&cx)?.update(cx, |this, _| {
+                this.live_kit.as_mut()?.screen_track = Some(publication);
+                Some(())
+            })
         })
     }
 }
 
+struct LiveKitRoom {
+    room: Arc<live_kit_client::Room>,
+    screen_track: Option<LocalTrackPublication>,
+    _maintain_room: Task<()>,
+}
+
 #[derive(Copy, Clone, PartialEq, Eq)]
 pub enum RoomStatus {
     Online,

crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -85,13 +85,13 @@ public func LKRoomDisconnect(room: UnsafeRawPointer) {
 }
 
 @_cdecl("LKRoomPublishVideoTrack")
-public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer) {
+public func LKRoomPublishVideoTrack(room: UnsafeRawPointer, track: UnsafeRawPointer, callback: @escaping @convention(c) (UnsafeRawPointer, UnsafeMutableRawPointer?, CFString?) -> Void, callback_data: UnsafeRawPointer) {
     let room = Unmanaged<Room>.fromOpaque(room).takeUnretainedValue()
     let track = Unmanaged<LocalVideoTrack>.fromOpaque(track).takeUnretainedValue()
-    room.localParticipant?.publishVideoTrack(track: track).then { _ in
-        callback(callback_data, UnsafeRawPointer(nil) as! CFString?)
+    room.localParticipant?.publishVideoTrack(track: track).then { publication in
+        callback(callback_data, Unmanaged.passRetained(publication).toOpaque(), nil)
     }.catch { error in
-        callback(callback_data, error.localizedDescription as CFString)
+        callback(callback_data, nil, error.localizedDescription as CFString)
     }
 }
 

crates/live_kit_client/src/live_kit_client.rs 🔗

@@ -45,7 +45,7 @@ extern "C" {
     fn LKRoomPublishVideoTrack(
         room: *const c_void,
         track: *const c_void,
-        callback: extern "C" fn(*mut c_void, CFStringRef),
+        callback: extern "C" fn(*mut c_void, *mut c_void, CFStringRef),
         callback_data: *mut c_void,
     );
     fn LKRoomVideoTracksForRemoteParticipant(
@@ -108,10 +108,28 @@ impl Room {
         async { rx.await.unwrap().context("error connecting to room") }
     }
 
-    pub fn publish_video_track(&self, track: &LocalVideoTrack) -> impl Future<Output = Result<()>> {
-        let (did_publish, tx, rx) = Self::build_done_callback();
+    pub fn publish_video_track(
+        &self,
+        track: &LocalVideoTrack,
+    ) -> impl Future<Output = Result<LocalTrackPublication>> {
+        let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
+        extern "C" fn callback(tx: *mut c_void, publication: *mut c_void, error: CFStringRef) {
+            let tx =
+                unsafe { Box::from_raw(tx as *mut oneshot::Sender<Result<LocalTrackPublication>>) };
+            if error.is_null() {
+                let _ = tx.send(Ok(LocalTrackPublication(publication)));
+            } else {
+                let error = unsafe { CFString::wrap_under_get_rule(error).to_string() };
+                let _ = tx.send(Err(anyhow!(error)));
+            }
+        }
         unsafe {
-            LKRoomPublishVideoTrack(self.native_room, track.0, did_publish, tx);
+            LKRoomPublishVideoTrack(
+                self.native_room,
+                track.0,
+                callback,
+                Box::into_raw(Box::new(tx)) as *mut c_void,
+            );
         }
         async { rx.await.unwrap().context("error publishing video track") }
     }
@@ -275,6 +293,14 @@ impl Drop for LocalVideoTrack {
     }
 }
 
+pub struct LocalTrackPublication(*const c_void);
+
+impl Drop for LocalTrackPublication {
+    fn drop(&mut self) {
+        unsafe { CFRelease(self.0) }
+    }
+}
+
 #[derive(Debug)]
 pub struct RemoteVideoTrack {
     native_track: *const c_void,