diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 596a0ec8535e626cc379ec9fb3a62318ff7d54b6..64584e61400b414620ba640f2fbc0b79825c535e 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use client::{proto, Client, TypedEnvelope, User, UserStore}; use collections::HashSet; +use futures::{future::Shared, FutureExt}; use postage::watch; use gpui::{ @@ -33,6 +34,7 @@ pub struct IncomingCall { /// Singleton global maintaining the user's participation in a room across workspaces. pub struct ActiveCall { room: Option<(ModelHandle, Vec)>, + pending_room_creation: Option, Arc>>>>, location: Option>, pending_invites: HashSet, incoming_call: ( @@ -56,6 +58,7 @@ impl ActiveCall { ) -> Self { Self { room: None, + pending_room_creation: None, location: None, pending_invites: Default::default(), incoming_call: watch::channel(), @@ -124,45 +127,74 @@ impl ActiveCall { initial_project: Option>, cx: &mut ModelContext, ) -> Task> { - let client = self.client.clone(); - let user_store = self.user_store.clone(); if !self.pending_invites.insert(called_user_id) { return Task::ready(Err(anyhow!("user was already invited"))); } - cx.notify(); - cx.spawn(|this, mut cx| async move { - let invite = async { - if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) { - let initial_project_id = if let Some(initial_project) = initial_project { - Some( - room.update(&mut cx, |room, cx| { - room.share_project(initial_project, cx) - }) - .await?, - ) - } else { - None - }; - room.update(&mut cx, |room, cx| { - room.call(called_user_id, initial_project_id, cx) - }) - .await?; + let room = if let Some(room) = self.room().cloned() { + Some(Task::ready(Ok(room)).shared()) + } else { + self.pending_room_creation.clone() + }; + + let invite = if let Some(room) = room { + cx.spawn_weak(|_, mut cx| async move { + let room = room.await.map_err(|err| anyhow!("{:?}", err))?; + + let initial_project_id = if let Some(initial_project) = initial_project { + Some( + room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) + .await?, + ) } else { - let room = cx - .update(|cx| { - Room::create(called_user_id, initial_project, client, user_store, cx) - }) - .await?; - - this.update(&mut cx, |this, cx| this.set_room(Some(room), cx)) - .await?; + None }; - Ok(()) - }; + room.update(&mut cx, |room, cx| { + room.call(called_user_id, initial_project_id, cx) + }) + .await?; + + anyhow::Ok(()) + }) + } else { + let client = self.client.clone(); + let user_store = self.user_store.clone(); + let room = cx + .spawn(|this, mut cx| async move { + let create_room = async { + let room = cx + .update(|cx| { + Room::create( + called_user_id, + initial_project, + client, + user_store, + cx, + ) + }) + .await?; + + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + .await?; + anyhow::Ok(room) + }; + + let room = create_room.await; + this.update(&mut cx, |this, _| this.pending_room_creation = None); + room.map_err(Arc::new) + }) + .shared(); + self.pending_room_creation = Some(room.clone()); + cx.foreground().spawn(async move { + room.await.map_err(|err| anyhow!("{:?}", err))?; + anyhow::Ok(()) + }) + }; + + cx.spawn(|this, mut cx| async move { let result = invite.await; this.update(&mut cx, |this, cx| { this.pending_invites.remove(&called_user_id); diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 9a7e7295fe86b06a78e12df8707aaefc4e602326..ff9872f31feb9d0c1e2b357daa2fbfb8f58f9085 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -166,9 +166,67 @@ async fn test_basic_calls( } ); + // Call user C again from user A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec!["user_c".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec!["user_c".to_string()] + } + ); + + // User C accepts the call. + let call_c = incoming_call_c.next().await.unwrap().unwrap(); + assert_eq!(call_c.calling_user.github_login, "user_a"); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + assert!(incoming_call_c.next().await.unwrap().is_none()); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: Default::default() + } + ); + // User A shares their screen let display = MacOSDisplay::new(); let events_b = active_call_events(cx_b); + let events_c = active_call_events(cx_c); active_call_a .update(cx_a, |call, cx| { call.room().unwrap().update(cx, |room, cx| { @@ -181,9 +239,10 @@ async fn test_basic_calls( deterministic.run_until_parked(); + // User B observes the remote screen sharing track. assert_eq!(events_b.borrow().len(), 1); - let event = events_b.borrow().first().unwrap().clone(); - if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event { + let event_b = events_b.borrow().first().unwrap().clone(); + if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_b { assert_eq!(participant_id, client_a.peer_id().unwrap()); room_b.read_with(cx_b, |room, _| { assert_eq!( @@ -197,6 +256,23 @@ async fn test_basic_calls( panic!("unexpected event") } + // User C observes the remote screen sharing track. + assert_eq!(events_c.borrow().len(), 1); + let event_c = events_c.borrow().first().unwrap().clone(); + if let call::room::Event::RemoteVideoTracksChanged { participant_id } = event_c { + assert_eq!(participant_id, client_a.peer_id().unwrap()); + room_c.read_with(cx_c, |room, _| { + assert_eq!( + room.remote_participants()[&client_a.user_id().unwrap()] + .tracks + .len(), + 1 + ); + }); + } else { + panic!("unexpected event") + } + // User A leaves the room. active_call_a.update(cx_a, |call, cx| { call.hang_up(cx).unwrap(); @@ -213,18 +289,28 @@ async fn test_basic_calls( assert_eq!( room_participants(&room_b, cx_b), RoomParticipants { - remote: Default::default(), + remote: vec!["user_c".to_string()], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_b".to_string()], pending: Default::default() } ); // User B gets disconnected from the LiveKit server, which causes them - // to automatically leave the room. + // to automatically leave the room. User C leaves the room as well because + // nobody else is in there. server .test_live_kit_server - .disconnect_client(client_b.peer_id().unwrap().to_string()) + .disconnect_client(client_b.user_id().unwrap().to_string()) .await; - active_call_b.update(cx_b, |call, _| assert!(call.room().is_none())); + deterministic.run_until_parked(); + active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); + active_call_c.read_with(cx_c, |call, _| assert!(call.room().is_none())); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants { @@ -239,6 +325,141 @@ async fn test_basic_calls( pending: Default::default() } ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: Default::default(), + pending: Default::default() + } + ); +} + +#[gpui::test(iterations = 10)] +async fn test_calling_multiple_users_simultaneously( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_d: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + let client_d = server.create_client(cx_d, "user_d").await; + server + .make_contacts(&mut [ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + (&client_d, cx_d), + ]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + let active_call_d = cx_d.read(ActiveCall::global); + + // Simultaneously call user B and user C from client A. + let b_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }); + let c_invite = active_call_a.update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }); + b_invite.await.unwrap(); + c_invite.await.unwrap(); + + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string(), "user_c".to_string()] + } + ); + + // Call client D from client A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string() + ] + } + ); + + // Accept the call on all clients simultaneously. + let accept_b = active_call_b.update(cx_b, |call, cx| call.accept_incoming(cx)); + let accept_c = active_call_c.update(cx_c, |call, cx| call.accept_incoming(cx)); + let accept_d = active_call_d.update(cx_d, |call, cx| call.accept_incoming(cx)); + accept_b.await.unwrap(); + accept_c.await.unwrap(); + accept_d.await.unwrap(); + + deterministic.run_until_parked(); + + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone()); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_d".to_string(), + ], + pending: Default::default() + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_c".to_string(), + ], + pending: Default::default() + } + ); } #[gpui::test(iterations = 10)] @@ -2572,7 +2793,7 @@ async fn test_fs_operations( .await .unwrap(); deterministic.run_until_parked(); - + worktree_a.read_with(cx_a, |worktree, _| { assert_eq!( worktree