Remove random pauses to prevent the database from deadlocking

Antonio Scandurra created

Change summary

crates/collab/src/db.rs                | 108 ++++++++++++++-------------
crates/collab/src/integration_tests.rs |   8 +
2 files changed, 62 insertions(+), 54 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -2131,31 +2131,35 @@ impl Database {
         F: Send + Fn(TransactionHandle) -> Fut,
         Fut: Send + Future<Output = Result<T>>,
     {
-        loop {
-            let (tx, result) = self.run(self.with_transaction(&f)).await?;
-            match result {
-                Ok(result) => {
-                    match self.run(async move { Ok(tx.commit().await?) }).await {
-                        Ok(()) => return Ok(result),
-                        Err(error) => {
-                            if is_serialization_error(&error) {
-                                // Retry (don't break the loop)
-                            } else {
-                                return Err(error);
+        let body = async {
+            loop {
+                let (tx, result) = self.with_transaction(&f).await?;
+                match result {
+                    Ok(result) => {
+                        match tx.commit().await.map_err(Into::into) {
+                            Ok(()) => return Ok(result),
+                            Err(error) => {
+                                if is_serialization_error(&error) {
+                                    // Retry (don't break the loop)
+                                } else {
+                                    return Err(error);
+                                }
                             }
                         }
                     }
-                }
-                Err(error) => {
-                    self.run(tx.rollback()).await?;
-                    if is_serialization_error(&error) {
-                        // Retry (don't break the loop)
-                    } else {
-                        return Err(error);
+                    Err(error) => {
+                        tx.rollback().await?;
+                        if is_serialization_error(&error) {
+                            // Retry (don't break the loop)
+                        } else {
+                            return Err(error);
+                        }
                     }
                 }
             }
-        }
+        };
+
+        self.run(body).await
     }
 
     async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
@@ -2163,39 +2167,43 @@ impl Database {
         F: Send + Fn(TransactionHandle) -> Fut,
         Fut: Send + Future<Output = Result<(RoomId, T)>>,
     {
-        loop {
-            let (tx, result) = self.run(self.with_transaction(&f)).await?;
-            match result {
-                Ok((room_id, data)) => {
-                    let lock = self.rooms.entry(room_id).or_default().clone();
-                    let _guard = lock.lock_owned().await;
-                    match self.run(async move { Ok(tx.commit().await?) }).await {
-                        Ok(()) => {
-                            return Ok(RoomGuard {
-                                data,
-                                _guard,
-                                _not_send: PhantomData,
-                            });
-                        }
-                        Err(error) => {
-                            if is_serialization_error(&error) {
-                                // Retry (don't break the loop)
-                            } else {
-                                return Err(error);
+        let body = async {
+            loop {
+                let (tx, result) = self.with_transaction(&f).await?;
+                match result {
+                    Ok((room_id, data)) => {
+                        let lock = self.rooms.entry(room_id).or_default().clone();
+                        let _guard = lock.lock_owned().await;
+                        match tx.commit().await.map_err(Into::into) {
+                            Ok(()) => {
+                                return Ok(RoomGuard {
+                                    data,
+                                    _guard,
+                                    _not_send: PhantomData,
+                                });
+                            }
+                            Err(error) => {
+                                if is_serialization_error(&error) {
+                                    // Retry (don't break the loop)
+                                } else {
+                                    return Err(error);
+                                }
                             }
                         }
                     }
-                }
-                Err(error) => {
-                    self.run(tx.rollback()).await?;
-                    if is_serialization_error(&error) {
-                        // Retry (don't break the loop)
-                    } else {
-                        return Err(error);
+                    Err(error) => {
+                        tx.rollback().await?;
+                        if is_serialization_error(&error) {
+                            // Retry (don't break the loop)
+                        } else {
+                            return Err(error);
+                        }
                     }
                 }
             }
-        }
+        };
+
+        self.run(body).await
     }
 
     async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
@@ -2233,13 +2241,7 @@ impl Database {
                 background.simulate_random_delay().await;
             }
 
-            let result = self.runtime.as_ref().unwrap().block_on(future);
-
-            if let Some(background) = self.background.as_ref() {
-                background.simulate_random_delay().await;
-            }
-
-            result
+            self.runtime.as_ref().unwrap().block_on(future)
         }
 
         #[cfg(not(test))]

crates/collab/src/integration_tests.rs 🔗

@@ -5672,7 +5672,13 @@ impl TestServer {
     async fn start(background: Arc<executor::Background>) -> Self {
         static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
 
-        let test_db = TestDb::sqlite(background.clone());
+        let use_postgres = env::var("USE_POSTGRES").ok();
+        let use_postgres = use_postgres.as_deref();
+        let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
+            TestDb::postgres(background.clone())
+        } else {
+            TestDb::sqlite(background.clone())
+        };
         let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
         let live_kit_server = live_kit_client::TestServer::create(
             format!("http://livekit.{}.test", live_kit_server_id),