// db.rs — SQLite-backed vector database for the semantic index.

  1use crate::{
  2    embedding::Embedding,
  3    parsing::{Span, SpanDigest},
  4    SEMANTIC_INDEX_VERSION,
  5};
  6use anyhow::{anyhow, Context, Result};
  7use collections::HashMap;
  8use futures::channel::oneshot;
  9use gpui::executor;
 10use project::{search::PathMatcher, Fs};
 11use rpc::proto::Timestamp;
 12use rusqlite::params;
 13use rusqlite::types::Value;
 14use std::{
 15    cmp::Ordering,
 16    future::Future,
 17    ops::Range,
 18    path::{Path, PathBuf},
 19    rc::Rc,
 20    sync::Arc,
 21    time::SystemTime,
 22};
 23use util::TryFutureExt;
 24
/// A row from the `files` table: an indexed file identified by its rowid,
/// its worktree-relative path, and the mtime recorded at indexing time.
#[derive(Debug)]
pub struct FileRecord {
    // Rowid in the `files` table.
    pub id: usize,
    // Path relative to the worktree root, as stored in the database.
    pub relative_path: String,
    // Modification time recorded when the file was last indexed.
    pub mtime: Timestamp,
}
 31
/// Handle to the semantic-index SQLite database.
///
/// All database access is serialized through a single background task that
/// owns the `rusqlite::Connection` (see [`VectorDatabase::new`]): callers
/// enqueue closures on the `transactions` channel and get results back over
/// per-call oneshot channels. Cloning the handle is cheap — clones share the
/// same channel and path.
#[derive(Clone)]
pub struct VectorDatabase {
    // Location of the database file on disk.
    path: Arc<Path>,
    // Work queue consumed by the connection-owning background task.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
 38
 39impl VectorDatabase {
    /// Opens (or creates) the database at `path` and spawns the background
    /// task that owns the SQLite connection.
    ///
    /// The parent directory is created if missing. The spawned task applies
    /// connection pragmas, then serially executes closures received over the
    /// transactions channel until every sender is dropped. Errors inside the
    /// task are logged via `log_err` rather than propagated to callers.
    pub async fn new(
        fs: Arc<dyn Fs>,
        path: Arc<Path>,
        executor: Arc<executor::Background>,
    ) -> Result<Self> {
        if let Some(db_directory) = path.parent() {
            fs.create_dir(db_directory).await?;
        }

        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
        >();
        executor
            .spawn({
                let path = path.clone();
                async move {
                    let mut connection = rusqlite::Connection::open(&path)?;

                    // Favor throughput: WAL journaling with relaxed syncs,
                    // a large page cache, and in-memory temp storage.
                    connection.pragma_update(None, "journal_mode", "wal")?;
                    connection.pragma_update(None, "synchronous", "normal")?;
                    connection.pragma_update(None, "cache_size", 1000000)?;
                    connection.pragma_update(None, "temp_store", "MEMORY")?;

                    // Drain the queue; exits when all senders are dropped.
                    while let Ok(transaction) = transactions_rx.recv().await {
                        transaction(&mut connection);
                    }

                    anyhow::Ok(())
                }
                .log_err()
            })
            .detach();
        let this = Self {
            transactions: transactions_tx,
            path,
        };
        // Create or migrate the schema before handing the database out.
        this.initialize_database().await?;
        Ok(this)
    }
 79
    /// Location of the database file on disk.
    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }
 83
 84    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
 85    where
 86        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
 87        T: 'static + Send,
 88    {
 89        let (tx, rx) = oneshot::channel();
 90        let transactions = self.transactions.clone();
 91        async move {
 92            if transactions
 93                .send(Box::new(|connection| {
 94                    let result = connection
 95                        .transaction()
 96                        .map_err(|err| anyhow!(err))
 97                        .and_then(|transaction| {
 98                            let result = f(&transaction)?;
 99                            transaction.commit()?;
100                            Ok(result)
101                        });
102                    let _ = tx.send(result);
103                }))
104                .await
105                .is_err()
106            {
107                return Err(anyhow!("connection was dropped"))?;
108            }
109            rx.await?
110        }
111    }
112
113    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
114        self.transact(|db| {
115            rusqlite::vtab::array::load_module(&db)?;
116
117            // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
118            let version_query = db.prepare("SELECT version from semantic_index_config");
119            let version = version_query
120                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
121            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
122                log::trace!("vector database schema up to date");
123                return Ok(());
124            }
125
126            log::trace!("vector database schema out of date. updating...");
127            // We renamed the `documents` table to `spans`, so we want to drop
128            // `documents` without recreating it if it exists.
129            db.execute("DROP TABLE IF EXISTS documents", [])
130                .context("failed to drop 'documents' table")?;
131            db.execute("DROP TABLE IF EXISTS spans", [])
132                .context("failed to drop 'spans' table")?;
133            db.execute("DROP TABLE IF EXISTS files", [])
134                .context("failed to drop 'files' table")?;
135            db.execute("DROP TABLE IF EXISTS worktrees", [])
136                .context("failed to drop 'worktrees' table")?;
137            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
138                .context("failed to drop 'semantic_index_config' table")?;
139
140            // Initialize Vector Databasing Tables
141            db.execute(
142                "CREATE TABLE semantic_index_config (
143                    version INTEGER NOT NULL
144                )",
145                [],
146            )?;
147
148            db.execute(
149                "INSERT INTO semantic_index_config (version) VALUES (?1)",
150                params![SEMANTIC_INDEX_VERSION],
151            )?;
152
153            db.execute(
154                "CREATE TABLE worktrees (
155                    id INTEGER PRIMARY KEY AUTOINCREMENT,
156                    absolute_path VARCHAR NOT NULL
157                );
158                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
159                ",
160                [],
161            )?;
162
163            db.execute(
164                "CREATE TABLE files (
165                    id INTEGER PRIMARY KEY AUTOINCREMENT,
166                    worktree_id INTEGER NOT NULL,
167                    relative_path VARCHAR NOT NULL,
168                    mtime_seconds INTEGER NOT NULL,
169                    mtime_nanos INTEGER NOT NULL,
170                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
171                )",
172                [],
173            )?;
174
175            db.execute(
176                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
177                [],
178            )?;
179
180            db.execute(
181                "CREATE TABLE spans (
182                    id INTEGER PRIMARY KEY AUTOINCREMENT,
183                    file_id INTEGER NOT NULL,
184                    start_byte INTEGER NOT NULL,
185                    end_byte INTEGER NOT NULL,
186                    name VARCHAR NOT NULL,
187                    embedding BLOB NOT NULL,
188                    digest BLOB NOT NULL,
189                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
190                )",
191                [],
192            )?;
193            db.execute(
194                "CREATE INDEX spans_digest ON spans (digest)",
195                [],
196            )?;
197
198            log::trace!("vector database initialized with updated schema.");
199            Ok(())
200        })
201    }
202
203    pub fn delete_file(
204        &self,
205        worktree_id: i64,
206        delete_path: Arc<Path>,
207    ) -> impl Future<Output = Result<()>> {
208        self.transact(move |db| {
209            db.execute(
210                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
211                params![worktree_id, delete_path.to_str()],
212            )?;
213            Ok(())
214        })
215    }
216
    /// Records (or re-records) `path` in `worktree_id` with the given mtime,
    /// then inserts one `spans` row per span.
    pub fn insert_file(
        &self,
        worktree_id: i64,
        path: Arc<Path>,
        mtime: SystemTime,
        spans: Vec<Span>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            let mtime = Timestamp::from(mtime);

            // REPLACE removes any existing (worktree_id, relative_path) row
            // first; the foreign-key cascade then clears that file's old
            // spans before the new ones are inserted below.
            db.execute(
                "
                REPLACE INTO files
                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
                VALUES (?1, ?2, ?3, ?4)
                ",
                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
            )?;

            // REPLACE always inserts a fresh row, so this is the new file id.
            let file_id = db.last_insert_rowid();

            let mut query = db.prepare(
                "
                INSERT INTO spans
                (file_id, start_byte, end_byte, name, embedding, digest)
                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                ",
            )?;

            for span in spans {
                query.execute(params![
                    file_id,
                    // Byte offsets are bound as strings; the INTEGER column
                    // affinity converts them back on storage.
                    span.range.start.to_string(),
                    span.range.end.to_string(),
                    span.name,
                    span.embedding,
                    span.digest
                ])?;
            }

            Ok(())
        })
    }
261
262    pub fn worktree_previously_indexed(
263        &self,
264        worktree_root_path: &Path,
265    ) -> impl Future<Output = Result<bool>> {
266        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
267        self.transact(move |db| {
268            let mut worktree_query =
269                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
270            let worktree_id = worktree_query
271                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
272
273            if worktree_id.is_ok() {
274                return Ok(true);
275            } else {
276                return Ok(false);
277            }
278        })
279    }
280
281    pub fn embeddings_for_digests(
282        &self,
283        digests: Vec<SpanDigest>,
284    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
285        self.transact(move |db| {
286            let mut query = db.prepare(
287                "
288                SELECT digest, embedding
289                FROM spans
290                WHERE digest IN rarray(?)
291                ",
292            )?;
293            let mut embeddings_by_digest = HashMap::default();
294            let digests = Rc::new(
295                digests
296                    .into_iter()
297                    .map(|p| Value::Blob(p.0.to_vec()))
298                    .collect::<Vec<_>>(),
299            );
300            let rows = query.query_map(params![digests], |row| {
301                Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
302            })?;
303
304            for row in rows {
305                if let Ok(row) = row {
306                    embeddings_by_digest.insert(row.0, row.1);
307                }
308            }
309
310            Ok(embeddings_by_digest)
311        })
312    }
313
314    pub fn embeddings_for_files(
315        &self,
316        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
317    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
318        self.transact(move |db| {
319            let mut query = db.prepare(
320                "
321                SELECT digest, embedding
322                FROM spans
323                LEFT JOIN files ON files.id = spans.file_id
324                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
325            ",
326            )?;
327            let mut embeddings_by_digest = HashMap::default();
328            for (worktree_id, file_paths) in worktree_id_file_paths {
329                let file_paths = Rc::new(
330                    file_paths
331                        .into_iter()
332                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
333                        .collect::<Vec<_>>(),
334                );
335                let rows = query.query_map(params![worktree_id, file_paths], |row| {
336                    Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
337                })?;
338
339                for row in rows {
340                    if let Ok(row) = row {
341                        embeddings_by_digest.insert(row.0, row.1);
342                    }
343                }
344            }
345
346            Ok(embeddings_by_digest)
347        })
348    }
349
350    pub fn find_or_create_worktree(
351        &self,
352        worktree_root_path: Arc<Path>,
353    ) -> impl Future<Output = Result<i64>> {
354        self.transact(move |db| {
355            let mut worktree_query =
356                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
357            let worktree_id = worktree_query
358                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
359                    Ok(row.get::<_, i64>(0)?)
360                });
361
362            if worktree_id.is_ok() {
363                return Ok(worktree_id?);
364            }
365
366            // If worktree_id is Err, insert new worktree
367            db.execute(
368                "INSERT into worktrees (absolute_path) VALUES (?1)",
369                params![worktree_root_path.to_string_lossy()],
370            )?;
371            Ok(db.last_insert_rowid())
372        })
373    }
374
375    pub fn get_file_mtimes(
376        &self,
377        worktree_id: i64,
378    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
379        self.transact(move |db| {
380            let mut statement = db.prepare(
381                "
382                SELECT relative_path, mtime_seconds, mtime_nanos
383                FROM files
384                WHERE worktree_id = ?1
385                ORDER BY relative_path",
386            )?;
387            let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
388            for row in statement.query_map(params![worktree_id], |row| {
389                Ok((
390                    row.get::<_, String>(0)?.into(),
391                    Timestamp {
392                        seconds: row.get(1)?,
393                        nanos: row.get(2)?,
394                    }
395                    .into(),
396                ))
397            })? {
398                let row = row?;
399                result.insert(row.0, row.1);
400            }
401            Ok(result)
402        })
403    }
404
    /// Returns up to `limit` span ids with the highest similarity score
    /// (as computed by `Embedding::similarity`) to `query_embedding`,
    /// restricted to spans in `file_ids`. Results are ordered from most to
    /// least similar.
    pub fn top_k_search(
        &self,
        query_embedding: &Embedding,
        limit: usize,
        file_ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
        let query_embedding = query_embedding.clone();
        let file_ids = file_ids.to_vec();
        self.transact(move |db| {
            // Running top-`limit` list, kept sorted descending by score.
            // Capacity is limit + 1 because we insert before truncating.
            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
            Self::for_each_span(db, &file_ids, |id, embedding| {
                let similarity = embedding.similarity(&query_embedding);
                // The comparator is deliberately inverted (target compared
                // against the probe) so binary_search works on a vec sorted
                // in DESCENDING order. Incomparable scores (NaN) fall back to
                // Equal — assumes similarity is never NaN; TODO confirm.
                let ix = match results.binary_search_by(|(_, s)| {
                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
                }) {
                    Ok(ix) => ix,
                    Err(ix) => ix,
                };
                results.insert(ix, (id, similarity));
                // Drop the lowest-scoring entry once over capacity.
                results.truncate(limit);
            })?;

            anyhow::Ok(results)
        })
    }
430
431    pub fn retrieve_included_file_ids(
432        &self,
433        worktree_ids: &[i64],
434        includes: &[PathMatcher],
435        excludes: &[PathMatcher],
436    ) -> impl Future<Output = Result<Vec<i64>>> {
437        let worktree_ids = worktree_ids.to_vec();
438        let includes = includes.to_vec();
439        let excludes = excludes.to_vec();
440        self.transact(move |db| {
441            let mut file_query = db.prepare(
442                "
443                SELECT
444                    id, relative_path
445                FROM
446                    files
447                WHERE
448                    worktree_id IN rarray(?)
449                ",
450            )?;
451
452            let mut file_ids = Vec::<i64>::new();
453            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
454
455            while let Some(row) = rows.next()? {
456                let file_id = row.get(0)?;
457                let relative_path = row.get_ref(1)?.as_str()?;
458                let included =
459                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
460                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
461                if included && !excluded {
462                    file_ids.push(file_id);
463                }
464            }
465
466            anyhow::Ok(file_ids)
467        })
468    }
469
470    fn for_each_span(
471        db: &rusqlite::Connection,
472        file_ids: &[i64],
473        mut f: impl FnMut(i64, Embedding),
474    ) -> Result<()> {
475        let mut query_statement = db.prepare(
476            "
477            SELECT
478                id, embedding
479            FROM
480                spans
481            WHERE
482                file_id IN rarray(?)
483            ",
484        )?;
485
486        query_statement
487            .query_map(params![ids_to_sql(&file_ids)], |row| {
488                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
489            })?
490            .filter_map(|row| row.ok())
491            .for_each(|(id, embedding)| f(id, embedding));
492        Ok(())
493    }
494
495    pub fn spans_for_ids(
496        &self,
497        ids: &[i64],
498    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
499        let ids = ids.to_vec();
500        self.transact(move |db| {
501            let mut statement = db.prepare(
502                "
503                    SELECT
504                        spans.id,
505                        files.worktree_id,
506                        files.relative_path,
507                        spans.start_byte,
508                        spans.end_byte
509                    FROM
510                        spans, files
511                    WHERE
512                        spans.file_id = files.id AND
513                        spans.id in rarray(?)
514                ",
515            )?;
516
517            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
518                Ok((
519                    row.get::<_, i64>(0)?,
520                    row.get::<_, i64>(1)?,
521                    row.get::<_, String>(2)?.into(),
522                    row.get(3)?..row.get(4)?,
523                ))
524            })?;
525
526            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
527            for row in result_iter {
528                let (id, worktree_id, path, range) = row?;
529                values_by_id.insert(id, (worktree_id, path, range));
530            }
531
532            let mut results = Vec::with_capacity(ids.len());
533            for id in &ids {
534                let value = values_by_id
535                    .remove(id)
536                    .ok_or(anyhow!("missing span id {}", id))?;
537                results.push(value);
538            }
539
540            Ok(results)
541        })
542    }
543}
544
545fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
546    Rc::new(
547        ids.iter()
548            .copied()
549            .map(|v| rusqlite::types::Value::from(v))
550            .collect::<Vec<_>>(),
551    )
552}