From 087d51634d3072854c150371a892ef56c544b563 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 14 Feb 2023 10:46:29 +0100 Subject: [PATCH 1/2] Fix test that wasn't properly verifying disconnection from livekit --- crates/collab/src/tests/integration_tests.rs | 107 +++++++++++++++++-- 1 file changed, 100 insertions(+), 7 deletions(-) diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 9a7e7295fe86b06a78e12df8707aaefc4e602326..55f888d11d5b6ed16fe58cd85a94cfe59f5f8d60 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -166,9 +166,67 @@ async fn test_basic_calls( } ); + // Call user C again from user A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec!["user_c".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec!["user_c".to_string()] + } + ); + + // User C accepts the call. + let call_c = incoming_call_c.next().await.unwrap().unwrap(); + assert_eq!(call_c.calling_user.github_login, "user_a"); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + assert!(incoming_call_c.next().await.unwrap().is_none()); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: Default::default() + } + ); + // User A shares their screen let display = MacOSDisplay::new(); let events_b = active_call_events(cx_b); + let events_c = active_call_events(cx_c); active_call_a .update(cx_a, |call, cx| { call.room().unwrap().update(cx, |room, cx| { @@ -181,9 +239,10 @@ async fn test_basic_calls( deterministic.run_until_parked(); + // User B observes the remote screen sharing track. assert_eq!(events_b.borrow().len(), 1); - let event = events_b.borrow().first().unwrap().clone(); - if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event { + let event_b = events_b.borrow().first().unwrap().clone(); + if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_b { assert_eq!(participant_id, client_a.peer_id().unwrap()); room_b.read_with(cx_b, |room, _| { assert_eq!( @@ -197,6 +256,23 @@ async fn test_basic_calls( panic!("unexpected event") } + // User C observes the remote screen sharing track. + assert_eq!(events_c.borrow().len(), 1); + let event_c = events_c.borrow().first().unwrap().clone(); + if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_c { + assert_eq!(participant_id, client_a.peer_id().unwrap()); + room_c.read_with(cx_c, |room, _| { + assert_eq!( + room.remote_participants()[&client_a.user_id().unwrap()] + .tracks + .len(), + 1 + ); + }); + } else { + panic!("unexpected event") + } + // User A leaves the room. active_call_a.update(cx_a, |call, cx| { call.hang_up(cx).unwrap(); @@ -213,18 +289,28 @@ async fn test_basic_calls( assert_eq!( room_participants(&room_b, cx_b), RoomParticipants { - remote: Default::default(), + remote: vec!["user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_b".to_string()], pending: Default::default() } ); // User B gets disconnected from the LiveKit server, which causes them - // to automatically leave the room. + // to automatically leave the room. User C leaves the room as well because + // nobody else is in there. server .test_live_kit_server - .disconnect_client(client_b.peer_id().unwrap().to_string()) + .disconnect_client(client_b.user_id().unwrap().to_string()) .await; - active_call_b.update(cx_b, |call, _| assert!(call.room().is_none())); + deterministic.run_until_parked(); + active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); + active_call_c.read_with(cx_c, |call, _| assert!(call.room().is_none())); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants { @@ -239,6 +325,13 @@ async fn test_basic_calls( pending: Default::default() } ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: Default::default(), + pending: Default::default() + } + ); } #[gpui::test(iterations = 10)] @@ -2572,7 +2665,7 @@ async fn test_fs_operations( .await .unwrap(); deterministic.run_until_parked(); - + worktree_a.read_with(cx_a, |worktree, _| { assert_eq!( worktree From 7be868e3726c268c0d7cfc01dabd2687e0951dbc Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 14 Feb 2023 12:03:30 +0100 Subject: [PATCH 2/2] Avoid creating more than one room when inviting multiple people at once Previously, when initiating a call by calling multiple people, only the first person would get the call while all the others would briefly show a "pending" status but never get the call. This would happen because `ActiveCall` was trying to a create a different room for each person called, because the original room creation hadn't finished and so a `ModelHandle` wasn't being store in the active call. With this commit, only one room can be created at any given time and further invites have to wait until that room creation is done. --- crates/call/src/call.rs | 92 ++++++++----- crates/collab/src/tests/integration_tests.rs | 128 +++++++++++++++++++ 2 files changed, 190 insertions(+), 30 deletions(-) diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 596a0ec8535e626cc379ec9fb3a62318ff7d54b6..64584e61400b414620ba640f2fbc0b79825c535e 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use client::{proto, Client, TypedEnvelope, User, UserStore}; use collections::HashSet; +use futures::{future::Shared, FutureExt}; use postage::watch; use gpui::{ @@ -33,6 +34,7 @@ pub struct IncomingCall { /// Singleton global maintaining the user's participation in a room across workspaces. pub struct ActiveCall { room: Option<(ModelHandle, Vec)>, + pending_room_creation: Option, Arc>>>>, location: Option>, pending_invites: HashSet, incoming_call: ( @@ -56,6 +58,7 @@ impl ActiveCall { ) -> Self { Self { room: None, + pending_room_creation: None, location: None, pending_invites: Default::default(), incoming_call: watch::channel(), @@ -124,45 +127,74 @@ impl ActiveCall { initial_project: Option>, cx: &mut ModelContext, ) -> Task> { - let client = self.client.clone(); - let user_store = self.user_store.clone(); if !self.pending_invites.insert(called_user_id) { return Task::ready(Err(anyhow!("user was already invited"))); } - cx.notify(); - cx.spawn(|this, mut cx| async move { - let invite = async { - if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) { - let initial_project_id = if let Some(initial_project) = initial_project { - Some( - room.update(&mut cx, |room, cx| { - room.share_project(initial_project, cx) - }) - .await?, - ) - } else { - None - }; - room.update(&mut cx, |room, cx| { - room.call(called_user_id, initial_project_id, cx) - }) - .await?; + let room = if let Some(room) = self.room().cloned() { + Some(Task::ready(Ok(room)).shared()) + } else { + self.pending_room_creation.clone() + }; + + let invite = if let Some(room) = room { + cx.spawn_weak(|_, mut cx| async move { + let room = room.await.map_err(|err| anyhow!("{:?}", err))?; + + let initial_project_id = if let Some(initial_project) = initial_project { + Some( + room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) + .await?, + ) } else { - let room = cx - .update(|cx| { - Room::create(called_user_id, initial_project, client, user_store, cx) - }) - .await?; - - this.update(&mut cx, |this, cx| this.set_room(Some(room), cx)) - .await?; + None }; - Ok(()) - }; + room.update(&mut cx, |room, cx| { + room.call(called_user_id, initial_project_id, cx) + }) + .await?; + + anyhow::Ok(()) + }) + } else { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let room = cx + .spawn(|this, mut cx| async move { + let create_room = async { + let room = cx + .update(|cx| { + Room::create( + called_user_id, + initial_project, + client, + user_store, + cx, + ) + }) + .await?; + + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + .await?; + anyhow::Ok(room) + }; + + let room = create_room.await; + this.update(&mut cx, |this, _| this.pending_room_creation = None); + room.map_err(Arc::new) + }) + .shared(); + self.pending_room_creation = Some(room.clone()); + cx.foreground().spawn(async move { + room.await.map_err(|err| anyhow!("{:?}", err))?; + anyhow::Ok(()) + }) + }; + + cx.spawn(|this, mut cx| async move { let result = invite.await; this.update(&mut cx, |this, cx| { this.pending_invites.remove(&called_user_id); diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 55f888d11d5b6ed16fe58cd85a94cfe59f5f8d60..ff9872f31feb9d0c1e2b357daa2fbfb8f58f9085 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -334,6 +334,134 @@ async fn test_basic_calls( ); } +#[gpui::test(iterations = 10)] +async fn test_calling_multiple_users_simultaneously( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_d: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + let client_d = server.create_client(cx_d, "user_d").await; + server + .make_contacts(&mut [ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + (&client_d, cx_d), + ]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + let active_call_d = cx_d.read(ActiveCall::global); + + // Simultaneously call user B and user C from client A. + let b_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }); + let c_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }); + b_invite.await.unwrap(); + c_invite.await.unwrap(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string(), "user_c".to_string()] + } + ); + + // Call client D from client A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string() + ] + } + ); + + // Accept the call on all clients simultaneously. + let accept_b = active_call_b.update(cx_b, |call, cx| call.accept_incoming(cx)); + let accept_c = active_call_c.update(cx_c, |call, cx| call.accept_incoming(cx)); + let accept_d = active_call_d.update(cx_d, |call, cx| call.accept_incoming(cx)); + accept_b.await.unwrap(); + accept_c.await.unwrap(); + accept_d.await.unwrap(); + + deterministic.run_until_parked(); + + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone()); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_c".to_string(), + ], + pending: Default::default() + } + ); +} + #[gpui::test(iterations = 10)] async fn test_room_uniqueness( deterministic: Arc,