Merge pull request #2200 from zed-industries/fix-slow-project-join

Antonio Scandurra created

Hold room lock through the entirety of a `room_transaction`

Change summary

crates/collab/src/db.rs | 198 ++++++++++++++++++++++++------------------
1 file changed, 112 insertions(+), 86 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -158,7 +158,7 @@ impl Database {
         room_id: RoomId,
         new_server_id: ServerId,
     ) -> Result<RoomGuard<RefreshedRoom>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let stale_participant_filter = Condition::all()
                 .add(room_participant::Column::RoomId.eq(room_id))
                 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
@@ -194,14 +194,11 @@ impl Database {
                 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
             }
 
-            Ok((
-                room_id,
-                RefreshedRoom {
-                    room,
-                    stale_participant_user_ids,
-                    canceled_calls_to_user_ids,
-                },
-            ))
+            Ok(RefreshedRoom {
+                room,
+                stale_participant_user_ids,
+                canceled_calls_to_user_ids,
+            })
         })
         .await
     }
@@ -1130,18 +1127,16 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
         live_kit_room: &str,
-    ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+    ) -> Result<proto::Room> {
+        self.transaction(|tx| async move {
             let room = room::ActiveModel {
                 live_kit_room: ActiveValue::set(live_kit_room.into()),
                 ..Default::default()
             }
             .insert(&*tx)
             .await?;
-            let room_id = room.id;
-
             room_participant::ActiveModel {
-                room_id: ActiveValue::set(room_id),
+                room_id: ActiveValue::set(room.id),
                 user_id: ActiveValue::set(user_id),
                 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                 answering_connection_server_id: ActiveValue::set(Some(ServerId(
@@ -1158,8 +1153,8 @@ impl Database {
             .insert(&*tx)
             .await?;
 
-            let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            let room = self.get_room(room.id, &tx).await?;
+            Ok(room)
         })
         .await
     }
@@ -1172,7 +1167,7 @@ impl Database {
         called_user_id: UserId,
         initial_project_id: Option<ProjectId>,
     ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             room_participant::ActiveModel {
                 room_id: ActiveValue::set(room_id),
                 user_id: ActiveValue::set(called_user_id),
@@ -1191,7 +1186,7 @@ impl Database {
             let room = self.get_room(room_id, &tx).await?;
             let incoming_call = Self::build_incoming_call(&room, called_user_id)
                 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
-            Ok((room_id, (room, incoming_call)))
+            Ok((room, incoming_call))
         })
         .await
     }
@@ -1201,7 +1196,7 @@ impl Database {
         room_id: RoomId,
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             room_participant::Entity::delete_many()
                 .filter(
                     room_participant::Column::RoomId
@@ -1211,7 +1206,7 @@ impl Database {
                 .exec(&*tx)
                 .await?;
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            Ok(room)
         })
         .await
     }
@@ -1258,7 +1253,7 @@ impl Database {
         calling_connection: ConnectionId,
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -1277,14 +1272,13 @@ impl Database {
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no call to cancel"))?;
-            let room_id = participant.room_id;
 
             room_participant::Entity::delete(participant.into_active_model())
                 .exec(&*tx)
                 .await?;
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            Ok(room)
         })
         .await
     }
@@ -1295,7 +1289,7 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let result = room_participant::Entity::update_many()
                 .filter(
                     Condition::all()
@@ -1317,7 +1311,7 @@ impl Database {
                 Err(anyhow!("room does not exist or was already joined"))?
             } else {
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, room))
+                Ok(room)
             }
         })
         .await
@@ -1329,9 +1323,9 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<RejoinedRoom>> {
-        self.room_transaction(|tx| async {
+        let room_id = RoomId::from_proto(rejoin_room.id);
+        self.room_transaction(room_id, |tx| async {
             let tx = tx;
-            let room_id = RoomId::from_proto(rejoin_room.id);
             let participant_update = room_participant::Entity::update_many()
                 .filter(
                     Condition::all()
@@ -1550,14 +1544,11 @@ impl Database {
             }
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((
-                room_id,
-                RejoinedRoom {
-                    room,
-                    rejoined_projects,
-                    reshared_projects,
-                },
-            ))
+            Ok(RejoinedRoom {
+                room,
+                rejoined_projects,
+                reshared_projects,
+            })
         })
         .await
     }
@@ -1806,7 +1797,7 @@ impl Database {
         connection: ConnectionId,
         location: proto::ParticipantLocation,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async {
+        self.room_transaction(room_id, |tx| async {
             let tx = tx;
             let location_kind;
             let location_project_id;
@@ -1852,7 +1843,7 @@ impl Database {
 
             if result.rows_affected == 1 {
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, room))
+                Ok(room)
             } else {
                 Err(anyhow!("could not update room participant location"))?
             }
@@ -2058,7 +2049,7 @@ impl Database {
         connection: ConnectionId,
         worktrees: &[proto::WorktreeMetadata],
     ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -2119,7 +2110,7 @@ impl Database {
             .await?;
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, (project.id, room)))
+            Ok((project.id, room))
         })
         .await
     }
@@ -2129,7 +2120,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 
             let project = project::Entity::find_by_id(project_id)
@@ -2137,12 +2129,11 @@ impl Database {
                 .await?
                 .ok_or_else(|| anyhow!("project not found"))?;
             if project.host_connection()? == connection {
-                let room_id = project.room_id;
                 project::Entity::delete(project.into_active_model())
                     .exec(&*tx)
                     .await?;
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, (room, guest_connection_ids)))
+                Ok((room, guest_connection_ids))
             } else {
                 Err(anyhow!("cannot unshare a project hosted by another user"))?
             }
@@ -2156,7 +2147,8 @@ impl Database {
         connection: ConnectionId,
         worktrees: &[proto::WorktreeMetadata],
     ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let project = project::Entity::find_by_id(project_id)
                 .filter(
                     Condition::all()
@@ -2174,7 +2166,7 @@ impl Database {
 
             let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
             let room = self.get_room(project.room_id, &tx).await?;
-            Ok((project.room_id, (room, guest_connection_ids)))
+            Ok((room, guest_connection_ids))
         })
         .await
     }
@@ -2219,12 +2211,12 @@ impl Database {
         update: &proto::UpdateWorktree,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
-            let worktree_id = update.worktree_id as i64;
-
+        let project_id = ProjectId::from_proto(update.project_id);
+        let worktree_id = update.worktree_id as i64;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             // Ensure the update comes from the host.
-            let project = project::Entity::find_by_id(project_id)
+            let _project = project::Entity::find_by_id(project_id)
                 .filter(
                     Condition::all()
                         .add(project::Column::HostConnectionId.eq(connection.id as i32))
@@ -2235,7 +2227,6 @@ impl Database {
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no such project"))?;
-            let room_id = project.room_id;
 
             // Update metadata.
             worktree::Entity::update(worktree::ActiveModel {
@@ -2315,7 +2306,7 @@ impl Database {
             }
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2325,9 +2316,10 @@ impl Database {
         update: &proto::UpdateDiagnosticSummary,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
-            let worktree_id = update.worktree_id as i64;
+        let project_id = ProjectId::from_proto(update.project_id);
+        let worktree_id = update.worktree_id as i64;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let summary = update
                 .summary
                 .as_ref()
@@ -2369,7 +2361,7 @@ impl Database {
             .await?;
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((project.room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2379,8 +2371,9 @@ impl Database {
         update: &proto::StartLanguageServer,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
+        let project_id = ProjectId::from_proto(update.project_id);
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let server = update
                 .server
                 .as_ref()
@@ -2414,7 +2407,7 @@ impl Database {
             .await?;
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((project.room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2424,7 +2417,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<(Project, ReplicaId)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -2550,7 +2544,6 @@ impl Database {
                 .all(&*tx)
                 .await?;
 
-            let room_id = project.room_id;
             let project = Project {
                 collaborators: collaborators
                     .into_iter()
@@ -2570,7 +2563,7 @@ impl Database {
                     })
                     .collect(),
             };
-            Ok((room_id, (project, replica_id as ReplicaId)))
+            Ok((project, replica_id as ReplicaId))
         })
         .await
     }
@@ -2580,7 +2573,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<LeftProject>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let result = project_collaborator::Entity::delete_many()
                 .filter(
                     Condition::all()
@@ -2616,7 +2610,7 @@ impl Database {
                 host_connection_id: project.host_connection()?,
                 connection_ids,
             };
-            Ok((project.room_id, left_project))
+            Ok(left_project)
         })
         .await
     }
@@ -2626,11 +2620,8 @@ impl Database {
         project_id: ProjectId,
         connection_id: ConnectionId,
     ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
-        self.room_transaction(|tx| async move {
-            let project = project::Entity::find_by_id(project_id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("no such project"))?;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let collaborators = project_collaborator::Entity::find()
                 .filter(project_collaborator::Column::ProjectId.eq(project_id))
                 .all(&*tx)
@@ -2648,7 +2639,7 @@ impl Database {
                 .iter()
                 .any(|collaborator| collaborator.connection_id == connection_id)
             {
-                Ok((project.room_id, collaborators))
+                Ok(collaborators)
             } else {
                 Err(anyhow!("no such project"))?
             }
@@ -2661,11 +2652,8 @@ impl Database {
         project_id: ProjectId,
         connection_id: ConnectionId,
     ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project = project::Entity::find_by_id(project_id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("no such project"))?;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let mut collaborators = project_collaborator::Entity::find()
                 .filter(project_collaborator::Column::ProjectId.eq(project_id))
                 .stream(&*tx)
@@ -2678,7 +2666,7 @@ impl Database {
             }
 
             if connection_ids.contains(&connection_id) {
-                Ok((project.room_id, connection_ids))
+                Ok(connection_ids)
             } else {
                 Err(anyhow!("no such project"))?
             }
@@ -2708,6 +2696,17 @@ impl Database {
         Ok(guest_connection_ids)
     }
 
+    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
+        self.transaction(|tx| async move {
+            let project = project::Entity::find_by_id(project_id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
+            Ok(project.room_id)
+        })
+        .await
+    }
+
     // access tokens
 
     pub async fn create_access_token_hash(
@@ -2858,21 +2857,48 @@ impl Database {
         self.run(body).await
     }
 
-    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
+    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
     where
         F: Send + Fn(TransactionHandle) -> Fut,
-        Fut: Send + Future<Output = Result<(RoomId, T)>>,
+        Fut: Send + Future<Output = Result<T>>,
     {
-        let data = self
-            .optional_room_transaction(move |tx| {
-                let future = f(tx);
-                async {
-                    let data = future.await?;
-                    Ok(Some(data))
+        let body = async {
+            loop {
+                let lock = self.rooms.entry(room_id).or_default().clone();
+                let _guard = lock.lock_owned().await;
+                let (tx, result) = self.with_transaction(&f).await?;
+                match result {
+                    Ok(data) => {
+                        match tx.commit().await.map_err(Into::into) {
+                            Ok(()) => {
+                                return Ok(RoomGuard {
+                                    data,
+                                    _guard,
+                                    _not_send: PhantomData,
+                                });
+                            }
+                            Err(error) => {
+                                if is_serialization_error(&error) {
+                                    // Retry (don't break the loop)
+                                } else {
+                                    return Err(error);
+                                }
+                            }
+                        }
+                    }
+                    Err(error) => {
+                        tx.rollback().await?;
+                        if is_serialization_error(&error) {
+                            // Retry (don't break the loop)
+                        } else {
+                            return Err(error);
+                        }
+                    }
                 }
-            })
-            .await?;
-        Ok(data.unwrap())
+            }
+        };
+
+        self.run(body).await
     }
 
     async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>