Always cast connection ids to i32

Antonio Scandurra created

Postgres doesn't support unsigned types. This also adds indices to
support querying `project_collaborators` and `room_participants`
by connection id.

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql                       |  4 
crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql |  5 
crates/collab/src/db.rs                                                              | 28 
3 files changed, 22 insertions(+), 15 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -111,6 +111,8 @@ CREATE TABLE "project_collaborators" (
 CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id");
 CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id");
 CREATE INDEX "index_project_collaborators_on_connection_epoch" ON "project_collaborators" ("connection_epoch");
+CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
 
 CREATE TABLE "room_participants" (
     "id" INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -129,3 +131,5 @@ CREATE TABLE "room_participants" (
 CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id");
 CREATE INDEX "index_room_participants_on_answering_connection_epoch" ON "room_participants" ("answering_connection_epoch");
 CREATE INDEX "index_room_participants_on_calling_connection_epoch" ON "room_participants" ("calling_connection_epoch");
+CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
+CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");

crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql 🔗

@@ -1,2 +1,7 @@
 ALTER TABLE "room_participants"
     ADD "answering_connection_lost" BOOLEAN NOT NULL DEFAULT FALSE;
+
+CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
+CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
+CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
+CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");

crates/collab/src/db.rs 🔗

@@ -1204,7 +1204,7 @@ impl Database {
     pub async fn leave_room(&self, connection_id: ConnectionId) -> Result<RoomGuard<LeftRoom>> {
         self.room_transaction(|tx| async move {
             let leaving_participant = room_participant::Entity::find()
-                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
                 .one(&*tx)
                 .await?;
 
@@ -1247,7 +1247,7 @@ impl Database {
                         project_collaborator::Column::ProjectId,
                         QueryProjectIds::ProjectId,
                     )
-                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
+                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
                     .into_values::<_, QueryProjectIds>()
                     .all(&*tx)
                     .await?;
@@ -1284,7 +1284,7 @@ impl Database {
 
                 // Leave projects.
                 project_collaborator::Entity::delete_many()
-                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
+                    .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
                     .exec(&*tx)
                     .await?;
 
@@ -1293,7 +1293,7 @@ impl Database {
                     .filter(
                         project::Column::RoomId
                             .eq(room_id)
-                            .and(project::Column::HostConnectionId.eq(connection_id.0)),
+                            .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)),
                     )
                     .exec(&*tx)
                     .await?;
@@ -1351,11 +1351,9 @@ impl Database {
             }
 
             let result = room_participant::Entity::update_many()
-                .filter(
-                    room_participant::Column::RoomId
-                        .eq(room_id)
-                        .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)),
-                )
+                .filter(room_participant::Column::RoomId.eq(room_id).and(
+                    room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32),
+                ))
                 .set(room_participant::ActiveModel {
                     location_kind: ActiveValue::set(Some(location_kind)),
                     location_project_id: ActiveValue::set(location_project_id),
@@ -1399,7 +1397,7 @@ impl Database {
                 .all(&*tx)
                 .await?;
             project_collaborator::Entity::delete_many()
-                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0))
+                .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32))
                 .exec(&*tx)
                 .await?;
 
@@ -1581,7 +1579,7 @@ impl Database {
     ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
         self.room_transaction(|tx| async move {
             let participant = room_participant::Entity::find()
-                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("could not find participant"))?;
@@ -1667,7 +1665,7 @@ impl Database {
     ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
         self.room_transaction(|tx| async move {
             let project = project::Entity::find_by_id(project_id)
-                .filter(project::Column::HostConnectionId.eq(connection_id.0))
+                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no such project"))?;
@@ -1721,7 +1719,7 @@ impl Database {
 
             // Ensure the update comes from the host.
             let project = project::Entity::find_by_id(project_id)
-                .filter(project::Column::HostConnectionId.eq(connection_id.0))
+                .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32))
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no such project"))?;
@@ -1904,7 +1902,7 @@ impl Database {
     ) -> Result<RoomGuard<(Project, ReplicaId)>> {
         self.room_transaction(|tx| async move {
             let participant = room_participant::Entity::find()
-                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32))
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("must join a room first"))?;
@@ -2041,7 +2039,7 @@ impl Database {
                 .filter(
                     project_collaborator::Column::ProjectId
                         .eq(project_id)
-                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0)),
+                        .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)),
                 )
                 .exec(&*tx)
                 .await?;