reindex files in the background after they have not been edited for 10 minutes

Created by KCaverly and Max

Co-authored-by: Max <max@zed.dev>

Change summary

crates/semantic_index/src/semantic_index.rs | 416 ++++++++++------------
1 file changed, 188 insertions(+), 228 deletions(-)
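
In short: instead of sending each file-system event straight to the parsing queue, the index now records changed paths in a per-project map and reindexes them only after they have sat unedited for BACKGROUND_INDEXING_DELAY (10 minutes); an explicit index_project call flushes the map immediately. Below is a minimal sketch of that debounce pattern using simplified, hypothetical types (the real code keys the map by ProjectPath, tracks mtimes, and runs the timer on a gpui background task):

    use std::collections::BTreeMap;
    use std::path::PathBuf;
    use std::time::{Duration, Instant};

    const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600);

    struct ChangedPathInfo {
        changed_at: Instant,
        is_deleted: bool,
    }

    struct Debouncer {
        changed_paths: BTreeMap<PathBuf, ChangedPathInfo>,
    }

    impl Debouncer {
        // Record a file-system event instead of indexing the file immediately.
        fn path_changed(&mut self, path: PathBuf, is_deleted: bool) {
            let change_time = Instant::now();
            self.changed_paths.insert(
                path,
                ChangedPathInfo {
                    changed_at: change_time,
                    is_deleted,
                },
            );
            // The real code then spawns a background task that sleeps for
            // BACKGROUND_INDEXING_DELAY and calls
            // reindex_changed_paths(Some(change_time)).
        }

        // Flush entries last touched at or before `last_changed_before`.
        // Passing `None` flushes everything, which is what index_project does.
        fn reindex_changed_paths(&mut self, last_changed_before: Option<Instant>) {
            self.changed_paths.retain(|_path, info| {
                if let Some(cutoff) = last_changed_before {
                    if info.changed_at > cutoff {
                        // Edited again after this timer started; a newer timer
                        // will pick the entry up later, so keep it for now.
                        return true;
                    }
                }
                if info.is_deleted {
                    // enqueue a database delete for this path
                } else {
                    // enqueue this path for parsing and embedding
                }
                false
            });
        }
    }
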

Detailed changes

crates/semantic_index/src/semantic_index.rs

@@ -16,16 +16,18 @@ use language::{Anchor, Buffer, Language, LanguageRegistry};
 use parking_lot::Mutex;
 use parsing::{CodeContextRetriever, Document, PARSEABLE_ENTIRE_FILE_TYPES};
 use postage::watch;
-use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, WorktreeId};
+use project::{
+    search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId,
+};
 use smol::channel;
 use std::{
     cmp::Ordering,
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     mem,
     ops::Range,
     path::{Path, PathBuf},
     sync::{Arc, Weak},
-    time::{Instant, SystemTime},
+    time::{Duration, Instant, SystemTime},
 };
 use util::{
     channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME},
@@ -37,6 +39,7 @@ use workspace::WorkspaceCreated;
 
 const SEMANTIC_INDEX_VERSION: usize = 7;
 const EMBEDDINGS_BATCH_SIZE: usize = 80;
+const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600);
 
 pub fn init(
     fs: Arc<dyn Fs>,
@@ -77,6 +80,7 @@ pub fn init(
         let semantic_index = SemanticIndex::new(
             fs,
             db_file_path,
+            // Arc::new(embedding::DummyEmbeddings {}),
             Arc::new(OpenAIEmbeddings {
                 client: http_client,
                 executor: cx.background(),
@@ -113,9 +117,14 @@ struct ProjectState {
     worktree_db_ids: Vec<(WorktreeId, i64)>,
     _subscription: gpui::Subscription,
     outstanding_job_count_rx: watch::Receiver<usize>,
-    _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
-    job_queue_tx: channel::Sender<IndexOperation>,
-    _queue_update_task: Task<()>,
+    outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
+    changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
+}
+
+struct ChangedPathInfo {
+    changed_at: Instant,
+    mtime: SystemTime,
+    is_deleted: bool,
 }
 
 #[derive(Clone)]
@@ -133,31 +142,21 @@ impl JobHandle {
         }
     }
 }
+
 impl ProjectState {
     fn new(
-        cx: &mut AppContext,
         subscription: gpui::Subscription,
         worktree_db_ids: Vec<(WorktreeId, i64)>,
-        outstanding_job_count_rx: watch::Receiver<usize>,
-        _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
+        changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>,
     ) -> Self {
-        let (job_queue_tx, job_queue_rx) = channel::unbounded();
-        let _queue_update_task = cx.background().spawn({
-            let mut worktree_queue = HashMap::new();
-            async move {
-                while let Ok(operation) = job_queue_rx.recv().await {
-                    Self::update_queue(&mut worktree_queue, operation);
-                }
-            }
-        });
-
+        let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0);
+        let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx));
         Self {
             worktree_db_ids,
             outstanding_job_count_rx,
-            _outstanding_job_count_tx,
+            outstanding_job_count_tx,
+            changed_paths,
             _subscription: subscription,
-            _queue_update_task,
-            job_queue_tx,
         }
     }
 
@@ -165,41 +164,6 @@ impl ProjectState {
         self.outstanding_job_count_rx.borrow().clone()
     }
 
-    fn update_queue(queue: &mut HashMap<PathBuf, IndexOperation>, operation: IndexOperation) {
-        match operation {
-            IndexOperation::FlushQueue => {
-                let queue = std::mem::take(queue);
-                for (_, op) in queue {
-                    match op {
-                        IndexOperation::IndexFile {
-                            absolute_path: _,
-                            payload,
-                            tx,
-                        } => {
-                            let _ = tx.try_send(payload);
-                        }
-                        IndexOperation::DeleteFile {
-                            absolute_path: _,
-                            payload,
-                            tx,
-                        } => {
-                            let _ = tx.try_send(payload);
-                        }
-                        _ => {}
-                    }
-                }
-            }
-            IndexOperation::IndexFile {
-                ref absolute_path, ..
-            }
-            | IndexOperation::DeleteFile {
-                ref absolute_path, ..
-            } => {
-                queue.insert(absolute_path.clone(), operation);
-            }
-        }
-    }
-
     fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> {
         self.worktree_db_ids
             .iter()
@@ -230,23 +194,10 @@ pub struct PendingFile {
     worktree_db_id: i64,
     relative_path: PathBuf,
     absolute_path: PathBuf,
-    language: Arc<Language>,
+    language: Option<Arc<Language>>,
     modified_time: SystemTime,
     job_handle: JobHandle,
 }
-enum IndexOperation {
-    IndexFile {
-        absolute_path: PathBuf,
-        payload: PendingFile,
-        tx: channel::Sender<PendingFile>,
-    },
-    DeleteFile {
-        absolute_path: PathBuf,
-        payload: DbOperation,
-        tx: channel::Sender<DbOperation>,
-    },
-    FlushQueue,
-}
 
 pub struct SearchResult {
     pub buffer: ModelHandle<Buffer>,
@@ -582,13 +533,13 @@ impl SemanticIndex {
         parsing_files_rx: &channel::Receiver<PendingFile>,
         db_update_tx: &channel::Sender<DbOperation>,
     ) {
+        let Some(language) = pending_file.language else {
+            return;
+        };
+
         if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() {
             if let Some(documents) = retriever
-                .parse_file_with_template(
-                    &pending_file.relative_path,
-                    &content,
-                    pending_file.language,
-                )
+                .parse_file_with_template(&pending_file.relative_path, &content, language)
                 .log_err()
             {
                 log::trace!(
@@ -679,103 +630,50 @@ impl SemanticIndex {
     }
 
     fn project_entries_changed(
-        &self,
+        &mut self,
         project: ModelHandle<Project>,
         changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
         cx: &mut ModelContext<'_, SemanticIndex>,
         worktree_id: &WorktreeId,
-    ) -> Result<()> {
-        let parsing_files_tx = self.parsing_files_tx.clone();
-        let db_update_tx = self.db_update_tx.clone();
-        let (job_queue_tx, outstanding_job_tx, worktree_db_id) = {
-            let state = self
-                .projects
-                .get(&project.downgrade())
-                .ok_or(anyhow!("Project not yet initialized"))?;
-            let worktree_db_id = state
-                .db_id_for_worktree_id(*worktree_id)
-                .ok_or(anyhow!("Worktree ID in Database Not Available"))?;
-            (
-                state.job_queue_tx.clone(),
-                state._outstanding_job_count_tx.clone(),
-                worktree_db_id,
-            )
+    ) {
+        let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else {
+            return;
+        };
+        let project = project.downgrade();
+        let Some(project_state) = self.projects.get_mut(&project) else {
+            return;
         };
 
-        let language_registry = self.language_registry.clone();
-        let parsing_files_tx = parsing_files_tx.clone();
-        let db_update_tx = db_update_tx.clone();
-
-        let worktree = project
-            .read(cx)
-            .worktree_for_id(worktree_id.clone(), cx)
-            .ok_or(anyhow!("Worktree not available"))?
-            .read(cx)
-            .snapshot();
-        cx.spawn(|_, _| async move {
-            let worktree = worktree.clone();
-            for (path, entry_id, path_change) in changes.iter() {
-                let relative_path = path.to_path_buf();
-                let absolute_path = worktree.absolutize(path);
-
-                let Some(entry) = worktree.entry_for_id(*entry_id) else {
-                    continue;
-                };
-                if entry.is_ignored || entry.is_symlink || entry.is_external {
-                    continue;
-                }
-
-                log::trace!("File Event: {:?}, Path: {:?}", &path_change, &path);
-                match path_change {
-                    PathChange::AddedOrUpdated | PathChange::Updated | PathChange::Added => {
-                        if let Ok(language) = language_registry
-                            .language_for_file(&relative_path, None)
-                            .await
-                        {
-                            if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
-                                && &language.name().as_ref() != &"Markdown"
-                                && language
-                                    .grammar()
-                                    .and_then(|grammar| grammar.embedding_config.as_ref())
-                                    .is_none()
-                            {
-                                continue;
-                            }
+        let worktree = worktree.read(cx);
+        let change_time = Instant::now();
+        for (path, entry_id, change) in changes.iter() {
+            let Some(entry) = worktree.entry_for_id(*entry_id) else {
+                continue;
+            };
+            if entry.is_ignored || entry.is_symlink || entry.is_external {
+                continue;
+            }
+            let project_path = ProjectPath {
+                worktree_id: *worktree_id,
+                path: path.clone(),
+            };
+            project_state.changed_paths.insert(
+                project_path,
+                ChangedPathInfo {
+                    changed_at: change_time,
+                    mtime: entry.mtime,
+                    is_deleted: *change == PathChange::Removed,
+                },
+            );
+        }
 
-                            let job_handle = JobHandle::new(&outstanding_job_tx);
-                            let new_operation = IndexOperation::IndexFile {
-                                absolute_path: absolute_path.clone(),
-                                payload: PendingFile {
-                                    worktree_db_id,
-                                    relative_path,
-                                    absolute_path,
-                                    language,
-                                    modified_time: entry.mtime,
-                                    job_handle,
-                                },
-                                tx: parsing_files_tx.clone(),
-                            };
-                            let _ = job_queue_tx.try_send(new_operation);
-                        }
-                    }
-                    PathChange::Removed => {
-                        let new_operation = IndexOperation::DeleteFile {
-                            absolute_path,
-                            payload: DbOperation::Delete {
-                                worktree_id: worktree_db_id,
-                                path: relative_path,
-                            },
-                            tx: db_update_tx.clone(),
-                        };
-                        let _ = job_queue_tx.try_send(new_operation);
-                    }
-                    _ => {}
-                }
+        cx.spawn_weak(|this, mut cx| async move {
+            cx.background().timer(BACKGROUND_INDEXING_DELAY).await;
+            if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) {
+                Self::reindex_changed_paths(this, project, Some(change_time), &mut cx).await;
             }
         })
         .detach();
-
-        Ok(())
     }
 
     pub fn initialize_project(
@@ -805,14 +703,11 @@ impl SemanticIndex {
 
         let _subscription = cx.subscribe(&project, |this, project, event, cx| {
             if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event {
-                let _ =
-                    this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
+                this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id);
             };
         });
 
         let language_registry = self.language_registry.clone();
-        let parsing_files_tx = self.parsing_files_tx.clone();
-        let db_update_tx = self.db_update_tx.clone();
 
         cx.spawn(|this, mut cx| async move {
             futures::future::join_all(worktree_scans_complete).await;
@@ -843,17 +738,13 @@ impl SemanticIndex {
                 .map(|(a, b)| (*a, *b))
                 .collect();
 
-            let (job_count_tx, job_count_rx) = watch::channel_with(0);
-            let job_count_tx = Arc::new(Mutex::new(job_count_tx));
-            let job_count_tx_longlived = job_count_tx.clone();
-
-            let worktree_files = cx
+            let changed_paths = cx
                 .background()
                 .spawn(async move {
-                    let mut worktree_files = Vec::new();
+                    let mut changed_paths = BTreeMap::new();
+                    let now = Instant::now();
                     for worktree in worktrees.into_iter() {
                         let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap();
-                        let worktree_db_id = db_ids_by_worktree_id[&worktree.id()];
                         for file in worktree.files(false, 0) {
                             let absolute_path = worktree.absolutize(&file.path);
 
@@ -876,59 +767,51 @@ impl SemanticIndex {
                                     continue;
                                 }
 
-                                let path_buf = file.path.to_path_buf();
                                 let stored_mtime = file_mtimes.remove(&file.path.to_path_buf());
                                 let already_stored = stored_mtime
                                     .map_or(false, |existing_mtime| existing_mtime == file.mtime);
 
                                 if !already_stored {
-                                    let job_handle = JobHandle::new(&job_count_tx);
-                                    worktree_files.push(IndexOperation::IndexFile {
-                                        absolute_path: absolute_path.clone(),
-                                        payload: PendingFile {
-                                            worktree_db_id,
-                                            relative_path: path_buf,
-                                            absolute_path,
-                                            language,
-                                            job_handle,
-                                            modified_time: file.mtime,
+                                    changed_paths.insert(
+                                        ProjectPath {
+                                            worktree_id: worktree.id(),
+                                            path: file.path.clone(),
+                                        },
+                                        ChangedPathInfo {
+                                            changed_at: now,
+                                            mtime: file.mtime,
+                                            is_deleted: false,
                                         },
-                                        tx: parsing_files_tx.clone(),
-                                    });
+                                    );
                                 }
                             }
                         }
+
                         // Clean up entries from database that are no longer in the worktree.
-                        for (path, _) in file_mtimes {
-                            worktree_files.push(IndexOperation::DeleteFile {
-                                absolute_path: worktree.absolutize(path.as_path()),
-                                payload: DbOperation::Delete {
-                                    worktree_id: worktree_db_id,
-                                    path,
+                        for (path, mtime) in file_mtimes {
+                            changed_paths.insert(
+                                ProjectPath {
+                                    worktree_id: worktree.id(),
+                                    path: path.into(),
                                 },
-                                tx: db_update_tx.clone(),
-                            });
+                                ChangedPathInfo {
+                                    changed_at: now,
+                                    mtime,
+                                    is_deleted: true,
+                                },
+                            );
                         }
                     }
 
-                    anyhow::Ok(worktree_files)
+                    anyhow::Ok(changed_paths)
                 })
                 .await?;
 
-            this.update(&mut cx, |this, cx| {
-                let project_state = ProjectState::new(
-                    cx,
-                    _subscription,
-                    worktree_db_ids,
-                    job_count_rx,
-                    job_count_tx_longlived,
+            this.update(&mut cx, |this, _| {
+                this.projects.insert(
+                    project.downgrade(),
+                    ProjectState::new(_subscription, worktree_db_ids, changed_paths),
                 );
-
-                for op in worktree_files {
-                    let _ = project_state.job_queue_tx.try_send(op);
-                }
-
-                this.projects.insert(project.downgrade(), project_state);
             });
             Result::<(), _>::Ok(())
         })
@@ -939,27 +822,17 @@ impl SemanticIndex {
         project: ModelHandle<Project>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<(usize, watch::Receiver<usize>)>> {
-        let state = self.projects.get_mut(&project.downgrade());
-        let state = if state.is_none() {
-            return Task::Ready(Some(Err(anyhow!("Project not yet initialized"))));
-        } else {
-            state.unwrap()
-        };
-
-        // let parsing_files_tx = self.parsing_files_tx.clone();
-        // let db_update_tx = self.db_update_tx.clone();
-        let job_count_rx = state.outstanding_job_count_rx.clone();
-        let count = state.get_outstanding_count();
-
         cx.spawn(|this, mut cx| async move {
-            this.update(&mut cx, |this, _| {
-                let Some(state) = this.projects.get_mut(&project.downgrade()) else {
-                    return;
-                };
-                let _ = state.job_queue_tx.try_send(IndexOperation::FlushQueue);
-            });
+            Self::reindex_changed_paths(this.clone(), project.clone(), None, &mut cx).await;
 
-            Ok((count, job_count_rx))
+            this.update(&mut cx, |this, _cx| {
+                let Some(state) = this.projects.get(&project.downgrade()) else {
+                    return Err(anyhow!("Project not yet initialized"));
+                };
+                let job_count_rx = state.outstanding_job_count_rx.clone();
+                let count = state.get_outstanding_count();
+                Ok((count, job_count_rx))
+            })
         })
     }
 
@@ -1110,6 +983,93 @@ impl SemanticIndex {
                 .collect::<Vec<_>>())
         })
     }
+
+    async fn reindex_changed_paths(
+        this: ModelHandle<SemanticIndex>,
+        project: ModelHandle<Project>,
+        last_changed_before: Option<Instant>,
+        cx: &mut AsyncAppContext,
+    ) {
+        let mut pending_files = Vec::new();
+        let (language_registry, parsing_files_tx) = this.update(cx, |this, cx| {
+            if let Some(project_state) = this.projects.get_mut(&project.downgrade()) {
+                let outstanding_job_count_tx = &project_state.outstanding_job_count_tx;
+                let db_ids = &project_state.worktree_db_ids;
+                let mut worktree: Option<ModelHandle<Worktree>> = None;
+
+                project_state.changed_paths.retain(|path, info| {
+                    if let Some(last_changed_before) = last_changed_before {
+                        if info.changed_at > last_changed_before {
+                            return true;
+                        }
+                    }
+
+                    if worktree
+                        .as_ref()
+                        .map_or(true, |tree| tree.read(cx).id() != path.worktree_id)
+                    {
+                        worktree = project.read(cx).worktree_for_id(path.worktree_id, cx);
+                    }
+                    let Some(worktree) = &worktree else {
+                        return false;
+                    };
+
+                    let Some(worktree_db_id) = db_ids
+                        .iter()
+                        .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1))
+                    else {
+                        return false;
+                    };
+
+                    if info.is_deleted {
+                        this.db_update_tx
+                            .try_send(DbOperation::Delete {
+                                worktree_id: worktree_db_id,
+                                path: path.path.to_path_buf(),
+                            })
+                            .ok();
+                    } else {
+                        let absolute_path = worktree.read(cx).absolutize(&path.path);
+                        let job_handle = JobHandle::new(&outstanding_job_count_tx);
+                        pending_files.push(PendingFile {
+                            absolute_path,
+                            relative_path: path.path.to_path_buf(),
+                            language: None,
+                            job_handle,
+                            modified_time: info.mtime,
+                            worktree_db_id,
+                        });
+                    }
+
+                    false
+                });
+            }
+
+            (
+                this.language_registry.clone(),
+                this.parsing_files_tx.clone(),
+            )
+        });
+
+        for mut pending_file in pending_files {
+            if let Ok(language) = language_registry
+                .language_for_file(&pending_file.relative_path, None)
+                .await
+            {
+                if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref())
+                    && &language.name().as_ref() != &"Markdown"
+                    && language
+                        .grammar()
+                        .and_then(|grammar| grammar.embedding_config.as_ref())
+                        .is_none()
+                {
+                    continue;
+                }
+                pending_file.language = Some(language);
+            }
+            parsing_files_tx.try_send(pending_file).ok();
+        }
+    }
 }
 
 impl Entity for SemanticIndex {