From d4cff684751e464e1fc733bd5cbe72772e95f891 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 5 Sep 2023 12:26:48 +0200 Subject: [PATCH 01/11] :art: --- Cargo.lock | 1 + crates/semantic_index/Cargo.toml | 2 + crates/semantic_index/src/db.rs | 12 +- crates/semantic_index/src/embedding_queue.rs | 4 +- crates/semantic_index/src/semantic_index.rs | 535 ++++++++++-------- .../src/semantic_index_tests.rs | 9 +- 6 files changed, 306 insertions(+), 257 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0eb1947e21956fa85ee180c902435f463e1e8d6..c99e88b9b46c33fefa25f630bd8507a36546fac4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6713,6 +6713,7 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "collections", "ctor", "editor", "env_logger 0.9.3", diff --git a/crates/semantic_index/Cargo.toml b/crates/semantic_index/Cargo.toml index d46346e0ab8ceb4f6047c2022a8ce7083a2b16be..72a36efd508eecc5e28223e3593ad883a6084e2d 100644 --- a/crates/semantic_index/Cargo.toml +++ b/crates/semantic_index/Cargo.toml @@ -9,6 +9,7 @@ path = "src/semantic_index.rs" doctest = false [dependencies] +collections = { path = "../collections" } gpui = { path = "../gpui" } language = { path = "../language" } project = { path = "../project" } @@ -42,6 +43,7 @@ sha1 = "0.10.5" parse_duration = "2.1.1" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs index 2ececc1eb6b768ec7dd6b69ed4d00290a24e5d60..5664210388ab68ccc52e55221aedf67afbd1b635 100644 --- a/crates/semantic_index/src/db.rs +++ b/crates/semantic_index/src/db.rs @@ -4,6 +4,7 @@ use crate::{ SEMANTIC_INDEX_VERSION, }; use anyhow::{anyhow, Context, Result}; +use collections::HashMap; use futures::channel::oneshot; use gpui::executor; use project::{search::PathMatcher, Fs}; @@ -12,7 +13,6 @@ use rusqlite::params; use rusqlite::types::Value; use std::{ cmp::Ordering, - collections::HashMap, future::Future, ops::Range, path::{Path, PathBuf}, @@ -195,7 +195,7 @@ impl VectorDatabase { pub fn delete_file( &self, worktree_id: i64, - delete_path: PathBuf, + delete_path: Arc<Path>, ) -> impl Future<Output = Result<()>> { self.transact(move |db| { db.execute( @@ -209,7 +209,7 @@ impl VectorDatabase { pub fn insert_file( &self, worktree_id: i64, - path: PathBuf, + path: Arc<Path>, mtime: SystemTime, documents: Vec<Document>, ) -> impl Future<Output = Result<()>> { @@ -288,7 +288,7 @@ impl VectorDatabase { WHERE files.worktree_id = ? AND files.relative_path IN rarray(?)
", )?; - let mut embeddings_by_digest = HashMap::new(); + let mut embeddings_by_digest = HashMap::default(); for (worktree_id, file_paths) in worktree_id_file_paths { let file_paths = Rc::new( file_paths @@ -316,7 +316,7 @@ impl VectorDatabase { pub fn find_or_create_worktree( &self, - worktree_root_path: PathBuf, + worktree_root_path: Arc, ) -> impl Future> { self.transact(move |db| { let mut worktree_query = @@ -351,7 +351,7 @@ impl VectorDatabase { WHERE worktree_id = ?1 ORDER BY relative_path", )?; - let mut result: HashMap = HashMap::new(); + let mut result: HashMap = HashMap::default(); for row in statement.query_map(params![worktree_id], |row| { Ok(( row.get::<_, String>(0)?.into(), diff --git a/crates/semantic_index/src/embedding_queue.rs b/crates/semantic_index/src/embedding_queue.rs index 96493fc4d34558ff0d48bd322bca81d1a3cd3a90..f1abbde3a4e2d528bc0b0b292511462c515b4a74 100644 --- a/crates/semantic_index/src/embedding_queue.rs +++ b/crates/semantic_index/src/embedding_queue.rs @@ -2,12 +2,12 @@ use crate::{embedding::EmbeddingProvider, parsing::Document, JobHandle}; use gpui::executor::Background; use parking_lot::Mutex; use smol::channel; -use std::{mem, ops::Range, path::PathBuf, sync::Arc, time::SystemTime}; +use std::{mem, ops::Range, path::Path, sync::Arc, time::SystemTime}; #[derive(Clone)] pub struct FileToEmbed { pub worktree_id: i64, - pub path: PathBuf, + pub path: Arc, pub mtime: SystemTime, pub documents: Vec, pub job_handle: JobHandle, diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index a917eabfc8b8103725516bab61c4ff1b17b248d6..6441d8d5c0d4d3cf5b9c14e03fa32b6aad8a0df2 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -9,6 +9,7 @@ mod semantic_index_tests; use crate::semantic_index_settings::SemanticIndexSettings; use anyhow::{anyhow, Result}; +use collections::{BTreeMap, HashMap, HashSet}; use db::VectorDatabase; use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings}; use embedding_queue::{EmbeddingQueue, FileToEmbed}; @@ -18,13 +19,10 @@ use language::{Anchor, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; use parsing::{CodeContextRetriever, DocumentDigest, PARSEABLE_ENTIRE_FILE_TYPES}; use postage::watch; -use project::{ - search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, ProjectPath, Worktree, WorktreeId, -}; +use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId}; use smol::channel; use std::{ cmp::Ordering, - collections::{BTreeMap, HashMap}, ops::Range, path::{Path, PathBuf}, sync::{Arc, Weak}, @@ -34,7 +32,7 @@ use util::{ channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME}, http::HttpClient, paths::EMBEDDINGS_DIR, - ResultExt, + ResultExt, TryFutureExt, }; use workspace::WorkspaceCreated; @@ -68,9 +66,7 @@ pub fn init( if let Some(workspace) = workspace.upgrade(cx) { let project = workspace.read(cx).project().clone(); if project.read(cx).is_local() { - semantic_index.update(cx, |index, cx| { - index.initialize_project(project, cx).detach_and_log_err(cx) - }); + semantic_index.update(cx, |index, cx| index.register_project(project, cx)); } } } @@ -111,11 +107,56 @@ pub struct SemanticIndex { } struct ProjectState { - worktree_db_ids: Vec<(WorktreeId, i64)>, - _subscription: gpui::Subscription, + worktrees: HashMap, outstanding_job_count_rx: watch::Receiver, outstanding_job_count_tx: Arc>>, - changed_paths: BTreeMap, + _subscription: gpui::Subscription, +} + 
+enum WorktreeState { + Registering(RegisteringWorktreeState), + Registered(RegisteredWorktreeState), +} + +impl WorktreeState { + fn paths_changed( + &mut self, + changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>, + changed_at: Instant, + worktree: &Worktree, + ) { + let changed_paths = match self { + Self::Registering(state) => &mut state.changed_paths, + Self::Registered(state) => &mut state.changed_paths, + }; + + for (path, entry_id, change) in changes.iter() { + let Some(entry) = worktree.entry_for_id(*entry_id) else { + continue; + }; + if entry.is_ignored || entry.is_symlink || entry.is_external || entry.is_dir() { + continue; + } + changed_paths.insert( + path.clone(), + ChangedPathInfo { + changed_at, + mtime: entry.mtime, + is_deleted: *change == PathChange::Removed, + }, + ); + } + } +} + +struct RegisteringWorktreeState { + changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>, + _registration: Task<Option<()>>, +} + +struct RegisteredWorktreeState { + db_id: i64, + changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>, } struct ChangedPathInfo { @@ -141,55 +182,42 @@ impl JobHandle { } impl ProjectState { - fn new( - subscription: gpui::Subscription, - worktree_db_ids: Vec<(WorktreeId, i64)>, - changed_paths: BTreeMap<ProjectPath, ChangedPathInfo>, - ) -> Self { + fn new(subscription: gpui::Subscription) -> Self { let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0); let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx)); Self { - worktree_db_ids, + worktrees: Default::default(), outstanding_job_count_rx, outstanding_job_count_tx, - changed_paths, _subscription: subscription, } } - pub fn get_outstanding_count(&self) -> usize { - self.outstanding_job_count_rx.borrow().clone() - } - fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> { - self.worktree_db_ids - .iter() - .find_map(|(worktree_id, db_id)| { - if *worktree_id == id { - Some(*db_id) - } else { - None - } - }) + match self.worktrees.get(&id)?
{ + WorktreeState::Registering(_) => None, + WorktreeState::Registered(state) => Some(state.db_id), + } } fn worktree_id_for_db_id(&self, id: i64) -> Option<WorktreeId> { - self.worktree_db_ids + self.worktrees .iter() - .find_map(|(worktree_id, db_id)| { - if *db_id == id { - Some(*worktree_id) - } else { - None - } + .find_map(|(worktree_id, worktree_state)| match worktree_state { + WorktreeState::Registered(state) if state.db_id == id => Some(*worktree_id), + _ => None, }) } + + fn worktree(&mut self, id: WorktreeId) -> Option<&mut WorktreeState> { + self.worktrees.get_mut(&id) + } } #[derive(Clone)] pub struct PendingFile { worktree_db_id: i64, - relative_path: PathBuf, + relative_path: Arc<Path>, absolute_path: PathBuf, language: Option<Arc<Language>>, modified_time: SystemTime, @@ -298,7 +326,7 @@ impl SemanticIndex { parsing_files_tx, _embedding_task, _parsing_files_tasks, - projects: HashMap::new(), + projects: Default::default(), } })) } @@ -369,9 +397,9 @@ impl SemanticIndex { fn project_entries_changed( &mut self, project: ModelHandle<Project>, + worktree_id: WorktreeId, changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>, - cx: &mut ModelContext<'_, SemanticIndex>, - worktree_id: &WorktreeId, + cx: &mut ModelContext<Self>, ) { let Some(worktree) = project.read(cx).worktree_for_id(worktree_id.clone(), cx) else { return; }; @@ -381,131 +409,103 @@ impl SemanticIndex { return; }; - let embeddings_for_digest = { - let mut worktree_id_file_paths = HashMap::new(); - for (path, _) in &project_state.changed_paths { - if let Some(worktree_db_id) = project_state.db_id_for_worktree_id(path.worktree_id) - { - worktree_id_file_paths - .entry(worktree_db_id) - .or_insert(Vec::new()) - .push(path.path.clone()); - } - } - self.db.embeddings_for_files(worktree_id_file_paths) - }; - - let worktree = worktree.read(cx); let change_time = Instant::now(); - for (path, entry_id, change) in changes.iter() { - let Some(entry) = worktree.entry_for_id(*entry_id) else { - continue; - }; - if entry.is_ignored || entry.is_symlink || entry.is_external { - continue; - } - let project_path = ProjectPath { - worktree_id: *worktree_id, - path: path.clone(), + let worktree = worktree.read(cx); + let worktree_state = if let Some(worktree_state) = project_state.worktree(worktree_id) { + worktree_state + } else { + return; }; + worktree_state.paths_changed(changes, Instant::now(), worktree); + if let WorktreeState::Registered(worktree_state) = worktree_state { + let embeddings_for_digest = { + let worktree_paths = worktree_state + .changed_paths + .iter() + .map(|(path, _)| path.clone()) + .collect::<Vec<_>>(); + let mut worktree_id_file_paths = HashMap::default(); + worktree_id_file_paths.insert(worktree_state.db_id, worktree_paths); + self.db.embeddings_for_files(worktree_id_file_paths) }; - project_state.changed_paths.insert( - project_path, - ChangedPathInfo { - changed_at: change_time, - mtime: entry.mtime, - is_deleted: *change == PathChange::Removed, - }, - ); + + cx.spawn_weak(|this, mut cx| async move { + let embeddings_for_digest = + embeddings_for_digest.await.log_err().unwrap_or_default(); + + cx.background().timer(BACKGROUND_INDEXING_DELAY).await; + if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { + Self::reindex_changed_paths( + this, + project, + Some(change_time), + &mut cx, + Arc::new(embeddings_for_digest), + ) + .await; + } + }) + .detach(); } + } + + pub fn register_project(&mut self, project: ModelHandle<Project>, cx: &mut ModelContext<Self>) { + log::trace!("Registering Project for Semantic Index"); - cx.spawn_weak(|this, mut cx| async
move { - let embeddings_for_digest = embeddings_for_digest.await.log_err().unwrap_or_default(); - - cx.background().timer(BACKGROUND_INDEXING_DELAY).await; - if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { - Self::reindex_changed_paths( - this, - project, - Some(change_time), - &mut cx, - Arc::new(embeddings_for_digest), - ) - .await; + let subscription = cx.subscribe(&project, |this, project, event, cx| match event { + project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => { + this.project_worktrees_changed(project.clone(), cx); } - }) - .detach(); + project::Event::WorktreeUpdatedEntries(worktree_id, changes) => { + this.project_entries_changed(project, *worktree_id, changes.clone(), cx); + } + _ => {} + }); + self.projects + .insert(project.downgrade(), ProjectState::new(subscription)); + self.project_worktrees_changed(project, cx); } - pub fn initialize_project( + fn register_worktree( &mut self, project: ModelHandle<Project>, + worktree: ModelHandle<Worktree>, cx: &mut ModelContext<Self>, - ) -> Task<Result<()>> { - log::trace!("Initializing Project for Semantic Index"); - let worktree_scans_complete = project - .read(cx) - .worktrees(cx) - .map(|worktree| { - let scan_complete = worktree.read(cx).as_local().unwrap().scan_complete(); - async move { - scan_complete.await; - } - }) - .collect::<Vec<_>>(); - - let worktree_db_ids = project - .read(cx) - .worktrees(cx) - .map(|worktree| { - self.db - .find_or_create_worktree(worktree.read(cx).abs_path().to_path_buf()) - }) - .collect::<Vec<_>>(); - - let _subscription = cx.subscribe(&project, |this, project, event, cx| { - if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event { - this.project_entries_changed(project.clone(), changes.clone(), cx, worktree_id); - }; - }); - + ) { + let project = project.downgrade(); + let project_state = if let Some(project_state) = self.projects.get_mut(&project) { + project_state + } else { + return; + }; + let worktree = if let Some(worktree) = worktree.read(cx).as_local() { + worktree + } else { + return; + }; + let worktree_abs_path = worktree.abs_path().clone(); + let scan_complete = worktree.scan_complete(); + let worktree_id = worktree.id(); + let db = self.db.clone(); let language_registry = self.language_registry.clone(); - - cx.spawn(|this, mut cx| async move { - futures::future::join_all(worktree_scans_complete).await; - - let worktree_db_ids = futures::future::join_all(worktree_db_ids).await; - let worktrees = project.read_with(&cx, |project, cx| { - project - .worktrees(cx) - .map(|worktree| worktree.read(cx).snapshot()) - .collect::<Vec<_>>() - }); - - let mut worktree_file_mtimes = HashMap::new(); - let mut db_ids_by_worktree_id = HashMap::new(); - - for (worktree, db_id) in worktrees.iter().zip(worktree_db_ids) { - let db_id = db_id?; - db_ids_by_worktree_id.insert(worktree.id(), db_id); - worktree_file_mtimes.insert( - worktree.id(), - this.read_with(&cx, |this, _| this.db.get_file_mtimes(db_id)) - .await?, - ); - } - - let worktree_db_ids = db_ids_by_worktree_id - .iter() - .map(|(a, b)| (*a, *b)) - .collect(); - - let changed_paths = cx - .background() - .spawn(async move { - let mut changed_paths = BTreeMap::new(); - let now = Instant::now(); - for worktree in worktrees.into_iter() { - let mut file_mtimes = worktree_file_mtimes.remove(&worktree.id()).unwrap(); + let registration = cx.spawn(|this, mut cx| { + async move { + scan_complete.await; + let db_id = db.find_or_create_worktree(worktree_abs_path).await?; + let mut file_mtimes = db.get_file_mtimes(db_id).await?; + let
worktree = if let Some(project) = project.upgrade(&cx) { + project + .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx)) + .ok_or_else(|| anyhow!("worktree not found"))? + } else { + return anyhow::Ok(()); + }; + let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot()); + let mut changed_paths = cx + .background() + .spawn(async move { + let mut changed_paths = BTreeMap::new(); + let now = Instant::now(); for file in worktree.files(false, 0) { let absolute_path = worktree.absolutize(&file.path); @@ -534,10 +534,7 @@ impl SemanticIndex { if !already_stored { changed_paths.insert( - ProjectPath { - worktree_id: worktree.id(), - path: file.path.clone(), - }, + file.path.clone(), ChangedPathInfo { changed_at: now, mtime: file.mtime, @@ -551,10 +548,7 @@ impl SemanticIndex { // Clean up entries from database that are no longer in the worktree. for (path, mtime) in file_mtimes { changed_paths.insert( - ProjectPath { - worktree_id: worktree.id(), - path: path.into(), - }, + path.into(), ChangedPathInfo { changed_at: now, mtime, @@ -562,20 +556,80 @@ impl SemanticIndex { }, ); } + + anyhow::Ok(changed_paths) + }) + .await?; + this.update(&mut cx, |this, _| { + let project_state = this + .projects + .get_mut(&project) + .ok_or_else(|| anyhow!("project not registered"))?; + + if let Some(WorktreeState::Registering(state)) = + project_state.worktrees.remove(&worktree_id) + { + changed_paths.extend(state.changed_paths); } + project_state.worktrees.insert( + worktree_id, + WorktreeState::Registered(RegisteredWorktreeState { + db_id, + changed_paths, + }), + ); + + anyhow::Ok(()) + })?; + + anyhow::Ok(()) + } + .log_err() + }); + project_state.worktrees.insert( + worktree_id, + WorktreeState::Registering(RegisteringWorktreeState { + changed_paths: Default::default(), + _registration: registration, + }), + ); + } - anyhow::Ok(changed_paths) - }) - .await?; + fn project_worktrees_changed( + &mut self, + project: ModelHandle<Project>, + cx: &mut ModelContext<Self>, + ) { + let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade()) + { + project_state + } else { + return; + }; - this.update(&mut cx, |this, _| { - this.projects.insert( - project.downgrade(), - ProjectState::new(_subscription, worktree_db_ids, changed_paths), - ); - }); - Result::<(), _>::Ok(()) - }) + let mut worktrees = project + .read(cx) + .worktrees(cx) + .filter(|worktree| worktree.read(cx).is_local()) + .collect::<Vec<_>>(); + let worktree_ids = worktrees + .iter() + .map(|worktree| worktree.read(cx).id()) + .collect::<HashSet<_>>(); + + // Remove worktrees that are no longer present + project_state + .worktrees + .retain(|worktree_id, _| worktree_ids.contains(worktree_id)); + + // Register new worktrees + worktrees.retain(|worktree| { + let worktree_id = worktree.read(cx).id(); + project_state.worktree(worktree_id).is_none() + }); + for worktree in worktrees { + self.register_worktree(project.clone(), worktree, cx); + } } pub fn index_project( &mut self, project: ModelHandle<Project>, cx: &mut ModelContext<Self>, ) -> Task<Result<(usize, watch::Receiver<usize>)>> { - cx.spawn(|this, mut cx| async move { - let embeddings_for_digest = this.read_with(&cx, |this, _| { - if let Some(state) = this.projects.get(&project.downgrade()) { - let mut worktree_id_file_paths = HashMap::default(); - for (path, _) in &state.changed_paths { - if let Some(worktree_db_id) = state.db_id_for_worktree_id(path.worktree_id) - { - worktree_id_file_paths - .entry(worktree_db_id) - .or_insert(Vec::new()) - .push(path.path.clone()); - } - } + let
project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade()) + { + project_state + } else { + return Task::ready(Err(anyhow!("project was not registered"))); + }; + let outstanding_job_count_rx = project_state.outstanding_job_count_rx.clone(); - Ok(this.db.embeddings_for_files(worktree_id_file_paths)) - } else { - Err(anyhow!("Project not yet initialized")) + let mut worktree_id_file_paths = HashMap::default(); + for worktree in project_state.worktrees.values() { + if let WorktreeState::Registered(worktree_state) = worktree { + for (path, _) in &worktree_state.changed_paths { + worktree_id_file_paths + .entry(worktree_state.db_id) + .or_insert(Vec::new()) + .push(path.clone()); } - })?; + } + } + cx.spawn(|this, mut cx| async move { + let embeddings_for_digest = this.read_with(&cx, |this, _| { + this.db.embeddings_for_files(worktree_id_file_paths) + }); let embeddings_for_digest = Arc::new(embeddings_for_digest.await?); - Self::reindex_changed_paths( this.clone(), project.clone(), @@ -613,15 +670,8 @@ impl SemanticIndex { embeddings_for_digest, ) .await; - - this.update(&mut cx, |this, _cx| { - let Some(state) = this.projects.get(&project.downgrade()) else { - return Err(anyhow!("Project not yet initialized")); - }; - let job_count_rx = state.outstanding_job_count_rx.clone(); - let count = state.get_outstanding_count(); - Ok((count, job_count_rx)) - }) + let count = *outstanding_job_count_rx.borrow(); + Ok((count, outstanding_job_count_rx)) }) } @@ -784,50 +834,49 @@ impl SemanticIndex { let (db, language_registry, parsing_files_tx) = this.update(cx, |this, cx| { if let Some(project_state) = this.projects.get_mut(&project.downgrade()) { let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; - let db_ids = &project_state.worktree_db_ids; - let mut worktree: Option<ModelHandle<Worktree>> = None; + project_state + .worktrees + .retain(|worktree_id, worktree_state| { + let worktree = if let Some(worktree) = + project.read(cx).worktree_for_id(*worktree_id, cx) + { + worktree + } else { + return false; + }; + let worktree_state = + if let WorktreeState::Registered(worktree_state) = worktree_state { + worktree_state + } else { + return true; + }; + + worktree_state.changed_paths.retain(|path, info| { + if let Some(last_changed_before) = last_changed_before { + if info.changed_at > last_changed_before { + return true; + } + } - project_state.changed_paths.retain(|path, info| { - if let Some(last_changed_before) = last_changed_before { - if info.changed_at > last_changed_before { - return true; - } - } + if info.is_deleted { + files_to_delete.push((worktree_state.db_id, path.clone())); + } else { + let absolute_path = worktree.read(cx).absolutize(path); + let job_handle = JobHandle::new(&outstanding_job_count_tx); + pending_files.push(PendingFile { + absolute_path, + relative_path: path.clone(), + language: None, + job_handle, + modified_time: info.mtime, + worktree_db_id: worktree_state.db_id, + }); + } - if worktree - .as_ref() - .map_or(true, |tree| tree.read(cx).id() != path.worktree_id) - { - worktree = project.read(cx).worktree_for_id(path.worktree_id, cx); - } - let Some(worktree) = &worktree else { - return false; - }; - - let Some(worktree_db_id) = db_ids - .iter() - .find_map(|entry| (entry.0 == path.worktree_id).then_some(entry.1)) - else { - return false; - }; - - if info.is_deleted { - files_to_delete.push((worktree_db_id, path.path.to_path_buf())); - } else { - let absolute_path = worktree.read(cx).absolutize(&path.path); - let job_handle =
JobHandle::new(&outstanding_job_count_tx); - pending_files.push(PendingFile { - absolute_path, - relative_path: path.path.to_path_buf(), - language: None, - job_handle, - modified_time: info.mtime, - worktree_db_id, + false }); - } - - false - }); + true + }); } ( diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index f549e68e04be844d409e4deab84b02ddea0093de..2f28184f20e3e3acdc191ac1e9a9fe1be9b58750 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -87,11 +87,8 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await; - let _ = semantic_index - .update(cx, |store, cx| { - store.initialize_project(project.clone(), cx) - }) - .await; + semantic_index.update(cx, |store, cx| store.register_project(project.clone(), cx)); + deterministic.run_until_parked(); let (file_count, outstanding_file_count) = semantic_index .update(cx, |store, cx| store.index_project(project.clone(), cx)) @@ -214,7 +211,7 @@ async fn test_embedding_batching(cx: &mut TestAppContext, mut rng: StdRng) { let files = (1..=3) .map(|file_ix| FileToEmbed { worktree_id: 5, - path: format!("path-{file_ix}").into(), + path: Path::new(&format!("path-{file_ix}")).into(), mtime: SystemTime::now(), documents: (0..rng.gen_range(4..22)) .map(|document_ix| { From 7b5a41dda22a4d99ae226aaeda658194d4b1689b Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 5 Sep 2023 16:09:24 +0200 Subject: [PATCH 02/11] Move retrieval of embeddings from the db into `reindex_changed_files` Co-Authored-By: Kyle Caverly --- crates/semantic_index/src/semantic_index.rs | 220 +++++++++----------- 1 file changed, 100 insertions(+), 120 deletions(-) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 6441d8d5c0d4d3cf5b9c14e03fa32b6aad8a0df2..0b2a77378e78f0929b1628df377d92cfaa4e273f 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -418,30 +418,12 @@ impl SemanticIndex { }; worktree_state.paths_changed(changes, Instant::now(), worktree); if let WorktreeState::Registered(worktree_state) = worktree_state { - let embeddings_for_digest = { - let worktree_paths = worktree_state - .changed_paths - .iter() - .map(|(path, _)| path.clone()) - .collect::>(); - let mut worktree_id_file_paths = HashMap::default(); - worktree_id_file_paths.insert(worktree_state.db_id, worktree_paths); - self.db.embeddings_for_files(worktree_id_file_paths) - }; - cx.spawn_weak(|this, mut cx| async move { - let embeddings_for_digest = - embeddings_for_digest.await.log_err().unwrap_or_default(); - cx.background().timer(BACKGROUND_INDEXING_DELAY).await; if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { - Self::reindex_changed_paths( - this, - project, - Some(change_time), - &mut cx, - Arc::new(embeddings_for_digest), - ) + this.update(&mut cx, |this, cx| { + this.reindex_changed_paths(project, Some(change_time), cx) + }) .await; } }) @@ -644,31 +626,10 @@ impl SemanticIndex { return Task::ready(Err(anyhow!("project was not registered"))); }; let outstanding_job_count_rx = project_state.outstanding_job_count_rx.clone(); - - let mut worktree_id_file_paths = HashMap::default(); - for worktree in project_state.worktrees.values() { - if let WorktreeState::Registered(worktree_state) = worktree { - for (path, _) in 
&worktree_state.changed_paths { - worktree_id_file_paths - .entry(worktree_state.db_id) - .or_insert(Vec::new()) - .push(path.clone()); - } - } - } - cx.spawn(|this, mut cx| async move { - let embeddings_for_digest = this.read_with(&cx, |this, _| { - this.db.embeddings_for_files(worktree_id_file_paths) - }); - let embeddings_for_digest = Arc::new(embeddings_for_digest.await?); - Self::reindex_changed_paths( - this.clone(), - project.clone(), - None, - &mut cx, - embeddings_for_digest, - ) + this.update(&mut cx, |this, cx| { + this.reindex_changed_paths(project.clone(), None, cx) + }) .await; let count = *outstanding_job_count_rx.borrow(); Ok((count, outstanding_job_count_rx)) @@ -822,94 +783,113 @@ impl SemanticIndex { }) } - async fn reindex_changed_paths( - this: ModelHandle, + fn reindex_changed_paths( + &mut self, project: ModelHandle, last_changed_before: Option, - cx: &mut AsyncAppContext, - embeddings_for_digest: Arc>, - ) { + cx: &mut ModelContext, + ) -> Task<()> { + let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade()) + { + project_state + } else { + return Task::ready(()); + }; + let mut pending_files = Vec::new(); let mut files_to_delete = Vec::new(); - let (db, language_registry, parsing_files_tx) = this.update(cx, |this, cx| { - if let Some(project_state) = this.projects.get_mut(&project.downgrade()) { - let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; - project_state - .worktrees - .retain(|worktree_id, worktree_state| { - let worktree = if let Some(worktree) = - project.read(cx).worktree_for_id(*worktree_id, cx) - { - worktree - } else { - return false; - }; - let worktree_state = - if let WorktreeState::Registered(worktree_state) = worktree_state { - worktree_state - } else { - return true; - }; - - worktree_state.changed_paths.retain(|path, info| { - if let Some(last_changed_before) = last_changed_before { - if info.changed_at > last_changed_before { - return true; - } - } - - if info.is_deleted { - files_to_delete.push((worktree_state.db_id, path.clone())); - } else { - let absolute_path = worktree.read(cx).absolutize(path); - let job_handle = JobHandle::new(&outstanding_job_count_tx); - pending_files.push(PendingFile { - absolute_path, - relative_path: path.clone(), - language: None, - job_handle, - modified_time: info.mtime, - worktree_db_id: worktree_state.db_id, - }); - } + let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; + project_state + .worktrees + .retain(|worktree_id, worktree_state| { + let worktree = + if let Some(worktree) = project.read(cx).worktree_for_id(*worktree_id, cx) { + worktree + } else { + return false; + }; + let worktree_state = + if let WorktreeState::Registered(worktree_state) = worktree_state { + worktree_state + } else { + return true; + }; + + worktree_state.changed_paths.retain(|path, info| { + if let Some(last_changed_before) = last_changed_before { + if info.changed_at > last_changed_before { + return true; + } + } - false + if info.is_deleted { + files_to_delete.push((worktree_state.db_id, path.clone())); + } else { + let absolute_path = worktree.read(cx).absolutize(path); + let job_handle = JobHandle::new(&outstanding_job_count_tx); + pending_files.push(PendingFile { + absolute_path, + relative_path: path.clone(), + language: None, + job_handle, + modified_time: info.mtime, + worktree_db_id: worktree_state.db_id, }); - true - }); - } + } - ( - this.db.clone(), - this.language_registry.clone(), - this.parsing_files_tx.clone(), - ) - }); + false + }); + 
true + }); - for (worktree_db_id, path) in files_to_delete { - db.delete_file(worktree_db_id, path).await.log_err(); + let mut worktree_id_file_paths = HashMap::default(); + for worktree in project_state.worktrees.values() { + if let WorktreeState::Registered(worktree_state) = worktree { + for (path, _) in &worktree_state.changed_paths { + worktree_id_file_paths + .entry(worktree_state.db_id) + .or_insert(Vec::new()) + .push(path.clone()); + } + } } - for mut pending_file in pending_files { - if let Ok(language) = language_registry - .language_for_file(&pending_file.relative_path, None) - .await - { - if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) - && &language.name().as_ref() != &"Markdown" - && language - .grammar() - .and_then(|grammar| grammar.embedding_config.as_ref()) - .is_none() + let db = self.db.clone(); + let language_registry = self.language_registry.clone(); + let parsing_files_tx = self.parsing_files_tx.clone(); + cx.background().spawn(async move { + for (worktree_db_id, path) in files_to_delete { + db.delete_file(worktree_db_id, path).await.log_err(); + } + + let embeddings_for_digest = Arc::new( + db.embeddings_for_files(worktree_id_file_paths) + .await + .log_err() + .unwrap_or_default(), + ); + + for mut pending_file in pending_files { + if let Ok(language) = language_registry + .language_for_file(&pending_file.relative_path, None) + .await { - continue; + if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) + && &language.name().as_ref() != &"Markdown" + && language + .grammar() + .and_then(|grammar| grammar.embedding_config.as_ref()) + .is_none() + { + continue; + } + pending_file.language = Some(language); } - pending_file.language = Some(language); + parsing_files_tx + .try_send((embeddings_for_digest.clone(), pending_file)) + .ok(); } - parsing_files_tx - .try_send((embeddings_for_digest.clone(), pending_file)) - .ok(); - } + }) } } From 6b1dc63fc057df75acee794c4d8670c6fcb120f1 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 5 Sep 2023 16:16:12 +0200 Subject: [PATCH 03/11] Retrieve embeddings based on pending files Co-Authored-By: Kyle Caverly --- crates/semantic_index/src/semantic_index.rs | 35 ++++++++++----------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 0b2a77378e78f0929b1628df377d92cfaa4e273f..8122cffdc96c235d15216ff8ad64a401eccc04a5 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -417,7 +417,7 @@ impl SemanticIndex { return; }; worktree_state.paths_changed(changes, Instant::now(), worktree); - if let WorktreeState::Registered(worktree_state) = worktree_state { + if let WorktreeState::Registered(_) = worktree_state { cx.spawn_weak(|this, mut cx| async move { cx.background().timer(BACKGROUND_INDEXING_DELAY).await; if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { @@ -842,18 +842,6 @@ impl SemanticIndex { true }); - let mut worktree_id_file_paths = HashMap::default(); - for worktree in project_state.worktrees.values() { - if let WorktreeState::Registered(worktree_state) = worktree { - for (path, _) in &worktree_state.changed_paths { - worktree_id_file_paths - .entry(worktree_state.db_id) - .or_insert(Vec::new()) - .push(path.clone()); - } - } - } - let db = self.db.clone(); let language_registry = self.language_registry.clone(); let parsing_files_tx = self.parsing_files_tx.clone(); @@ -862,12 +850,21 @@ impl 
SemanticIndex { db.delete_file(worktree_db_id, path).await.log_err(); } - let embeddings_for_digest = Arc::new( - db.embeddings_for_files(worktree_id_file_paths) - .await - .log_err() - .unwrap_or_default(), - ); + let embeddings_for_digest = { + let mut files = HashMap::default(); + for pending_file in &pending_files { + files + .entry(pending_file.worktree_db_id) + .or_insert(Vec::new()) + .push(pending_file.relative_path.clone()); + } + Arc::new( + db.embeddings_for_files(files) + .await + .log_err() + .unwrap_or_default(), + ) + }; for mut pending_file in pending_files { if let Ok(language) = language_registry From 3c70b127bd2785a943ae81bc3643f73eeb8babae Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 5 Sep 2023 16:52:58 +0200 Subject: [PATCH 04/11] Simplify `SemanticIndex::index_project` Co-Authored-By: Kyle Caverly --- crates/search/src/project_search.rs | 40 ++-- crates/semantic_index/src/semantic_index.rs | 174 +++++++----------- .../src/semantic_index_tests.rs | 27 ++- 3 files changed, 98 insertions(+), 143 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 6364183877b7e1f2bff8ea26b6244a0ed57c6be9..f1a0ff71d33398cb95a06fde5d39a4e14ce20ee5 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -12,15 +12,13 @@ use editor::{ SelectAll, MAX_TAB_TITLE_LEN, }; use futures::StreamExt; - -use gpui::platform::PromptLevel; - use gpui::{ - actions, elements::*, platform::MouseButton, Action, AnyElement, AnyViewHandle, AppContext, - Entity, ModelContext, ModelHandle, Subscription, Task, View, ViewContext, ViewHandle, - WeakModelHandle, WeakViewHandle, + actions, + elements::*, + platform::{MouseButton, PromptLevel}, + Action, AnyElement, AnyViewHandle, AppContext, Entity, ModelContext, ModelHandle, Subscription, + Task, View, ViewContext, ViewHandle, WeakModelHandle, WeakViewHandle, }; - use menu::Confirm; use postage::stream::Stream; use project::{ @@ -132,8 +130,7 @@ pub struct ProjectSearchView { } struct SemanticSearchState { - file_count: usize, - outstanding_file_count: usize, + pending_file_count: usize, _progress_task: Task<()>, } @@ -319,12 +316,8 @@ impl View for ProjectSearchView { }; let semantic_status = if let Some(semantic) = &self.semantic_state { - if semantic.outstanding_file_count > 0 { - format!( - "Indexing: {} of {}...", - semantic.file_count - semantic.outstanding_file_count, - semantic.file_count - ) + if semantic.pending_file_count > 0 { + format!("Remaining files to index: {}", semantic.pending_file_count) } else { "Indexing complete".to_string() } @@ -641,26 +634,25 @@ impl ProjectSearchView { let project = self.model.read(cx).project.clone(); - let index_task = semantic_index.update(cx, |semantic_index, cx| { - semantic_index.index_project(project, cx) + let mut pending_file_count_rx = semantic_index.update(cx, |semantic_index, cx| { + semantic_index.index_project(project.clone(), cx); + semantic_index.pending_file_count(&project).unwrap() }); cx.spawn(|search_view, mut cx| async move { - let (files_to_index, mut files_remaining_rx) = index_task.await?; - search_view.update(&mut cx, |search_view, cx| { cx.notify(); + let pending_file_count = *pending_file_count_rx.borrow(); search_view.semantic_state = Some(SemanticSearchState { - file_count: files_to_index, - outstanding_file_count: files_to_index, + pending_file_count, _progress_task: cx.spawn(|search_view, mut cx| async move { - while let Some(count) = files_remaining_rx.recv().await { + while let 
Some(count) = pending_file_count_rx.recv().await { search_view .update(&mut cx, |search_view, cx| { if let Some(semantic_search_state) = &mut search_view.semantic_state { - semantic_search_state.outstanding_file_count = count; + semantic_search_state.pending_file_count = count; cx.notify(); if count == 0 { return; @@ -959,7 +951,7 @@ impl ProjectSearchView { match mode { SearchMode::Semantic => { if let Some(semantic) = &mut self.semantic_state { - if semantic.outstanding_file_count > 0 { + if semantic.pending_file_count > 0 { return; } diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 8122cffdc96c235d15216ff8ad64a401eccc04a5..2de78ab7e3728bbc10669c8895aa9671c67642b9 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -66,7 +66,9 @@ pub fn init( if let Some(workspace) = workspace.upgrade(cx) { let project = workspace.read(cx).project().clone(); if project.read(cx).is_local() { - semantic_index.update(cx, |index, cx| index.register_project(project, cx)); + semantic_index.update(cx, |index, cx| { + index.register_project(project, cx); + }); } } } @@ -122,7 +124,6 @@ impl WorktreeState { fn paths_changed( &mut self, changes: Arc<[(Arc, ProjectEntryId, PathChange)]>, - changed_at: Instant, worktree: &Worktree, ) { let changed_paths = match self { @@ -140,7 +141,6 @@ impl WorktreeState { changed_paths.insert( path.clone(), ChangedPathInfo { - changed_at, mtime: entry.mtime, is_deleted: *change == PathChange::Removed, }, @@ -160,7 +160,6 @@ struct RegisteredWorktreeState { } struct ChangedPathInfo { - changed_at: Instant, mtime: SystemTime, is_deleted: bool, } @@ -409,43 +408,47 @@ impl SemanticIndex { return; }; - let change_time = Instant::now(); let worktree = worktree.read(cx); let worktree_state = if let Some(worktree_state) = project_state.worktree(worktree_id) { worktree_state } else { return; }; - worktree_state.paths_changed(changes, Instant::now(), worktree); + worktree_state.paths_changed(changes, worktree); if let WorktreeState::Registered(_) = worktree_state { cx.spawn_weak(|this, mut cx| async move { cx.background().timer(BACKGROUND_INDEXING_DELAY).await; if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { - this.update(&mut cx, |this, cx| { - this.reindex_changed_paths(project, Some(change_time), cx) - }) - .await; + this.update(&mut cx, |this, cx| this.index_project(project, cx)); } }) .detach(); } } - pub fn register_project(&mut self, project: ModelHandle, cx: &mut ModelContext) { - log::trace!("Registering Project for Semantic Index"); + fn register_project( + &mut self, + project: ModelHandle, + cx: &mut ModelContext, + ) -> &mut ProjectState { + if !self.projects.contains_key(&project.downgrade()) { + log::trace!("Registering Project for Semantic Index"); - let subscription = cx.subscribe(&project, |this, project, event, cx| match event { - project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => { - this.project_worktrees_changed(project.clone(), cx); - } - project::Event::WorktreeUpdatedEntries(worktree_id, changes) => { - this.project_entries_changed(project, *worktree_id, changes.clone(), cx); - } - _ => {} - }); - self.projects - .insert(project.downgrade(), ProjectState::new(subscription)); - self.project_worktrees_changed(project, cx); + let subscription = cx.subscribe(&project, |this, project, event, cx| match event { + project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => { + 
this.project_worktrees_changed(project.clone(), cx); + } + project::Event::WorktreeUpdatedEntries(worktree_id, changes) => { + this.project_entries_changed(project, *worktree_id, changes.clone(), cx); + } + _ => {} + }); + self.projects + .insert(project.downgrade(), ProjectState::new(subscription)); + self.project_worktrees_changed(project.clone(), cx); + } + + self.projects.get_mut(&project.downgrade()).unwrap() } fn register_worktree( @@ -487,7 +490,6 @@ impl SemanticIndex { .background() .spawn(async move { let mut changed_paths = BTreeMap::new(); - let now = Instant::now(); for file in worktree.files(false, 0) { let absolute_path = worktree.absolutize(&file.path); @@ -518,7 +520,6 @@ impl SemanticIndex { changed_paths.insert( file.path.clone(), ChangedPathInfo { - changed_at: now, mtime: file.mtime, is_deleted: false, }, @@ -532,7 +533,6 @@ impl SemanticIndex { changed_paths.insert( path.into(), ChangedPathInfo { - changed_at: now, mtime, is_deleted: true, }, @@ -614,29 +614,7 @@ impl SemanticIndex { } } - pub fn index_project( - &mut self, - project: ModelHandle, - cx: &mut ModelContext, - ) -> Task)>> { - let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade()) - { - project_state - } else { - return Task::ready(Err(anyhow!("project was not registered"))); - }; - let outstanding_job_count_rx = project_state.outstanding_job_count_rx.clone(); - cx.spawn(|this, mut cx| async move { - this.update(&mut cx, |this, cx| { - this.reindex_changed_paths(project.clone(), None, cx) - }) - .await; - let count = *outstanding_job_count_rx.borrow(); - Ok((count, outstanding_job_count_rx)) - }) - } - - pub fn outstanding_job_count_rx( + pub fn pending_file_count( &self, project: &ModelHandle, ) -> Option> { @@ -783,18 +761,8 @@ impl SemanticIndex { }) } - fn reindex_changed_paths( - &mut self, - project: ModelHandle, - last_changed_before: Option, - cx: &mut ModelContext, - ) -> Task<()> { - let project_state = if let Some(project_state) = self.projects.get_mut(&project.downgrade()) - { - project_state - } else { - return Task::ready(()); - }; + pub fn index_project(&mut self, project: ModelHandle, cx: &mut ModelContext) { + let project_state = self.register_project(project.clone(), cx); let mut pending_files = Vec::new(); let mut files_to_delete = Vec::new(); @@ -816,12 +784,6 @@ impl SemanticIndex { }; worktree_state.changed_paths.retain(|path, info| { - if let Some(last_changed_before) = last_changed_before { - if info.changed_at > last_changed_before { - return true; - } - } - if info.is_deleted { files_to_delete.push((worktree_state.db_id, path.clone())); } else { @@ -845,48 +807,50 @@ impl SemanticIndex { let db = self.db.clone(); let language_registry = self.language_registry.clone(); let parsing_files_tx = self.parsing_files_tx.clone(); - cx.background().spawn(async move { - for (worktree_db_id, path) in files_to_delete { - db.delete_file(worktree_db_id, path).await.log_err(); - } - - let embeddings_for_digest = { - let mut files = HashMap::default(); - for pending_file in &pending_files { - files - .entry(pending_file.worktree_db_id) - .or_insert(Vec::new()) - .push(pending_file.relative_path.clone()); + cx.background() + .spawn(async move { + for (worktree_db_id, path) in files_to_delete { + db.delete_file(worktree_db_id, path).await.log_err(); } - Arc::new( - db.embeddings_for_files(files) - .await - .log_err() - .unwrap_or_default(), - ) - }; - for mut pending_file in pending_files { - if let Ok(language) = language_registry - 
.language_for_file(&pending_file.relative_path, None) - .await - { - if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) - && &language.name().as_ref() != &"Markdown" - && language - .grammar() - .and_then(|grammar| grammar.embedding_config.as_ref()) - .is_none() + let embeddings_for_digest = { + let mut files = HashMap::default(); + for pending_file in &pending_files { + files + .entry(pending_file.worktree_db_id) + .or_insert(Vec::new()) + .push(pending_file.relative_path.clone()); + } + Arc::new( + db.embeddings_for_files(files) + .await + .log_err() + .unwrap_or_default(), + ) + }; + + for mut pending_file in pending_files { + if let Ok(language) = language_registry + .language_for_file(&pending_file.relative_path, None) + .await { - continue; + if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) + && &language.name().as_ref() != &"Markdown" + && language + .grammar() + .and_then(|grammar| grammar.embedding_config.as_ref()) + .is_none() + { + continue; + } + pending_file.language = Some(language); } - pending_file.language = Some(language); + parsing_files_tx + .try_send((embeddings_for_digest.clone(), pending_file)) + .ok(); } - parsing_files_tx - .try_send((embeddings_for_digest.clone(), pending_file)) - .ok(); - } - }) + }) + .detach() } } diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 2f28184f20e3e3acdc191ac1e9a9fe1be9b58750..008a9e0434dd8794ef9845447b1598e3e4224e3f 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -87,16 +87,18 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await; - semantic_index.update(cx, |store, cx| store.register_project(project.clone(), cx)); + semantic_index.update(cx, |store, cx| { + store.register_project(project.clone(), cx); + }); deterministic.run_until_parked(); - let (file_count, outstanding_file_count) = semantic_index - .update(cx, |store, cx| store.index_project(project.clone(), cx)) - .await - .unwrap(); - assert_eq!(file_count, 3); + let pending_file_count = + semantic_index.read_with(cx, |index, _| index.pending_file_count(&project).unwrap()); + semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); + deterministic.run_until_parked(); + assert_eq!(*pending_file_count.borrow(), 3); deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); - assert_eq!(*outstanding_file_count.borrow(), 0); + assert_eq!(*pending_file_count.borrow(), 0); let search_results = semantic_index .update(cx, |store, cx| { @@ -188,14 +190,11 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); let prev_embedding_count = embedding_provider.embedding_count(); - let (file_count, outstanding_file_count) = semantic_index - .update(cx, |store, cx| store.index_project(project.clone(), cx)) - .await - .unwrap(); - assert_eq!(file_count, 1); - + semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); + deterministic.run_until_parked(); + assert_eq!(*pending_file_count.borrow(), 1); deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); - assert_eq!(*outstanding_file_count.borrow(), 0); + assert_eq!(*pending_file_count.borrow(), 0); assert_eq!( embedding_provider.embedding_count() - prev_embedding_count, From 95b72a73ade5f6579be0e5c03becc2ab2f3c592e Mon Sep 17 00:00:00 2001 From: Antonio 
Scandurra Date: Tue, 5 Sep 2023 17:17:58 +0200 Subject: [PATCH 05/11] Re-index project when a worktree is registered Co-Authored-By: Kyle Caverly --- crates/semantic_index/src/semantic_index.rs | 69 +++++++------------ .../src/semantic_index_tests.rs | 7 +- 2 files changed, 24 insertions(+), 52 deletions(-) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 2de78ab7e3728bbc10669c8895aa9671c67642b9..6b94874c435afb68545d981e7b5df41a515faa39 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -34,7 +34,6 @@ use util::{ paths::EMBEDDINGS_DIR, ResultExt, TryFutureExt, }; -use workspace::WorkspaceCreated; const SEMANTIC_INDEX_VERSION: usize = 9; const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60); @@ -57,24 +56,6 @@ pub fn init( return; } - cx.subscribe_global::({ - move |event, cx| { - let Some(semantic_index) = SemanticIndex::global(cx) else { - return; - }; - let workspace = &event.0; - if let Some(workspace) = workspace.upgrade(cx) { - let project = workspace.read(cx).project().clone(); - if project.read(cx).is_local() { - semantic_index.update(cx, |index, cx| { - index.register_project(project, cx); - }); - } - } - } - }) - .detach(); - cx.spawn(move |mut cx| async move { let semantic_index = SemanticIndex::new( fs, @@ -426,31 +407,6 @@ impl SemanticIndex { } } - fn register_project( - &mut self, - project: ModelHandle, - cx: &mut ModelContext, - ) -> &mut ProjectState { - if !self.projects.contains_key(&project.downgrade()) { - log::trace!("Registering Project for Semantic Index"); - - let subscription = cx.subscribe(&project, |this, project, event, cx| match event { - project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => { - this.project_worktrees_changed(project.clone(), cx); - } - project::Event::WorktreeUpdatedEntries(worktree_id, changes) => { - this.project_entries_changed(project, *worktree_id, changes.clone(), cx); - } - _ => {} - }); - self.projects - .insert(project.downgrade(), ProjectState::new(subscription)); - self.project_worktrees_changed(project.clone(), cx); - } - - self.projects.get_mut(&project.downgrade()).unwrap() - } - fn register_worktree( &mut self, project: ModelHandle, @@ -542,11 +498,14 @@ impl SemanticIndex { anyhow::Ok(changed_paths) }) .await?; - this.update(&mut cx, |this, _| { + this.update(&mut cx, |this, cx| { let project_state = this .projects .get_mut(&project) .ok_or_else(|| anyhow!("project not registered"))?; + let project = project + .upgrade(cx) + .ok_or_else(|| anyhow!("project was dropped"))?; if let Some(WorktreeState::Registering(state)) = project_state.worktrees.remove(&worktree_id) @@ -560,6 +519,7 @@ impl SemanticIndex { changed_paths, }), ); + this.index_project(project, cx); anyhow::Ok(()) })?; @@ -762,7 +722,24 @@ impl SemanticIndex { } pub fn index_project(&mut self, project: ModelHandle, cx: &mut ModelContext) { - let project_state = self.register_project(project.clone(), cx); + if !self.projects.contains_key(&project.downgrade()) { + log::trace!("Registering Project for Semantic Index"); + + let subscription = cx.subscribe(&project, |this, project, event, cx| match event { + project::Event::WorktreeAdded | project::Event::WorktreeRemoved(_) => { + this.project_worktrees_changed(project.clone(), cx); + } + project::Event::WorktreeUpdatedEntries(worktree_id, changes) => { + this.project_entries_changed(project, *worktree_id, changes.clone(), cx); + } + _ => {} + }); + self.projects + 
.insert(project.downgrade(), ProjectState::new(subscription)); + self.project_worktrees_changed(project.clone(), cx); + } + + let project_state = self.projects.get_mut(&project.downgrade()).unwrap(); let mut pending_files = Vec::new(); let mut files_to_delete = Vec::new(); diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 008a9e0434dd8794ef9845447b1598e3e4224e3f..ca5f7df533a98753ff52e681cfc3fb6bf96fa778 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -87,14 +87,9 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await; - semantic_index.update(cx, |store, cx| { - store.register_project(project.clone(), cx); - }); - deterministic.run_until_parked(); - + semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); let pending_file_count = semantic_index.read_with(cx, |index, _| index.pending_file_count(&project).unwrap()); - semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); deterministic.run_until_parked(); assert_eq!(*pending_file_count.borrow(), 3); deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); From c802680084d8cadf7aa2a8ac096e5dbd452abd31 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 6 Sep 2023 09:41:51 +0200 Subject: [PATCH 06/11] Clip ranges returned by `SemanticIndex::search` The files may have changed since the last time they were parsed, so the ranges returned by `SemanticIndex::search` may be out of bounds. --- crates/semantic_index/src/semantic_index.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 6b94874c435afb68545d981e7b5df41a515faa39..7ce4c9c2e45a02555da37ff9ca10b70d5a518786 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -15,7 +15,7 @@ use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings}; use embedding_queue::{EmbeddingQueue, FileToEmbed}; use futures::{FutureExt, StreamExt}; use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; -use language::{Anchor, Buffer, Language, LanguageRegistry}; +use language::{Anchor, Bias, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; use parsing::{CodeContextRetriever, DocumentDigest, PARSEABLE_ENTIRE_FILE_TYPES}; use postage::watch; @@ -713,7 +713,9 @@ impl SemanticIndex { .filter_map(|(buffer, range)| { let buffer = buffer.log_err()?; let range = buffer.read_with(&cx, |buffer, _| { - buffer.anchor_before(range.start)..buffer.anchor_after(range.end) + let start = buffer.clip_offset(range.start, Bias::Left); + let end = buffer.clip_offset(range.end, Bias::Right); + buffer.anchor_before(start)..buffer.anchor_after(end) }); Some(SearchResult { buffer, range }) }) From de0f53b39f5e41ef22cd90635de96dcb4e086259 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 6 Sep 2023 11:40:59 +0200 Subject: [PATCH 07/11] Ensure `SemanticIndex::search` waits for indexing to complete --- crates/search/src/project_search.rs | 4 +- crates/semantic_index/src/semantic_index.rs | 490 +++++++++++------- .../src/semantic_index_tests.rs | 29 +- 3 files changed, 306 insertions(+), 217 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 
f1a0ff71d33398cb95a06fde5d39a4e14ce20ee5..9bebd448a724351eebb22cf12ec11b99a5d3ddeb 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -635,7 +635,9 @@ impl ProjectSearchView { let project = self.model.read(cx).project.clone(); let mut pending_file_count_rx = semantic_index.update(cx, |semantic_index, cx| { - semantic_index.index_project(project.clone(), cx); + semantic_index + .index_project(project.clone(), cx) + .detach_and_log_err(cx); semantic_index.pending_file_count(&project).unwrap() }); diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 7ce4c9c2e45a02555da37ff9ca10b70d5a518786..a098152784a5822f4ec31ded0230bd5e6d808315 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -13,7 +13,7 @@ use collections::{BTreeMap, HashMap, HashSet}; use db::VectorDatabase; use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings}; use embedding_queue::{EmbeddingQueue, FileToEmbed}; -use futures::{FutureExt, StreamExt}; +use futures::{future, FutureExt, StreamExt}; use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use language::{Anchor, Bias, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; @@ -23,6 +23,7 @@ use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Work use smol::channel; use std::{ cmp::Ordering, + future::Future, ops::Range, path::{Path, PathBuf}, sync::{Arc, Weak}, @@ -32,7 +33,7 @@ use util::{ channel::{ReleaseChannel, RELEASE_CHANNEL, RELEASE_CHANNEL_NAME}, http::HttpClient, paths::EMBEDDINGS_DIR, - ResultExt, TryFutureExt, + ResultExt, }; const SEMANTIC_INDEX_VERSION: usize = 9; @@ -132,7 +133,21 @@ impl WorktreeState { struct RegisteringWorktreeState { changed_paths: BTreeMap<Arc<Path>, ChangedPathInfo>, - _registration: Task<Option<()>>, + done_rx: watch::Receiver<Option<()>>, + _registration: Task<()>, +} + +impl RegisteringWorktreeState { + fn done(&self) -> impl Future<Output = ()> { + let mut done_rx = self.done_rx.clone(); + async move { + while let Some(result) = done_rx.next().await { + if result.is_some() { + break; + } + } + } + } } struct RegisteredWorktreeState { @@ -173,13 +188,6 @@ impl ProjectState { } - fn db_id_for_worktree_id(&self, id: WorktreeId) -> Option<i64> { - match self.worktrees.get(&id)?
{ - WorktreeState::Registering(_) => None, - WorktreeState::Registered(state) => Some(state.db_id), - } - } - fn worktree_id_for_db_id(&self, id: i64) -> Option { self.worktrees .iter() @@ -188,10 +196,6 @@ impl ProjectState { _ => None, }) } - - fn worktree(&mut self, id: WorktreeId) -> Option<&mut WorktreeState> { - self.worktrees.get_mut(&id) - } } #[derive(Clone)] @@ -390,17 +394,20 @@ impl SemanticIndex { }; let worktree = worktree.read(cx); - let worktree_state = if let Some(worktree_state) = project_state.worktree(worktree_id) { - worktree_state - } else { - return; - }; + let worktree_state = + if let Some(worktree_state) = project_state.worktrees.get_mut(&worktree_id) { + worktree_state + } else { + return; + }; worktree_state.paths_changed(changes, worktree); if let WorktreeState::Registered(_) = worktree_state { cx.spawn_weak(|this, mut cx| async move { cx.background().timer(BACKGROUND_INDEXING_DELAY).await; if let Some((this, project)) = this.upgrade(&cx).zip(project.upgrade(&cx)) { - this.update(&mut cx, |this, cx| this.index_project(project, cx)); + this.update(&mut cx, |this, cx| { + this.index_project(project, cx).detach_and_log_err(cx) + }); } }) .detach(); @@ -429,109 +436,126 @@ impl SemanticIndex { let worktree_id = worktree.id(); let db = self.db.clone(); let language_registry = self.language_registry.clone(); + let (mut done_tx, done_rx) = watch::channel(); let registration = cx.spawn(|this, mut cx| { async move { - scan_complete.await; - let db_id = db.find_or_create_worktree(worktree_abs_path).await?; - let mut file_mtimes = db.get_file_mtimes(db_id).await?; - let worktree = if let Some(project) = project.upgrade(&cx) { - project - .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx)) - .ok_or_else(|| anyhow!("worktree not found"))? - } else { - return anyhow::Ok(()); - }; - let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot()); - let mut changed_paths = cx - .background() - .spawn(async move { - let mut changed_paths = BTreeMap::new(); - for file in worktree.files(false, 0) { - let absolute_path = worktree.absolutize(&file.path); - - if file.is_external || file.is_ignored || file.is_symlink { - continue; - } - - if let Ok(language) = language_registry - .language_for_file(&absolute_path, None) - .await - { - // Test if file is valid parseable file - if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) - && &language.name().as_ref() != &"Markdown" - && language - .grammar() - .and_then(|grammar| grammar.embedding_config.as_ref()) - .is_none() - { + let register = async { + scan_complete.await; + let db_id = db.find_or_create_worktree(worktree_abs_path).await?; + let mut file_mtimes = db.get_file_mtimes(db_id).await?; + let worktree = if let Some(project) = project.upgrade(&cx) { + project + .read_with(&cx, |project, cx| project.worktree_for_id(worktree_id, cx)) + .ok_or_else(|| anyhow!("worktree not found"))? 
+ } else { + return anyhow::Ok(()); + }; + let worktree = worktree.read_with(&cx, |worktree, _| worktree.snapshot()); + let mut changed_paths = cx + .background() + .spawn(async move { + let mut changed_paths = BTreeMap::new(); + for file in worktree.files(false, 0) { + let absolute_path = worktree.absolutize(&file.path); + + if file.is_external || file.is_ignored || file.is_symlink { continue; } - let stored_mtime = file_mtimes.remove(&file.path.to_path_buf()); - let already_stored = stored_mtime - .map_or(false, |existing_mtime| existing_mtime == file.mtime); - - if !already_stored { - changed_paths.insert( - file.path.clone(), - ChangedPathInfo { - mtime: file.mtime, - is_deleted: false, - }, - ); + if let Ok(language) = language_registry + .language_for_file(&absolute_path, None) + .await + { + // Test if file is valid parseable file + if !PARSEABLE_ENTIRE_FILE_TYPES + .contains(&language.name().as_ref()) + && &language.name().as_ref() != &"Markdown" + && language + .grammar() + .and_then(|grammar| grammar.embedding_config.as_ref()) + .is_none() + { + continue; + } + + let stored_mtime = file_mtimes.remove(&file.path.to_path_buf()); + let already_stored = stored_mtime + .map_or(false, |existing_mtime| { + existing_mtime == file.mtime + }); + + if !already_stored { + changed_paths.insert( + file.path.clone(), + ChangedPathInfo { + mtime: file.mtime, + is_deleted: false, + }, + ); + } } } - } - // Clean up entries from database that are no longer in the worktree. - for (path, mtime) in file_mtimes { - changed_paths.insert( - path.into(), - ChangedPathInfo { - mtime, - is_deleted: true, - }, - ); - } + // Clean up entries from database that are no longer in the worktree. + for (path, mtime) in file_mtimes { + changed_paths.insert( + path.into(), + ChangedPathInfo { + mtime, + is_deleted: true, + }, + ); + } - anyhow::Ok(changed_paths) - }) - .await?; - this.update(&mut cx, |this, cx| { - let project_state = this - .projects - .get_mut(&project) - .ok_or_else(|| anyhow!("project not registered"))?; - let project = project - .upgrade(cx) - .ok_or_else(|| anyhow!("project was dropped"))?; - - if let Some(WorktreeState::Registering(state)) = - project_state.worktrees.remove(&worktree_id) - { - changed_paths.extend(state.changed_paths); - } - project_state.worktrees.insert( - worktree_id, - WorktreeState::Registered(RegisteredWorktreeState { - db_id, - changed_paths, - }), - ); - this.index_project(project, cx); + anyhow::Ok(changed_paths) + }) + .await?; + this.update(&mut cx, |this, cx| { + let project_state = this + .projects + .get_mut(&project) + .ok_or_else(|| anyhow!("project not registered"))?; + let project = project + .upgrade(cx) + .ok_or_else(|| anyhow!("project was dropped"))?; + + if let Some(WorktreeState::Registering(state)) = + project_state.worktrees.remove(&worktree_id) + { + changed_paths.extend(state.changed_paths); + } + project_state.worktrees.insert( + worktree_id, + WorktreeState::Registered(RegisteredWorktreeState { + db_id, + changed_paths, + }), + ); + this.index_project(project, cx).detach_and_log_err(cx); + + anyhow::Ok(()) + })?; anyhow::Ok(()) - })?; + }; - anyhow::Ok(()) + if register.await.log_err().is_none() { + // Stop tracking this worktree if the registration failed. 
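// Editor's sketch of the mtime diff computed by the registration task above:
// walk the files currently in the worktree, compare each against the mtime
// stored in the vector database, and record anything new or modified; entries
// left over in the stored map are gone from disk and get marked as deleted.
// Names here are illustrative stand-ins for the crate's real types.
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::SystemTime;

struct ChangeInfo {
    mtime: SystemTime,
    is_deleted: bool,
}

fn diff_mtimes(
    current: &[(PathBuf, SystemTime)],
    mut stored: BTreeMap<PathBuf, SystemTime>,
) -> BTreeMap<PathBuf, ChangeInfo> {
    let mut changed = BTreeMap::new();
    for (path, mtime) in current {
        let already_stored = stored.remove(path).map_or(false, |old| old == *mtime);
        if !already_stored {
            changed.insert(path.clone(), ChangeInfo { mtime: *mtime, is_deleted: false });
        }
    }
    // Whatever was never removed above no longer exists in the worktree.
    for (path, mtime) in stored {
        changed.insert(path, ChangeInfo { mtime, is_deleted: true });
    }
    changed
}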
+ this.update(&mut cx, |this, _| { + this.projects.get_mut(&project).map(|project_state| { + project_state.worktrees.remove(&worktree_id); + }); + }) + } + + *done_tx.borrow_mut() = Some(()); } - .log_err() }); project_state.worktrees.insert( worktree_id, WorktreeState::Registering(RegisteringWorktreeState { changed_paths: Default::default(), + done_rx, _registration: registration, }), ); @@ -567,7 +591,7 @@ impl SemanticIndex { // Register new worktrees worktrees.retain(|worktree| { let worktree_id = worktree.read(cx).id(); - project_state.worktree(worktree_id).is_none() + !project_state.worktrees.contains_key(&worktree_id) }); for worktree in worktrees { self.register_worktree(project.clone(), worktree, cx); @@ -595,25 +619,13 @@ impl SemanticIndex { excludes: Vec, cx: &mut ModelContext, ) -> Task>> { - let project_state = if let Some(state) = self.projects.get(&project.downgrade()) { - state - } else { - return Task::ready(Err(anyhow!("project not added"))); - }; - - let worktree_db_ids = project - .read(cx) - .worktrees(cx) - .filter_map(|worktree| { - let worktree_id = worktree.read(cx).id(); - project_state.db_id_for_worktree_id(worktree_id) - }) - .collect::>(); - + let index = self.index_project(project.clone(), cx); let embedding_provider = self.embedding_provider.clone(); let db_path = self.db.path().clone(); let fs = self.fs.clone(); cx.spawn(|this, mut cx| async move { + index.await?; + let t0 = Instant::now(); let database = VectorDatabase::new(fs.clone(), db_path.clone(), cx.background()).await?; @@ -630,6 +642,24 @@ impl SemanticIndex { t0.elapsed().as_millis() ); + let worktree_db_ids = this.read_with(&cx, |this, _| { + let project_state = this + .projects + .get(&project.downgrade()) + .ok_or_else(|| anyhow!("project was not indexed"))?; + let worktree_db_ids = project_state + .worktrees + .values() + .filter_map(|worktree| { + if let WorktreeState::Registered(worktree) = worktree { + Some(worktree.db_id) + } else { + None + } + }) + .collect::>(); + anyhow::Ok(worktree_db_ids) + })?; let file_ids = database .retrieve_included_file_ids(&worktree_db_ids, &includes, &excludes) .await?; @@ -723,7 +753,11 @@ impl SemanticIndex { }) } - pub fn index_project(&mut self, project: ModelHandle, cx: &mut ModelContext) { + pub fn index_project( + &mut self, + project: ModelHandle, + cx: &mut ModelContext, + ) -> Task> { if !self.projects.contains_key(&project.downgrade()) { log::trace!("Registering Project for Semantic Index"); @@ -740,96 +774,152 @@ impl SemanticIndex { .insert(project.downgrade(), ProjectState::new(subscription)); self.project_worktrees_changed(project.clone(), cx); } + let project_state = self.projects.get(&project.downgrade()).unwrap(); + let mut outstanding_job_count_rx = project_state.outstanding_job_count_rx.clone(); - let project_state = self.projects.get_mut(&project.downgrade()).unwrap(); + let db = self.db.clone(); + let language_registry = self.language_registry.clone(); + let parsing_files_tx = self.parsing_files_tx.clone(); + let worktree_registration = self.wait_for_worktree_registration(&project, cx); - let mut pending_files = Vec::new(); - let mut files_to_delete = Vec::new(); - let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; - project_state - .worktrees - .retain(|worktree_id, worktree_state| { - let worktree = - if let Some(worktree) = project.read(cx).worktree_for_id(*worktree_id, cx) { - worktree - } else { - return false; - }; - let worktree_state = - if let WorktreeState::Registered(worktree_state) = worktree_state { - 
worktree_state - } else { - return true; - }; + cx.spawn(|this, mut cx| async move { + worktree_registration.await?; + + let mut pending_files = Vec::new(); + let mut files_to_delete = Vec::new(); + this.update(&mut cx, |this, cx| { + let project_state = this + .projects + .get_mut(&project.downgrade()) + .ok_or_else(|| anyhow!("project was dropped"))?; + let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; + + project_state + .worktrees + .retain(|worktree_id, worktree_state| { + let worktree = if let Some(worktree) = + project.read(cx).worktree_for_id(*worktree_id, cx) + { + worktree + } else { + return false; + }; + let worktree_state = + if let WorktreeState::Registered(worktree_state) = worktree_state { + worktree_state + } else { + return true; + }; + + worktree_state.changed_paths.retain(|path, info| { + if info.is_deleted { + files_to_delete.push((worktree_state.db_id, path.clone())); + } else { + let absolute_path = worktree.read(cx).absolutize(path); + let job_handle = JobHandle::new(outstanding_job_count_tx); + pending_files.push(PendingFile { + absolute_path, + relative_path: path.clone(), + language: None, + job_handle, + modified_time: info.mtime, + worktree_db_id: worktree_state.db_id, + }); + } - worktree_state.changed_paths.retain(|path, info| { - if info.is_deleted { - files_to_delete.push((worktree_state.db_id, path.clone())); - } else { - let absolute_path = worktree.read(cx).absolutize(path); - let job_handle = JobHandle::new(&outstanding_job_count_tx); - pending_files.push(PendingFile { - absolute_path, - relative_path: path.clone(), - language: None, - job_handle, - modified_time: info.mtime, - worktree_db_id: worktree_state.db_id, + false }); - } - - false - }); - true - }); + true + }); - let db = self.db.clone(); - let language_registry = self.language_registry.clone(); - let parsing_files_tx = self.parsing_files_tx.clone(); - cx.background() - .spawn(async move { - for (worktree_db_id, path) in files_to_delete { - db.delete_file(worktree_db_id, path).await.log_err(); - } + anyhow::Ok(()) + })?; - let embeddings_for_digest = { - let mut files = HashMap::default(); - for pending_file in &pending_files { - files - .entry(pending_file.worktree_db_id) - .or_insert(Vec::new()) - .push(pending_file.relative_path.clone()); + cx.background() + .spawn(async move { + for (worktree_db_id, path) in files_to_delete { + db.delete_file(worktree_db_id, path).await.log_err(); } - Arc::new( - db.embeddings_for_files(files) - .await - .log_err() - .unwrap_or_default(), - ) - }; - for mut pending_file in pending_files { - if let Ok(language) = language_registry - .language_for_file(&pending_file.relative_path, None) - .await - { - if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) - && &language.name().as_ref() != &"Markdown" - && language - .grammar() - .and_then(|grammar| grammar.embedding_config.as_ref()) - .is_none() + let embeddings_for_digest = { + let mut files = HashMap::default(); + for pending_file in &pending_files { + files + .entry(pending_file.worktree_db_id) + .or_insert(Vec::new()) + .push(pending_file.relative_path.clone()); + } + Arc::new( + db.embeddings_for_files(files) + .await + .log_err() + .unwrap_or_default(), + ) + }; + + for mut pending_file in pending_files { + if let Ok(language) = language_registry + .language_for_file(&pending_file.relative_path, None) + .await { - continue; + if !PARSEABLE_ENTIRE_FILE_TYPES.contains(&language.name().as_ref()) + && &language.name().as_ref() != &"Markdown" + && language + .grammar() + 
.and_then(|grammar| grammar.embedding_config.as_ref()) + .is_none() + { + continue; + } + pending_file.language = Some(language); } - pending_file.language = Some(language); + parsing_files_tx + .try_send((embeddings_for_digest.clone(), pending_file)) + .ok(); } - parsing_files_tx - .try_send((embeddings_for_digest.clone(), pending_file)) - .ok(); + + // Wait until we're done indexing. + while let Some(count) = outstanding_job_count_rx.next().await { + if count == 0 { + break; + } + } + }) + .await; + + Ok(()) + }) + } + + fn wait_for_worktree_registration( + &self, + project: &ModelHandle, + cx: &mut ModelContext, + ) -> Task> { + let project = project.downgrade(); + cx.spawn_weak(|this, cx| async move { + loop { + let mut pending_worktrees = Vec::new(); + this.upgrade(&cx) + .ok_or_else(|| anyhow!("semantic index dropped"))? + .read_with(&cx, |this, _| { + if let Some(project) = this.projects.get(&project) { + for worktree in project.worktrees.values() { + if let WorktreeState::Registering(worktree) = worktree { + pending_worktrees.push(worktree.done()); + } + } + } + }); + + if pending_worktrees.is_empty() { + break; + } else { + future::join_all(pending_worktrees).await; } - }) - .detach() + } + Ok(()) + }) } } diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index ca5f7df533a98753ff52e681cfc3fb6bf96fa778..fe1b6b9cf9bb7958a4aec99a34e411885ecbdb2a 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -87,7 +87,16 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp let project = Project::test(fs.clone(), ["/the-root".as_ref()], cx).await; - semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); + let search_results = semantic_index.update(cx, |store, cx| { + store.search_project( + project.clone(), + "aaaaaabbbbzz".to_string(), + 5, + vec![], + vec![], + cx, + ) + }); let pending_file_count = semantic_index.read_with(cx, |index, _| index.pending_file_count(&project).unwrap()); deterministic.run_until_parked(); @@ -95,20 +104,7 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); assert_eq!(*pending_file_count.borrow(), 0); - let search_results = semantic_index - .update(cx, |store, cx| { - store.search_project( - project.clone(), - "aaaaaabbbbzz".to_string(), - 5, - vec![], - vec![], - cx, - ) - }) - .await - .unwrap(); - + let search_results = search_results.await.unwrap(); assert_search_results( &search_results, &[ @@ -185,11 +181,12 @@ async fn test_semantic_index(deterministic: Arc, cx: &mut TestApp deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); let prev_embedding_count = embedding_provider.embedding_count(); - semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); + let index = semantic_index.update(cx, |store, cx| store.index_project(project.clone(), cx)); deterministic.run_until_parked(); assert_eq!(*pending_file_count.borrow(), 1); deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); assert_eq!(*pending_file_count.borrow(), 0); + index.await.unwrap(); assert_eq!( embedding_provider.embedding_count() - prev_embedding_count, From ce62173534cff576776a92e154149153b183936e Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 6 Sep 2023 16:48:53 +0200 Subject: [PATCH 08/11] Rename `Document` to `Span` --- crates/semantic_index/src/db.rs | 57 ++++++++-------- 
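// Editor's sketch of the wait loop in `wait_for_worktree_registration` above:
// snapshot the `done()` futures of every worktree still registering, await the
// whole snapshot with `future::join_all`, then re-check, because new worktrees
// may have started registering while this batch was in flight. The closure is
// a hypothetical stand-in for re-reading `ProjectState::worktrees`.
use futures::future::{self, BoxFuture};

async fn wait_for_all(mut still_registering: impl FnMut() -> Vec<BoxFuture<'static, ()>>) {
    loop {
        let batch = still_registering();
        if batch.is_empty() {
            return;
        }
        future::join_all(batch).await;
    }
}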
crates/semantic_index/src/embedding_queue.rs | 54 +++++++-------- crates/semantic_index/src/parsing.rs | 66 +++++++++---------- crates/semantic_index/src/semantic_index.rs | 32 ++++----- .../src/semantic_index_tests.rs | 12 ++-- 5 files changed, 109 insertions(+), 112 deletions(-) diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs index 5664210388ab68ccc52e55221aedf67afbd1b635..28bbd56156b820168b13998d41c1ea061706dcf9 100644 --- a/crates/semantic_index/src/db.rs +++ b/crates/semantic_index/src/db.rs @@ -1,6 +1,6 @@ use crate::{ embedding::Embedding, - parsing::{Document, DocumentDigest}, + parsing::{Span, SpanDigest}, SEMANTIC_INDEX_VERSION, }; use anyhow::{anyhow, Context, Result}; @@ -124,8 +124,8 @@ impl VectorDatabase { } log::trace!("vector database schema out of date. updating..."); - db.execute("DROP TABLE IF EXISTS documents", []) - .context("failed to drop 'documents' table")?; + db.execute("DROP TABLE IF EXISTS spans", []) + .context("failed to drop 'spans' table")?; db.execute("DROP TABLE IF EXISTS files", []) .context("failed to drop 'files' table")?; db.execute("DROP TABLE IF EXISTS worktrees", []) @@ -174,7 +174,7 @@ impl VectorDatabase { )?; db.execute( - "CREATE TABLE documents ( + "CREATE TABLE spans ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER NOT NULL, start_byte INTEGER NOT NULL, @@ -211,7 +211,7 @@ impl VectorDatabase { worktree_id: i64, path: Arc, mtime: SystemTime, - documents: Vec, + spans: Vec, ) -> impl Future> { self.transact(move |db| { // Return the existing ID, if both the file and mtime match @@ -231,7 +231,7 @@ impl VectorDatabase { let t0 = Instant::now(); let mut query = db.prepare( " - INSERT INTO documents + INSERT INTO spans (file_id, start_byte, end_byte, name, embedding, digest) VALUES (?1, ?2, ?3, ?4, ?5, ?6) ", @@ -241,14 +241,14 @@ impl VectorDatabase { t0.elapsed().as_millis() ); - for document in documents { + for span in spans { query.execute(params![ file_id, - document.range.start.to_string(), - document.range.end.to_string(), - document.name, - document.embedding, - document.digest + span.range.start.to_string(), + span.range.end.to_string(), + span.name, + span.embedding, + span.digest ])?; } @@ -278,13 +278,13 @@ impl VectorDatabase { pub fn embeddings_for_files( &self, worktree_id_file_paths: HashMap>>, - ) -> impl Future>> { + ) -> impl Future>> { self.transact(move |db| { let mut query = db.prepare( " SELECT digest, embedding - FROM documents - LEFT JOIN files ON files.id = documents.file_id + FROM spans + LEFT JOIN files ON files.id = spans.file_id WHERE files.worktree_id = ? AND files.relative_path IN rarray(?) 
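// Editor's sketch of the `IN rarray(?)` trick used by the query above,
// assuming rusqlite's optional "array" feature (which this crate's use of
// `rarray` implies). Loading the array module lets a whole Rust vector be
// bound to a single placeholder as a virtual table, avoiding dynamically
// built `IN (?, ?, ...)` lists. Table and column names mirror the schema
// created earlier in this file.
use rusqlite::{params, types::Value, Connection, Result};
use std::rc::Rc;

fn file_ids_for_paths(db: &Connection, worktree_id: i64, paths: &[&str]) -> Result<Vec<i64>> {
    rusqlite::vtab::array::load_module(db)?; // enables rarray() on this connection
    let paths = Rc::new(
        paths
            .iter()
            .map(|path| Value::from(path.to_string()))
            .collect::<Vec<Value>>(),
    );
    let mut statement = db.prepare(
        "SELECT id FROM files WHERE worktree_id = ?1 AND relative_path IN rarray(?2)",
    )?;
    let rows = statement.query_map(params![worktree_id, paths], |row| row.get::<_, i64>(0))?;
    rows.collect()
}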
", )?; @@ -297,10 +297,7 @@ impl VectorDatabase { .collect::>(), ); let rows = query.query_map(params![worktree_id, file_paths], |row| { - Ok(( - row.get::<_, DocumentDigest>(0)?, - row.get::<_, Embedding>(1)?, - )) + Ok((row.get::<_, SpanDigest>(0)?, row.get::<_, Embedding>(1)?)) })?; for row in rows { @@ -379,7 +376,7 @@ impl VectorDatabase { let file_ids = file_ids.to_vec(); self.transact(move |db| { let mut results = Vec::<(i64, f32)>::with_capacity(limit + 1); - Self::for_each_document(db, &file_ids, |id, embedding| { + Self::for_each_span(db, &file_ids, |id, embedding| { let similarity = embedding.similarity(&query_embedding); let ix = match results.binary_search_by(|(_, s)| { similarity.partial_cmp(&s).unwrap_or(Ordering::Equal) @@ -434,7 +431,7 @@ impl VectorDatabase { }) } - fn for_each_document( + fn for_each_span( db: &rusqlite::Connection, file_ids: &[i64], mut f: impl FnMut(i64, Embedding), @@ -444,7 +441,7 @@ impl VectorDatabase { SELECT id, embedding FROM - documents + spans WHERE file_id IN rarray(?) ", @@ -459,7 +456,7 @@ impl VectorDatabase { Ok(()) } - pub fn get_documents_by_ids( + pub fn spans_for_ids( &self, ids: &[i64], ) -> impl Future)>>> { @@ -468,16 +465,16 @@ impl VectorDatabase { let mut statement = db.prepare( " SELECT - documents.id, + spans.id, files.worktree_id, files.relative_path, - documents.start_byte, - documents.end_byte + spans.start_byte, + spans.end_byte FROM - documents, files + spans, files WHERE - documents.file_id = files.id AND - documents.id in rarray(?) + spans.file_id = files.id AND + spans.id in rarray(?) ", )?; @@ -500,7 +497,7 @@ impl VectorDatabase { for id in &ids { let value = values_by_id .remove(id) - .ok_or(anyhow!("missing document id {}", id))?; + .ok_or(anyhow!("missing span id {}", id))?; results.push(value); } diff --git a/crates/semantic_index/src/embedding_queue.rs b/crates/semantic_index/src/embedding_queue.rs index f1abbde3a4e2d528bc0b0b292511462c515b4a74..024881f0b808734a4e1d0fff2f48f07ddc88470b 100644 --- a/crates/semantic_index/src/embedding_queue.rs +++ b/crates/semantic_index/src/embedding_queue.rs @@ -1,4 +1,4 @@ -use crate::{embedding::EmbeddingProvider, parsing::Document, JobHandle}; +use crate::{embedding::EmbeddingProvider, parsing::Span, JobHandle}; use gpui::executor::Background; use parking_lot::Mutex; use smol::channel; @@ -9,7 +9,7 @@ pub struct FileToEmbed { pub worktree_id: i64, pub path: Arc, pub mtime: SystemTime, - pub documents: Vec, + pub spans: Vec, pub job_handle: JobHandle, } @@ -19,7 +19,7 @@ impl std::fmt::Debug for FileToEmbed { .field("worktree_id", &self.worktree_id) .field("path", &self.path) .field("mtime", &self.mtime) - .field("document", &self.documents) + .field("spans", &self.spans) .finish_non_exhaustive() } } @@ -29,13 +29,13 @@ impl PartialEq for FileToEmbed { self.worktree_id == other.worktree_id && self.path == other.path && self.mtime == other.mtime - && self.documents == other.documents + && self.spans == other.spans } } pub struct EmbeddingQueue { embedding_provider: Arc, - pending_batch: Vec, + pending_batch: Vec, executor: Arc, pending_batch_token_count: usize, finished_files_tx: channel::Sender, @@ -43,9 +43,9 @@ pub struct EmbeddingQueue { } #[derive(Clone)] -pub struct FileToEmbedFragment { +pub struct FileFragmentToEmbed { file: Arc>, - document_range: Range, + span_range: Range, } impl EmbeddingQueue { @@ -62,41 +62,41 @@ impl EmbeddingQueue { } pub fn push(&mut self, file: FileToEmbed) { - if file.documents.is_empty() { + if file.spans.is_empty() { 
self.finished_files_tx.try_send(file).unwrap(); return; } let file = Arc::new(Mutex::new(file)); - self.pending_batch.push(FileToEmbedFragment { + self.pending_batch.push(FileFragmentToEmbed { file: file.clone(), - document_range: 0..0, + span_range: 0..0, }); - let mut fragment_range = &mut self.pending_batch.last_mut().unwrap().document_range; + let mut fragment_range = &mut self.pending_batch.last_mut().unwrap().span_range; let mut saved_tokens = 0; - for (ix, document) in file.lock().documents.iter().enumerate() { - let document_token_count = if document.embedding.is_none() { - document.token_count + for (ix, span) in file.lock().spans.iter().enumerate() { + let span_token_count = if span.embedding.is_none() { + span.token_count } else { - saved_tokens += document.token_count; + saved_tokens += span.token_count; 0 }; - let next_token_count = self.pending_batch_token_count + document_token_count; + let next_token_count = self.pending_batch_token_count + span_token_count; if next_token_count > self.embedding_provider.max_tokens_per_batch() { let range_end = fragment_range.end; self.flush(); - self.pending_batch.push(FileToEmbedFragment { + self.pending_batch.push(FileFragmentToEmbed { file: file.clone(), - document_range: range_end..range_end, + span_range: range_end..range_end, }); - fragment_range = &mut self.pending_batch.last_mut().unwrap().document_range; + fragment_range = &mut self.pending_batch.last_mut().unwrap().span_range; } fragment_range.end = ix + 1; - self.pending_batch_token_count += document_token_count; + self.pending_batch_token_count += span_token_count; } log::trace!("Saved Tokens: {:?}", saved_tokens); } @@ -113,20 +113,20 @@ impl EmbeddingQueue { self.executor.spawn(async move { let mut spans = Vec::new(); - let mut document_count = 0; + let mut span_count = 0; for fragment in &batch { let file = fragment.file.lock(); - document_count += file.documents[fragment.document_range.clone()].len(); + span_count += file.spans[fragment.span_range.clone()].len(); spans.extend( { - file.documents[fragment.document_range.clone()] + file.spans[fragment.span_range.clone()] .iter().filter(|d| d.embedding.is_none()) .map(|d| d.content.clone()) } ); } - log::trace!("Documents Length: {:?}", document_count); + log::trace!("Documents Length: {:?}", span_count); log::trace!("Span Length: {:?}", spans.clone().len()); // If spans is 0, just send the fragment to the finished files if its the last one. 
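// Editor's sketch of the batching rule `push` implements above: spans that
// already carry a cached embedding cost zero tokens, and the pending batch is
// flushed whenever the next span would overflow the provider's per-request
// budget. This is a condensed stand-in; the real queue also tracks which
// fragment of which file each span belongs to.
struct BatchQueue {
    pending: Vec<String>,
    pending_tokens: usize,
    max_tokens_per_batch: usize,
}

impl BatchQueue {
    fn push_span(&mut self, content: String, token_count: usize, already_embedded: bool) {
        let cost = if already_embedded { 0 } else { token_count };
        if self.pending_tokens + cost > self.max_tokens_per_batch {
            self.flush();
        }
        self.pending.push(content);
        self.pending_tokens += cost;
    }

    fn flush(&mut self) {
        let batch = std::mem::take(&mut self.pending);
        self.pending_tokens = 0;
        let _ = batch; // hand the batch off to the embedding provider here
    }
}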
@@ -143,11 +143,11 @@ impl EmbeddingQueue { Ok(embeddings) => { let mut embeddings = embeddings.into_iter(); for fragment in batch { - for document in - &mut fragment.file.lock().documents[fragment.document_range.clone()].iter_mut().filter(|d| d.embedding.is_none()) + for span in + &mut fragment.file.lock().spans[fragment.span_range.clone()].iter_mut().filter(|d| d.embedding.is_none()) { if let Some(embedding) = embeddings.next() { - document.embedding = Some(embedding); + span.embedding = Some(embedding); } else { // log::error!("number of embeddings returned different from number of documents"); diff --git a/crates/semantic_index/src/parsing.rs b/crates/semantic_index/src/parsing.rs index c0a94c6b7355cb51bc5193a2bfd01148c28f4162..b6fc000e1dc7530483b97418e15347f1b9985832 100644 --- a/crates/semantic_index/src/parsing.rs +++ b/crates/semantic_index/src/parsing.rs @@ -16,9 +16,9 @@ use std::{ use tree_sitter::{Parser, QueryCursor}; #[derive(Debug, PartialEq, Eq, Clone, Hash)] -pub struct DocumentDigest([u8; 20]); +pub struct SpanDigest([u8; 20]); -impl FromSql for DocumentDigest { +impl FromSql for SpanDigest { fn column_result(value: ValueRef) -> FromSqlResult { let blob = value.as_blob()?; let bytes = @@ -27,17 +27,17 @@ impl FromSql for DocumentDigest { expected_size: 20, blob_size: blob.len(), })?; - return Ok(DocumentDigest(bytes)); + return Ok(SpanDigest(bytes)); } } -impl ToSql for DocumentDigest { +impl ToSql for SpanDigest { fn to_sql(&self) -> rusqlite::Result { self.0.to_sql() } } -impl From<&'_ str> for DocumentDigest { +impl From<&'_ str> for SpanDigest { fn from(value: &'_ str) -> Self { let mut sha1 = Sha1::new(); sha1.update(value); @@ -46,12 +46,12 @@ impl From<&'_ str> for DocumentDigest { } #[derive(Debug, PartialEq, Clone)] -pub struct Document { +pub struct Span { pub name: String, pub range: Range, pub content: String, pub embedding: Option, - pub digest: DocumentDigest, + pub digest: SpanDigest, pub token_count: usize, } @@ -97,14 +97,14 @@ impl CodeContextRetriever { relative_path: &Path, language_name: Arc, content: &str, - ) -> Result> { + ) -> Result> { let document_span = ENTIRE_FILE_TEMPLATE .replace("", relative_path.to_string_lossy().as_ref()) .replace("", language_name.as_ref()) .replace("", &content); - let digest = DocumentDigest::from(document_span.as_str()); + let digest = SpanDigest::from(document_span.as_str()); let (document_span, token_count) = self.embedding_provider.truncate(&document_span); - Ok(vec![Document { + Ok(vec![Span { range: 0..content.len(), content: document_span, embedding: Default::default(), @@ -114,13 +114,13 @@ impl CodeContextRetriever { }]) } - fn parse_markdown_file(&self, relative_path: &Path, content: &str) -> Result> { + fn parse_markdown_file(&self, relative_path: &Path, content: &str) -> Result> { let document_span = MARKDOWN_CONTEXT_TEMPLATE .replace("", relative_path.to_string_lossy().as_ref()) .replace("", &content); - let digest = DocumentDigest::from(document_span.as_str()); + let digest = SpanDigest::from(document_span.as_str()); let (document_span, token_count) = self.embedding_provider.truncate(&document_span); - Ok(vec![Document { + Ok(vec![Span { range: 0..content.len(), content: document_span, embedding: None, @@ -191,32 +191,32 @@ impl CodeContextRetriever { relative_path: &Path, content: &str, language: Arc, - ) -> Result> { + ) -> Result> { let language_name = language.name(); if PARSEABLE_ENTIRE_FILE_TYPES.contains(&language_name.as_ref()) { return self.parse_entire_file(relative_path, language_name, 
&content); - } else if &language_name.to_string() == &"Markdown".to_string() { + } else if language_name.as_ref() == "Markdown" { return self.parse_markdown_file(relative_path, &content); } - let mut documents = self.parse_file(content, language)?; - for document in &mut documents { + let mut spans = self.parse_file(content, language)?; + for span in &mut spans { let document_content = CODE_CONTEXT_TEMPLATE .replace("", relative_path.to_string_lossy().as_ref()) .replace("", language_name.as_ref()) - .replace("item", &document.content); + .replace("item", &span.content); let (document_content, token_count) = self.embedding_provider.truncate(&document_content); - document.content = document_content; - document.token_count = token_count; + span.content = document_content; + span.token_count = token_count; } - Ok(documents) + Ok(spans) } - pub fn parse_file(&mut self, content: &str, language: Arc) -> Result> { + pub fn parse_file(&mut self, content: &str, language: Arc) -> Result> { let grammar = language .grammar() .ok_or_else(|| anyhow!("no grammar for language"))?; @@ -227,7 +227,7 @@ impl CodeContextRetriever { let language_scope = language.default_scope(); let placeholder = language_scope.collapsed_placeholder(); - let mut documents = Vec::new(); + let mut spans = Vec::new(); let mut collapsed_ranges_within = Vec::new(); let mut parsed_name_ranges = HashSet::new(); for (i, context_match) in matches.iter().enumerate() { @@ -267,22 +267,22 @@ impl CodeContextRetriever { collapsed_ranges_within.sort_by_key(|r| (r.start, Reverse(r.end))); - let mut document_content = String::new(); + let mut span_content = String::new(); for context_range in &context_match.context_ranges { add_content_from_range( - &mut document_content, + &mut span_content, content, context_range.clone(), context_match.start_col, ); - document_content.push_str("\n"); + span_content.push_str("\n"); } let mut offset = item_range.start; for collapsed_range in &collapsed_ranges_within { if collapsed_range.start > offset { add_content_from_range( - &mut document_content, + &mut span_content, content, offset..collapsed_range.start, context_match.start_col, @@ -291,24 +291,24 @@ impl CodeContextRetriever { } if collapsed_range.end > offset { - document_content.push_str(placeholder); + span_content.push_str(placeholder); offset = collapsed_range.end; } } if offset < item_range.end { add_content_from_range( - &mut document_content, + &mut span_content, content, offset..item_range.end, context_match.start_col, ); } - let sha1 = DocumentDigest::from(document_content.as_str()); - documents.push(Document { + let sha1 = SpanDigest::from(span_content.as_str()); + spans.push(Span { name, - content: document_content, + content: span_content, range: item_range.clone(), embedding: None, digest: sha1, @@ -316,7 +316,7 @@ impl CodeContextRetriever { }) } - return Ok(documents); + return Ok(spans); } } diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index a098152784a5822f4ec31ded0230bd5e6d808315..1c1c40fa27de3fed4641498ed20eba5877410965 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -17,7 +17,7 @@ use futures::{future, FutureExt, StreamExt}; use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use language::{Anchor, Bias, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; -use parsing::{CodeContextRetriever, DocumentDigest, PARSEABLE_ENTIRE_FILE_TYPES}; +use 
parsing::{CodeContextRetriever, SpanDigest, PARSEABLE_ENTIRE_FILE_TYPES}; use postage::watch; use project::{search::PathMatcher, Fs, PathChange, Project, ProjectEntryId, Worktree, WorktreeId}; use smol::channel; @@ -36,7 +36,7 @@ use util::{ ResultExt, }; -const SEMANTIC_INDEX_VERSION: usize = 9; +const SEMANTIC_INDEX_VERSION: usize = 10; const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(5 * 60); const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250); @@ -84,7 +84,7 @@ pub struct SemanticIndex { db: VectorDatabase, embedding_provider: Arc, language_registry: Arc, - parsing_files_tx: channel::Sender<(Arc>, PendingFile)>, + parsing_files_tx: channel::Sender<(Arc>, PendingFile)>, _embedding_task: Task<()>, _parsing_files_tasks: Vec>, projects: HashMap, ProjectState>, @@ -252,16 +252,16 @@ impl SemanticIndex { let db = db.clone(); async move { while let Ok(file) = embedded_files.recv().await { - db.insert_file(file.worktree_id, file.path, file.mtime, file.documents) + db.insert_file(file.worktree_id, file.path, file.mtime, file.spans) .await .log_err(); } } }); - // Parse files into embeddable documents. + // Parse files into embeddable spans. let (parsing_files_tx, parsing_files_rx) = - channel::unbounded::<(Arc>, PendingFile)>(); + channel::unbounded::<(Arc>, PendingFile)>(); let embedding_queue = Arc::new(Mutex::new(embedding_queue)); let mut _parsing_files_tasks = Vec::new(); for _ in 0..cx.background().num_cpus() { @@ -320,26 +320,26 @@ impl SemanticIndex { pending_file: PendingFile, retriever: &mut CodeContextRetriever, embedding_queue: &Arc>, - embeddings_for_digest: &HashMap, + embeddings_for_digest: &HashMap, ) { let Some(language) = pending_file.language else { return; }; if let Some(content) = fs.load(&pending_file.absolute_path).await.log_err() { - if let Some(mut documents) = retriever + if let Some(mut spans) = retriever .parse_file_with_template(&pending_file.relative_path, &content, language) .log_err() { log::trace!( - "parsed path {:?}: {} documents", + "parsed path {:?}: {} spans", pending_file.relative_path, - documents.len() + spans.len() ); - for document in documents.iter_mut() { - if let Some(embedding) = embeddings_for_digest.get(&document.digest) { - document.embedding = Some(embedding.to_owned()); + for span in &mut spans { + if let Some(embedding) = embeddings_for_digest.get(&span.digest) { + span.embedding = Some(embedding.to_owned()); } } @@ -348,7 +348,7 @@ impl SemanticIndex { path: pending_file.relative_path, mtime: pending_file.modified_time, job_handle: pending_file.job_handle, - documents, + spans: spans, }); } } @@ -708,13 +708,13 @@ impl SemanticIndex { } let ids = results.into_iter().map(|(id, _)| id).collect::>(); - let documents = database.get_documents_by_ids(ids.as_slice()).await?; + let spans = database.spans_for_ids(ids.as_slice()).await?; let mut tasks = Vec::new(); let mut ranges = Vec::new(); let weak_project = project.downgrade(); project.update(&mut cx, |project, cx| { - for (worktree_db_id, file_path, byte_range) in documents { + for (worktree_db_id, file_path, byte_range) in spans { let project_state = if let Some(state) = this.read(cx).projects.get(&weak_project) { state diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index fe1b6b9cf9bb7958a4aec99a34e411885ecbdb2a..ffd8db87814cacb09143381891b993ca86e173e7 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -1,7 +1,7 @@ use 
crate::{ embedding::{DummyEmbeddings, Embedding, EmbeddingProvider}, embedding_queue::EmbeddingQueue, - parsing::{subtract_ranges, CodeContextRetriever, Document, DocumentDigest}, + parsing::{subtract_ranges, CodeContextRetriever, Span, SpanDigest}, semantic_index_settings::SemanticIndexSettings, FileToEmbed, JobHandle, SearchResult, SemanticIndex, EMBEDDING_QUEUE_FLUSH_TIMEOUT, }; @@ -204,15 +204,15 @@ async fn test_embedding_batching(cx: &mut TestAppContext, mut rng: StdRng) { worktree_id: 5, path: Path::new(&format!("path-{file_ix}")).into(), mtime: SystemTime::now(), - documents: (0..rng.gen_range(4..22)) + spans: (0..rng.gen_range(4..22)) .map(|document_ix| { let content_len = rng.gen_range(10..100); let content = RandomCharIter::new(&mut rng) .with_simple_text() .take(content_len) .collect::(); - let digest = DocumentDigest::from(content.as_str()); - Document { + let digest = SpanDigest::from(content.as_str()); + Span { range: 0..10, embedding: None, name: format!("document {document_ix}"), @@ -245,7 +245,7 @@ async fn test_embedding_batching(cx: &mut TestAppContext, mut rng: StdRng) { .iter() .map(|file| { let mut file = file.clone(); - for doc in &mut file.documents { + for doc in &mut file.spans { doc.embedding = Some(embedding_provider.embed_sync(doc.content.as_ref())); } file @@ -437,7 +437,7 @@ async fn test_code_context_retrieval_json() { } fn assert_documents_eq( - documents: &[Document], + documents: &[Span], expected_contents_and_start_offsets: &[(String, usize)], ) { assert_eq!( From 3ad1befb11947200a4bfdcddec837732a8b7c6ce Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 7 Sep 2023 15:07:21 +0200 Subject: [PATCH 09/11] Remove unneeded logging Co-Authored-By: Kyle Caverly --- crates/semantic_index/src/embedding_queue.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/crates/semantic_index/src/embedding_queue.rs b/crates/semantic_index/src/embedding_queue.rs index 024881f0b808734a4e1d0fff2f48f07ddc88470b..104a4eb8ee1f971237a17de8074817df9e78b69b 100644 --- a/crates/semantic_index/src/embedding_queue.rs +++ b/crates/semantic_index/src/embedding_queue.rs @@ -75,12 +75,10 @@ impl EmbeddingQueue { }); let mut fragment_range = &mut self.pending_batch.last_mut().unwrap().span_range; - let mut saved_tokens = 0; for (ix, span) in file.lock().spans.iter().enumerate() { let span_token_count = if span.embedding.is_none() { span.token_count } else { - saved_tokens += span.token_count; 0 }; @@ -98,7 +96,6 @@ impl EmbeddingQueue { fragment_range.end = ix + 1; self.pending_batch_token_count += span_token_count; } - log::trace!("Saved Tokens: {:?}", saved_tokens); } pub fn flush(&mut self) { @@ -113,10 +110,8 @@ impl EmbeddingQueue { self.executor.spawn(async move { let mut spans = Vec::new(); - let mut span_count = 0; for fragment in &batch { let file = fragment.file.lock(); - span_count += file.spans[fragment.span_range.clone()].len(); spans.extend( { file.spans[fragment.span_range.clone()] @@ -126,9 +121,6 @@ impl EmbeddingQueue { ); } - log::trace!("Documents Length: {:?}", span_count); - log::trace!("Span Length: {:?}", spans.clone().len()); - // If spans is 0, just send the fragment to the finished files if its the last one. 
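// Editor's sketch of the hashing behind `SpanDigest::from`, as exercised by
// the test above, assuming the RustCrypto-style API of the `sha1` crate this
// package depends on. Hashing the rendered span text yields a stable 20-byte
// key, which is what lets `embeddings_for_files` reuse stored embeddings for
// spans whose content has not changed instead of re-embedding them.
use sha1::{Digest, Sha1};

fn span_digest(content: &str) -> [u8; 20] {
    let mut hasher = Sha1::new();
    hasher.update(content.as_bytes());
    hasher.finalize().into()
}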
if spans.len() == 0 { for fragment in batch.clone() { @@ -149,7 +141,6 @@ impl EmbeddingQueue { if let Some(embedding) = embeddings.next() { span.embedding = Some(embedding); } else { - // log::error!("number of embeddings returned different from number of documents"); } } From 757a28585256407787e4c97e00eb5c720e60c16b Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 7 Sep 2023 15:15:16 +0200 Subject: [PATCH 10/11] Keep dropping the `documents` table if it exists This is because we renamed `documents` to `spans`. Co-Authored-By: Kyle Caverly --- crates/semantic_index/src/db.rs | 4 ++++ crates/semantic_index/src/semantic_index.rs | 24 ++++++++++----------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs index 28bbd56156b820168b13998d41c1ea061706dcf9..c35057594a09674c338f4a1ce1150e0dc9281aec 100644 --- a/crates/semantic_index/src/db.rs +++ b/crates/semantic_index/src/db.rs @@ -124,6 +124,10 @@ impl VectorDatabase { } log::trace!("vector database schema out of date. updating..."); + // We renamed the `documents` table to `spans`, so we want to drop + // `documents` without recreating it if it exists. + db.execute("DROP TABLE IF EXISTS documents", []) + .context("failed to drop 'documents' table")?; db.execute("DROP TABLE IF EXISTS spans", []) .context("failed to drop 'spans' table")?; db.execute("DROP TABLE IF EXISTS files", []) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 1c1c40fa27de3fed4641498ed20eba5877410965..8bba2f1d0ea73af2d6106a95ce4270c2c01abba4 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -92,8 +92,8 @@ pub struct SemanticIndex { struct ProjectState { worktrees: HashMap, - outstanding_job_count_rx: watch::Receiver, - outstanding_job_count_tx: Arc>>, + pending_file_count_rx: watch::Receiver, + pending_file_count_tx: Arc>>, _subscription: gpui::Subscription, } @@ -178,12 +178,12 @@ impl JobHandle { impl ProjectState { fn new(subscription: gpui::Subscription) -> Self { - let (outstanding_job_count_tx, outstanding_job_count_rx) = watch::channel_with(0); - let outstanding_job_count_tx = Arc::new(Mutex::new(outstanding_job_count_tx)); + let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0); + let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx)); Self { worktrees: Default::default(), - outstanding_job_count_rx, - outstanding_job_count_tx, + pending_file_count_rx, + pending_file_count_tx, _subscription: subscription, } } @@ -605,7 +605,7 @@ impl SemanticIndex { Some( self.projects .get(&project.downgrade())? 
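// Editor's sketch of the drop-based counter behind the renamed
// `pending_file_count` channel: each queued file holds a guard that bumps the
// watch value on creation and decrements it on drop, so `index_project` can
// simply wait for the receiver to report zero. Simplified from the crate's
// `JobHandle`, which additionally ensures cloned handles only count once.
use parking_lot::Mutex;
use postage::watch;
use std::sync::Arc;

struct CountGuard {
    tx: Arc<Mutex<watch::Sender<usize>>>,
}

impl CountGuard {
    fn new(tx: &Arc<Mutex<watch::Sender<usize>>>) -> Self {
        *tx.lock().borrow_mut() += 1;
        Self { tx: tx.clone() }
    }
}

impl Drop for CountGuard {
    fn drop(&mut self) {
        *self.tx.lock().borrow_mut() -= 1;
    }
}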
- .outstanding_job_count_rx + .pending_file_count_rx .clone(), ) } @@ -774,8 +774,8 @@ impl SemanticIndex { .insert(project.downgrade(), ProjectState::new(subscription)); self.project_worktrees_changed(project.clone(), cx); } - let project_state = self.projects.get(&project.downgrade()).unwrap(); - let mut outstanding_job_count_rx = project_state.outstanding_job_count_rx.clone(); + let project_state = &self.projects[&project.downgrade()]; + let mut pending_file_count_rx = project_state.pending_file_count_rx.clone(); let db = self.db.clone(); let language_registry = self.language_registry.clone(); @@ -792,7 +792,7 @@ impl SemanticIndex { .projects .get_mut(&project.downgrade()) .ok_or_else(|| anyhow!("project was dropped"))?; - let outstanding_job_count_tx = &project_state.outstanding_job_count_tx; + let pending_file_count_tx = &project_state.pending_file_count_tx; project_state .worktrees @@ -816,7 +816,7 @@ impl SemanticIndex { files_to_delete.push((worktree_state.db_id, path.clone())); } else { let absolute_path = worktree.read(cx).absolutize(path); - let job_handle = JobHandle::new(outstanding_job_count_tx); + let job_handle = JobHandle::new(pending_file_count_tx); pending_files.push(PendingFile { absolute_path, relative_path: path.clone(), @@ -879,7 +879,7 @@ impl SemanticIndex { } // Wait until we're done indexing. - while let Some(count) = outstanding_job_count_rx.next().await { + while let Some(count) = pending_file_count_rx.next().await { if count == 0 { break; } From a45c8c380f04f5f39bb360374007379ea1f96781 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 7 Sep 2023 15:25:23 +0200 Subject: [PATCH 11/11] :lipstick: --- crates/semantic_index/src/embedding_queue.rs | 81 ++++++++++---------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/semantic_index/src/embedding_queue.rs b/crates/semantic_index/src/embedding_queue.rs index 104a4eb8ee1f971237a17de8074817df9e78b69b..3026eef9ae1cd21fcbb5cfb3ba941ed2a83f1884 100644 --- a/crates/semantic_index/src/embedding_queue.rs +++ b/crates/semantic_index/src/embedding_queue.rs @@ -108,54 +108,55 @@ impl EmbeddingQueue { let finished_files_tx = self.finished_files_tx.clone(); let embedding_provider = self.embedding_provider.clone(); - self.executor.spawn(async move { - let mut spans = Vec::new(); - for fragment in &batch { - let file = fragment.file.lock(); - spans.extend( - { + self.executor + .spawn(async move { + let mut spans = Vec::new(); + for fragment in &batch { + let file = fragment.file.lock(); + spans.extend( file.spans[fragment.span_range.clone()] - .iter().filter(|d| d.embedding.is_none()) - .map(|d| d.content.clone()) - } - ); - } - - // If spans is 0, just send the fragment to the finished files if its the last one. 
- if spans.len() == 0 { - for fragment in batch.clone() { - if let Some(file) = Arc::into_inner(fragment.file) { - finished_files_tx.try_send(file.into_inner()).unwrap(); - } + .iter() + .filter(|d| d.embedding.is_none()) + .map(|d| d.content.clone()), + ); } - return; - }; - - match embedding_provider.embed_batch(spans).await { - Ok(embeddings) => { - let mut embeddings = embeddings.into_iter(); - for fragment in batch { - for span in - &mut fragment.file.lock().spans[fragment.span_range.clone()].iter_mut().filter(|d| d.embedding.is_none()) - { - if let Some(embedding) = embeddings.next() { - span.embedding = Some(embedding); - } else { - log::error!("number of embeddings returned different from number of documents"); - } - } + // If spans is 0, just send the fragment to the finished files if its the last one. + if spans.is_empty() { + for fragment in batch.clone() { if let Some(file) = Arc::into_inner(fragment.file) { finished_files_tx.try_send(file.into_inner()).unwrap(); } } + return; + }; + + match embedding_provider.embed_batch(spans).await { + Ok(embeddings) => { + let mut embeddings = embeddings.into_iter(); + for fragment in batch { + for span in &mut fragment.file.lock().spans[fragment.span_range.clone()] + .iter_mut() + .filter(|d| d.embedding.is_none()) + { + if let Some(embedding) = embeddings.next() { + span.embedding = Some(embedding); + } else { + log::error!("number of embeddings != number of documents"); + } + } + + if let Some(file) = Arc::into_inner(fragment.file) { + finished_files_tx.try_send(file.into_inner()).unwrap(); + } + } + } + Err(error) => { + log::error!("{:?}", error); + } } - Err(error) => { - log::error!("{:?}", error); - } - } - }) - .detach(); + }) + .detach(); } pub fn finished_files(&self) -> channel::Receiver {
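// Editor's note on the `Arc::into_inner` calls in the final version of this
// patch: every fragment of a file shares one `Arc<Mutex<FileToEmbed>>`, and
// only the call that consumes the last clone gets `Some`, so the file is sent
// to `finished_files_tx` exactly once no matter how many batches it spanned.
// A minimal standalone illustration:
use std::sync::Arc;

fn main() {
    let shared = Arc::new(String::from("file"));
    let clone = shared.clone();
    // Another strong reference still exists, so this returns `None`...
    assert!(Arc::into_inner(clone).is_none());
    // ...and only consuming the last remaining reference yields the value.
    assert_eq!(Arc::into_inner(shared), Some(String::from("file")));
}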