Found db parallelism problem :(

Mikayla Maki created

Change summary

crates/db/src/db.rs | 165 ++++++++++++++++++++++++++++++----------------
1 file changed, 107 insertions(+), 58 deletions(-)

Detailed changes

crates/db/src/db.rs 🔗

@@ -16,7 +16,7 @@ pub use util::paths::DB_DIR;
 use sqlez::domain::Migrator;
 use sqlez::thread_safe_connection::ThreadSafeConnection;
 use sqlez_macros::sql;
-use std::fs::{create_dir_all, remove_dir_all};
+use std::fs::create_dir_all;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -40,7 +40,7 @@ const DB_FILE_NAME: &'static str = "db.sqlite";
 
 lazy_static::lazy_static! {
     static ref DB_FILE_OPERATIONS: Mutex<()> = Mutex::new(());
-    static ref DB_WIPED: RwLock<bool> = RwLock::new(false);
+    // static ref DB_WIPED: RwLock<bool> = RwLock::new(false);
     pub static ref BACKUP_DB_PATH: RwLock<Option<PathBuf>> = RwLock::new(None);
     pub static ref ALL_FILE_DB_FAILED: AtomicBool = AtomicBool::new(false);    
 }
@@ -49,21 +49,21 @@ lazy_static::lazy_static! {
 /// This will retry a couple times if there are failures. If opening fails once, the db directory
 /// is moved to a backup folder and a new one is created. If that fails, a shared in memory db is created.
 /// In either case, static variables are set so that the user can be notified.
-pub async fn open_db<M: Migrator + 'static>(wipe_db: bool, db_dir: &Path, release_channel: &ReleaseChannel) -> ThreadSafeConnection<M> {
+pub async fn open_db<M: Migrator + 'static>(db_dir: &Path, release_channel: &ReleaseChannel) -> ThreadSafeConnection<M> {
     let release_channel_name = release_channel.dev_name();
     let main_db_dir = db_dir.join(Path::new(&format!("0-{}", release_channel_name)));
 
-    // If WIPE_DB, delete 0-{channel}
-    if release_channel == &ReleaseChannel::Dev
-        && wipe_db
-        && !*DB_WIPED.read()
-    {
-        let mut db_wiped = DB_WIPED.write();
-        if !*db_wiped {
-            remove_dir_all(&main_db_dir).ok();
-            *db_wiped = true;
-        }
-    }
+    // // If WIPE_DB, delete 0-{channel}
+    // if release_channel == &ReleaseChannel::Dev
+    //     && wipe_db
+    //     && !*DB_WIPED.read()
+    // {
+    //     let mut db_wiped = DB_WIPED.write();
+    //     if !*db_wiped {
+    //         remove_dir_all(&main_db_dir).ok();
+    //         *db_wiped = true;
+    //     }
+    // }
 
     let connection = async_iife!({
         // Note: This still has a race condition where 1 set of migrations succeeds
@@ -205,7 +205,7 @@ macro_rules! define_connection {
 
         #[cfg(not(any(test, feature = "test-support")))]
         $crate::lazy_static::lazy_static! {
-            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(std::env::var("WIPE_DB").is_ok(), &$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
+            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(&$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
         }
     };
     (pub static ref $id:ident: $t:ident<$($d:ty),+> = $migrations:expr;) => {
@@ -236,67 +236,66 @@ macro_rules! define_connection {
 
         #[cfg(not(any(test, feature = "test-support")))]
         $crate::lazy_static::lazy_static! {
-            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(std::env::var("WIPE_DB").is_ok(), &$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
+            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(&$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
         }
     };
 }
 
 #[cfg(test)]
 mod tests {
-    use std::{thread, fs};
+    use std::{fs, thread};
 
     use sqlez::{domain::Domain, connection::Connection};
     use sqlez_macros::sql;
     use tempdir::TempDir;
-    use util::channel::ReleaseChannel;
 
     use crate::{open_db, DB_FILE_NAME};
     
-    // Test that wipe_db exists and works and gives a new db
-    #[gpui::test]
-    async fn test_wipe_db() {
-        enum TestDB {}
+    // // Test that wipe_db exists and works and gives a new db
+    // #[gpui::test]
+    // async fn test_wipe_db() {
+    //     enum TestDB {}
         
-        impl Domain for TestDB {
-            fn name() -> &'static str {
-                "db_tests"
-            }
+    //     impl Domain for TestDB {
+    //         fn name() -> &'static str {
+    //             "db_tests"
+    //         }
             
-            fn migrations() -> &'static [&'static str] {
-                &[sql!(
-                    CREATE TABLE test(value);
-                )]
-            }
-        }
+    //         fn migrations() -> &'static [&'static str] {
+    //             &[sql!(
+    //                 CREATE TABLE test(value);
+    //             )]
+    //         }
+    //     }
         
-        let tempdir = TempDir::new("DbTests").unwrap();
+    //     let tempdir = TempDir::new("DbTests").unwrap();
         
-        // Create a db and insert a marker value
-        let test_db = open_db::<TestDB>(false, tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
-        test_db.write(|connection|  
-            connection.exec(sql!(
-                INSERT INTO test(value) VALUES (10)
-            )).unwrap()().unwrap()
-        ).await;
-        drop(test_db);
+    //     // Create a db and insert a marker value
+    //     let test_db = open_db::<TestDB>(false, tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+    //     test_db.write(|connection|  
+    //         connection.exec(sql!(
+    //             INSERT INTO test(value) VALUES (10)
+    //         )).unwrap()().unwrap()
+    //     ).await;
+    //     drop(test_db);
         
-        // Opening db with wipe clears once and removes the marker value
-        let mut guards = vec![];
-        for _ in 0..5 {
-            let path = tempdir.path().to_path_buf();
-            let guard = thread::spawn(move || smol::block_on(async {
-                let test_db = open_db::<TestDB>(true, &path, &ReleaseChannel::Dev).await;
+    //     // Opening db with wipe clears once and removes the marker value
+    //     let mut guards = vec![];
+    //     for _ in 0..5 {
+    //         let path = tempdir.path().to_path_buf();
+    //         let guard = thread::spawn(move || smol::block_on(async {
+    //             let test_db = open_db::<TestDB>(true, &path, &ReleaseChannel::Dev).await;
                 
-                assert!(test_db.select_row::<()>(sql!(SELECT value FROM test)).unwrap()().unwrap().is_none())
-            }));
+    //             assert!(test_db.select_row::<()>(sql!(SELECT value FROM test)).unwrap()().unwrap().is_none())
+    //         }));
             
-            guards.push(guard);
-        }
+    //         guards.push(guard);
+    //     }
         
-        for guard in guards {
-            guard.join().unwrap();
-        }
-    }
+    //     for guard in guards {
+    //         guard.join().unwrap();
+    //     }
+    // }
         
     // Test bad migration panics
     #[gpui::test]
@@ -317,7 +316,7 @@ mod tests {
         }
        
         let tempdir = TempDir::new("DbTests").unwrap();
-        let _bad_db = open_db::<BadDB>(false, tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+        let _bad_db = open_db::<BadDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
     }
     
     /// Test that DB exists but corrupted (causing recreate)
@@ -349,11 +348,11 @@ mod tests {
        
         let tempdir = TempDir::new("DbTests").unwrap();
         {
-            let corrupt_db = open_db::<CorruptedDB>(false, tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
             assert!(corrupt_db.persistent());
         }
         
-        let good_db = open_db::<GoodDB>(false, tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+        let good_db = open_db::<GoodDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
         assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
         
         let mut corrupted_backup_dir = fs::read_dir(
@@ -369,4 +368,54 @@ mod tests {
         let backup = Connection::open_file(&corrupted_backup_dir.to_string_lossy());
         assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()().unwrap().is_none());
     }
+    
+    /// Test that DB exists but corrupted (causing recreate)
+    #[gpui::test]
+    async fn test_simultaneous_db_corruption() {
+        enum CorruptedDB {}
+        
+        impl Domain for CorruptedDB {
+            fn name() -> &'static str {
+                "db_tests"
+            }
+            
+            fn migrations() -> &'static [&'static str] {
+                &[sql!(CREATE TABLE test(value);)]
+            }
+        }
+        
+        enum GoodDB {}
+        
+        impl Domain for GoodDB {
+            fn name() -> &'static str {
+                "db_tests" //Notice same name
+            }
+            
+            fn migrations() -> &'static [&'static str] {
+                &[sql!(CREATE TABLE test2(value);)] //But different migration
+            }
+        }
+       
+        let tempdir = TempDir::new("DbTests").unwrap();
+        {
+            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+            assert!(corrupt_db.persistent());
+        }
+        
+        let mut guards = vec![];
+        for _ in 0..10 {
+            let tmp_path = tempdir.path().to_path_buf();
+            let guard = thread::spawn(move || {
+                let good_db = smol::block_on(open_db::<GoodDB>(tmp_path.as_path(), &util::channel::ReleaseChannel::Dev));
+                assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
+            });
+            
+            guards.push(guard);
+        
+        }
+        
+       for guard in guards.into_iter() {
+           assert!(guard.join().is_ok());
+       }
+    }
 }