1use crate::{
2 embedding::Embedding,
3 parsing::{Span, SpanDigest},
4 SEMANTIC_INDEX_VERSION,
5};
6use anyhow::{anyhow, Context, Result};
7use collections::HashMap;
8use futures::channel::oneshot;
9use gpui::executor;
10use project::{search::PathMatcher, Fs};
11use rpc::proto::Timestamp;
12use rusqlite::params;
13use rusqlite::types::Value;
14use std::{
15 cmp::Ordering,
16 future::Future,
17 ops::Range,
18 path::{Path, PathBuf},
19 rc::Rc,
20 sync::Arc,
21 time::SystemTime,
22};
23use util::TryFutureExt;
24
/// A row of the `files` table: one indexed file within a worktree.
#[derive(Debug)]
pub struct FileRecord {
    /// Database rowid of the file.
    pub id: usize,
    /// Path of the file relative to its worktree root.
    pub relative_path: String,
    /// Modification time recorded when the file was indexed
    /// (stored as the `mtime_seconds`/`mtime_nanos` columns).
    pub mtime: Timestamp,
}
31
/// Handle to the semantic-index SQLite database. Cloning is cheap: all
/// clones share the single background task that owns the connection.
#[derive(Clone)]
pub struct VectorDatabase {
    /// Location of the SQLite database file on disk.
    path: Arc<Path>,
    /// Sends closures to the background task that owns the
    /// `rusqlite::Connection`; each closure runs with exclusive access,
    /// serializing all database work.
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}
38
39impl VectorDatabase {
40 pub async fn new(
41 fs: Arc<dyn Fs>,
42 path: Arc<Path>,
43 executor: Arc<executor::Background>,
44 ) -> Result<Self> {
45 if let Some(db_directory) = path.parent() {
46 fs.create_dir(db_directory).await?;
47 }
48
49 let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
50 Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
51 >();
52 executor
53 .spawn({
54 let path = path.clone();
55 async move {
56 let mut connection = rusqlite::Connection::open(&path)?;
57
58 connection.pragma_update(None, "journal_mode", "wal")?;
59 connection.pragma_update(None, "synchronous", "normal")?;
60 connection.pragma_update(None, "cache_size", 1000000)?;
61 connection.pragma_update(None, "temp_store", "MEMORY")?;
62
63 while let Ok(transaction) = transactions_rx.recv().await {
64 transaction(&mut connection);
65 }
66
67 anyhow::Ok(())
68 }
69 .log_err()
70 })
71 .detach();
72 let this = Self {
73 transactions: transactions_tx,
74 path,
75 };
76 this.initialize_database().await?;
77 Ok(this)
78 }
79
80 pub fn path(&self) -> &Arc<Path> {
81 &self.path
82 }
83
84 fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
85 where
86 F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
87 T: 'static + Send,
88 {
89 let (tx, rx) = oneshot::channel();
90 let transactions = self.transactions.clone();
91 async move {
92 if transactions
93 .send(Box::new(|connection| {
94 let result = connection
95 .transaction()
96 .map_err(|err| anyhow!(err))
97 .and_then(|transaction| {
98 let result = f(&transaction)?;
99 transaction.commit()?;
100 Ok(result)
101 });
102 let _ = tx.send(result);
103 }))
104 .await
105 .is_err()
106 {
107 return Err(anyhow!("connection was dropped"))?;
108 }
109 rx.await?
110 }
111 }
112
113 fn initialize_database(&self) -> impl Future<Output = Result<()>> {
114 self.transact(|db| {
115 rusqlite::vtab::array::load_module(&db)?;
116
117 // Delete existing tables, if SEMANTIC_INDEX_VERSION is bumped
118 let version_query = db.prepare("SELECT version from semantic_index_config");
119 let version = version_query
120 .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
121 if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
122 log::trace!("vector database schema up to date");
123 return Ok(());
124 }
125
126 log::trace!("vector database schema out of date. updating...");
127 // We renamed the `documents` table to `spans`, so we want to drop
128 // `documents` without recreating it if it exists.
129 db.execute("DROP TABLE IF EXISTS documents", [])
130 .context("failed to drop 'documents' table")?;
131 db.execute("DROP TABLE IF EXISTS spans", [])
132 .context("failed to drop 'spans' table")?;
133 db.execute("DROP TABLE IF EXISTS files", [])
134 .context("failed to drop 'files' table")?;
135 db.execute("DROP TABLE IF EXISTS worktrees", [])
136 .context("failed to drop 'worktrees' table")?;
137 db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
138 .context("failed to drop 'semantic_index_config' table")?;
139
140 // Initialize Vector Databasing Tables
141 db.execute(
142 "CREATE TABLE semantic_index_config (
143 version INTEGER NOT NULL
144 )",
145 [],
146 )?;
147
148 db.execute(
149 "INSERT INTO semantic_index_config (version) VALUES (?1)",
150 params![SEMANTIC_INDEX_VERSION],
151 )?;
152
153 db.execute(
154 "CREATE TABLE worktrees (
155 id INTEGER PRIMARY KEY AUTOINCREMENT,
156 absolute_path VARCHAR NOT NULL
157 );
158 CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
159 ",
160 [],
161 )?;
162
163 db.execute(
164 "CREATE TABLE files (
165 id INTEGER PRIMARY KEY AUTOINCREMENT,
166 worktree_id INTEGER NOT NULL,
167 relative_path VARCHAR NOT NULL,
168 mtime_seconds INTEGER NOT NULL,
169 mtime_nanos INTEGER NOT NULL,
170 FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
171 )",
172 [],
173 )?;
174
175 db.execute(
176 "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
177 [],
178 )?;
179
180 db.execute(
181 "CREATE TABLE spans (
182 id INTEGER PRIMARY KEY AUTOINCREMENT,
183 file_id INTEGER NOT NULL,
184 start_byte INTEGER NOT NULL,
185 end_byte INTEGER NOT NULL,
186 name VARCHAR NOT NULL,
187 embedding BLOB NOT NULL,
188 digest BLOB NOT NULL,
189 FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
190 )",
191 [],
192 )?;
193 db.execute(
194 "CREATE INDEX spans_digest ON spans (digest)",
195 [],
196 )?;
197
198 log::trace!("vector database initialized with updated schema.");
199 Ok(())
200 })
201 }
202
203 pub fn delete_file(
204 &self,
205 worktree_id: i64,
206 delete_path: Arc<Path>,
207 ) -> impl Future<Output = Result<()>> {
208 self.transact(move |db| {
209 db.execute(
210 "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
211 params![worktree_id, delete_path.to_str()],
212 )?;
213 Ok(())
214 })
215 }
216
217 pub fn insert_file(
218 &self,
219 worktree_id: i64,
220 path: Arc<Path>,
221 mtime: SystemTime,
222 spans: Vec<Span>,
223 ) -> impl Future<Output = Result<()>> {
224 self.transact(move |db| {
225 // Return the existing ID, if both the file and mtime match
226 let mtime = Timestamp::from(mtime);
227
228 db.execute(
229 "
230 REPLACE INTO files
231 (worktree_id, relative_path, mtime_seconds, mtime_nanos)
232 VALUES (?1, ?2, ?3, ?4)
233 ",
234 params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
235 )?;
236
237 let file_id = db.last_insert_rowid();
238
239 let mut query = db.prepare(
240 "
241 INSERT INTO spans
242 (file_id, start_byte, end_byte, name, embedding, digest)
243 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
244 ",
245 )?;
246
247 for span in spans {
248 query.execute(params![
249 file_id,
250 span.range.start.to_string(),
251 span.range.end.to_string(),
252 span.name,
253 span.embedding,
254 span.digest
255 ])?;
256 }
257
258 Ok(())
259 })
260 }
261
262 pub fn worktree_previously_indexed(
263 &self,
264 worktree_root_path: &Path,
265 ) -> impl Future<Output = Result<bool>> {
266 let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
267 self.transact(move |db| {
268 let mut worktree_query =
269 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
270 let worktree_id = worktree_query
271 .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));
272
273 if worktree_id.is_ok() {
274 return Ok(true);
275 } else {
276 return Ok(false);
277 }
278 })
279 }
280
281 pub fn embeddings_for_digests(
282 &self,
283 digests: Vec<SpanDigest>,
284 ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
285 self.transact(move |db| {
286 let mut query = db.prepare(
287 "
288 SELECT digest, embedding
289 FROM spans
290 WHERE digest IN rarray(?)
291 ",
292 )?;
293 let mut embeddings_by_digest = HashMap::default();
294 let digests = Rc::new(
295 digests
296 .into_iter()
297 .map(|p| Value::Blob(p.0.to_vec()))
298 .collect::<Vec<_>>(),
299 );
300 let rows = query.query_map(params![digests], |row| {
301 Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
302 })?;
303
304 for row in rows {
305 if let Ok(row) = row {
306 embeddings_by_digest.insert(row.0, row.1);
307 }
308 }
309
310 Ok(embeddings_by_digest)
311 })
312 }
313
314 pub fn embeddings_for_files(
315 &self,
316 worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
317 ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
318 self.transact(move |db| {
319 let mut query = db.prepare(
320 "
321 SELECT digest, embedding
322 FROM spans
323 LEFT JOIN files ON files.id = spans.file_id
324 WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
325 ",
326 )?;
327 let mut embeddings_by_digest = HashMap::default();
328 for (worktree_id, file_paths) in worktree_id_file_paths {
329 let file_paths = Rc::new(
330 file_paths
331 .into_iter()
332 .map(|p| Value::Text(p.to_string_lossy().into_owned()))
333 .collect::<Vec<_>>(),
334 );
335 let rows = query.query_map(params![worktree_id, file_paths], |row| {
336 Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
337 })?;
338
339 for row in rows {
340 if let Ok(row) = row {
341 embeddings_by_digest.insert(row.0, row.1);
342 }
343 }
344 }
345
346 Ok(embeddings_by_digest)
347 })
348 }
349
350 pub fn find_or_create_worktree(
351 &self,
352 worktree_root_path: Arc<Path>,
353 ) -> impl Future<Output = Result<i64>> {
354 self.transact(move |db| {
355 let mut worktree_query =
356 db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
357 let worktree_id = worktree_query
358 .query_row(params![worktree_root_path.to_string_lossy()], |row| {
359 Ok(row.get::<_, i64>(0)?)
360 });
361
362 if worktree_id.is_ok() {
363 return Ok(worktree_id?);
364 }
365
366 // If worktree_id is Err, insert new worktree
367 db.execute(
368 "INSERT into worktrees (absolute_path) VALUES (?1)",
369 params![worktree_root_path.to_string_lossy()],
370 )?;
371 Ok(db.last_insert_rowid())
372 })
373 }
374
375 pub fn get_file_mtimes(
376 &self,
377 worktree_id: i64,
378 ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
379 self.transact(move |db| {
380 let mut statement = db.prepare(
381 "
382 SELECT relative_path, mtime_seconds, mtime_nanos
383 FROM files
384 WHERE worktree_id = ?1
385 ORDER BY relative_path",
386 )?;
387 let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
388 for row in statement.query_map(params![worktree_id], |row| {
389 Ok((
390 row.get::<_, String>(0)?.into(),
391 Timestamp {
392 seconds: row.get(1)?,
393 nanos: row.get(2)?,
394 }
395 .into(),
396 ))
397 })? {
398 let row = row?;
399 result.insert(row.0, row.1);
400 }
401 Ok(result)
402 })
403 }
404
405 pub fn top_k_search(
406 &self,
407 query_embedding: &Embedding,
408 limit: usize,
409 file_ids: &[i64],
410 ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
411 let query_embedding = query_embedding.clone();
412 let file_ids = file_ids.to_vec();
413 self.transact(move |db| {
414 let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
415 Self::for_each_span(db, &file_ids, |id, embedding| {
416 let similarity = embedding.similarity(&query_embedding);
417 let ix = match results.binary_search_by(|(_, s)| {
418 similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
419 }) {
420 Ok(ix) => ix,
421 Err(ix) => ix,
422 };
423 results.insert(ix, (id, similarity));
424 results.truncate(limit);
425 })?;
426
427 anyhow::Ok(results)
428 })
429 }
430
431 pub fn retrieve_included_file_ids(
432 &self,
433 worktree_ids: &[i64],
434 includes: &[PathMatcher],
435 excludes: &[PathMatcher],
436 ) -> impl Future<Output = Result<Vec<i64>>> {
437 let worktree_ids = worktree_ids.to_vec();
438 let includes = includes.to_vec();
439 let excludes = excludes.to_vec();
440 self.transact(move |db| {
441 let mut file_query = db.prepare(
442 "
443 SELECT
444 id, relative_path
445 FROM
446 files
447 WHERE
448 worktree_id IN rarray(?)
449 ",
450 )?;
451
452 let mut file_ids = Vec::<i64>::new();
453 let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;
454
455 while let Some(row) = rows.next()? {
456 let file_id = row.get(0)?;
457 let relative_path = row.get_ref(1)?.as_str()?;
458 let included =
459 includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
460 let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
461 if included && !excluded {
462 file_ids.push(file_id);
463 }
464 }
465
466 anyhow::Ok(file_ids)
467 })
468 }
469
470 fn for_each_span(
471 db: &rusqlite::Connection,
472 file_ids: &[i64],
473 mut f: impl FnMut(i64, Embedding),
474 ) -> Result<()> {
475 let mut query_statement = db.prepare(
476 "
477 SELECT
478 id, embedding
479 FROM
480 spans
481 WHERE
482 file_id IN rarray(?)
483 ",
484 )?;
485
486 query_statement
487 .query_map(params![ids_to_sql(&file_ids)], |row| {
488 Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
489 })?
490 .filter_map(|row| row.ok())
491 .for_each(|(id, embedding)| f(id, embedding));
492 Ok(())
493 }
494
495 pub fn spans_for_ids(
496 &self,
497 ids: &[i64],
498 ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
499 let ids = ids.to_vec();
500 self.transact(move |db| {
501 let mut statement = db.prepare(
502 "
503 SELECT
504 spans.id,
505 files.worktree_id,
506 files.relative_path,
507 spans.start_byte,
508 spans.end_byte
509 FROM
510 spans, files
511 WHERE
512 spans.file_id = files.id AND
513 spans.id in rarray(?)
514 ",
515 )?;
516
517 let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
518 Ok((
519 row.get::<_, i64>(0)?,
520 row.get::<_, i64>(1)?,
521 row.get::<_, String>(2)?.into(),
522 row.get(3)?..row.get(4)?,
523 ))
524 })?;
525
526 let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
527 for row in result_iter {
528 let (id, worktree_id, path, range) = row?;
529 values_by_id.insert(id, (worktree_id, path, range));
530 }
531
532 let mut results = Vec::with_capacity(ids.len());
533 for id in &ids {
534 let value = values_by_id
535 .remove(id)
536 .ok_or(anyhow!("missing span id {}", id))?;
537 results.push(value);
538 }
539
540 Ok(results)
541 })
542 }
543}
544
545fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
546 Rc::new(
547 ids.iter()
548 .copied()
549 .map(|v| rusqlite::types::Value::from(v))
550 .collect::<Vec<_>>(),
551 )
552}