From 5da2b123b56f570cc44086d100b1292937ac87e8 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 21 Mar 2023 11:53:04 -0700 Subject: [PATCH] Allow refreshing worktree entries while the initial scan is in-progress --- crates/language/src/buffer_tests.rs | 1 - crates/project/src/worktree.rs | 567 ++++++++++++++-------------- 2 files changed, 293 insertions(+), 275 deletions(-) diff --git a/crates/language/src/buffer_tests.rs b/crates/language/src/buffer_tests.rs index 3e65ec3120445bb69604c056b200d4b8f66b6863..765c0f96738d92194ee181ca67b3e44eb6339e2e 100644 --- a/crates/language/src/buffer_tests.rs +++ b/crates/language/src/buffer_tests.rs @@ -809,7 +809,6 @@ fn test_enclosing_bracket_ranges_where_brackets_are_not_outermost_children( }"}], ); - eprintln!("-----------------------"); // Regression test: even though the parent node of the parentheses (the for loop) does // intersect the given range, the parentheses themselves do not contain the range, so // they should not be returned. Only the curly braces contain the range. diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 656268a02c56b2a918c00e8be2103b0549aaa491..33714e762ff94d43d7dd6f99a806c763ea33dafc 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -12,7 +12,7 @@ use futures::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot, }, - Stream, StreamExt, + select_biased, Stream, StreamExt, }; use fuzzy::CharBag; use git::{DOT_GIT, GITIGNORE}; @@ -75,8 +75,8 @@ pub struct LocalWorktree { } pub struct RemoteWorktree { - pub snapshot: Snapshot, - pub(crate) background_snapshot: Arc>, + snapshot: Snapshot, + background_snapshot: Arc>, project_id: u64, client: Arc, updates_tx: Option>, @@ -116,7 +116,6 @@ impl std::fmt::Debug for GitRepositoryEntry { f.debug_struct("GitRepositoryEntry") .field("content_path", &self.content_path) .field("git_dir_path", &self.git_dir_path) - .field("libgit_repository", &"LibGitRepository") .finish() } } @@ -158,8 +157,13 @@ impl DerefMut for LocalSnapshot { enum ScanState { /// The worktree is performing its initial scan of the filesystem. - Initializing(LocalSnapshot), - Initialized(LocalSnapshot), + Initializing { + snapshot: LocalSnapshot, + barrier: Option, + }, + Initialized { + snapshot: LocalSnapshot, + }, /// The worktree is updating in response to filesystem events. Updating, Updated { @@ -167,7 +171,6 @@ enum ScanState { changes: HashMap, PathChange>, barrier: Option, }, - Err(Arc), } struct ShareState { @@ -538,32 +541,30 @@ impl LocalWorktree { cx: &mut ModelContext, ) { match scan_state { - ScanState::Initializing(new_snapshot) => { + ScanState::Initializing { snapshot, barrier } => { *self.is_scanning.0.borrow_mut() = true; - self.set_snapshot(new_snapshot, cx); + self.set_snapshot(snapshot, cx); + drop(barrier); } - ScanState::Initialized(new_snapshot) => { + ScanState::Initialized { snapshot } => { *self.is_scanning.0.borrow_mut() = false; - self.set_snapshot(new_snapshot, cx); + self.set_snapshot(snapshot, cx); } ScanState::Updating => { *self.is_scanning.0.borrow_mut() = true; } ScanState::Updated { - snapshot: new_snapshot, + snapshot, changes, barrier, } => { *self.is_scanning.0.borrow_mut() = false; cx.emit(Event::UpdatedEntries(changes)); - self.set_snapshot(new_snapshot, cx); + self.set_snapshot(snapshot, cx); drop(barrier); } - ScanState::Err(error) => { - *self.is_scanning.0.borrow_mut() = false; - log::error!("error scanning worktree {:?}", error); - } } + cx.notify(); } fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext) { @@ -580,7 +581,6 @@ impl LocalWorktree { if !updated_repos.is_empty() { cx.emit(Event::UpdatedGitRepositories(updated_repos)); } - cx.notify(); } fn changed_repos( @@ -759,15 +759,25 @@ impl LocalWorktree { is_dir: bool, cx: &mut ModelContext, ) -> Task> { - self.write_entry_internal( - path, + let path = path.into(); + let abs_path = self.absolutize(&path); + let fs = self.fs.clone(); + let write = cx.background().spawn(async move { if is_dir { - None + fs.create_dir(&abs_path).await } else { - Some(Default::default()) - }, - cx, - ) + fs.save(&abs_path, &Default::default(), Default::default()) + .await + } + }); + + cx.spawn(|this, mut cx| async move { + write.await?; + this.update(&mut cx, |this, cx| { + this.as_local_mut().unwrap().refresh_entry(path, None, cx) + }) + .await + }) } pub fn write_file( @@ -777,7 +787,20 @@ impl LocalWorktree { line_ending: LineEnding, cx: &mut ModelContext, ) -> Task> { - self.write_entry_internal(path, Some((text, line_ending)), cx) + let path = path.into(); + let abs_path = self.absolutize(&path); + let fs = self.fs.clone(); + let write = cx + .background() + .spawn(async move { fs.save(&abs_path, &text, line_ending).await }); + + cx.spawn(|this, mut cx| async move { + write.await?; + this.update(&mut cx, |this, cx| { + this.as_local_mut().unwrap().refresh_entry(path, None, cx) + }) + .await + }) } pub fn delete_entry( @@ -882,32 +905,6 @@ impl LocalWorktree { })) } - fn write_entry_internal( - &self, - path: impl Into>, - text_if_file: Option<(Rope, LineEnding)>, - cx: &mut ModelContext, - ) -> Task> { - let path = path.into(); - let abs_path = self.absolutize(&path); - let fs = self.fs.clone(); - let write = cx.background().spawn(async move { - if let Some((text, line_ending)) = text_if_file { - fs.save(&abs_path, &text, line_ending).await - } else { - fs.create_dir(&abs_path).await - } - }); - - cx.spawn(|this, mut cx| async move { - write.await?; - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().refresh_entry(path, None, cx) - }) - .await - }) - } - fn refresh_entry( &self, path: Arc, @@ -1380,7 +1377,10 @@ impl LocalSnapshot { .cloned() } - pub(crate) fn in_dot_git(&mut self, path: &Path) -> Option<&mut GitRepositoryEntry> { + pub(crate) fn repo_with_dot_git_containing( + &mut self, + path: &Path, + ) -> Option<&mut GitRepositoryEntry> { // Git repositories cannot be nested, so we don't need to reverse the order self.git_repositories .iter_mut() @@ -1682,7 +1682,7 @@ impl GitRepositoryEntry { path.starts_with(self.content_path.as_ref()) } - // Note that theis path should be relative to the worktree root. + // Note that this path should be relative to the worktree root. pub(crate) fn in_dot_git(&self, path: &Path) -> bool { path.starts_with(self.git_dir_path.as_ref()) } @@ -2162,61 +2162,143 @@ impl BackgroundScanner { events_rx: impl Stream>, mut changed_paths: UnboundedReceiver<(Vec, barrier::Sender)>, ) { - use futures::{select_biased, FutureExt as _}; - - // While performing the initial scan, send a new snapshot to the main - // thread on a recurring interval. - let initializing_task = self.executor.spawn({ - let executor = self.executor.clone(); - let snapshot = self.snapshot.clone(); - let notify = self.notify.clone(); - let is_fake_fs = self.fs.is_fake(); - async move { - loop { - if is_fake_fs { - #[cfg(any(test, feature = "test-support"))] - executor.simulate_random_delay().await; - } else { - smol::Timer::after(Duration::from_millis(100)).await; - } + use futures::FutureExt as _; - executor.timer(Duration::from_millis(100)).await; - if notify - .unbounded_send(ScanState::Initializing(snapshot.lock().clone())) - .is_err() - { - break; - } + // Retrieve the basic properties of the root node. + let root_char_bag; + let root_abs_path; + let root_inode; + let root_is_dir; + let next_entry_id; + { + let mut snapshot = self.snapshot.lock(); + snapshot.scan_started(); + root_char_bag = snapshot.root_char_bag; + root_abs_path = snapshot.abs_path.clone(); + root_inode = snapshot.root_entry().map(|e| e.inode); + root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir()); + next_entry_id = snapshot.next_entry_id.clone(); + } + + // Populate ignores above the root. + let ignore_stack; + for ancestor in root_abs_path.ancestors().skip(1) { + if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await + { + self.snapshot + .lock() + .ignores_by_parent_abs_path + .insert(ancestor.into(), (ignore.into(), 0)); + } + } + { + let mut snapshot = self.snapshot.lock(); + ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true); + if ignore_stack.is_all() { + if let Some(mut root_entry) = snapshot.root_entry().cloned() { + root_entry.is_ignored = true; + snapshot.insert_entry(root_entry, self.fs.as_ref()); } } - }); + }; - // Scan the entire directory. - if let Err(err) = self.scan_dirs().await { - if self - .notify - .unbounded_send(ScanState::Err(Arc::new(err))) - .is_err() - { - return; + if root_is_dir { + let mut ancestor_inodes = TreeSet::default(); + if let Some(root_inode) = root_inode { + ancestor_inodes.insert(root_inode); } + + let (tx, rx) = channel::unbounded(); + self.executor + .block(tx.send(ScanJob { + abs_path: root_abs_path.to_path_buf(), + path: Arc::from(Path::new("")), + ignore_stack, + ancestor_inodes, + scan_queue: tx.clone(), + })) + .unwrap(); + drop(tx); + + // Spawn a worker thread per logical CPU. + self.executor + .scoped(|scope| { + // One the first worker thread, listen for change requests from the worktree. + // For each change request, after refreshing the given paths, report + // a progress update for the snapshot. + scope.spawn(async { + let reporting_timer = self.delay().fuse(); + futures::pin_mut!(reporting_timer); + + loop { + select_biased! { + job = changed_paths.next().fuse() => { + let Some((abs_paths, barrier)) = job else { break }; + self.update_entries_for_paths(abs_paths, None).await; + if self.notify.unbounded_send(ScanState::Initializing { + snapshot: self.snapshot.lock().clone(), + barrier: Some(barrier), + }).is_err() { + break; + } + } + + _ = reporting_timer => { + reporting_timer.set(self.delay().fuse()); + if self.notify.unbounded_send(ScanState::Initializing { + snapshot: self.snapshot.lock().clone(), + barrier: None, + }).is_err() { + break; + } + } + + job = rx.recv().fuse() => { + let Ok(job) = job else { break }; + if let Err(err) = self + .scan_dir(root_char_bag, next_entry_id.clone(), &job) + .await + { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } + } + } + } + }); + + // On all of the remaining worker threads, just scan directories. + for _ in 1..self.executor.num_cpus() { + scope.spawn(async { + while let Ok(job) = rx.recv().await { + if let Err(err) = self + .scan_dir(root_char_bag, next_entry_id.clone(), &job) + .await + { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } + } + }); + } + }) + .await; } - drop(initializing_task); + self.snapshot.lock().scan_completed(); if self .notify - .unbounded_send(ScanState::Initialized(self.snapshot.lock().clone())) + .unbounded_send(ScanState::Initialized { + snapshot: self.snapshot.lock().clone(), + }) .is_err() { return; } - futures::pin_mut!(events_rx); - // Process any events that occurred while performing the initial scan. These // events can't be reported as precisely, because there is no snapshot of the // worktree before they occurred. + futures::pin_mut!(events_rx); if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) { while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { events.extend(additional_events); @@ -2224,7 +2306,10 @@ impl BackgroundScanner { if self.notify.unbounded_send(ScanState::Updating).is_err() { return; } - if !self.process_events(events, true).await { + if !self + .process_events(events.into_iter().map(|e| e.path).collect(), true) + .await + { return; } if self @@ -2242,24 +2327,17 @@ impl BackgroundScanner { // Continue processing events until the worktree is dropped. loop { - let events; + let abs_paths; let barrier; select_biased! { request = changed_paths.next().fuse() => { - let Some((paths, b)) = request else { break; }; - events = paths - .into_iter() - .map(|path| fsevent::Event { - path, - event_id: 0, - flags: fsevent::StreamFlags::NONE - }) - .collect::>(); + let Some((paths, b)) = request else { break }; + abs_paths = paths; barrier = Some(b); } - e = events_rx.next().fuse() => { - let Some(e) = e else { break; }; - events = e; + events = events_rx.next().fuse() => { + let Some(events) = events else { break }; + abs_paths = events.into_iter().map(|e| e.path).collect(); barrier = None; } } @@ -2267,7 +2345,7 @@ impl BackgroundScanner { if self.notify.unbounded_send(ScanState::Updating).is_err() { return; } - if !self.process_events(events, false).await { + if !self.process_events(abs_paths, false).await { return; } if self @@ -2284,85 +2362,12 @@ impl BackgroundScanner { } } - async fn scan_dirs(&mut self) -> Result<()> { - let root_char_bag; - let root_abs_path; - let root_inode; - let is_dir; - let next_entry_id; - { - let mut snapshot = self.snapshot.lock(); - snapshot.scan_started(); - root_char_bag = snapshot.root_char_bag; - root_abs_path = snapshot.abs_path.clone(); - root_inode = snapshot.root_entry().map(|e| e.inode); - is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir()); - next_entry_id = snapshot.next_entry_id.clone(); - }; - - // Populate ignores above the root. - for ancestor in root_abs_path.ancestors().skip(1) { - if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await - { - self.snapshot - .lock() - .ignores_by_parent_abs_path - .insert(ancestor.into(), (ignore.into(), 0)); - } - } - - let ignore_stack = { - let mut snapshot = self.snapshot.lock(); - let ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true); - if ignore_stack.is_all() { - if let Some(mut root_entry) = snapshot.root_entry().cloned() { - root_entry.is_ignored = true; - snapshot.insert_entry(root_entry, self.fs.as_ref()); - } - } - ignore_stack - }; - - if is_dir { - let path: Arc = Arc::from(Path::new("")); - let mut ancestor_inodes = TreeSet::default(); - if let Some(root_inode) = root_inode { - ancestor_inodes.insert(root_inode); - } - - let (tx, rx) = channel::unbounded(); - self.executor - .block(tx.send(ScanJob { - abs_path: root_abs_path.to_path_buf(), - path, - ignore_stack, - ancestor_inodes, - scan_queue: tx.clone(), - })) - .unwrap(); - drop(tx); - - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - while let Ok(job) = rx.recv().await { - if let Err(err) = self - .scan_dir(root_char_bag, next_entry_id.clone(), &job) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); - } - } - }); - } - }) - .await; - - self.snapshot.lock().scan_completed(); + async fn delay(&self) { + #[cfg(any(test, feature = "test-support"))] + if self.fs.is_fake() { + return self.executor.simulate_random_delay().await; } - - Ok(()) + smol::Timer::after(Duration::from_millis(100)).await; } async fn scan_dir( @@ -2492,85 +2497,125 @@ impl BackgroundScanner { async fn process_events( &mut self, - mut events: Vec, + abs_paths: Vec, received_before_initialized: bool, ) -> bool { - events.sort_unstable_by(|a, b| a.path.cmp(&b.path)); - events.dedup_by(|a, b| a.path.starts_with(&b.path)); + let (scan_queue_tx, scan_queue_rx) = channel::unbounded(); - let root_char_bag; - let root_abs_path; - let next_entry_id; - let prev_snapshot; - { + let prev_snapshot = { let mut snapshot = self.snapshot.lock(); - prev_snapshot = snapshot.snapshot.clone(); - root_char_bag = snapshot.root_char_bag; - root_abs_path = snapshot.abs_path.clone(); - next_entry_id = snapshot.next_entry_id.clone(); snapshot.scan_started(); - } + snapshot.clone() + }; - let root_canonical_path = if let Ok(path) = self.fs.canonicalize(&root_abs_path).await { - path + let event_paths = if let Some(event_paths) = self + .update_entries_for_paths(abs_paths, Some(scan_queue_tx)) + .await + { + event_paths } else { return false; }; + + // Scan any directories that were created as part of this event batch. + self.executor + .scoped(|scope| { + for _ in 0..self.executor.num_cpus() { + scope.spawn(async { + while let Ok(job) = scan_queue_rx.recv().await { + if let Err(err) = self + .scan_dir( + prev_snapshot.root_char_bag, + prev_snapshot.next_entry_id.clone(), + &job, + ) + .await + { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } + } + }); + } + }) + .await; + + // Attempt to detect renames only over a single batch of file-system events. + self.snapshot.lock().removed_entry_ids.clear(); + + self.update_ignore_statuses().await; + self.update_git_repositories(); + self.build_change_set( + prev_snapshot.snapshot, + event_paths, + received_before_initialized, + ); + self.snapshot.lock().scan_completed(); + true + } + + async fn update_entries_for_paths( + &self, + mut abs_paths: Vec, + scan_queue_tx: Option>, + ) -> Option>> { + abs_paths.sort_unstable(); + abs_paths.dedup_by(|a, b| a.starts_with(&b)); + + let root_abs_path = self.snapshot.lock().abs_path.clone(); + let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?; let metadata = futures::future::join_all( - events + abs_paths .iter() - .map(|event| self.fs.metadata(&event.path)) + .map(|abs_path| self.fs.metadata(&abs_path)) .collect::>(), ) .await; - // Hold the snapshot lock while clearing and re-inserting the root entries - // for each event. This way, the snapshot is not observable to the foreground - // thread while this operation is in-progress. - let mut event_paths = Vec::with_capacity(events.len()); - let (scan_queue_tx, scan_queue_rx) = channel::unbounded(); - { - let mut snapshot = self.snapshot.lock(); - for event in &events { - if let Ok(path) = event.path.strip_prefix(&root_canonical_path) { + let mut snapshot = self.snapshot.lock(); + if scan_queue_tx.is_some() { + for abs_path in &abs_paths { + if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) { snapshot.remove_path(path); } } + } - for (event, metadata) in events.into_iter().zip(metadata.into_iter()) { - let path: Arc = match event.path.strip_prefix(&root_canonical_path) { - Ok(path) => Arc::from(path.to_path_buf()), - Err(_) => { - log::error!( - "unexpected event {:?} for root path {:?}", - event.path, - root_canonical_path - ); - continue; - } - }; - event_paths.push(path.clone()); - let abs_path = root_abs_path.join(&path); - - match metadata { - Ok(Some(metadata)) => { - let ignore_stack = - snapshot.ignore_stack_for_abs_path(&abs_path, metadata.is_dir); - let mut fs_entry = Entry::new( - path.clone(), - &metadata, - snapshot.next_entry_id.as_ref(), - snapshot.root_char_bag, - ); - fs_entry.is_ignored = ignore_stack.is_all(); - snapshot.insert_entry(fs_entry, self.fs.as_ref()); + let mut event_paths = Vec::with_capacity(abs_paths.len()); + for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) { + let path: Arc = match abs_path.strip_prefix(&root_canonical_path) { + Ok(path) => Arc::from(path.to_path_buf()), + Err(_) => { + log::error!( + "unexpected event {:?} for root path {:?}", + abs_path, + root_canonical_path + ); + continue; + } + }; + event_paths.push(path.clone()); + let abs_path = root_abs_path.join(&path); + + match metadata { + Ok(Some(metadata)) => { + let ignore_stack = + snapshot.ignore_stack_for_abs_path(&abs_path, metadata.is_dir); + let mut fs_entry = Entry::new( + path.clone(), + &metadata, + snapshot.next_entry_id.as_ref(), + snapshot.root_char_bag, + ); + fs_entry.is_ignored = ignore_stack.is_all(); + snapshot.insert_entry(fs_entry, self.fs.as_ref()); - let scan_id = snapshot.scan_id; - if let Some(repo) = snapshot.in_dot_git(&path) { - repo.repo.lock().reload_index(); - repo.scan_id = scan_id; - } + let scan_id = snapshot.scan_id; + if let Some(repo) = snapshot.repo_with_dot_git_containing(&path) { + repo.repo.lock().reload_index(); + repo.scan_id = scan_id; + } + if let Some(scan_queue_tx) = &scan_queue_tx { let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path); if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) { ancestor_inodes.insert(metadata.inode); @@ -2585,42 +2630,16 @@ impl BackgroundScanner { .unwrap(); } } - Ok(None) => {} - Err(err) => { - // TODO - create a special 'error' entry in the entries tree to mark this - log::error!("error reading file on event {:?}", err); - } + } + Ok(None) => {} + Err(err) => { + // TODO - create a special 'error' entry in the entries tree to mark this + log::error!("error reading file on event {:?}", err); } } - drop(scan_queue_tx); } - // Scan any directories that were created as part of this event batch. - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - while let Ok(job) = scan_queue_rx.recv().await { - if let Err(err) = self - .scan_dir(root_char_bag, next_entry_id.clone(), &job) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); - } - } - }); - } - }) - .await; - - // Attempt to detect renames only over a single batch of file-system events. - self.snapshot.lock().removed_entry_ids.clear(); - - self.update_ignore_statuses().await; - self.update_git_repositories(); - self.build_change_set(prev_snapshot, event_paths, received_before_initialized); - self.snapshot.lock().scan_completed(); - true + Some(event_paths) } async fn update_ignore_statuses(&self) {