db.rs

  1use crate::{
  2    embedding::Embedding,
  3    parsing::{Span, SpanDigest},
  4    SEMANTIC_INDEX_VERSION,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use collections::HashMap;
  8use futures::channel::oneshot;
  9use gpui::executor;
 10use project::{search::PathMatcher, Fs};
 11use rpc::proto::Timestamp;
 12use rusqlite::params;
 13use rusqlite::types::Value;
 14use std::{
 15    cmp::Ordering,
 16    future::Future,
 17    ops::Range,
 18    path::{Path, PathBuf},
 19    rc::Rc,
 20    sync::Arc,
 21    time::{Instant, SystemTime},
 22};
 23use util::TryFutureExt;
 24
/// A file entry with its database id, worktree-relative path and mtime.
///
/// NOTE(review): appears to mirror a row of the `files` table (whose mtime is
/// stored split into `mtime_seconds`/`mtime_nanos`); it is not constructed
/// anywhere in this file — confirm whether it is still used.
#[derive(Debug)]
pub struct FileRecord {
    /// Row id of the file.
    pub id: usize,
    /// Path of the file relative to its worktree root.
    pub relative_path: String,
    /// Modification time associated with this record.
    pub mtime: Timestamp,
}
 31
/// Cloneable handle to the semantic-index SQLite database.
///
/// All clones share the single connection owned by the background task that
/// `VectorDatabase::new` spawns; work reaches that connection only through
/// `transactions`.
#[derive(Clone)]
pub struct VectorDatabase {
    /// Location of the database file on disk.
    path: Arc<Path>,
    /// Jobs executed one after another by the background task, each receiving
    /// mutable access to the shared connection.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
 38
 39impl VectorDatabase {
 40    pub async fn new(
 41        fs: Arc<dyn Fs>,
 42        path: Arc<Path>,
 43        executor: Arc<executor::Background>,
 44    ) -> Result<Self> {
 45        if let Some(db_directory) = path.parent() {
 46            fs.create_dir(db_directory).await?;
 47        }
 48
 49        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
 50            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
 51        >();
 52        executor
 53            .spawn({
 54                let path = path.clone();
 55                async move {
 56                    let mut connection = rusqlite::Connection::open(&path)?;
 57
 58                    connection.pragma_update(None, "journal_mode", "wal")?;
 59                    connection.pragma_update(None, "synchronous", "normal")?;
 60                    connection.pragma_update(None, "cache_size", 1000000)?;
 61                    connection.pragma_update(None, "temp_store", "MEMORY")?;
 62
 63                    while let Ok(transaction) = transactions_rx.recv().await {
 64                        transaction(&mut connection);
 65                    }
 66
 67                    anyhow::Ok(())
 68                }
 69                .log_err()
 70            })
 71            .detach();
 72        let this = Self {
 73            transactions: transactions_tx,
 74            path,
 75        };
 76        this.initialize_database().await?;
 77        Ok(this)
 78    }
 79
 80    pub fn path(&self) -> &Arc<Path> {
 81        &self.path
 82    }
 83
 84    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
 85    where
 86        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
 87        T: 'static + Send,
 88    {
 89        let (tx, rx) = oneshot::channel();
 90        let transactions = self.transactions.clone();
 91        async move {
 92            if transactions
 93                .send(Box::new(|connection| {
 94                    let result = connection
 95                        .transaction()
 96                        .map_err(|err| anyhow!(err))
 97                        .and_then(|transaction| {
 98                            let result = f(&transaction)?;
 99                            transaction.commit()?;
100                            Ok(result)
101                        });
102                    let _ = tx.send(result);
103                }))
104                .await
105                .is_err()
106            {
107                return Err(anyhow!("connection was dropped"))?;
108            }
109            rx.await?
110        }
111    }
112
113    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
114        self.transact(|db| {
115            rusqlite::vtab::array::load_module(&db)?;
116
117            // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
118            let version_query = db.prepare("SELECT version from semantic_index_config");
119            let version = version_query
120                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
121            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
122                log::trace!("vector database schema up to date");
123                return Ok(());
124            }
125
126            log::trace!("vector database schema out of date. updating...");
127            // We renamed the `documents` table to `spans`, so we want to drop
128            // `documents` without recreating it if it exists.
129            db.execute("DROP TABLE IF EXISTS documents", [])
130                .context("failed to drop 'documents' table")?;
131            db.execute("DROP TABLE IF EXISTS spans", [])
132                .context("failed to drop 'spans' table")?;
133            db.execute("DROP TABLE IF EXISTS files", [])
134                .context("failed to drop 'files' table")?;
135            db.execute("DROP TABLE IF EXISTS worktrees", [])
136                .context("failed to drop 'worktrees' table")?;
137            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
138                .context("failed to drop 'semantic_index_config' table")?;
139
140            // Initialize Vector Databasing Tables
141            db.execute(
142                "CREATE TABLE semantic_index_config (
143                    version INTEGER NOT NULL
144                )",
145                [],
146            )?;
147
148            db.execute(
149                "INSERT INTO semantic_index_config (version) VALUES (?1)",
150                params![SEMANTIC_INDEX_VERSION],
151            )?;
152
153            db.execute(
154                "CREATE TABLE worktrees (
155                    id INTEGER PRIMARY KEY AUTOINCREMENT,
156                    absolute_path VARCHAR NOT NULL
157                );
158                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
159                ",
160                [],
161            )?;
162
163            db.execute(
164                "CREATE TABLE files (
165                    id INTEGER PRIMARY KEY AUTOINCREMENT,
166                    worktree_id INTEGER NOT NULL,
167                    relative_path VARCHAR NOT NULL,
168                    mtime_seconds INTEGER NOT NULL,
169                    mtime_nanos INTEGER NOT NULL,
170                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
171                )",
172                [],
173            )?;
174
175            db.execute(
176                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
177                [],
178            )?;
179
180            db.execute(
181                "CREATE TABLE spans (
182                    id INTEGER PRIMARY KEY AUTOINCREMENT,
183                    file_id INTEGER NOT NULL,
184                    start_byte INTEGER NOT NULL,
185                    end_byte INTEGER NOT NULL,
186                    name VARCHAR NOT NULL,
187                    embedding BLOB NOT NULL,
188                    digest BLOB NOT NULL,
189                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
190                )",
191                [],
192            )?;
193
194            log::trace!("vector database initialized with updated schema.");
195            Ok(())
196        })
197    }
198
199    pub fn delete_file(
200        &self,
201        worktree_id: i64,
202        delete_path: Arc<Path>,
203    ) -> impl Future<Output = Result<()>> {
204        self.transact(move |db| {
205            db.execute(
206                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
207                params![worktree_id, delete_path.to_str()],
208            )?;
209            Ok(())
210        })
211    }
212
213    pub fn insert_file(
214        &self,
215        worktree_id: i64,
216        path: Arc<Path>,
217        mtime: SystemTime,
218        spans: Vec<Span>,
219    ) -> impl Future<Output = Result<()>> {
220        self.transact(move |db| {
221            // Return the existing ID, if both the file and mtime match
222            let mtime = Timestamp::from(mtime);
223
224            db.execute(
225                "
226                REPLACE INTO files
227                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
228                VALUES (?1, ?2, ?3, ?4)
229                ",
230                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
231            )?;
232
233            let file_id = db.last_insert_rowid();
234
235            let t0 = Instant::now();
236            let mut query = db.prepare(
237                "
238                INSERT INTO spans
239                (file_id, start_byte, end_byte, name, embedding, digest)
240                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
241                ",
242            )?;
243            log::trace!(
244                "Preparing Query Took: {:?} milliseconds",
245                t0.elapsed().as_millis()
246            );
247
248            for span in spans {
249                query.execute(params![
250                    file_id,
251                    span.range.start.to_string(),
252                    span.range.end.to_string(),
253                    span.name,
254                    span.embedding,
255                    span.digest
256                ])?;
257            }
258
259            Ok(())
260        })
261    }
262
263    pub fn worktree_previously_indexed(
264        &self,
265        worktree_root_path: &Path,
266    ) -> impl Future<Output = Result<bool>> {
267        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
268        self.transact(move |db| {
269            let mut worktree_query =
270                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
271            let worktree_id = worktree_query
272                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
273
274            if worktree_id.is_ok() {
275                return Ok(true);
276            } else {
277                return Ok(false);
278            }
279        })
280    }
281
282    pub fn embeddings_for_files(
283        &self,
284        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
285    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
286        self.transact(move |db| {
287            let mut query = db.prepare(
288                "
289                SELECT digest, embedding
290                FROM spans
291                LEFT JOIN files ON files.id = spans.file_id
292                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
293            ",
294            )?;
295            let mut embeddings_by_digest = HashMap::default();
296            for (worktree_id, file_paths) in worktree_id_file_paths {
297                let file_paths = Rc::new(
298                    file_paths
299                        .into_iter()
300                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
301                        .collect::<Vec<_>>(),
302                );
303                let rows = query.query_map(params![worktree_id, file_paths], |row| {
304                    Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
305                })?;
306
307                for row in rows {
308                    if let Ok(row) = row {
309                        embeddings_by_digest.insert(row.0, row.1);
310                    }
311                }
312            }
313
314            Ok(embeddings_by_digest)
315        })
316    }
317
318    pub fn find_or_create_worktree(
319        &self,
320        worktree_root_path: Arc<Path>,
321    ) -> impl Future<Output = Result<i64>> {
322        self.transact(move |db| {
323            let mut worktree_query =
324                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
325            let worktree_id = worktree_query
326                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
327                    Ok(row.get::<_, i64>(0)?)
328                });
329
330            if worktree_id.is_ok() {
331                return Ok(worktree_id?);
332            }
333
334            // If worktree_id is Err, insert new worktree
335            db.execute(
336                "INSERT into worktrees (absolute_path) VALUES (?1)",
337                params![worktree_root_path.to_string_lossy()],
338            )?;
339            Ok(db.last_insert_rowid())
340        })
341    }
342
343    pub fn get_file_mtimes(
344        &self,
345        worktree_id: i64,
346    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
347        self.transact(move |db| {
348            let mut statement = db.prepare(
349                "
350                SELECT relative_path, mtime_seconds, mtime_nanos
351                FROM files
352                WHERE worktree_id = ?1
353                ORDER BY relative_path",
354            )?;
355            let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
356            for row in statement.query_map(params![worktree_id], |row| {
357                Ok((
358                    row.get::<_, String>(0)?.into(),
359                    Timestamp {
360                        seconds: row.get(1)?,
361                        nanos: row.get(2)?,
362                    }
363                    .into(),
364                ))
365            })? {
366                let row = row?;
367                result.insert(row.0, row.1);
368            }
369            Ok(result)
370        })
371    }
372
373    pub fn top_k_search(
374        &self,
375        query_embedding: &Embedding,
376        limit: usize,
377        file_ids: &[i64],
378    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
379        let query_embedding = query_embedding.clone();
380        let file_ids = file_ids.to_vec();
381        self.transact(move |db| {
382            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
383            Self::for_each_span(db, &file_ids, |id, embedding| {
384                let similarity = embedding.similarity(&query_embedding);
385                let ix = match results.binary_search_by(|(_, s)| {
386                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
387                }) {
388                    Ok(ix) => ix,
389                    Err(ix) => ix,
390                };
391                results.insert(ix, (id, similarity));
392                results.truncate(limit);
393            })?;
394
395            anyhow::Ok(results)
396        })
397    }
398
399    pub fn retrieve_included_file_ids(
400        &self,
401        worktree_ids: &[i64],
402        includes: &[PathMatcher],
403        excludes: &[PathMatcher],
404    ) -> impl Future<Output = Result<Vec<i64>>> {
405        let worktree_ids = worktree_ids.to_vec();
406        let includes = includes.to_vec();
407        let excludes = excludes.to_vec();
408        self.transact(move |db| {
409            let mut file_query = db.prepare(
410                "
411                SELECT
412                    id, relative_path
413                FROM
414                    files
415                WHERE
416                    worktree_id IN rarray(?)
417                ",
418            )?;
419
420            let mut file_ids = Vec::<i64>::new();
421            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
422
423            while let Some(row) = rows.next()? {
424                let file_id = row.get(0)?;
425                let relative_path = row.get_ref(1)?.as_str()?;
426                let included =
427                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
428                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
429                if included && !excluded {
430                    file_ids.push(file_id);
431                }
432            }
433
434            anyhow::Ok(file_ids)
435        })
436    }
437
438    fn for_each_span(
439        db: &rusqlite::Connection,
440        file_ids: &[i64],
441        mut f: impl FnMut(i64, Embedding),
442    ) -> Result<()> {
443        let mut query_statement = db.prepare(
444            "
445            SELECT
446                id, embedding
447            FROM
448                spans
449            WHERE
450                file_id IN rarray(?)
451            ",
452        )?;
453
454        query_statement
455            .query_map(params![ids_to_sql(&file_ids)], |row| {
456                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
457            })?
458            .filter_map(|row| row.ok())
459            .for_each(|(id, embedding)| f(id, embedding));
460        Ok(())
461    }
462
463    pub fn spans_for_ids(
464        &self,
465        ids: &[i64],
466    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
467        let ids = ids.to_vec();
468        self.transact(move |db| {
469            let mut statement = db.prepare(
470                "
471                    SELECT
472                        spans.id,
473                        files.worktree_id,
474                        files.relative_path,
475                        spans.start_byte,
476                        spans.end_byte
477                    FROM
478                        spans, files
479                    WHERE
480                        spans.file_id = files.id AND
481                        spans.id in rarray(?)
482                ",
483            )?;
484
485            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
486                Ok((
487                    row.get::<_, i64>(0)?,
488                    row.get::<_, i64>(1)?,
489                    row.get::<_, String>(2)?.into(),
490                    row.get(3)?..row.get(4)?,
491                ))
492            })?;
493
494            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
495            for row in result_iter {
496                let (id, worktree_id, path, range) = row?;
497                values_by_id.insert(id, (worktree_id, path, range));
498            }
499
500            let mut results = Vec::with_capacity(ids.len());
501            for id in &ids {
502                let value = values_by_id
503                    .remove(id)
504                    .ok_or(anyhow!("missing span id {}", id))?;
505                results.push(value);
506            }
507
508            Ok(results)
509        })
510    }
511}
512
513fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
514    Rc::new(
515        ids.iter()
516            .copied()
517            .map(|v| rusqlite::types::Value::from(v))
518            .collect::<Vec<_>>(),
519    )
520}