WIP: Move `Store::leave_room` to `Db::leave_room`

Antonio Scandurra created

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql   |   4 
crates/collab/migrations/20221111092550_reconnection_support.sql |   4 
crates/collab/src/db.rs                                          | 112 ++
crates/collab/src/rpc.rs                                         |  71 
crates/collab/src/rpc/store.rs                                   |  96 -
5 files changed, 161 insertions(+), 126 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -48,7 +48,7 @@ CREATE TABLE "projects" (
 
 CREATE TABLE "project_collaborators" (
     "id" INTEGER PRIMARY KEY,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
+    "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "connection_id" INTEGER NOT NULL,
     "user_id" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
@@ -58,7 +58,7 @@ CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborato
 
 CREATE TABLE "worktrees" (
     "id" INTEGER NOT NULL,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
+    "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "root_name" VARCHAR NOT NULL,
     PRIMARY KEY(project_id, id)
 );

crates/collab/migrations/20221111092550_reconnection_support.sql 🔗

@@ -10,7 +10,7 @@ ALTER TABLE "projects"
 
 CREATE TABLE "project_collaborators" (
     "id" SERIAL PRIMARY KEY,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
+    "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "connection_id" INTEGER NOT NULL,
     "user_id" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
@@ -20,7 +20,7 @@ CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborato
 
 CREATE TABLE IF NOT EXISTS "worktrees" (
     "id" INTEGER NOT NULL,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
+    "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
     "root_name" VARCHAR NOT NULL,
     PRIMARY KEY(project_id, id)
 );

crates/collab/src/db.rs 🔗

@@ -1070,6 +1070,97 @@ where
         })
     }
 
+    pub async fn leave_room(
+        &self,
+        room_id: RoomId,
+        connection_id: ConnectionId,
+    ) -> Result<LeftRoom> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+
+            // Leave room.
+            let user_id: UserId = sqlx::query_scalar(
+                "
+                DELETE FROM room_participants
+                WHERE room_id = $1 AND connection_id = $2
+                RETURNING user_id
+                ",
+            )
+            .bind(room_id)
+            .bind(connection_id.0 as i32)
+            .fetch_one(&mut tx)
+            .await?;
+
+            // Cancel pending calls initiated by the leaving user.
+            let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
+                "
+                DELETE FROM room_participants
+                WHERE calling_user_id = $1 AND connection_id IS NULL
+                RETURNING user_id
+                ",
+            )
+            .bind(room_id)
+            .bind(connection_id.0 as i32)
+            .fetch_all(&mut tx)
+            .await?;
+
+            let mut project_collaborators = sqlx::query_as::<_, ProjectCollaborator>(
+                "
+                SELECT project_collaborators.*
+                FROM projects, project_collaborators
+                WHERE
+                    projects.room_id = $1 AND
+                    projects.user_id = $2 AND
+                    projects.id = project_collaborators.project_id
+                ",
+            )
+            .bind(room_id)
+            .bind(user_id)
+            .fetch(&mut tx);
+
+            let mut left_projects = HashMap::default();
+            while let Some(collaborator) = project_collaborators.next().await {
+                let collaborator = collaborator?;
+                let left_project =
+                    left_projects
+                        .entry(collaborator.project_id)
+                        .or_insert(LeftProject {
+                            id: collaborator.project_id,
+                            host_user_id: Default::default(),
+                            connection_ids: Default::default(),
+                        });
+
+                let collaborator_connection_id = ConnectionId(collaborator.connection_id as u32);
+                if collaborator_connection_id != connection_id || collaborator.is_host {
+                    left_project.connection_ids.push(collaborator_connection_id);
+                }
+
+                if collaborator.is_host {
+                    left_project.host_user_id = collaborator.user_id;
+                }
+            }
+            drop(project_collaborators);
+
+            sqlx::query(
+                "
+                DELETE FROM projects
+                WHERE room_id = $1 AND user_id = $2
+                ",
+            )
+            .bind(room_id)
+            .bind(user_id)
+            .execute(&mut tx)
+            .await?;
+
+            let room = self.commit_room_transaction(room_id, tx).await?;
+            Ok(LeftRoom {
+                room,
+                left_projects,
+                canceled_calls_to_user_ids,
+            })
+        })
+    }
+
     pub async fn update_room_participant_location(
         &self,
         room_id: RoomId,
@@ -1667,6 +1758,27 @@ pub struct Project {
     pub unregistered: bool,
 }
 
+#[derive(Clone, Debug, Default, FromRow, PartialEq)]
+pub struct ProjectCollaborator {
+    pub project_id: ProjectId,
+    pub connection_id: i32,
+    pub user_id: UserId,
+    pub replica_id: i32,
+    pub is_host: bool,
+}
+
+pub struct LeftProject {
+    pub id: ProjectId,
+    pub host_user_id: UserId,
+    pub connection_ids: Vec<ConnectionId>,
+}
+
+pub struct LeftRoom {
+    pub room: proto::Room,
+    pub left_projects: HashMap<ProjectId, LeftProject>,
+    pub canceled_calls_to_user_ids: Vec<UserId>,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Contact {
     Accepted {

crates/collab/src/rpc.rs 🔗

@@ -658,14 +658,20 @@ impl Server {
 
     async fn leave_room(self: Arc<Server>, message: Message<proto::LeaveRoom>) -> Result<()> {
         let mut contacts_to_update = HashSet::default();
-        let room_left;
-        {
-            let mut store = self.store().await;
-            let left_room = store.leave_room(message.payload.id, message.sender_connection_id)?;
-            contacts_to_update.insert(message.sender_user_id);
 
-            for project in left_room.unshared_projects {
-                for connection_id in project.connection_ids() {
+        let left_room = self
+            .app_state
+            .db
+            .leave_room(
+                RoomId::from_proto(message.payload.id),
+                message.sender_connection_id,
+            )
+            .await?;
+        contacts_to_update.insert(message.sender_user_id);
+
+        for project in left_room.left_projects.into_values() {
+            if project.host_user_id == message.sender_user_id {
+                for connection_id in project.connection_ids {
                     self.peer.send(
                         connection_id,
                         proto::UnshareProject {
@@ -673,41 +679,42 @@ impl Server {
                         },
                     )?;
                 }
-            }
-
-            for project in left_room.left_projects {
-                if project.remove_collaborator {
-                    for connection_id in project.connection_ids {
-                        self.peer.send(
-                            connection_id,
-                            proto::RemoveProjectCollaborator {
-                                project_id: project.id.to_proto(),
-                                peer_id: message.sender_connection_id.0,
-                            },
-                        )?;
-                    }
-
+            } else {
+                for connection_id in project.connection_ids {
                     self.peer.send(
-                        message.sender_connection_id,
-                        proto::UnshareProject {
+                        connection_id,
+                        proto::RemoveProjectCollaborator {
                             project_id: project.id.to_proto(),
+                            peer_id: message.sender_connection_id.0,
                         },
                     )?;
                 }
-            }
 
-            self.room_updated(&left_room.room);
-            room_left = self.room_left(&left_room.room, message.sender_connection_id);
+                self.peer.send(
+                    message.sender_connection_id,
+                    proto::UnshareProject {
+                        project_id: project.id.to_proto(),
+                    },
+                )?;
+            }
+        }
 
-            for connection_id in left_room.canceled_call_connection_ids {
-                self.peer
-                    .send(connection_id, proto::CallCanceled {})
-                    .trace_err();
-                contacts_to_update.extend(store.user_id_for_connection(connection_id).ok());
+        self.room_updated(&left_room.room);
+        {
+            let store = self.store().await;
+            for user_id in left_room.canceled_calls_to_user_ids {
+                for connection_id in store.connection_ids_for_user(user_id) {
+                    self.peer
+                        .send(connection_id, proto::CallCanceled {})
+                        .trace_err();
+                }
+                contacts_to_update.insert(user_id);
             }
         }
 
-        room_left.await.trace_err();
+        self.room_left(&left_room.room, message.sender_connection_id)
+            .await
+            .trace_err();
         for user_id in contacts_to_update {
             self.update_user_contacts(user_id).await?;
         }

crates/collab/src/rpc/store.rs 🔗

@@ -90,13 +90,6 @@ pub struct LeftProject {
     pub remove_collaborator: bool,
 }
 
-pub struct LeftRoom<'a> {
-    pub room: Cow<'a, proto::Room>,
-    pub unshared_projects: Vec<Project>,
-    pub left_projects: Vec<LeftProject>,
-    pub canceled_call_connection_ids: Vec<ConnectionId>,
-}
-
 #[derive(Copy, Clone)]
 pub struct Metrics {
     pub connections: usize,
@@ -156,11 +149,12 @@ impl Store {
         if let Some(active_call) = connected_user.active_call.as_ref() {
             let room_id = active_call.room_id;
             if active_call.connection_id == Some(connection_id) {
-                let left_room = self.leave_room(room_id, connection_id)?;
-                result.hosted_projects = left_room.unshared_projects;
-                result.guest_projects = left_room.left_projects;
-                result.room = Some(Cow::Owned(left_room.room.into_owned()));
-                result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
+                todo!()
+                // let left_room = self.leave_room(room_id, connection_id)?;
+                // result.hosted_projects = left_room.unshared_projects;
+                // result.guest_projects = left_room.left_projects;
+                // result.room = Some(Cow::Owned(left_room.room.into_owned()));
+                // result.canceled_call_connection_ids = left_room.canceled_call_connection_ids;
             } else if connected_user.connection_ids.len() == 1 {
                 todo!()
                 // let (room, _) = self.decline_call(room_id, connection_id)?;
@@ -258,84 +252,6 @@ impl Store {
         }
     }
 
-    pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result<LeftRoom> {
-        todo!()
-        // let connection = self
-        //     .connections
-        //     .get_mut(&connection_id)
-        //     .ok_or_else(|| anyhow!("no such connection"))?;
-        // let user_id = connection.user_id;
-
-        // let connected_user = self
-        //     .connected_users
-        //     .get(&user_id)
-        //     .ok_or_else(|| anyhow!("no such connection"))?;
-        // anyhow::ensure!(
-        //     connected_user
-        //         .active_call
-        //         .map_or(false, |call| call.room_id == room_id
-        //             && call.connection_id == Some(connection_id)),
-        //     "cannot leave a room before joining it"
-        // );
-
-        // // Given that users can only join one room at a time, we can safely unshare
-        // // and leave all projects associated with the connection.
-        // let mut unshared_projects = Vec::new();
-        // let mut left_projects = Vec::new();
-        // for project_id in connection.projects.clone() {
-        //     if let Ok((_, project)) = self.unshare_project(project_id, connection_id) {
-        //         unshared_projects.push(project);
-        //     } else if let Ok(project) = self.leave_project(project_id, connection_id) {
-        //         left_projects.push(project);
-        //     }
-        // }
-        // self.connected_users.get_mut(&user_id).unwrap().active_call = None;
-
-        // let room = self
-        //     .rooms
-        //     .get_mut(&room_id)
-        //     .ok_or_else(|| anyhow!("no such room"))?;
-        // room.participants
-        //     .retain(|participant| participant.peer_id != connection_id.0);
-
-        // let mut canceled_call_connection_ids = Vec::new();
-        // room.pending_participant_user_ids
-        //     .retain(|pending_participant_user_id| {
-        //         if let Some(connected_user) = self
-        //             .connected_users
-        //             .get_mut(&UserId::from_proto(*pending_participant_user_id))
-        //         {
-        //             if let Some(call) = connected_user.active_call.as_ref() {
-        //                 if call.calling_user_id == user_id {
-        //                     connected_user.active_call.take();
-        //                     canceled_call_connection_ids
-        //                         .extend(connected_user.connection_ids.iter().copied());
-        //                     false
-        //                 } else {
-        //                     true
-        //                 }
-        //             } else {
-        //                 true
-        //             }
-        //         } else {
-        //             true
-        //         }
-        //     });
-
-        // let room = if room.participants.is_empty() {
-        //     Cow::Owned(self.rooms.remove(&room_id).unwrap())
-        // } else {
-        //     Cow::Borrowed(self.rooms.get(&room_id).unwrap())
-        // };
-
-        // Ok(LeftRoom {
-        //     room,
-        //     unshared_projects,
-        //     left_projects,
-        //     canceled_call_connection_ids,
-        // })
-    }
-
     pub fn rooms(&self) -> &BTreeMap<RoomId, proto::Room> {
         &self.rooms
     }