Implement `Db::cancel_call`

Antonio Scandurra created

Change summary

crates/collab/src/db.rs        | 29 +++++++++++++++++++
crates/collab/src/rpc.rs       | 44 +++++++++++++++---------------
crates/collab/src/rpc/store.rs | 51 ------------------------------------
3 files changed, 50 insertions(+), 74 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -1048,6 +1048,30 @@ where
         })
     }
 
+    pub async fn cancel_call(
+        &self,
+        room_id: RoomId,
+        calling_connection_id: ConnectionId,
+        called_user_id: UserId,
+    ) -> Result<proto::Room> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+            sqlx::query(
+                "
+                DELETE FROM room_participants
+                WHERE room_id = $1 AND user_id = $2 AND calling_connection_id = $3 AND answering_connection_id IS NULL
+                ",
+            )
+            .bind(room_id)
+            .bind(called_user_id)
+            .bind(calling_connection_id.0 as i32)
+            .execute(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
+        })
+    }
+
     pub async fn join_room(
         &self,
         room_id: RoomId,
@@ -1073,7 +1097,10 @@ where
         })
     }
 
-    pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<Option<LeftRoom>> {
+    pub async fn leave_room_for_connection(
+        &self,
+        connection_id: ConnectionId,
+    ) -> Result<Option<LeftRoom>> {
         test_support!(self, {
             let mut tx = self.pool.begin().await?;
 

crates/collab/src/rpc.rs 🔗

@@ -558,13 +558,13 @@ impl Server {
                 request.sender_connection_id,
             )
             .await?;
-        for recipient_id in self
+        for connection_id in self
             .store()
             .await
             .connection_ids_for_user(request.sender_user_id)
         {
             self.peer
-                .send(recipient_id, proto::CallCanceled {})
+                .send(connection_id, proto::CallCanceled {})
                 .trace_err();
         }
 
@@ -610,7 +610,7 @@ impl Server {
     ) -> Result<()> {
         let mut contacts_to_update = HashSet::default();
 
-        let Some(left_room) = self.app_state.db.leave_room(connection_id).await? else {
+        let Some(left_room) = self.app_state.db.leave_room_for_connection(connection_id).await? else {
             return Err(anyhow!("no room to leave"))?;
         };
         contacts_to_update.insert(user_id);
@@ -751,7 +751,7 @@ impl Server {
         self.room_updated(&room);
         self.update_user_contacts(called_user_id).await?;
 
-        Err(anyhow!("failed to ring call recipient"))?
+        Err(anyhow!("failed to ring user"))?
     }
 
     async fn cancel_call(
@@ -759,23 +759,23 @@ impl Server {
         request: Message<proto::CancelCall>,
         response: Response<proto::CancelCall>,
     ) -> Result<()> {
-        let recipient_user_id = UserId::from_proto(request.payload.called_user_id);
-        {
-            let mut store = self.store().await;
-            let (room, recipient_connection_ids) = store.cancel_call(
-                request.payload.room_id,
-                recipient_user_id,
-                request.sender_connection_id,
-            )?;
-            for recipient_id in recipient_connection_ids {
-                self.peer
-                    .send(recipient_id, proto::CallCanceled {})
-                    .trace_err();
-            }
-            self.room_updated(room);
-            response.send(proto::Ack {})?;
+        let called_user_id = UserId::from_proto(request.payload.called_user_id);
+        let room_id = RoomId::from_proto(request.payload.room_id);
+
+        let room = self
+            .app_state
+            .db
+            .cancel_call(room_id, request.sender_connection_id, called_user_id)
+            .await?;
+        for connection_id in self.store().await.connection_ids_for_user(called_user_id) {
+            self.peer
+                .send(connection_id, proto::CallCanceled {})
+                .trace_err();
         }
-        self.update_user_contacts(recipient_user_id).await?;
+        self.room_updated(&room);
+        response.send(proto::Ack {})?;
+
+        self.update_user_contacts(called_user_id).await?;
         Ok(())
     }
 
@@ -788,13 +788,13 @@ impl Server {
                 message.sender_user_id,
             )
             .await?;
-        for recipient_id in self
+        for connection_id in self
             .store()
             .await
             .connection_ids_for_user(message.sender_user_id)
         {
             self.peer
-                .send(recipient_id, proto::CallCanceled {})
+                .send(connection_id, proto::CallCanceled {})
                 .trace_err();
         }
         self.room_updated(&room);

crates/collab/src/rpc/store.rs 🔗

@@ -211,57 +211,6 @@ impl Store {
         &self.rooms
     }
 
-    pub fn cancel_call(
-        &mut self,
-        room_id: RoomId,
-        called_user_id: UserId,
-        canceller_connection_id: ConnectionId,
-    ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
-        todo!()
-        // let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
-        // let canceller = self
-        //     .connected_users
-        //     .get(&canceller_user_id)
-        //     .ok_or_else(|| anyhow!("no such connection"))?;
-        // let recipient = self
-        //     .connected_users
-        //     .get(&called_user_id)
-        //     .ok_or_else(|| anyhow!("no such connection"))?;
-        // let canceller_active_call = canceller
-        //     .active_call
-        //     .as_ref()
-        //     .ok_or_else(|| anyhow!("no active call"))?;
-        // let recipient_active_call = recipient
-        //     .active_call
-        //     .as_ref()
-        //     .ok_or_else(|| anyhow!("no active call for recipient"))?;
-
-        // anyhow::ensure!(
-        //     canceller_active_call.room_id == room_id,
-        //     "users are on different calls"
-        // );
-        // anyhow::ensure!(
-        //     recipient_active_call.room_id == room_id,
-        //     "users are on different calls"
-        // );
-        // anyhow::ensure!(
-        //     recipient_active_call.connection_id.is_none(),
-        //     "recipient has already answered"
-        // );
-        // let room_id = recipient_active_call.room_id;
-        // let room = self
-        //     .rooms
-        //     .get_mut(&room_id)
-        //     .ok_or_else(|| anyhow!("no such room"))?;
-        // room.pending_participant_user_ids
-        //     .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
-
-        // let recipient = self.connected_users.get_mut(&called_user_id).unwrap();
-        // recipient.active_call.take();
-
-        // Ok((room, recipient.connection_ids.clone()))
-    }
-
     pub fn unshare_project(
         &mut self,
         project_id: ProjectId,