db.rs

use crate::{
    embedding::Embedding,
    parsing::{Span, SpanDigest},
    SEMANTIC_INDEX_VERSION,
};
use anyhow::{anyhow, Context, Result};
use collections::HashMap;
use futures::channel::oneshot;
use gpui::executor;
use ordered_float::OrderedFloat;
use project::{search::PathMatcher, Fs};
use rpc::proto::Timestamp;
use rusqlite::params;
use rusqlite::types::Value;
use std::{
    cmp::Reverse,
    future::Future,
    ops::Range,
    path::{Path, PathBuf},
    rc::Rc,
    sync::Arc,
    time::SystemTime,
};
use util::TryFutureExt;

#[derive(Debug)]
pub struct FileRecord {
    pub id: usize,
    pub relative_path: String,
    pub mtime: Timestamp,
}

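/// A handle to the semantic index's SQLite database. The connection lives on a
/// background task; callers enqueue closures over a channel, and the task runs
/// them against that single connection in order.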
#[derive(Clone)]
pub struct VectorDatabase {
    path: Arc<Path>,
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}

impl VectorDatabase {
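    /// Opens (or creates) the database file at `path`, spawns a background task
    /// that owns the SQLite connection and drains queued transactions, and then
    /// brings the schema up to date.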
    pub async fn new(
        fs: Arc<dyn Fs>,
        path: Arc<Path>,
        executor: Arc<executor::Background>,
    ) -> Result<Self> {
        if let Some(db_directory) = path.parent() {
            fs.create_dir(db_directory).await?;
        }

        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
        >();
        executor
            .spawn({
                let path = path.clone();
                async move {
                    let mut connection = rusqlite::Connection::open(&path)?;

                    connection.pragma_update(None, "journal_mode", "wal")?;
                    connection.pragma_update(None, "synchronous", "normal")?;
                    connection.pragma_update(None, "cache_size", 1000000)?;
                    connection.pragma_update(None, "temp_store", "MEMORY")?;

                    while let Ok(transaction) = transactions_rx.recv().await {
                        transaction(&mut connection);
                    }

                    anyhow::Ok(())
                }
                .log_err()
            })
            .detach();
        let this = Self {
            transactions: transactions_tx,
            path,
        };
        this.initialize_database().await?;
        Ok(this)
    }

    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }

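    /// Enqueues `f` to run on the background connection inside a SQLite
    /// transaction. The transaction is committed only if `f` returns `Ok`, and
    /// the result is delivered back to the caller over a oneshot channel.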
    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
    where
        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
        T: 'static + Send,
    {
        let (tx, rx) = oneshot::channel();
        let transactions = self.transactions.clone();
        async move {
            if transactions
                .send(Box::new(|connection| {
                    let result = connection
                        .transaction()
                        .map_err(|err| anyhow!(err))
                        .and_then(|transaction| {
                            let result = f(&transaction)?;
                            transaction.commit()?;
                            Ok(result)
                        });
                    let _ = tx.send(result);
                }))
                .await
                .is_err()
            {
                return Err(anyhow!("connection was dropped"));
            }
            rx.await?
        }
    }

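    /// Checks the stored schema version against `SEMANTIC_INDEX_VERSION` and,
    /// if it is missing or outdated, drops and recreates every table.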
    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
        self.transact(|db| {
            rusqlite::vtab::array::load_module(&db)?;

            // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
            let version_query = db.prepare("SELECT version from semantic_index_config");
            let version = version_query
                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
                log::trace!("vector database schema up to date");
                return Ok(());
            }

            log::trace!("vector database schema out of date. updating...");
            // We renamed the `documents` table to `spans`, so we want to drop
            // `documents` without recreating it if it exists.
            db.execute("DROP TABLE IF EXISTS documents", [])
                .context("failed to drop 'documents' table")?;
            db.execute("DROP TABLE IF EXISTS spans", [])
                .context("failed to drop 'spans' table")?;
            db.execute("DROP TABLE IF EXISTS files", [])
                .context("failed to drop 'files' table")?;
            db.execute("DROP TABLE IF EXISTS worktrees", [])
                .context("failed to drop 'worktrees' table")?;
            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
                .context("failed to drop 'semantic_index_config' table")?;

            // Initialize the vector database tables
            db.execute(
                "CREATE TABLE semantic_index_config (
                    version INTEGER NOT NULL
                )",
                [],
            )?;

            db.execute(
                "INSERT INTO semantic_index_config (version) VALUES (?1)",
                params![SEMANTIC_INDEX_VERSION],
            )?;

            // This SQL contains two statements, so it must go through
            // `execute_batch` rather than `execute`, which only accepts a
            // single statement.
            db.execute_batch(
                "CREATE TABLE worktrees (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    absolute_path VARCHAR NOT NULL
                );
                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
                ",
            )?;

            db.execute(
                "CREATE TABLE files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    worktree_id INTEGER NOT NULL,
                    relative_path VARCHAR NOT NULL,
                    mtime_seconds INTEGER NOT NULL,
                    mtime_nanos INTEGER NOT NULL,
                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
                )",
                [],
            )?;

            db.execute(
                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
                [],
            )?;

            db.execute(
                "CREATE TABLE spans (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    file_id INTEGER NOT NULL,
                    start_byte INTEGER NOT NULL,
                    end_byte INTEGER NOT NULL,
                    name VARCHAR NOT NULL,
                    embedding BLOB NOT NULL,
                    digest BLOB NOT NULL,
                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
                )",
                [],
            )?;
            db.execute(
                "CREATE INDEX spans_digest ON spans (digest)",
                [],
            )?;

            log::trace!("vector database initialized with updated schema.");
            Ok(())
        })
    }

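    /// Deletes the `files` row for `delete_path` in the given worktree.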
    pub fn delete_file(
        &self,
        worktree_id: i64,
        delete_path: Arc<Path>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            db.execute(
                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
                params![worktree_id, delete_path.to_str()],
            )?;
            Ok(())
        })
    }

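    /// Writes the file record for `path` and inserts a row for each of its
    /// spans, including their embeddings and digests.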
    pub fn insert_file(
        &self,
        worktree_id: i64,
        path: Arc<Path>,
        mtime: SystemTime,
        spans: Vec<Span>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            // Add or replace the file row for this path, recording its latest mtime.
            let mtime = Timestamp::from(mtime);

            db.execute(
                "
                REPLACE INTO files
                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
                VALUES (?1, ?2, ?3, ?4)
                ",
                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
            )?;

            let file_id = db.last_insert_rowid();

            let mut query = db.prepare(
                "
                INSERT INTO spans
                (file_id, start_byte, end_byte, name, embedding, digest)
                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                ",
            )?;

            for span in spans {
                query.execute(params![
                    file_id,
                    span.range.start.to_string(),
                    span.range.end.to_string(),
                    span.name,
                    span.embedding,
                    span.digest
                ])?;
            }

            Ok(())
        })
    }

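    /// Returns whether a worktree row already exists for this absolute path,
    /// i.e. whether the worktree has been indexed before.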
    pub fn worktree_previously_indexed(
        &self,
        worktree_root_path: &Path,
    ) -> impl Future<Output = Result<bool>> {
        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));

            Ok(worktree_id.is_ok())
        })
    }

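    /// Fetches the stored embeddings for the given span digests, keyed by
    /// digest.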
    pub fn embeddings_for_digests(
        &self,
        digests: Vec<SpanDigest>,
    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
        self.transact(move |db| {
            let mut query = db.prepare(
                "
                SELECT digest, embedding
                FROM spans
                WHERE digest IN rarray(?)
                ",
            )?;
            let mut embeddings_by_digest = HashMap::default();
            let digests = Rc::new(
                digests
                    .into_iter()
                    .map(|d| Value::Blob(d.0.to_vec()))
                    .collect::<Vec<_>>(),
            );
            let rows = query.query_map(params![digests], |row| {
                Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
            })?;

            for row in rows {
                if let Ok((digest, embedding)) = row {
                    embeddings_by_digest.insert(digest, embedding);
                }
            }

            Ok(embeddings_by_digest)
        })
    }

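    /// Loads the digest-to-embedding map for every span belonging to the given
    /// files, provided as relative paths grouped by worktree id.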
    pub fn embeddings_for_files(
        &self,
        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
        self.transact(move |db| {
            let mut query = db.prepare(
                "
                SELECT digest, embedding
                FROM spans
                LEFT JOIN files ON files.id = spans.file_id
                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
                ",
            )?;
            let mut embeddings_by_digest = HashMap::default();
            for (worktree_id, file_paths) in worktree_id_file_paths {
                let file_paths = Rc::new(
                    file_paths
                        .into_iter()
                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
                        .collect::<Vec<_>>(),
                );
                let rows = query.query_map(params![worktree_id, file_paths], |row| {
                    Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
                })?;

                for row in rows {
                    if let Ok((digest, embedding)) = row {
                        embeddings_by_digest.insert(digest, embedding);
                    }
                }
            }

            Ok(embeddings_by_digest)
        })
    }

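    /// Returns the id of the worktree row for this absolute path, inserting a
    /// new row if none exists yet.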
    pub fn find_or_create_worktree(
        &self,
        worktree_root_path: Arc<Path>,
    ) -> impl Future<Output = Result<i64>> {
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
                    Ok(row.get::<_, i64>(0)?)
                });

            if let Ok(worktree_id) = worktree_id {
                return Ok(worktree_id);
            }

            // The worktree isn't in the database yet, so insert it.
            db.execute(
                "INSERT into worktrees (absolute_path) VALUES (?1)",
                params![worktree_root_path.to_string_lossy()],
            )?;
            Ok(db.last_insert_rowid())
        })
    }

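    /// Returns the stored modification time of every file in the worktree,
    /// keyed by relative path.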
    pub fn get_file_mtimes(
        &self,
        worktree_id: i64,
    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                SELECT relative_path, mtime_seconds, mtime_nanos
                FROM files
                WHERE worktree_id = ?1
                ORDER BY relative_path",
            )?;
            let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
            for row in statement.query_map(params![worktree_id], |row| {
                Ok((
                    row.get::<_, String>(0)?.into(),
                    Timestamp {
                        seconds: row.get(1)?,
                        nanos: row.get(2)?,
                    }
                    .into(),
                ))
            })? {
                let row = row?;
                result.insert(row.0, row.1);
            }
            Ok(result)
        })
    }

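    /// Scans the spans of the given files and returns the `limit` span ids
    /// whose embeddings are most similar to `query_embedding`, paired with
    /// their similarity scores and ordered from most to least similar.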
    pub fn top_k_search(
        &self,
        query_embedding: &Embedding,
        limit: usize,
        file_ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, OrderedFloat<f32>)>>> {
        let query_embedding = query_embedding.clone();
        let file_ids = file_ids.to_vec();
        self.transact(move |db| {
            let mut results = Vec::<(i64, OrderedFloat<f32>)>::with_capacity(limit + 1);
            Self::for_each_span(db, &file_ids, |id, embedding| {
                let similarity = embedding.similarity(&query_embedding);
                let ix = match results
                    .binary_search_by_key(&Reverse(similarity), |(_, s)| Reverse(*s))
                {
                    Ok(ix) => ix,
                    Err(ix) => ix,
                };
                results.insert(ix, (id, similarity));
                results.truncate(limit);
            })?;

            anyhow::Ok(results)
        })
    }

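    /// Returns the ids of files in the given worktrees whose relative paths
    /// match at least one include glob (or all files when `includes` is empty)
    /// and none of the exclude globs.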
    pub fn retrieve_included_file_ids(
        &self,
        worktree_ids: &[i64],
        includes: &[PathMatcher],
        excludes: &[PathMatcher],
    ) -> impl Future<Output = Result<Vec<i64>>> {
        let worktree_ids = worktree_ids.to_vec();
        let includes = includes.to_vec();
        let excludes = excludes.to_vec();
        self.transact(move |db| {
            let mut file_query = db.prepare(
                "
                SELECT
                    id, relative_path
                FROM
                    files
                WHERE
                    worktree_id IN rarray(?)
                ",
            )?;

            let mut file_ids = Vec::<i64>::new();
            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;

            while let Some(row) = rows.next()? {
                let file_id = row.get(0)?;
                let relative_path = row.get_ref(1)?.as_str()?;
                let included =
                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
                if included && !excluded {
                    file_ids.push(file_id);
                }
            }

            anyhow::Ok(file_ids)
        })
    }

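    /// Calls `f` with the id and embedding of every span belonging to the
    /// given files; rows that fail to decode are skipped.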
    fn for_each_span(
        db: &rusqlite::Connection,
        file_ids: &[i64],
        mut f: impl FnMut(i64, Embedding),
    ) -> Result<()> {
        let mut query_statement = db.prepare(
            "
            SELECT
                id, embedding
            FROM
                spans
            WHERE
                file_id IN rarray(?)
            ",
        )?;

        query_statement
            .query_map(params![ids_to_sql(&file_ids)], |row| {
                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
            })?
            .filter_map(|row| row.ok())
            .for_each(|(id, embedding)| f(id, embedding));
        Ok(())
    }

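    /// Resolves span ids into their worktree id, file path, and byte range,
    /// preserving the order of `ids` and failing if any id is missing.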
    pub fn spans_for_ids(
        &self,
        ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
        let ids = ids.to_vec();
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                    SELECT
                        spans.id,
                        files.worktree_id,
                        files.relative_path,
                        spans.start_byte,
                        spans.end_byte
                    FROM
                        spans, files
                    WHERE
                        spans.file_id = files.id AND
                        spans.id in rarray(?)
                ",
            )?;

            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, i64>(1)?,
                    row.get::<_, String>(2)?.into(),
                    row.get(3)?..row.get(4)?,
                ))
            })?;

            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
            for row in result_iter {
                let (id, worktree_id, path, range) = row?;
                values_by_id.insert(id, (worktree_id, path, range));
            }

            let mut results = Vec::with_capacity(ids.len());
            for id in &ids {
                let value = values_by_id
                    .remove(id)
                    .ok_or_else(|| anyhow!("missing span id {}", id))?;
                results.push(value);
            }

            Ok(results)
        })
    }
}

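/// Converts ids into the `Rc<Vec<Value>>` form expected by SQLite's `rarray()`
/// table-valued function, which is loaded via `rusqlite::vtab::array`.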
fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
    Rc::new(
        ids.iter()
            .copied()
            .map(rusqlite::types::Value::from)
            .collect::<Vec<_>>(),
    )
}