1use crate::{
2 embedding::Embedding,
3 parsing::{Document, DocumentDigest},
4 SEMANTIC_INDEX_VERSION,
5};
6use anyhow::{anyhow, Context, Result};
7use futures::channel::oneshot;
8use gpui::executor;
9use project::{search::PathMatcher, Fs};
10use rpc::proto::Timestamp;
11use rusqlite::params;
12use rusqlite::types::Value;
13use std::{
14 cmp::Ordering,
15 collections::HashMap,
16 future::Future,
17 ops::Range,
18 path::{Path, PathBuf},
19 rc::Rc,
20 sync::Arc,
21 time::SystemTime,
22};
23use util::TryFutureExt;
24
/// A row from the `files` table.
#[derive(Debug)]
pub struct FileRecord {
    /// Database row id (`files.id`).
    pub id: usize,
    /// File path relative to its worktree root.
    pub relative_path: String,
    /// Modification time recorded when the file was last indexed.
    pub mtime: Timestamp,
}
31
/// Handle to the SQLite-backed vector database.
///
/// All database access is funneled through the `transactions` channel to a
/// single background task that owns the `rusqlite::Connection`, which is why
/// this handle is cheaply cloneable.
#[derive(Clone)]
pub struct VectorDatabase {
    // Filesystem path of the SQLite database file.
    path: Arc<Path>,
    // Sends closures to the background task owning the connection; each
    // closure runs with exclusive access to the connection.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
38
39impl VectorDatabase {
40 pub async fn new(
41 fs: Arc<dyn Fs>,
42 path: Arc<Path>,
43 executor: Arc<executor::Background>,
44 ) -> Result<Self> {
45 if let Some(db_directory) = path.parent() {
46 fs.create_dir(db_directory).await?;
47 }
48
49 let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
50 Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
51 >();
52 executor
53 .spawn({
54 let path = path.clone();
55 async move {
56 let mut connection = rusqlite::Connection::open(&path)?;
57 while let Ok(transaction) = transactions_rx.recv().await {
58 transaction(&mut connection);
59 }
60
61 anyhow::Ok(())
62 }
63 .log_err()
64 })
65 .detach();
66 let this = Self {
67 transactions: transactions_tx,
68 path,
69 };
70 this.initialize_database().await?;
71 Ok(this)
72 }
73
    /// Returns the filesystem path of the underlying SQLite database file.
    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }
77
78 fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
79 where
80 F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
81 T: 'static + Send,
82 {
83 let (tx, rx) = oneshot::channel();
84 let transactions = self.transactions.clone();
85 async move {
86 if transactions
87 .send(Box::new(|connection| {
88 let result = connection
89 .transaction()
90 .map_err(|err| anyhow!(err))
91 .and_then(|transaction| {
92 let result = f(&transaction)?;
93 transaction.commit()?;
94 Ok(result)
95 });
96 let _ = tx.send(result);
97 }))
98 .await
99 .is_err()
100 {
101 return Err(anyhow!("connection was dropped"))?;
102 }
103 rx.await?
104 }
105 }
106
107 fn initialize_database(&self) -> impl Future<Output = Result<()>> {
108 self.transact(|db| {
109 rusqlite::vtab::array::load_module(&db)?;
110
111 // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
112 let version_query = db.prepare("SELECT version from semantic_index_config");
113 let version = version_query
114 .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
115 if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
116 log::trace!("vector database schema up to date");
117 return Ok(());
118 }
119
120 log::trace!("vector database schema out of date. updating...");
121 db.execute("DROP TABLE IF EXISTS documents", [])
122 .context("failed to drop 'documents' table")?;
123 db.execute("DROP TABLE IF EXISTS files", [])
124 .context("failed to drop 'files' table")?;
125 db.execute("DROP TABLE IF EXISTS worktrees", [])
126 .context("failed to drop 'worktrees' table")?;
127 db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
128 .context("failed to drop 'semantic_index_config' table")?;
129
130 // Initialize Vector Databasing Tables
131 db.execute(
132 "CREATE TABLE semantic_index_config (
133 version INTEGER NOT NULL
134 )",
135 [],
136 )?;
137
138 db.execute(
139 "INSERT INTO semantic_index_config (version) VALUES (?1)",
140 params![SEMANTIC_INDEX_VERSION],
141 )?;
142
143 db.execute(
144 "CREATE TABLE worktrees (
145 id INTEGER PRIMARY KEY AUTOINCREMENT,
146 absolute_path VARCHAR NOT NULL
147 );
148 CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
149 ",
150 [],
151 )?;
152
153 db.execute(
154 "CREATE TABLE files (
155 id INTEGER PRIMARY KEY AUTOINCREMENT,
156 worktree_id INTEGER NOT NULL,
157 relative_path VARCHAR NOT NULL,
158 mtime_seconds INTEGER NOT NULL,
159 mtime_nanos INTEGER NOT NULL,
160 FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
161 )",
162 [],
163 )?;
164
165 db.execute(
166 "CREATE TABLE documents (
167 id INTEGER PRIMARY KEY AUTOINCREMENT,
168 file_id INTEGER NOT NULL,
169 start_byte INTEGER NOT NULL,
170 end_byte INTEGER NOT NULL,
171 name VARCHAR NOT NULL,
172 embedding BLOB NOT NULL,
173 digest BLOB NOT NULL,
174 FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
175 )",
176 [],
177 )?;
178
179 log::trace!("vector database initialized with updated schema.");
180 Ok(())
181 })
182 }
183
184 pub fn delete_file(
185 &self,
186 worktree_id: i64,
187 delete_path: PathBuf,
188 ) -> impl Future<Output = Result<()>> {
189 self.transact(move |db| {
190 db.execute(
191 "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
192 params![worktree_id, delete_path.to_str()],
193 )?;
194 Ok(())
195 })
196 }
197
198 pub fn insert_file(
199 &self,
200 worktree_id: i64,
201 path: PathBuf,
202 mtime: SystemTime,
203 documents: Vec<Document>,
204 ) -> impl Future<Output = Result<()>> {
205 self.transact(move |db| {
206 // Return the existing ID, if both the file and mtime match
207 let mtime = Timestamp::from(mtime);
208
209 let mut existing_id_query = db.prepare("SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 AND mtime_seconds = ?3 AND mtime_nanos = ?4")?;
210 let existing_id = existing_id_query
211 .query_row(
212 params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
213 |row| Ok(row.get::<_, i64>(0)?),
214 );
215
216 let file_id = if existing_id.is_ok() {
217 // If already exists, just return the existing id
218 existing_id?
219 } else {
220 // Delete Existing Row
221 db.execute(
222 "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;",
223 params![worktree_id, path.to_str()],
224 )?;
225 db.execute("INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4);", params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos])?;
226 db.last_insert_rowid()
227 };
228
229 // Currently inserting at approximately 3400 documents a second
230 // I imagine we can speed this up with a bulk insert of some kind.
231 for document in documents {
232 db.execute(
233 "INSERT INTO documents (file_id, start_byte, end_byte, name, embedding, digest) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
234 params![
235 file_id,
236 document.range.start.to_string(),
237 document.range.end.to_string(),
238 document.name,
239 document.embedding,
240 document.digest
241 ],
242 )?;
243 }
244
245 Ok(())
246 })
247 }
248
249 pub fn worktree_previously_indexed(
250 &self,
251 worktree_root_path: &Path,
252 ) -> impl Future<Output = Result<bool>> {
253 let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
254 self.transact(move |db| {
255 let mut worktree_query =
256 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
257 let worktree_id = worktree_query
258 .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
259
260 if worktree_id.is_ok() {
261 return Ok(true);
262 } else {
263 return Ok(false);
264 }
265 })
266 }
267
268 pub fn embeddings_for_file(
269 &self,
270 worktree_id: i64,
271 relative_path: PathBuf,
272 ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
273 let relative_path = relative_path.to_string_lossy().into_owned();
274 self.transact(move |db| {
275 let mut query = db.prepare("SELECT digest, embedding FROM documents LEFT JOIN files ON files.id = documents.file_id WHERE files.worktree_id = ?1 AND files.relative_path = ?2")?;
276 let mut result: HashMap<DocumentDigest, Embedding> = HashMap::new();
277 for row in query.query_map(params![worktree_id, relative_path], |row| {
278 Ok((row.get::<_, DocumentDigest>(0)?.into(), row.get::<_, Embedding>(1)?.into()))
279 })? {
280 let row = row?;
281 result.insert(row.0, row.1);
282 }
283 Ok(result)
284 })
285 }
286
287 pub fn embeddings_for_files(
288 &self,
289 worktree_id_file_paths: Vec<(i64, PathBuf)>,
290 ) -> impl Future<Output = Result<HashMap<DocumentDigest, Embedding>>> {
291 todo!();
292 // The remainder of the code is wired up.
293 // I'm having a bit of trouble figuring out the rusqlite syntax for a WHERE (files.worktree_id, files.relative_path) IN (VALUES (?, ?), (?, ?)) query
294 async { Ok(HashMap::new()) }
295 // let mut embeddings_by_digest = HashMap::new();
296 // self.transact(move |db| {
297
298 // let worktree_ids: Rc<Vec<Value>> = Rc::new(
299 // worktree_id_file_paths
300 // .iter()
301 // .map(|(id, _)| Value::from(*id))
302 // .collect(),
303 // );
304 // let file_paths: Rc<Vec<Value>> = Rc::new(worktree_id_file_paths
305 // .iter()
306 // .map(|(_, path)| Value::from(path.to_string_lossy().to_string()))
307 // .collect());
308
309 // let mut query = db.prepare("SELECT digest, embedding FROM documents LEFT JOIN files ON files.id = documents.file_id WHERE (files.worktree_id, files.relative_path) IN (VALUES (rarray = (?1), rarray = (?2))")?;
310
311 // for row in query.query_map(params![worktree_ids, file_paths], |row| {
312 // Ok((row.get::<_, DocumentDigest>(0)?, row.get::<_, Embedding>(1)?))
313 // })? {
314 // if let Ok(row) = row {
315 // embeddings_by_digest.insert(row.0, row.1);
316 // }
317 // }
318 // Ok(embeddings_by_digest)
319 // })
320 }
321
322 pub fn find_or_create_worktree(
323 &self,
324 worktree_root_path: PathBuf,
325 ) -> impl Future<Output = Result<i64>> {
326 self.transact(move |db| {
327 let mut worktree_query =
328 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
329 let worktree_id = worktree_query
330 .query_row(params![worktree_root_path.to_string_lossy()], |row| {
331 Ok(row.get::<_, i64>(0)?)
332 });
333
334 if worktree_id.is_ok() {
335 return Ok(worktree_id?);
336 }
337
338 // If worktree_id is Err, insert new worktree
339 db.execute(
340 "INSERT into worktrees (absolute_path) VALUES (?1)",
341 params![worktree_root_path.to_string_lossy()],
342 )?;
343 Ok(db.last_insert_rowid())
344 })
345 }
346
347 pub fn get_file_mtimes(
348 &self,
349 worktree_id: i64,
350 ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
351 self.transact(move |db| {
352 let mut statement = db.prepare(
353 "
354 SELECT relative_path, mtime_seconds, mtime_nanos
355 FROM files
356 WHERE worktree_id = ?1
357 ORDER BY relative_path",
358 )?;
359 let mut result: HashMap<PathBuf, SystemTime> = HashMap::new();
360 for row in statement.query_map(params![worktree_id], |row| {
361 Ok((
362 row.get::<_, String>(0)?.into(),
363 Timestamp {
364 seconds: row.get(1)?,
365 nanos: row.get(2)?,
366 }
367 .into(),
368 ))
369 })? {
370 let row = row?;
371 result.insert(row.0, row.1);
372 }
373 Ok(result)
374 })
375 }
376
    /// Returns up to `limit` document ids drawn from `file_ids`, ranked by
    /// similarity of their stored embeddings to `query_embedding` (best match
    /// first), each paired with its similarity score.
    pub fn top_k_search(
        &self,
        query_embedding: &Embedding,
        limit: usize,
        file_ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
        let query_embedding = query_embedding.clone();
        let file_ids = file_ids.to_vec();
        self.transact(move |db| {
            // +1 so an insert beyond `limit` doesn't trigger a reallocation
            // before `truncate` discards it.
            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
            Self::for_each_document(db, &file_ids, |id, embedding| {
                let similarity = embedding.similarity(&query_embedding);
                // NOTE: the comparator is intentionally "backwards"
                // (`similarity` vs the element's score, not the other way
                // around), which keeps `results` sorted in DESCENDING order
                // of similarity; incomparable (NaN) scores compare Equal.
                let ix = match results.binary_search_by(|(_, s)| {
                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
                }) {
                    Ok(ix) => ix,
                    Err(ix) => ix,
                };
                results.insert(ix, (id, similarity));
                // Keep only the best `limit` entries.
                results.truncate(limit);
            })?;

            anyhow::Ok(results)
        })
    }
402
403 pub fn retrieve_included_file_ids(
404 &self,
405 worktree_ids: &[i64],
406 includes: &[PathMatcher],
407 excludes: &[PathMatcher],
408 ) -> impl Future<Output = Result<Vec<i64>>> {
409 let worktree_ids = worktree_ids.to_vec();
410 let includes = includes.to_vec();
411 let excludes = excludes.to_vec();
412 self.transact(move |db| {
413 let mut file_query = db.prepare(
414 "
415 SELECT
416 id, relative_path
417 FROM
418 files
419 WHERE
420 worktree_id IN rarray(?)
421 ",
422 )?;
423
424 let mut file_ids = Vec::<i64>::new();
425 let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
426
427 while let Some(row) = rows.next()? {
428 let file_id = row.get(0)?;
429 let relative_path = row.get_ref(1)?.as_str()?;
430 let included =
431 includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
432 let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
433 if included && !excluded {
434 file_ids.push(file_id);
435 }
436 }
437
438 anyhow::Ok(file_ids)
439 })
440 }
441
442 fn for_each_document(
443 db: &rusqlite::Connection,
444 file_ids: &[i64],
445 mut f: impl FnMut(i64, Embedding),
446 ) -> Result<()> {
447 let mut query_statement = db.prepare(
448 "
449 SELECT
450 id, embedding
451 FROM
452 documents
453 WHERE
454 file_id IN rarray(?)
455 ",
456 )?;
457
458 query_statement
459 .query_map(params![ids_to_sql(&file_ids)], |row| {
460 Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
461 })?
462 .filter_map(|row| row.ok())
463 .for_each(|(id, embedding)| f(id, embedding));
464 Ok(())
465 }
466
467 pub fn get_documents_by_ids(
468 &self,
469 ids: &[i64],
470 ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
471 let ids = ids.to_vec();
472 self.transact(move |db| {
473 let mut statement = db.prepare(
474 "
475 SELECT
476 documents.id,
477 files.worktree_id,
478 files.relative_path,
479 documents.start_byte,
480 documents.end_byte
481 FROM
482 documents, files
483 WHERE
484 documents.file_id = files.id AND
485 documents.id in rarray(?)
486 ",
487 )?;
488
489 let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
490 Ok((
491 row.get::<_, i64>(0)?,
492 row.get::<_, i64>(1)?,
493 row.get::<_, String>(2)?.into(),
494 row.get(3)?..row.get(4)?,
495 ))
496 })?;
497
498 let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
499 for row in result_iter {
500 let (id, worktree_id, path, range) = row?;
501 values_by_id.insert(id, (worktree_id, path, range));
502 }
503
504 let mut results = Vec::with_capacity(ids.len());
505 for id in &ids {
506 let value = values_by_id
507 .remove(id)
508 .ok_or(anyhow!("missing document id {}", id))?;
509 results.push(value);
510 }
511
512 Ok(results)
513 })
514 }
515}
516
517fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
518 Rc::new(
519 ids.iter()
520 .copied()
521 .map(|v| rusqlite::types::Value::from(v))
522 .collect::<Vec<_>>(),
523 )
524}