Fix failure to see screenshare tracks that were started prior to joining a call

Max Brunsfeld and Antonio Scandurra created

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

crates/call/src/room.rs                      |  2 
crates/client/src/user.rs                    |  5 +
crates/collab/src/tests/integration_tests.rs | 93 ++++++++++++++++++++++
crates/live_kit_client/src/test.rs           | 49 ++++++++++-
4 files changed, 143 insertions(+), 6 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -626,7 +626,7 @@ impl Room {
 
                             if let Some(live_kit) = this.live_kit.as_ref() {
                                 let tracks =
-                                    live_kit.room.remote_video_tracks(&peer_id.to_string());
+                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                 for track in tracks {
                                     this.remote_video_track_updated(
                                         RemoteVideoTrackUpdate::Subscribed(track),

crates/client/src/user.rs 🔗

@@ -183,6 +183,11 @@ impl UserStore {
         }
     }
 
+    #[cfg(feature = "test-support")]
+    pub fn clear_cache(&mut self) {
+        self.users.clear();
+    }
+
     async fn handle_update_invite_info(
         this: ModelHandle<Self>,
         message: TypedEnvelope<proto::UpdateInviteInfo>,

crates/collab/src/tests/integration_tests.rs 🔗

@@ -6292,6 +6292,99 @@ async fn test_basic_following(
     );
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_join_call_after_screen_was_shared(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
+
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    server
+        .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .await;
+
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
+
+    // Call users B and C from client A.
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_b.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: Default::default(),
+            pending: vec!["user_b".to_string()]
+        }
+    );
+
+    // User B receives the call.
+    let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
+    let call_b = incoming_call_b.next().await.unwrap().unwrap();
+    assert_eq!(call_b.calling_user.github_login, "user_a");
+
+    // User A shares their screen
+    let display = MacOSDisplay::new();
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.room().unwrap().update(cx, |room, cx| {
+                room.set_display_sources(vec![display.clone()]);
+                room.share_screen(cx)
+            })
+        })
+        .await
+        .unwrap();
+
+    client_b.user_store.update(cx_b, |user_store, _| {
+        user_store.clear_cache();
+    });
+
+    // User B joins the room
+    active_call_b
+        .update(cx_b, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+    let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+    assert!(incoming_call_b.next().await.unwrap().is_none());
+
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: vec![],
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string()],
+            pending: vec![],
+        }
+    );
+
+    // Ensure User B sees User A's screenshare.
+    room_b.read_with(cx_b, |room, _| {
+        assert_eq!(
+            room.remote_participants()
+                .get(&client_a.user_id().unwrap())
+                .unwrap()
+                .tracks
+                .len(),
+            1
+        );
+    });
+}
+
 #[gpui::test]
 async fn test_following_tab_order(
     deterministic: Arc<Deterministic>,

crates/live_kit_client/src/test.rs 🔗

@@ -104,6 +104,15 @@ impl TestServer {
                 room_name
             ))
         } else {
+            for track in &room.tracks {
+                client_room
+                    .0
+                    .lock()
+                    .video_track_updates
+                    .0
+                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
+                    .unwrap();
+            }
             room.client_rooms.insert(identity, client_room);
             Ok(())
         }
@@ -167,11 +176,13 @@ impl TestServer {
             .get_mut(&*room_name)
             .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
 
-        let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
+        let track = Arc::new(RemoteVideoTrack {
             sid: nanoid::nanoid!(17),
             publisher_id: identity.clone(),
             frames_rx: local_track.frames_rx.clone(),
-        }));
+        });
+
+        room.tracks.push(track.clone());
 
         for (id, client_room) in &room.client_rooms {
             if *id != identity {
@@ -180,18 +191,30 @@ impl TestServer {
                     .lock()
                     .video_track_updates
                     .0
-                    .try_broadcast(update.clone())
+                    .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone()))
                     .unwrap();
             }
         }
 
         Ok(())
     }
+
+    fn video_tracks(&self, token: String) -> Result<Vec<Arc<RemoteVideoTrack>>> {
+        let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
+        let room_name = claims.video.room.unwrap();
+
+        let mut server_rooms = self.rooms.lock();
+        let room = server_rooms
+            .get_mut(&*room_name)
+            .ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
+        Ok(room.tracks.clone())
+    }
 }
 
 #[derive(Default)]
 struct TestServerRoom {
     client_rooms: HashMap<Sid, Arc<Room>>,
+    tracks: Vec<Arc<RemoteVideoTrack>>,
 }
 
 impl TestServerRoom {}
@@ -307,8 +330,17 @@ impl Room {
 
     pub fn unpublish_track(&self, _: LocalTrackPublication) {}
 
-    pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
-        Default::default()
+    pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
+        if !self.is_connected() {
+            return Vec::new();
+        }
+
+        self.test_server()
+            .video_tracks(self.token())
+            .unwrap()
+            .into_iter()
+            .filter(|track| track.publisher_id() == publisher_id)
+            .collect()
     }
 
     pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
@@ -332,6 +364,13 @@ impl Room {
             ConnectionState::Connected { token, .. } => token,
         }
     }
+
+    fn is_connected(&self) -> bool {
+        match *self.0.lock().connection.1.borrow() {
+            ConnectionState::Disconnected => false,
+            ConnectionState::Connected { .. } => true,
+        }
+    }
 }
 
 impl Drop for Room {