db.rs

  1pub mod kvp;
  2pub mod query;
  3
  4// Re-export
  5pub use anyhow;
  6use anyhow::Context;
  7pub use indoc::indoc;
  8pub use lazy_static;
  9use parking_lot::{Mutex, RwLock};
 10pub use smol;
 11pub use sqlez;
 12pub use sqlez_macros;
 13pub use util::channel::{RELEASE_CHANNEL, RELEASE_CHANNEL_NAME};
 14pub use util::paths::DB_DIR;
 15
 16use sqlez::domain::Migrator;
 17use sqlez::thread_safe_connection::ThreadSafeConnection;
 18use sqlez_macros::sql;
 19use std::fs::create_dir_all;
 20use std::path::{Path, PathBuf};
 21use std::sync::atomic::{AtomicBool, Ordering};
 22use std::time::{SystemTime, UNIX_EPOCH};
 23use util::{async_iife, ResultExt};
 24use util::channel::ReleaseChannel;
 25
 26const CONNECTION_INITIALIZE_QUERY: &'static str = sql!(
 27    PRAGMA foreign_keys=TRUE;
 28);
 29
 30const DB_INITIALIZE_QUERY: &'static str = sql!(
 31    PRAGMA journal_mode=WAL;
 32    PRAGMA busy_timeout=1;
 33    PRAGMA case_sensitive_like=TRUE;
 34    PRAGMA synchronous=NORMAL;
 35);
 36
 37const FALLBACK_DB_NAME: &'static str = "FALLBACK_MEMORY_DB";
 38
 39const DB_FILE_NAME: &'static str = "db.sqlite";
 40
 41lazy_static::lazy_static! {
 42    static ref DB_FILE_OPERATIONS: Mutex<()> = Mutex::new(());
 43    pub static ref BACKUP_DB_PATH: RwLock<Option<PathBuf>> = RwLock::new(None);
 44    pub static ref ALL_FILE_DB_FAILED: AtomicBool = AtomicBool::new(false);    
 45}
 46
 47/// Open or create a database at the given directory path.
 48/// This will retry a couple times if there are failures. If opening fails once, the db directory
 49/// is moved to a backup folder and a new one is created. If that fails, a shared in memory db is created.
 50/// In either case, static variables are set so that the user can be notified.
 51pub async fn open_db<M: Migrator + 'static>(db_dir: &Path, release_channel: &ReleaseChannel) -> ThreadSafeConnection<M> {
 52    let release_channel_name = release_channel.dev_name();
 53    let main_db_dir = db_dir.join(Path::new(&format!("0-{}", release_channel_name)));
 54
 55    let connection = async_iife!({
 56        // Note: This still has a race condition where 1 set of migrations succeeds
 57        // (e.g. (Workspace, Editor)) and another fails (e.g. (Workspace, Terminal))
 58        // This will cause the first connection to have the database taken out 
 59        // from under it. This *should* be fine though. The second dabatase failure will
 60        // cause errors in the log and so should be observed by developers while writing
 61        // soon-to-be good migrations. If user databases are corrupted, we toss them out
 62        // and try again from a blank. As long as running all migrations from start to end 
 63        // on a blank database is ok, this race condition will never be triggered.
 64        //
 65        // Basically: Don't ever push invalid migrations to stable or everyone will have
 66        // a bad time.
 67        
 68        // If no db folder, create one at 0-{channel}
 69        create_dir_all(&main_db_dir).context("Could not create db directory")?;
 70        let db_path = main_db_dir.join(Path::new(DB_FILE_NAME));
 71        
 72        // Optimistically open databases in parallel
 73        if !DB_FILE_OPERATIONS.is_locked() {
 74            // Try building a connection
 75            if let Some(connection) = open_main_db(&db_path).await {
 76                return Ok(connection)
 77            };
 78        }
 79        
 80        // Take a lock in the failure case so that we move the db once per process instead 
 81        // of potentially multiple times from different threads. This shouldn't happen in the
 82        // normal path
 83        let _lock = DB_FILE_OPERATIONS.lock();
 84        if let Some(connection) = open_main_db(&db_path).await {
 85            return Ok(connection)
 86        };
 87        
 88        let backup_timestamp = SystemTime::now()
 89            .duration_since(UNIX_EPOCH)
 90            .expect("System clock is set before the unix timestamp, Zed does not support this region of spacetime")
 91            .as_millis();
 92        
 93        // If failed, move 0-{channel} to {current unix timestamp}-{channel}
 94        let backup_db_dir = db_dir.join(Path::new(&format!(
 95            "{}-{}",
 96            backup_timestamp,
 97            release_channel_name,
 98        )));
 99
100        std::fs::rename(&main_db_dir, &backup_db_dir)
101            .context("Failed clean up corrupted database, panicking.")?;
102
103        // Set a static ref with the failed timestamp and error so we can notify the user
104        {
105            let mut guard = BACKUP_DB_PATH.write();
106            *guard = Some(backup_db_dir);
107        }
108        
109        // Create a new 0-{channel}
110        create_dir_all(&main_db_dir).context("Should be able to create the database directory")?;
111        let db_path = main_db_dir.join(Path::new(DB_FILE_NAME));
112
113        // Try again
114        open_main_db(&db_path).await.context("Could not newly created db")
115    }).await.log_err();
116
117    if let Some(connection) = connection {
118        return connection;
119    }
120   
121    // Set another static ref so that we can escalate the notification
122    ALL_FILE_DB_FAILED.store(true, Ordering::Release);
123    
124    // If still failed, create an in memory db with a known name
125    open_fallback_db().await
126}
127
128async fn open_main_db<M: Migrator>(db_path: &PathBuf) -> Option<ThreadSafeConnection<M>> {
129    log::info!("Opening main db");
130    ThreadSafeConnection::<M>::builder(db_path.to_string_lossy().as_ref(), true)
131        .with_db_initialization_query(DB_INITIALIZE_QUERY)
132        .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
133        .build()
134        .await
135        .log_err()
136}
137
138async fn open_fallback_db<M: Migrator>() -> ThreadSafeConnection<M> {
139    log::info!("Opening fallback db");
140    ThreadSafeConnection::<M>::builder(FALLBACK_DB_NAME, false)
141        .with_db_initialization_query(DB_INITIALIZE_QUERY)
142        .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
143        .build()
144        .await
145        .expect(
146            "Fallback in memory database failed. Likely initialization queries or migrations have fundamental errors",
147        )
148}
149
/// Opens a database named `db_name` for use in tests, with the same initialization queries
/// as the production paths but a locking write queue.
///
/// Only compiled for tests or when the `test-support` feature is enabled.
///
/// # Panics
///
/// Panics if the connection cannot be built.
#[cfg(any(test, feature = "test-support"))]
pub async fn open_test_db<M: Migrator>(db_name: &str) -> ThreadSafeConnection<M> {
    use sqlez::thread_safe_connection::locking_queue;

    let test_db = ThreadSafeConnection::<M>::builder(db_name, false)
        .with_db_initialization_query(DB_INITIALIZE_QUERY)
        .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
        // Serialize queued writes via a mutex and run them synchronously
        .with_write_queue_constructor(locking_queue())
        .build()
        .await;

    test_db.unwrap()
}
163
/// Implements a basic DB wrapper for a given domain.
///
/// Two invocation forms are accepted:
///
/// ```ignore
/// // A domain with no dependencies: wraps `ThreadSafeConnection<MyDb>`.
/// define_connection!(pub static ref DB: MyDb<()> = &[/* migrations */];);
///
/// // A domain that depends on others: wraps `ThreadSafeConnection<(Other, MyDb)>`.
/// define_connection!(pub static ref DB: MyDb<Other> = &[/* migrations */];);
/// ```
///
/// Either form generates:
///
/// * a tuple struct `$t` wrapping the connection, with `Deref` to that connection;
/// * a `Domain` impl for `$t` whose name is `stringify!($t)` and whose migrations are
///   `$migrations`;
/// * a lazily-initialized `pub static ref $id: $t`. Under `test` / the `test-support`
///   feature it is opened with `open_test_db(stringify!($id))`; otherwise with
///   `open_db(&DB_DIR, &RELEASE_CHANNEL)`. Both are driven to completion with
///   `smol::block_on` on first access.
///
/// The `@define` rule is an implementation detail shared by the two public forms.
#[macro_export]
macro_rules! define_connection {
    (pub static ref $id:ident: $t:ident<()> = $migrations:expr;) => {
        $crate::define_connection!(@define $id, $t, $t, $migrations);
    };
    (pub static ref $id:ident: $t:ident<$($d:ty),+> = $migrations:expr;) => {
        $crate::define_connection!(@define $id, $t, ( $($d),+, $t ), $migrations);
    };
    // Internal: emits the wrapper, its impls and the static for migrator type `$connection`.
    (@define $id:ident, $t:ident, $connection:ty, $migrations:expr) => {
        pub struct $t($crate::sqlez::thread_safe_connection::ThreadSafeConnection<$connection>);

        impl ::std::ops::Deref for $t {
            type Target = $crate::sqlez::thread_safe_connection::ThreadSafeConnection<$connection>;

            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        impl $crate::sqlez::domain::Domain for $t {
            fn name() -> &'static str {
                stringify!($t)
            }

            fn migrations() -> &'static [&'static str] {
                $migrations
            }
        }

        #[cfg(any(test, feature = "test-support"))]
        $crate::lazy_static::lazy_static! {
            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_test_db(stringify!($id))));
        }

        #[cfg(not(any(test, feature = "test-support")))]
        $crate::lazy_static::lazy_static! {
            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(&$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
        }
    };
}
230
#[cfg(test)]
mod tests {
    use std::{fs, thread};

    use sqlez::{connection::Connection, domain::Domain};
    use sqlez_macros::sql;
    use tempdir::TempDir;
    use util::channel::ReleaseChannel;

    use crate::{open_db, DB_FILE_NAME};

    /// Domain used to create the database that a later open treats as corrupted.
    /// Its single migration creates the table `test`.
    enum CorruptedDB {}

    impl Domain for CorruptedDB {
        fn name() -> &'static str {
            "db_tests"
        }

        fn migrations() -> &'static [&'static str] {
            &[sql!(CREATE TABLE test(value);)]
        }
    }

    /// Domain with the same name as `CorruptedDB` but a different migration (creates
    /// `test2`), so opening it over a `CorruptedDB` database fails and forces a recreate.
    enum GoodDB {}

    impl Domain for GoodDB {
        fn name() -> &'static str {
            "db_tests" // Notice same name
        }

        fn migrations() -> &'static [&'static str] {
            &[sql!(CREATE TABLE test2(value);)] // But different migration
        }
    }

    /// Test that a bad migration panics: with every on-disk attempt and the in-memory
    /// fallback failing, `open_db` has nothing left to return.
    #[gpui::test]
    #[should_panic]
    async fn test_bad_migration_panics() {
        enum BadDB {}

        impl Domain for BadDB {
            fn name() -> &'static str {
                "db_tests"
            }

            fn migrations() -> &'static [&'static str] {
                &[
                    sql!(CREATE TABLE test(value);),
                    // failure because test already exists
                    sql!(CREATE TABLE test(value);),
                ]
            }
        }

        let tempdir = TempDir::new("DbTests").unwrap();
        let _bad_db = open_db::<BadDB>(tempdir.path(), &ReleaseChannel::Dev).await;
    }

    /// Test that a DB which exists but is corrupted gets moved to a backup directory and
    /// recreated, and that the backup still contains the original data.
    #[gpui::test]
    async fn test_db_corruption() {
        let tempdir = TempDir::new("DbTests").unwrap();
        {
            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &ReleaseChannel::Dev).await;
            assert!(corrupt_db.persistent());
        }

        let good_db = open_db::<GoodDB>(tempdir.path(), &ReleaseChannel::Dev).await;
        assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());

        // The live database directory is `0-{channel}`; anything else is the backup.
        let mut corrupted_backup_dir = fs::read_dir(tempdir.path())
            .unwrap()
            .map(|entry| entry.unwrap())
            .find(|entry| !entry.file_name().to_str().unwrap().starts_with('0'))
            .unwrap()
            .path();
        corrupted_backup_dir.push(DB_FILE_NAME);

        let backup = Connection::open_file(&corrupted_backup_dir.to_string_lossy());
        assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()().unwrap().is_none());
    }

    /// Test that many threads opening a corrupted DB at the same time all end up with a
    /// usable connection.
    #[gpui::test]
    async fn test_simultaneous_db_corruption() {
        let tempdir = TempDir::new("DbTests").unwrap();
        {
            // Setup the bad database
            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &ReleaseChannel::Dev).await;
            assert!(corrupt_db.persistent());
        }

        // Try to connect to it a bunch of times at once. All threads are spawned before
        // any of them is joined.
        let guards: Vec<_> = (0..10)
            .map(|_| {
                let tmp_path = tempdir.path().to_path_buf();
                thread::spawn(move || {
                    let good_db = smol::block_on(open_db::<GoodDB>(tmp_path.as_path(), &ReleaseChannel::Dev));
                    assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
                })
            })
            .collect();

        for guard in guards {
            assert!(guard.join().is_ok());
        }
    }
}