Get basic calls working again with sea-orm

Antonio Scandurra created

Change summary

crates/collab/src/db.rs | 471 +++++++++++++++++++++---------------------
1 file changed, 232 insertions(+), 239 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -987,32 +987,22 @@ impl Database {
         initial_project_id: Option<ProjectId>,
     ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
         self.transact(|tx| async move {
-            todo!()
-            // sqlx::query(
-            //     "
-            //     INSERT INTO room_participants (
-            //     room_id,
-            //     user_id,
-            //     calling_user_id,
-            //     calling_connection_id,
-            //     initial_project_id
-            //     )
-            //     VALUES ($1, $2, $3, $4, $5)
-            //     ",
-            // )
-            // .bind(room_id)
-            // .bind(called_user_id)
-            // .bind(calling_user_id)
-            // .bind(calling_connection_id.0 as i32)
-            // .bind(initial_project_id)
-            // .execute(&mut tx)
-            // .await?;
+            room_participant::ActiveModel {
+                room_id: ActiveValue::set(room_id),
+                user_id: ActiveValue::set(called_user_id),
+                calling_user_id: ActiveValue::set(calling_user_id),
+                calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32),
+                initial_project_id: ActiveValue::set(initial_project_id),
+                ..Default::default()
+            }
+            .insert(&tx)
+            .await?;
 
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // let incoming_call = Self::build_incoming_call(&room, called_user_id)
-            //     .ok_or_else(|| anyhow!("failed to build incoming call"))?;
-            // self.commit_room_transaction(room_id, tx, (room, incoming_call))
-            // .await
+            let room = self.get_room(room_id, &tx).await?;
+            let incoming_call = Self::build_incoming_call(&room, called_user_id)
+                .ok_or_else(|| anyhow!("failed to build incoming call"))?;
+            self.commit_room_transaction(room_id, tx, (room, incoming_call))
+                .await
         })
         .await
     }
@@ -1023,20 +1013,16 @@ impl Database {
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
         self.transact(|tx| async move {
-            todo!()
-            // sqlx::query(
-            //     "
-            //     DELETE FROM room_participants
-            //     WHERE room_id = $1 AND user_id = $2
-            //     ",
-            // )
-            // .bind(room_id)
-            // .bind(called_user_id)
-            // .execute(&mut tx)
-            // .await?;
-
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // self.commit_room_transaction(room_id, tx, room).await
+            room_participant::Entity::delete_many()
+                .filter(
+                    room_participant::Column::RoomId
+                        .eq(room_id)
+                        .and(room_participant::Column::UserId.eq(called_user_id)),
+                )
+                .exec(&tx)
+                .await?;
+            let room = self.get_room(room_id, &tx).await?;
+            self.commit_room_transaction(room_id, tx, room).await
         })
         .await
     }
@@ -1047,23 +1033,27 @@ impl Database {
         user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
         self.transact(|tx| async move {
-            todo!()
-            // let room_id = sqlx::query_scalar(
-            //     "
-            //     DELETE FROM room_participants
-            //     WHERE user_id = $1 AND answering_connection_id IS NULL
-            //     RETURNING room_id
-            //     ",
-            // )
-            // .bind(user_id)
-            // .fetch_one(&mut tx)
-            // .await?;
-            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
-            //     return Err(anyhow!("declining call on unexpected room"))?;
-            // }
+            let participant = room_participant::Entity::find()
+                .filter(
+                    room_participant::Column::UserId
+                        .eq(user_id)
+                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
+                )
+                .one(&tx)
+                .await?
+                .ok_or_else(|| anyhow!("could not decline call"))?;
+            let room_id = participant.room_id;
 
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // self.commit_room_transaction(room_id, tx, room).await
+            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
+                return Err(anyhow!("declining call on unexpected room"))?;
+            }
+
+            room_participant::Entity::delete(participant.into_active_model())
+                .exec(&tx)
+                .await?;
+
+            let room = self.get_room(room_id, &tx).await?;
+            self.commit_room_transaction(room_id, tx, room).await
         })
         .await
     }
@@ -1075,24 +1065,30 @@ impl Database {
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
         self.transact(|tx| async move {
-            todo!()
-            // let room_id = sqlx::query_scalar(
-            //     "
-            //     DELETE FROM room_participants
-            //     WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL
-            //     RETURNING room_id
-            //     ",
-            // )
-            // .bind(called_user_id)
-            // .bind(calling_connection_id.0 as i32)
-            // .fetch_one(&mut tx)
-            // .await?;
-            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
-            //     return Err(anyhow!("canceling call on unexpected room"))?;
-            // }
+            let participant = room_participant::Entity::find()
+                .filter(
+                    room_participant::Column::UserId
+                        .eq(called_user_id)
+                        .and(
+                            room_participant::Column::CallingConnectionId
+                                .eq(calling_connection_id.0 as i32),
+                        )
+                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
+                )
+                .one(&tx)
+                .await?
+                .ok_or_else(|| anyhow!("could not cancel call"))?;
+            let room_id = participant.room_id;
+            if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
+                return Err(anyhow!("canceling call on unexpected room"))?;
+            }
 
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // self.commit_room_transaction(room_id, tx, room).await
+            room_participant::Entity::delete(participant.into_active_model())
+                .exec(&tx)
+                .await?;
+
+            let room = self.get_room(room_id, &tx).await?;
+            self.commit_room_transaction(room_id, tx, room).await
         })
         .await
     }
@@ -1104,23 +1100,25 @@ impl Database {
         connection_id: ConnectionId,
     ) -> Result<RoomGuard<proto::Room>> {
         self.transact(|tx| async move {
-            todo!()
-            // sqlx::query(
-            //     "
-            //     UPDATE room_participants
-            //     SET answering_connection_id = $1
-            //     WHERE room_id = $2 AND user_id = $3
-            //     RETURNING 1
-            //     ",
-            // )
-            // .bind(connection_id.0 as i32)
-            // .bind(room_id)
-            // .bind(user_id)
-            // .fetch_one(&mut tx)
-            // .await?;
-
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // self.commit_room_transaction(room_id, tx, room).await
+            let result = room_participant::Entity::update_many()
+                .filter(
+                    room_participant::Column::RoomId
+                        .eq(room_id)
+                        .and(room_participant::Column::UserId.eq(user_id))
+                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
+                )
+                .col_expr(
+                    room_participant::Column::AnsweringConnectionId,
+                    connection_id.0.into(),
+                )
+                .exec(&tx)
+                .await?;
+            if result.rows_affected == 0 {
+                Err(anyhow!("room does not exist or was already joined"))?
+            } else {
+                let room = self.get_room(room_id, &tx).await?;
+                self.commit_room_transaction(room_id, tx, room).await
+            }
         })
         .await
     }
@@ -1130,124 +1128,117 @@ impl Database {
         connection_id: ConnectionId,
     ) -> Result<Option<RoomGuard<LeftRoom>>> {
         self.transact(|tx| async move {
-            todo!()
-            // // Leave room.
-            // let room_id = sqlx::query_scalar::<_, RoomId>(
-            //     "
-            //     DELETE FROM room_participants
-            //     WHERE answering_connection_id = $1
-            //     RETURNING room_id
-            //     ",
-            // )
-            // .bind(connection_id.0 as i32)
-            // .fetch_optional(&mut tx)
-            // .await?;
+            let leaving_participant = room_participant::Entity::find()
+                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+                .one(&tx)
+                .await?;
 
-            // if let Some(room_id) = room_id {
-            //     // Cancel pending calls initiated by the leaving user.
-            //     let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
-            //         "
-            //         DELETE FROM room_participants
-            //         WHERE calling_connection_id = $1 AND answering_connection_id IS NULL
-            //         RETURNING user_id
-            //         ",
-            //     )
-            //     .bind(connection_id.0 as i32)
-            //     .fetch_all(&mut tx)
-            //     .await?;
+            if let Some(leaving_participant) = leaving_participant {
+                // Leave room.
+                let room_id = leaving_participant.room_id;
+                room_participant::Entity::delete_by_id(leaving_participant.id)
+                    .exec(&tx)
+                    .await?;
 
-            //     let project_ids = sqlx::query_scalar::<_, ProjectId>(
-            //         "
-            //         SELECT project_id
-            //         FROM project_collaborators
-            //         WHERE connection_id = $1
-            //         ",
-            //     )
-            //     .bind(connection_id.0 as i32)
-            //     .fetch_all(&mut tx)
-            //     .await?;
+                // Cancel pending calls initiated by the leaving user.
+                let called_participants = room_participant::Entity::find()
+                    .filter(
+                        room_participant::Column::CallingConnectionId
+                            .eq(connection_id.0)
+                            .and(room_participant::Column::AnsweringConnectionId.is_null()),
+                    )
+                    .all(&tx)
+                    .await?;
+                room_participant::Entity::delete_many()
+                    .filter(
+                        room_participant::Column::Id
+                            .is_in(called_participants.iter().map(|participant| participant.id)),
+                    )
+                    .exec(&tx)
+                    .await?;
+                let canceled_calls_to_user_ids = called_participants
+                    .into_iter()
+                    .map(|participant| participant.user_id)
+                    .collect();
+
+                // Detect left projects.
+                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
+                enum QueryProjectIds {
+                    ProjectId,
+                }
+                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
+                    .select_only()
+                    .column_as(
+                        project_collaborator::Column::ProjectId,
+                        QueryProjectIds::ProjectId,
+                    )
+                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
+                    .into_values::<_, QueryProjectIds>()
+                    .all(&tx)
+                    .await?;
+                let mut left_projects = HashMap::default();
+                let mut collaborators = project_collaborator::Entity::find()
+                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
+                    .stream(&tx)
+                    .await?;
+                while let Some(collaborator) = collaborators.next().await {
+                    let collaborator = collaborator?;
+                    let left_project =
+                        left_projects
+                            .entry(collaborator.project_id)
+                            .or_insert(LeftProject {
+                                id: collaborator.project_id,
+                                host_user_id: Default::default(),
+                                connection_ids: Default::default(),
+                                host_connection_id: Default::default(),
+                            });
+
+                    let collaborator_connection_id =
+                        ConnectionId(collaborator.connection_id as u32);
+                    if collaborator_connection_id != connection_id {
+                        left_project.connection_ids.push(collaborator_connection_id);
+                    }
 
-            //     // Leave projects.
-            //     let mut left_projects = HashMap::default();
-            //     if !project_ids.is_empty() {
-            //         let mut params = "?,".repeat(project_ids.len());
-            //         params.pop();
-            //         let query = format!(
-            //             "
-            //             SELECT *
-            //             FROM project_collaborators
-            //             WHERE project_id IN ({params})
-            //             "
-            //         );
-            //         let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query);
-            //         for project_id in project_ids {
-            //             query = query.bind(project_id);
-            //         }
+                    if collaborator.is_host {
+                        left_project.host_user_id = collaborator.user_id;
+                        left_project.host_connection_id =
+                            ConnectionId(collaborator.connection_id as u32);
+                    }
+                }
+                drop(collaborators);
 
-            //         let mut project_collaborators = query.fetch(&mut tx);
-            //         while let Some(collaborator) = project_collaborators.next().await {
-            //             let collaborator = collaborator?;
-            //             let left_project =
-            //                 left_projects
-            //                     .entry(collaborator.project_id)
-            //                     .or_insert(LeftProject {
-            //                         id: collaborator.project_id,
-            //                         host_user_id: Default::default(),
-            //                         connection_ids: Default::default(),
-            //                         host_connection_id: Default::default(),
-            //                     });
-
-            //             let collaborator_connection_id =
-            //                 ConnectionId(collaborator.connection_id as u32);
-            //             if collaborator_connection_id != connection_id {
-            //                 left_project.connection_ids.push(collaborator_connection_id);
-            //             }
-
-            //             if collaborator.is_host {
-            //                 left_project.host_user_id = collaborator.user_id;
-            //                 left_project.host_connection_id =
-            //                     ConnectionId(collaborator.connection_id as u32);
-            //             }
-            //         }
-            //     }
-            //     sqlx::query(
-            //         "
-            //         DELETE FROM project_collaborators
-            //         WHERE connection_id = $1
-            //         ",
-            //     )
-            //     .bind(connection_id.0 as i32)
-            //     .execute(&mut tx)
-            //     .await?;
+                // Leave projects.
+                project_collaborator::Entity::delete_many()
+                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
+                    .exec(&tx)
+                    .await?;
 
-            //     // Unshare projects.
-            //     sqlx::query(
-            //         "
-            //         DELETE FROM projects
-            //         WHERE room_id = $1 AND host_connection_id = $2
-            //         ",
-            //     )
-            //     .bind(room_id)
-            //     .bind(connection_id.0 as i32)
-            //     .execute(&mut tx)
-            //     .await?;
+                // Unshare projects.
+                project::Entity::delete_many()
+                    .filter(
+                        project::Column::RoomId
+                            .eq(room_id)
+                            .and(project::Column::HostConnectionId.eq(connection_id.0)),
+                    )
+                    .exec(&tx)
+                    .await?;
 
-            //     let room = self.get_room(room_id, &mut tx).await?;
-            //     Ok(Some(
-            //         self.commit_room_transaction(
-            //             room_id,
-            //             tx,
-            //             LeftRoom {
-            //                 room,
-            //                 left_projects,
-            //                 canceled_calls_to_user_ids,
-            //             },
-            //         )
-            //         .await?,
-            //     ))
-            // } else {
-            //     Ok(None)
-            // }
+                let room = self.get_room(room_id, &tx).await?;
+                Ok(Some(
+                    self.commit_room_transaction(
+                        room_id,
+                        tx,
+                        LeftRoom {
+                            room,
+                            left_projects,
+                            canceled_calls_to_user_ids,
+                        },
+                    )
+                    .await?,
+                ))
+            } else {
+                Ok(None)
+            }
         })
         .await
     }
@@ -1259,46 +1250,48 @@ impl Database {
         location: proto::ParticipantLocation,
     ) -> Result<RoomGuard<proto::Room>> {
         self.transact(|tx| async {
-            todo!()
-            // let mut tx = tx;
-            // let location_kind;
-            // let location_project_id;
-            // match location
-            //     .variant
-            //     .as_ref()
-            //     .ok_or_else(|| anyhow!("invalid location"))?
-            // {
-            //     proto::participant_location::Variant::SharedProject(project) => {
-            //         location_kind = 0;
-            //         location_project_id = Some(ProjectId::from_proto(project.id));
-            //     }
-            //     proto::participant_location::Variant::UnsharedProject(_) => {
-            //         location_kind = 1;
-            //         location_project_id = None;
-            //     }
-            //     proto::participant_location::Variant::External(_) => {
-            //         location_kind = 2;
-            //         location_project_id = None;
-            //     }
-            // }
+            let mut tx = tx;
+            let location_kind;
+            let location_project_id;
+            match location
+                .variant
+                .as_ref()
+                .ok_or_else(|| anyhow!("invalid location"))?
+            {
+                proto::participant_location::Variant::SharedProject(project) => {
+                    location_kind = 0;
+                    location_project_id = Some(ProjectId::from_proto(project.id));
+                }
+                proto::participant_location::Variant::UnsharedProject(_) => {
+                    location_kind = 1;
+                    location_project_id = None;
+                }
+                proto::participant_location::Variant::External(_) => {
+                    location_kind = 2;
+                    location_project_id = None;
+                }
+            }
 
-            // sqlx::query(
-            //     "
-            //     UPDATE room_participants
-            //     SET location_kind = $1, location_project_id = $2
-            //     WHERE room_id = $3 AND answering_connection_id = $4
-            //     RETURNING 1
-            //     ",
-            // )
-            // .bind(location_kind)
-            // .bind(location_project_id)
-            // .bind(room_id)
-            // .bind(connection_id.0 as i32)
-            // .fetch_one(&mut tx)
-            // .await?;
+            let result = room_participant::Entity::update_many()
+                .filter(
+                    room_participant::Column::RoomId
+                        .eq(room_id)
+                        .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
+                )
+                .set(room_participant::ActiveModel {
+                    location_kind: ActiveValue::set(Some(location_kind)),
+                    location_project_id: ActiveValue::set(location_project_id),
+                    ..Default::default()
+                })
+                .exec(&tx)
+                .await?;
 
-            // let room = self.get_room(room_id, &mut tx).await?;
-            // self.commit_room_transaction(room_id, tx, room).await
+            if result.rows_affected == 1 {
+                let room = self.get_room(room_id, &mut tx).await?;
+                self.commit_room_transaction(room_id, tx, room).await
+            } else {
+                Err(anyhow!("could not update room participant location"))?
+            }
         })
         .await
     }