// db.rs — SQLite-backed vector store for the semantic index.

  1use crate::{
  2    embedding::Embedding,
  3    parsing::{Document, DocumentDigest},
  4    SEMANTIC_INDEX_VERSION,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::channel::oneshot;
  8use gpui::executor;
  9use project::{search::PathMatcher, Fs};
 10use rpc::proto::Timestamp;
 11use rusqlite::params;
 12use rusqlite::types::Value;
 13use std::{
 14    cmp::Ordering,
 15    collections::HashMap,
 16    future::Future,
 17    ops::Range,
 18    path::{Path, PathBuf},
 19    rc::Rc,
 20    sync::Arc,
 21    time::SystemTime,
 22};
 23use util::TryFutureExt;
 24
/// A row from the `files` table: one indexed file within a worktree.
#[derive(Debug)]
pub struct FileRecord {
    // Primary key of the `files` row.
    pub id: usize,
    // Path relative to the worktree root, as stored in `files.relative_path`.
    pub relative_path: String,
    // Modification time captured when the file was indexed
    // (stored as `mtime_seconds` / `mtime_nanos`).
    pub mtime: Timestamp,
}
 31
/// Handle to the on-disk SQLite vector database.
///
/// Cloning is cheap: every clone shares the single background connection task
/// spawned in `new`, and submits work to it over the `transactions` channel,
/// which serializes all database access.
#[derive(Clone)]
pub struct VectorDatabase {
    path: Arc<Path>,
    // Closures sent here are executed against the one background
    // `rusqlite::Connection`; see `transact`.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
 38
 39impl VectorDatabase {
 40    pub async fn new(
 41        fs: Arc<dyn Fs>,
 42        path: Arc<Path>,
 43        executor: Arc<executor::Background>,
 44    ) -> Result<Self> {
 45        if let Some(db_directory) = path.parent() {
 46            fs.create_dir(db_directory).await?;
 47        }
 48
 49        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
 50            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
 51        >();
 52        executor
 53            .spawn({
 54                let path = path.clone();
 55                async move {
 56                    let mut connection = rusqlite::Connection::open(&path)?;
 57                    while let Ok(transaction) = transactions_rx.recv().await {
 58                        transaction(&mut connection);
 59                    }
 60
 61                    anyhow::Ok(())
 62                }
 63                .log_err()
 64            })
 65            .detach();
 66        let this = Self {
 67            transactions: transactions_tx,
 68            path,
 69        };
 70        this.initialize_database().await?;
 71        Ok(this)
 72    }
 73
 74    pub fn path(&self) -> &Arc<Path> {
 75        &self.path
 76    }
 77
 78    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
 79    where
 80        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
 81        T: 'static + Send,
 82    {
 83        let (tx, rx) = oneshot::channel();
 84        let transactions = self.transactions.clone();
 85        async move {
 86            if transactions
 87                .send(Box::new(|connection| {
 88                    let result = connection
 89                        .transaction()
 90                        .map_err(|err| anyhow!(err))
 91                        .and_then(|transaction| {
 92                            let result = f(&transaction)?;
 93                            transaction.commit()?;
 94                            Ok(result)
 95                        });
 96                    let _ = tx.send(result);
 97                }))
 98                .await
 99                .is_err()
100            {
101                return Err(anyhow!("connection was dropped"))?;
102            }
103            rx.await?
104        }
105    }
106
107    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
108        self.transact(|db| {
109            rusqlite::vtab::array::load_module(&db)?;
110
111            // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
112            let version_query = db.prepare("SELECT version from semantic_index_config");
113            let version = version_query
114                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
115            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
116                log::trace!("vector database schema up to date");
117                return Ok(());
118            }
119
120            log::trace!("vector database schema out of date. updating...");
121            db.execute("DROP TABLE IF EXISTS documents", [])
122                .context("failed to drop 'documents' table")?;
123            db.execute("DROP TABLE IF EXISTS files", [])
124                .context("failed to drop 'files' table")?;
125            db.execute("DROP TABLE IF EXISTS worktrees", [])
126                .context("failed to drop 'worktrees' table")?;
127            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
128                .context("failed to drop 'semantic_index_config' table")?;
129
130            // Initialize Vector Databasing Tables
131            db.execute(
132                "CREATE TABLE semantic_index_config (
133                    version INTEGER NOT NULL
134                )",
135                [],
136            )?;
137
138            db.execute(
139                "INSERT INTO semantic_index_config (version) VALUES (?1)",
140                params![SEMANTIC_INDEX_VERSION],
141            )?;
142
143            db.execute(
144                "CREATE TABLE worktrees (
145                    id INTEGER PRIMARY KEY AUTOINCREMENT,
146                    absolute_path VARCHAR NOT NULL
147                );
148                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
149                ",
150                [],
151            )?;
152
153            db.execute(
154                "CREATE TABLE files (
155                    id INTEGER PRIMARY KEY AUTOINCREMENT,
156                    worktree_id INTEGER NOT NULL,
157                    relative_path VARCHAR NOT NULL,
158                    mtime_seconds INTEGER NOT NULL,
159                    mtime_nanos INTEGER NOT NULL,
160                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
161                )",
162                [],
163            )?;
164
165            db.execute(
166                "CREATE TABLE documents (
167                    id INTEGER PRIMARY KEY AUTOINCREMENT,
168                    file_id INTEGER NOT NULL,
169                    start_byte INTEGER NOT NULL,
170                    end_byte INTEGER NOT NULL,
171                    name VARCHAR NOT NULL,
172                    embedding BLOB NOT NULL,
173                    digest BLOB NOT NULL,
174                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
175                )",
176                [],
177            )?;
178
179            log::trace!("vector database initialized with updated schema.");
180            Ok(())
181        })
182    }
183
184    pub fn delete_file(
185        &self,
186        worktree_id: i64,
187        delete_path: PathBuf,
188    ) -> impl Future<Output = Result<()>> {
189        self.transact(move |db| {
190            db.execute(
191                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
192                params![worktree_id, delete_path.to_str()],
193            )?;
194            Ok(())
195        })
196    }
197
198    pub fn insert_file(
199        &self,
200        worktree_id: i64,
201        path: PathBuf,
202        mtime: SystemTime,
203        documents: Vec<Document>,
204    ) -> impl Future<Output = Result<()>> {
205        self.transact(move |db| {
206            // Return the existing ID, if both the file and mtime match
207            let mtime = Timestamp::from(mtime);
208
209            let mut existing_id_query = db.prepare("SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 AND mtime_seconds = ?3 AND mtime_nanos = ?4")?;
210            let existing_id = existing_id_query
211                .query_row(
212                    params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
213                    |row| Ok(row.get::<_, i64>(0)?),
214                );
215
216            let file_id = if existing_id.is_ok() {
217                // If already exists, just return the existing id
218                existing_id?
219            } else {
220                // Delete Existing Row
221                db.execute(
222                    "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;",
223                    params![worktree_id, path.to_str()],
224                )?;
225                db.execute("INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4);", params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos])?;
226                db.last_insert_rowid()
227            };
228
229            // Currently inserting at approximately 3400 documents a second
230            // I imagine we can speed this up with a bulk insert of some kind.
231            for document in documents {
232                db.execute(
233                    "INSERT INTO documents (file_id, start_byte, end_byte, name, embedding, digest) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
234                    params![
235                        file_id,
236                        document.range.start.to_string(),
237                        document.range.end.to_string(),
238                        document.name,
239                        document.embedding,
240                        document.digest
241                    ],
242                )?;
243           }
244
245           Ok(())
246        })
247    }
248
249    pub fn worktree_previously_indexed(
250        &self,
251        worktree_root_path: &Path,
252    ) -> impl Future<Output = Result<bool>> {
253        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
254        self.transact(move |db| {
255            let mut worktree_query =
256                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
257            let worktree_id = worktree_query
258                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
259
260            if worktree_id.is_ok() {
261                return Ok(true);
262            } else {
263                return Ok(false);
264            }
265        })
266    }
267
268    pub fn embeddings_for_file(
269        &self,
270        worktree_id: i64,
271        relative_path: PathBuf,
272    ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
273        let relative_path = relative_path.to_string_lossy().into_owned();
274        self.transact(move |db| {
275            let mut query = db.prepare("SELECT digest, embedding FROM documents LEFT JOIN files ON files.id = documents.file_id WHERE files.worktree_id = ?1 AND files.relative_path = ?2")?;
276            let mut result: HashMap<DocumentDigest, Embedding> = HashMap::new();
277            for row in query.query_map(params![worktree_id, relative_path], |row| {
278                Ok((row.get::<_, DocumentDigest>(0)?.into(), row.get::<_, Embedding>(1)?.into()))
279            })? {
280                let row = row?;
281                result.insert(row.0, row.1);
282            }
283            Ok(result)
284        })
285    }
286
287    pub fn embeddings_for_files(
288        &self,
289        worktree_id_file_paths: Vec<(i64, PathBuf)>,
290    ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
291        todo!();
292        // The remainder of the code is wired up.
293        // I'm having a bit of trouble figuring out the rusqlite syntax for a WHERE (files.worktree_id, files.relative_path) IN (VALUES (?, ?), (?, ?)) query
294        async { Ok(HashMap::new()) }
295        // let mut embeddings_by_digest = HashMap::new();
296        // self.transact(move |db| {
297
298        //     let worktree_ids: Rc<Vec<Value>> = Rc::new(
299        //         worktree_id_file_paths
300        //             .iter()
301        //             .map(|(id, _)| Value::from(*id))
302        //             .collect(),
303        //     );
304        //     let file_paths: Rc<Vec<Value>> = Rc::new(worktree_id_file_paths
305        //         .iter()
306        //         .map(|(_, path)| Value::from(path.to_string_lossy().to_string()))
307        //         .collect());
308
309        //     let mut query = db.prepare("SELECT digest, embedding FROM documents LEFT JOIN files ON files.id = documents.file_id WHERE (files.worktree_id, files.relative_path) IN (VALUES (rarray = (?1), rarray = (?2))")?;
310
311        //     for row in query.query_map(params![worktree_ids, file_paths], |row| {
312        //         Ok((row.get::<_, DocumentDigest>(0)?, row.get::<_, Embedding>(1)?))
313        //     })? {
314        //         if let Ok(row) = row {
315        //             embeddings_by_digest.insert(row.0, row.1);
316        //         }
317        //     }
318        //     Ok(embeddings_by_digest)
319        // })
320    }
321
322    pub fn find_or_create_worktree(
323        &self,
324        worktree_root_path: PathBuf,
325    ) -> impl Future<Output = Result<i64>> {
326        self.transact(move |db| {
327            let mut worktree_query =
328                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
329            let worktree_id = worktree_query
330                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
331                    Ok(row.get::<_, i64>(0)?)
332                });
333
334            if worktree_id.is_ok() {
335                return Ok(worktree_id?);
336            }
337
338            // If worktree_id is Err, insert new worktree
339            db.execute(
340                "INSERT into worktrees (absolute_path) VALUES (?1)",
341                params![worktree_root_path.to_string_lossy()],
342            )?;
343            Ok(db.last_insert_rowid())
344        })
345    }
346
347    pub fn get_file_mtimes(
348        &self,
349        worktree_id: i64,
350    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
351        self.transact(move |db| {
352            let mut statement = db.prepare(
353                "
354                SELECT relative_path, mtime_seconds, mtime_nanos
355                FROM files
356                WHERE worktree_id = ?1
357                ORDER BY relative_path",
358            )?;
359            let mut result: HashMap<PathBuf, SystemTime> = HashMap::new();
360            for row in statement.query_map(params![worktree_id], |row| {
361                Ok((
362                    row.get::<_, String>(0)?.into(),
363                    Timestamp {
364                        seconds: row.get(1)?,
365                        nanos: row.get(2)?,
366                    }
367                    .into(),
368                ))
369            })? {
370                let row = row?;
371                result.insert(row.0, row.1);
372            }
373            Ok(result)
374        })
375    }
376
377    pub fn top_k_search(
378        &self,
379        query_embedding: &Embedding,
380        limit: usize,
381        file_ids: &[i64],
382    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
383        let query_embedding = query_embedding.clone();
384        let file_ids = file_ids.to_vec();
385        self.transact(move |db| {
386            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
387            Self::for_each_document(db, &file_ids, |id, embedding| {
388                let similarity = embedding.similarity(&query_embedding);
389                let ix = match results.binary_search_by(|(_, s)| {
390                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
391                }) {
392                    Ok(ix) => ix,
393                    Err(ix) => ix,
394                };
395                results.insert(ix, (id, similarity));
396                results.truncate(limit);
397            })?;
398
399            anyhow::Ok(results)
400        })
401    }
402
403    pub fn retrieve_included_file_ids(
404        &self,
405        worktree_ids: &[i64],
406        includes: &[PathMatcher],
407        excludes: &[PathMatcher],
408    ) -> impl Future<Output = Result<Vec<i64>>> {
409        let worktree_ids = worktree_ids.to_vec();
410        let includes = includes.to_vec();
411        let excludes = excludes.to_vec();
412        self.transact(move |db| {
413            let mut file_query = db.prepare(
414                "
415                SELECT
416                    id, relative_path
417                FROM
418                    files
419                WHERE
420                    worktree_id IN rarray(?)
421                ",
422            )?;
423
424            let mut file_ids = Vec::<i64>::new();
425            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
426
427            while let Some(row) = rows.next()? {
428                let file_id = row.get(0)?;
429                let relative_path = row.get_ref(1)?.as_str()?;
430                let included =
431                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
432                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
433                if included && !excluded {
434                    file_ids.push(file_id);
435                }
436            }
437
438            anyhow::Ok(file_ids)
439        })
440    }
441
442    fn for_each_document(
443        db: &rusqlite::Connection,
444        file_ids: &[i64],
445        mut f: impl FnMut(i64, Embedding),
446    ) -> Result<()> {
447        let mut query_statement = db.prepare(
448            "
449            SELECT
450                id, embedding
451            FROM
452                documents
453            WHERE
454                file_id IN rarray(?)
455            ",
456        )?;
457
458        query_statement
459            .query_map(params![ids_to_sql(&file_ids)], |row| {
460                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
461            })?
462            .filter_map(|row| row.ok())
463            .for_each(|(id, embedding)| f(id, embedding));
464        Ok(())
465    }
466
467    pub fn get_documents_by_ids(
468        &self,
469        ids: &[i64],
470    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
471        let ids = ids.to_vec();
472        self.transact(move |db| {
473            let mut statement = db.prepare(
474                "
475                    SELECT
476                        documents.id,
477                        files.worktree_id,
478                        files.relative_path,
479                        documents.start_byte,
480                        documents.end_byte
481                    FROM
482                        documents, files
483                    WHERE
484                        documents.file_id = files.id AND
485                        documents.id in rarray(?)
486                ",
487            )?;
488
489            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
490                Ok((
491                    row.get::<_, i64>(0)?,
492                    row.get::<_, i64>(1)?,
493                    row.get::<_, String>(2)?.into(),
494                    row.get(3)?..row.get(4)?,
495                ))
496            })?;
497
498            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
499            for row in result_iter {
500                let (id, worktree_id, path, range) = row?;
501                values_by_id.insert(id, (worktree_id, path, range));
502            }
503
504            let mut results = Vec::with_capacity(ids.len());
505            for id in &ids {
506                let value = values_by_id
507                    .remove(id)
508                    .ok_or(anyhow!("missing document id {}", id))?;
509                results.push(value);
510            }
511
512            Ok(results)
513        })
514    }
515}
516
517fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
518    Rc::new(
519        ids.iter()
520            .copied()
521            .map(|v| rusqlite::types::Value::from(v))
522            .collect::<Vec<_>>(),
523    )
524}