db.rs

  1use crate::{
  2    embedding::Embedding,
  3    parsing::{Document, DocumentDigest},
  4    SEMANTIC_INDEX_VERSION,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use futures::channel::oneshot;
  8use gpui::executor;
  9use project::{search::PathMatcher, Fs};
 10use rpc::proto::Timestamp;
 11use rusqlite::params;
 12use rusqlite::types::Value;
 13use std::{
 14    cmp::Ordering,
 15    collections::HashMap,
 16    future::Future,
 17    ops::Range,
 18    path::{Path, PathBuf},
 19    rc::Rc,
 20    sync::Arc,
 21    time::{Instant, SystemTime},
 22};
 23use util::TryFutureExt;
 24
 25#[derive(Debug)]
 26pub struct FileRecord {
 27    pub id: usize,
 28    pub relative_path: String,
 29    pub mtime: Timestamp,
 30}
 31
 32#[derive(Clone)]
 33pub struct VectorDatabase {
 34    path: Arc<Path>,
 35    transactions:
 36        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
 37}
 38
 39impl VectorDatabase {
 40    pub async fn new(
 41        fs: Arc<dyn Fs>,
 42        path: Arc<Path>,
 43        executor: Arc<executor::Background>,
 44    ) -> Result<Self> {
 45        if let Some(db_directory) = path.parent() {
 46            fs.create_dir(db_directory).await?;
 47        }
 48
 49        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
 50            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
 51        >();
 52        executor
 53            .spawn({
 54                let path = path.clone();
 55                async move {
 56                    let mut connection = rusqlite::Connection::open(&path)?;
 57
 58                    connection.pragma_update(None, "journal_mode", "wal")?;
 59                    connection.pragma_update(None, "synchronous", "normal")?;
 60                    connection.pragma_update(None, "cache_size", 1000000)?;
 61                    connection.pragma_update(None, "temp_store", "MEMORY")?;
 62
 63                    while let Ok(transaction) = transactions_rx.recv().await {
 64                        transaction(&mut connection);
 65                    }
 66
 67                    anyhow::Ok(())
 68                }
 69                .log_err()
 70            })
 71            .detach();
 72        let this = Self {
 73            transactions: transactions_tx,
 74            path,
 75        };
 76        this.initialize_database().await?;
 77        Ok(this)
 78    }
 79
 80    pub fn path(&self) -> &Arc<Path> {
 81        &self.path
 82    }
 83
 84    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
 85    where
 86        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
 87        T: 'static + Send,
 88    {
 89        let (tx, rx) = oneshot::channel();
 90        let transactions = self.transactions.clone();
 91        async move {
 92            if transactions
 93                .send(Box::new(|connection| {
 94                    let result = connection
 95                        .transaction()
 96                        .map_err(|err| anyhow!(err))
 97                        .and_then(|transaction| {
 98                            let result = f(&transaction)?;
 99                            transaction.commit()?;
100                            Ok(result)
101                        });
102                    let _ = tx.send(result);
103                }))
104                .await
105                .is_err()
106            {
107                return Err(anyhow!("connection was dropped"))?;
108            }
109            rx.await?
110        }
111    }
112
113    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
114        self.transact(|db| {
115            rusqlite::vtab::array::load_module(&db)?;
116
117            // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
118            let version_query = db.prepare("SELECT version from semantic_index_config");
119            let version = version_query
120                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
121            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
122                log::trace!("vector database schema up to date");
123                return Ok(());
124            }
125
126            log::trace!("vector database schema out of date. updating...");
127            db.execute("DROP TABLE IF EXISTS documents", [])
128                .context("failed to drop 'documents' table")?;
129            db.execute("DROP TABLE IF EXISTS files", [])
130                .context("failed to drop 'files' table")?;
131            db.execute("DROP TABLE IF EXISTS worktrees", [])
132                .context("failed to drop 'worktrees' table")?;
133            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
134                .context("failed to drop 'semantic_index_config' table")?;
135
136            // Initialize Vector Databasing Tables
137            db.execute(
138                "CREATE TABLE semantic_index_config (
139                    version INTEGER NOT NULL
140                )",
141                [],
142            )?;
143
144            db.execute(
145                "INSERT INTO semantic_index_config (version) VALUES (?1)",
146                params![SEMANTIC_INDEX_VERSION],
147            )?;
148
149            db.execute(
150                "CREATE TABLE worktrees (
151                    id INTEGER PRIMARY KEY AUTOINCREMENT,
152                    absolute_path VARCHAR NOT NULL
153                );
154                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
155                ",
156                [],
157            )?;
158
159            db.execute(
160                "CREATE TABLE files (
161                    id INTEGER PRIMARY KEY AUTOINCREMENT,
162                    worktree_id INTEGER NOT NULL,
163                    relative_path VARCHAR NOT NULL,
164                    mtime_seconds INTEGER NOT NULL,
165                    mtime_nanos INTEGER NOT NULL,
166                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
167                )",
168                [],
169            )?;
170
171            db.execute(
172                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
173                [],
174            )?;
175
176            db.execute(
177                "CREATE TABLE documents (
178                    id INTEGER PRIMARY KEY AUTOINCREMENT,
179                    file_id INTEGER NOT NULL,
180                    start_byte INTEGER NOT NULL,
181                    end_byte INTEGER NOT NULL,
182                    name VARCHAR NOT NULL,
183                    embedding BLOB NOT NULL,
184                    digest BLOB NOT NULL,
185                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
186                )",
187                [],
188            )?;
189
190            log::trace!("vector database initialized with updated schema.");
191            Ok(())
192        })
193    }
194
195    pub fn delete_file(
196        &self,
197        worktree_id: i64,
198        delete_path: PathBuf,
199    ) -> impl Future<Output = Result<()>> {
200        self.transact(move |db| {
201            db.execute(
202                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
203                params![worktree_id, delete_path.to_str()],
204            )?;
205            Ok(())
206        })
207    }
208
209    pub fn insert_file(
210        &self,
211        worktree_id: i64,
212        path: PathBuf,
213        mtime: SystemTime,
214        documents: Vec<Document>,
215    ) -> impl Future<Output = Result<()>> {
216        self.transact(move |db| {
217            // Return the existing ID, if both the file and mtime match
218            let mtime = Timestamp::from(mtime);
219
220            db.execute(
221                "
222                REPLACE INTO files
223                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
224                VALUES (?1, ?2, ?3, ?4)
225                ",
226                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
227            )?;
228
229            let file_id = db.last_insert_rowid();
230
231            let t0 = Instant::now();
232            let mut query = db.prepare(
233                "
234                INSERT INTO documents
235                (file_id, start_byte, end_byte, name, embedding, digest)
236                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
237                ",
238            )?;
239            log::trace!(
240                "Preparing Query Took: {:?} milliseconds",
241                t0.elapsed().as_millis()
242            );
243
244            for document in documents {
245                query.execute(params![
246                    file_id,
247                    document.range.start.to_string(),
248                    document.range.end.to_string(),
249                    document.name,
250                    document.embedding,
251                    document.digest
252                ])?;
253            }
254
255            Ok(())
256        })
257    }
258
259    pub fn worktree_previously_indexed(
260        &self,
261        worktree_root_path: &Path,
262    ) -> impl Future<Output = Result<bool>> {
263        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
264        self.transact(move |db| {
265            let mut worktree_query =
266                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
267            let worktree_id = worktree_query
268                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
269
270            if worktree_id.is_ok() {
271                return Ok(true);
272            } else {
273                return Ok(false);
274            }
275        })
276    }
277
278    pub fn embeddings_for_files(
279        &self,
280        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
281    ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
282        self.transact(move |db| {
283            let mut query = db.prepare(
284                "
285                SELECT digest, embedding
286                FROM documents
287                LEFT JOIN files ON files.id = documents.file_id
288                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
289            ",
290            )?;
291            let mut embeddings_by_digest = HashMap::new();
292            for (worktree_id, file_paths) in worktree_id_file_paths {
293                let file_paths = Rc::new(
294                    file_paths
295                        .into_iter()
296                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
297                        .collect::<Vec<_>>(),
298                );
299                let rows = query.query_map(params![worktree_id, file_paths], |row| {
300                    Ok((
301                        row.get::<_, DocumentDigest>(0)?,
302                        row.get::<_, Embedding>(1)?,
303                    ))
304                })?;
305
306                for row in rows {
307                    if let Ok(row) = row {
308                        embeddings_by_digest.insert(row.0, row.1);
309                    }
310                }
311            }
312
313            Ok(embeddings_by_digest)
314        })
315    }
316
317    pub fn find_or_create_worktree(
318        &self,
319        worktree_root_path: PathBuf,
320    ) -> impl Future<Output = Result<i64>> {
321        self.transact(move |db| {
322            let mut worktree_query =
323                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
324            let worktree_id = worktree_query
325                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
326                    Ok(row.get::<_, i64>(0)?)
327                });
328
329            if worktree_id.is_ok() {
330                return Ok(worktree_id?);
331            }
332
333            // If worktree_id is Err, insert new worktree
334            db.execute(
335                "INSERT into worktrees (absolute_path) VALUES (?1)",
336                params![worktree_root_path.to_string_lossy()],
337            )?;
338            Ok(db.last_insert_rowid())
339        })
340    }
341
342    pub fn get_file_mtimes(
343        &self,
344        worktree_id: i64,
345    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
346        self.transact(move |db| {
347            let mut statement = db.prepare(
348                "
349                SELECT relative_path, mtime_seconds, mtime_nanos
350                FROM files
351                WHERE worktree_id = ?1
352                ORDER BY relative_path",
353            )?;
354            let mut result: HashMap<PathBuf, SystemTime> = HashMap::new();
355            for row in statement.query_map(params![worktree_id], |row| {
356                Ok((
357                    row.get::<_, String>(0)?.into(),
358                    Timestamp {
359                        seconds: row.get(1)?,
360                        nanos: row.get(2)?,
361                    }
362                    .into(),
363                ))
364            })? {
365                let row = row?;
366                result.insert(row.0, row.1);
367            }
368            Ok(result)
369        })
370    }
371
372    pub fn top_k_search(
373        &self,
374        query_embedding: &Embedding,
375        limit: usize,
376        file_ids: &[i64],
377    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
378        let query_embedding = query_embedding.clone();
379        let file_ids = file_ids.to_vec();
380        self.transact(move |db| {
381            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
382            Self::for_each_document(db, &file_ids, |id, embedding| {
383                let similarity = embedding.similarity(&query_embedding);
384                let ix = match results.binary_search_by(|(_, s)| {
385                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
386                }) {
387                    Ok(ix) => ix,
388                    Err(ix) => ix,
389                };
390                results.insert(ix, (id, similarity));
391                results.truncate(limit);
392            })?;
393
394            anyhow::Ok(results)
395        })
396    }
397
398    pub fn retrieve_included_file_ids(
399        &self,
400        worktree_ids: &[i64],
401        includes: &[PathMatcher],
402        excludes: &[PathMatcher],
403    ) -> impl Future<Output = Result<Vec<i64>>> {
404        let worktree_ids = worktree_ids.to_vec();
405        let includes = includes.to_vec();
406        let excludes = excludes.to_vec();
407        self.transact(move |db| {
408            let mut file_query = db.prepare(
409                "
410                SELECT
411                    id, relative_path
412                FROM
413                    files
414                WHERE
415                    worktree_id IN rarray(?)
416                ",
417            )?;
418
419            let mut file_ids = Vec::<i64>::new();
420            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
421
422            while let Some(row) = rows.next()? {
423                let file_id = row.get(0)?;
424                let relative_path = row.get_ref(1)?.as_str()?;
425                let included =
426                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
427                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
428                if included && !excluded {
429                    file_ids.push(file_id);
430                }
431            }
432
433            anyhow::Ok(file_ids)
434        })
435    }
436
437    fn for_each_document(
438        db: &rusqlite::Connection,
439        file_ids: &[i64],
440        mut f: impl FnMut(i64, Embedding),
441    ) -> Result<()> {
442        let mut query_statement = db.prepare(
443            "
444            SELECT
445                id, embedding
446            FROM
447                documents
448            WHERE
449                file_id IN rarray(?)
450            ",
451        )?;
452
453        query_statement
454            .query_map(params![ids_to_sql(&file_ids)], |row| {
455                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
456            })?
457            .filter_map(|row| row.ok())
458            .for_each(|(id, embedding)| f(id, embedding));
459        Ok(())
460    }
461
462    pub fn get_documents_by_ids(
463        &self,
464        ids: &[i64],
465    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
466        let ids = ids.to_vec();
467        self.transact(move |db| {
468            let mut statement = db.prepare(
469                "
470                    SELECT
471                        documents.id,
472                        files.worktree_id,
473                        files.relative_path,
474                        documents.start_byte,
475                        documents.end_byte
476                    FROM
477                        documents, files
478                    WHERE
479                        documents.file_id = files.id AND
480                        documents.id in rarray(?)
481                ",
482            )?;
483
484            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
485                Ok((
486                    row.get::<_, i64>(0)?,
487                    row.get::<_, i64>(1)?,
488                    row.get::<_, String>(2)?.into(),
489                    row.get(3)?..row.get(4)?,
490                ))
491            })?;
492
493            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
494            for row in result_iter {
495                let (id, worktree_id, path, range) = row?;
496                values_by_id.insert(id, (worktree_id, path, range));
497            }
498
499            let mut results = Vec::with_capacity(ids.len());
500            for id in &ids {
501                let value = values_by_id
502                    .remove(id)
503                    .ok_or(anyhow!("missing document id {}", id))?;
504                results.push(value);
505            }
506
507            Ok(results)
508        })
509    }
510}
511
512fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
513    Rc::new(
514        ids.iter()
515            .copied()
516            .map(|v| rusqlite::types::Value::from(v))
517            .collect::<Vec<_>>(),
518    )
519}