use crate::{
    embedding::Embedding,
    parsing::{Document, DocumentDigest},
    SEMANTIC_INDEX_VERSION,
};
use anyhow::{anyhow, Context, Result};
use futures::channel::oneshot;
use gpui::executor;
use project::{search::PathMatcher, Fs};
use rpc::proto::Timestamp;
use rusqlite::params;
use rusqlite::types::Value;
use std::{
    cmp::Ordering,
    collections::HashMap,
    future::Future,
    ops::Range,
    path::{Path, PathBuf},
    rc::Rc,
    sync::Arc,
    time::{Instant, SystemTime},
};
use util::TryFutureExt;

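/// A single row of the `files` table: its rowid, worktree-relative path, and
/// the modification time recorded when the file was indexed.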
#[derive(Debug)]
pub struct FileRecord {
    pub id: usize,
    pub relative_path: String,
    pub mtime: Timestamp,
}

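/// Handle to the on-disk vector index.
///
/// All SQLite access is funneled through a single background task that owns
/// the `rusqlite::Connection`; callers enqueue closures on the `transactions`
/// channel and await their results, so the handle is cheap to clone and safe
/// to share across threads.
///
/// Usage sketch (assuming an `fs`, a database `path`, a background `executor`,
/// and a worktree `root_path` are already in scope):
///
/// ```ignore
/// let db = VectorDatabase::new(fs, path, executor).await?;
/// let worktree_id = db.find_or_create_worktree(root_path).await?;
/// let mtimes = db.get_file_mtimes(worktree_id).await?;
/// ```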
#[derive(Clone)]
pub struct VectorDatabase {
    path: Arc<Path>,
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}

impl VectorDatabase {
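    /// Opens (or creates) the database at `path` and spawns the background
    /// task that owns the connection. The task applies the WAL, synchronous,
    /// cache-size, and temp-store pragmas once, then processes queued
    /// transactions until the sending side of the channel is dropped. Schema
    /// initialization runs before the handle is returned.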
    pub async fn new(
        fs: Arc<dyn Fs>,
        path: Arc<Path>,
        executor: Arc<executor::Background>,
    ) -> Result<Self> {
        if let Some(db_directory) = path.parent() {
            fs.create_dir(db_directory).await?;
        }

        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
        >();
        executor
            .spawn({
                let path = path.clone();
                async move {
                    let mut connection = rusqlite::Connection::open(&path)?;

                    connection.pragma_update(None, "journal_mode", "wal")?;
                    connection.pragma_update(None, "synchronous", "normal")?;
                    connection.pragma_update(None, "cache_size", 1000000)?;
                    connection.pragma_update(None, "temp_store", "MEMORY")?;

                    while let Ok(transaction) = transactions_rx.recv().await {
                        transaction(&mut connection);
                    }

                    anyhow::Ok(())
                }
                .log_err()
            })
            .detach();
        let this = Self {
            transactions: transactions_tx,
            path,
        };
        this.initialize_database().await?;
        Ok(this)
    }

    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }

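    /// Runs `f` inside a SQLite transaction on the background connection task.
    ///
    /// The closure is boxed and sent over the `transactions` channel; its
    /// result comes back through a oneshot channel. The transaction is
    /// committed only if `f` returns `Ok`, and the returned future fails with
    /// "connection was dropped" if the background task has already exited.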
    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
    where
        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
        T: 'static + Send,
    {
        let (tx, rx) = oneshot::channel();
        let transactions = self.transactions.clone();
        async move {
            if transactions
                .send(Box::new(|connection| {
                    let result = connection
                        .transaction()
                        .map_err(|err| anyhow!(err))
                        .and_then(|transaction| {
                            let result = f(&transaction)?;
                            transaction.commit()?;
                            Ok(result)
                        });
                    let _ = tx.send(result);
                }))
                .await
                .is_err()
            {
                return Err(anyhow!("connection was dropped"));
            }
            rx.await?
        }
    }

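    /// Brings the schema up to date. If the stored `semantic_index_config`
    /// version does not match `SEMANTIC_INDEX_VERSION`, every table is dropped
    /// and recreated, discarding previously indexed data.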
    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
        self.transact(|db| {
            rusqlite::vtab::array::load_module(&db)?;

            // Drop and recreate the tables if SEMANTIC_INDEX_VERSION has been bumped
            let version_query = db.prepare("SELECT version from semantic_index_config");
            let version = version_query
                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
                log::trace!("vector database schema up to date");
                return Ok(());
            }

            log::trace!("vector database schema out of date. updating...");
            db.execute("DROP TABLE IF EXISTS documents", [])
                .context("failed to drop 'documents' table")?;
            db.execute("DROP TABLE IF EXISTS files", [])
                .context("failed to drop 'files' table")?;
            db.execute("DROP TABLE IF EXISTS worktrees", [])
                .context("failed to drop 'worktrees' table")?;
            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
                .context("failed to drop 'semantic_index_config' table")?;

            // Create the vector index tables
            db.execute(
                "CREATE TABLE semantic_index_config (
                    version INTEGER NOT NULL
                )",
                [],
            )?;

            db.execute(
                "INSERT INTO semantic_index_config (version) VALUES (?1)",
                params![SEMANTIC_INDEX_VERSION],
            )?;

            // `execute` runs only a single statement, so use `execute_batch`
            // to create the table together with its unique index.
            db.execute_batch(
                "CREATE TABLE worktrees (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    absolute_path VARCHAR NOT NULL
                );
                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
                ",
            )?;

            db.execute(
                "CREATE TABLE files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    worktree_id INTEGER NOT NULL,
                    relative_path VARCHAR NOT NULL,
                    mtime_seconds INTEGER NOT NULL,
                    mtime_nanos INTEGER NOT NULL,
                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
                )",
                [],
            )?;

            db.execute(
                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
                [],
            )?;

            db.execute(
                "CREATE TABLE documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    file_id INTEGER NOT NULL,
                    start_byte INTEGER NOT NULL,
                    end_byte INTEGER NOT NULL,
                    name VARCHAR NOT NULL,
                    embedding BLOB NOT NULL,
                    digest BLOB NOT NULL,
                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
                )",
                [],
            )?;

            log::trace!("vector database initialized with updated schema.");
            Ok(())
        })
    }

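    /// Removes the `files` row for `delete_path` within the given worktree.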
    pub fn delete_file(
        &self,
        worktree_id: i64,
        delete_path: PathBuf,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            db.execute(
                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
                params![worktree_id, delete_path.to_str()],
            )?;
            Ok(())
        })
    }

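    /// Records a freshly indexed file. `REPLACE INTO files` upserts the row
    /// keyed by the unique `(worktree_id, relative_path)` index, and one
    /// `documents` row is then inserted per parsed document, carrying its byte
    /// range, name, embedding, and content digest.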
    pub fn insert_file(
        &self,
        worktree_id: i64,
        path: PathBuf,
        mtime: SystemTime,
        documents: Vec<Document>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            let mtime = Timestamp::from(mtime);

            // REPLACE upserts on the unique (worktree_id, relative_path) index,
            // deleting any conflicting row before inserting the new one.
            db.execute(
                "
                REPLACE INTO files
                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
                VALUES (?1, ?2, ?3, ?4)
                ",
                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
            )?;

            let file_id = db.last_insert_rowid();

            let t0 = Instant::now();
            let mut query = db.prepare(
                "
                INSERT INTO documents
                (file_id, start_byte, end_byte, name, embedding, digest)
                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                ",
            )?;
            log::trace!(
                "preparing query took {:?} milliseconds",
                t0.elapsed().as_millis()
            );

            for document in documents {
                query.execute(params![
                    file_id,
                    document.range.start.to_string(),
                    document.range.end.to_string(),
                    document.name,
                    document.embedding,
                    document.digest
                ])?;
            }

            Ok(())
        })
    }

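    /// Returns whether a `worktrees` row already exists for this absolute
    /// path, i.e. whether the worktree has been indexed into this database
    /// before.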
    pub fn worktree_previously_indexed(
        &self,
        worktree_root_path: &Path,
    ) -> impl Future<Output = Result<bool>> {
        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));

            Ok(worktree_id.is_ok())
        })
    }

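    /// Batch-fetches previously computed embeddings for the given files, keyed
    /// by document digest. The path list for each worktree is bound through
    /// the `rarray` virtual table loaded in `initialize_database`; rows that
    /// fail to decode are skipped.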
    pub fn embeddings_for_files(
        &self,
        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
    ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
        self.transact(move |db| {
            let mut query = db.prepare(
                "
                SELECT digest, embedding
                FROM documents
                LEFT JOIN files ON files.id = documents.file_id
                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
                ",
            )?;
            let mut embeddings_by_digest = HashMap::new();
            for (worktree_id, file_paths) in worktree_id_file_paths {
                let file_paths = Rc::new(
                    file_paths
                        .into_iter()
                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
                        .collect::<Vec<_>>(),
                );
                let rows = query.query_map(params![worktree_id, file_paths], |row| {
                    Ok((
                        row.get::<_, DocumentDigest>(0)?,
                        row.get::<_, Embedding>(1)?,
                    ))
                })?;

                for row in rows {
                    if let Ok(row) = row {
                        embeddings_by_digest.insert(row.0, row.1);
                    }
                }
            }

            Ok(embeddings_by_digest)
        })
    }

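    /// Looks up the `worktrees` row for this root path, inserting one if it
    /// does not exist yet, and returns its id.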
    pub fn find_or_create_worktree(
        &self,
        worktree_root_path: PathBuf,
    ) -> impl Future<Output = Result<i64>> {
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
                    Ok(row.get::<_, i64>(0)?)
                });

            if let Ok(worktree_id) = worktree_id {
                return Ok(worktree_id);
            }

            // If worktree_id is Err, insert a new worktree
            db.execute(
                "INSERT into worktrees (absolute_path) VALUES (?1)",
                params![worktree_root_path.to_string_lossy()],
            )?;
            Ok(db.last_insert_rowid())
        })
    }

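    /// Returns the modification time recorded for every file in the worktree,
    /// keyed by relative path.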
    pub fn get_file_mtimes(
        &self,
        worktree_id: i64,
    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                SELECT relative_path, mtime_seconds, mtime_nanos
                FROM files
                WHERE worktree_id = ?1
                ORDER BY relative_path",
            )?;
            let mut result: HashMap<PathBuf, SystemTime> = HashMap::new();
            for row in statement.query_map(params![worktree_id], |row| {
                Ok((
                    row.get::<_, String>(0)?.into(),
                    Timestamp {
                        seconds: row.get(1)?,
                        nanos: row.get(2)?,
                    }
                    .into(),
                ))
            })? {
                let row = row?;
                result.insert(row.0, row.1);
            }
            Ok(result)
        })
    }

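    /// Brute-force nearest-neighbor search: every stored embedding for the
    /// given files is scored against `query_embedding`, and a descending
    /// sorted vector of at most `limit` `(document id, similarity)` pairs is
    /// maintained via binary insertion.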
    pub fn top_k_search(
        &self,
        query_embedding: &Embedding,
        limit: usize,
        file_ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
        let query_embedding = query_embedding.clone();
        let file_ids = file_ids.to_vec();
        self.transact(move |db| {
            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
            Self::for_each_document(db, &file_ids, |id, embedding| {
                let similarity = embedding.similarity(&query_embedding);
                let ix = match results.binary_search_by(|(_, s)| {
                    similarity.partial_cmp(s).unwrap_or(Ordering::Equal)
                }) {
                    Ok(ix) => ix,
                    Err(ix) => ix,
                };
                results.insert(ix, (id, similarity));
                results.truncate(limit);
            })?;

            anyhow::Ok(results)
        })
    }

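    /// Returns the ids of files in the given worktrees whose relative paths
    /// pass the include/exclude globs. An empty `includes` list matches
    /// everything; the glob filtering happens in Rust after the SQL query.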
    pub fn retrieve_included_file_ids(
        &self,
        worktree_ids: &[i64],
        includes: &[PathMatcher],
        excludes: &[PathMatcher],
    ) -> impl Future<Output = Result<Vec<i64>>> {
        let worktree_ids = worktree_ids.to_vec();
        let includes = includes.to_vec();
        let excludes = excludes.to_vec();
        self.transact(move |db| {
            let mut file_query = db.prepare(
                "
                SELECT
                    id, relative_path
                FROM
                    files
                WHERE
                    worktree_id IN rarray(?)
                ",
            )?;

            let mut file_ids = Vec::<i64>::new();
            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;

            while let Some(row) = rows.next()? {
                let file_id = row.get(0)?;
                let relative_path = row.get_ref(1)?.as_str()?;
                let included =
                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
                if included && !excluded {
                    file_ids.push(file_id);
                }
            }

            anyhow::Ok(file_ids)
        })
    }

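    /// Streams `(document id, embedding)` pairs for the given file ids,
    /// ignoring rows that fail to decode.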
    fn for_each_document(
        db: &rusqlite::Connection,
        file_ids: &[i64],
        mut f: impl FnMut(i64, Embedding),
    ) -> Result<()> {
        let mut query_statement = db.prepare(
            "
            SELECT
                id, embedding
            FROM
                documents
            WHERE
                file_id IN rarray(?)
            ",
        )?;

        query_statement
            .query_map(params![ids_to_sql(file_ids)], |row| {
                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
            })?
            .filter_map(|row| row.ok())
            .for_each(|(id, embedding)| f(id, embedding));
        Ok(())
    }

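    /// Resolves document ids to `(worktree id, relative path, byte range)`
    /// triples, returned in the same order as `ids`. Missing ids produce an
    /// error rather than being silently dropped.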
    pub fn get_documents_by_ids(
        &self,
        ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
        let ids = ids.to_vec();
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                SELECT
                    documents.id,
                    files.worktree_id,
                    files.relative_path,
                    documents.start_byte,
                    documents.end_byte
                FROM
                    documents, files
                WHERE
                    documents.file_id = files.id AND
                    documents.id IN rarray(?)
                ",
            )?;

            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, i64>(1)?,
                    row.get::<_, String>(2)?.into(),
                    row.get(3)?..row.get(4)?,
                ))
            })?;

            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
            for row in result_iter {
                let (id, worktree_id, path, range) = row?;
                values_by_id.insert(id, (worktree_id, path, range));
            }

            let mut results = Vec::with_capacity(ids.len());
            for id in &ids {
                let value = values_by_id
                    .remove(id)
                    .ok_or_else(|| anyhow!("missing document id {}", id))?;
                results.push(value);
            }

            Ok(results)
        })
    }
}

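/// Wraps a list of ids in the `Rc<Vec<Value>>` form that rusqlite's `rarray`
/// table-valued function expects as a bound parameter.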
fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
    Rc::new(
        ids.iter()
            .copied()
            .map(rusqlite::types::Value::from)
            .collect::<Vec<_>>(),
    )
}