moved semantic_index project initialization to queue and channel method

KCaverly created

Change summary

crates/semantic_index/src/semantic_index.rs | 267 +++++++++-------------
1 file changed, 108 insertions(+), 159 deletions(-)

Detailed changes

crates/semantic_index/src/semantic_index.rs 🔗

@@ -15,8 +15,9 @@ use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task,
 use language::{Anchor, Buffer, Language, LanguageRegistry};
 use parking_lot::Mutex;
 use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
+use postage::stream::Stream;
 use postage::watch;
-use project::{search::PathMatcher, Fs, Project, WorktreeId};
+use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, WorktreeId};
 use smol::channel;
 use std::{
     cmp::Ordering,
@@ -96,7 +97,8 @@ struct ProjectState {
     subscription: gpui::Subscription,
     outstanding_job_count_rx: watch::Receiver<usize>,
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
-    queue: HashMap<WorktreeId, Vec<IndexOperation>>,
+    job_queue_tx: channel::Sender<IndexOperation>,
+    _queue_update_task: Task<()>,
 }
 
 #[derive(Clone)]
@@ -116,6 +118,7 @@ impl JobHandle {
 }
 impl ProjectState {
     fn new(
+        cx: &mut AppContext,
         subscription: gpui::Subscription,
         worktree_db_ids: Vec<(WorktreeId, i64)>,
         worktree_file_mtimes: HashMap<WorktreeId, HashMap<PathBuf, SystemTime>>,
@@ -125,29 +128,51 @@ impl ProjectState {
         let (job_count_tx, job_count_rx) = watch::channel_with(0);
         let job_count_tx = Arc::new(Mutex::new(job_count_tx));
 
+        let (job_queue_tx, job_queue_rx) = channel::unbounded();
+        let _queue_update_task = cx.background().spawn({
+            let mut worktree_queue = Vec::new();
+            async move {
+                while let Ok(operation) = job_queue_rx.recv().await {
+                    Self::update_queue(&mut worktree_queue, operation);
+                }
+            }
+        });
+
         Self {
             worktree_db_ids,
             worktree_file_mtimes,
             outstanding_job_count_rx,
             _outstanding_job_count_tx,
             subscription,
-            queue: HashMap::new(),
+            _queue_update_task,
+            job_queue_tx,
         }
     }
 
-    fn add_to_queue(&mut self, worktree_id: WorktreeId, operation: IndexOperation) {
-        if let Some(worktree_queue) = self.queue.get_mut(&worktree_id) {
-            worktree_queue.push(operation);
-        } else {
-            self.queue.insert(worktree_id, vec![operation]);
-        }
+    pub fn get_outstanding_count(&self) -> usize {
+        self.outstanding_job_count_rx.borrow().clone()
     }
 
-    fn pop(&mut self) -> Option<IndexOperation> {
-        self.queue
-            .iter_mut()
-            .next()
-            .and_then(|(_, mut entry)| entry.pop())
+    fn update_queue(queue: &mut Vec<IndexOperation>, operation: IndexOperation) {
+        match operation {
+            IndexOperation::FlushQueue => {
+                for op in queue.pop() {
+                    match op {
+                        IndexOperation::IndexFile { payload, tx } => {
+                            tx.try_send(payload);
+                        }
+                        IndexOperation::DeleteFile { payload, tx } => {
+                            tx.try_send(payload);
+                        }
+                        _ => {}
+                    }
+                }
+            }
+            _ => {
+                // TODO: This has to accommodate duplicate files to index.
+                queue.push(operation);
+            }
+        }
     }
 
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
@@ -185,10 +210,16 @@ pub struct PendingFile {
     job_handle: JobHandle,
 }
 
-#[derive(Clone)]
 enum IndexOperation {
-    IndexFile { file: PendingFile },
-    DeleteFile { file: PendingFile },
+    IndexFile {
+        payload: PendingFile,
+        tx: channel::Sender<PendingFile>,
+    },
+    DeleteFile {
+        payload: DbOperation,
+        tx: channel::Sender<DbOperation>,
+    },
+    FlushQueue,
 }
 
 pub struct SearchResult {
@@ -621,6 +652,52 @@ impl SemanticIndex {
         })
     }
 
+    // pub fn project_entries_changed(
+    //     &self,
+    //     project: ModelHandle<Project>,
+    //     changes: &Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
+    //     cx: &ModelContext<SemanticIndex>,
+    //     worktree_id: &WorktreeId,
+    // ) -> Result<()> {
+    //     let parsing_files_tx = self.parsing_files_tx.clone();
+    //     let db_update_tx = self.db_update_tx.clone();
+    //     let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
+    //         let state = self.projects.get(&project.downgrade());
+    //         if state.is_none() {
+    //             return anyhow::Error(anyhow!("Project not yet initialized"));
+    //         }
+    //         let state = state.unwrap();
+    //         (
+    //             state.job_queue_tx.clone(),
+    //             state._outstanding_job_count_tx,
+    //             state.db_id_for_worktree_id(worktree_id),
+    //         )
+    //     };
+
+    //     for (path, entry_id, path_change) in changes.iter() {
+    //         match path_change {
+    //             PathChange::AddedOrUpdated => {
+    //                 let job_handle = JobHandle::new(&outstanding_job_tx);
+    //                 job_queue_tx.try_send(IndexOperation::IndexFile {
+    //                     payload: PendingFile {
+    //                         worktree_db_id,
+    //                         relative_path: path,
+    //                         absolute_path,
+    //                         language,
+    //                         modified_time,
+    //                         job_handle,
+    //                     },
+    //                     tx: parsing_files_tx,
+    //                 })
+    //             }
+    //             PathChange::Removed => {}
+    //             _ => {}
+    //         }
+    //     }
+
+    //     Ok(())
+    // }
+
     pub fn initialize_project(
         &mut self,
         project: ModelHandle<Project>,
@@ -653,6 +730,7 @@ impl SemanticIndex {
         });
 
         let language_registry = self.language_registry.clone();
+        let parsing_files_tx = self.parsing_files_tx.clone();
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
@@ -686,24 +764,13 @@ impl SemanticIndex {
             let (job_count_tx, job_count_rx) = watch::channel_with(0);
             let job_count_tx = Arc::new(Mutex::new(job_count_tx));
             let job_count_tx_longlived = job_count_tx.clone();
-            // this.update(&mut cx, |this, _| {
-            //     let project_state = ProjectState::new(
-            //         _subscription,
-            //         worktree_db_ids,
-            //         worktree_file_mtimes.clone(),
-            //         job_count_rx.clone(),
-            //         job_count_tx.clone(),
-            //     );
-            //     this.projects.insert(project.downgrade(), project_state);
-            // });
 
             let worktree_file_mtimes_all = worktree_file_mtimes.clone();
             let worktree_files = cx
                 .background()
                 .spawn(async move {
-                    let mut worktree_files = HashMap::new();
+                    let mut worktree_files = Vec::new();
                     for worktree in worktrees.into_iter() {
-                        let mut candidate_files = Vec::new();
                         let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                         for file in worktree.files(false, 0) {
                             let absolute_path = worktree.absolutize(&file.path);
@@ -730,8 +797,8 @@ impl SemanticIndex {
 
                                 if !already_stored {
                                     let job_handle = JobHandle::new(&job_count_tx);
-                                    candidate_files.push(IndexOperation::IndexFile {
-                                        file: PendingFile {
+                                    worktree_files.push(IndexOperation::IndexFile {
+                                        payload: PendingFile {
                                             worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                             relative_path: path_buf,
                                             absolute_path,
@@ -739,12 +806,11 @@ impl SemanticIndex {
                                             job_handle,
                                             modified_time: file.mtime,
                                         },
+                                        tx: parsing_files_tx.clone(),
                                     });
                                 }
                             }
                         }
-
-                        worktree_files.insert(worktree.id(), candidate_files);
                     }
 
                     anyhow::Ok(worktree_files)
@@ -753,6 +819,7 @@ impl SemanticIndex {
 
             this.update(&mut cx, |this, cx| {
                 let project_state = ProjectState::new(
+                    cx,
                     _subscription,
                     worktree_db_ids,
                     worktree_file_mtimes_all,
@@ -761,10 +828,8 @@ impl SemanticIndex {
                 );
 
                 if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
-                    for (worktree_id, index_operations) in &worktree_files {
-                        for op in index_operations {
-                            project_state.add_to_queue(*worktree_id, op.clone());
-                        }
+                    for op in worktree_files {
+                        project_state.job_queue_tx.try_send(op);
                     }
                 }
 
@@ -791,134 +856,18 @@ impl SemanticIndex {
         let parsing_files_tx = self.parsing_files_tx.clone();
         let db_update_tx = self.db_update_tx.clone();
         let job_count_rx = state.outstanding_job_count_rx.clone();
-        let count = state.queue.values().map(Vec::len).sum();
+        let count = state.get_outstanding_count();
+
         cx.spawn(|this, mut cx| async move {
             this.update(&mut cx, |this, cx| {
-                let Some(mut state) = this.projects.get_mut(&project.downgrade()) else {
+                let Some(state) = this.projects.get_mut(&project.downgrade()) else {
                     return;
                 };
-                let Some(mut index_operation) = state.pop() else { return;};
-                let _ = match index_operation {
-                    IndexOperation::IndexFile { file } => {
-                        parsing_files_tx.try_send(file);
-                    }
-                    IndexOperation::DeleteFile { file } => {
-                        db_update_tx.try_send(DbOperation::Delete {
-                            worktree_id: file.worktree_db_id,
-                            path: file.relative_path,
-                        });
-                    }
-                };
-            });
-        })
-        .detach();
+                state.job_queue_tx.try_send(IndexOperation::FlushQueue);
+            })
+        });
 
         Task::Ready(Some(Ok((count, job_count_rx))))
-
-        // cx.spawn(|this, mut cx| async move {
-        //     futures::future::join_all(worktree_scans_complete).await;
-
-        //     let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-
-        //     let worktrees = project.read_with(&cx, |project, cx| {
-        //         project
-        //             .worktrees(cx)
-        //             .map(|worktree| worktree.read(cx).snapshot())
-        //             .collect::<Vec<_>>()
-        //     });
-
-        //     let mut worktree_file_mtimes = HashMap::new();
-        //     let mut db_ids_by_worktree_id = HashMap::new();
-        //     for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
-        //         let db_id = db_id?;
-        //         db_ids_by_worktree_id.insert(worktree.id(), db_id);
-        //         worktree_file_mtimes.insert(
-        //             worktree.id(),
-        //             this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
-        //                 .await?,
-        //         );
-        //     }
-
-        //     let worktree_db_ids = db_ids_by_worktree_id
-        //         .iter()
-        //         .map(|(a, b)| (*a, *b))
-        //         .collect();
-
-        //     let (job_count_tx, job_count_rx) = watch::channel_with(0);
-        //     let job_count_tx = Arc::new(Mutex::new(job_count_tx));
-        //     this.update(&mut cx, |this, _| {
-        //         let project_state = ProjectState::new(
-        //             _subscription,
-        //             worktree_db_ids,
-        //             worktree_file_mtimes.clone(),
-        //             job_count_rx.clone(),
-        //             job_count_tx.clone(),
-        //         );
-        //         this.projects.insert(project.downgrade(), project_state);
-        //     });
-
-        //     cx.background()
-        //         .spawn(async move {
-        //             let mut count = 0;
-        //             for worktree in worktrees.into_iter() {
-        //                 let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
-        //                 for file in worktree.files(false, 0) {
-        //                     let absolute_path = worktree.absolutize(&file.path);
-
-        //                     if let Ok(language) = language_registry
-        //                         .language_for_file(&absolute_path, None)
-        //                         .await
-        //                     {
-        //                         if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
-        //                             && &language.name().as_ref() != &"Markdown"
-        //                             && language
-        //                                 .grammar()
-        //                                 .and_then(|grammar| grammar.embedding_config.as_ref())
-        //                                 .is_none()
-        //                         {
-        //                             continue;
-        //                         }
-
-        //                         let path_buf = file.path.to_path_buf();
-        //                         let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
-        //                         let already_stored = stored_mtime
-        //                             .map_or(false, |existing_mtime| existing_mtime == file.mtime);
-
-        //                         if !already_stored {
-        //                             count += 1;
-
-        //                             let job_handle = JobHandle::new(&job_count_tx);
-        //                             parsing_files_tx
-        //                                 .try_send(PendingFile {
-        //                                     worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
-        //                                     relative_path: path_buf,
-        //                                     absolute_path,
-        //                                     language,
-        //                                     job_handle,
-        //                                     modified_time: file.mtime,
-        //                                 })
-        //                                 .unwrap();
-        //                         }
-        //                     }
-        //                 }
-        //                 for file in file_mtimes.keys() {
-        //                     db_update_tx
-        //                         .try_send(DbOperation::Delete {
-        //                             worktree_id: db_ids_by_worktree_id[&worktree.id()],
-        //                             path: file.to_owned(),
-        //                         })
-        //                         .unwrap();
-        //                 }
-        //             }
-
-        //             log::trace!(
-        //                 "walking worktree took {:?} milliseconds",
-        //                 t0.elapsed().as_millis()
-        //             );
-        //             anyhow::Ok((count, job_count_rx))
-        //         })
-        //         .await
-        // })
     }
 
     pub fn outstanding_job_count_rx(