From bb9ce86a29c300fdcf227dcc35a6d3748371d078 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 26 Sep 2022 11:56:19 +0200 Subject: [PATCH] Introduce the ability of declining calls --- crates/client/src/user.rs | 11 +++ crates/collab/src/integration_tests.rs | 120 ++++++++++++++++++++++--- crates/collab/src/rpc.rs | 26 ++++-- crates/collab/src/rpc/store.rs | 40 +++++++-- crates/room/src/room.rs | 7 ++ crates/rpc/proto/zed.proto | 6 +- 6 files changed, 181 insertions(+), 29 deletions(-) diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 0e09c7636a6a127ee06dfce37aec13f5eb8d9338..5be0125ff8ef97a9f226a1629275b464c771fa71 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -245,6 +245,17 @@ impl UserStore { self.incoming_call.1.clone() } + pub fn decline_call(&mut self) -> Result<()> { + let mut incoming_call = self.incoming_call.0.borrow_mut(); + if incoming_call.is_some() { + if let Some(client) = self.client.upgrade() { + client.send(proto::DeclineCall {})?; + } + *incoming_call = None; + } + Ok(()) + } + async fn handle_update_contacts( this: ModelHandle, message: TypedEnvelope, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 63a2efa0fb2521787bf59b0cc506af4a2d349e1a..550a13a2a9cbc5a21ceb1181a92eb92e6b571eec 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -66,13 +66,15 @@ async fn test_share_project_in_room( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; server - .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)]) + .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)]) .await; client_a @@ -95,6 +97,13 @@ async fn test_share_project_in_room( .update(|cx| Room::create(client_a.clone(), cx)) .await .unwrap(); + assert_eq!( + participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: Default::default(), + pending: Default::default() + } + ); let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await; // room.publish_project(project_a.clone()).await.unwrap(); @@ -105,27 +114,94 @@ async fn test_share_project_in_room( .update(cx_a, |room, cx| room.call(client_b.user_id().unwrap(), cx)) .await .unwrap(); + assert_eq!( + participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string()] + } + ); + let call_b = incoming_call_b.next().await.unwrap().unwrap(); let room_b = cx_b .update(|cx| Room::join(&call_b, client_b.clone(), cx)) .await .unwrap(); assert!(incoming_call_b.next().await.unwrap().is_none()); + + deterministic.run_until_parked(); assert_eq!( - remote_participants(&room_a, &client_a, cx_a).await, - vec!["user_b"] + participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: Default::default() + } ); assert_eq!( - remote_participants(&room_b, &client_b, cx_b).await, - vec!["user_a"] + participants(&room_b, &client_b, cx_b).await, + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: Default::default() + } ); - async fn remote_participants( + let mut incoming_call_c = client_c + .user_store + .update(cx_c, |user, _| user.incoming_call()); + room_a + .update(cx_a, |room, cx| room.call(client_c.user_id().unwrap(), cx)) + .await + .unwrap(); + assert_eq!( + participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec!["user_c".to_string()] + } + ); + assert_eq!( + participants(&room_b, &client_b, cx_b).await, + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec!["user_c".to_string()] + } + ); + let _call_c = incoming_call_c.next().await.unwrap().unwrap(); + + client_c + .user_store + .update(cx_c, |user, _| user.decline_call()) + .unwrap(); + assert!(incoming_call_c.next().await.unwrap().is_none()); + + deterministic.run_until_parked(); + assert_eq!( + participants(&room_a, &client_a, cx_a).await, + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: Default::default() + } + ); + assert_eq!( + participants(&room_b, &client_b, cx_b).await, + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: Default::default() + } + ); + + #[derive(Debug, Eq, PartialEq)] + struct RoomParticipants { + remote: Vec, + pending: Vec, + } + + async fn participants( room: &ModelHandle, client: &TestClient, cx: &mut TestAppContext, - ) -> Vec { - let users = room.update(cx, |room, cx| { + ) -> RoomParticipants { + let remote_users = room.update(cx, |room, cx| { room.remote_participants() .values() .map(|participant| { @@ -135,11 +211,29 @@ async fn test_share_project_in_room( }) .collect::>() }); - let users = futures::future::try_join_all(users).await.unwrap(); - users - .into_iter() - .map(|user| user.github_login.clone()) - .collect() + let remote_users = futures::future::try_join_all(remote_users).await.unwrap(); + let pending_users = room.update(cx, |room, cx| { + room.pending_user_ids() + .iter() + .map(|user_id| { + client + .user_store + .update(cx, |users, cx| users.get_user(*user_id, cx)) + }) + .collect::>() + }); + let pending_users = futures::future::try_join_all(pending_users).await.unwrap(); + + RoomParticipants { + remote: remote_users + .into_iter() + .map(|user| user.github_login.clone()) + .collect(), + pending: pending_users + .into_iter() + .map(|user| user.github_login.clone()) + .collect(), + } } } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index dfaaa8a03da75e4337605287fd58e67f9cf24694..fb8bbdb85afd2645fcf90356f99329cbba33a0f4 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -154,6 +154,7 @@ impl Server { .add_request_handler(Server::create_room) .add_request_handler(Server::join_room) .add_request_handler(Server::call) + .add_message_handler(Server::decline_call) .add_request_handler(Server::register_project) .add_request_handler(Server::unregister_project) .add_request_handler(Server::join_project) @@ -613,10 +614,10 @@ impl Server { ) -> Result<()> { let room_id = request.payload.id; let mut store = self.store().await; - let (room, recipient_ids) = store.join_room(room_id, request.sender_id)?; - for receiver_id in recipient_ids { + let (room, recipient_connection_ids) = store.join_room(room_id, request.sender_id)?; + for recipient_id in recipient_connection_ids { self.peer - .send(receiver_id, proto::CancelCall {}) + .send(recipient_id, proto::CancelCall {}) .trace_err(); } response.send(proto::JoinRoomResponse { @@ -635,10 +636,10 @@ impl Server { let room_id = request.payload.room_id; let mut calls = { let mut store = self.store().await; - let (from_user_id, recipient_ids, room) = + let (from_user_id, recipient_connection_ids, room) = store.call(room_id, request.sender_id, to_user_id)?; self.room_updated(room); - recipient_ids + recipient_connection_ids .into_iter() .map(|recipient_id| { self.peer.request( @@ -678,6 +679,21 @@ impl Server { Err(anyhow!("failed to ring call recipient"))? } + async fn decline_call( + self: Arc, + message: TypedEnvelope, + ) -> Result<()> { + let mut store = self.store().await; + let (room, recipient_connection_ids) = store.call_declined(message.sender_id)?; + for recipient_id in recipient_connection_ids { + self.peer + .send(recipient_id, proto::CancelCall {}) + .trace_err(); + } + self.room_updated(room); + Ok(()) + } + fn room_updated(&self, room: &proto::Room) { for participant in &room.participants { self.peer diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d19ae122e095a048c66b58893014f4f76d87bce6..fc8576224bd227b9809f046458289c2d1b43eea9 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -384,7 +384,7 @@ impl Store { .get_mut(&connection_id) .ok_or_else(|| anyhow!("no such connection"))?; let user_id = connection.user_id; - let recipient_ids = self.connection_ids_for_user(user_id).collect::>(); + let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); let mut user_connection_state = self .connections_by_user_id @@ -402,10 +402,10 @@ impl Store { .get_mut(&room_id) .ok_or_else(|| anyhow!("no such room"))?; anyhow::ensure!( - room.pending_calls_to_user_ids.contains(&user_id.to_proto()), + room.pending_user_ids.contains(&user_id.to_proto()), anyhow!("no such room") ); - room.pending_calls_to_user_ids + room.pending_user_ids .retain(|pending| *pending != user_id.to_proto()); room.participants.push(proto::Participant { user_id: user_id.to_proto(), @@ -419,7 +419,7 @@ impl Store { }); user_connection_state.room = Some(RoomState::Joined); - Ok((room, recipient_ids)) + Ok((room, recipient_connection_ids)) } pub fn call( @@ -451,12 +451,12 @@ impl Store { "no such room" ); anyhow::ensure!( - room.pending_calls_to_user_ids + room.pending_user_ids .iter() .all(|user_id| UserId::from_proto(*user_id) != to_user_id), "cannot call the same user more than once" ); - room.pending_calls_to_user_ids.push(to_user_id.to_proto()); + room.pending_user_ids.push(to_user_id.to_proto()); to_user_connection_state.room = Some(RoomState::Calling { room_id }); Ok((from_user_id, to_connection_ids, room)) @@ -473,11 +473,37 @@ impl Store { .rooms .get_mut(&room_id) .ok_or_else(|| anyhow!("no such room"))?; - room.pending_calls_to_user_ids + room.pending_user_ids .retain(|user_id| UserId::from_proto(*user_id) != to_user_id); Ok(room) } + pub fn call_declined( + &mut self, + recipient_connection_id: ConnectionId, + ) -> Result<(&proto::Room, Vec)> { + let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?; + let mut to_user_connection_state = self + .connections_by_user_id + .get_mut(&recipient_user_id) + .ok_or_else(|| anyhow!("no such connection"))?; + if let Some(RoomState::Calling { room_id }) = to_user_connection_state.room { + to_user_connection_state.room = None; + let recipient_connection_ids = self + .connection_ids_for_user(recipient_user_id) + .collect::>(); + let room = self + .rooms + .get_mut(&room_id) + .ok_or_else(|| anyhow!("no such room"))?; + room.pending_user_ids + .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id); + Ok((room, recipient_connection_ids)) + } else { + Err(anyhow!("user is not being called")) + } + } + pub fn register_project( &mut self, host_connection_id: ConnectionId, diff --git a/crates/room/src/room.rs b/crates/room/src/room.rs index 78de99497862d408657c4cbb143451e9beb440d8..6dddfeda3f8311ff45b6b65d4334b0d33dc3764e 100644 --- a/crates/room/src/room.rs +++ b/crates/room/src/room.rs @@ -21,6 +21,7 @@ pub struct Room { id: u64, local_participant: LocalParticipant, remote_participants: HashMap, + pending_user_ids: Vec, client: Arc, _subscriptions: Vec, } @@ -62,6 +63,7 @@ impl Room { projects: Default::default(), }, remote_participants: Default::default(), + pending_user_ids: Default::default(), _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)], client, } @@ -71,6 +73,10 @@ impl Room { &self.remote_participants } + pub fn pending_user_ids(&self) -> &[u64] { + &self.pending_user_ids + } + async fn handle_room_updated( this: ModelHandle, envelope: TypedEnvelope, @@ -100,6 +106,7 @@ impl Room { ); } } + self.pending_user_ids = room.pending_user_ids; cx.notify(); Ok(()) } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 6c0c929f8253525f013cf4ae34b05f00abf87ccf..bcc762283d2ed0742c06893101a437afe2e3f1f9 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -151,7 +151,7 @@ message JoinRoomResponse { message Room { repeated Participant participants = 1; - repeated uint64 pending_calls_to_user_ids = 2; + repeated uint64 pending_user_ids = 2; } message Participant { @@ -187,9 +187,7 @@ message IncomingCall { message CancelCall {} -message DeclineCall { - uint64 room_id = 1; -} +message DeclineCall {} message RoomUpdated { Room room = 1;