Hold room lock through the entirety of a `room_transaction`

Antonio Scandurra created

Previously, when the host repeatedly sent `UpdateWorktree` messages,
new guests attempting to join a project would observe a severe slowdown
caused by a database serialization error (e.g., the coherence of the data
would get violated midway through `Database::join_project` due to worktree
entries being mutated as the user joined). Writing entries is pretty fast,
whereas reading all of them for a project can take more than 100ms.
Transactions that failed due to a serialization error are retried, but the guest
would keep retrying until the host finished writing because the guest's read
was slow.

This commit changes the semantics of `room_transaction` to acquire a room
lock before even starting the transaction and holding it all the way after
commit (storing it, as before, in the `RoomGuard`). This ensures that a fast
writer (the host) can't starve a slow reader (the guest), allowing the latter
to make progress by temporarily pausing writes by the former.

Change summary

crates/collab/src/db.rs | 198 ++++++++++++++++++++++++------------------
1 file changed, 112 insertions(+), 86 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -157,7 +157,7 @@ impl Database {
         room_id: RoomId,
         new_server_id: ServerId,
     ) -> Result<RoomGuard<RefreshedRoom>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let stale_participant_filter = Condition::all()
                 .add(room_participant::Column::RoomId.eq(room_id))
                 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
@@ -193,14 +193,11 @@ impl Database {
                 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
             }
 
-            Ok((
-                room_id,
-                RefreshedRoom {
-                    room,
-                    stale_participant_user_ids,
-                    canceled_calls_to_user_ids,
-                },
-            ))
+            Ok(RefreshedRoom {
+                room,
+                stale_participant_user_ids,
+                canceled_calls_to_user_ids,
+            })
         })
         .await
     }
@@ -1129,18 +1126,16 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
         live_kit_room: &str,
-    ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+    ) -> Result<proto::Room> {
+        self.transaction(|tx| async move {
             let room = room::ActiveModel {
                 live_kit_room: ActiveValue::set(live_kit_room.into()),
                 ..Default::default()
             }
             .insert(&*tx)
             .await?;
-            let room_id = room.id;
-
             room_participant::ActiveModel {
-                room_id: ActiveValue::set(room_id),
+                room_id: ActiveValue::set(room.id),
                 user_id: ActiveValue::set(user_id),
                 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                 answering_connection_server_id: ActiveValue::set(Some(ServerId(
@@ -1157,8 +1152,8 @@ impl Database {
             .insert(&*tx)
             .await?;
 
-            let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            let room = self.get_room(room.id, &tx).await?;
+            Ok(room)
         })
         .await
     }
@@ -1171,7 +1166,7 @@ impl Database {
         called_user_id: UserId,
         initial_project_id: Option<ProjectId>,
     ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             room_participant::ActiveModel {
                 room_id: ActiveValue::set(room_id),
                 user_id: ActiveValue::set(called_user_id),
@@ -1190,7 +1185,7 @@ impl Database {
             let room = self.get_room(room_id, &tx).await?;
             let incoming_call = Self::build_incoming_call(&room, called_user_id)
                 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
-            Ok((room_id, (room, incoming_call)))
+            Ok((room, incoming_call))
         })
         .await
     }
@@ -1200,7 +1195,7 @@ impl Database {
         room_id: RoomId,
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             room_participant::Entity::delete_many()
                 .filter(
                     room_participant::Column::RoomId
@@ -1210,7 +1205,7 @@ impl Database {
                 .exec(&*tx)
                 .await?;
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            Ok(room)
         })
         .await
     }
@@ -1257,7 +1252,7 @@ impl Database {
         calling_connection: ConnectionId,
         called_user_id: UserId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -1276,14 +1271,13 @@ impl Database {
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no call to cancel"))?;
-            let room_id = participant.room_id;
 
             room_participant::Entity::delete(participant.into_active_model())
                 .exec(&*tx)
                 .await?;
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, room))
+            Ok(room)
         })
         .await
     }
@@ -1294,7 +1288,7 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let result = room_participant::Entity::update_many()
                 .filter(
                     Condition::all()
@@ -1316,7 +1310,7 @@ impl Database {
                 Err(anyhow!("room does not exist or was already joined"))?
             } else {
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, room))
+                Ok(room)
             }
         })
         .await
@@ -1328,9 +1322,9 @@ impl Database {
         user_id: UserId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<RejoinedRoom>> {
-        self.room_transaction(|tx| async {
+        let room_id = RoomId::from_proto(rejoin_room.id);
+        self.room_transaction(room_id, |tx| async {
             let tx = tx;
-            let room_id = RoomId::from_proto(rejoin_room.id);
             let participant_update = room_participant::Entity::update_many()
                 .filter(
                     Condition::all()
@@ -1549,14 +1543,11 @@ impl Database {
             }
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((
-                room_id,
-                RejoinedRoom {
-                    room,
-                    rejoined_projects,
-                    reshared_projects,
-                },
-            ))
+            Ok(RejoinedRoom {
+                room,
+                rejoined_projects,
+                reshared_projects,
+            })
         })
         .await
     }
@@ -1723,7 +1714,7 @@ impl Database {
         connection: ConnectionId,
         location: proto::ParticipantLocation,
     ) -> Result<RoomGuard<proto::Room>> {
-        self.room_transaction(|tx| async {
+        self.room_transaction(room_id, |tx| async {
             let tx = tx;
             let location_kind;
             let location_project_id;
@@ -1769,7 +1760,7 @@ impl Database {
 
             if result.rows_affected == 1 {
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, room))
+                Ok(room)
             } else {
                 Err(anyhow!("could not update room participant location"))?
             }
@@ -1963,7 +1954,7 @@ impl Database {
         connection: ConnectionId,
         worktrees: &[proto::WorktreeMetadata],
     ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
-        self.room_transaction(|tx| async move {
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -2024,7 +2015,7 @@ impl Database {
             .await?;
 
             let room = self.get_room(room_id, &tx).await?;
-            Ok((room_id, (project.id, room)))
+            Ok((project.id, room))
         })
         .await
     }
@@ -2034,7 +2025,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
 
             let project = project::Entity::find_by_id(project_id)
@@ -2042,12 +2034,11 @@ impl Database {
                 .await?
                 .ok_or_else(|| anyhow!("project not found"))?;
             if project.host_connection()? == connection {
-                let room_id = project.room_id;
                 project::Entity::delete(project.into_active_model())
                     .exec(&*tx)
                     .await?;
                 let room = self.get_room(room_id, &tx).await?;
-                Ok((room_id, (room, guest_connection_ids)))
+                Ok((room, guest_connection_ids))
             } else {
                 Err(anyhow!("cannot unshare a project hosted by another user"))?
             }
@@ -2061,7 +2052,8 @@ impl Database {
         connection: ConnectionId,
         worktrees: &[proto::WorktreeMetadata],
     ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let project = project::Entity::find_by_id(project_id)
                 .filter(
                     Condition::all()
@@ -2079,7 +2071,7 @@ impl Database {
 
             let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
             let room = self.get_room(project.room_id, &tx).await?;
-            Ok((project.room_id, (room, guest_connection_ids)))
+            Ok((room, guest_connection_ids))
         })
         .await
     }
@@ -2124,12 +2116,12 @@ impl Database {
         update: &proto::UpdateWorktree,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
-            let worktree_id = update.worktree_id as i64;
-
+        let project_id = ProjectId::from_proto(update.project_id);
+        let worktree_id = update.worktree_id as i64;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             // Ensure the update comes from the host.
-            let project = project::Entity::find_by_id(project_id)
+            let _project = project::Entity::find_by_id(project_id)
                 .filter(
                     Condition::all()
                         .add(project::Column::HostConnectionId.eq(connection.id as i32))
@@ -2140,7 +2132,6 @@ impl Database {
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no such project"))?;
-            let room_id = project.room_id;
 
             // Update metadata.
             worktree::Entity::update(worktree::ActiveModel {
@@ -2220,7 +2211,7 @@ impl Database {
             }
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2230,9 +2221,10 @@ impl Database {
         update: &proto::UpdateDiagnosticSummary,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
-            let worktree_id = update.worktree_id as i64;
+        let project_id = ProjectId::from_proto(update.project_id);
+        let worktree_id = update.worktree_id as i64;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let summary = update
                 .summary
                 .as_ref()
@@ -2274,7 +2266,7 @@ impl Database {
             .await?;
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((project.room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2284,8 +2276,9 @@ impl Database {
         update: &proto::StartLanguageServer,
         connection: ConnectionId,
     ) -> Result<RoomGuard<Vec<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project_id = ProjectId::from_proto(update.project_id);
+        let project_id = ProjectId::from_proto(update.project_id);
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let server = update
                 .server
                 .as_ref()
@@ -2319,7 +2312,7 @@ impl Database {
             .await?;
 
             let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
-            Ok((project.room_id, connection_ids))
+            Ok(connection_ids)
         })
         .await
     }
@@ -2329,7 +2322,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<(Project, ReplicaId)>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let participant = room_participant::Entity::find()
                 .filter(
                     Condition::all()
@@ -2455,7 +2449,6 @@ impl Database {
                 .all(&*tx)
                 .await?;
 
-            let room_id = project.room_id;
             let project = Project {
                 collaborators: collaborators
                     .into_iter()
@@ -2475,7 +2468,7 @@ impl Database {
                     })
                     .collect(),
             };
-            Ok((room_id, (project, replica_id as ReplicaId)))
+            Ok((project, replica_id as ReplicaId))
         })
         .await
     }
@@ -2485,7 +2478,8 @@ impl Database {
         project_id: ProjectId,
         connection: ConnectionId,
     ) -> Result<RoomGuard<LeftProject>> {
-        self.room_transaction(|tx| async move {
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let result = project_collaborator::Entity::delete_many()
                 .filter(
                     Condition::all()
@@ -2521,7 +2515,7 @@ impl Database {
                 host_connection_id: project.host_connection()?,
                 connection_ids,
             };
-            Ok((project.room_id, left_project))
+            Ok(left_project)
         })
         .await
     }
@@ -2531,11 +2525,8 @@ impl Database {
         project_id: ProjectId,
         connection_id: ConnectionId,
     ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
-        self.room_transaction(|tx| async move {
-            let project = project::Entity::find_by_id(project_id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("no such project"))?;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let collaborators = project_collaborator::Entity::find()
                 .filter(project_collaborator::Column::ProjectId.eq(project_id))
                 .all(&*tx)
@@ -2553,7 +2544,7 @@ impl Database {
                 .iter()
                 .any(|collaborator| collaborator.connection_id == connection_id)
             {
-                Ok((project.room_id, collaborators))
+                Ok(collaborators)
             } else {
                 Err(anyhow!("no such project"))?
             }
@@ -2566,11 +2557,8 @@ impl Database {
         project_id: ProjectId,
         connection_id: ConnectionId,
     ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
-        self.room_transaction(|tx| async move {
-            let project = project::Entity::find_by_id(project_id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("no such project"))?;
+        let room_id = self.room_id_for_project(project_id).await?;
+        self.room_transaction(room_id, |tx| async move {
             let mut collaborators = project_collaborator::Entity::find()
                 .filter(project_collaborator::Column::ProjectId.eq(project_id))
                 .stream(&*tx)
@@ -2583,7 +2571,7 @@ impl Database {
             }
 
             if connection_ids.contains(&connection_id) {
-                Ok((project.room_id, connection_ids))
+                Ok(connection_ids)
             } else {
                 Err(anyhow!("no such project"))?
             }
@@ -2613,6 +2601,17 @@ impl Database {
         Ok(guest_connection_ids)
     }
 
+    async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
+        self.transaction(|tx| async move {
+            let project = project::Entity::find_by_id(project_id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("project {} not found", project_id))?;
+            Ok(project.room_id)
+        })
+        .await
+    }
+
     // access tokens
 
     pub async fn create_access_token_hash(
@@ -2763,21 +2762,48 @@ impl Database {
         self.run(body).await
     }
 
-    async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
+    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
     where
         F: Send + Fn(TransactionHandle) -> Fut,
-        Fut: Send + Future<Output = Result<(RoomId, T)>>,
+        Fut: Send + Future<Output = Result<T>>,
     {
-        let data = self
-            .optional_room_transaction(move |tx| {
-                let future = f(tx);
-                async {
-                    let data = future.await?;
-                    Ok(Some(data))
+        let body = async {
+            loop {
+                let lock = self.rooms.entry(room_id).or_default().clone();
+                let _guard = lock.lock_owned().await;
+                let (tx, result) = self.with_transaction(&f).await?;
+                match result {
+                    Ok(data) => {
+                        match tx.commit().await.map_err(Into::into) {
+                            Ok(()) => {
+                                return Ok(RoomGuard {
+                                    data,
+                                    _guard,
+                                    _not_send: PhantomData,
+                                });
+                            }
+                            Err(error) => {
+                                if is_serialization_error(&error) {
+                                    // Retry (don't break the loop)
+                                } else {
+                                    return Err(error);
+                                }
+                            }
+                        }
+                    }
+                    Err(error) => {
+                        tx.rollback().await?;
+                        if is_serialization_error(&error) {
+                            // Retry (don't break the loop)
+                        } else {
+                            return Err(error);
+                        }
+                    }
                 }
-            })
-            .await?;
-        Ok(data.unwrap())
+            }
+        };
+
+        self.run(body).await
     }
 
     async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>