@@ -1,13 +1,18 @@
+use anyhow::{Result, anyhow};
use collections::{HashMap, HashSet};
+use futures::channel::mpsc;
use futures::lock::Mutex;
-use gpui::{App, AppContext as _, Context, Entity, Task, WeakEntity};
+use futures::{FutureExt as _, StreamExt, future};
+use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Task, WeakEntity};
+use itertools::Itertools;
use language::{Buffer, BufferEvent};
+use postage::stream::Stream as _;
use project::buffer_store::{BufferStore, BufferStoreEvent};
use project::worktree_store::{WorktreeStore, WorktreeStoreEvent};
use project::{PathChange, Project, ProjectEntryId, ProjectPath};
use slotmap::SlotMap;
use std::iter;
-use std::ops::Range;
+use std::ops::{DerefMut, Range};
use std::sync::Arc;
use text::BufferId;
use util::{RangeExt as _, debug_panic, some_or_debug_panic};
@@ -17,42 +22,56 @@ use crate::declaration::{
};
use crate::outline::declarations_in_buffer;
+// TODO
+//
+// * Also queue / debounce buffer changes. A challenge is that callers of
+// `buffer_declarations_containing_range` assume the index is always immediately up to date.
+//
+// * Add a per language configuration for skipping indexing.
+
// Potential future improvements:
//
+// * Prevent indexing of a large file from blocking the queue.
+//
// * Send multiple selected excerpt ranges. Challenge is that excerpt ranges influence which
// references are present and their scores.
// Potential future optimizations:
//
-// * Cache of buffers for files
+// * Index files on multiple threads in Zed (currently this is only parallel in the CLI). Adding
+// some kind of priority system to the background executor could help - indexing is
+// single-threaded for now to avoid interfering with other work.
//
-// * Parse files directly instead of loading into a Rope. Make SyntaxMap generic to handle embedded
-// languages? Will also need to find line boundaries, but that can be done by scanning characters in
-// the flat representation.
+// * Parse files directly instead of loading into a Rope.
+//
+// - This would allow the task that handles dirty_files to run entirely on the background executor.
+//
+// - Make SyntaxMap generic to handle embedded languages? Will also need to find line boundaries,
+// but that can be done by scanning characters in the flat representation.
//
// * Use something similar to slotmap without key versions.
//
// * Concurrent slotmap
-//
-// * Use queue for parsing
pub struct SyntaxIndex {
state: Arc<Mutex<SyntaxIndexState>>,
project: WeakEntity<Project>,
+ initial_file_indexing_done_rx: postage::watch::Receiver<bool>,
}
-#[derive(Default)]
pub struct SyntaxIndexState {
declarations: SlotMap<DeclarationId, Declaration>,
identifiers: HashMap<Identifier, HashSet<DeclarationId>>,
files: HashMap<ProjectEntryId, FileState>,
buffers: HashMap<BufferId, BufferState>,
+ dirty_files: HashMap<ProjectEntryId, ProjectPath>,
+ dirty_files_tx: mpsc::Sender<()>,
+ _file_indexing_task: Option<Task<()>>,
}
#[derive(Debug, Default)]
struct FileState {
declarations: Vec<DeclarationId>,
- task: Option<Task<()>>,
}
#[derive(Default)]
@@ -62,33 +81,107 @@ struct BufferState {
}
impl SyntaxIndex {
- pub fn new(project: &Entity<Project>, cx: &mut Context<Self>) -> Self {
- let mut this = Self {
+ pub fn new(
+ project: &Entity<Project>,
+ file_indexing_parallelism: usize,
+ cx: &mut Context<Self>,
+ ) -> Self {
+ assert!(file_indexing_parallelism > 0);
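+        // Capacity 1 is enough here: notifications only need to coalesce into "a drain is
+        // pending", so additional sends while the channel is full can be dropped safely.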
+ let (dirty_files_tx, mut dirty_files_rx) = mpsc::channel::<()>(1);
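+        // The watch starts at its Default value (false) and is flipped to true exactly once,
+        // releasing everyone blocked in wait_for_initial_file_indexing.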
+ let (mut initial_file_indexing_done_tx, initial_file_indexing_done_rx) =
+ postage::watch::channel();
+
+ let initial_state = SyntaxIndexState {
+ declarations: SlotMap::default(),
+ identifiers: HashMap::default(),
+ files: HashMap::default(),
+ buffers: HashMap::default(),
+ dirty_files: HashMap::default(),
+ dirty_files_tx,
+ _file_indexing_task: None,
+ };
+ let this = Self {
project: project.downgrade(),
- state: Arc::new(Mutex::new(SyntaxIndexState::default())),
+ state: Arc::new(Mutex::new(initial_state)),
+ initial_file_indexing_done_rx,
};
let worktree_store = project.read(cx).worktree_store();
- cx.subscribe(&worktree_store, Self::handle_worktree_store_event)
- .detach();
-
- for worktree in worktree_store
+ let initial_worktree_snapshots = worktree_store
.read(cx)
.worktrees()
.map(|w| w.read(cx).snapshot())
- .collect::<Vec<_>>()
- {
- for entry in worktree.files(false, 0) {
- this.update_file(
- entry.id,
- ProjectPath {
- worktree_id: worktree.id(),
- path: entry.path.clone(),
- },
- cx,
- );
- }
- }
+ .collect::<Vec<_>>();
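+        // try_lock can't fail here: the state was just created and no other handle exists yet.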
+ this.state.try_lock().unwrap()._file_indexing_task =
+ Some(cx.spawn(async move |this, cx| {
+ let snapshots_file_count = initial_worktree_snapshots
+ .iter()
+ .map(|worktree| worktree.file_count())
+ .sum::<usize>();
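+                // Split the files into at most `file_indexing_parallelism` chunks of
+                // ceil(count / parallelism) files; e.g. 10 files at parallelism 4 gives
+                // chunks of 3, 3, 3, and 1. chunk_size is clamped to 1 so an empty
+                // worktree doesn't divide by zero below.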
+                let chunk_size = snapshots_file_count
+                    .div_ceil(file_indexing_parallelism)
+                    .max(1);
+                let chunk_count = snapshots_file_count.div_ceil(chunk_size);
+ let file_chunks = initial_worktree_snapshots
+ .iter()
+ .flat_map(|worktree| {
+ let worktree_id = worktree.id();
+ worktree.files(false, 0).map(move |entry| {
+ (
+ entry.id,
+ ProjectPath {
+ worktree_id,
+ path: entry.path.clone(),
+ },
+ )
+ })
+ })
+ .chunks(chunk_size);
+
+ let mut tasks = Vec::with_capacity(chunk_count);
+ for chunk in file_chunks.into_iter() {
+ tasks.push(Self::update_dirty_files(
+ &this,
+ chunk.into_iter().collect(),
+ cx.clone(),
+ ));
+ }
+ futures::future::join_all(tasks).await;
+
+ log::info!("Finished initial file indexing");
+ *initial_file_indexing_done_tx.borrow_mut() = true;
+
+ let Ok(state) = this.read_with(cx, |this, _cx| this.state.clone()) else {
+ return;
+ };
+ while dirty_files_rx.next().await.is_some() {
+ let mut state = state.lock().await;
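+                // Heuristic shrink: if the map grew large (capacity > 255) during a burst of
+                // changes and is now under 1/8 full, release the excess capacity after draining.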
+ let was_underused = state.dirty_files.capacity() > 255
+ && state.dirty_files.len() * 8 < state.dirty_files.capacity();
+ let dirty_files = state.dirty_files.drain().collect::<Vec<_>>();
+ if was_underused {
+ state.dirty_files.shrink_to_fit();
+ }
+ drop(state);
+ if dirty_files.is_empty() {
+ continue;
+ }
+
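+                // Same chunking scheme as the initial scan; `dirty_files` is non-empty here,
+                // so chunk_size >= 1.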
+ let chunk_size = dirty_files.len().div_ceil(file_indexing_parallelism);
+ let chunk_count = dirty_files.len().div_ceil(chunk_size);
+ let mut tasks = Vec::with_capacity(chunk_count);
+ let chunks = dirty_files.into_iter().chunks(chunk_size);
+ for chunk in chunks.into_iter() {
+ tasks.push(Self::update_dirty_files(
+ &this,
+ chunk.into_iter().collect(),
+ cx.clone(),
+ ));
+ }
+ futures::future::join_all(tasks).await;
+ }
+ }));
+
+ cx.subscribe(&worktree_store, Self::handle_worktree_store_event)
+ .detach();
let buffer_store = project.read(cx).buffer_store().clone();
for buffer in buffer_store.read(cx).buffers().collect::<Vec<_>>() {
@@ -100,6 +193,42 @@ impl SyntaxIndex {
this
}
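+
+    /// Indexes one chunk of files sequentially; overall parallelism comes from running one
+    /// of these futures per chunk and joining them.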
+ async fn update_dirty_files(
+ this: &WeakEntity<Self>,
+ dirty_files: Vec<(ProjectEntryId, ProjectPath)>,
+ mut cx: AsyncApp,
+ ) {
+ for (entry_id, project_path) in dirty_files {
+ let Ok(task) = this.update(&mut cx, |this, cx| {
+ this.update_file(entry_id, project_path, cx)
+ }) else {
+ return;
+ };
+ task.await;
+ }
+ }
+
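+    /// Resolves once the initial scan of worktree files has been indexed. A sketch of a
+    /// call site (the `index` entity here is hypothetical):
+    ///
+    /// ```ignore
+    /// index.read(cx).wait_for_initial_file_indexing(cx).await?;
+    /// ```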
+ pub fn wait_for_initial_file_indexing(&self, cx: &App) -> Task<Result<()>> {
+ if *self.initial_file_indexing_done_rx.borrow() {
+ Task::ready(Ok(()))
+ } else {
+ let mut rx = self.initial_file_indexing_done_rx.clone();
+ cx.background_spawn(async move {
+ loop {
+ match rx.recv().await {
+ Some(true) => return Ok(()),
+ Some(false) => {}
+ None => {
+ return Err(anyhow!(
+ "SyntaxIndex dropped while waiting for initial file indexing"
+ ));
+ }
+ }
+ }
+ })
+ }
+ }
+
fn handle_worktree_store_event(
&mut self,
_worktree_store: Entity<WorktreeStore>,
@@ -112,21 +241,26 @@ impl SyntaxIndex {
let state = Arc::downgrade(&self.state);
let worktree_id = *worktree_id;
let updated_entries_set = updated_entries_set.clone();
- cx.spawn(async move |this, cx| {
+ cx.background_spawn(async move {
let Some(state) = state.upgrade() else { return };
+ let mut state = state.lock().await;
for (path, entry_id, path_change) in updated_entries_set.iter() {
if let PathChange::Removed = path_change {
- state.lock().await.files.remove(entry_id);
+ state.files.remove(entry_id);
+ state.dirty_files.remove(entry_id);
} else {
let project_path = ProjectPath {
worktree_id,
path: path.clone(),
};
- this.update(cx, |this, cx| {
- this.update_file(*entry_id, project_path, cx);
- })
- .ok();
+ state.dirty_files.insert(*entry_id, project_path);
+ }
+ }
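+                // Notify the indexing task. A full channel is fine (a drain is already
+                // pending); only disconnection indicates a bug.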
+ match state.dirty_files_tx.try_send(()) {
+ Err(err) if err.is_disconnected() => {
+ log::error!("bug: syntax indexing queue is disconnected");
}
+ _ => {}
}
})
.detach();
@@ -177,7 +311,7 @@ impl SyntaxIndex {
.detach();
}
- fn register_buffer(&mut self, buffer: &Entity<Buffer>, cx: &mut Context<Self>) {
+ fn register_buffer(&self, buffer: &Entity<Buffer>, cx: &mut Context<Self>) {
let buffer_id = buffer.read(cx).remote_id();
cx.observe_release(buffer, move |this, _buffer, cx| {
this.with_state(cx, move |state| {
@@ -208,8 +342,11 @@ impl SyntaxIndex {
}
}
- fn update_buffer(&mut self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) {
+ fn update_buffer(&self, buffer_entity: Entity<Buffer>, cx: &mut Context<Self>) {
let buffer = buffer_entity.read(cx);
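+        // A buffer without a language has no grammar to outline, so there is nothing to index.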
+ if buffer.language().is_none() {
+ return;
+ }
let Some(project_entry_id) =
project::File::from_dyn(buffer.file()).and_then(|f| f.project_entry_id(cx))
@@ -229,70 +366,64 @@ impl SyntaxIndex {
}
});
- let parse_task = cx.background_spawn(async move {
- let snapshot = snapshot_task.await?;
- let rope = snapshot.text.as_rope().clone();
-
- anyhow::Ok((
- declarations_in_buffer(&snapshot)
- .into_iter()
- .map(|item| {
- (
- item.parent_index,
- BufferDeclaration::from_outline(item, &rope),
- )
- })
- .collect::<Vec<_>>(),
- rope,
- ))
- });
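+        // Hold only a Weak to the shared state so an in-flight parse doesn't keep the
+        // index's state alive after the SyntaxIndex is dropped.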
+ let state = Arc::downgrade(&self.state);
+ let task = cx.background_spawn(async move {
+ // TODO: How to handle errors?
+ let Ok(snapshot) = snapshot_task.await else {
+ return;
+ };
+ let rope = snapshot.text.as_rope();
- let task = cx.spawn({
- async move |this, cx| {
- let Ok((declarations, rope)) = parse_task.await else {
- return;
- };
+ let declarations = declarations_in_buffer(&snapshot)
+ .into_iter()
+ .map(|item| {
+ (
+ item.parent_index,
+ BufferDeclaration::from_outline(item, &rope),
+ )
+ })
+ .collect::<Vec<_>>();
- this.update(cx, move |this, cx| {
- this.with_state(cx, move |state| {
- let buffer_state = state
- .buffers
- .entry(buffer_id)
- .or_insert_with(Default::default);
-
- SyntaxIndexState::remove_buffer_declarations(
- &buffer_state.declarations,
- &mut state.declarations,
- &mut state.identifiers,
- );
-
- let mut new_ids = Vec::with_capacity(declarations.len());
- state.declarations.reserve(declarations.len());
- for (parent_index, mut declaration) in declarations {
- declaration.parent = parent_index
- .and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied()));
-
- let identifier = declaration.identifier.clone();
- let declaration_id = state.declarations.insert(Declaration::Buffer {
- rope: rope.clone(),
- buffer_id,
- declaration,
- project_entry_id,
- });
- new_ids.push(declaration_id);
-
- state
- .identifiers
- .entry(identifier)
- .or_default()
- .insert(declaration_id);
- }
+ let Some(state) = state.upgrade() else {
+ return;
+ };
+ let mut state = state.lock().await;
+ let state = state.deref_mut();
- buffer_state.declarations = new_ids;
- });
- })
- .ok();
+ let buffer_state = state
+ .buffers
+ .entry(buffer_id)
+ .or_insert_with(Default::default);
+
+ SyntaxIndexState::remove_buffer_declarations(
+ &buffer_state.declarations,
+ &mut state.declarations,
+ &mut state.identifiers,
+ );
+
+ let mut new_ids = Vec::with_capacity(declarations.len());
+ state.declarations.reserve(declarations.len());
+ for (parent_index, mut declaration) in declarations {
+ declaration.parent =
+ parent_index.and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied()));
+
+ let identifier = declaration.identifier.clone();
+ let declaration_id = state.declarations.insert(Declaration::Buffer {
+ rope: rope.clone(),
+ buffer_id,
+ declaration,
+ project_entry_id,
+ });
+ new_ids.push(declaration_id);
+
+ state
+ .identifiers
+ .entry(identifier)
+ .or_default()
+ .insert(declaration_id);
}
+
+ buffer_state.declarations = new_ids;
});
self.with_state(cx, move |state| {
@@ -309,28 +440,53 @@ impl SyntaxIndex {
entry_id: ProjectEntryId,
project_path: ProjectPath,
cx: &mut Context<Self>,
- ) {
+ ) -> Task<()> {
let Some(project) = self.project.upgrade() else {
- return;
+ return Task::ready(());
};
let project = project.read(cx);
+
+ let language_registry = project.languages();
+ let Some(available_language) =
+ language_registry.language_for_file_path(project_path.path.as_std_path())
+ else {
+ return Task::ready(());
+ };
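+        // Fast path: if the language is already loaded, `now_or_never` resolves immediately
+        // and lets us skip files whose grammar has no outline query without spawning work.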
+ let language = if let Some(Ok(Ok(language))) = language_registry
+ .load_language(&available_language)
+ .now_or_never()
+ {
+ if language
+ .grammar()
+ .is_none_or(|grammar| grammar.outline_config.is_none())
+ {
+ return Task::ready(());
+ }
+ future::Either::Left(async { Ok(language) })
+ } else {
+ let language_registry = language_registry.clone();
+ future::Either::Right(async move {
+ anyhow::Ok(
+ language_registry
+ .load_language(&available_language)
+ .await??,
+ )
+ })
+ };
+
let Some(worktree) = project.worktree_for_id(project_path.worktree_id, cx) else {
- return;
+ return Task::ready(());
};
- let language_registry = project.languages().clone();
let snapshot_task = worktree.update(cx, |worktree, cx| {
let load_task = worktree.load_file(&project_path.path, cx);
cx.spawn(async move |_this, cx| {
let loaded_file = load_task.await?;
- let language = language_registry
- .language_for_file_path(&project_path.path.as_std_path())
- .await
- .ok();
+ let language = language.await?;
let buffer = cx.new(|cx| {
let mut buffer = Buffer::local(loaded_file.text, cx);
- buffer.set_language(language, cx);
+ buffer.set_language(Some(language), cx);
buffer
})?;
@@ -343,75 +499,58 @@ impl SyntaxIndex {
})
});
- let parse_task = cx.background_spawn(async move {
- let snapshot = snapshot_task.await?;
+ let state = Arc::downgrade(&self.state);
+ cx.background_spawn(async move {
+ // TODO: How to handle errors?
+ let Ok(snapshot) = snapshot_task.await else {
+ return;
+ };
let rope = snapshot.as_rope();
let declarations = declarations_in_buffer(&snapshot)
.into_iter()
.map(|item| (item.parent_index, FileDeclaration::from_outline(item, rope)))
.collect::<Vec<_>>();
- anyhow::Ok(declarations)
- });
-
- let task = cx.spawn({
- async move |this, cx| {
- // TODO: how to handle errors?
- let Ok(declarations) = parse_task.await else {
- return;
- };
- this.update(cx, |this, cx| {
- this.with_state(cx, move |state| {
- let file_state =
- state.files.entry(entry_id).or_insert_with(Default::default);
-
- for old_declaration_id in &file_state.declarations {
- let Some(declaration) = state.declarations.remove(*old_declaration_id)
- else {
- debug_panic!("declaration not found");
- continue;
- };
- if let Some(identifier_declarations) =
- state.identifiers.get_mut(declaration.identifier())
- {
- identifier_declarations.remove(old_declaration_id);
- }
- }
- let mut new_ids = Vec::with_capacity(declarations.len());
- state.declarations.reserve(declarations.len());
-
- for (parent_index, mut declaration) in declarations {
- declaration.parent = parent_index
- .and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied()));
-
- let identifier = declaration.identifier.clone();
- let declaration_id = state.declarations.insert(Declaration::File {
- project_entry_id: entry_id,
- declaration,
- });
- new_ids.push(declaration_id);
-
- state
- .identifiers
- .entry(identifier)
- .or_default()
- .insert(declaration_id);
- }
+ let Some(state) = state.upgrade() else {
+ return;
+ };
+ let mut state = state.lock().await;
+ let state = state.deref_mut();
- file_state.declarations = new_ids;
- });
- })
- .ok();
+ let file_state = state.files.entry(entry_id).or_insert_with(Default::default);
+ for old_declaration_id in &file_state.declarations {
+ let Some(declaration) = state.declarations.remove(*old_declaration_id) else {
+ debug_panic!("declaration not found");
+ continue;
+ };
+ if let Some(identifier_declarations) =
+ state.identifiers.get_mut(declaration.identifier())
+ {
+ identifier_declarations.remove(old_declaration_id);
+ }
}
- });
- self.with_state(cx, move |state| {
- state
- .files
- .entry(entry_id)
- .or_insert_with(Default::default)
- .task = Some(task);
- });
+ let mut new_ids = Vec::with_capacity(declarations.len());
+ state.declarations.reserve(declarations.len());
+ for (parent_index, mut declaration) in declarations {
+ declaration.parent =
+ parent_index.and_then(|ix| some_or_debug_panic(new_ids.get(ix).copied()));
+
+ let identifier = declaration.identifier.clone();
+ let declaration_id = state.declarations.insert(Declaration::File {
+ project_entry_id: entry_id,
+ declaration,
+ });
+ new_ids.push(declaration_id);
+
+ state
+ .identifiers
+ .entry(identifier)
+ .or_default()
+ .insert(declaration_id);
+ }
+ file_state.declarations = new_ids;
+ })
}
}
@@ -576,13 +715,13 @@ mod tests {
let decls = index_state.declarations_for_identifier::<8>(&main);
assert_eq!(decls.len(), 2);
- let decl = expect_file_decl("c.rs", &decls[0].1, &project, cx);
- assert_eq!(decl.identifier, main.clone());
- assert_eq!(decl.item_range, 32..280);
-
- let decl = expect_file_decl("a.rs", &decls[1].1, &project, cx);
+ let decl = expect_file_decl("a.rs", &decls[0].1, &project, cx);
assert_eq!(decl.identifier, main);
assert_eq!(decl.item_range, 0..98);
+
+ let decl = expect_file_decl("c.rs", &decls[1].1, &project, cx);
+ assert_eq!(decl.identifier, main.clone());
+ assert_eq!(decl.item_range, 32..280);
});
}
@@ -718,8 +857,8 @@ mod tests {
cx.update(|cx| {
let decls = index_state.declarations_for_identifier::<8>(&main);
assert_eq!(decls.len(), 2);
- expect_file_decl("c.rs", &decls[0].1, &project, cx);
- expect_file_decl("a.rs", &decls[1].1, &project, cx);
+ expect_file_decl("a.rs", &decls[0].1, &project, cx);
+ expect_file_decl("c.rs", &decls[1].1, &project, cx);
});
}
@@ -852,7 +991,8 @@ mod tests {
let lang_id = lang.id();
language_registry.add(Arc::new(lang));
- let index = cx.new(|cx| SyntaxIndex::new(&project, cx));
+ let file_indexing_parallelism = 2;
+ let index = cx.new(|cx| SyntaxIndex::new(&project, file_indexing_parallelism, cx));
cx.run_until_parked();
(project, index, lang_id)
@@ -80,6 +80,8 @@ struct Zeta2Args {
prompt_format: PromptFormat,
#[arg(long, value_enum, default_value_t = Default::default())]
output_format: OutputFormat,
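+    /// Number of parallel tasks used for the initial file indexing pass (hypothetical
+    /// usage: `--file-indexing-parallelism 8`).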
+ #[arg(long, default_value_t = 42)]
+ file_indexing_parallelism: usize,
}
#[derive(clap::ValueEnum, Default, Debug, Clone)]
@@ -244,13 +246,19 @@ async fn get_context(
};
if let Some(zeta2_args) = zeta2_args {
- Ok(GetContextOutput::Zeta2(
- cx.update(|cx| {
+    // Wait for the worktree scan before starting zeta2 so that wait_for_initial_indexing
+    // covers the whole worktree.
+ worktree
+ .read_with(cx, |worktree, _cx| {
+ worktree.as_local().unwrap().scan_complete()
+ })?
+ .await;
+ let output = cx
+ .update(|cx| {
let zeta = cx.new(|cx| {
zeta2::Zeta::new(app_state.client.clone(), app_state.user_store.clone(), cx)
});
- zeta.update(cx, |zeta, cx| {
- zeta.register_buffer(&buffer, &project, cx);
+ let indexing_done_task = zeta.update(cx, |zeta, cx| {
zeta.set_options(zeta2::ZetaOptions {
excerpt: EditPredictionExcerptOptions {
max_bytes: zeta2_args.max_excerpt_bytes,
@@ -261,12 +269,13 @@ async fn get_context(
max_diagnostic_bytes: zeta2_args.max_diagnostic_bytes,
max_prompt_bytes: zeta2_args.max_prompt_bytes,
prompt_format: zeta2_args.prompt_format.into(),
- })
+ file_indexing_parallelism: zeta2_args.file_indexing_parallelism,
+ });
+ zeta.register_buffer(&buffer, &project, cx);
+ zeta.wait_for_initial_indexing(&project, cx)
});
- // TODO: Actually wait for indexing.
- let timer = cx.background_executor().timer(Duration::from_secs(5));
cx.spawn(async move |cx| {
- timer.await;
+ indexing_done_task.await?;
let request = zeta
.update(cx, |zeta, cx| {
let cursor = buffer.read(cx).snapshot().anchor_before(clipped_cursor);
@@ -288,8 +297,8 @@ async fn get_context(
}
})
})?
- .await?,
- ))
+ .await?;
+ Ok(GetContextOutput::Zeta2(output))
} else {
let prompt_for_events = move || (events, 0);
Ok(GetContextOutput::Zeta1(
@@ -414,7 +423,7 @@ pub fn wait_for_lang_server(
];
cx.spawn(async move |cx| {
- let timeout = cx.background_executor().timer(Duration::new(60 * 5, 0));
+ let timeout = cx.background_executor().timer(Duration::from_secs(60 * 5));
let result = futures::select! {
_ = rx.next() => {
println!("{}⚑ Language server idle", log_prefix);
@@ -430,13 +439,14 @@ pub fn wait_for_lang_server(
}
fn main() {
+ zlog::init();
+ zlog::init_output_stderr();
let args = ZetaCliArgs::parse();
let http_client = Arc::new(ReqwestClient::new());
let app = Application::headless().with_http_client(http_client);
app.run(move |cx| {
let app_state = Arc::new(headless::init(cx));
- let is_zeta2_context_command = matches!(args.command, Commands::Zeta2Context { .. });
cx.spawn(async move |cx| {
let result = match args.command {
Commands::Zeta2Context {
@@ -498,10 +508,6 @@ fn main() {
match result {
Ok(output) => {
println!("{}", output);
- // TODO: Remove this once the 5 second delay is properly replaced.
- if is_zeta2_context_command {
- eprintln!("Note that zeta_cli doesn't yet wait for indexing, instead waits 5 seconds.");
- }
let _ = cx.update(|cx| cx.quit());
}
Err(e) => {