updated vector_store to reindex on save after timed delay

KCaverly created

Change summary

crates/vector_store/src/vector_store.rs       | 108 ++++++++++----------
crates/vector_store/src/vector_store_tests.rs |   2 
2 files changed, 57 insertions(+), 53 deletions(-)

Detailed changes

crates/vector_store/src/vector_store.rs 🔗

@@ -33,6 +33,8 @@ use util::{
 };
 use workspace::{Workspace, WorkspaceCreated};
 
+const REINDEXING_DELAY: u64 = 30;
+
 #[derive(Debug)]
 pub struct Document {
     pub offset: usize,
@@ -58,10 +60,10 @@ pub fn init(
         let vector_store = VectorStore::new(
             fs,
             db_file_path,
-            Arc::new(embedding::DummyEmbeddings {}),
-            // Arc::new(OpenAIEmbeddings {
-            //     client: http_client,
-            // }),
+            // Arc::new(embedding::DummyEmbeddings {}),
+            Arc::new(OpenAIEmbeddings {
+                client: http_client,
+            }),
             language_registry,
             cx.clone(),
         )
@@ -121,7 +123,9 @@ pub struct VectorStore {
     embedding_provider: Arc<dyn EmbeddingProvider>,
     language_registry: Arc<LanguageRegistry>,
     db_update_tx: channel::Sender<DbWrite>,
+    paths_tx: channel::Sender<(i64, PathBuf, Arc<Language>, SystemTime)>,
     _db_update_task: Task<()>,
+    _paths_update_task: Task<()>,
     projects: HashMap<WeakModelHandle<Project>, ProjectState>,
 }
 
@@ -203,14 +207,50 @@ impl VectorStore {
                 }
             });
 
+            let (paths_tx, paths_rx) =
+                channel::unbounded::<(i64, PathBuf, Arc<Language>, SystemTime)>();
+
+            let fs_clone = fs.clone();
+            let db_update_tx_clone = db_update_tx.clone();
+            let embedding_provider_clone = embedding_provider.clone();
+
+            let _paths_update_task = cx.background().spawn(async move {
+                let mut parser = Parser::new();
+                let mut cursor = QueryCursor::new();
+                while let Ok((worktree_id, file_path, language, mtime)) = paths_rx.recv().await {
+                    log::info!("Parsing File: {:?}", &file_path);
+                    if let Some(indexed_file) = Self::index_file(
+                        &mut cursor,
+                        &mut parser,
+                        embedding_provider_clone.as_ref(),
+                        &fs_clone,
+                        language,
+                        file_path,
+                        mtime,
+                    )
+                    .await
+                    .log_err()
+                    {
+                        db_update_tx_clone
+                            .try_send(DbWrite::InsertFile {
+                                worktree_id,
+                                indexed_file,
+                            })
+                            .unwrap();
+                    }
+                }
+            });
+
             Self {
                 fs,
                 database_url,
                 db_update_tx,
+                paths_tx,
                 embedding_provider,
                 language_registry,
                 projects: HashMap::new(),
                 _db_update_task,
+                _paths_update_task,
             }
         }))
     }
@@ -315,9 +355,9 @@ impl VectorStore {
 
         let fs = self.fs.clone();
         let language_registry = self.language_registry.clone();
-        let embedding_provider = self.embedding_provider.clone();
         let database_url = self.database_url.clone();
         let db_update_tx = self.db_update_tx.clone();
+        let paths_tx = self.paths_tx.clone();
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
@@ -356,8 +396,6 @@ impl VectorStore {
                 })
                 .await?;
 
-            let (paths_tx, paths_rx) =
-                channel::unbounded::<(i64, PathBuf, Arc<Language>, SystemTime)>();
             cx.background()
                 .spawn({
                     let db_ids_by_worktree_id = db_ids_by_worktree_id.clone();
@@ -415,42 +453,8 @@ impl VectorStore {
                 })
                 .detach();
 
-            cx.background()
-                .scoped(|scope| {
-                    for _ in 0..cx.background().num_cpus() {
-                        scope.spawn(async {
-                            let mut parser = Parser::new();
-                            let mut cursor = QueryCursor::new();
-                            while let Ok((worktree_id, file_path, language, mtime)) =
-                                paths_rx.recv().await
-                            {
-                                if let Some(indexed_file) = Self::index_file(
-                                    &mut cursor,
-                                    &mut parser,
-                                    embedding_provider.as_ref(),
-                                    &fs,
-                                    language,
-                                    file_path,
-                                    mtime,
-                                )
-                                .await
-                                .log_err()
-                                {
-                                    db_update_tx
-                                        .try_send(DbWrite::InsertFile {
-                                            worktree_id,
-                                            indexed_file,
-                                        })
-                                        .unwrap();
-                                }
-                            }
-                        });
-                    }
-                })
-                .await;
-
             this.update(&mut cx, |this, cx| {
-                let _subscription = cx.subscribe(&project, |this, project, event, cx| {
+                let _subscription = cx.subscribe(&project, |this, project, event, _cx| {
                     if let Some(project_state) = this.projects.get(&project.downgrade()) {
                         let worktree_db_ids = project_state.worktree_db_ids.clone();
 
@@ -488,6 +492,7 @@ impl VectorStore {
                             }
 
                             let file_mtimes = file_mtimes.unwrap();
+                            let paths_tx = this.paths_tx.clone();
 
                             smol::block_on(async move {
                                 for change in changes.into_iter() {
@@ -504,7 +509,6 @@ impl VectorStore {
                                         {
                                             continue;
                                         }
-                                        log::info!("Language found: {:?}: ", language.name());
 
                                         // TODO: Make this a bit more defensive
                                         let modified_time =
@@ -515,7 +519,7 @@ impl VectorStore {
                                             existing_time.map_or(false, |existing_time| {
                                                 if &modified_time != existing_time
                                                     && existing_time.elapsed().unwrap().as_secs()
-                                                        > 30
+                                                        > REINDEXING_DELAY
                                                 {
                                                     false
                                                 } else {
@@ -525,14 +529,14 @@ impl VectorStore {
 
                                         if !already_stored {
                                             log::info!("Need to reindex: {:?}", &change_path);
-                                            // paths_tx
-                                            //     .try_send((
-                                            //         worktree_db_id,
-                                            //         change_path.to_path_buf(),
-                                            //         language,
-                                            //         modified_time,
-                                            //     ))
-                                            //     .unwrap();
+                                            paths_tx
+                                                .try_send((
+                                                    worktree_db_id,
+                                                    change_path.to_path_buf(),
+                                                    language,
+                                                    modified_time,
+                                                ))
+                                                .unwrap();
                                         }
                                     }
                                 }

crates/vector_store/src/vector_store_tests.rs 🔗

@@ -5,7 +5,7 @@ use anyhow::Result;
 use async_trait::async_trait;
 use gpui::{Task, TestAppContext};
 use language::{Language, LanguageConfig, LanguageRegistry};
-use project::{FakeFs, Fs, Project};
+use project::{FakeFs, Project};
 use rand::Rng;
 use serde_json::json;
 use unindent::Unindent;