1pub mod kvp;
2pub mod query;
3
4// Re-export
5pub use anyhow;
6use anyhow::Context;
7pub use indoc::indoc;
8pub use lazy_static;
9use parking_lot::{Mutex, RwLock};
10pub use smol;
11pub use sqlez;
12pub use sqlez_macros;
13pub use util::channel::{RELEASE_CHANNEL, RELEASE_CHANNEL_NAME};
14pub use util::paths::DB_DIR;
15
16use sqlez::domain::Migrator;
17use sqlez::thread_safe_connection::ThreadSafeConnection;
18use sqlez_macros::sql;
19use std::fs::create_dir_all;
20use std::path::{Path, PathBuf};
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::time::{SystemTime, UNIX_EPOCH};
23use util::{async_iife, ResultExt};
24use util::channel::ReleaseChannel;
25
26const CONNECTION_INITIALIZE_QUERY: &'static str = sql!(
27 PRAGMA foreign_keys=TRUE;
28);
29
30const DB_INITIALIZE_QUERY: &'static str = sql!(
31 PRAGMA journal_mode=WAL;
32 PRAGMA busy_timeout=1;
33 PRAGMA case_sensitive_like=TRUE;
34 PRAGMA synchronous=NORMAL;
35);
36
37const FALLBACK_DB_NAME: &'static str = "FALLBACK_MEMORY_DB";
38
39const DB_FILE_NAME: &'static str = "db.sqlite";
40
41lazy_static::lazy_static! {
42 static ref DB_FILE_OPERATIONS: Mutex<()> = Mutex::new(());
43 pub static ref BACKUP_DB_PATH: RwLock<Option<PathBuf>> = RwLock::new(None);
44 pub static ref ALL_FILE_DB_FAILED: AtomicBool = AtomicBool::new(false);
45}
46
47/// Open or create a database at the given directory path.
48/// This will retry a couple times if there are failures. If opening fails once, the db directory
49/// is moved to a backup folder and a new one is created. If that fails, a shared in memory db is created.
50/// In either case, static variables are set so that the user can be notified.
51pub async fn open_db<M: Migrator + 'static>(db_dir: &Path, release_channel: &ReleaseChannel) -> ThreadSafeConnection<M> {
52 let release_channel_name = release_channel.dev_name();
53 let main_db_dir = db_dir.join(Path::new(&format!("0-{}", release_channel_name)));
54
55 let connection = async_iife!({
56 // Note: This still has a race condition where 1 set of migrations succeeds
57 // (e.g. (Workspace, Editor)) and another fails (e.g. (Workspace, Terminal))
58 // This will cause the first connection to have the database taken out
59 // from under it. This *should* be fine though. The second dabatase failure will
60 // cause errors in the log and so should be observed by developers while writing
61 // soon-to-be good migrations. If user databases are corrupted, we toss them out
62 // and try again from a blank. As long as running all migrations from start to end
63 // on a blank database is ok, this race condition will never be triggered.
64 //
65 // Basically: Don't ever push invalid migrations to stable or everyone will have
66 // a bad time.
67
68 // If no db folder, create one at 0-{channel}
69 create_dir_all(&main_db_dir).context("Could not create db directory")?;
70 let db_path = main_db_dir.join(Path::new(DB_FILE_NAME));
71
72 // Optimistically open databases in parallel
73 if !DB_FILE_OPERATIONS.is_locked() {
74 // Try building a connection
75 if let Some(connection) = open_main_db(&db_path).await {
76 return Ok(connection)
77 };
78 }
79
80 // Take a lock in the failure case so that we move the db once per process instead
81 // of potentially multiple times from different threads. This shouldn't happen in the
82 // normal path
83 let _lock = DB_FILE_OPERATIONS.lock();
84 if let Some(connection) = open_main_db(&db_path).await {
85 return Ok(connection)
86 };
87
88 let backup_timestamp = SystemTime::now()
89 .duration_since(UNIX_EPOCH)
90 .expect("System clock is set before the unix timestamp, Zed does not support this region of spacetime")
91 .as_millis();
92
93 // If failed, move 0-{channel} to {current unix timestamp}-{channel}
94 let backup_db_dir = db_dir.join(Path::new(&format!(
95 "{}-{}",
96 backup_timestamp,
97 release_channel_name,
98 )));
99
100 std::fs::rename(&main_db_dir, &backup_db_dir)
101 .context("Failed clean up corrupted database, panicking.")?;
102
103 // Set a static ref with the failed timestamp and error so we can notify the user
104 {
105 let mut guard = BACKUP_DB_PATH.write();
106 *guard = Some(backup_db_dir);
107 }
108
109 // Create a new 0-{channel}
110 create_dir_all(&main_db_dir).context("Should be able to create the database directory")?;
111 let db_path = main_db_dir.join(Path::new(DB_FILE_NAME));
112
113 // Try again
114 open_main_db(&db_path).await.context("Could not newly created db")
115 }).await.log_err();
116
117 if let Some(connection) = connection {
118 return connection;
119 }
120
121 // Set another static ref so that we can escalate the notification
122 ALL_FILE_DB_FAILED.store(true, Ordering::Release);
123
124 // If still failed, create an in memory db with a known name
125 open_fallback_db().await
126}
127
128async fn open_main_db<M: Migrator>(db_path: &PathBuf) -> Option<ThreadSafeConnection<M>> {
129 log::info!("Opening main db");
130 ThreadSafeConnection::<M>::builder(db_path.to_string_lossy().as_ref(), true)
131 .with_db_initialization_query(DB_INITIALIZE_QUERY)
132 .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
133 .build()
134 .await
135 .log_err()
136}
137
138async fn open_fallback_db<M: Migrator>() -> ThreadSafeConnection<M> {
139 log::info!("Opening fallback db");
140 ThreadSafeConnection::<M>::builder(FALLBACK_DB_NAME, false)
141 .with_db_initialization_query(DB_INITIALIZE_QUERY)
142 .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
143 .build()
144 .await
145 .expect(
146 "Fallback in memory database failed. Likely initialization queries or migrations have fundamental errors",
147 )
148}
149
/// Open a database named `db_name` for use in tests.
///
/// Only compiled for tests or with the `test-support` feature. The connection is
/// built with the same initialization queries as the production openers, plus a
/// write queue created by `locking_queue()`.
///
/// # Panics
///
/// Panics if the connection cannot be built.
#[cfg(any(test, feature = "test-support"))]
pub async fn open_test_db<M: Migrator>(db_name: &str) -> ThreadSafeConnection<M> {
    let build_result = ThreadSafeConnection::<M>::builder(db_name, false)
        .with_db_initialization_query(DB_INITIALIZE_QUERY)
        .with_connection_initialize_query(CONNECTION_INITIALIZE_QUERY)
        // Serialize queued writes via a mutex and run them synchronously
        .with_write_queue_constructor(sqlez::thread_safe_connection::locking_queue())
        .build()
        .await;

    build_result.unwrap()
}
163
/// Implements a basic DB wrapper for a given domain
///
/// Two public forms are accepted:
///
/// * `pub static ref NAME: Type<()> = MIGRATIONS;` — `Type` wraps a
///   `ThreadSafeConnection<Type>`.
/// * `pub static ref NAME: Type<Dep1, Dep2, ..> = MIGRATIONS;` — `Type` wraps a
///   `ThreadSafeConnection<(Dep1, Dep2, .., Type)>`.
///
/// Either form generates:
///
/// * a newtype struct `Type` around the connection, with `Deref` to it,
/// * a `Domain` impl whose `name()` is the stringified type name and whose
///   `migrations()` returns `MIGRATIONS`,
/// * a lazy static `NAME`, opened with `open_test_db` under `test` / `test-support`
///   and with `open_db(&DB_DIR, &RELEASE_CHANNEL)` otherwise.
///
/// The `@define` rule is an implementation detail shared by both public forms and is
/// not meant to be invoked directly.
#[macro_export]
macro_rules! define_connection {
    // Internal: emits everything for wrapper `$t` over `ThreadSafeConnection<$migrator>`.
    (@define $id:ident, $t:ident, $migrator:ty, $migrations:expr) => {
        pub struct $t($crate::sqlez::thread_safe_connection::ThreadSafeConnection<$migrator>);

        impl ::std::ops::Deref for $t {
            type Target = $crate::sqlez::thread_safe_connection::ThreadSafeConnection<$migrator>;

            fn deref(&self) -> &Self::Target {
                &self.0
            }
        }

        impl $crate::sqlez::domain::Domain for $t {
            fn name() -> &'static str {
                stringify!($t)
            }

            fn migrations() -> &'static [&'static str] {
                $migrations
            }
        }

        #[cfg(any(test, feature = "test-support"))]
        $crate::lazy_static::lazy_static! {
            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_test_db(stringify!($id))));
        }

        #[cfg(not(any(test, feature = "test-support")))]
        $crate::lazy_static::lazy_static! {
            pub static ref $id: $t = $t($crate::smol::block_on($crate::open_db(&$crate::DB_DIR, &$crate::RELEASE_CHANNEL)));
        }
    };
    // Domain with no dependencies: the wrapper type is its own migrator.
    (pub static ref $id:ident: $t:ident<()> = $migrations:expr;) => {
        $crate::define_connection!(@define $id, $t, $t, $migrations);
    };
    // Domain with dependencies: migrate the dependencies first, then this domain.
    (pub static ref $id:ident: $t:ident<$($d:ty),+> = $migrations:expr;) => {
        $crate::define_connection!(@define $id, $t, ($($d),+, $t), $migrations);
    };
}
230
/// Tests for `open_db`'s recovery behavior: panicking on unusable migrations, and
/// moving aside a database whose stored migrations no longer match.
#[cfg(test)]
mod tests {
    use std::{fs, thread};

    use sqlez::{connection::Connection, domain::Domain};
    use sqlez_macros::sql;
    use tempdir::TempDir;

    use crate::{open_db, DB_FILE_NAME};

    /// Test that a migration set which can never succeed makes `open_db` panic
    /// (every attempt, including the fallback db, fails).
    #[gpui::test]
    #[should_panic]
    async fn test_bad_migration_panics() {
        enum BadDB {}

        impl Domain for BadDB {
            fn name() -> &'static str {
                "db_tests"
            }

            fn migrations() -> &'static [&'static str] {
                &[
                    sql!(CREATE TABLE test(value);),
                    // failure because test already exists
                    sql!(CREATE TABLE test(value);),
                ]
            }
        }

        let tempdir = TempDir::new("DbTests").unwrap();
        let _bad_db = open_db::<BadDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
    }

    /// Test that DB exists but corrupted (causing recreate)
    #[gpui::test]
    async fn test_db_corruption() {
        enum CorruptedDB {}

        impl Domain for CorruptedDB {
            fn name() -> &'static str {
                "db_tests"
            }

            fn migrations() -> &'static [&'static str] {
                &[sql!(CREATE TABLE test(value);)]
            }
        }

        enum GoodDB {}

        impl Domain for GoodDB {
            fn name() -> &'static str {
                "db_tests" // Notice same name
            }

            fn migrations() -> &'static [&'static str] {
                &[sql!(CREATE TABLE test2(value);)] // But different migration
            }
        }

        let tempdir = TempDir::new("DbTests").unwrap();
        {
            let corrupt_db =
                open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
            assert!(corrupt_db.persistent());
        }

        let good_db = open_db::<GoodDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
        assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());

        // The backup directory is the only entry that is not the live `0-{channel}` one.
        let mut corrupted_backup_dir = fs::read_dir(tempdir.path())
            .unwrap()
            .find(|entry| {
                !entry
                    .as_ref()
                    .unwrap()
                    .file_name()
                    .to_str()
                    .unwrap()
                    .starts_with('0')
            })
            .unwrap()
            .unwrap()
            .path();
        corrupted_backup_dir.push(DB_FILE_NAME);

        // The moved database must still contain the table from the old migration.
        let backup = Connection::open_file(&corrupted_backup_dir.to_string_lossy());
        assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()().unwrap().is_none());
    }

    /// Test that many threads opening the same corrupted DB at once all end up
    /// with a working, recreated database.
    #[gpui::test]
    async fn test_simultaneous_db_corruption() {
        enum CorruptedDB {}

        impl Domain for CorruptedDB {
            fn name() -> &'static str {
                "db_tests"
            }

            fn migrations() -> &'static [&'static str] {
                &[sql!(CREATE TABLE test(value);)]
            }
        }

        enum GoodDB {}

        impl Domain for GoodDB {
            fn name() -> &'static str {
                "db_tests" // Notice same name
            }

            fn migrations() -> &'static [&'static str] {
                &[sql!(CREATE TABLE test2(value);)] // But different migration
            }
        }

        let tempdir = TempDir::new("DbTests").unwrap();
        {
            // Setup the bad database
            let corrupt_db =
                open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
            assert!(corrupt_db.persistent());
        }

        // Try to connect to it a bunch of times at once
        let mut guards = vec![];
        for _ in 0..10 {
            let tmp_path = tempdir.path().to_path_buf();
            let guard = thread::spawn(move || {
                let good_db = smol::block_on(open_db::<GoodDB>(
                    tmp_path.as_path(),
                    &util::channel::ReleaseChannel::Dev,
                ));
                assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
            });

            guards.push(guard);
        }

        for guard in guards {
            assert!(guard.join().is_ok());
        }
    }
}