@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use client::{proto, Client, TypedEnvelope, User, UserStore};
use collections::HashSet;
+use futures::{future::Shared, FutureExt};
use postage::watch;
use gpui::{
@@ -33,6 +34,7 @@ pub struct IncomingCall {
/// Singleton global maintaining the user's participation in a room across workspaces.
pub struct ActiveCall {
room: Option<(ModelHandle<Room>, Vec<Subscription>)>,
+ pending_room_creation: Option<Shared<Task<Result<ModelHandle<Room>, Arc<anyhow::Error>>>>>,
location: Option<WeakModelHandle<Project>>,
pending_invites: HashSet<u64>,
incoming_call: (
@@ -56,6 +58,7 @@ impl ActiveCall {
) -> Self {
Self {
room: None,
+ pending_room_creation: None,
location: None,
pending_invites: Default::default(),
incoming_call: watch::channel(),
@@ -124,45 +127,74 @@ impl ActiveCall {
initial_project: Option<ModelHandle<Project>>,
cx: &mut ModelContext<Self>,
) -> Task<Result<()>> {
- let client = self.client.clone();
- let user_store = self.user_store.clone();
if !self.pending_invites.insert(called_user_id) {
return Task::ready(Err(anyhow!("user was already invited")));
}
-
cx.notify();
- cx.spawn(|this, mut cx| async move {
- let invite = async {
- if let Some(room) = this.read_with(&cx, |this, _| this.room().cloned()) {
- let initial_project_id = if let Some(initial_project) = initial_project {
- Some(
- room.update(&mut cx, |room, cx| {
- room.share_project(initial_project, cx)
- })
- .await?,
- )
- } else {
- None
- };
- room.update(&mut cx, |room, cx| {
- room.call(called_user_id, initial_project_id, cx)
- })
- .await?;
+ let room = if let Some(room) = self.room().cloned() {
+ Some(Task::ready(Ok(room)).shared())
+ } else {
+ self.pending_room_creation.clone()
+ };
+
+ let invite = if let Some(room) = room {
+ cx.spawn_weak(|_, mut cx| async move {
+ let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
+
+ let initial_project_id = if let Some(initial_project) = initial_project {
+ Some(
+ room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
+ .await?,
+ )
} else {
- let room = cx
- .update(|cx| {
- Room::create(called_user_id, initial_project, client, user_store, cx)
- })
- .await?;
-
- this.update(&mut cx, |this, cx| this.set_room(Some(room), cx))
- .await?;
+ None
};
- Ok(())
- };
+ room.update(&mut cx, |room, cx| {
+ room.call(called_user_id, initial_project_id, cx)
+ })
+ .await?;
+
+ anyhow::Ok(())
+ })
+ } else {
+ let client = self.client.clone();
+ let user_store = self.user_store.clone();
+ let room = cx
+ .spawn(|this, mut cx| async move {
+ let create_room = async {
+ let room = cx
+ .update(|cx| {
+ Room::create(
+ called_user_id,
+ initial_project,
+ client,
+ user_store,
+ cx,
+ )
+ })
+ .await?;
+
+ this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
+ .await?;
+ anyhow::Ok(room)
+ };
+
+ let room = create_room.await;
+ this.update(&mut cx, |this, _| this.pending_room_creation = None);
+ room.map_err(Arc::new)
+ })
+ .shared();
+ self.pending_room_creation = Some(room.clone());
+ cx.foreground().spawn(async move {
+ room.await.map_err(|err| anyhow!("{:?}", err))?;
+ anyhow::Ok(())
+ })
+ };
+
+ cx.spawn(|this, mut cx| async move {
let result = invite.await;
this.update(&mut cx, |this, cx| {
this.pending_invites.remove(&called_user_id);
@@ -334,6 +334,134 @@ async fn test_basic_calls(
);
}
+#[gpui::test(iterations = 10)]
+async fn test_calling_multiple_users_simultaneously(
+ deterministic: Arc<Deterministic>,
+ cx_a: &mut TestAppContext,
+ cx_b: &mut TestAppContext,
+ cx_c: &mut TestAppContext,
+ cx_d: &mut TestAppContext,
+) {
+ deterministic.forbid_parking();
+ let mut server = TestServer::start(&deterministic).await;
+
+ let client_a = server.create_client(cx_a, "user_a").await;
+ let client_b = server.create_client(cx_b, "user_b").await;
+ let client_c = server.create_client(cx_c, "user_c").await;
+ let client_d = server.create_client(cx_d, "user_d").await;
+ server
+ .make_contacts(&mut [
+ (&client_a, cx_a),
+ (&client_b, cx_b),
+ (&client_c, cx_c),
+ (&client_d, cx_d),
+ ])
+ .await;
+
+ let active_call_a = cx_a.read(ActiveCall::global);
+ let active_call_b = cx_b.read(ActiveCall::global);
+ let active_call_c = cx_c.read(ActiveCall::global);
+ let active_call_d = cx_d.read(ActiveCall::global);
+
+ // Simultaneously call user B and user C from client A.
+ let b_invite = active_call_a.update(cx_a, |call, cx| {
+ call.invite(client_b.user_id().unwrap(), None, cx)
+ });
+ let c_invite = active_call_a.update(cx_a, |call, cx| {
+ call.invite(client_c.user_id().unwrap(), None, cx)
+ });
+ b_invite.await.unwrap();
+ c_invite.await.unwrap();
+
+ let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+ deterministic.run_until_parked();
+ assert_eq!(
+ room_participants(&room_a, cx_a),
+ RoomParticipants {
+ remote: Default::default(),
+ pending: vec!["user_b".to_string(), "user_c".to_string()]
+ }
+ );
+
+ // Call client D from client A.
+ active_call_a
+ .update(cx_a, |call, cx| {
+ call.invite(client_d.user_id().unwrap(), None, cx)
+ })
+ .await
+ .unwrap();
+ deterministic.run_until_parked();
+ assert_eq!(
+ room_participants(&room_a, cx_a),
+ RoomParticipants {
+ remote: Default::default(),
+ pending: vec![
+ "user_b".to_string(),
+ "user_c".to_string(),
+ "user_d".to_string()
+ ]
+ }
+ );
+
+ // Accept the call on all clients simultaneously.
+ let accept_b = active_call_b.update(cx_b, |call, cx| call.accept_incoming(cx));
+ let accept_c = active_call_c.update(cx_c, |call, cx| call.accept_incoming(cx));
+ let accept_d = active_call_d.update(cx_d, |call, cx| call.accept_incoming(cx));
+ accept_b.await.unwrap();
+ accept_c.await.unwrap();
+ accept_d.await.unwrap();
+
+ deterministic.run_until_parked();
+
+ let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+ let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone());
+ let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone());
+ assert_eq!(
+ room_participants(&room_a, cx_a),
+ RoomParticipants {
+ remote: vec![
+ "user_b".to_string(),
+ "user_c".to_string(),
+ "user_d".to_string(),
+ ],
+ pending: Default::default()
+ }
+ );
+ assert_eq!(
+ room_participants(&room_b, cx_b),
+ RoomParticipants {
+ remote: vec![
+ "user_a".to_string(),
+ "user_c".to_string(),
+ "user_d".to_string(),
+ ],
+ pending: Default::default()
+ }
+ );
+ assert_eq!(
+ room_participants(&room_c, cx_c),
+ RoomParticipants {
+ remote: vec![
+ "user_a".to_string(),
+ "user_b".to_string(),
+ "user_d".to_string(),
+ ],
+ pending: Default::default()
+ }
+ );
+ assert_eq!(
+ room_participants(&room_d, cx_d),
+ RoomParticipants {
+ remote: vec![
+ "user_a".to_string(),
+ "user_b".to_string(),
+ "user_c".to_string(),
+ ],
+ pending: Default::default()
+ }
+ );
+}
+
#[gpui::test(iterations = 10)]
async fn test_room_uniqueness(
deterministic: Arc<Deterministic>,