zeta2 cli: Resumable LSP declarations gathering (#39828)

Agus Zubiaga created

Gathering LSP declarations in zeta_cli can take a really long time for
big repos and has to be started from scratch if interrupted.

Instead of writing the cache file once we have walked the whole
worktree, we'll now do so incrementally as we complete each file. On
subsequent runs, we'll load as many valid declarations as have been
previously written to the cache, and then continue to request the rest
from the LSP, appending them to the existing file as we make
progress. If the last cache entry is incomplete, we'll truncate the
cache file to the end of the last valid line and continue from there, so
we can just `ctrl-c` without breaking resumability.

Release Notes:

- N/A

Change summary

crates/zeta_cli/src/main.rs | 144 +++++++++++++++++++++++++++++++-------
1 file changed, 118 insertions(+), 26 deletions(-)

Detailed changes

crates/zeta_cli/src/main.rs 🔗

@@ -28,7 +28,7 @@ use std::fmt::{self, Display};
 use std::fs::File;
 use std::hash::Hash;
 use std::hash::Hasher;
-use std::io::Write as _;
+use std::io::{BufRead, BufReader, BufWriter, Write as _};
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::process::exit;
@@ -529,28 +529,84 @@ pub async fn retrieval_stats(
     let file_snapshots = Arc::new(file_snapshots);
 
     let lsp_definitions_path = std::env::current_dir()?.join(format!(
-        "target/zeta2-lsp-definitions-{:x}.json",
+        "target/zeta2-lsp-definitions-{:x}.jsonl",
         files_hash
     ));
 
-    let lsp_definitions: Arc<_> = if std::fs::exists(&lsp_definitions_path)? {
+    let mut lsp_definitions = HashMap::default();
+    let mut lsp_files = 0;
+
+    if std::fs::exists(&lsp_definitions_path)? {
         log::info!(
             "Using cached LSP definitions from {}",
             lsp_definitions_path.display()
         );
-        serde_json::from_reader(File::open(&lsp_definitions_path)?)?
-    } else {
-        log::warn!(
-            "No LSP definitions found populating {}",
-            lsp_definitions_path.display()
-        );
-        let lsp_definitions =
-            gather_lsp_definitions(&filtered_files, &worktree, &project, cx).await?;
-        serde_json::to_writer_pretty(File::create(&lsp_definitions_path)?, &lsp_definitions)?;
-        lsp_definitions
+
+        let file = File::options()
+            .read(true)
+            .write(true)
+            .open(&lsp_definitions_path)?;
+        let lines = BufReader::new(&file).lines();
+        let mut valid_len: usize = 0;
+
+        for (line, expected_file) in lines.zip(files.iter()) {
+            let line = line?;
+            let FileLspDefinitions { path, references } = match serde_json::from_str(&line) {
+                Ok(ok) => ok,
+                Err(_) => {
+                    log::error!("Found invalid cache line. Truncating to #{lsp_files}.",);
+                    file.set_len(valid_len as u64)?;
+                    break;
+                }
+            };
+            let expected_path = expected_file.snapshot.file().unwrap().path().as_unix_str();
+            if expected_path != path.as_ref() {
+                log::error!(
+                    "Expected file #{} to be {expected_path}, but found {path}. Truncating to #{lsp_files}.",
+                    lsp_files + 1
+                );
+                file.set_len(valid_len as u64)?;
+                break;
+            }
+            for (point, ranges) in references {
+                let Ok(path) = RelPath::new(Path::new(path.as_ref()), PathStyle::Posix) else {
+                    log::warn!("Invalid path: {}", path);
+                    continue;
+                };
+                lsp_definitions.insert(
+                    SourceLocation {
+                        path: path.into_arc(),
+                        point: point.into(),
+                    },
+                    ranges,
+                );
+            }
+            lsp_files += 1;
+            valid_len += line.len() + 1
+        }
     }
-    .into();
 
+    if lsp_files < files.len() {
+        if lsp_files == 0 {
+            log::warn!(
+                "No LSP definitions found, populating {}",
+                lsp_definitions_path.display()
+            );
+        } else {
+            log::warn!("{} files missing from LSP cache", files.len() - lsp_files);
+        }
+
+        gather_lsp_definitions(
+            &lsp_definitions_path,
+            lsp_files,
+            &filtered_files,
+            &worktree,
+            &project,
+            &mut lsp_definitions,
+            cx,
+        )
+        .await?;
+    }
     let files_len = files.len().min(file_limit.unwrap_or(usize::MAX));
     let done_count = Arc::new(AtomicUsize::new(0));
 
@@ -596,7 +652,6 @@ pub async fn retrieval_stats(
                         point: query_point,
                     };
                     let lsp_definitions = lsp_definitions
-                        .definitions
                         .get(&source_location)
                         .cloned()
                         .unwrap_or_else(|| {
@@ -916,11 +971,14 @@ async fn retrieve_definitions(
 }
 
 async fn gather_lsp_definitions(
+    lsp_definitions_path: &Path,
+    start_index: usize,
     files: &[ProjectPath],
     worktree: &Entity<Worktree>,
     project: &Entity<Project>,
+    definitions: &mut HashMap<SourceLocation, Vec<SourceRange>>,
     cx: &mut AsyncApp,
-) -> Result<LspResults> {
+) -> Result<()> {
     let worktree_id = worktree.read_with(cx, |worktree, _cx| worktree.id())?;
 
     let lsp_store = project.read_with(cx, |project, _cx| project.lsp_store())?;
@@ -943,14 +1001,30 @@ async fn gather_lsp_definitions(
     })?
     .detach();
 
-    let mut definitions = HashMap::default();
+    let (cache_line_tx, mut cache_line_rx) = mpsc::unbounded::<FileLspDefinitions>();
+
+    let cache_file = File::options()
+        .append(true)
+        .create(true)
+        .open(lsp_definitions_path)
+        .unwrap();
+
+    let cache_task = cx.background_spawn(async move {
+        let mut writer = BufWriter::new(cache_file);
+        while let Some(line) = cache_line_rx.next().await {
+            serde_json::to_writer(&mut writer, &line).unwrap();
+            writer.write_all(&[b'\n']).unwrap();
+        }
+        writer.flush().unwrap();
+    });
+
     let mut error_count = 0;
     let mut lsp_open_handles = Vec::new();
     let mut ready_languages = HashSet::default();
-    for (file_index, project_path) in files.iter().enumerate() {
+    for (file_index, project_path) in files[start_index..].iter().enumerate() {
         println!(
             "Processing file {} of {}: {}",
-            file_index + 1,
+            start_index + file_index + 1,
             files.len(),
             project_path.path.display(PathStyle::Posix)
         );
@@ -994,6 +1068,8 @@ async fn gather_lsp_definitions(
                 .await;
         }
 
+        let mut cache_line_references = Vec::with_capacity(references.len());
+
         for reference in references {
             // TODO: Rename declaration to definition in edit_prediction_context?
             let lsp_result = project
@@ -1032,10 +1108,13 @@ async fn gather_lsp_definitions(
                         })?;
                     }
 
+                    let point = snapshot.offset_to_point(reference.range.start);
+
+                    cache_line_references.push((point.into(), targets.clone()));
                     definitions.insert(
                         SourceLocation {
                             path: project_path.path.clone(),
-                            point: snapshot.offset_to_point(reference.range.start),
+                            point,
                         },
                         targets,
                     );
@@ -1046,17 +1125,30 @@ async fn gather_lsp_definitions(
                 }
             }
         }
+
+        cache_line_tx
+            .unbounded_send(FileLspDefinitions {
+                path: project_path.path.as_unix_str().into(),
+                references: cache_line_references,
+            })
+            .log_err();
     }
 
-    log::error!("Encountered {} language server errors", error_count);
+    drop(cache_line_tx);
 
-    Ok(LspResults { definitions })
+    if error_count > 0 {
+        log::error!("Encountered {} language server errors", error_count);
+    }
+
+    cache_task.await;
+
+    Ok(())
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(transparent)]
-struct LspResults {
-    definitions: HashMap<SourceLocation, Vec<SourceRange>>,
+#[derive(Serialize, Deserialize)]
+struct FileLspDefinitions {
+    path: Arc<str>,
+    references: Vec<(SerializablePoint, Vec<SourceRange>)>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]