From 58947c5c7269ec5de2421cd018abe0d254626695 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 11 Nov 2022 14:28:26 +0100 Subject: [PATCH] Move incoming calls into `Db` --- crates/collab/src/db.rs | 89 +++++++++++++++++++++++++++++++--- crates/collab/src/rpc.rs | 31 +++--------- crates/collab/src/rpc/store.rs | 48 +----------------- 3 files changed, 89 insertions(+), 79 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index b7d6f995b0b5a595114c6582371f31816542863d..506606274d93e5d550888e130cb8915f222953e7 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -940,7 +940,7 @@ where calling_user_id: UserId, called_user_id: UserId, initial_project_id: Option, - ) -> Result { + ) -> Result<(proto::Room, proto::IncomingCall)> { test_support!(self, { let mut tx = self.pool.begin().await?; sqlx::query( @@ -967,10 +967,67 @@ where .execute(&mut tx) .await?; - self.commit_room_transaction(room_id, tx).await + let room = self.commit_room_transaction(room_id, tx).await?; + let incoming_call = + Self::build_incoming_call(&room, calling_user_id, initial_project_id); + Ok((room, incoming_call)) }) } + pub async fn incoming_call_for_user( + &self, + user_id: UserId, + ) -> Result> { + test_support!(self, { + let mut tx = self.pool.begin().await?; + let call = sqlx::query_as::<_, Call>( + " + SELECT * + FROM calls + WHERE called_user_id = $1 AND answering_connection_id IS NULL + ", + ) + .bind(user_id) + .fetch_optional(&mut tx) + .await?; + + if let Some(call) = call { + let room = self.get_room(call.room_id, &mut tx).await?; + Ok(Some(Self::build_incoming_call( + &room, + call.calling_user_id, + call.initial_project_id, + ))) + } else { + Ok(None) + } + }) + } + + fn build_incoming_call( + room: &proto::Room, + calling_user_id: UserId, + initial_project_id: Option, + ) -> proto::IncomingCall { + proto::IncomingCall { + room_id: room.id, + calling_user_id: calling_user_id.to_proto(), + participant_user_ids: room + .participants + .iter() + .map(|participant| participant.user_id) + .collect(), + initial_project: room.participants.iter().find_map(|participant| { + let initial_project_id = initial_project_id?.to_proto(); + participant + .projects + .iter() + .find(|project| project.id == initial_project_id) + .cloned() + }), + } + } + pub async fn call_failed( &self, room_id: RoomId, @@ -1066,7 +1123,17 @@ where .bind(room_id) .execute(&mut tx) .await?; + let room = self.get_room(room_id, &mut tx).await?; + tx.commit().await?; + + Ok(room) + } + async fn get_room( + &self, + room_id: RoomId, + tx: &mut sqlx::Transaction<'_, D>, + ) -> Result { let room: Room = sqlx::query_as( " SELECT * @@ -1075,7 +1142,7 @@ where ", ) .bind(room_id) - .fetch_one(&mut tx) + .fetch_one(&mut *tx) .await?; let mut db_participants = @@ -1087,7 +1154,7 @@ where ", ) .bind(room_id) - .fetch(&mut tx); + .fetch(&mut *tx); let mut participants = Vec::new(); let mut pending_participant_user_ids = Vec::new(); @@ -1120,7 +1187,7 @@ where ", ) .bind(room_id) - .fetch(&mut tx); + .fetch(&mut *tx); let mut projects = HashMap::default(); while let Some(entry) = entries.next().await { @@ -1139,9 +1206,6 @@ where participant.projects = projects.into_values().collect(); } - - tx.commit().await?; - Ok(proto::Room { id: room.id.to_proto(), version: room.version as u64, @@ -1566,6 +1630,15 @@ pub struct Room { pub live_kit_room: String, } +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +pub struct Call { + pub room_id: RoomId, + pub calling_user_id: UserId, + pub called_user_id: UserId, + pub answering_connection_id: Option, + pub initial_project_id: Option, +} + id_type!(ProjectId); #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] pub struct Project { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 75ff703b1f6c3d283be78c85791d8f7a86977097..64affdb8252c0bce3dc318ecbc8e45b76fb5273d 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -346,11 +346,7 @@ impl Server { { let mut store = this.store().await; - let incoming_call = store.add_connection(connection_id, user_id, user.admin); - if let Some(incoming_call) = incoming_call { - this.peer.send(connection_id, incoming_call)?; - } - + store.add_connection(connection_id, user_id, user.admin); this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?; if let Some((code, count)) = invite_code { @@ -360,6 +356,11 @@ impl Server { })?; } } + + if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? { + this.peer.send(connection_id, incoming_call)?; + } + this.update_user_contacts(user_id).await?; let handle_io = handle_io.fuse(); @@ -726,7 +727,7 @@ impl Server { return Err(anyhow!("cannot call a user who isn't a contact"))?; } - let room = self + let (room, incoming_call) = self .app_state .db .call(room_id, calling_user_id, called_user_id, initial_project_id) @@ -734,24 +735,6 @@ impl Server { self.room_updated(&room); self.update_user_contacts(called_user_id).await?; - let incoming_call = proto::IncomingCall { - room_id: room_id.to_proto(), - calling_user_id: calling_user_id.to_proto(), - participant_user_ids: room - .participants - .iter() - .map(|participant| participant.user_id) - .collect(), - initial_project: room.participants.iter().find_map(|participant| { - let initial_project_id = initial_project_id?.to_proto(); - participant - .projects - .iter() - .find(|project| project.id == initial_project_id) - .cloned() - }), - }; - let mut calls = self .store() .await diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 72da82ea8ce1c6a8ab5539531467de2a6296c2bc..f16910fac514bc0def6b17cf5a5e2ff97e169557 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -122,12 +122,7 @@ impl Store { } #[instrument(skip(self))] - pub fn add_connection( - &mut self, - connection_id: ConnectionId, - user_id: UserId, - admin: bool, - ) -> Option { + pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) { self.connections.insert( connection_id, ConnectionState { @@ -138,27 +133,6 @@ impl Store { ); let connected_user = self.connected_users.entry(user_id).or_default(); connected_user.connection_ids.insert(connection_id); - if let Some(active_call) = connected_user.active_call { - if active_call.connection_id.is_some() { - None - } else { - let room = self.room(active_call.room_id)?; - Some(proto::IncomingCall { - room_id: active_call.room_id, - calling_user_id: active_call.calling_user_id.to_proto(), - participant_user_ids: room - .participants - .iter() - .map(|participant| participant.user_id) - .collect(), - initial_project: active_call - .initial_project_id - .and_then(|id| Self::build_participant_project(id, &self.projects)), - }) - } - } else { - None - } } #[instrument(skip(self))] @@ -411,10 +385,6 @@ impl Store { }) } - pub fn room(&self, room_id: RoomId) -> Option<&proto::Room> { - self.rooms.get(&room_id) - } - pub fn rooms(&self) -> &BTreeMap { &self.rooms } @@ -740,22 +710,6 @@ impl Store { Ok(connection_ids) } - fn build_participant_project( - project_id: ProjectId, - projects: &BTreeMap, - ) -> Option { - Some(proto::ParticipantProject { - id: project_id.to_proto(), - worktree_root_names: projects - .get(&project_id)? - .worktrees - .values() - .filter(|worktree| worktree.visible) - .map(|worktree| worktree.root_name.clone()) - .collect(), - }) - } - pub fn project_connection_ids( &self, project_id: ProjectId,