Merge pull request #2163 from zed-industries/quick-invite-bug

Antonio Scandurra created

Avoid creating more than one room when inviting multiple people at once

Change summary

crates/call/src/call.rs                      |  92 +++++--
crates/collab/src/tests/integration_tests.rs | 235 +++++++++++++++++++++
2 files changed, 290 insertions(+), 37 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -6,6 +6,7 @@ use std::sync::Arc;
 use anyhow::{anyhow, Result};
 use client::{proto, Client, TypedEnvelope, User, UserStore};
 use collections::HashSet;
+use futures::{future::Shared, FutureExt};
 use postage::watch;
 
 use gpui::{
@@ -33,6 +34,7 @@ pub struct IncomingCall {
 /// Singleton global maintaining the user's participation in a room across workspaces.
 pub struct ActiveCall {
     room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
+    pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
     location: Option<WeakModelHandle<Project>>,
     pending_invites: HashSet<u64>,
     incoming_call: (
@@ -56,6 +58,7 @@ impl ActiveCall {
     ) -> Self {
         Self {
             room: None,
+            pending_room_creation: None,
             location: None,
             pending_invites: Default::default(),
             incoming_call: watch::channel(),
@@ -124,45 +127,74 @@ impl ActiveCall {
         initial_project: Option<ModelHandle<Project>>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
-        let client = self.client.clone();
-        let user_store = self.user_store.clone();
         if !self.pending_invites.insert(called_user_id) {
             return Task::ready(Err(anyhow!("user was already invited")));
         }
-
         cx.notify();
-        cx.spawn(|this, mut cx| async move {
-            let invite = async {
-                if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
-                    let initial_project_id = if let Some(initial_project) = initial_project {
-                        Some(
-                            room.update(&mut cx, |room, cx| {
-                                room.share_project(initial_project, cx)
-                            })
-                            .await?,
-                        )
-                    } else {
-                        None
-                    };
 
-                    room.update(&mut cx, |room, cx| {
-                        room.call(called_user_id, initial_project_id, cx)
-                    })
-                    .await?;
+        let room = if let Some(room) = self.room().cloned() {
+            Some(Task::ready(Ok(room)).shared())
+        } else {
+            self.pending_room_creation.clone()
+        };
+
+        let invite = if let Some(room) = room {
+            cx.spawn_weak(|_, mut cx| async move {
+                let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
+
+                let initial_project_id = if let Some(initial_project) = initial_project {
+                    Some(
+                        room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
+                            .await?,
+                    )
                 } else {
-                    let room = cx
-                        .update(|cx| {
-                            Room::create(called_user_id, initial_project, client, user_store, cx)
-                        })
-                        .await?;
-
-                    this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
-                        .await?;
+                    None
                 };
 
-                Ok(())
-            };
+                room.update(&mut cx, |room, cx| {
+                    room.call(called_user_id, initial_project_id, cx)
+                })
+                .await?;
+
+                anyhow::Ok(())
+            })
+        } else {
+            let client = self.client.clone();
+            let user_store = self.user_store.clone();
+            let room = cx
+                .spawn(|this, mut cx| async move {
+                    let create_room = async {
+                        let room = cx
+                            .update(|cx| {
+                                Room::create(
+                                    called_user_id,
+                                    initial_project,
+                                    client,
+                                    user_store,
+                                    cx,
+                                )
+                            })
+                            .await?;
+
+                        this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
+                            .await?;
 
+                        anyhow::Ok(room)
+                    };
+
+                    let room = create_room.await;
+                    this.update(&mut cx, |this, _| this.pending_room_creation = None);
+                    room.map_err(Arc::new)
+                })
+                .shared();
+            self.pending_room_creation = Some(room.clone());
+            cx.foreground().spawn(async move {
+                room.await.map_err(|err| anyhow!("{:?}", err))?;
+                anyhow::Ok(())
+            })
+        };
+
+        cx.spawn(|this, mut cx| async move {
             let result = invite.await;
             this.update(&mut cx, |this, cx| {
                 this.pending_invites.remove(&called_user_id);

crates/collab/src/tests/integration_tests.rs 🔗

@@ -166,9 +166,67 @@ async fn test_basic_calls(
         }
     );
 
+    // Call user C again from user A.
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_c.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: vec!["user_c".to_string()]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string()],
+            pending: vec!["user_c".to_string()]
+        }
+    );
+
+    // User C accepts the call.
+    let call_c = incoming_call_c.next().await.unwrap().unwrap();
+    assert_eq!(call_c.calling_user.github_login, "user_a");
+    active_call_c
+        .update(cx_c, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+    assert!(incoming_call_c.next().await.unwrap().is_none());
+    let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone());
+
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string(), "user_c".to_string()],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_c".to_string()],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_b".to_string()],
+            pending: Default::default()
+        }
+    );
+
     // User A shares their screen
     let display = MacOSDisplay::new();
     let events_b = active_call_events(cx_b);
+    let events_c = active_call_events(cx_c);
     active_call_a
         .update(cx_a, |call, cx| {
             call.room().unwrap().update(cx, |room, cx| {
@@ -181,9 +239,10 @@ async fn test_basic_calls(
 
     deterministic.run_until_parked();
 
+    // User B observes the remote screen sharing track.
     assert_eq!(events_b.borrow().len(), 1);
-    let event = events_b.borrow().first().unwrap().clone();
-    if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event {
+    let event_b = events_b.borrow().first().unwrap().clone();
+    if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_b {
         assert_eq!(participant_id, client_a.peer_id().unwrap());
         room_b.read_with(cx_b, |room, _| {
             assert_eq!(
@@ -197,6 +256,23 @@ async fn test_basic_calls(
         panic!("unexpected event")
     }
 
+    // User C observes the remote screen sharing track.
+    assert_eq!(events_c.borrow().len(), 1);
+    let event_c = events_c.borrow().first().unwrap().clone();
+    if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_c {
+        assert_eq!(participant_id, client_a.peer_id().unwrap());
+        room_c.read_with(cx_c, |room, _| {
+            assert_eq!(
+                room.remote_participants()[&client_a.user_id().unwrap()]
+                    .tracks
+                    .len(),
+                1
+            );
+        });
+    } else {
+        panic!("unexpected event")
+    }
+
     // User A leaves the room.
     active_call_a.update(cx_a, |call, cx| {
         call.hang_up(cx).unwrap();
@@ -213,18 +289,28 @@ async fn test_basic_calls(
     assert_eq!(
         room_participants(&room_b, cx_b),
         RoomParticipants {
-            remote: Default::default(),
+            remote: vec!["user_c".to_string()],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
             pending: Default::default()
         }
     );
 
     // User B gets disconnected from the LiveKit server, which causes them
-    // to automatically leave the room.
+    // to automatically leave the room. User C leaves the room as well because
+    // nobody else is in there.
     server
         .test_live_kit_server
-        .disconnect_client(client_b.peer_id().unwrap().to_string())
+        .disconnect_client(client_b.user_id().unwrap().to_string())
         .await;
-    active_call_b.update(cx_b, |call, _| assert!(call.room().is_none()));
+    deterministic.run_until_parked();
+    active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none()));
+    active_call_c.read_with(cx_c, |call, _| assert!(call.room().is_none()));
     assert_eq!(
         room_participants(&room_a, cx_a),
         RoomParticipants {
@@ -239,6 +325,141 @@ async fn test_basic_calls(
             pending: Default::default()
         }
     );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: Default::default(),
+            pending: Default::default()
+        }
+    );
+}
+
+#[gpui::test(iterations = 10)]
+async fn test_calling_multiple_users_simultaneously(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+    cx_c: &mut TestAppContext,
+    cx_d: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
+
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
+    let client_d = server.create_client(cx_d, "user_d").await;
+    server
+        .make_contacts(&mut [
+            (&client_a, cx_a),
+            (&client_b, cx_b),
+            (&client_c, cx_c),
+            (&client_d, cx_d),
+        ])
+        .await;
+
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
+    let active_call_c = cx_c.read(ActiveCall::global);
+    let active_call_d = cx_d.read(ActiveCall::global);
+
+    // Simultaneously call user B and user C from client A.
+    let b_invite = active_call_a.update(cx_a, |call, cx| {
+        call.invite(client_b.user_id().unwrap(), None, cx)
+    });
+    let c_invite = active_call_a.update(cx_a, |call, cx| {
+        call.invite(client_c.user_id().unwrap(), None, cx)
+    });
+    b_invite.await.unwrap();
+    c_invite.await.unwrap();
+
+    let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: Default::default(),
+            pending: vec!["user_b".to_string(), "user_c".to_string()]
+        }
+    );
+
+    // Call client D from client A.
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_d.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: Default::default(),
+            pending: vec![
+                "user_b".to_string(),
+                "user_c".to_string(),
+                "user_d".to_string()
+            ]
+        }
+    );
+
+    // Accept the call on all clients simultaneously.
+    let accept_b = active_call_b.update(cx_b, |call, cx| call.accept_incoming(cx));
+    let accept_c = active_call_c.update(cx_c, |call, cx| call.accept_incoming(cx));
+    let accept_d = active_call_d.update(cx_d, |call, cx| call.accept_incoming(cx));
+    accept_b.await.unwrap();
+    accept_c.await.unwrap();
+    accept_d.await.unwrap();
+
+    deterministic.run_until_parked();
+
+    let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+    let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone());
+    let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone());
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec![
+                "user_b".to_string(),
+                "user_c".to_string(),
+                "user_d".to_string(),
+            ],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec![
+                "user_a".to_string(),
+                "user_c".to_string(),
+                "user_d".to_string(),
+            ],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec![
+                "user_a".to_string(),
+                "user_b".to_string(),
+                "user_d".to_string(),
+            ],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        room_participants(&room_d, cx_d),
+        RoomParticipants {
+            remote: vec![
+                "user_a".to_string(),
+                "user_b".to_string(),
+                "user_c".to_string(),
+            ],
+            pending: Default::default()
+        }
+    );
 }
 
 #[gpui::test(iterations = 10)]
@@ -2572,7 +2793,7 @@ async fn test_fs_operations(
         .await
         .unwrap();
     deterministic.run_until_parked();
-    
+
     worktree_a.read_with(cx_a, |worktree, _| {
         assert_eq!(
             worktree