From 59e8600e4c43e412f6088eb80dfe4a78f5fb3969 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 14 Nov 2022 11:12:23 +0100 Subject: [PATCH] Implement `Db::cancel_call` --- crates/collab/src/db.rs | 29 ++++++++++++++++++- crates/collab/src/rpc.rs | 44 ++++++++++++++--------------- crates/collab/src/rpc/store.rs | 51 ---------------------------------- 3 files changed, 50 insertions(+), 74 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index d329bf23e500615c42b927035b12de997cb1c153..50a333bced9aec4c5d4a761b49f4cdc084e9a711 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1048,6 +1048,30 @@ where }) } + pub async fn cancel_call( + &self, + room_id: RoomId, + calling_connection_id: ConnectionId, + called_user_id: UserId, + ) -> Result { + test_support!(self, { + let mut tx = self.pool.begin().await?; + sqlx::query( + " + DELETE FROM room_participants + WHERE room_id = $1 AND user_id = $2 AND calling_connection_id = $3 AND answering_connection_id IS NULL + ", + ) + .bind(room_id) + .bind(called_user_id) + .bind(calling_connection_id.0 as i32) + .execute(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await + }) + } + pub async fn join_room( &self, room_id: RoomId, @@ -1073,7 +1097,10 @@ where }) } - pub async fn leave_room(&self, connection_id: ConnectionId) -> Result> { + pub async fn leave_room_for_connection( + &self, + connection_id: ConnectionId, + ) -> Result> { test_support!(self, { let mut tx = self.pool.begin().await?; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 038724c25a23eb353891c7e5fdb4c5aa9237361e..3e519d91aefae54ba16091c6e8bcd2f3230be9d5 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -558,13 +558,13 @@ impl Server { request.sender_connection_id, ) .await?; - for recipient_id in self + for connection_id in self .store() .await .connection_ids_for_user(request.sender_user_id) { self.peer - .send(recipient_id, proto::CallCanceled {}) + .send(connection_id, proto::CallCanceled {}) .trace_err(); } @@ -610,7 +610,7 @@ impl Server { ) -> Result<()> { let mut contacts_to_update = HashSet::default(); - let Some(left_room) = self.app_state.db.leave_room(connection_id).await? else { + let Some(left_room) = self.app_state.db.leave_room_for_connection(connection_id).await? else { return Err(anyhow!("no room to leave"))?; }; contacts_to_update.insert(user_id); @@ -751,7 +751,7 @@ impl Server { self.room_updated(&room); self.update_user_contacts(called_user_id).await?; - Err(anyhow!("failed to ring call recipient"))? + Err(anyhow!("failed to ring user"))? } async fn cancel_call( @@ -759,23 +759,23 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let recipient_user_id = UserId::from_proto(request.payload.called_user_id); - { - let mut store = self.store().await; - let (room, recipient_connection_ids) = store.cancel_call( - request.payload.room_id, - recipient_user_id, - request.sender_connection_id, - )?; - for recipient_id in recipient_connection_ids { - self.peer - .send(recipient_id, proto::CallCanceled {}) - .trace_err(); - } - self.room_updated(room); - response.send(proto::Ack {})?; + let called_user_id = UserId::from_proto(request.payload.called_user_id); + let room_id = RoomId::from_proto(request.payload.room_id); + + let room = self + .app_state + .db + .cancel_call(room_id, request.sender_connection_id, called_user_id) + .await?; + for connection_id in self.store().await.connection_ids_for_user(called_user_id) { + self.peer + .send(connection_id, proto::CallCanceled {}) + .trace_err(); } - self.update_user_contacts(recipient_user_id).await?; + self.room_updated(&room); + response.send(proto::Ack {})?; + + self.update_user_contacts(called_user_id).await?; Ok(()) } @@ -788,13 +788,13 @@ impl Server { message.sender_user_id, ) .await?; - for recipient_id in self + for connection_id in self .store() .await .connection_ids_for_user(message.sender_user_id) { self.peer - .send(recipient_id, proto::CallCanceled {}) + .send(connection_id, proto::CallCanceled {}) .trace_err(); } self.room_updated(&room); diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 3896b8f7a40a9f7f2e1b09d032cf8b38dcd83cce..a9793e9fb67af8e97c11d79d59a3b7927d24d3cd 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -211,57 +211,6 @@ impl Store { &self.rooms } - pub fn cancel_call( - &mut self, - room_id: RoomId, - called_user_id: UserId, - canceller_connection_id: ConnectionId, - ) -> Result<(&proto::Room, HashSet)> { - todo!() - // let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?; - // let canceller = self - // .connected_users - // .get(&canceller_user_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // let recipient = self - // .connected_users - // .get(&called_user_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // let canceller_active_call = canceller - // .active_call - // .as_ref() - // .ok_or_else(|| anyhow!("no active call"))?; - // let recipient_active_call = recipient - // .active_call - // .as_ref() - // .ok_or_else(|| anyhow!("no active call for recipient"))?; - - // anyhow::ensure!( - // canceller_active_call.room_id == room_id, - // "users are on different calls" - // ); - // anyhow::ensure!( - // recipient_active_call.room_id == room_id, - // "users are on different calls" - // ); - // anyhow::ensure!( - // recipient_active_call.connection_id.is_none(), - // "recipient has already answered" - // ); - // let room_id = recipient_active_call.room_id; - // let room = self - // .rooms - // .get_mut(&room_id) - // .ok_or_else(|| anyhow!("no such room"))?; - // room.pending_participant_user_ids - // .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); - - // let recipient = self.connected_users.get_mut(&called_user_id).unwrap(); - // recipient.active_call.take(); - - // Ok((room, recipient.connection_ids.clone())) - } - pub fn unshare_project( &mut self, project_id: ProjectId,