Expose a single `updates` stream from live_kit_client::Room

Max Brunsfeld and Julia created

Co-authored-by: Julia <julia@zed.dev>

Change summary

crates/call/src/room.rs                       | 80 ++++++--------------
crates/live_kit_client/examples/test_app.rs   | 35 ++++----
crates/live_kit_client/src/live_kit_client.rs | 20 +++++
crates/live_kit_client/src/prod.rs            | 72 +++++-------------
crates/live_kit_client/src/test.rs            | 61 +++------------
5 files changed, 95 insertions(+), 173 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -15,10 +15,7 @@ use gpui::{
     AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
 };
 use language::LanguageRegistry;
-use live_kit_client::{
-    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
-    RemoteVideoTrackUpdate,
-};
+use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
 use postage::{sink::Sink, stream::Stream, watch};
 use project::Project;
 use settings::Settings as _;
@@ -131,30 +128,11 @@ impl Room {
                 }
             });
 
-            let _maintain_video_tracks = cx.spawn({
+            let _handle_updates = cx.spawn({
                 let room = room.clone();
                 move |this, mut cx| async move {
-                    let mut track_video_changes = room.remote_video_track_updates();
-                    while let Some(track_change) = track_video_changes.next().await {
-                        let this = if let Some(this) = this.upgrade() {
-                            this
-                        } else {
-                            break;
-                        };
-
-                        this.update(&mut cx, |this, cx| {
-                            this.remote_video_track_updated(track_change, cx).log_err()
-                        })
-                        .ok();
-                    }
-                }
-            });
-
-            let _maintain_audio_tracks = cx.spawn({
-                let room = room.clone();
-                |this, mut cx| async move {
-                    let mut track_audio_changes = room.remote_audio_track_updates();
-                    while let Some(track_change) = track_audio_changes.next().await {
+                    let mut updates = room.updates();
+                    while let Some(update) = updates.next().await {
                         let this = if let Some(this) = this.upgrade() {
                             this
                         } else {
@@ -162,7 +140,7 @@ impl Room {
                         };
 
                         this.update(&mut cx, |this, cx| {
-                            this.remote_audio_track_updated(track_change, cx).log_err()
+                            this.live_kit_room_updated(update, cx).log_err()
                         })
                         .ok();
                     }
@@ -195,7 +173,7 @@ impl Room {
                 deafened: false,
                 speaking: false,
                 _maintain_room,
-                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
+                _handle_updates,
             })
         } else {
             None
@@ -877,8 +855,8 @@ impl Room {
                                     .remote_audio_track_publications(&user.id.to_string());
 
                                 for track in video_tracks {
-                                    this.remote_video_track_updated(
-                                        RemoteVideoTrackUpdate::Subscribed(track),
+                                    this.live_kit_room_updated(
+                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
                                         cx,
                                     )
                                     .log_err();
@@ -887,8 +865,8 @@ impl Room {
                                 for (track, publication) in
                                     audio_tracks.iter().zip(publications.iter())
                                 {
-                                    this.remote_audio_track_updated(
-                                        RemoteAudioTrackUpdate::Subscribed(
+                                    this.live_kit_room_updated(
+                                        RoomUpdate::SubscribedToRemoteAudioTrack(
                                             track.clone(),
                                             publication.clone(),
                                         ),
@@ -979,13 +957,13 @@ impl Room {
         }
     }
 
-    fn remote_video_track_updated(
+    fn live_kit_room_updated(
         &mut self,
-        change: RemoteVideoTrackUpdate,
+        update: RoomUpdate,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        match change {
-            RemoteVideoTrackUpdate::Subscribed(track) => {
+        match update {
+            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
                 let user_id = track.publisher_id().parse()?;
                 let track_id = track.sid().to_string();
                 let participant = self
@@ -997,7 +975,8 @@ impl Room {
                     participant_id: participant.peer_id,
                 });
             }
-            RemoteVideoTrackUpdate::Unsubscribed {
+
+            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
                 publisher_id,
                 track_id,
             } => {
@@ -1011,19 +990,8 @@ impl Room {
                     participant_id: participant.peer_id,
                 });
             }
-        }
-
-        cx.notify();
-        Ok(())
-    }
 
-    fn remote_audio_track_updated(
-        &mut self,
-        change: RemoteAudioTrackUpdate,
-        cx: &mut ModelContext<Self>,
-    ) -> Result<()> {
-        match change {
-            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
+            RoomUpdate::ActiveSpeakersChanged { speakers } => {
                 let mut speaker_ids = speakers
                     .into_iter()
                     .filter_map(|speaker_sid| speaker_sid.parse().ok())
@@ -1045,9 +1013,9 @@ impl Room {
                         }
                     }
                 }
-                cx.notify();
             }
-            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
+
+            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
                 let mut found = false;
                 for participant in &mut self.remote_participants.values_mut() {
                     for track in participant.audio_tracks.values() {
@@ -1061,10 +1029,9 @@ impl Room {
                         break;
                     }
                 }
-
-                cx.notify();
             }
-            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
+
+            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
                 let user_id = track.publisher_id().parse()?;
                 let track_id = track.sid().to_string();
                 let participant = self
@@ -1078,7 +1045,8 @@ impl Room {
                     participant_id: participant.peer_id,
                 });
             }
-            RemoteAudioTrackUpdate::Unsubscribed {
+
+            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
                 publisher_id,
                 track_id,
             } => {
@@ -1597,7 +1565,7 @@ struct LiveKitRoom {
     speaking: bool,
     next_publish_id: usize,
     _maintain_room: Task<()>,
-    _maintain_tracks: [Task<()>; 2],
+    _handle_updates: Task<()>,
 }
 
 impl LiveKitRoom {

crates/live_kit_client/examples/test_app.rs 🔗

@@ -2,9 +2,7 @@ use std::{sync::Arc, time::Duration};
 
 use futures::StreamExt;
 use gpui::{actions, KeyBinding, Menu, MenuItem};
-use live_kit_client::{
-    LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room,
-};
+use live_kit_client::{LocalAudioTrack, LocalVideoTrack, Room, RoomUpdate};
 use live_kit_server::token::{self, VideoGrant};
 use log::LevelFilter;
 use simplelog::SimpleLogger;
@@ -60,12 +58,12 @@ fn main() {
             let room_b = Room::new();
             room_b.connect(&live_kit_url, &user2_token).await.unwrap();
 
-            let mut audio_track_updates = room_b.remote_audio_track_updates();
+            let mut room_updates = room_b.updates();
             let audio_track = LocalAudioTrack::create();
             let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap();
 
-            if let RemoteAudioTrackUpdate::Subscribed(track, _) =
-                audio_track_updates.next().await.unwrap()
+            if let RoomUpdate::SubscribedToRemoteAudioTrack(track, _) =
+                room_updates.next().await.unwrap()
             {
                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
                 assert_eq!(remote_tracks.len(), 1);
@@ -78,8 +76,8 @@ fn main() {
             audio_track_publication.set_mute(true).await.unwrap();
 
             println!("waiting for mute changed!");
-            if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
-                audio_track_updates.next().await.unwrap()
+            if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } =
+                room_updates.next().await.unwrap()
             {
                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
                 assert_eq!(remote_tracks[0].sid(), track_id);
@@ -90,8 +88,8 @@ fn main() {
 
             audio_track_publication.set_mute(false).await.unwrap();
 
-            if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } =
-                audio_track_updates.next().await.unwrap()
+            if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } =
+                room_updates.next().await.unwrap()
             {
                 let remote_tracks = room_b.remote_audio_tracks("test-participant-1");
                 assert_eq!(remote_tracks[0].sid(), track_id);
@@ -110,13 +108,13 @@ fn main() {
             room_a.unpublish_track(audio_track_publication);
 
             // Clear out any active speakers changed messages
-            let mut next = audio_track_updates.next().await.unwrap();
-            while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next {
+            let mut next = room_updates.next().await.unwrap();
+            while let RoomUpdate::ActiveSpeakersChanged { speakers } = next {
                 println!("Speakers changed: {:?}", speakers);
-                next = audio_track_updates.next().await.unwrap();
+                next = room_updates.next().await.unwrap();
             }
 
-            if let RemoteAudioTrackUpdate::Unsubscribed {
+            if let RoomUpdate::UnsubscribedFromRemoteAudioTrack {
                 publisher_id,
                 track_id,
             } = next
@@ -128,7 +126,6 @@ fn main() {
                 panic!("unexpected message");
             }
 
-            let mut video_track_updates = room_b.remote_video_track_updates();
             let displays = room_a.display_sources().await.unwrap();
             let display = displays.into_iter().next().unwrap();
 
@@ -136,8 +133,8 @@ fn main() {
             let local_video_track_publication =
                 room_a.publish_video_track(local_video_track).await.unwrap();
 
-            if let RemoteVideoTrackUpdate::Subscribed(track) =
-                video_track_updates.next().await.unwrap()
+            if let RoomUpdate::SubscribedToRemoteVideoTrack(track) =
+                room_updates.next().await.unwrap()
             {
                 let remote_video_tracks = room_b.remote_video_tracks("test-participant-1");
                 assert_eq!(remote_video_tracks.len(), 1);
@@ -152,10 +149,10 @@ fn main() {
                 .pop()
                 .unwrap();
             room_a.unpublish_track(local_video_track_publication);
-            if let RemoteVideoTrackUpdate::Unsubscribed {
+            if let RoomUpdate::UnsubscribedFromRemoteVideoTrack {
                 publisher_id,
                 track_id,
-            } = video_track_updates.next().await.unwrap()
+            } = room_updates.next().await.unwrap()
             {
                 assert_eq!(publisher_id, "test-participant-1");
                 assert_eq!(remote_video_track.sid(), track_id);

crates/live_kit_client/src/live_kit_client.rs 🔗

@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 #[cfg(not(any(test, feature = "test-support")))]
 pub mod prod;
 
@@ -9,3 +11,21 @@ pub mod test;
 
 #[cfg(any(test, feature = "test-support"))]
 pub use test::*;
+
+pub type Sid = String;
+
+#[derive(Clone, Eq, PartialEq)]
+pub enum ConnectionState {
+    Disconnected,
+    Connected { url: String, token: String },
+}
+
+#[derive(Clone)]
+pub enum RoomUpdate {
+    ActiveSpeakersChanged { speakers: Vec<Sid> },
+    RemoteAudioTrackMuteChanged { track_id: Sid, muted: bool },
+    SubscribedToRemoteVideoTrack(Arc<RemoteVideoTrack>),
+    SubscribedToRemoteAudioTrack(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
+    UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid },
+    UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid },
+}

crates/live_kit_client/src/prod.rs 🔗

@@ -1,3 +1,4 @@
+use crate::{ConnectionState, RoomUpdate, Sid};
 use anyhow::{anyhow, Context, Result};
 use core_foundation::{
     array::{CFArray, CFArrayRef},
@@ -155,22 +156,13 @@ extern "C" {
     fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef;
 }
 
-pub type Sid = String;
-
-#[derive(Clone, Eq, PartialEq)]
-pub enum ConnectionState {
-    Disconnected,
-    Connected { url: String, token: String },
-}
-
 pub struct Room {
     native_room: swift::Room,
     connection: Mutex<(
         watch::Sender<ConnectionState>,
         watch::Receiver<ConnectionState>,
     )>,
-    remote_audio_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteAudioTrackUpdate>>>,
-    remote_video_track_subscribers: Mutex<Vec<mpsc::UnboundedSender<RemoteVideoTrackUpdate>>>,
+    update_subscribers: Mutex<Vec<mpsc::UnboundedSender<RoomUpdate>>>,
     _delegate: RoomDelegate,
 }
 
@@ -181,8 +173,7 @@ impl Room {
             Self {
                 native_room: unsafe { LKRoomCreate(delegate.native_delegate) },
                 connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)),
-                remote_audio_track_subscribers: Default::default(),
-                remote_video_track_subscribers: Default::default(),
+                update_subscribers: Default::default(),
                 _delegate: delegate,
             }
         })
@@ -397,15 +388,9 @@ impl Room {
         }
     }
 
-    pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteAudioTrackUpdate> {
-        let (tx, rx) = mpsc::unbounded();
-        self.remote_audio_track_subscribers.lock().push(tx);
-        rx
-    }
-
-    pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
+    pub fn updates(&self) -> mpsc::UnboundedReceiver<RoomUpdate> {
         let (tx, rx) = mpsc::unbounded();
-        self.remote_video_track_subscribers.lock().push(tx);
+        self.update_subscribers.lock().push(tx);
         rx
     }
 
@@ -416,8 +401,8 @@ impl Room {
     ) {
         let track = Arc::new(track);
         let publication = Arc::new(publication);
-        self.remote_audio_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed(
+        self.update_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RoomUpdate::SubscribedToRemoteAudioTrack(
                 track.clone(),
                 publication.clone(),
             ))
@@ -426,8 +411,8 @@ impl Room {
     }
 
     fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) {
-        self.remote_audio_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed {
+        self.update_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteAudioTrack {
                 publisher_id: publisher_id.clone(),
                 track_id: track_id.clone(),
             })
@@ -436,8 +421,8 @@ impl Room {
     }
 
     fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) {
-        self.remote_audio_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged {
+        self.update_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RoomUpdate::RemoteAudioTrackMuteChanged {
                 track_id: track_id.clone(),
                 muted,
             })
@@ -445,29 +430,26 @@ impl Room {
         });
     }
 
-    // A vec of publisher IDs
     fn active_speakers_changed(&self, speakers: Vec<String>) {
-        self.remote_audio_track_subscribers
-            .lock()
-            .retain(move |tx| {
-                tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged {
-                    speakers: speakers.clone(),
-                })
-                .is_ok()
-            });
+        self.update_subscribers.lock().retain(move |tx| {
+            tx.unbounded_send(RoomUpdate::ActiveSpeakersChanged {
+                speakers: speakers.clone(),
+            })
+            .is_ok()
+        });
     }
 
     fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) {
         let track = Arc::new(track);
-        self.remote_video_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+        self.update_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
                 .is_ok()
         });
     }
 
     fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) {
-        self.remote_video_track_subscribers.lock().retain(|tx| {
-            tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed {
+        self.update_subscribers.lock().retain(|tx| {
+            tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteVideoTrack {
                 publisher_id: publisher_id.clone(),
                 track_id: track_id.clone(),
             })
@@ -889,18 +871,6 @@ impl Drop for RemoteVideoTrack {
     }
 }
 
-pub enum RemoteVideoTrackUpdate {
-    Subscribed(Arc<RemoteVideoTrack>),
-    Unsubscribed { publisher_id: Sid, track_id: Sid },
-}
-
-pub enum RemoteAudioTrackUpdate {
-    ActiveSpeakersChanged { speakers: Vec<Sid> },
-    MuteChanged { track_id: Sid, muted: bool },
-    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
-    Unsubscribed { publisher_id: Sid, track_id: Sid },
-}
-
 pub struct MacOSDisplay(swift::MacOSDisplay);
 
 impl MacOSDisplay {

crates/live_kit_client/src/test.rs 🔗

@@ -1,3 +1,4 @@
+use crate::{ConnectionState, RoomUpdate, Sid};
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use collections::{BTreeMap, HashMap};
@@ -104,9 +105,8 @@ impl TestServer {
                 client_room
                     .0
                     .lock()
-                    .video_track_updates
-                    .0
-                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                    .updates_tx
+                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
                     .unwrap();
             }
             room.client_rooms.insert(identity, client_room);
@@ -211,9 +211,8 @@ impl TestServer {
                 let _ = client_room
                     .0
                     .lock()
-                    .video_track_updates
-                    .0
-                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                    .updates_tx
+                    .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone()))
                     .unwrap();
             }
         }
@@ -261,9 +260,8 @@ impl TestServer {
                 let _ = client_room
                     .0
                     .lock()
-                    .audio_track_updates
-                    .0
-                    .try_broadcast(RemoteAudioTrackUpdate::Subscribed(
+                    .updates_tx
+                    .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack(
                         track.clone(),
                         publication.clone(),
                     ))
@@ -369,39 +367,26 @@ impl live_kit_server::api::Client for TestApiClient {
     }
 }
 
-pub type Sid = String;
-
 struct RoomState {
     connection: (
         watch::Sender<ConnectionState>,
         watch::Receiver<ConnectionState>,
     ),
     display_sources: Vec<MacOSDisplay>,
-    audio_track_updates: (
-        async_broadcast::Sender<RemoteAudioTrackUpdate>,
-        async_broadcast::Receiver<RemoteAudioTrackUpdate>,
-    ),
-    video_track_updates: (
-        async_broadcast::Sender<RemoteVideoTrackUpdate>,
-        async_broadcast::Receiver<RemoteVideoTrackUpdate>,
-    ),
-}
-
-#[derive(Clone, Eq, PartialEq)]
-pub enum ConnectionState {
-    Disconnected,
-    Connected { url: String, token: String },
+    updates_tx: async_broadcast::Sender<RoomUpdate>,
+    updates_rx: async_broadcast::Receiver<RoomUpdate>,
 }
 
 pub struct Room(Mutex<RoomState>);
 
 impl Room {
     pub fn new() -> Arc<Self> {
+        let (updates_tx, updates_rx) = async_broadcast::broadcast(128);
         Arc::new(Self(Mutex::new(RoomState {
             connection: watch::channel_with(ConnectionState::Disconnected),
             display_sources: Default::default(),
-            video_track_updates: async_broadcast::broadcast(128),
-            audio_track_updates: async_broadcast::broadcast(128),
+            updates_tx,
+            updates_rx,
         })))
     }
 
@@ -505,12 +490,8 @@ impl Room {
             .collect()
     }
 
-    pub fn remote_audio_track_updates(&self) -> impl Stream<Item = RemoteAudioTrackUpdate> {
-        self.0.lock().audio_track_updates.1.clone()
-    }
-
-    pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
-        self.0.lock().video_track_updates.1.clone()
+    pub fn updates(&self) -> impl Stream<Item = RoomUpdate> {
+        self.0.lock().updates_rx.clone()
     }
 
     pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
@@ -646,20 +627,6 @@ impl RemoteAudioTrack {
     }
 }
 
-#[derive(Clone)]
-pub enum RemoteVideoTrackUpdate {
-    Subscribed(Arc<RemoteVideoTrack>),
-    Unsubscribed { publisher_id: Sid, track_id: Sid },
-}
-
-#[derive(Clone)]
-pub enum RemoteAudioTrackUpdate {
-    ActiveSpeakersChanged { speakers: Vec<Sid> },
-    MuteChanged { track_id: Sid, muted: bool },
-    Subscribed(Arc<RemoteAudioTrack>, Arc<RemoteTrackPublication>),
-    Unsubscribed { publisher_id: Sid, track_id: Sid },
-}
-
 #[derive(Clone)]
 pub struct MacOSDisplay {
     frames: (