reindexing update to appropriately accommodate buffer delay and persistent pending files list

Created by KCaverly

Change summary

crates/vector_store/src/vector_store.rs | 234 ++++++++++++++++++--------
1 file changed, 160 insertions(+), 74 deletions(-)
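
In short: saved files no longer go straight to the parser. Each change is recorded per project as a PendingFile keyed by path, with a reindex deadline of its modified time plus REINDEXING_DELAY_SECONDS (now 3 seconds instead of 30), and only entries whose deadline has passed are drained to parsing_files_tx. The sketch below mirrors that bookkeeping in a stripped-down, standalone form; the type and method names are simplified stand-ins rather than the crate's actual API.

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};

const REINDEXING_DELAY_SECONDS: u64 = 3;

// Stand-in for the diff's PendingFile; the real struct also carries a
// worktree_db_id, an Arc<Language>, and the file's modified time.
#[derive(Clone, Debug)]
struct PendingFile {
    path: PathBuf,
}

#[derive(Default)]
struct PendingFiles {
    // path -> (pending file, reindex deadline)
    files: HashMap<PathBuf, (PendingFile, SystemTime)>,
}

impl PendingFiles {
    // Record a change. A repeated save replaces the file data but keeps the
    // original deadline, so a burst of saves collapses into one reindex.
    fn update(&mut self, file: PendingFile, deadline: SystemTime) {
        let deadline = match self.files.remove(&file.path) {
            Some((_, existing_deadline)) => existing_deadline,
            None => deadline,
        };
        self.files.insert(file.path.clone(), (file, deadline));
    }

    // Drain every entry whose deadline has passed.
    fn take_outstanding(&mut self) -> Vec<PendingFile> {
        let now = SystemTime::now();
        let due: Vec<PathBuf> = self
            .files
            .iter()
            .filter(|(_, (_, deadline))| *deadline <= now)
            .map(|(path, _)| path.clone())
            .collect();
        due.into_iter()
            .filter_map(|path| self.files.remove(&path))
            .map(|(file, _)| file)
            .collect()
    }
}

fn main() {
    let mut pending = PendingFiles::default();
    let saved_at = SystemTime::now();
    pending.update(
        PendingFile { path: PathBuf::from("src/lib.rs") },
        saved_at + Duration::from_secs(REINDEXING_DELAY_SECONDS),
    );
    // Nothing is due yet; after ~3 seconds the entry would be drained
    // and handed to the parsing channel.
    assert!(pending.take_outstanding().is_empty());
}
```
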

Detailed changes

crates/vector_store/src/vector_store.rs

@@ -18,11 +18,13 @@ use modal::{SemanticSearch, SemanticSearchDelegate, Toggle};
 use project::{Fs, Project, WorktreeId};
 use smol::channel;
 use std::{
+    cell::RefCell,
     cmp::Ordering,
     collections::HashMap,
     path::{Path, PathBuf},
+    rc::Rc,
     sync::Arc,
-    time::{Instant, SystemTime},
+    time::{Duration, Instant, SystemTime},
 };
 use tree_sitter::{Parser, QueryCursor};
 use util::{
@@ -33,7 +35,7 @@ use util::{
 };
 use workspace::{Workspace, WorkspaceCreated};
 
-const REINDEXING_DELAY: u64 = 30;
+const REINDEXING_DELAY_SECONDS: u64 = 3;
 const EMBEDDINGS_BATCH_SIZE: usize = 150;
 
 #[derive(Debug, Clone)]
@@ -124,20 +126,62 @@ pub struct VectorStore {
     embedding_provider: Arc<dyn EmbeddingProvider>,
     language_registry: Arc<LanguageRegistry>,
     db_update_tx: channel::Sender<DbWrite>,
-    // embed_batch_tx: channel::Sender<Vec<(i64, IndexedFile, Vec<String>)>>,
-    parsing_files_tx: channel::Sender<(i64, PathBuf, Arc<Language>, SystemTime)>,
+    parsing_files_tx: channel::Sender<PendingFile>,
     _db_update_task: Task<()>,
     _embed_batch_task: Vec<Task<()>>,
     _batch_files_task: Task<()>,
     _parsing_files_tasks: Vec<Task<()>>,
-    projects: HashMap<WeakModelHandle<Project>, ProjectState>,
+    projects: HashMap<WeakModelHandle<Project>, Rc<RefCell<ProjectState>>>,
 }
 
 struct ProjectState {
     worktree_db_ids: Vec<(WorktreeId, i64)>,
+    pending_files: HashMap<PathBuf, (PendingFile, SystemTime)>,
     _subscription: gpui::Subscription,
 }
 
+impl ProjectState {
+    fn update_pending_files(&mut self, pending_file: PendingFile, indexing_time: SystemTime) {
+        // If Pending File Already Exists, Replace it with the new one
+        // but keep the old indexing time
+        if let Some(old_file) = self.pending_files.remove(&pending_file.path.clone()) {
+            self.pending_files
+                .insert(pending_file.path.clone(), (pending_file, old_file.1));
+        } else {
+            self.pending_files
+                .insert(pending_file.path.clone(), (pending_file, indexing_time));
+        };
+    }
+
+    fn get_outstanding_files(&mut self) -> Vec<PendingFile> {
+        let mut outstanding_files = vec![];
+        let mut remove_keys = vec![];
+        for key in self.pending_files.keys().into_iter() {
+            if let Some(pending_details) = self.pending_files.get(key) {
+                let (pending_file, index_time) = pending_details;
+                if index_time <= &SystemTime::now() {
+                    outstanding_files.push(pending_file.clone());
+                    remove_keys.push(key.clone());
+                }
+            }
+        }
+
+        for key in remove_keys.iter() {
+            self.pending_files.remove(key);
+        }
+
+        return outstanding_files;
+    }
+}
+
+#[derive(Clone, Debug)]
+struct PendingFile {
+    worktree_db_id: i64,
+    path: PathBuf,
+    language: Arc<Language>,
+    modified_time: SystemTime,
+}
+
 #[derive(Debug, Clone)]
 pub struct SearchResult {
     pub worktree_id: WorktreeId,
@@ -293,8 +337,7 @@ impl VectorStore {
             });
 
             // parsing_files_tx/rx: Parsing Files to Embeddable Documents
-            let (parsing_files_tx, parsing_files_rx) =
-                channel::unbounded::<(i64, PathBuf, Arc<Language>, SystemTime)>();
+            let (parsing_files_tx, parsing_files_rx) = channel::unbounded::<PendingFile>();
 
             let mut _parsing_files_tasks = Vec::new();
             for _ in 0..cx.background().num_cpus() {
@@ -304,23 +347,25 @@ impl VectorStore {
                 _parsing_files_tasks.push(cx.background().spawn(async move {
                     let mut parser = Parser::new();
                     let mut cursor = QueryCursor::new();
-                    while let Ok((worktree_id, file_path, language, mtime)) =
-                        parsing_files_rx.recv().await
-                    {
-                        log::info!("Parsing File: {:?}", &file_path);
+                    while let Ok(pending_file) = parsing_files_rx.recv().await {
+                        log::info!("Parsing File: {:?}", &pending_file.path);
                         if let Some((indexed_file, document_spans)) = Self::index_file(
                             &mut cursor,
                             &mut parser,
                             &fs,
-                            language,
-                            file_path.clone(),
-                            mtime,
+                            pending_file.language,
+                            pending_file.path.clone(),
+                            pending_file.modified_time,
                         )
                         .await
                         .log_err()
                         {
                             batch_files_tx
-                                .try_send((worktree_id, indexed_file, document_spans))
+                                .try_send((
+                                    pending_file.worktree_db_id,
+                                    indexed_file,
+                                    document_spans,
+                                ))
                                 .unwrap();
                         }
                     }
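
The parsing workers now receive a single PendingFile value instead of a loose (i64, PathBuf, Arc<Language>, SystemTime) tuple; the pool itself keeps the same shape, with one unbounded channel and one background task per CPU pulling from a shared receiver. A rough standalone illustration of that fan-out, using smol directly rather than gpui's background executor and a simplified payload without the Arc<Language> handle:

```rust
use std::path::PathBuf;
use std::time::SystemTime;

// Simplified payload; the real PendingFile also carries an Arc<Language>.
#[derive(Clone, Debug)]
struct PendingFile {
    worktree_db_id: i64,
    path: PathBuf,
    modified_time: SystemTime,
}

fn main() {
    let (tx, rx) = smol::channel::unbounded::<PendingFile>();

    // Fan the work out to a few background tasks; the channel is MPMC,
    // so whichever worker is free takes the next file.
    let workers: Vec<_> = (0..4)
        .map(|worker| {
            let rx = rx.clone();
            smol::spawn(async move {
                while let Ok(pending_file) = rx.recv().await {
                    println!("worker {worker} parsing {:?}", pending_file);
                }
            })
        })
        .collect();

    for i in 0..8 {
        tx.try_send(PendingFile {
            worktree_db_id: 1,
            path: PathBuf::from(format!("file_{i}.rs")),
            modified_time: SystemTime::now(),
        })
        .unwrap();
    }
    drop(tx); // closing the channel lets the worker loops finish

    smol::block_on(async {
        for worker in workers {
            worker.await;
        }
    });
}
```
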
@@ -516,12 +561,13 @@ impl VectorStore {
 
                                     if !already_stored {
                                         parsing_files_tx
-                                            .try_send((
-                                                db_ids_by_worktree_id[&worktree.id()],
-                                                path_buf,
+                                            .try_send(PendingFile {
+                                                worktree_db_id: db_ids_by_worktree_id
+                                                    [&worktree.id()],
+                                                path: path_buf,
                                                 language,
-                                                file.mtime,
-                                            ))
+                                                modified_time: file.mtime,
+                                            })
                                             .unwrap();
                                     }
                                 }
@@ -543,54 +589,82 @@ impl VectorStore {
                 })
                 .detach();
 
+            // let mut pending_files: Vec<(PathBuf, ((i64, PathBuf, Arc<Language>, SystemTime), SystemTime))> = vec![];
             this.update(&mut cx, |this, cx| {
                 // The below is managing for updated on save
                 // Currently each time a file is saved, this code is run, and for all the files that were changed, if the current time is
                 // greater than the previous embedded time by the REINDEXING_DELAY variable, we will send the file off to be indexed.
-                let _subscription = cx.subscribe(&project, |this, project, event, _cx| {
+                let _subscription = cx.subscribe(&project, |this, project, event, cx| {
                     if let Some(project_state) = this.projects.get(&project.downgrade()) {
+                        let mut project_state = project_state.borrow_mut();
                         let worktree_db_ids = project_state.worktree_db_ids.clone();
 
                         if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event
                         {
-                            // Iterate through changes
-                            let language_registry = this.language_registry.clone();
-
-                            let db =
-                                VectorDatabase::new(this.database_url.to_string_lossy().into());
-                            if db.is_err() {
+                            // Get Worktree Object
+                            let worktree =
+                                project.read(cx).worktree_for_id(worktree_id.clone(), cx);
+                            if worktree.is_none() {
                                 return;
                             }
-                            let db = db.unwrap();
+                            let worktree = worktree.unwrap();
 
-                            let worktree_db_id: Option<i64> = {
-                                let mut found_db_id = None;
-                                for (w_id, db_id) in worktree_db_ids.into_iter() {
-                                    if &w_id == worktree_id {
-                                        found_db_id = Some(db_id);
+                            // Get Database
+                            let db_values = {
+                                if let Ok(db) =
+                                    VectorDatabase::new(this.database_url.to_string_lossy().into())
+                                {
+                                    let worktree_db_id: Option<i64> = {
+                                        let mut found_db_id = None;
+                                        for (w_id, db_id) in worktree_db_ids.into_iter() {
+                                            if &w_id == &worktree.read(cx).id() {
+                                                found_db_id = Some(db_id)
+                                            }
+                                        }
+                                        found_db_id
+                                    };
+                                    if worktree_db_id.is_none() {
+                                        return;
                                     }
-                                }
+                                    let worktree_db_id = worktree_db_id.unwrap();
 
-                                found_db_id
+                                    let file_mtimes = db.get_file_mtimes(worktree_db_id);
+                                    if file_mtimes.is_err() {
+                                        return;
+                                    }
+
+                                    let file_mtimes = file_mtimes.unwrap();
+                                    Some((file_mtimes, worktree_db_id))
+                                } else {
+                                    return;
+                                }
                             };
 
-                            if worktree_db_id.is_none() {
+                            if db_values.is_none() {
                                 return;
                             }
-                            let worktree_db_id = worktree_db_id.unwrap();
 
-                            let file_mtimes = db.get_file_mtimes(worktree_db_id);
-                            if file_mtimes.is_err() {
-                                return;
-                            }
+                            let (file_mtimes, worktree_db_id) = db_values.unwrap();
 
-                            let file_mtimes = file_mtimes.unwrap();
+                            // Iterate Through Changes
+                            let language_registry = this.language_registry.clone();
                             let parsing_files_tx = this.parsing_files_tx.clone();
 
                             smol::block_on(async move {
                                 for change in changes.into_iter() {
                                     let change_path = change.0.clone();
-                                    log::info!("Change: {:?}", &change_path);
+                                    // Skip if git ignored or symlink
+                                    if let Some(entry) = worktree.read(cx).entry_for_id(change.1) {
+                                        if entry.is_ignored || entry.is_symlink {
+                                            continue;
+                                        } else {
+                                            log::info!(
+                                                "Testing for Reindexing: {:?}",
+                                                &change_path
+                                            );
+                                        }
+                                    };
+
                                     if let Ok(language) = language_registry
                                         .language_for_file(&change_path.to_path_buf(), None)
                                         .await
@@ -603,47 +677,59 @@ impl VectorStore {
                                             continue;
                                         }
 
-                                        // TODO: Make this a bit more defensive
-                                        let modified_time =
-                                            change_path.metadata().unwrap().modified().unwrap();
-                                        let existing_time =
-                                            file_mtimes.get(&change_path.to_path_buf());
-                                        let already_stored =
-                                            existing_time.map_or(false, |existing_time| {
-                                                if &modified_time != existing_time
-                                                    && existing_time.elapsed().unwrap().as_secs()
-                                                        > REINDEXING_DELAY
-                                                {
-                                                    false
+                                        if let Some(modified_time) = {
+                                            let metadata = change_path.metadata();
+                                            if metadata.is_err() {
+                                                None
+                                            } else {
+                                                let mtime = metadata.unwrap().modified();
+                                                if mtime.is_err() {
+                                                    None
                                                 } else {
-                                                    true
+                                                    Some(mtime.unwrap())
                                                 }
-                                            });
-
-                                        if !already_stored {
-                                            log::info!("Need to reindex: {:?}", &change_path);
-                                            parsing_files_tx
-                                                .try_send((
-                                                    worktree_db_id,
-                                                    change_path.to_path_buf(),
-                                                    language,
-                                                    modified_time,
-                                                ))
-                                                .unwrap();
+                                            }
+                                        } {
+                                            let existing_time =
+                                                file_mtimes.get(&change_path.to_path_buf());
+                                            let already_stored = existing_time
+                                                .map_or(false, |existing_time| {
+                                                    &modified_time != existing_time
+                                                });
+
+                                            let reindex_time = modified_time
+                                                + Duration::from_secs(REINDEXING_DELAY_SECONDS);
+
+                                            if !already_stored {
+                                                project_state.update_pending_files(
+                                                    PendingFile {
+                                                        path: change_path.to_path_buf(),
+                                                        modified_time,
+                                                        worktree_db_id,
+                                                        language: language.clone(),
+                                                    },
+                                                    reindex_time,
+                                                );
+
+                                                for file in project_state.get_outstanding_files() {
+                                                    parsing_files_tx.try_send(file).unwrap();
+                                                }
+                                            }
                                         }
                                     }
                                 }
-                            })
-                        }
+                            });
+                        };
                     }
                 });
 
                 this.projects.insert(
                     project.downgrade(),
-                    ProjectState {
+                    Rc::new(RefCell::new(ProjectState {
+                        pending_files: HashMap::new(),
                         worktree_db_ids: db_ids_by_worktree_id.into_iter().collect(),
                         _subscription,
-                    },
+                    })),
                 );
             });
 
@@ -659,7 +745,7 @@ impl VectorStore {
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Vec<SearchResult>>> {
         let project_state = if let Some(state) = self.projects.get(&project.downgrade()) {
-            state
+            state.borrow()
         } else {
             return Task::ready(Err(anyhow!("project not added")));
         };
@@ -717,7 +803,7 @@ impl VectorStore {
 
             this.read_with(&cx, |this, _| {
                 let project_state = if let Some(state) = this.projects.get(&project.downgrade()) {
-                    state
+                    state.borrow()
                 } else {
                     return Err(anyhow!("project not added"));
                 };