Detect when a track is unpublished due to reconnecting to livekit

Max Brunsfeld and Julia created

Co-authored-by: Julia <julia@zed.dev>

Change summary

crates/call/src/room.rs                                                        | 22 
crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift | 50 
crates/live_kit_client/src/live_kit_client.rs                                  |  4 
crates/live_kit_client/src/prod.rs                                             | 71 
crates/live_kit_client/src/test.rs                                             | 65 
5 files changed, 195 insertions(+), 17 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -1060,6 +1060,28 @@ impl Room {
                     participant_id: participant.peer_id,
                 });
             }
+
+            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
+                log::info!("unpublished audio track {}", publication.sid());
+                if let Some(room) = &mut self.live_kit {
+                    room.microphone_track = LocalTrack::None;
+                }
+            }
+
+            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
+                log::info!("unpublished video track {}", publication.sid());
+                if let Some(room) = &mut self.live_kit {
+                    room.screen_track = LocalTrack::None;
+                }
+            }
+
+            RoomUpdate::LocalAudioTrackPublished { publication } => {
+                log::info!("published audio track {}", publication.sid());
+            }
+
+            RoomUpdate::LocalVideoTrackPublished { publication } => {
+                log::info!("published video track {}", publication.sid());
+            }
         }
 
         cx.notify();

crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift 🔗

@@ -12,6 +12,8 @@ class LKRoomDelegate: RoomDelegate {
     var onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void
     var onDidSubscribeToRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void
     var onDidUnsubscribeFromRemoteVideoTrack: @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
+    var onDidPublishOrUnpublishLocalAudioTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
+    var onDidPublishOrUnpublishLocalVideoTrack: @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
 
     init(
         data: UnsafeRawPointer,
@@ -21,7 +23,10 @@ class LKRoomDelegate: RoomDelegate {
         onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
         onActiveSpeakersChanged: @convention(c) (UnsafeRawPointer, CFArray) -> Void,
         onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
-        onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void)
+        onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
+        onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void,
+        onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
+    )
     {
         self.data = data
         self.onDidDisconnect = onDidDisconnect
@@ -31,6 +36,8 @@ class LKRoomDelegate: RoomDelegate {
         self.onDidUnsubscribeFromRemoteVideoTrack = onDidUnsubscribeFromRemoteVideoTrack
         self.onMuteChangedFromRemoteAudioTrack = onMuteChangedFromRemoteAudioTrack
         self.onActiveSpeakersChanged = onActiveSpeakersChanged
+        self.onDidPublishOrUnpublishLocalAudioTrack = onDidPublishOrUnpublishLocalAudioTrack
+        self.onDidPublishOrUnpublishLocalVideoTrack = onDidPublishOrUnpublishLocalVideoTrack
     }
 
     func room(_ room: Room, didUpdate connectionState: ConnectionState, oldValue: ConnectionState) {
@@ -65,6 +72,22 @@ class LKRoomDelegate: RoomDelegate {
             self.onDidUnsubscribeFromRemoteAudioTrack(self.data, participant.identity as CFString, track.sid! as CFString)
         }
     }
+
+    func room(_ room: Room, localParticipant: LocalParticipant, didPublish publication: LocalTrackPublication) {
+        if publication.kind == .video {
+            self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true)
+        } else if publication.kind == .audio {
+            self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), true)
+        }
+    }
+
+    func room(_ room: Room, localParticipant: LocalParticipant, didUnpublish publication: LocalTrackPublication) {
+        if publication.kind == .video {
+            self.onDidPublishOrUnpublishLocalVideoTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false)
+        } else if publication.kind == .audio {
+            self.onDidPublishOrUnpublishLocalAudioTrack(self.data, Unmanaged.passUnretained(publication).toOpaque(), false)
+        }
+    }
 }
 
 class LKVideoRenderer: NSObject, VideoRenderer {
@@ -109,7 +132,9 @@ public func LKRoomDelegateCreate(
     onMuteChangedFromRemoteAudioTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, Bool) -> Void,
     onActiveSpeakerChanged: @escaping @convention(c) (UnsafeRawPointer, CFArray) -> Void,
     onDidSubscribeToRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString, UnsafeRawPointer) -> Void,
-    onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void
+    onDidUnsubscribeFromRemoteVideoTrack: @escaping @convention(c) (UnsafeRawPointer, CFString, CFString) -> Void,
+    onDidPublishOrUnpublishLocalAudioTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void,
+    onDidPublishOrUnpublishLocalVideoTrack: @escaping @convention(c) (UnsafeRawPointer, UnsafeRawPointer, Bool) -> Void
 ) -> UnsafeMutableRawPointer {
     let delegate = LKRoomDelegate(
         data: data,
@@ -119,7 +144,9 @@ public func LKRoomDelegateCreate(
         onMuteChangedFromRemoteAudioTrack: onMuteChangedFromRemoteAudioTrack,
         onActiveSpeakersChanged: onActiveSpeakerChanged,
         onDidSubscribeToRemoteVideoTrack: onDidSubscribeToRemoteVideoTrack,
-        onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack
+        onDidUnsubscribeFromRemoteVideoTrack: onDidUnsubscribeFromRemoteVideoTrack,
+        onDidPublishOrUnpublishLocalAudioTrack: onDidPublishOrUnpublishLocalAudioTrack,
+        onDidPublishOrUnpublishLocalVideoTrack: onDidPublishOrUnpublishLocalVideoTrack
     )
     return Unmanaged.passRetained(delegate).toOpaque()
 }
@@ -292,6 +319,14 @@ public func LKLocalTrackPublicationSetMute(
     }
 }
 
+@_cdecl("LKLocalTrackPublicationIsMuted")
+public func LKLocalTrackPublicationIsMuted(
+    publication: UnsafeRawPointer
+) -> Bool {
+    let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+    return publication.muted
+}
+
 @_cdecl("LKRemoteTrackPublicationSetEnabled")
 public func LKRemoteTrackPublicationSetEnabled(
     publication: UnsafeRawPointer,
@@ -325,3 +360,12 @@ public func LKRemoteTrackPublicationGetSid(
 
     return publication.sid as CFString
 }
+
+@_cdecl("LKLocalTrackPublicationGetSid")
+public func LKLocalTrackPublicationGetSid(
+    publication: UnsafeRawPointer
+) -> CFString {
+    let publication = Unmanaged<LocalTrackPublication>.fromOpaque(publication).takeUnretainedValue()
+
+    return publication.sid as CFString
+}

crates/live_kit_client/src/live_kit_client.rs 🔗

@@ -28,4 +28,8 @@ pub enum RoomUpdate {
     SubscribedToRemoteAudioTrack(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
     UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid },
     UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid },
+    LocalAudioTrackPublished { publication: LocalTrackPublication },
+    LocalAudioTrackUnpublished { publication: LocalTrackPublication },
+    LocalVideoTrackPublished { publication: LocalTrackPublication },
+    LocalVideoTrackUnpublished { publication: LocalTrackPublication },
 }

crates/live_kit_client/src/prod.rs 🔗

@@ -77,6 +77,16 @@ extern "C" {
             publisher_id: CFStringRef,
             track_id: CFStringRef,
         ),
+        on_did_publish_or_unpublish_local_audio_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publication: swift::LocalTrackPublication,
+            is_published: bool,
+        ),
+        on_did_publish_or_unpublish_local_video_track: extern "C" fn(
+            callback_data: *mut c_void,
+            publication: swift::LocalTrackPublication,
+            is_published: bool,
+        ),
     ) -> swift::RoomDelegate;
 
     fn LKRoomCreate(delegate: swift::RoomDelegate) -> swift::Room;
@@ -152,7 +162,9 @@ extern "C" {
         callback_data: *mut c_void,
     );
 
+    fn LKLocalTrackPublicationIsMuted(publication: swift::LocalTrackPublication) -> bool;
     fn LKRemoteTrackPublicationIsMuted(publication: swift::RemoteTrackPublication) -> bool;
+    fn LKLocalTrackPublicationGetSid(publication: swift::LocalTrackPublication) -> CFStringRef;
     fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef;
 }
 
@@ -511,6 +523,8 @@ impl RoomDelegate {
                 Self::on_active_speakers_changed,
                 Self::on_did_subscribe_to_remote_video_track,
                 Self::on_did_unsubscribe_from_remote_video_track,
+                Self::on_did_publish_or_unpublish_local_audio_track,
+                Self::on_did_publish_or_unpublish_local_video_track,
             )
         };
         Self {
@@ -624,6 +638,46 @@ impl RoomDelegate {
         }
         let _ = Weak::into_raw(room);
     }
+
+    extern "C" fn on_did_publish_or_unpublish_local_audio_track(
+        room: *mut c_void,
+        publication: swift::LocalTrackPublication,
+        is_published: bool,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        if let Some(room) = room.upgrade() {
+            let publication = LocalTrackPublication::new(publication);
+            let update = if is_published {
+                RoomUpdate::LocalAudioTrackPublished { publication }
+            } else {
+                RoomUpdate::LocalAudioTrackUnpublished { publication }
+            };
+            room.update_subscribers
+                .lock()
+                .retain(|tx| tx.unbounded_send(update.clone()).is_ok());
+        }
+        let _ = Weak::into_raw(room);
+    }
+
+    extern "C" fn on_did_publish_or_unpublish_local_video_track(
+        room: *mut c_void,
+        publication: swift::LocalTrackPublication,
+        is_published: bool,
+    ) {
+        let room = unsafe { Weak::from_raw(room as *mut Room) };
+        if let Some(room) = room.upgrade() {
+            let publication = LocalTrackPublication::new(publication);
+            let update = if is_published {
+                RoomUpdate::LocalVideoTrackPublished { publication }
+            } else {
+                RoomUpdate::LocalVideoTrackUnpublished { publication }
+            };
+            room.update_subscribers
+                .lock()
+                .retain(|tx| tx.unbounded_send(update.clone()).is_ok());
+        }
+        let _ = Weak::into_raw(room);
+    }
 }
 
 impl Drop for RoomDelegate {
@@ -673,6 +727,10 @@ impl LocalTrackPublication {
         Self(native_track_publication)
     }
 
+    pub fn sid(&self) -> String {
+        unsafe { CFString::wrap_under_get_rule(LKLocalTrackPublicationGetSid(self.0)).to_string() }
+    }
+
     pub fn set_mute(&self, muted: bool) -> impl Future<Output = Result<()>> {
         let (tx, rx) = futures::channel::oneshot::channel();
 
@@ -697,6 +755,19 @@ impl LocalTrackPublication {
 
         async move { rx.await.unwrap() }
     }
+
+    pub fn is_muted(&self) -> bool {
+        unsafe { LKLocalTrackPublicationIsMuted(self.0) }
+    }
+}
+
+impl Clone for LocalTrackPublication {
+    fn clone(&self) -> Self {
+        unsafe {
+            CFRetain(self.0 .0);
+        }
+        Self(self.0)
+    }
 }
 
 impl Drop for LocalTrackPublication {

crates/live_kit_client/src/test.rs 🔗

@@ -8,7 +8,14 @@ use live_kit_server::{proto, token};
 use media::core_video::CVImageBuffer;
 use parking_lot::Mutex;
 use postage::watch;
-use std::{future::Future, mem, sync::Arc};
+use std::{
+    future::Future,
+    mem,
+    sync::{
+        atomic::{AtomicBool, Ordering::SeqCst},
+        Arc,
+    },
+};
 
 static SERVERS: Mutex<BTreeMap<String, Arc<TestServer>>> = Mutex::new(BTreeMap::new());
 
@@ -176,7 +183,11 @@ impl TestServer {
         }
     }
 
-    async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
+    async fn publish_video_track(
+        &self,
+        token: String,
+        local_track: LocalVideoTrack,
+    ) -> Result<Sid> {
         self.executor.simulate_random_delay().await;
         let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
         let identity = claims.sub.unwrap().to_string();
@@ -198,8 +209,9 @@ impl TestServer {
             return Err(anyhow!("user is not allowed to publish"));
         }
 
+        let sid = nanoid::nanoid!(17);
         let track = Arc::new(RemoteVideoTrack {
-            sid: nanoid::nanoid!(17),
+            sid: sid.clone(),
             publisher_id: identity.clone(),
             frames_rx: local_track.frames_rx.clone(),
         });
@@ -217,14 +229,14 @@ impl TestServer {
             }
         }
 
-        Ok(())
+        Ok(sid)
     }
 
     async fn publish_audio_track(
         &self,
         token: String,
         _local_track: &LocalAudioTrack,
-    ) -> Result<()> {
+    ) -> Result<Sid> {
         self.executor.simulate_random_delay().await;
         let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
         let identity = claims.sub.unwrap().to_string();
@@ -246,8 +258,9 @@ impl TestServer {
             return Err(anyhow!("user is not allowed to publish"));
         }
 
+        let sid = nanoid::nanoid!(17);
         let track = Arc::new(RemoteAudioTrack {
-            sid: nanoid::nanoid!(17),
+            sid: sid.clone(),
             publisher_id: identity.clone(),
         });
 
@@ -269,7 +282,7 @@ impl TestServer {
             }
         }
 
-        Ok(())
+        Ok(sid)
     }
 
     fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
@@ -425,10 +438,14 @@ impl Room {
         let this = self.clone();
         let track = track.clone();
         async move {
-            this.test_server()
+            let sid = this
+                .test_server()
                 .publish_video_track(this.token(), track)
                 .await?;
-            Ok(LocalTrackPublication)
+            Ok(LocalTrackPublication {
+                muted: Default::default(),
+                sid,
+            })
         }
     }
     pub fn publish_audio_track(
@@ -438,10 +455,14 @@ impl Room {
         let this = self.clone();
         let track = track.clone();
         async move {
-            this.test_server()
+            let sid = this
+                .test_server()
                 .publish_audio_track(this.token(), &track)
                 .await?;
-            Ok(LocalTrackPublication)
+            Ok(LocalTrackPublication {
+                muted: Default::default(),
+                sid,
+            })
         }
     }
 
@@ -536,11 +557,27 @@ impl Drop for Room {
     }
 }
 
-pub struct LocalTrackPublication;
+#[derive(Clone)]
+pub struct LocalTrackPublication {
+    sid: String,
+    muted: Arc<AtomicBool>,
+}
 
 impl LocalTrackPublication {
-    pub fn set_mute(&self, _mute: bool) -> impl Future<Output = Result<()>> {
-        async { Ok(()) }
+    pub fn set_mute(&self, mute: bool) -> impl Future<Output = Result<()>> {
+        let muted = self.muted.clone();
+        async move {
+            muted.store(mute, SeqCst);
+            Ok(())
+        }
+    }
+
+    pub fn is_muted(&self) -> bool {
+        self.muted.load(SeqCst)
+    }
+
+    pub fn sid(&self) -> String {
+        self.sid.clone()
     }
 }