First draft of graceful corruption restoration

Mikayla Maki created

Change summary

crates/db/src/db.rs                        | 79 +++++++++++++++++++++--
crates/sqlez/src/thread_safe_connection.rs | 43 +++++++-----
crates/util/src/lib.rs                     |  7 ++
3 files changed, 103 insertions(+), 26 deletions(-)

Detailed changes

crates/db/src/db.rs 🔗

@@ -2,6 +2,7 @@ pub mod kvp;
 
 // Re-export
 pub use anyhow;
+use anyhow::Context;
 pub use indoc::indoc;
 pub use lazy_static;
 pub use smol;
@@ -14,9 +15,13 @@ use sqlez_macros::sql;
 use std::fs::{create_dir_all, remove_dir_all};
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+use util::{async_iife, ResultExt};
 use util::channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME};
 use util::paths::DB_DIR;
 
+// TODO: Add a savepoint to the thread safe connection initialization and migrations
+
 const CONNECTION_INITIALIZE_QUERY: &'static str = sql!(
     PRAGMA synchronous=NORMAL;
     PRAGMA busy_timeout=1;
@@ -28,31 +33,90 @@ const DB_INITIALIZE_QUERY: &'static str = sql!(
     PRAGMA journal_mode=WAL;
 );
 
+const FALLBACK_DB_NAME: &'static str = "FALLBACK_MEMORY_DB";
+
 lazy_static::lazy_static! {
     static ref DB_WIPED: AtomicBool = AtomicBool::new(false);
 }
 
 /// Open or create a database at the given directory path.
 pub async fn open_db<M: Migrator>() -> ThreadSafeConnection<M> {
-    // Use 0 for now. Will implement incrementing and clearing of old db files soon TM
-    let current_db_dir = (*DB_DIR).join(Path::new(&format!("0-{}", *RELEASE_CHANNEL_NAME)));
+    let db_dir = (*DB_DIR).join(Path::new(&format!("0-{}", *RELEASE_CHANNEL_NAME)));
 
+    // If WIPE_DB, delete 0-{channel}
     if *RELEASE_CHANNEL == ReleaseChannel::Dev
         && std::env::var("WIPE_DB").is_ok()
         && !DB_WIPED.load(Ordering::Acquire)
     {
-        remove_dir_all(&current_db_dir).ok();
-        DB_WIPED.store(true, Ordering::Relaxed);
+        remove_dir_all(&db_dir).ok();
+        DB_WIPED.store(true, Ordering::Release);
     }
 
-    create_dir_all(&current_db_dir).expect("Should be able to create the database directory");
-    let db_path = current_db_dir.join(Path::new("db.sqlite"));
+    let connection = async_iife!({
+        // If no db folder, create one at 0-{channel}
+        create_dir_all(&db_dir).context("Could not create db directory")?;
+        let db_path = db_dir.join(Path::new("db.sqlite"));
+
+        // Try building a connection
+        if let Some(connection) = ThreadSafeConnection::<M>::builder(db_path.to_string_lossy().as_ref(), true)
+                .with_db_initialization_query(DB_INITIALIZE_QUERY)
+                .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
+                .build()
+                .await
+                .log_err() {
+            return Ok(connection)
+        }
+        
+        let backup_timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect(
+                "System clock is set before the unix timestamp, Zed does not support this region of spacetime"
+            )
+            .as_millis();
+        
+        // If failed, move 0-{channel} to {current unix timestamp}-{channel}
+        let backup_db_dir = (*DB_DIR).join(Path::new(&format!(
+            "{}{}",
+            backup_timestamp,
+            *RELEASE_CHANNEL_NAME
+        )));
+
+        std::fs::rename(&db_dir, backup_db_dir)
+            .context("Failed clean up corrupted database, panicking.")?;
+
+        // TODO: Set a constant with the failed timestamp and error so we can notify the user
+
+        // Create a new 0-{channel}
+        create_dir_all(&db_dir).context("Should be able to create the database directory")?;
+        let db_path = db_dir.join(Path::new("db.sqlite"));
+
+        // Try again
+        ThreadSafeConnection::<M>::builder(db_path.to_string_lossy().as_ref(), true)
+            .with_db_initialization_query(DB_INITIALIZE_QUERY)
+            .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
+            .build()
+            .await
+    }).await.log_err();
+
+    if let Some(connection) = connection  {
+        return connection;
+    }
+   
+    // TODO: Set another constant so that we can escalate the notification
+    
+    // If still failed, create an in memory db with a known name
+    open_fallback_db().await
+}
 
-    ThreadSafeConnection::<M>::builder(db_path.to_string_lossy().as_ref(), true)
+async fn open_fallback_db<M: Migrator>() -> ThreadSafeConnection<M> {
+    ThreadSafeConnection::<M>::builder(FALLBACK_DB_NAME, false)
         .with_db_initialization_query(DB_INITIALIZE_QUERY)
         .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
         .build()
         .await
+        .expect(
+            "Fallback in memory database failed. Likely initialization queries or migrations have fundamental errors",
+        )
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -66,6 +130,7 @@ pub async fn open_test_db<M: Migrator>(db_name: &str) -> ThreadSafeConnection<M>
         .with_write_queue_constructor(locking_queue())
         .build()
         .await
+        .unwrap()
 }
 
 /// Implements a basic DB wrapper for a given domain

crates/sqlez/src/thread_safe_connection.rs 🔗

@@ -1,3 +1,4 @@
+use anyhow::Context;
 use futures::{channel::oneshot, Future, FutureExt};
 use lazy_static::lazy_static;
 use parking_lot::{Mutex, RwLock};
@@ -72,7 +73,7 @@ impl<M: Migrator> ThreadSafeConnectionBuilder<M> {
         self
     }
 
-    pub async fn build(self) -> ThreadSafeConnection<M> {
+    pub async fn build(self) -> anyhow::Result<ThreadSafeConnection<M>> {
         self.connection
             .initialize_queues(self.write_queue_constructor);
 
@@ -81,26 +82,33 @@ impl<M: Migrator> ThreadSafeConnectionBuilder<M> {
         self.connection
             .write(move |connection| {
                 if let Some(db_initialize_query) = db_initialize_query {
-                    connection.exec(db_initialize_query).expect(&format!(
-                        "Db initialize query failed to execute: {}",
-                        db_initialize_query
-                    ))()
-                    .unwrap();
+                    connection.exec(db_initialize_query).with_context(|| {
+                        format!(
+                            "Db initialize query failed to execute: {}",
+                            db_initialize_query
+                        )
+                    })?()?;
                 }
 
-                let mut failure_result = None;
+                // Retry failed migrations in case they were run in parallel from different
+                // processes. This gives a best attempt at migrating before bailing
+                let mut migration_result =
+                    anyhow::Result::<()>::Err(anyhow::anyhow!("Migration never run"));
+
                 for _ in 0..MIGRATION_RETRIES {
-                    failure_result = Some(M::migrate(connection));
-                    if failure_result.as_ref().unwrap().is_ok() {
+                    migration_result = connection
+                        .with_savepoint("thread_safe_multi_migration", || M::migrate(connection));
+
+                    if migration_result.is_ok() {
                         break;
                     }
                 }
 
-                failure_result.unwrap().expect("Migration failed");
+                migration_result
             })
-            .await;
+            .await?;
 
-        self.connection
+        Ok(self.connection)
     }
 }
 
@@ -240,10 +248,6 @@ impl<D: Domain> Clone for ThreadSafeConnection<D> {
     }
 }
 
-// TODO:
-//  1. When migration or initialization fails, move the corrupted db to a holding place and create a new one
-//  2. If the new db also fails, downgrade to a shared in memory db
-//  3. In either case notify the user about what went wrong
 impl<M: Migrator> Deref for ThreadSafeConnection<M> {
     type Target = Connection;
 
@@ -265,7 +269,7 @@ pub fn locking_queue() -> WriteQueueConstructor {
 #[cfg(test)]
 mod test {
     use indoc::indoc;
-    use lazy_static::__Deref;
+    use std::ops::Deref;
     use std::thread;
 
     use crate::{domain::Domain, thread_safe_connection::ThreadSafeConnection};
@@ -295,7 +299,8 @@ mod test {
                                 PRAGMA foreign_keys=TRUE;
                                 PRAGMA case_sensitive_like=TRUE;
                             "});
-                let _ = smol::block_on(builder.build()).deref();
+
+                let _ = smol::block_on(builder.build()).unwrap().deref();
             }));
         }
 
@@ -341,6 +346,6 @@ mod test {
             ThreadSafeConnection::<TestWorkspace>::builder("wild_zed_lost_failure", false)
                 .with_connection_initialize_query("PRAGMA FOREIGN_KEYS=true");
 
-        smol::block_on(builder.build());
+        smol::block_on(builder.build()).unwrap();
     }
 }

crates/util/src/lib.rs 🔗

@@ -223,6 +223,13 @@ macro_rules! iife {
     };
 }
 
+#[macro_export]
+macro_rules! async_iife {
+    ($block:block) => {
+        (|| async move { $block })()
+    };
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;