From a693d445532629f5fe506485e3671063c040777b Mon Sep 17 00:00:00 2001 From: Agus Zubiaga Date: Fri, 10 Oct 2025 12:44:36 -0300 Subject: [PATCH] zeta2 cli: Resumable LSP declarations gathering (#39828) Gathering LSP declarations in zeta_cli can take a really long time for big repos and has to be started from scratch if interrupted. Instead of writing the cache file once we have walked the whole worktree, we'll now do so incrementally as we complete each file. On subsequent runs, we'll load as many valid declarations as has been previously written to the cache, and then continue to request the rest from the LSP which will append to the existing file as it makes progress. If the last cache entry is incomplete, we'll truncate the cache file to the end of the last valid line and continue from there, so we can just `ctrl-c` without breaking resumability. Release Notes: - N/A --- crates/zeta_cli/src/main.rs | 144 +++++++++++++++++++++++++++++------- 1 file changed, 118 insertions(+), 26 deletions(-) diff --git a/crates/zeta_cli/src/main.rs b/crates/zeta_cli/src/main.rs index 236c5eb4572cf451a3efd435b9d0ad20d4380b72..7d07a01815fb38c9b5a45f56491a90c42bbae456 100644 --- a/crates/zeta_cli/src/main.rs +++ b/crates/zeta_cli/src/main.rs @@ -28,7 +28,7 @@ use std::fmt::{self, Display}; use std::fs::File; use std::hash::Hash; use std::hash::Hasher; -use std::io::Write as _; +use std::io::{BufRead, BufReader, BufWriter, Write as _}; use std::ops::Range; use std::path::{Path, PathBuf}; use std::process::exit; @@ -529,28 +529,84 @@ pub async fn retrieval_stats( let file_snapshots = Arc::new(file_snapshots); let lsp_definitions_path = std::env::current_dir()?.join(format!( - "target/zeta2-lsp-definitions-{:x}.json", + "target/zeta2-lsp-definitions-{:x}.jsonl", files_hash )); - let lsp_definitions: Arc<_> = if std::fs::exists(&lsp_definitions_path)? { + let mut lsp_definitions = HashMap::default(); + let mut lsp_files = 0; + + if std::fs::exists(&lsp_definitions_path)? { log::info!( "Using cached LSP definitions from {}", lsp_definitions_path.display() ); - serde_json::from_reader(File::open(&lsp_definitions_path)?)? - } else { - log::warn!( - "No LSP definitions found populating {}", - lsp_definitions_path.display() - ); - let lsp_definitions = - gather_lsp_definitions(&filtered_files, &worktree, &project, cx).await?; - serde_json::to_writer_pretty(File::create(&lsp_definitions_path)?, &lsp_definitions)?; - lsp_definitions + + let file = File::options() + .read(true) + .write(true) + .open(&lsp_definitions_path)?; + let lines = BufReader::new(&file).lines(); + let mut valid_len: usize = 0; + + for (line, expected_file) in lines.zip(files.iter()) { + let line = line?; + let FileLspDefinitions { path, references } = match serde_json::from_str(&line) { + Ok(ok) => ok, + Err(_) => { + log::error!("Found invalid cache line. Truncating to #{lsp_files}.",); + file.set_len(valid_len as u64)?; + break; + } + }; + let expected_path = expected_file.snapshot.file().unwrap().path().as_unix_str(); + if expected_path != path.as_ref() { + log::error!( + "Expected file #{} to be {expected_path}, but found {path}. Truncating to #{lsp_files}.", + lsp_files + 1 + ); + file.set_len(valid_len as u64)?; + break; + } + for (point, ranges) in references { + let Ok(path) = RelPath::new(Path::new(path.as_ref()), PathStyle::Posix) else { + log::warn!("Invalid path: {}", path); + continue; + }; + lsp_definitions.insert( + SourceLocation { + path: path.into_arc(), + point: point.into(), + }, + ranges, + ); + } + lsp_files += 1; + valid_len += line.len() + 1 + } } - .into(); + if lsp_files < files.len() { + if lsp_files == 0 { + log::warn!( + "No LSP definitions found, populating {}", + lsp_definitions_path.display() + ); + } else { + log::warn!("{} files missing from LSP cache", files.len() - lsp_files); + } + + gather_lsp_definitions( + &lsp_definitions_path, + lsp_files, + &filtered_files, + &worktree, + &project, + &mut lsp_definitions, + cx, + ) + .await?; + } let files_len = files.len().min(file_limit.unwrap_or(usize::MAX)); let done_count = Arc::new(AtomicUsize::new(0)); @@ -596,7 +652,6 @@ pub async fn retrieval_stats( point: query_point, }; let lsp_definitions = lsp_definitions - .definitions .get(&source_location) .cloned() .unwrap_or_else(|| { @@ -916,11 +971,14 @@ async fn retrieve_definitions( } async fn gather_lsp_definitions( + lsp_definitions_path: &Path, + start_index: usize, files: &[ProjectPath], worktree: &Entity, project: &Entity, + definitions: &mut HashMap>, cx: &mut AsyncApp, -) -> Result { +) -> Result<()> { let worktree_id = worktree.read_with(cx, |worktree, _cx| worktree.id())?; let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?; @@ -943,14 +1001,30 @@ async fn gather_lsp_definitions( })? .detach(); - let mut definitions = HashMap::default(); + let (cache_line_tx, mut cache_line_rx) = mpsc::unbounded::(); + + let cache_file = File::options() + .append(true) + .create(true) + .open(lsp_definitions_path) + .unwrap(); + + let cache_task = cx.background_spawn(async move { + let mut writer = BufWriter::new(cache_file); + while let Some(line) = cache_line_rx.next().await { + serde_json::to_writer(&mut writer, &line).unwrap(); + writer.write_all(&[b'\n']).unwrap(); + } + writer.flush().unwrap(); + }); + let mut error_count = 0; let mut lsp_open_handles = Vec::new(); let mut ready_languages = HashSet::default(); - for (file_index, project_path) in files.iter().enumerate() { + for (file_index, project_path) in files[start_index..].iter().enumerate() { println!( "Processing file {} of {}: {}", - file_index + 1, + start_index + file_index + 1, files.len(), project_path.path.display(PathStyle::Posix) ); @@ -994,6 +1068,8 @@ async fn gather_lsp_definitions( .await; } + let mut cache_line_references = Vec::with_capacity(references.len()); + for reference in references { // TODO: Rename declaration to definition in edit_prediction_context? let lsp_result = project @@ -1032,10 +1108,13 @@ async fn gather_lsp_definitions( })?; } + let point = snapshot.offset_to_point(reference.range.start); + + cache_line_references.push((point.into(), targets.clone())); definitions.insert( SourceLocation { path: project_path.path.clone(), - point: snapshot.offset_to_point(reference.range.start), + point, }, targets, ); @@ -1046,17 +1125,30 @@ async fn gather_lsp_definitions( } } } + + cache_line_tx + .unbounded_send(FileLspDefinitions { + path: project_path.path.as_unix_str().into(), + references: cache_line_references, + }) + .log_err(); } - log::error!("Encountered {} language server errors", error_count); + drop(cache_line_tx); - Ok(LspResults { definitions }) + if error_count > 0 { + log::error!("Encountered {} language server errors", error_count); + } + + cache_task.await; + + Ok(()) } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(transparent)] -struct LspResults { - definitions: HashMap>, +#[derive(Serialize, Deserialize)] +struct FileLspDefinitions { + path: Arc, + references: Vec<(SerializablePoint, Vec)>, } #[derive(Debug, Clone, Serialize, Deserialize)]