From 773850f477e304da9a9c34bab5a70cc5ff0ba5a3 Mon Sep 17 00:00:00 2001 From: Michael Sloan Date: Mon, 29 Sep 2025 16:15:00 -0600 Subject: [PATCH] zeta2: Use bounded parallelism for tree-sitter indexing + await completion in zeta_cli (#39147) Also skips indexing files that don't have a suffix that indicates a known language, and skips when the language doesn't have an outline grammar. Release Notes: - N/A --------- Co-authored-by: Agus --- Cargo.lock | 2 + crates/acp_thread/src/diff.rs | 2 +- crates/assistant_tools/src/edit_file_tool.rs | 2 +- crates/edit_prediction_context/Cargo.toml | 1 + .../src/edit_prediction_context.rs | 9 +- .../src/syntax_index.rs | 490 +++++++++++------- crates/language/src/language_registry.rs | 10 +- crates/markdown/src/markdown.rs | 2 +- crates/project/src/lsp_store.rs | 2 +- crates/project/src/project_tests.rs | 2 +- crates/worktree/src/worktree.rs | 2 +- crates/zeta2/src/zeta2.rs | 18 +- crates/zeta2_tools/src/zeta2_tools.rs | 1 + crates/zeta_cli/Cargo.toml | 1 + crates/zeta_cli/src/main.rs | 38 +- 15 files changed, 379 insertions(+), 203 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 845e4a3eb288645a1bd7d911446ebaaa4c4d5abd..aeb82b50beaf83353a12acee782d7541ffb0f6c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5177,6 +5177,7 @@ dependencies = [ "language", "log", "ordered-float 2.10.1", + "postage", "pretty_assertions", "project", "regex", @@ -20746,6 +20747,7 @@ dependencies = [ "workspace-hack", "zeta", "zeta2", + "zlog", ] [[package]] diff --git a/crates/acp_thread/src/diff.rs b/crates/acp_thread/src/diff.rs index 753f157af934d6c92fcd6766aa645ec45c6b22f7..15de12af27fe233bad4ad8ebb2893ffa5fbdd598 100644 --- a/crates/acp_thread/src/diff.rs +++ b/crates/acp_thread/src/diff.rs @@ -31,7 +31,7 @@ impl Diff { let buffer = new_buffer.clone(); async move |_, cx| { let language = language_registry - .language_for_file_path(Path::new(&path)) + .load_language_for_file_path(Path::new(&path)) .await .log_err(); diff --git a/crates/assistant_tools/src/edit_file_tool.rs b/crates/assistant_tools/src/edit_file_tool.rs index c8b7add0b8f17f70d2157681b3ae115535a7616e..0dc889dfb5f8b9b9956f2248b1c9033ca1357d89 100644 --- a/crates/assistant_tools/src/edit_file_tool.rs +++ b/crates/assistant_tools/src/edit_file_tool.rs @@ -1161,7 +1161,7 @@ async fn build_buffer( LineEnding::normalize(&mut text); let text = Rope::from(text); let language = cx - .update(|_cx| language_registry.language_for_file_path(&path))? + .update(|_cx| language_registry.load_language_for_file_path(&path))? 
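+        // Note: `language_for_file_path` is now a synchronous lookup that returns
+        // an `AvailableLanguage`; this renamed async variant additionally loads
+        // the language (see the language_registry.rs hunk later in this patch).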
.await .ok(); let buffer = cx.new(|cx| { diff --git a/crates/edit_prediction_context/Cargo.toml b/crates/edit_prediction_context/Cargo.toml index d4321b10fa6ab338c4642ccc6678094f7bb1a385..c34386b3fb77565e627887155055917ed6ceb40c 100644 --- a/crates/edit_prediction_context/Cargo.toml +++ b/crates/edit_prediction_context/Cargo.toml @@ -23,6 +23,7 @@ itertools.workspace = true language.workspace = true log.workspace = true ordered-float.workspace = true +postage.workspace = true project.workspace = true regex.workspace = true serde.workspace = true diff --git a/crates/edit_prediction_context/src/edit_prediction_context.rs b/crates/edit_prediction_context/src/edit_prediction_context.rs index 1118e64eddbbb0bf5bd3a6fb95f41fa962b499bb..46fc11bb7b8c67663c8d73cf3ac51b3d614aaa5f 100644 --- a/crates/edit_prediction_context/src/edit_prediction_context.rs +++ b/crates/edit_prediction_context/src/edit_prediction_context.rs @@ -6,6 +6,8 @@ mod reference; mod syntax_index; mod text_similarity; +use std::sync::Arc; + use gpui::{App, AppContext as _, Entity, Task}; use language::BufferSnapshot; use text::{Point, ToOffset as _}; @@ -33,8 +35,10 @@ impl EditPredictionContext { cx: &mut App, ) -> Task> { if let Some(syntax_index) = syntax_index { - let index_state = syntax_index.read_with(cx, |index, _cx| index.state().clone()); + let index_state = + syntax_index.read_with(cx, |index, _cx| Arc::downgrade(index.state())); cx.background_spawn(async move { + let index_state = index_state.upgrade()?; let index_state = index_state.lock().await; Self::gather_context(cursor_point, &buffer, &excerpt_options, Some(&index_state)) }) @@ -237,7 +241,8 @@ mod tests { let lang_id = lang.id(); language_registry.add(Arc::new(lang)); - let index = cx.new(|cx| SyntaxIndex::new(&project, cx)); + let file_indexing_parallelism = 2; + let index = cx.new(|cx| SyntaxIndex::new(&project, file_indexing_parallelism, cx)); cx.run_until_parked(); (project, index, lang_id) diff --git a/crates/edit_prediction_context/src/syntax_index.rs b/crates/edit_prediction_context/src/syntax_index.rs index 4e890bd4230c05679b56f56d6a089fd0e5ebb0a4..c2b9b0540484071bcc145572b1a703baf889b7e9 100644 --- a/crates/edit_prediction_context/src/syntax_index.rs +++ b/crates/edit_prediction_context/src/syntax_index.rs @@ -1,13 +1,18 @@ +use anyhow::{Result, anyhow}; use collections::{HashMap, HashSet}; +use futures::channel::mpsc; use futures::lock::Mutex; -use gpui::{App, AppContext as _, Context, Entity, Task, WeakEntity}; +use futures::{FutureExt as _, StreamExt, future}; +use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Task, WeakEntity}; +use itertools::Itertools; use language::{Buffer, BufferEvent}; +use postage::stream::Stream as _; use project::buffer_store::{BufferStore, BufferStoreEvent}; use project::worktree_store::{WorktreeStore, WorktreeStoreEvent}; use project::{PathChange, Project, ProjectEntryId, ProjectPath}; use slotmap::SlotMap; use std::iter; -use std::ops::Range; +use std::ops::{DerefMut, Range}; use std::sync::Arc; use text::BufferId; use util::{RangeExt as _, debug_panic, some_or_debug_panic}; @@ -17,42 +22,56 @@ use crate::declaration::{ }; use crate::outline::declarations_in_buffer; +// TODO +// +// * Also queue / debounce buffer changes. A challenge for this is that use of +// `buffer_declarations_containing_range` assumes that the index is always immediately up to date. +// +// * Add a per language configuration for skipping indexing. 
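+//
+// A minimal sketch of the chunked fan-out used for initial file indexing below.
+// Illustrative only: `files` and `index_file` are hypothetical stand-ins for the
+// worktree entries and the per-file work done by `update_dirty_files`, and the
+// file list is assumed to be non-empty:
+//
+//     let chunk_size = files.len().div_ceil(file_indexing_parallelism);
+//     let tasks: Vec<_> = files
+//         .chunks(chunk_size)
+//         .map(|chunk| async move {
+//             // Each chunk is processed sequentially; chunks run concurrently,
+//             // bounding parallelism at `file_indexing_parallelism`.
+//             for file in chunk {
+//                 index_file(file).await;
+//             }
+//         })
+//         .collect();
+//     futures::future::join_all(tasks).await;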
+ // Potential future improvements: // +// * Prevent indexing of a large file from blocking the queue. +// // * Send multiple selected excerpt ranges. Challenge is that excerpt ranges influence which // references are present and their scores. // Potential future optimizations: // -// * Cache of buffers for files +// * Index files on multiple threads in Zed (currently only parallel for the CLI). Adding some kind +// of priority system to the background executor could help - it's single threaded for now to avoid +// interfering with other work. // -// * Parse files directly instead of loading into a Rope. Make SyntaxMap generic to handle embedded -// languages? Will also need to find line boundaries, but that can be done by scanning characters in -// the flat representation. +// * Parse files directly instead of loading into a Rope. +// +// - This would allow the task handling dirty_files to be done entirely on the background executor. +// +// - Make SyntaxMap generic to handle embedded languages? Will also need to find line boundaries, +// but that can be done by scanning characters in the flat representation. // // * Use something similar to slotmap without key versions. // // * Concurrent slotmap -// -// * Use queue for parsing pub struct SyntaxIndex { state: Arc>, project: WeakEntity, + initial_file_indexing_done_rx: postage::watch::Receiver, } -#[derive(Default)] pub struct SyntaxIndexState { declarations: SlotMap, identifiers: HashMap>, files: HashMap, buffers: HashMap, + dirty_files: HashMap, + dirty_files_tx: mpsc::Sender<()>, + _file_indexing_task: Option>, } #[derive(Debug, Default)] struct FileState { declarations: Vec, - task: Option>, } #[derive(Default)] @@ -62,33 +81,107 @@ struct BufferState { } impl SyntaxIndex { - pub fn new(project: &Entity, cx: &mut Context) -> Self { - let mut this = Self { + pub fn new( + project: &Entity, + file_indexing_parallelism: usize, + cx: &mut Context, + ) -> Self { + assert!(file_indexing_parallelism > 0); + let (dirty_files_tx, mut dirty_files_rx) = mpsc::channel::<()>(1); + let (mut initial_file_indexing_done_tx, initial_file_indexing_done_rx) = + postage::watch::channel(); + + let initial_state = SyntaxIndexState { + declarations: SlotMap::default(), + identifiers: HashMap::default(), + files: HashMap::default(), + buffers: HashMap::default(), + dirty_files: HashMap::default(), + dirty_files_tx, + _file_indexing_task: None, + }; + let this = Self { project: project.downgrade(), - state: Arc::new(Mutex::new(SyntaxIndexState::default())), + state: Arc::new(Mutex::new(initial_state)), + initial_file_indexing_done_rx, }; let worktree_store = project.read(cx).worktree_store(); - cx.subscribe(&worktree_store, Self::handle_worktree_store_event) - .detach(); - - for worktree in worktree_store + let initial_worktree_snapshots = worktree_store .read(cx) .worktrees() .map(|w| w.read(cx).snapshot()) - .collect::>() - { - for entry in worktree.files(false, 0) { - this.update_file( - entry.id, - ProjectPath { - worktree_id: worktree.id(), - path: entry.path.clone(), - }, - cx, - ); - } - } + .collect::>(); + this.state.try_lock().unwrap()._file_indexing_task = + Some(cx.spawn(async move |this, cx| { + let snapshots_file_count = initial_worktree_snapshots + .iter() + .map(|worktree| worktree.file_count()) + .sum::(); + let chunk_size = snapshots_file_count.div_ceil(file_indexing_parallelism); + let chunk_count = snapshots_file_count.div_ceil(chunk_size); + let file_chunks = initial_worktree_snapshots + .iter() + .flat_map(|worktree| { + let worktree_id = 
worktree.id(); + worktree.files(false, 0).map(move |entry| { + ( + entry.id, + ProjectPath { + worktree_id, + path: entry.path.clone(), + }, + ) + }) + }) + .chunks(chunk_size); + + let mut tasks = Vec::with_capacity(chunk_count); + for chunk in file_chunks.into_iter() { + tasks.push(Self::update_dirty_files( + &this, + chunk.into_iter().collect(), + cx.clone(), + )); + } + futures::future::join_all(tasks).await; + + log::info!("Finished initial file indexing"); + *initial_file_indexing_done_tx.borrow_mut() = true; + + let Ok(state) = this.read_with(cx, |this, _cx| this.state.clone()) else { + return; + }; + while dirty_files_rx.next().await.is_some() { + let mut state = state.lock().await; + let was_underused = state.dirty_files.capacity() > 255 + && state.dirty_files.len() * 8 < state.dirty_files.capacity(); + let dirty_files = state.dirty_files.drain().collect::>(); + if was_underused { + state.dirty_files.shrink_to_fit(); + } + drop(state); + if dirty_files.is_empty() { + continue; + } + + let chunk_size = dirty_files.len().div_ceil(file_indexing_parallelism); + let chunk_count = dirty_files.len().div_ceil(chunk_size); + let mut tasks = Vec::with_capacity(chunk_count); + let chunks = dirty_files.into_iter().chunks(chunk_size); + for chunk in chunks.into_iter() { + tasks.push(Self::update_dirty_files( + &this, + chunk.into_iter().collect(), + cx.clone(), + )); + } + futures::future::join_all(tasks).await; + } + })); + + cx.subscribe(&worktree_store, Self::handle_worktree_store_event) + .detach(); let buffer_store = project.read(cx).buffer_store().clone(); for buffer in buffer_store.read(cx).buffers().collect::>() { @@ -100,6 +193,42 @@ impl SyntaxIndex { this } + async fn update_dirty_files( + this: &WeakEntity, + dirty_files: Vec<(ProjectEntryId, ProjectPath)>, + mut cx: AsyncApp, + ) { + for (entry_id, project_path) in dirty_files { + let Ok(task) = this.update(&mut cx, |this, cx| { + this.update_file(entry_id, project_path, cx) + }) else { + return; + }; + task.await; + } + } + + pub fn wait_for_initial_file_indexing(&self, cx: &App) -> Task> { + if *self.initial_file_indexing_done_rx.borrow() { + Task::ready(Ok(())) + } else { + let mut rx = self.initial_file_indexing_done_rx.clone(); + cx.background_spawn(async move { + loop { + match rx.recv().await { + Some(true) => return Ok(()), + Some(false) => {} + None => { + return Err(anyhow!( + "SyntaxIndex dropped while waiting for initial file indexing" + )); + } + } + } + }) + } + } + fn handle_worktree_store_event( &mut self, _worktree_store: Entity, @@ -112,21 +241,26 @@ impl SyntaxIndex { let state = Arc::downgrade(&self.state); let worktree_id = *worktree_id; let updated_entries_set = updated_entries_set.clone(); - cx.spawn(async move |this, cx| { + cx.background_spawn(async move { let Some(state) = state.upgrade() else { return }; + let mut state = state.lock().await; for (path, entry_id, path_change) in updated_entries_set.iter() { if let PathChange::Removed = path_change { - state.lock().await.files.remove(entry_id); + state.files.remove(entry_id); + state.dirty_files.remove(entry_id); } else { let project_path = ProjectPath { worktree_id, path: path.clone(), }; - this.update(cx, |this, cx| { - this.update_file(*entry_id, project_path, cx); - }) - .ok(); + state.dirty_files.insert(*entry_id, project_path); + } + } + match state.dirty_files_tx.try_send(()) { + Err(err) if err.is_disconnected() => { + log::error!("bug: syntax indexing queue is disconnected"); } + _ => {} } }) .detach(); @@ -177,7 +311,7 @@ impl SyntaxIndex { 
.detach(); } - fn register_buffer(&mut self, buffer: &Entity, cx: &mut Context) { + fn register_buffer(&self, buffer: &Entity, cx: &mut Context) { let buffer_id = buffer.read(cx).remote_id(); cx.observe_release(buffer, move |this, _buffer, cx| { this.with_state(cx, move |state| { @@ -208,8 +342,11 @@ impl SyntaxIndex { } } - fn update_buffer(&mut self, buffer_entity: Entity, cx: &mut Context) { + fn update_buffer(&self, buffer_entity: Entity, cx: &mut Context) { let buffer = buffer_entity.read(cx); + if buffer.language().is_none() { + return; + } let Some(project_entry_id) = project::File::from_dyn(buffer.file()).and_then(|f| f.project_entry_id(cx)) @@ -229,70 +366,64 @@ impl SyntaxIndex { } }); - let parse_task = cx.background_spawn(async move { - let snapshot = snapshot_task.await?; - let rope = snapshot.text.as_rope().clone(); - - anyhow::Ok(( - declarations_in_buffer(&snapshot) - .into_iter() - .map(|item| { - ( - item.parent_index, - BufferDeclaration::from_outline(item, &rope), - ) - }) - .collect::>(), - rope, - )) - }); + let state = Arc::downgrade(&self.state); + let task = cx.background_spawn(async move { + // TODO: How to handle errors? + let Ok(snapshot) = snapshot_task.await else { + return; + }; + let rope = snapshot.text.as_rope(); - let task = cx.spawn({ - async move |this, cx| { - let Ok((declarations, rope)) = parse_task.await else { - return; - }; + let declarations = declarations_in_buffer(&snapshot) + .into_iter() + .map(|item| { + ( + item.parent_index, + BufferDeclaration::from_outline(item, &rope), + ) + }) + .collect::>(); - this.update(cx, move |this, cx| { - this.with_state(cx, move |state| { - let buffer_state = state - .buffers - .entry(buffer_id) - .or_insert_with(Default::default); - - SyntaxIndexState::remove_buffer_declarations( - &buffer_state.declarations, - &mut state.declarations, - &mut state.identifiers, - ); - - let mut new_ids = Vec::with_capacity(declarations.len()); - state.declarations.reserve(declarations.len()); - for (parent_index, mut declaration) in declarations { - declaration.parent = parent_index - .and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied())); - - let identifier = declaration.identifier.clone(); - let declaration_id = state.declarations.insert(Declaration::Buffer { - rope: rope.clone(), - buffer_id, - declaration, - project_entry_id, - }); - new_ids.push(declaration_id); - - state - .identifiers - .entry(identifier) - .or_default() - .insert(declaration_id); - } + let Some(state) = state.upgrade() else { + return; + }; + let mut state = state.lock().await; + let state = state.deref_mut(); - buffer_state.declarations = new_ids; - }); - }) - .ok(); + let buffer_state = state + .buffers + .entry(buffer_id) + .or_insert_with(Default::default); + + SyntaxIndexState::remove_buffer_declarations( + &buffer_state.declarations, + &mut state.declarations, + &mut state.identifiers, + ); + + let mut new_ids = Vec::with_capacity(declarations.len()); + state.declarations.reserve(declarations.len()); + for (parent_index, mut declaration) in declarations { + declaration.parent = + parent_index.and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied())); + + let identifier = declaration.identifier.clone(); + let declaration_id = state.declarations.insert(Declaration::Buffer { + rope: rope.clone(), + buffer_id, + declaration, + project_entry_id, + }); + new_ids.push(declaration_id); + + state + .identifiers + .entry(identifier) + .or_default() + .insert(declaration_id); } + + buffer_state.declarations = new_ids; }); self.with_state(cx, 
move |state| { @@ -309,28 +440,53 @@ impl SyntaxIndex { entry_id: ProjectEntryId, project_path: ProjectPath, cx: &mut Context, - ) { + ) -> Task<()> { let Some(project) = self.project.upgrade() else { - return; + return Task::ready(()); }; let project = project.read(cx); + + let language_registry = project.languages(); + let Some(available_language) = + language_registry.language_for_file_path(project_path.path.as_std_path()) + else { + return Task::ready(()); + }; + let language = if let Some(Ok(Ok(language))) = language_registry + .load_language(&available_language) + .now_or_never() + { + if language + .grammar() + .is_none_or(|grammar| grammar.outline_config.is_none()) + { + return Task::ready(()); + } + future::Either::Left(async { Ok(language) }) + } else { + let language_registry = language_registry.clone(); + future::Either::Right(async move { + anyhow::Ok( + language_registry + .load_language(&available_language) + .await??, + ) + }) + }; + let Some(worktree) = project.worktree_for_id(project_path.worktree_id, cx) else { - return; + return Task::ready(()); }; - let language_registry = project.languages().clone(); let snapshot_task = worktree.update(cx, |worktree, cx| { let load_task = worktree.load_file(&project_path.path, cx); cx.spawn(async move |_this, cx| { let loaded_file = load_task.await?; - let language = language_registry - .language_for_file_path(&project_path.path.as_std_path()) - .await - .ok(); + let language = language.await?; let buffer = cx.new(|cx| { let mut buffer = Buffer::local(loaded_file.text, cx); - buffer.set_language(language, cx); + buffer.set_language(Some(language), cx); buffer })?; @@ -343,75 +499,58 @@ impl SyntaxIndex { }) }); - let parse_task = cx.background_spawn(async move { - let snapshot = snapshot_task.await?; + let state = Arc::downgrade(&self.state); + cx.background_spawn(async move { + // TODO: How to handle errors? + let Ok(snapshot) = snapshot_task.await else { + return; + }; let rope = snapshot.as_rope(); let declarations = declarations_in_buffer(&snapshot) .into_iter() .map(|item| (item.parent_index, FileDeclaration::from_outline(item, rope))) .collect::>(); - anyhow::Ok(declarations) - }); - - let task = cx.spawn({ - async move |this, cx| { - // TODO: how to handle errors? 
- let Ok(declarations) = parse_task.await else { - return; - }; - this.update(cx, |this, cx| { - this.with_state(cx, move |state| { - let file_state = - state.files.entry(entry_id).or_insert_with(Default::default); - - for old_declaration_id in &file_state.declarations { - let Some(declaration) = state.declarations.remove(*old_declaration_id) - else { - debug_panic!("declaration not found"); - continue; - }; - if let Some(identifier_declarations) = - state.identifiers.get_mut(declaration.identifier()) - { - identifier_declarations.remove(old_declaration_id); - } - } - let mut new_ids = Vec::with_capacity(declarations.len()); - state.declarations.reserve(declarations.len()); - - for (parent_index, mut declaration) in declarations { - declaration.parent = parent_index - .and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied())); - - let identifier = declaration.identifier.clone(); - let declaration_id = state.declarations.insert(Declaration::File { - project_entry_id: entry_id, - declaration, - }); - new_ids.push(declaration_id); - - state - .identifiers - .entry(identifier) - .or_default() - .insert(declaration_id); - } + let Some(state) = state.upgrade() else { + return; + }; + let mut state = state.lock().await; + let state = state.deref_mut(); - file_state.declarations = new_ids; - }); - }) - .ok(); + let file_state = state.files.entry(entry_id).or_insert_with(Default::default); + for old_declaration_id in &file_state.declarations { + let Some(declaration) = state.declarations.remove(*old_declaration_id) else { + debug_panic!("declaration not found"); + continue; + }; + if let Some(identifier_declarations) = + state.identifiers.get_mut(declaration.identifier()) + { + identifier_declarations.remove(old_declaration_id); + } } - }); - self.with_state(cx, move |state| { - state - .files - .entry(entry_id) - .or_insert_with(Default::default) - .task = Some(task); - }); + let mut new_ids = Vec::with_capacity(declarations.len()); + state.declarations.reserve(declarations.len()); + for (parent_index, mut declaration) in declarations { + declaration.parent = + parent_index.and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied())); + + let identifier = declaration.identifier.clone(); + let declaration_id = state.declarations.insert(Declaration::File { + project_entry_id: entry_id, + declaration, + }); + new_ids.push(declaration_id); + + state + .identifiers + .entry(identifier) + .or_default() + .insert(declaration_id); + } + file_state.declarations = new_ids; + }) } } @@ -576,13 +715,13 @@ mod tests { let decls = index_state.declarations_for_identifier::<8>(&main); assert_eq!(decls.len(), 2); - let decl = expect_file_decl("c.rs", &decls[0].1, &project, cx); - assert_eq!(decl.identifier, main.clone()); - assert_eq!(decl.item_range, 32..280); - - let decl = expect_file_decl("a.rs", &decls[1].1, &project, cx); + let decl = expect_file_decl("a.rs", &decls[0].1, &project, cx); assert_eq!(decl.identifier, main); assert_eq!(decl.item_range, 0..98); + + let decl = expect_file_decl("c.rs", &decls[1].1, &project, cx); + assert_eq!(decl.identifier, main.clone()); + assert_eq!(decl.item_range, 32..280); }); } @@ -718,8 +857,8 @@ mod tests { cx.update(|cx| { let decls = index_state.declarations_for_identifier::<8>(&main); assert_eq!(decls.len(), 2); - expect_file_decl("c.rs", &decls[0].1, &project, cx); - expect_file_decl("a.rs", &decls[1].1, &project, cx); + expect_file_decl("a.rs", &decls[0].1, &project, cx); + expect_file_decl("c.rs", &decls[1].1, &project, cx); }); } @@ -852,7 +991,8 @@ mod tests { 
let lang_id = lang.id(); language_registry.add(Arc::new(lang)); - let index = cx.new(|cx| SyntaxIndex::new(&project, cx)); + let file_indexing_parallelism = 2; + let index = cx.new(|cx| SyntaxIndex::new(&project, file_indexing_parallelism, cx)); cx.run_until_parked(); (project, index, lang_id) diff --git a/crates/language/src/language_registry.rs b/crates/language/src/language_registry.rs index ed186062d825553660ce83a386897e742f75aca5..1e44660b891f62c37587fcc2d4bf83b040849af6 100644 --- a/crates/language/src/language_registry.rs +++ b/crates/language/src/language_registry.rs @@ -731,15 +731,19 @@ impl LanguageRegistry { ) } - pub fn language_for_file_path<'a>( + pub fn language_for_file_path(self: &Arc, path: &Path) -> Option { + self.language_for_file_internal(path, None, None) + } + + pub fn load_language_for_file_path<'a>( self: &Arc, path: &'a Path, ) -> impl Future>> + 'a { - let available_language = self.language_for_file_internal(path, None, None); + let language = self.language_for_file_path(path); let this = self.clone(); async move { - if let Some(language) = available_language { + if let Some(language) = language { this.load_language(&language).await? } else { Err(anyhow!(LanguageNotFound)) diff --git a/crates/markdown/src/markdown.rs b/crates/markdown/src/markdown.rs index 61846717237df76c5fcb0e91c2aad8e91cd683f9..c34ed69288e39c26d105877d76ee76c01c864c72 100644 --- a/crates/markdown/src/markdown.rs +++ b/crates/markdown/src/markdown.rs @@ -335,7 +335,7 @@ impl Markdown { for path in paths { if let Ok(language) = registry - .language_for_file_path(Path::new(path.as_ref())) + .load_language_for_file_path(Path::new(path.as_ref())) .await { languages_by_path.insert(path, language); diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index bd65f04c2f846808307d3cb142b1899d2e9f37ea..6a5d118951ed332097ec318ae2005f7cc3349586 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -12833,7 +12833,7 @@ async fn populate_labels_for_symbols( continue; }; let language = language_registry - .language_for_file_path(Path::new(file_name)) + .load_language_for_file_path(Path::new(file_name)) .await .ok() .or_else(|| { diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 4a27d88a0193ea9a68a5499bf2430f1ba921b530..4a1f89b619d2fa8644971a7237995f2368125792 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -194,7 +194,7 @@ async fn test_editorconfig_support(cx: &mut gpui::TestAppContext) { let file_language = project .read(cx) .languages() - .language_for_file_path(file.path.as_std_path()); + .load_language_for_file_path(file.path.as_std_path()); let file_language = cx .background_executor() .block(file_language) diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index eae6986c0084c23bd1581cc1605ca714d877bbf1..f5b8448ba0b10da8361bd4b62a5f00e565e3034c 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -1103,7 +1103,7 @@ impl LocalWorktree { } }); self._background_scanner_tasks = vec![background_scanner, scan_state_updater]; - self.is_scanning = watch::channel_with(true); + *self.is_scanning.0.borrow_mut() = true; } fn set_snapshot( diff --git a/crates/zeta2/src/zeta2.rs b/crates/zeta2/src/zeta2.rs index 0aaf4c9d35e9e00e066a716bc645b6f4ad56480a..d32b2589135e2d77f9285706254d1e3c05f7f3e2 100644 --- a/crates/zeta2/src/zeta2.rs +++ b/crates/zeta2/src/zeta2.rs @@ -54,6 +54,7 @@ pub const DEFAULT_OPTIONS: 
ZetaOptions = ZetaOptions { max_prompt_bytes: DEFAULT_MAX_PROMPT_BYTES, max_diagnostic_bytes: 2048, prompt_format: PromptFormat::DEFAULT, + file_indexing_parallelism: 1, }; #[derive(Clone)] @@ -78,6 +79,7 @@ pub struct ZetaOptions { pub max_prompt_bytes: usize, pub max_diagnostic_bytes: usize, pub prompt_format: predict_edits_v3::PromptFormat, + pub file_indexing_parallelism: usize, } pub struct PredictionDebugInfo { @@ -242,7 +244,9 @@ impl Zeta { self.projects .entry(project.entity_id()) .or_insert_with(|| ZetaProject { - syntax_index: cx.new(|cx| SyntaxIndex::new(project, cx)), + syntax_index: cx.new(|cx| { + SyntaxIndex::new(project, self.options.file_indexing_parallelism, cx) + }), events: VecDeque::new(), registered_buffers: HashMap::new(), current_prediction: None, @@ -817,6 +821,18 @@ impl Zeta { }) }) } + + pub fn wait_for_initial_indexing( + &mut self, + project: &Entity, + cx: &mut App, + ) -> Task> { + let zeta_project = self.get_or_init_zeta_project(project, cx); + zeta_project + .syntax_index + .read(cx) + .wait_for_initial_file_indexing(cx) + } } #[derive(Error, Debug)] diff --git a/crates/zeta2_tools/src/zeta2_tools.rs b/crates/zeta2_tools/src/zeta2_tools.rs index e553d941325e1bf4cd4dc0db93175cac51514927..a299726c64b104e59cab9ba4609316c49d715876 100644 --- a/crates/zeta2_tools/src/zeta2_tools.rs +++ b/crates/zeta2_tools/src/zeta2_tools.rs @@ -252,6 +252,7 @@ impl Zeta2Inspector { max_prompt_bytes: number_input_value(&this.max_prompt_bytes_input, cx), max_diagnostic_bytes: zeta_options.max_diagnostic_bytes, prompt_format: zeta_options.prompt_format, + file_indexing_parallelism: zeta_options.file_indexing_parallelism, }, cx, ); diff --git a/crates/zeta_cli/Cargo.toml b/crates/zeta_cli/Cargo.toml index 7132340a4d884d4e85c8e67330ca01fb6315b514..87e8d10ea56eb8dd378a1d56defcb6cd952436d8 100644 --- a/crates/zeta_cli/Cargo.toml +++ b/crates/zeta_cli/Cargo.toml @@ -47,3 +47,4 @@ watch.workspace = true workspace-hack.workspace = true zeta.workspace = true zeta2.workspace = true +zlog.workspace = true diff --git a/crates/zeta_cli/src/main.rs b/crates/zeta_cli/src/main.rs index 8895603f2984f4c411611980d60145d2bfebcbf9..599b025659f15e5349cfe2e9d2821342a6c0e72a 100644 --- a/crates/zeta_cli/src/main.rs +++ b/crates/zeta_cli/src/main.rs @@ -80,6 +80,8 @@ struct Zeta2Args { prompt_format: PromptFormat, #[arg(long, value_enum, default_value_t = Default::default())] output_format: OutputFormat, + #[arg(long, default_value_t = 42)] + file_indexing_parallelism: usize, } #[derive(clap::ValueEnum, Default, Debug, Clone)] @@ -244,13 +246,19 @@ async fn get_context( }; if let Some(zeta2_args) = zeta2_args { - Ok(GetContextOutput::Zeta2( - cx.update(|cx| { + // wait for worktree scan before starting zeta2 so that wait_for_initial_indexing waits for + // the whole worktree. + worktree + .read_with(cx, |worktree, _cx| { + worktree.as_local().unwrap().scan_complete() + })? 
+ .await; + let output = cx + .update(|cx| { let zeta = cx.new(|cx| { zeta2::Zeta::new(app_state.client.clone(), app_state.user_store.clone(), cx) }); - zeta.update(cx, |zeta, cx| { - zeta.register_buffer(&buffer, &project, cx); + let indexing_done_task = zeta.update(cx, |zeta, cx| { zeta.set_options(zeta2::ZetaOptions { excerpt: EditPredictionExcerptOptions { max_bytes: zeta2_args.max_excerpt_bytes, @@ -261,12 +269,13 @@ async fn get_context( max_diagnostic_bytes: zeta2_args.max_diagnostic_bytes, max_prompt_bytes: zeta2_args.max_prompt_bytes, prompt_format: zeta2_args.prompt_format.into(), - }) + file_indexing_parallelism: zeta2_args.file_indexing_parallelism, + }); + zeta.register_buffer(&buffer, &project, cx); + zeta.wait_for_initial_indexing(&project, cx) }); - // TODO: Actually wait for indexing. - let timer = cx.background_executor().timer(Duration::from_secs(5)); cx.spawn(async move |cx| { - timer.await; + indexing_done_task.await?; let request = zeta .update(cx, |zeta, cx| { let cursor = buffer.read(cx).snapshot().anchor_before(clipped_cursor); @@ -288,8 +297,8 @@ async fn get_context( } }) })? - .await?, - )) + .await?; + Ok(GetContextOutput::Zeta2(output)) } else { let prompt_for_events = move || (events, 0); Ok(GetContextOutput::Zeta1( @@ -414,7 +423,7 @@ pub fn wait_for_lang_server( ]; cx.spawn(async move |cx| { - let timeout = cx.background_executor().timer(Duration::new(60 * 5, 0)); + let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5)); let result = futures::select! { _ = rx.next() => { println!("{}⚑ Language server idle", log_prefix); @@ -430,13 +439,14 @@ pub fn wait_for_lang_server( } fn main() { + zlog::init(); + zlog::init_output_stderr(); let args = ZetaCliArgs::parse(); let http_client = Arc::new(ReqwestClient::new()); let app = Application::headless().with_http_client(http_client); app.run(move |cx| { let app_state = Arc::new(headless::init(cx)); - let is_zeta2_context_command = matches!(args.command, Commands::Zeta2Context { .. }); cx.spawn(async move |cx| { let result = match args.command { Commands::Zeta2Context { @@ -498,10 +508,6 @@ fn main() { match result { Ok(output) => { println!("{}", output); - // TODO: Remove this once the 5 second delay is properly replaced. - if is_zeta2_context_command { - eprintln!("Note that zeta_cli doesn't yet wait for indexing, instead waits 5 seconds."); - } let _ = cx.update(|cx| cx.quit()); } Err(e) => {