use crate::{
    embedding::Embedding,
    parsing::{Span, SpanDigest},
    SEMANTIC_INDEX_VERSION,
};
use anyhow::{anyhow, Context, Result};
use collections::HashMap;
use futures::channel::oneshot;
use gpui::executor;
use project::{search::PathMatcher, Fs};
use rpc::proto::Timestamp;
use rusqlite::params;
use rusqlite::types::Value;
use std::{
    cmp::Ordering,
    future::Future,
    ops::Range,
    path::{Path, PathBuf},
    rc::Rc,
    sync::Arc,
    time::{Instant, SystemTime},
};
use util::TryFutureExt;

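/// A single row from the `files` table: its database id, its path relative to
/// the worktree root, and the file's last recorded modification time.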
#[derive(Debug)]
pub struct FileRecord {
    pub id: usize,
    pub relative_path: String,
    pub mtime: Timestamp,
}

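/// Handle to the semantic index's on-disk SQLite database.
///
/// The handle is cheap to clone: all access goes through a single background
/// connection, and callers submit work as closures over the `transactions`
/// channel (see `transact`).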
#[derive(Clone)]
pub struct VectorDatabase {
    path: Arc<Path>,
    transactions:
        smol::channel::Sender<Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>>,
}

impl VectorDatabase {
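    /// Opens (or creates) the database at `path`, spawns the background task
    /// that owns the SQLite connection, and ensures the schema is up to date.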
    pub async fn new(
        fs: Arc<dyn Fs>,
        path: Arc<Path>,
        executor: Arc<executor::Background>,
    ) -> Result<Self> {
        if let Some(db_directory) = path.parent() {
            fs.create_dir(db_directory).await?;
        }

        let (transactions_tx, transactions_rx) = smol::channel::unbounded::<
            Box<dyn 'static + Send + FnOnce(&mut rusqlite::Connection)>,
        >();
        executor
            .spawn({
                let path = path.clone();
                async move {
                    let mut connection = rusqlite::Connection::open(&path)?;

                    connection.pragma_update(None, "journal_mode", "wal")?;
                    connection.pragma_update(None, "synchronous", "normal")?;
                    connection.pragma_update(None, "cache_size", 1000000)?;
                    connection.pragma_update(None, "temp_store", "MEMORY")?;

                    while let Ok(transaction) = transactions_rx.recv().await {
                        transaction(&mut connection);
                    }

                    anyhow::Ok(())
                }
                .log_err()
            })
            .detach();
        let this = Self {
            transactions: transactions_tx,
            path,
        };
        this.initialize_database().await?;
        Ok(this)
    }

    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }

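    /// Ships the closure `f` to the connection task, where it runs inside a
    /// SQL transaction that is committed only if `f` returns `Ok`. The result
    /// is sent back over a oneshot channel to the returned future.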
    fn transact<F, T>(&self, f: F) -> impl Future<Output = Result<T>>
    where
        F: 'static + Send + FnOnce(&rusqlite::Transaction) -> Result<T>,
        T: 'static + Send,
    {
        let (tx, rx) = oneshot::channel();
        let transactions = self.transactions.clone();
        async move {
            if transactions
                .send(Box::new(|connection| {
                    let result = connection
                        .transaction()
                        .map_err(|err| anyhow!(err))
                        .and_then(|transaction| {
                            let result = f(&transaction)?;
                            transaction.commit()?;
                            Ok(result)
                        });
                    let _ = tx.send(result);
                }))
                .await
                .is_err()
            {
                return Err(anyhow!("connection was dropped"));
            }
            rx.await?
        }
    }

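    /// Creates the schema from scratch. Any existing tables are dropped and
    /// recreated whenever the stored version differs from
    /// `SEMANTIC_INDEX_VERSION`; there are no incremental migrations.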
    fn initialize_database(&self) -> impl Future<Output = Result<()>> {
        self.transact(|db| {
            rusqlite::vtab::array::load_module(&db)?;

            // Drop and recreate all tables if SEMANTIC_INDEX_VERSION has been bumped.
            let version_query = db.prepare("SELECT version from semantic_index_config");
            let version = version_query
                .and_then(|mut query| query.query_row([], |row| Ok(row.get::<_, i64>(0)?)));
            if version.map_or(false, |version| version == SEMANTIC_INDEX_VERSION as i64) {
                log::trace!("vector database schema up to date");
                return Ok(());
            }

            log::trace!("vector database schema out of date. updating...");
            // We renamed the `documents` table to `spans`, so we want to drop
            // `documents` without recreating it if it exists.
            db.execute("DROP TABLE IF EXISTS documents", [])
                .context("failed to drop 'documents' table")?;
            db.execute("DROP TABLE IF EXISTS spans", [])
                .context("failed to drop 'spans' table")?;
            db.execute("DROP TABLE IF EXISTS files", [])
                .context("failed to drop 'files' table")?;
            db.execute("DROP TABLE IF EXISTS worktrees", [])
                .context("failed to drop 'worktrees' table")?;
            db.execute("DROP TABLE IF EXISTS semantic_index_config", [])
                .context("failed to drop 'semantic_index_config' table")?;

            // Initialize the vector database tables.
            db.execute(
                "CREATE TABLE semantic_index_config (
                    version INTEGER NOT NULL
                )",
                [],
            )?;

            db.execute(
                "INSERT INTO semantic_index_config (version) VALUES (?1)",
                params![SEMANTIC_INDEX_VERSION],
            )?;

            // This string contains two statements (the table and its unique
            // index), so run it with `execute_batch`, which executes every
            // statement; `execute` only runs the first one.
            db.execute_batch(
                "CREATE TABLE worktrees (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    absolute_path VARCHAR NOT NULL
                );
                CREATE UNIQUE INDEX worktrees_absolute_path ON worktrees (absolute_path);
                ",
            )?;

            db.execute(
                "CREATE TABLE files (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    worktree_id INTEGER NOT NULL,
                    relative_path VARCHAR NOT NULL,
                    mtime_seconds INTEGER NOT NULL,
                    mtime_nanos INTEGER NOT NULL,
                    FOREIGN KEY(worktree_id) REFERENCES worktrees(id) ON DELETE CASCADE
                )",
                [],
            )?;

            db.execute(
                "CREATE UNIQUE INDEX files_worktree_id_and_relative_path ON files (worktree_id, relative_path)",
                [],
            )?;

            db.execute(
                "CREATE TABLE spans (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    file_id INTEGER NOT NULL,
                    start_byte INTEGER NOT NULL,
                    end_byte INTEGER NOT NULL,
                    name VARCHAR NOT NULL,
                    embedding BLOB NOT NULL,
                    digest BLOB NOT NULL,
                    FOREIGN KEY(file_id) REFERENCES files(id) ON DELETE CASCADE
                )",
                [],
            )?;

            log::trace!("vector database initialized with updated schema.");
            Ok(())
        })
    }

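    /// Removes the `files` row for `delete_path` within the given worktree.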
    pub fn delete_file(
        &self,
        worktree_id: i64,
        delete_path: Arc<Path>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            db.execute(
                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
                params![worktree_id, delete_path.to_str()],
            )?;
            Ok(())
        })
    }

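    /// Upserts the `files` row for `path` (keyed by worktree id and relative
    /// path) and inserts one `spans` row per parsed span, including its
    /// embedding and content digest.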
    pub fn insert_file(
        &self,
        worktree_id: i64,
        path: Arc<Path>,
        mtime: SystemTime,
        spans: Vec<Span>,
    ) -> impl Future<Output = Result<()>> {
        self.transact(move |db| {
            // Replace any existing row for this path, recording the new mtime.
            let mtime = Timestamp::from(mtime);

            db.execute(
                "
                REPLACE INTO files
                (worktree_id, relative_path, mtime_seconds, mtime_nanos)
                VALUES (?1, ?2, ?3, ?4)
                ",
                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
            )?;

            let file_id = db.last_insert_rowid();

            let t0 = Instant::now();
            let mut query = db.prepare(
                "
                INSERT INTO spans
                (file_id, start_byte, end_byte, name, embedding, digest)
                VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                ",
            )?;
            log::trace!("preparing span insert query took {:?}", t0.elapsed());

            for span in spans {
                query.execute(params![
                    file_id,
                    span.range.start.to_string(),
                    span.range.end.to_string(),
                    span.name,
                    span.embedding,
                    span.digest
                ])?;
            }

            Ok(())
        })
    }

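    /// Returns whether a worktree with this absolute path has ever been
    /// registered in the database.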
    pub fn worktree_previously_indexed(
        &self,
        worktree_root_path: &Path,
    ) -> impl Future<Output = Result<bool>> {
        let worktree_root_path = worktree_root_path.to_string_lossy().into_owned();
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path], |row| Ok(row.get::<_, i64>(0)?));

            Ok(worktree_id.is_ok())
        })
    }

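    /// Loads the stored embeddings for the given files, keyed by span digest,
    /// so that spans whose content hasn't changed can be reused instead of
    /// re-embedded.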
    pub fn embeddings_for_files(
        &self,
        worktree_id_file_paths: HashMap<i64, Vec<Arc<Path>>>,
    ) -> impl Future<Output = Result<HashMap<SpanDigest, Embedding>>> {
        self.transact(move |db| {
            let mut query = db.prepare(
                "
                SELECT digest, embedding
                FROM spans
                LEFT JOIN files ON files.id = spans.file_id
                WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
                ",
            )?;
            let mut embeddings_by_digest = HashMap::default();
            for (worktree_id, file_paths) in worktree_id_file_paths {
                let file_paths = Rc::new(
                    file_paths
                        .into_iter()
                        .map(|p| Value::Text(p.to_string_lossy().into_owned()))
                        .collect::<Vec<_>>(),
                );
                let rows = query.query_map(params![worktree_id, file_paths], |row| {
                    Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?))
                })?;

                for row in rows {
                    if let Ok(row) = row {
                        embeddings_by_digest.insert(row.0, row.1);
                    }
                }
            }

            Ok(embeddings_by_digest)
        })
    }

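    /// Looks up the id of the worktree with this absolute path, inserting a
    /// new row (and returning its id) if none exists yet.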
    pub fn find_or_create_worktree(
        &self,
        worktree_root_path: Arc<Path>,
    ) -> impl Future<Output = Result<i64>> {
        self.transact(move |db| {
            let mut worktree_query =
                db.prepare("SELECT id FROM worktrees WHERE absolute_path = ?1")?;
            let worktree_id = worktree_query
                .query_row(params![worktree_root_path.to_string_lossy()], |row| {
                    Ok(row.get::<_, i64>(0)?)
                });

            if let Ok(worktree_id) = worktree_id {
                return Ok(worktree_id);
            }

            // The worktree wasn't found, so insert a new row for it.
            db.execute(
                "INSERT into worktrees (absolute_path) VALUES (?1)",
                params![worktree_root_path.to_string_lossy()],
            )?;
            Ok(db.last_insert_rowid())
        })
    }

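    /// Returns the recorded mtime for every file in the worktree, keyed by
    /// relative path.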
    pub fn get_file_mtimes(
        &self,
        worktree_id: i64,
    ) -> impl Future<Output = Result<HashMap<PathBuf, SystemTime>>> {
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                SELECT relative_path, mtime_seconds, mtime_nanos
                FROM files
                WHERE worktree_id = ?1
                ORDER BY relative_path",
            )?;
            let mut result: HashMap<PathBuf, SystemTime> = HashMap::default();
            for row in statement.query_map(params![worktree_id], |row| {
                Ok((
                    row.get::<_, String>(0)?.into(),
                    Timestamp {
                        seconds: row.get(1)?,
                        nanos: row.get(2)?,
                    }
                    .into(),
                ))
            })? {
                let row = row?;
                result.insert(row.0, row.1);
            }
            Ok(result)
        })
    }

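    /// Scores every span of the given files against `query_embedding` and
    /// keeps the `limit` highest-similarity span ids in a sorted insertion
    /// buffer: a brute-force top-k scan, with no vector index involved.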
    pub fn top_k_search(
        &self,
        query_embedding: &Embedding,
        limit: usize,
        file_ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, f32)>>> {
        let query_embedding = query_embedding.clone();
        let file_ids = file_ids.to_vec();
        self.transact(move |db| {
            let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1);
            Self::for_each_span(db, &file_ids, |id, embedding| {
                let similarity = embedding.similarity(&query_embedding);
                let ix = match results.binary_search_by(|(_, s)| {
                    similarity.partial_cmp(&s).unwrap_or(Ordering::Equal)
                }) {
                    Ok(ix) => ix,
                    Err(ix) => ix,
                };
                results.insert(ix, (id, similarity));
                results.truncate(limit);
            })?;

            anyhow::Ok(results)
        })
    }

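    /// Returns the ids of files in the given worktrees whose relative paths
    /// match the `includes` globs (or all files if `includes` is empty) and
    /// match none of the `excludes` globs.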
    pub fn retrieve_included_file_ids(
        &self,
        worktree_ids: &[i64],
        includes: &[PathMatcher],
        excludes: &[PathMatcher],
    ) -> impl Future<Output = Result<Vec<i64>>> {
        let worktree_ids = worktree_ids.to_vec();
        let includes = includes.to_vec();
        let excludes = excludes.to_vec();
        self.transact(move |db| {
            let mut file_query = db.prepare(
                "
                SELECT
                    id, relative_path
                FROM
                    files
                WHERE
                    worktree_id IN rarray(?)
                ",
            )?;

            let mut file_ids = Vec::<i64>::new();
            let mut rows = file_query.query([ids_to_sql(&worktree_ids)])?;

            while let Some(row) = rows.next()? {
                let file_id = row.get(0)?;
                let relative_path = row.get_ref(1)?.as_str()?;
                let included =
                    includes.is_empty() || includes.iter().any(|glob| glob.is_match(relative_path));
                let excluded = excludes.iter().any(|glob| glob.is_match(relative_path));
                if included && !excluded {
                    file_ids.push(file_id);
                }
            }

            anyhow::Ok(file_ids)
        })
    }

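    /// Invokes `f` with the id and decoded embedding of every span belonging
    /// to the given files; rows that fail to decode are skipped.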
    fn for_each_span(
        db: &rusqlite::Connection,
        file_ids: &[i64],
        mut f: impl FnMut(i64, Embedding),
    ) -> Result<()> {
        let mut query_statement = db.prepare(
            "
            SELECT
                id, embedding
            FROM
                spans
            WHERE
                file_id IN rarray(?)
            ",
        )?;

        query_statement
            .query_map(params![ids_to_sql(&file_ids)], |row| {
                Ok((row.get(0)?, row.get::<_, Embedding>(1)?))
            })?
            .filter_map(|row| row.ok())
            .for_each(|(id, embedding)| f(id, embedding));
        Ok(())
    }

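    /// Resolves span ids to `(worktree_id, relative_path, byte_range)` tuples,
    /// returned in the same order as `ids`; fails if any id is missing.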
    pub fn spans_for_ids(
        &self,
        ids: &[i64],
    ) -> impl Future<Output = Result<Vec<(i64, PathBuf, Range<usize>)>>> {
        let ids = ids.to_vec();
        self.transact(move |db| {
            let mut statement = db.prepare(
                "
                SELECT
                    spans.id,
                    files.worktree_id,
                    files.relative_path,
                    spans.start_byte,
                    spans.end_byte
                FROM
                    spans, files
                WHERE
                    spans.file_id = files.id AND
                    spans.id in rarray(?)
                ",
            )?;

            let result_iter = statement.query_map(params![ids_to_sql(&ids)], |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, i64>(1)?,
                    row.get::<_, String>(2)?.into(),
                    row.get(3)?..row.get(4)?,
                ))
            })?;

            let mut values_by_id = HashMap::<i64, (i64, PathBuf, Range<usize>)>::default();
            for row in result_iter {
                let (id, worktree_id, path, range) = row?;
                values_by_id.insert(id, (worktree_id, path, range));
            }

            let mut results = Vec::with_capacity(ids.len());
            for id in &ids {
                let value = values_by_id
                    .remove(id)
                    .ok_or_else(|| anyhow!("missing span id {}", id))?;
                results.push(value);
            }

            Ok(results)
        })
    }
}

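/// Converts a slice of ids into the `Rc<Vec<Value>>` shape expected by
/// rusqlite's `rarray()` table-valued function (provided by the `array` vtab
/// module loaded in `initialize_database`).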
fn ids_to_sql(ids: &[i64]) -> Rc<Vec<rusqlite::types::Value>> {
    Rc::new(
        ids.iter()
            .copied()
            .map(rusqlite::types::Value::from)
            .collect::<Vec<_>>(),
    )
}