1use crate::{
2 embedding::Embedding,
3 parsing::{Span, SpanDigest},
4 SEMANTIC_INDEX_VERSION,
5};
6use anyhow::{anyhow, Context, Result};
7use collections::HashMap;
8use futures::channel::oneshot;
9use gpui::executor;
10use ordered_float::OrderedFloat;
11use project::{search::PathMatcher, Fs};
12use rpc::proto::Timestamp;
13use rusqlite::params;
14use rusqlite::types::Value;
15use std::{
16 cmp::Reverse,
17 future::Future,
18 ops::Range,
19 path::{Path, PathBuf},
20 rc::Rc,
21 sync::Arc,
22 time::SystemTime,
23};
24use util::TryFutureExt;
25
/// A row from the `files` table describing a previously indexed file.
#[derive(Debug)]
pub struct FileRecord {
    // Rowid of the file in the `files` table.
    pub id: usize,
    // Path of the file relative to its worktree root.
    pub relative_path: String,
    // Modification time recorded when the file was last indexed.
    pub mtime: Timestamp,
}
32
/// Handle to the on-disk vector (embeddings) database.
///
/// All database access is funneled through a single background task that owns
/// the `rusqlite::Connection`; callers enqueue closures on the `transactions`
/// channel. Cloning is cheap — clones share the same channel and connection.
#[derive(Clone)]
pub struct VectorDatabase {
    // Filesystem location of the SQLite database file.
    path: Arc<Path>,
    // Sender used to ship work to the dedicated connection task.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
39
40impl VectorDatabase {
    /// Opens (or creates) the vector database at `path` and spawns the
    /// background task that owns the SQLite connection.
    ///
    /// All subsequent queries are shipped to that task as boxed closures via
    /// an unbounded channel. Returns once the schema has been initialized
    /// (or migrated).
    pub async fn new(
        fs: Arc<dyn Fs>,
        path: Arc<Path>,
        executor: Arc<executor::Background>,
    ) -> Result<Self> {
        // Ensure the parent directory exists before SQLite tries to create the file.
        if let Some(db_directory) = path.parent() {
            fs.create_dir(db_directory).await?;
        }

        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
        >();
        executor
            .spawn({
                let path = path.clone();
                async move {
                    let mut connection = rusqlite::Connection::open(&path)?;

                    // WAL + "normal" sync trades some durability for speed,
                    // acceptable for a rebuildable index cache.
                    connection.pragma_update(None, "journal_mode", "wal")?;
                    connection.pragma_update(None, "synchronous", "normal")?;
                    connection.pragma_update(None, "cache_size", 1000000)?;
                    connection.pragma_update(None, "temp_store", "MEMORY")?;

                    // Serve queued transactions until every sender is dropped,
                    // at which point `recv` fails and the task exits.
                    while let Ok(transaction) = transactions_rx.recv().await {
                        transaction(&mut connection);
                    }

                    anyhow::Ok(())
                }
                .log_err()
            })
            .detach();
        let this = Self {
            transactions: transactions_tx,
            path,
        };
        this.initialize_database().await?;
        Ok(this)
    }
80
    /// Location of the underlying SQLite database file.
    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }
84
85 fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
86 where
87 F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
88 T: 'static + Send,
89 {
90 let (tx, rx) = oneshot::channel();
91 let transactions = self.transactions.clone();
92 async move {
93 if transactions
94 .send(Box::new(|connection| {
95 let result = connection
96 .transaction()
97 .map_err(|err| anyhow!(err))
98 .and_then(|transaction| {
99 let result = f(&transaction)?;
100 transaction.commit()?;
101 Ok(result)
102 });
103 let _ = tx.send(result);
104 }))
105 .await
106 .is_err()
107 {
108 return Err(anyhow!("connection was dropped"))?;
109 }
110 rx.await?
111 }
112 }
113
114 fn initialize_database(&self) -> impl Future<Output = Result<()>> {
115 self.transact(|db| {
116 rusqlite::vtab::array::load_module(&db)?;
117
118 // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
119 let version_query = db.prepare("SELECT version from semantic_index_config");
120 let version = version_query
121 .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
122 if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
123 log::trace!("vector database schema up to date");
124 return Ok(());
125 }
126
127 log::trace!("vector database schema out of date. updating...");
128 // We renamed the `documents` table to `spans`, so we want to drop
129 // `documents` without recreating it if it exists.
130 db.execute("DROP TABLE IF EXISTS documents", [])
131 .context("failed to drop 'documents' table")?;
132 db.execute("DROP TABLE IF EXISTS spans", [])
133 .context("failed to drop 'spans' table")?;
134 db.execute("DROP TABLE IF EXISTS files", [])
135 .context("failed to drop 'files' table")?;
136 db.execute("DROP TABLE IF EXISTS worktrees", [])
137 .context("failed to drop 'worktrees' table")?;
138 db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
139 .context("failed to drop 'semantic_index_config' table")?;
140
141 // Initialize Vector Databasing Tables
142 db.execute(
143 "CREATE TABLE semantic_index_config (
144 version INTEGER NOT NULL
145 )",
146 [],
147 )?;
148
149 db.execute(
150 "INSERT INTO semantic_index_config (version) VALUES (?1)",
151 params![SEMANTIC_INDEX_VERSION],
152 )?;
153
154 db.execute(
155 "CREATE TABLE worktrees (
156 id INTEGER PRIMARY KEY AUTOINCREMENT,
157 absolute_path VARCHAR NOT NULL
158 );
159 CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
160 ",
161 [],
162 )?;
163
164 db.execute(
165 "CREATE TABLE files (
166 id INTEGER PRIMARY KEY AUTOINCREMENT,
167 worktree_id INTEGER NOT NULL,
168 relative_path VARCHAR NOT NULL,
169 mtime_seconds INTEGER NOT NULL,
170 mtime_nanos INTEGER NOT NULL,
171 FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
172 )",
173 [],
174 )?;
175
176 db.execute(
177 "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
178 [],
179 )?;
180
181 db.execute(
182 "CREATE TABLE spans (
183 id INTEGER PRIMARY KEY AUTOINCREMENT,
184 file_id INTEGER NOT NULL,
185 start_byte INTEGER NOT NULL,
186 end_byte INTEGER NOT NULL,
187 name VARCHAR NOT NULL,
188 embedding BLOB NOT NULL,
189 digest BLOB NOT NULL,
190 FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
191 )",
192 [],
193 )?;
194 db.execute(
195 "CREATE INDEX spans_digest ON spans (digest)",
196 [],
197 )?;
198
199 log::trace!("vector database initialized with updated schema.");
200 Ok(())
201 })
202 }
203
204 pub fn delete_file(
205 &self,
206 worktree_id: i64,
207 delete_path: Arc<Path>,
208 ) -> impl Future<Output = Result<()>> {
209 self.transact(move |db| {
210 db.execute(
211 "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
212 params![worktree_id, delete_path.to_str()],
213 )?;
214 Ok(())
215 })
216 }
217
218 pub fn insert_file(
219 &self,
220 worktree_id: i64,
221 path: Arc<Path>,
222 mtime: SystemTime,
223 spans: Vec<Span>,
224 ) -> impl Future<Output = Result<()>> {
225 self.transact(move |db| {
226 // Return the existing ID, if both the file and mtime match
227 let mtime = Timestamp::from(mtime);
228
229 db.execute(
230 "
231 REPLACE INTO files
232 (worktree_id, relative_path, mtime_seconds, mtime_nanos)
233 VALUES (?1, ?2, ?3, ?4)
234 ",
235 params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
236 )?;
237
238 let file_id = db.last_insert_rowid();
239
240 let mut query = db.prepare(
241 "
242 INSERT INTO spans
243 (file_id, start_byte, end_byte, name, embedding, digest)
244 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
245 ",
246 )?;
247
248 for span in spans {
249 query.execute(params![
250 file_id,
251 span.range.start.to_string(),
252 span.range.end.to_string(),
253 span.name,
254 span.embedding,
255 span.digest
256 ])?;
257 }
258
259 Ok(())
260 })
261 }
262
263 pub fn worktree_previously_indexed(
264 &self,
265 worktree_root_path: &Path,
266 ) -> impl Future<Output = Result<bool>> {
267 let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
268 self.transact(move |db| {
269 let mut worktree_query =
270 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
271 let worktree_id = worktree_query
272 .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
273
274 if worktree_id.is_ok() {
275 return Ok(true);
276 } else {
277 return Ok(false);
278 }
279 })
280 }
281
282 pub fn embeddings_for_digests(
283 &self,
284 digests: Vec<SpanDigest>,
285 ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
286 self.transact(move |db| {
287 let mut query = db.prepare(
288 "
289 SELECT digest, embedding
290 FROM spans
291 WHERE digest IN rarray(?)
292 ",
293 )?;
294 let mut embeddings_by_digest = HashMap::default();
295 let digests = Rc::new(
296 digests
297 .into_iter()
298 .map(|p| Value::Blob(p.0.to_vec()))
299 .collect::<Vec<_>>(),
300 );
301 let rows = query.query_map(params![digests], |row| {
302 Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
303 })?;
304
305 for row in rows {
306 if let Ok(row) = row {
307 embeddings_by_digest.insert(row.0, row.1);
308 }
309 }
310
311 Ok(embeddings_by_digest)
312 })
313 }
314
315 pub fn embeddings_for_files(
316 &self,
317 worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
318 ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
319 self.transact(move |db| {
320 let mut query = db.prepare(
321 "
322 SELECT digest, embedding
323 FROM spans
324 LEFT JOIN files ON files.id = spans.file_id
325 WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
326 ",
327 )?;
328 let mut embeddings_by_digest = HashMap::default();
329 for (worktree_id, file_paths) in worktree_id_file_paths {
330 let file_paths = Rc::new(
331 file_paths
332 .into_iter()
333 .map(|p| Value::Text(p.to_string_lossy().into_owned()))
334 .collect::<Vec<_>>(),
335 );
336 let rows = query.query_map(params![worktree_id, file_paths], |row| {
337 Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
338 })?;
339
340 for row in rows {
341 if let Ok(row) = row {
342 embeddings_by_digest.insert(row.0, row.1);
343 }
344 }
345 }
346
347 Ok(embeddings_by_digest)
348 })
349 }
350
351 pub fn find_or_create_worktree(
352 &self,
353 worktree_root_path: Arc<Path>,
354 ) -> impl Future<Output = Result<i64>> {
355 self.transact(move |db| {
356 let mut worktree_query =
357 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
358 let worktree_id = worktree_query
359 .query_row(params![worktree_root_path.to_string_lossy()], |row| {
360 Ok(row.get::<_, i64>(0)?)
361 });
362
363 if worktree_id.is_ok() {
364 return Ok(worktree_id?);
365 }
366
367 // If worktree_id is Err, insert new worktree
368 db.execute(
369 "INSERT into worktrees (absolute_path) VALUES (?1)",
370 params![worktree_root_path.to_string_lossy()],
371 )?;
372 Ok(db.last_insert_rowid())
373 })
374 }
375
376 pub fn get_file_mtimes(
377 &self,
378 worktree_id: i64,
379 ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
380 self.transact(move |db| {
381 let mut statement = db.prepare(
382 "
383 SELECT relative_path, mtime_seconds, mtime_nanos
384 FROM files
385 WHERE worktree_id = ?1
386 ORDER BY relative_path",
387 )?;
388 let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
389 for row in statement.query_map(params![worktree_id], |row| {
390 Ok((
391 row.get::<_, String>(0)?.into(),
392 Timestamp {
393 seconds: row.get(1)?,
394 nanos: row.get(2)?,
395 }
396 .into(),
397 ))
398 })? {
399 let row = row?;
400 result.insert(row.0, row.1);
401 }
402 Ok(result)
403 })
404 }
405
406 pub fn top_k_search(
407 &self,
408 query_embedding: &Embedding,
409 limit: usize,
410 file_ids: &[i64],
411 ) -> impl Future<Output = Result<Vec<(i64, OrderedFloat<f32>)>>> {
412 let query_embedding = query_embedding.clone();
413 let file_ids = file_ids.to_vec();
414 self.transact(move |db| {
415 let mut results = Vec::<(i64, OrderedFloat<f32>)>::with_capacity(limit + 1);
416 Self::for_each_span(db, &file_ids, |id, embedding| {
417 let similarity = embedding.similarity(&query_embedding);
418 let ix = match results
419 .binary_search_by_key(&Reverse(similarity), |(_, s)| Reverse(*s))
420 {
421 Ok(ix) => ix,
422 Err(ix) => ix,
423 };
424 results.insert(ix, (id, similarity));
425 results.truncate(limit);
426 })?;
427
428 anyhow::Ok(results)
429 })
430 }
431
432 pub fn retrieve_included_file_ids(
433 &self,
434 worktree_ids: &[i64],
435 includes: &[PathMatcher],
436 excludes: &[PathMatcher],
437 ) -> impl Future<Output = Result<Vec<i64>>> {
438 let worktree_ids = worktree_ids.to_vec();
439 let includes = includes.to_vec();
440 let excludes = excludes.to_vec();
441 self.transact(move |db| {
442 let mut file_query = db.prepare(
443 "
444 SELECT
445 id, relative_path
446 FROM
447 files
448 WHERE
449 worktree_id IN rarray(?)
450 ",
451 )?;
452
453 let mut file_ids = Vec::<i64>::new();
454 let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
455
456 while let Some(row) = rows.next()? {
457 let file_id = row.get(0)?;
458 let relative_path = row.get_ref(1)?.as_str()?;
459 let included =
460 includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
461 let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
462 if included && !excluded {
463 file_ids.push(file_id);
464 }
465 }
466
467 anyhow::Ok(file_ids)
468 })
469 }
470
471 fn for_each_span(
472 db: &rusqlite::Connection,
473 file_ids: &[i64],
474 mut f: impl FnMut(i64, Embedding),
475 ) -> Result<()> {
476 let mut query_statement = db.prepare(
477 "
478 SELECT
479 id, embedding
480 FROM
481 spans
482 WHERE
483 file_id IN rarray(?)
484 ",
485 )?;
486
487 query_statement
488 .query_map(params![ids_to_sql(&file_ids)], |row| {
489 Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
490 })?
491 .filter_map(|row| row.ok())
492 .for_each(|(id, embedding)| f(id, embedding));
493 Ok(())
494 }
495
496 pub fn spans_for_ids(
497 &self,
498 ids: &[i64],
499 ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
500 let ids = ids.to_vec();
501 self.transact(move |db| {
502 let mut statement = db.prepare(
503 "
504 SELECT
505 spans.id,
506 files.worktree_id,
507 files.relative_path,
508 spans.start_byte,
509 spans.end_byte
510 FROM
511 spans, files
512 WHERE
513 spans.file_id = files.id AND
514 spans.id in rarray(?)
515 ",
516 )?;
517
518 let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
519 Ok((
520 row.get::<_, i64>(0)?,
521 row.get::<_, i64>(1)?,
522 row.get::<_, String>(2)?.into(),
523 row.get(3)?..row.get(4)?,
524 ))
525 })?;
526
527 let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
528 for row in result_iter {
529 let (id, worktree_id, path, range) = row?;
530 values_by_id.insert(id, (worktree_id, path, range));
531 }
532
533 let mut results = Vec::with_capacity(ids.len());
534 for id in &ids {
535 let value = values_by_id
536 .remove(id)
537 .ok_or(anyhow!("missing span id {}", id))?;
538 results.push(value);
539 }
540
541 Ok(results)
542 })
543 }
544}
545
546fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
547 Rc::new(
548 ids.iter()
549 .copied()
550 .map(|v| rusqlite::types::Value::from(v))
551 .collect::<Vec<_>>(),
552 )
553}