Introduce the ability of declining calls

Antonio Scandurra created

Change summary

crates/client/src/user.rs              |  11 ++
crates/collab/src/integration_tests.rs | 120 ++++++++++++++++++++++++---
crates/collab/src/rpc.rs               |  26 ++++-
crates/collab/src/rpc/store.rs         |  40 +++++++-
crates/room/src/room.rs                |   7 +
crates/rpc/proto/zed.proto             |   6 
6 files changed, 181 insertions(+), 29 deletions(-)

Detailed changes

crates/client/src/user.rs 🔗

@@ -245,6 +245,17 @@ impl UserStore {
         self.incoming_call.1.clone()
     }
 
+    pub fn decline_call(&mut self) -> Result<()> {
+        let mut incoming_call = self.incoming_call.0.borrow_mut();
+        if incoming_call.is_some() {
+            if let Some(client) = self.client.upgrade() {
+                client.send(proto::DeclineCall {})?;
+            }
+            *incoming_call = None;
+        }
+        Ok(())
+    }
+
     async fn handle_update_contacts(
         this: ModelHandle<Self>,
         message: TypedEnvelope<proto::UpdateContacts>,

crates/collab/src/integration_tests.rs 🔗

@@ -66,13 +66,15 @@ async fn test_share_project_in_room(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
+    cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
     let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
     server
-        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b)])
+        .make_contacts(vec![(&client_a, cx_a), (&client_b, cx_b), (&client_c, cx_c)])
         .await;
 
     client_a
@@ -95,6 +97,13 @@ async fn test_share_project_in_room(
         .update(|cx| Room::create(client_a.clone(), cx))
         .await
         .unwrap();
+    assert_eq!(
+        participants(&room_a, &client_a, cx_a).await,
+        RoomParticipants {
+            remote: Default::default(),
+            pending: Default::default()
+        }
+    );
     let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
     // room.publish_project(project_a.clone()).await.unwrap();
 
@@ -105,27 +114,94 @@ async fn test_share_project_in_room(
         .update(cx_a, |room, cx| room.call(client_b.user_id().unwrap(), cx))
         .await
         .unwrap();
+    assert_eq!(
+        participants(&room_a, &client_a, cx_a).await,
+        RoomParticipants {
+            remote: Default::default(),
+            pending: vec!["user_b".to_string()]
+        }
+    );
+
     let call_b = incoming_call_b.next().await.unwrap().unwrap();
     let room_b = cx_b
         .update(|cx| Room::join(&call_b, client_b.clone(), cx))
         .await
         .unwrap();
     assert!(incoming_call_b.next().await.unwrap().is_none());
+
+    deterministic.run_until_parked();
     assert_eq!(
-        remote_participants(&room_a, &client_a, cx_a).await,
-        vec!["user_b"]
+        participants(&room_a, &client_a, cx_a).await,
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: Default::default()
+        }
     );
     assert_eq!(
-        remote_participants(&room_b, &client_b, cx_b).await,
-        vec!["user_a"]
+        participants(&room_b, &client_b, cx_b).await,
+        RoomParticipants {
+            remote: vec!["user_a".to_string()],
+            pending: Default::default()
+        }
     );
 
-    async fn remote_participants(
+    let mut incoming_call_c = client_c
+        .user_store
+        .update(cx_c, |user, _| user.incoming_call());
+    room_a
+        .update(cx_a, |room, cx| room.call(client_c.user_id().unwrap(), cx))
+        .await
+        .unwrap();
+    assert_eq!(
+        participants(&room_a, &client_a, cx_a).await,
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: vec!["user_c".to_string()]
+        }
+    );
+    assert_eq!(
+        participants(&room_b, &client_b, cx_b).await,
+        RoomParticipants {
+            remote: vec!["user_a".to_string()],
+            pending: vec!["user_c".to_string()]
+        }
+    );
+    let _call_c = incoming_call_c.next().await.unwrap().unwrap();
+
+    client_c
+        .user_store
+        .update(cx_c, |user, _| user.decline_call())
+        .unwrap();
+    assert!(incoming_call_c.next().await.unwrap().is_none());
+
+    deterministic.run_until_parked();
+    assert_eq!(
+        participants(&room_a, &client_a, cx_a).await,
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: Default::default()
+        }
+    );
+    assert_eq!(
+        participants(&room_b, &client_b, cx_b).await,
+        RoomParticipants {
+            remote: vec!["user_a".to_string()],
+            pending: Default::default()
+        }
+    );
+
+    #[derive(Debug, Eq, PartialEq)]
+    struct RoomParticipants {
+        remote: Vec<String>,
+        pending: Vec<String>,
+    }
+
+    async fn participants(
         room: &ModelHandle<Room>,
         client: &TestClient,
         cx: &mut TestAppContext,
-    ) -> Vec<String> {
-        let users = room.update(cx, |room, cx| {
+    ) -> RoomParticipants {
+        let remote_users = room.update(cx, |room, cx| {
             room.remote_participants()
                 .values()
                 .map(|participant| {
@@ -135,11 +211,29 @@ async fn test_share_project_in_room(
                 })
                 .collect::<Vec<_>>()
         });
-        let users = futures::future::try_join_all(users).await.unwrap();
-        users
-            .into_iter()
-            .map(|user| user.github_login.clone())
-            .collect()
+        let remote_users = futures::future::try_join_all(remote_users).await.unwrap();
+        let pending_users = room.update(cx, |room, cx| {
+            room.pending_user_ids()
+                .iter()
+                .map(|user_id| {
+                    client
+                        .user_store
+                        .update(cx, |users, cx| users.get_user(*user_id, cx))
+                })
+                .collect::<Vec<_>>()
+        });
+        let pending_users = futures::future::try_join_all(pending_users).await.unwrap();
+
+        RoomParticipants {
+            remote: remote_users
+                .into_iter()
+                .map(|user| user.github_login.clone())
+                .collect(),
+            pending: pending_users
+                .into_iter()
+                .map(|user| user.github_login.clone())
+                .collect(),
+        }
     }
 }
 

crates/collab/src/rpc.rs 🔗

@@ -154,6 +154,7 @@ impl Server {
             .add_request_handler(Server::create_room)
             .add_request_handler(Server::join_room)
             .add_request_handler(Server::call)
+            .add_message_handler(Server::decline_call)
             .add_request_handler(Server::register_project)
             .add_request_handler(Server::unregister_project)
             .add_request_handler(Server::join_project)
@@ -613,10 +614,10 @@ impl Server {
     ) -> Result<()> {
         let room_id = request.payload.id;
         let mut store = self.store().await;
-        let (room, recipient_ids) = store.join_room(room_id, request.sender_id)?;
-        for receiver_id in recipient_ids {
+        let (room, recipient_connection_ids) = store.join_room(room_id, request.sender_id)?;
+        for recipient_id in recipient_connection_ids {
             self.peer
-                .send(receiver_id, proto::CancelCall {})
+                .send(recipient_id, proto::CancelCall {})
                 .trace_err();
         }
         response.send(proto::JoinRoomResponse {
@@ -635,10 +636,10 @@ impl Server {
         let room_id = request.payload.room_id;
         let mut calls = {
             let mut store = self.store().await;
-            let (from_user_id, recipient_ids, room) =
+            let (from_user_id, recipient_connection_ids, room) =
                 store.call(room_id, request.sender_id, to_user_id)?;
             self.room_updated(room);
-            recipient_ids
+            recipient_connection_ids
                 .into_iter()
                 .map(|recipient_id| {
                     self.peer.request(
@@ -678,6 +679,21 @@ impl Server {
         Err(anyhow!("failed to ring call recipient"))?
     }
 
+    async fn decline_call(
+        self: Arc<Server>,
+        message: TypedEnvelope<proto::DeclineCall>,
+    ) -> Result<()> {
+        let mut store = self.store().await;
+        let (room, recipient_connection_ids) = store.call_declined(message.sender_id)?;
+        for recipient_id in recipient_connection_ids {
+            self.peer
+                .send(recipient_id, proto::CancelCall {})
+                .trace_err();
+        }
+        self.room_updated(room);
+        Ok(())
+    }
+
     fn room_updated(&self, room: &proto::Room) {
         for participant in &room.participants {
             self.peer

crates/collab/src/rpc/store.rs 🔗

@@ -384,7 +384,7 @@ impl Store {
             .get_mut(&connection_id)
             .ok_or_else(|| anyhow!("no such connection"))?;
         let user_id = connection.user_id;
-        let recipient_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
+        let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::<Vec<_>>();
 
         let mut user_connection_state = self
             .connections_by_user_id
@@ -402,10 +402,10 @@ impl Store {
             .get_mut(&room_id)
             .ok_or_else(|| anyhow!("no such room"))?;
         anyhow::ensure!(
-            room.pending_calls_to_user_ids.contains(&user_id.to_proto()),
+            room.pending_user_ids.contains(&user_id.to_proto()),
             anyhow!("no such room")
         );
-        room.pending_calls_to_user_ids
+        room.pending_user_ids
             .retain(|pending| *pending != user_id.to_proto());
         room.participants.push(proto::Participant {
             user_id: user_id.to_proto(),
@@ -419,7 +419,7 @@ impl Store {
         });
         user_connection_state.room = Some(RoomState::Joined);
 
-        Ok((room, recipient_ids))
+        Ok((room, recipient_connection_ids))
     }
 
     pub fn call(
@@ -451,12 +451,12 @@ impl Store {
             "no such room"
         );
         anyhow::ensure!(
-            room.pending_calls_to_user_ids
+            room.pending_user_ids
                 .iter()
                 .all(|user_id| UserId::from_proto(*user_id) != to_user_id),
             "cannot call the same user more than once"
         );
-        room.pending_calls_to_user_ids.push(to_user_id.to_proto());
+        room.pending_user_ids.push(to_user_id.to_proto());
         to_user_connection_state.room = Some(RoomState::Calling { room_id });
 
         Ok((from_user_id, to_connection_ids, room))
@@ -473,11 +473,37 @@ impl Store {
             .rooms
             .get_mut(&room_id)
             .ok_or_else(|| anyhow!("no such room"))?;
-        room.pending_calls_to_user_ids
+        room.pending_user_ids
             .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
         Ok(room)
     }
 
+    pub fn call_declined(
+        &mut self,
+        recipient_connection_id: ConnectionId,
+    ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
+        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
+        let mut to_user_connection_state = self
+            .connections_by_user_id
+            .get_mut(&recipient_user_id)
+            .ok_or_else(|| anyhow!("no such connection"))?;
+        if let Some(RoomState::Calling { room_id }) = to_user_connection_state.room {
+            to_user_connection_state.room = None;
+            let recipient_connection_ids = self
+                .connection_ids_for_user(recipient_user_id)
+                .collect::<Vec<_>>();
+            let room = self
+                .rooms
+                .get_mut(&room_id)
+                .ok_or_else(|| anyhow!("no such room"))?;
+            room.pending_user_ids
+                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
+            Ok((room, recipient_connection_ids))
+        } else {
+            Err(anyhow!("user is not being called"))
+        }
+    }
+
     pub fn register_project(
         &mut self,
         host_connection_id: ConnectionId,

crates/room/src/room.rs 🔗

@@ -21,6 +21,7 @@ pub struct Room {
     id: u64,
     local_participant: LocalParticipant,
     remote_participants: HashMap<PeerId, RemoteParticipant>,
+    pending_user_ids: Vec<u64>,
     client: Arc<Client>,
     _subscriptions: Vec<client::Subscription>,
 }
@@ -62,6 +63,7 @@ impl Room {
                 projects: Default::default(),
             },
             remote_participants: Default::default(),
+            pending_user_ids: Default::default(),
             _subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
             client,
         }
@@ -71,6 +73,10 @@ impl Room {
         &self.remote_participants
     }
 
+    pub fn pending_user_ids(&self) -> &[u64] {
+        &self.pending_user_ids
+    }
+
     async fn handle_room_updated(
         this: ModelHandle<Self>,
         envelope: TypedEnvelope<proto::RoomUpdated>,
@@ -100,6 +106,7 @@ impl Room {
                 );
             }
         }
+        self.pending_user_ids = room.pending_user_ids;
         cx.notify();
         Ok(())
     }

crates/rpc/proto/zed.proto 🔗

@@ -151,7 +151,7 @@ message JoinRoomResponse {
 
 message Room {
     repeated Participant participants = 1;
-    repeated uint64 pending_calls_to_user_ids = 2;
+    repeated uint64 pending_user_ids = 2;
 }
 
 message Participant {
@@ -187,9 +187,7 @@ message IncomingCall {
 
 message CancelCall {}
 
-message DeclineCall {
-    uint64 room_id = 1;
-}
+message DeclineCall {}
 
 message RoomUpdated {
     Room room = 1;