From 1135aeecb8b9640111bc1e0c5566c8b3b64b7e4e Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 11 Nov 2022 16:59:54 +0100 Subject: [PATCH] WIP: Move `Store::leave_room` to `Db::leave_room` --- .../20221109000000_test_schema.sql | 4 +- .../20221111092550_reconnection_support.sql | 4 +- crates/collab/src/db.rs | 112 ++++++++++++++++++ crates/collab/src/rpc.rs | 71 ++++++----- crates/collab/src/rpc/store.rs | 96 +-------------- 5 files changed, 161 insertions(+), 126 deletions(-) diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 5b38ebf8b1e9d88a12c18064f973aed183cf045b..44495f16ce368dd877df0cc6c04eef95fba04fa1 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -48,7 +48,7 @@ CREATE TABLE "projects" ( CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY, - "project_id" INTEGER NOT NULL REFERENCES projects (id), + "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, @@ -58,7 +58,7 @@ CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborato CREATE TABLE "worktrees" ( "id" INTEGER NOT NULL, - "project_id" INTEGER NOT NULL REFERENCES projects (id), + "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "root_name" VARCHAR NOT NULL, PRIMARY KEY(project_id, id) ); diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index 621512bf43b3c2f39ce54a478f8e82825fc37cfd..ed6da2b7b14a3f31fdfffba4a80af286002f4739 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -10,7 +10,7 @@ ALTER TABLE "projects" CREATE TABLE "project_collaborators" ( "id" SERIAL PRIMARY KEY, - "project_id" INTEGER NOT NULL REFERENCES projects (id), + "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, @@ -20,7 +20,7 @@ CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborato CREATE TABLE IF NOT EXISTS "worktrees" ( "id" INTEGER NOT NULL, - "project_id" INTEGER NOT NULL REFERENCES projects (id), + "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "root_name" VARCHAR NOT NULL, PRIMARY KEY(project_id, id) ); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 10f1dd04424ba7ed1c0fa07e685c285f8769b1eb..fc5e3c242b9ae05f2cbffe2615fc2edcaac77b8e 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1070,6 +1070,97 @@ where }) } + pub async fn leave_room( + &self, + room_id: RoomId, + connection_id: ConnectionId, + ) -> Result { + test_support!(self, { + let mut tx = self.pool.begin().await?; + + // Leave room. + let user_id: UserId = sqlx::query_scalar( + " + DELETE FROM room_participants + WHERE room_id = $1 AND connection_id = $2 + RETURNING user_id + ", + ) + .bind(room_id) + .bind(connection_id.0 as i32) + .fetch_one(&mut tx) + .await?; + + // Cancel pending calls initiated by the leaving user. + let canceled_calls_to_user_ids: Vec = sqlx::query_scalar( + " + DELETE FROM room_participants + WHERE calling_user_id = $1 AND connection_id IS NULL + RETURNING user_id + ", + ) + .bind(room_id) + .bind(connection_id.0 as i32) + .fetch_all(&mut tx) + .await?; + + let mut project_collaborators = sqlx::query_as::<_, ProjectCollaborator>( + " + SELECT project_collaborators.* + FROM projects, project_collaborators + WHERE + projects.room_id = $1 AND + projects.user_id = $2 AND + projects.id = project_collaborators.project_id + ", + ) + .bind(room_id) + .bind(user_id) + .fetch(&mut tx); + + let mut left_projects = HashMap::default(); + while let Some(collaborator) = project_collaborators.next().await { + let collaborator = collaborator?; + let left_project = + left_projects + .entry(collaborator.project_id) + .or_insert(LeftProject { + id: collaborator.project_id, + host_user_id: Default::default(), + connection_ids: Default::default(), + }); + + let collaborator_connection_id = ConnectionId(collaborator.connection_id as u32); + if collaborator_connection_id != connection_id || collaborator.is_host { + left_project.connection_ids.push(collaborator_connection_id); + } + + if collaborator.is_host { + left_project.host_user_id = collaborator.user_id; + } + } + drop(project_collaborators); + + sqlx::query( + " + DELETE FROM projects + WHERE room_id = $1 AND user_id = $2 + ", + ) + .bind(room_id) + .bind(user_id) + .execute(&mut tx) + .await?; + + let room = self.commit_room_transaction(room_id, tx).await?; + Ok(LeftRoom { + room, + left_projects, + canceled_calls_to_user_ids, + }) + }) + } + pub async fn update_room_participant_location( &self, room_id: RoomId, @@ -1667,6 +1758,27 @@ pub struct Project { pub unregistered: bool, } +#[derive(Clone, Debug, Default, FromRow, PartialEq)] +pub struct ProjectCollaborator { + pub project_id: ProjectId, + pub connection_id: i32, + pub user_id: UserId, + pub replica_id: i32, + pub is_host: bool, +} + +pub struct LeftProject { + pub id: ProjectId, + pub host_user_id: UserId, + pub connection_ids: Vec, +} + +pub struct LeftRoom { + pub room: proto::Room, + pub left_projects: HashMap, + pub canceled_calls_to_user_ids: Vec, +} + #[derive(Clone, Debug, PartialEq, Eq)] pub enum Contact { Accepted { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 652ac5917b0e80b7745b8fd17673db8e4dcc1753..1221964601592223962f54e9ed5be38d24c7faaa 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -658,14 +658,20 @@ impl Server { async fn leave_room(self: Arc, message: Message) -> Result<()> { let mut contacts_to_update = HashSet::default(); - let room_left; - { - let mut store = self.store().await; - let left_room = store.leave_room(message.payload.id, message.sender_connection_id)?; - contacts_to_update.insert(message.sender_user_id); - for project in left_room.unshared_projects { - for connection_id in project.connection_ids() { + let left_room = self + .app_state + .db + .leave_room( + RoomId::from_proto(message.payload.id), + message.sender_connection_id, + ) + .await?; + contacts_to_update.insert(message.sender_user_id); + + for project in left_room.left_projects.into_values() { + if project.host_user_id == message.sender_user_id { + for connection_id in project.connection_ids { self.peer.send( connection_id, proto::UnshareProject { @@ -673,41 +679,42 @@ impl Server { }, )?; } - } - - for project in left_room.left_projects { - if project.remove_collaborator { - for connection_id in project.connection_ids { - self.peer.send( - connection_id, - proto::RemoveProjectCollaborator { - project_id: project.id.to_proto(), - peer_id: message.sender_connection_id.0, - }, - )?; - } - + } else { + for connection_id in project.connection_ids { self.peer.send( - message.sender_connection_id, - proto::UnshareProject { + connection_id, + proto::RemoveProjectCollaborator { project_id: project.id.to_proto(), + peer_id: message.sender_connection_id.0, }, )?; } - } - self.room_updated(&left_room.room); - room_left = self.room_left(&left_room.room, message.sender_connection_id); + self.peer.send( + message.sender_connection_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + )?; + } + } - for connection_id in left_room.canceled_call_connection_ids { - self.peer - .send(connection_id, proto::CallCanceled {}) - .trace_err(); - contacts_to_update.extend(store.user_id_for_connection(connection_id).ok()); + self.room_updated(&left_room.room); + { + let store = self.store().await; + for user_id in left_room.canceled_calls_to_user_ids { + for connection_id in store.connection_ids_for_user(user_id) { + self.peer + .send(connection_id, proto::CallCanceled {}) + .trace_err(); + } + contacts_to_update.insert(user_id); } } - room_left.await.trace_err(); + self.room_left(&left_room.room, message.sender_connection_id) + .await + .trace_err(); for user_id in contacts_to_update { self.update_user_contacts(user_id).await?; } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d64464f601be06f89ac583e3edd26e08716ac806..4ea2c7b38ef9b78c9ca02ad4653f1707a0781387 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -90,13 +90,6 @@ pub struct LeftProject { pub remove_collaborator: bool, } -pub struct LeftRoom<'a> { - pub room: Cow<'a, proto::Room>, - pub unshared_projects: Vec, - pub left_projects: Vec, - pub canceled_call_connection_ids: Vec, -} - #[derive(Copy, Clone)] pub struct Metrics { pub connections: usize, @@ -156,11 +149,12 @@ impl Store { if let Some(active_call) = connected_user.active_call.as_ref() { let room_id = active_call.room_id; if active_call.connection_id == Some(connection_id) { - let left_room = self.leave_room(room_id, connection_id)?; - result.hosted_projects = left_room.unshared_projects; - result.guest_projects = left_room.left_projects; - result.room = Some(Cow::Owned(left_room.room.into_owned())); - result.canceled_call_connection_ids = left_room.canceled_call_connection_ids; + todo!() + // let left_room = self.leave_room(room_id, connection_id)?; + // result.hosted_projects = left_room.unshared_projects; + // result.guest_projects = left_room.left_projects; + // result.room = Some(Cow::Owned(left_room.room.into_owned())); + // result.canceled_call_connection_ids = left_room.canceled_call_connection_ids; } else if connected_user.connection_ids.len() == 1 { todo!() // let (room, _) = self.decline_call(room_id, connection_id)?; @@ -258,84 +252,6 @@ impl Store { } } - pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result { - todo!() - // let connection = self - // .connections - // .get_mut(&connection_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // let user_id = connection.user_id; - - // let connected_user = self - // .connected_users - // .get(&user_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // anyhow::ensure!( - // connected_user - // .active_call - // .map_or(false, |call| call.room_id == room_id - // && call.connection_id == Some(connection_id)), - // "cannot leave a room before joining it" - // ); - - // // Given that users can only join one room at a time, we can safely unshare - // // and leave all projects associated with the connection. - // let mut unshared_projects = Vec::new(); - // let mut left_projects = Vec::new(); - // for project_id in connection.projects.clone() { - // if let Ok((_, project)) = self.unshare_project(project_id, connection_id) { - // unshared_projects.push(project); - // } else if let Ok(project) = self.leave_project(project_id, connection_id) { - // left_projects.push(project); - // } - // } - // self.connected_users.get_mut(&user_id).unwrap().active_call = None; - - // let room = self - // .rooms - // .get_mut(&room_id) - // .ok_or_else(|| anyhow!("no such room"))?; - // room.participants - // .retain(|participant| participant.peer_id != connection_id.0); - - // let mut canceled_call_connection_ids = Vec::new(); - // room.pending_participant_user_ids - // .retain(|pending_participant_user_id| { - // if let Some(connected_user) = self - // .connected_users - // .get_mut(&UserId::from_proto(*pending_participant_user_id)) - // { - // if let Some(call) = connected_user.active_call.as_ref() { - // if call.calling_user_id == user_id { - // connected_user.active_call.take(); - // canceled_call_connection_ids - // .extend(connected_user.connection_ids.iter().copied()); - // false - // } else { - // true - // } - // } else { - // true - // } - // } else { - // true - // } - // }); - - // let room = if room.participants.is_empty() { - // Cow::Owned(self.rooms.remove(&room_id).unwrap()) - // } else { - // Cow::Borrowed(self.rooms.get(&room_id).unwrap()) - // }; - - // Ok(LeftRoom { - // room, - // unshared_projects, - // left_projects, - // canceled_call_connection_ids, - // }) - } - pub fn rooms(&self) -> &BTreeMap { &self.rooms }