working on initialization + index breakup

KCaverly created this commit

Change summary

crates/search/src/project_search.rs         |   2 
crates/semantic_index/src/semantic_index.rs | 308 ++++++++++++++--------
2 files changed, 197 insertions(+), 113 deletions(-)

Detailed changes

crates/search/src/project_search.rs

@@ -849,7 +849,7 @@ impl ProjectSearchView {
             let model = model.read(cx);
             project = model.project.clone();
             SemanticIndex::global(cx).map(|semantic| {
-                semantic.update(cx, |this, cx| this.initialize_project(project, cx))
+                semantic.update(cx, |this, cx| this.initialize_project(project.clone(), cx));
             });
         }
 

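The project_search.rs change does two things: it clones the ModelHandle before it moves into the callback, since `project` is still needed by the surrounding function, and the added semicolon discards `update`'s return value inside `map`. A minimal illustration of the move-versus-clone point, using a hypothetical stand-in type rather than the real gpui API:

    #[derive(Clone, Debug)]
    struct ModelHandle(u32); // stand-in for gpui::ModelHandle<Project>

    fn initialize_project(_project: ModelHandle) {}

    fn main() {
        let project = ModelHandle(1);
        // The callback takes the handle by value, so without the clone
        // `project` would move and be unusable afterwards; gpui handles
        // are cheap, reference-counted clones.
        initialize_project(project.clone());
        println!("still usable: {:?}", project);
    }
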
crates/semantic_index/src/semantic_index.rs

@@ -96,6 +96,7 @@ struct ProjectState {
     subscription: gpui::Subscription,
     outstanding_job_count_rx: watch::Receiver<usize>,
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
+    queue: HashMap<WorktreeId, Vec<IndexOperation>>,
 }
 
 #[derive(Clone)]
@@ -130,9 +131,25 @@ impl ProjectState {
             outstanding_job_count_rx,
             _outstanding_job_count_tx,
             subscription,
+            queue: HashMap::new(),
         }
     }
 
+    fn add_to_queue(&mut self, worktree_id: WorktreeId, operation: IndexOperation) {
+        // entry() handles both the first-operation and append cases.
+        self.queue
+            .entry(worktree_id)
+            .or_default()
+            .push(operation);
+    }
+
+    fn pop(&mut self) -> Option<IndexOperation> {
+        // Skip drained worktrees: an empty queue for one worktree must
+        // not mask operations still pending for another.
+        self.queue
+            .values_mut()
+            .find_map(|worktree_queue| worktree_queue.pop())
+    }
+
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
         self.worktree_db_ids
             .iter()
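
Together these give each project a per-worktree backlog: operations stack LIFO within a worktree, and `pop` hands back one operation from the first non-empty worktree the map happens to yield. A minimal self-contained sketch of those semantics, with `u64` standing in for `WorktreeId` and `&str` for `IndexOperation`:

    use std::collections::HashMap;

    fn main() {
        // Stand-ins: u64 for WorktreeId, &str for IndexOperation.
        let mut queue: HashMap<u64, Vec<&str>> = HashMap::new();

        // add_to_queue: push onto the owning worktree's stack (LIFO).
        queue.entry(1).or_default().push("index a.rs");
        queue.entry(1).or_default().push("delete b.rs");
        queue.entry(2).or_default().push("index c.rs");

        // pop: take one operation from the first non-empty worktree.
        let next = queue.values_mut().find_map(|ops| ops.pop());
        assert!(next.is_some());
    }
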
@@ -158,6 +175,7 @@ impl ProjectState {
     }
 }
 
+#[derive(Clone)]
 pub struct PendingFile {
     worktree_db_id: i64,
     relative_path: PathBuf,
@@ -167,6 +185,12 @@ pub struct PendingFile {
     job_handle: JobHandle,
 }
 
+#[derive(Clone)]
+enum IndexOperation {
+    IndexFile { file: PendingFile },
+    DeleteFile { file: PendingFile },
+}
+
 pub struct SearchResult {
     pub buffer: ModelHandle<Buffer>,
     pub range: Range<Anchor>,
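
Both variants wrap a full `PendingFile`, although a delete only ever reads `worktree_db_id` and `relative_path` (see the dispatch in `index_project` below). A self-contained sketch of the intended routing, with stand-in types and std `mpsc` channels in place of the crate's real senders:

    use std::sync::mpsc;

    // Stand-in: the real PendingFile also carries language, mtime, and
    // a job handle, and the channels are the crate's async senders.
    struct PendingFile {
        worktree_db_id: i64,
        relative_path: String,
    }

    enum IndexOperation {
        IndexFile { file: PendingFile },
        DeleteFile { file: PendingFile },
    }

    fn dispatch(
        op: IndexOperation,
        parsing_files_tx: &mpsc::Sender<PendingFile>,
        db_delete_tx: &mpsc::Sender<(i64, String)>,
    ) {
        match op {
            // New or changed files go to the parse/embed pipeline.
            IndexOperation::IndexFile { file } => {
                let _ = parsing_files_tx.send(file);
            }
            // Deletes only need enough to identify the row in the db.
            IndexOperation::DeleteFile { file } => {
                let _ = db_delete_tx.send((file.worktree_db_id, file.relative_path));
            }
        }
    }

    fn main() {
        let (parse_tx, _parse_rx) = mpsc::channel();
        let (delete_tx, delete_rx) = mpsc::channel();
        let a = PendingFile { worktree_db_id: 1, relative_path: "a.rs".into() };
        let b = PendingFile { worktree_db_id: 1, relative_path: "old.rs".into() };
        dispatch(IndexOperation::IndexFile { file: a }, &parse_tx, &delete_tx);
        dispatch(IndexOperation::DeleteFile { file: b }, &parse_tx, &delete_tx);
        assert!(delete_rx.try_recv().is_ok());
    }
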
@@ -628,102 +652,12 @@ impl SemanticIndex {
             }
         });
 
-        cx.spawn(|this, mut cx| async move {
-            futures::future::join_all(worktree_scans_complete).await;
-
-            let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-            let worktrees = project.read_with(&cx, |project, cx| {
-                project
-                    .worktrees(cx)
-                    .map(|worktree| worktree.read(cx).snapshot())
-                    .collect::<Vec<_>>()
-            });
-
-            let mut worktree_file_mtimes = HashMap::new();
-            let mut db_ids_by_worktree_id = HashMap::new();
-
-            for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
-                let db_id = db_id?;
-                db_ids_by_worktree_id.insert(worktree.id(), db_id);
-                worktree_file_mtimes.insert(
-                    worktree.id(),
-                    this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
-                        .await?,
-                );
-            }
-
-            let worktree_db_ids = db_ids_by_worktree_id
-                .iter()
-                .map(|(a, b)| (*a, *b))
-                .collect();
-
-            let (job_count_tx, job_count_rx) = watch::channel_with(0);
-            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
-            this.update(&mut cx, |this, _| {
-                let project_state = ProjectState::new(
-                    _subscription,
-                    worktree_db_ids,
-                    worktree_file_mtimes.clone(),
-                    job_count_rx,
-                    job_count_tx,
-                );
-                this.projects.insert(project.downgrade(), project_state);
-            });
-
-            anyhow::Ok(())
-        })
-        .detach_and_log_err(cx)
-    }
-
-    pub fn index_project(
-        &mut self,
-        project: ModelHandle<Project>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
-        let t0 = Instant::now();
-        let worktree_scans_complete = project
-            .read(cx)
-            .worktrees(cx)
-            .map(|worktree| {
-                let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete();
-                async move {
-                    scan_complete.await;
-                }
-            })
-            .collect::<Vec<_>>();
-        let worktree_db_ids = project
-            .read(cx)
-            .worktrees(cx)
-            .map(|worktree| {
-                self.find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf())
-            })
-            .collect::<Vec<_>>();
-
         let language_registry = self.language_registry.clone();
-        let db_update_tx = self.db_update_tx.clone();
-        let parsing_files_tx = self.parsing_files_tx.clone();
-
-        let state = self.projects.get(&project.downgrade());
-        let state = if state.is_none() {
-            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
-        } else {
-            state.unwrap()
-        };
-
-        let state = state.clone().to_owned();
-
-        let _subscription = cx.subscribe(&project, |this, project, event, _cx| {
-            if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
-                todo!();
-                // this.project_entries_changed(project, changes, cx, worktree_id);
-            }
-        });
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
 
             let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
-
             let worktrees = project.read_with(&cx, |project, cx| {
                 project
                     .worktrees(cx)
@@ -733,6 +667,7 @@ impl SemanticIndex {
 
             let mut worktree_file_mtimes = HashMap::new();
             let mut db_ids_by_worktree_id = HashMap::new();
+
             for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
                 let db_id = db_id?;
                 db_ids_by_worktree_id.insert(worktree.id(), db_id);
@@ -761,10 +696,12 @@ impl SemanticIndex {
                 this.projects.insert(project.downgrade(), project_state);
             });
 
-            cx.background()
+            let worktree_files = cx
+                .background()
                 .spawn(async move {
-                    let mut count = 0;
+                    let mut worktree_files = HashMap::new();
                     for worktree in worktrees.into_iter() {
+                        let mut candidate_files = Vec::new();
                         let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
                         for file in worktree.files(false, 0) {
                             let absolute_path = worktree.absolutize(&file.path);
@@ -773,6 +710,7 @@ impl SemanticIndex {
                                 .language_for_file(&absolute_path, None)
                                 .await
                             {
+                                // Skip files we can't embed: not a whole-file
+                                // parseable type, not Markdown, and no embedding
+                                // config in the grammar
                                 if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
                                     && &language.name().as_ref() != &"Markdown"
                                     && language
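
The triple-negative filter reads more easily inverted: keep a file if it is a whole-file parseable type, is Markdown, or its grammar ships an embedding config. A self-contained restatement with hypothetical stand-ins for `Language`, `Grammar`, and the contents of `PARSEABLE_ENTIRE_FILE_TYPES`:

    // Stand-in values and types; the real ones come from the language
    // registry in this crate.
    const PARSEABLE_ENTIRE_FILE_TYPES: &[&str] = &["TOML", "YAML"];

    struct Grammar {
        embedding_config: Option<()>,
    }

    struct Language {
        name: &'static str,
        grammar: Option<Grammar>,
    }

    // A file survives the filter if it parses whole-file, is Markdown,
    // or its grammar carries an embedding config.
    fn is_embeddable(language: &Language) -> bool {
        PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name)
            || language.name == "Markdown"
            || language
                .grammar
                .as_ref()
                .and_then(|grammar| grammar.embedding_config.as_ref())
                .is_some()
    }

    fn main() {
        let markdown = Language { name: "Markdown", grammar: None };
        assert!(is_embeddable(&markdown));
    }
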
@@ -789,40 +727,186 @@ impl SemanticIndex {
                                     .map_or(false, |existing_mtime| existing_mtime == file.mtime);
 
                                 if !already_stored {
-                                    count += 1;
-
                                     let job_handle = JobHandle::new(&job_count_tx);
-                                    parsing_files_tx
-                                        .try_send(PendingFile {
+                                    candidate_files.push(IndexOperation::IndexFile {
+                                        file: PendingFile {
                                             worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
                                             relative_path: path_buf,
                                             absolute_path,
                                             language,
                                             job_handle,
                                             modified_time: file.mtime,
-                                        })
-                                        .unwrap();
+                                        },
+                                    });
                                 }
                             }
                         }
-                        for file in file_mtimes.keys() {
-                            db_update_tx
-                                .try_send(DbOperation::Delete {
-                                    worktree_id: db_ids_by_worktree_id[&worktree.id()],
-                                    path: file.to_owned(),
-                                })
-                                .unwrap();
-                        }
+
+                        worktree_files.insert(worktree.id(), candidate_files);
                     }
 
-                    log::trace!(
-                        "walking worktree took {:?} milliseconds",
-                        t0.elapsed().as_millis()
-                    );
-                    anyhow::Ok((count, job_count_rx))
+                    anyhow::Ok(worktree_files)
                 })
-                .await
+                .await?;
+
+            this.update(&mut cx, |this, _cx| {
+                if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
+                    // Move the collected operations into the project's queue.
+                    for (worktree_id, index_operations) in worktree_files {
+                        for op in index_operations {
+                            project_state.add_to_queue(worktree_id, op);
+                        }
+                    }
+                }
+
+            anyhow::Ok(())
+        })
+        .detach_and_log_err(cx)
+    }
+
+    pub fn index_project(
+        &mut self,
+        project: ModelHandle<Project>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
+        let Some(state) = self.projects.get_mut(&project.downgrade()) else {
+            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
+        };
+
+        let parsing_files_tx = self.parsing_files_tx.clone();
+        let db_update_tx = self.db_update_tx.clone();
+        let job_count_rx = state.outstanding_job_count_rx.clone();
+        let count = state.queue.values().map(Vec::len).sum();
+        cx.spawn(|this, mut cx| async move {
+            this.update(&mut cx, |this, _cx| {
+                let Some(state) = this.projects.get_mut(&project.downgrade()) else {
+                    return;
+                };
+                let Some(index_operation) = state.pop() else {
+                    return;
+                };
+                match index_operation {
+                    IndexOperation::IndexFile { file } => {
+                        let _ = parsing_files_tx.try_send(file);
+                    }
+                    IndexOperation::DeleteFile { file } => {
+                        let _ = db_update_tx.try_send(DbOperation::Delete {
+                            worktree_id: file.worktree_db_id,
+                            path: file.relative_path,
+                        });
+                    }
+                }
+            });
         })
+        .detach();
+
+        Task::Ready(Some(Ok((count, job_count_rx))))
+
+        // cx.spawn(|this, mut cx| async move {
+        //     futures::future::join_all(worktree_scans_complete).await;
+
+        //     let worktree_db_ids = futures::future::join_all(worktree_db_ids).await;
+
+        //     let worktrees = project.read_with(&cx, |project, cx| {
+        //         project
+        //             .worktrees(cx)
+        //             .map(|worktree| worktree.read(cx).snapshot())
+        //             .collect::<Vec<_>>()
+        //     });
+
+        //     let mut worktree_file_mtimes = HashMap::new();
+        //     let mut db_ids_by_worktree_id = HashMap::new();
+        //     for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) {
+        //         let db_id = db_id?;
+        //         db_ids_by_worktree_id.insert(worktree.id(), db_id);
+        //         worktree_file_mtimes.insert(
+        //             worktree.id(),
+        //             this.read_with(&cx, |this, _| this.get_file_mtimes(db_id))
+        //                 .await?,
+        //         );
+        //     }
+
+        //     let worktree_db_ids = db_ids_by_worktree_id
+        //         .iter()
+        //         .map(|(a, b)| (*a, *b))
+        //         .collect();
+
+        //     let (job_count_tx, job_count_rx) = watch::channel_with(0);
+        //     let job_count_tx = Arc::new(Mutex::new(job_count_tx));
+        //     this.update(&mut cx, |this, _| {
+        //         let project_state = ProjectState::new(
+        //             _subscription,
+        //             worktree_db_ids,
+        //             worktree_file_mtimes.clone(),
+        //             job_count_rx.clone(),
+        //             job_count_tx.clone(),
+        //         );
+        //         this.projects.insert(project.downgrade(), project_state);
+        //     });
+
+        //     cx.background()
+        //         .spawn(async move {
+        //             let mut count = 0;
+        //             for worktree in worktrees.into_iter() {
+        //                 let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
+        //                 for file in worktree.files(false, 0) {
+        //                     let absolute_path = worktree.absolutize(&file.path);
+
+        //                     if let Ok(language) = language_registry
+        //                         .language_for_file(&absolute_path, None)
+        //                         .await
+        //                     {
+        //                         if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
+        //                             && &language.name().as_ref() != &"Markdown"
+        //                             && language
+        //                                 .grammar()
+        //                                 .and_then(|grammar| grammar.embedding_config.as_ref())
+        //                                 .is_none()
+        //                         {
+        //                             continue;
+        //                         }
+
+        //                         let path_buf = file.path.to_path_buf();
+        //                         let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
+        //                         let already_stored = stored_mtime
+        //                             .map_or(false, |existing_mtime| existing_mtime == file.mtime);
+
+        //                         if !already_stored {
+        //                             count += 1;
+
+        //                             let job_handle = JobHandle::new(&job_count_tx);
+        //                             parsing_files_tx
+        //                                 .try_send(PendingFile {
+        //                                     worktree_db_id: db_ids_by_worktree_id[&worktree.id()],
+        //                                     relative_path: path_buf,
+        //                                     absolute_path,
+        //                                     language,
+        //                                     job_handle,
+        //                                     modified_time: file.mtime,
+        //                                 })
+        //                                 .unwrap();
+        //                         }
+        //                     }
+        //                 }
+        //                 for file in file_mtimes.keys() {
+        //                     db_update_tx
+        //                         .try_send(DbOperation::Delete {
+        //                             worktree_id: db_ids_by_worktree_id[&worktree.id()],
+        //                             path: file.to_owned(),
+        //                         })
+        //                         .unwrap();
+        //                 }
+        //             }
+
+        //             log::trace!(
+        //                 "walking worktree took {:?} milliseconds",
+        //                 t0.elapsed().as_millis()
+        //             );
+        //             anyhow::Ok((count, job_count_rx))
+        //         })
+        //         .await
+        // })
     }
 
     pub fn outstanding_job_count_rx(
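
Two loose ends worth flagging while this is a work in progress: the spawned update in `index_project` pops exactly one operation, so only a single file is dispatched even though the returned `count` covers the whole queue, and nothing enqueues `DeleteFile` yet, because the loop that turned leftover mtimes into deletes was removed from `initialize_project`. A sketch of a full drain, reusing only names already in this diff; this loop is not in the commit and would slot into the update closure in place of the single `pop`:

    // Hypothetical follow-up: drain the whole queue instead of one op.
    while let Some(index_operation) = state.pop() {
        match index_operation {
            IndexOperation::IndexFile { file } => {
                let _ = parsing_files_tx.try_send(file);
            }
            IndexOperation::DeleteFile { file } => {
                let _ = db_update_tx.try_send(DbOperation::Delete {
                    worktree_id: file.worktree_db_id,
                    path: file.relative_path,
                });
            }
        }
    }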