@@ -12,7 +12,7 @@ use futures::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot,
},
- Stream, StreamExt,
+ select_biased, Stream, StreamExt,
};
use fuzzy::CharBag;
use git::{DOT_GIT, GITIGNORE};
@@ -75,8 +75,8 @@ pub struct LocalWorktree {
}
pub struct RemoteWorktree {
- pub snapshot: Snapshot,
- pub(crate) background_snapshot: Arc<Mutex<Snapshot>>,
+ snapshot: Snapshot,
+ background_snapshot: Arc<Mutex<Snapshot>>,
project_id: u64,
client: Arc<Client>,
updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
@@ -116,7 +116,6 @@ impl std::fmt::Debug for GitRepositoryEntry {
f.debug_struct("GitRepositoryEntry")
.field("content_path", &self.content_path)
.field("git_dir_path", &self.git_dir_path)
- .field("libgit_repository", &"LibGitRepository")
.finish()
}
}
@@ -158,8 +157,13 @@ impl DerefMut for LocalSnapshot {
enum ScanState {
/// The worktree is performing its initial scan of the filesystem.
- Initializing(LocalSnapshot),
- Initialized(LocalSnapshot),
+ Initializing {
+ snapshot: LocalSnapshot,
+ barrier: Option<barrier::Sender>,
+ },
+ Initialized {
+ snapshot: LocalSnapshot,
+ },
/// The worktree is updating in response to filesystem events.
Updating,
Updated {
@@ -167,7 +171,6 @@ enum ScanState {
changes: HashMap<Arc<Path>, PathChange>,
barrier: Option<barrier::Sender>,
},
- Err(Arc<anyhow::Error>),
}
struct ShareState {
@@ -538,32 +541,30 @@ impl LocalWorktree {
cx: &mut ModelContext<Worktree>,
) {
match scan_state {
- ScanState::Initializing(new_snapshot) => {
+ ScanState::Initializing { snapshot, barrier } => {
*self.is_scanning.0.borrow_mut() = true;
- self.set_snapshot(new_snapshot, cx);
+ self.set_snapshot(snapshot, cx);
+ drop(barrier);
}
- ScanState::Initialized(new_snapshot) => {
+ ScanState::Initialized { snapshot } => {
*self.is_scanning.0.borrow_mut() = false;
- self.set_snapshot(new_snapshot, cx);
+ self.set_snapshot(snapshot, cx);
}
ScanState::Updating => {
*self.is_scanning.0.borrow_mut() = true;
}
ScanState::Updated {
- snapshot: new_snapshot,
+ snapshot,
changes,
barrier,
} => {
*self.is_scanning.0.borrow_mut() = false;
cx.emit(Event::UpdatedEntries(changes));
- self.set_snapshot(new_snapshot, cx);
+ self.set_snapshot(snapshot, cx);
drop(barrier);
}
- ScanState::Err(error) => {
- *self.is_scanning.0.borrow_mut() = false;
- log::error!("error scanning worktree {:?}", error);
- }
}
+ cx.notify();
}
fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
@@ -580,7 +581,6 @@ impl LocalWorktree {
if !updated_repos.is_empty() {
cx.emit(Event::UpdatedGitRepositories(updated_repos));
}
- cx.notify();
}
fn changed_repos(
@@ -759,15 +759,25 @@ impl LocalWorktree {
is_dir: bool,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<Entry>> {
- self.write_entry_internal(
- path,
+ let path = path.into();
+ let abs_path = self.absolutize(&path);
+ let fs = self.fs.clone();
+ let write = cx.background().spawn(async move {
if is_dir {
- None
+ fs.create_dir(&abs_path).await
} else {
- Some(Default::default())
- },
- cx,
- )
+ fs.save(&abs_path, &Default::default(), Default::default())
+ .await
+ }
+ });
+
+ cx.spawn(|this, mut cx| async move {
+ write.await?;
+ this.update(&mut cx, |this, cx| {
+ this.as_local_mut().unwrap().refresh_entry(path, None, cx)
+ })
+ .await
+ })
}
pub fn write_file(
@@ -777,7 +787,20 @@ impl LocalWorktree {
line_ending: LineEnding,
cx: &mut ModelContext<Worktree>,
) -> Task<Result<Entry>> {
- self.write_entry_internal(path, Some((text, line_ending)), cx)
+ let path = path.into();
+ let abs_path = self.absolutize(&path);
+ let fs = self.fs.clone();
+ let write = cx
+ .background()
+ .spawn(async move { fs.save(&abs_path, &text, line_ending).await });
+
+ cx.spawn(|this, mut cx| async move {
+ write.await?;
+ this.update(&mut cx, |this, cx| {
+ this.as_local_mut().unwrap().refresh_entry(path, None, cx)
+ })
+ .await
+ })
}
pub fn delete_entry(
@@ -882,32 +905,6 @@ impl LocalWorktree {
}))
}
- fn write_entry_internal(
- &self,
- path: impl Into<Arc<Path>>,
- text_if_file: Option<(Rope, LineEnding)>,
- cx: &mut ModelContext<Worktree>,
- ) -> Task<Result<Entry>> {
- let path = path.into();
- let abs_path = self.absolutize(&path);
- let fs = self.fs.clone();
- let write = cx.background().spawn(async move {
- if let Some((text, line_ending)) = text_if_file {
- fs.save(&abs_path, &text, line_ending).await
- } else {
- fs.create_dir(&abs_path).await
- }
- });
-
- cx.spawn(|this, mut cx| async move {
- write.await?;
- this.update(&mut cx, |this, cx| {
- this.as_local_mut().unwrap().refresh_entry(path, None, cx)
- })
- .await
- })
- }
-
fn refresh_entry(
&self,
path: Arc<Path>,
@@ -1380,7 +1377,10 @@ impl LocalSnapshot {
.cloned()
}
- pub(crate) fn in_dot_git(&mut self, path: &Path) -> Option<&mut GitRepositoryEntry> {
+ pub(crate) fn repo_with_dot_git_containing(
+ &mut self,
+ path: &Path,
+ ) -> Option<&mut GitRepositoryEntry> {
// Git repositories cannot be nested, so we don't need to reverse the order
self.git_repositories
.iter_mut()
@@ -1682,7 +1682,7 @@ impl GitRepositoryEntry {
path.starts_with(self.content_path.as_ref())
}
- // Note that theis path should be relative to the worktree root.
+ // Note that this path should be relative to the worktree root.
pub(crate) fn in_dot_git(&self, path: &Path) -> bool {
path.starts_with(self.git_dir_path.as_ref())
}
@@ -2162,61 +2162,143 @@ impl BackgroundScanner {
events_rx: impl Stream<Item = Vec<fsevent::Event>>,
mut changed_paths: UnboundedReceiver<(Vec<PathBuf>, barrier::Sender)>,
) {
- use futures::{select_biased, FutureExt as _};
-
- // While performing the initial scan, send a new snapshot to the main
- // thread on a recurring interval.
- let initializing_task = self.executor.spawn({
- let executor = self.executor.clone();
- let snapshot = self.snapshot.clone();
- let notify = self.notify.clone();
- let is_fake_fs = self.fs.is_fake();
- async move {
- loop {
- if is_fake_fs {
- #[cfg(any(test, feature = "test-support"))]
- executor.simulate_random_delay().await;
- } else {
- smol::Timer::after(Duration::from_millis(100)).await;
- }
+ use futures::FutureExt as _;
- executor.timer(Duration::from_millis(100)).await;
- if notify
- .unbounded_send(ScanState::Initializing(snapshot.lock().clone()))
- .is_err()
- {
- break;
- }
+ // Retrieve the basic properties of the root node.
+ let root_char_bag;
+ let root_abs_path;
+ let root_inode;
+ let root_is_dir;
+ let next_entry_id;
+ {
+ let mut snapshot = self.snapshot.lock();
+ snapshot.scan_started();
+ root_char_bag = snapshot.root_char_bag;
+ root_abs_path = snapshot.abs_path.clone();
+ root_inode = snapshot.root_entry().map(|e| e.inode);
+ root_is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir());
+ next_entry_id = snapshot.next_entry_id.clone();
+ }
+
+ // Populate ignores above the root.
+ let ignore_stack;
+ for ancestor in root_abs_path.ancestors().skip(1) {
+ if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await
+ {
+ self.snapshot
+ .lock()
+ .ignores_by_parent_abs_path
+ .insert(ancestor.into(), (ignore.into(), 0));
+ }
+ }
+ {
+ let mut snapshot = self.snapshot.lock();
+ ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true);
+ if ignore_stack.is_all() {
+ if let Some(mut root_entry) = snapshot.root_entry().cloned() {
+ root_entry.is_ignored = true;
+ snapshot.insert_entry(root_entry, self.fs.as_ref());
}
}
- });
+ };
- // Scan the entire directory.
- if let Err(err) = self.scan_dirs().await {
- if self
- .notify
- .unbounded_send(ScanState::Err(Arc::new(err)))
- .is_err()
- {
- return;
+ if root_is_dir {
+ let mut ancestor_inodes = TreeSet::default();
+ if let Some(root_inode) = root_inode {
+ ancestor_inodes.insert(root_inode);
}
+
+ let (tx, rx) = channel::unbounded();
+ self.executor
+ .block(tx.send(ScanJob {
+ abs_path: root_abs_path.to_path_buf(),
+ path: Arc::from(Path::new("")),
+ ignore_stack,
+ ancestor_inodes,
+ scan_queue: tx.clone(),
+ }))
+ .unwrap();
+ drop(tx);
+
+ // Spawn a worker thread per logical CPU.
+ self.executor
+ .scoped(|scope| {
+ // One the first worker thread, listen for change requests from the worktree.
+ // For each change request, after refreshing the given paths, report
+ // a progress update for the snapshot.
+ scope.spawn(async {
+ let reporting_timer = self.delay().fuse();
+ futures::pin_mut!(reporting_timer);
+
+ loop {
+ select_biased! {
+ job = changed_paths.next().fuse() => {
+ let Some((abs_paths, barrier)) = job else { break };
+ self.update_entries_for_paths(abs_paths, None).await;
+ if self.notify.unbounded_send(ScanState::Initializing {
+ snapshot: self.snapshot.lock().clone(),
+ barrier: Some(barrier),
+ }).is_err() {
+ break;
+ }
+ }
+
+ _ = reporting_timer => {
+ reporting_timer.set(self.delay().fuse());
+ if self.notify.unbounded_send(ScanState::Initializing {
+ snapshot: self.snapshot.lock().clone(),
+ barrier: None,
+ }).is_err() {
+ break;
+ }
+ }
+
+ job = rx.recv().fuse() => {
+ let Ok(job) = job else { break };
+ if let Err(err) = self
+ .scan_dir(root_char_bag, next_entry_id.clone(), &job)
+ .await
+ {
+ log::error!("error scanning {:?}: {}", job.abs_path, err);
+ }
+ }
+ }
+ }
+ });
+
+ // On all of the remaining worker threads, just scan directories.
+ for _ in 1..self.executor.num_cpus() {
+ scope.spawn(async {
+ while let Ok(job) = rx.recv().await {
+ if let Err(err) = self
+ .scan_dir(root_char_bag, next_entry_id.clone(), &job)
+ .await
+ {
+ log::error!("error scanning {:?}: {}", job.abs_path, err);
+ }
+ }
+ });
+ }
+ })
+ .await;
}
- drop(initializing_task);
+ self.snapshot.lock().scan_completed();
if self
.notify
- .unbounded_send(ScanState::Initialized(self.snapshot.lock().clone()))
+ .unbounded_send(ScanState::Initialized {
+ snapshot: self.snapshot.lock().clone(),
+ })
.is_err()
{
return;
}
- futures::pin_mut!(events_rx);
-
// Process any events that occurred while performing the initial scan. These
// events can't be reported as precisely, because there is no snapshot of the
// worktree before they occurred.
+ futures::pin_mut!(events_rx);
if let Poll::Ready(Some(mut events)) = futures::poll!(events_rx.next()) {
while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
events.extend(additional_events);
@@ -2224,7 +2306,10 @@ impl BackgroundScanner {
if self.notify.unbounded_send(ScanState::Updating).is_err() {
return;
}
- if !self.process_events(events, true).await {
+ if !self
+ .process_events(events.into_iter().map(|e| e.path).collect(), true)
+ .await
+ {
return;
}
if self
@@ -2242,24 +2327,17 @@ impl BackgroundScanner {
// Continue processing events until the worktree is dropped.
loop {
- let events;
+ let abs_paths;
let barrier;
select_biased! {
request = changed_paths.next().fuse() => {
- let Some((paths, b)) = request else { break; };
- events = paths
- .into_iter()
- .map(|path| fsevent::Event {
- path,
- event_id: 0,
- flags: fsevent::StreamFlags::NONE
- })
- .collect::<Vec<_>>();
+ let Some((paths, b)) = request else { break };
+ abs_paths = paths;
barrier = Some(b);
}
- e = events_rx.next().fuse() => {
- let Some(e) = e else { break; };
- events = e;
+ events = events_rx.next().fuse() => {
+ let Some(events) = events else { break };
+ abs_paths = events.into_iter().map(|e| e.path).collect();
barrier = None;
}
}
@@ -2267,7 +2345,7 @@ impl BackgroundScanner {
if self.notify.unbounded_send(ScanState::Updating).is_err() {
return;
}
- if !self.process_events(events, false).await {
+ if !self.process_events(abs_paths, false).await {
return;
}
if self
@@ -2284,85 +2362,12 @@ impl BackgroundScanner {
}
}
- async fn scan_dirs(&mut self) -> Result<()> {
- let root_char_bag;
- let root_abs_path;
- let root_inode;
- let is_dir;
- let next_entry_id;
- {
- let mut snapshot = self.snapshot.lock();
- snapshot.scan_started();
- root_char_bag = snapshot.root_char_bag;
- root_abs_path = snapshot.abs_path.clone();
- root_inode = snapshot.root_entry().map(|e| e.inode);
- is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir());
- next_entry_id = snapshot.next_entry_id.clone();
- };
-
- // Populate ignores above the root.
- for ancestor in root_abs_path.ancestors().skip(1) {
- if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await
- {
- self.snapshot
- .lock()
- .ignores_by_parent_abs_path
- .insert(ancestor.into(), (ignore.into(), 0));
- }
- }
-
- let ignore_stack = {
- let mut snapshot = self.snapshot.lock();
- let ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true);
- if ignore_stack.is_all() {
- if let Some(mut root_entry) = snapshot.root_entry().cloned() {
- root_entry.is_ignored = true;
- snapshot.insert_entry(root_entry, self.fs.as_ref());
- }
- }
- ignore_stack
- };
-
- if is_dir {
- let path: Arc<Path> = Arc::from(Path::new(""));
- let mut ancestor_inodes = TreeSet::default();
- if let Some(root_inode) = root_inode {
- ancestor_inodes.insert(root_inode);
- }
-
- let (tx, rx) = channel::unbounded();
- self.executor
- .block(tx.send(ScanJob {
- abs_path: root_abs_path.to_path_buf(),
- path,
- ignore_stack,
- ancestor_inodes,
- scan_queue: tx.clone(),
- }))
- .unwrap();
- drop(tx);
-
- self.executor
- .scoped(|scope| {
- for _ in 0..self.executor.num_cpus() {
- scope.spawn(async {
- while let Ok(job) = rx.recv().await {
- if let Err(err) = self
- .scan_dir(root_char_bag, next_entry_id.clone(), &job)
- .await
- {
- log::error!("error scanning {:?}: {}", job.abs_path, err);
- }
- }
- });
- }
- })
- .await;
-
- self.snapshot.lock().scan_completed();
+ async fn delay(&self) {
+ #[cfg(any(test, feature = "test-support"))]
+ if self.fs.is_fake() {
+ return self.executor.simulate_random_delay().await;
}
-
- Ok(())
+ smol::Timer::after(Duration::from_millis(100)).await;
}
async fn scan_dir(
@@ -2492,85 +2497,125 @@ impl BackgroundScanner {
async fn process_events(
&mut self,
- mut events: Vec<fsevent::Event>,
+ abs_paths: Vec<PathBuf>,
received_before_initialized: bool,
) -> bool {
- events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
- events.dedup_by(|a, b| a.path.starts_with(&b.path));
+ let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
- let root_char_bag;
- let root_abs_path;
- let next_entry_id;
- let prev_snapshot;
- {
+ let prev_snapshot = {
let mut snapshot = self.snapshot.lock();
- prev_snapshot = snapshot.snapshot.clone();
- root_char_bag = snapshot.root_char_bag;
- root_abs_path = snapshot.abs_path.clone();
- next_entry_id = snapshot.next_entry_id.clone();
snapshot.scan_started();
- }
+ snapshot.clone()
+ };
- let root_canonical_path = if let Ok(path) = self.fs.canonicalize(&root_abs_path).await {
- path
+ let event_paths = if let Some(event_paths) = self
+ .update_entries_for_paths(abs_paths, Some(scan_queue_tx))
+ .await
+ {
+ event_paths
} else {
return false;
};
+
+ // Scan any directories that were created as part of this event batch.
+ self.executor
+ .scoped(|scope| {
+ for _ in 0..self.executor.num_cpus() {
+ scope.spawn(async {
+ while let Ok(job) = scan_queue_rx.recv().await {
+ if let Err(err) = self
+ .scan_dir(
+ prev_snapshot.root_char_bag,
+ prev_snapshot.next_entry_id.clone(),
+ &job,
+ )
+ .await
+ {
+ log::error!("error scanning {:?}: {}", job.abs_path, err);
+ }
+ }
+ });
+ }
+ })
+ .await;
+
+ // Attempt to detect renames only over a single batch of file-system events.
+ self.snapshot.lock().removed_entry_ids.clear();
+
+ self.update_ignore_statuses().await;
+ self.update_git_repositories();
+ self.build_change_set(
+ prev_snapshot.snapshot,
+ event_paths,
+ received_before_initialized,
+ );
+ self.snapshot.lock().scan_completed();
+ true
+ }
+
+ async fn update_entries_for_paths(
+ &self,
+ mut abs_paths: Vec<PathBuf>,
+ scan_queue_tx: Option<Sender<ScanJob>>,
+ ) -> Option<Vec<Arc<Path>>> {
+ abs_paths.sort_unstable();
+ abs_paths.dedup_by(|a, b| a.starts_with(&b));
+
+ let root_abs_path = self.snapshot.lock().abs_path.clone();
+ let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.ok()?;
let metadata = futures::future::join_all(
- events
+ abs_paths
.iter()
- .map(|event| self.fs.metadata(&event.path))
+ .map(|abs_path| self.fs.metadata(&abs_path))
.collect::<Vec<_>>(),
)
.await;
- // Hold the snapshot lock while clearing and re-inserting the root entries
- // for each event. This way, the snapshot is not observable to the foreground
- // thread while this operation is in-progress.
- let mut event_paths = Vec::with_capacity(events.len());
- let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
- {
- let mut snapshot = self.snapshot.lock();
- for event in &events {
- if let Ok(path) = event.path.strip_prefix(&root_canonical_path) {
+ let mut snapshot = self.snapshot.lock();
+ if scan_queue_tx.is_some() {
+ for abs_path in &abs_paths {
+ if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) {
snapshot.remove_path(path);
}
}
+ }
- for (event, metadata) in events.into_iter().zip(metadata.into_iter()) {
- let path: Arc<Path> = match event.path.strip_prefix(&root_canonical_path) {
- Ok(path) => Arc::from(path.to_path_buf()),
- Err(_) => {
- log::error!(
- "unexpected event {:?} for root path {:?}",
- event.path,
- root_canonical_path
- );
- continue;
- }
- };
- event_paths.push(path.clone());
- let abs_path = root_abs_path.join(&path);
-
- match metadata {
- Ok(Some(metadata)) => {
- let ignore_stack =
- snapshot.ignore_stack_for_abs_path(&abs_path, metadata.is_dir);
- let mut fs_entry = Entry::new(
- path.clone(),
- &metadata,
- snapshot.next_entry_id.as_ref(),
- snapshot.root_char_bag,
- );
- fs_entry.is_ignored = ignore_stack.is_all();
- snapshot.insert_entry(fs_entry, self.fs.as_ref());
+ let mut event_paths = Vec::with_capacity(abs_paths.len());
+ for (abs_path, metadata) in abs_paths.into_iter().zip(metadata.into_iter()) {
+ let path: Arc<Path> = match abs_path.strip_prefix(&root_canonical_path) {
+ Ok(path) => Arc::from(path.to_path_buf()),
+ Err(_) => {
+ log::error!(
+ "unexpected event {:?} for root path {:?}",
+ abs_path,
+ root_canonical_path
+ );
+ continue;
+ }
+ };
+ event_paths.push(path.clone());
+ let abs_path = root_abs_path.join(&path);
+
+ match metadata {
+ Ok(Some(metadata)) => {
+ let ignore_stack =
+ snapshot.ignore_stack_for_abs_path(&abs_path, metadata.is_dir);
+ let mut fs_entry = Entry::new(
+ path.clone(),
+ &metadata,
+ snapshot.next_entry_id.as_ref(),
+ snapshot.root_char_bag,
+ );
+ fs_entry.is_ignored = ignore_stack.is_all();
+ snapshot.insert_entry(fs_entry, self.fs.as_ref());
- let scan_id = snapshot.scan_id;
- if let Some(repo) = snapshot.in_dot_git(&path) {
- repo.repo.lock().reload_index();
- repo.scan_id = scan_id;
- }
+ let scan_id = snapshot.scan_id;
+ if let Some(repo) = snapshot.repo_with_dot_git_containing(&path) {
+ repo.repo.lock().reload_index();
+ repo.scan_id = scan_id;
+ }
+ if let Some(scan_queue_tx) = &scan_queue_tx {
let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path);
if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) {
ancestor_inodes.insert(metadata.inode);
@@ -2585,42 +2630,16 @@ impl BackgroundScanner {
.unwrap();
}
}
- Ok(None) => {}
- Err(err) => {
- // TODO - create a special 'error' entry in the entries tree to mark this
- log::error!("error reading file on event {:?}", err);
- }
+ }
+ Ok(None) => {}
+ Err(err) => {
+ // TODO - create a special 'error' entry in the entries tree to mark this
+ log::error!("error reading file on event {:?}", err);
}
}
- drop(scan_queue_tx);
}
- // Scan any directories that were created as part of this event batch.
- self.executor
- .scoped(|scope| {
- for _ in 0..self.executor.num_cpus() {
- scope.spawn(async {
- while let Ok(job) = scan_queue_rx.recv().await {
- if let Err(err) = self
- .scan_dir(root_char_bag, next_entry_id.clone(), &job)
- .await
- {
- log::error!("error scanning {:?}: {}", job.abs_path, err);
- }
- }
- });
- }
- })
- .await;
-
- // Attempt to detect renames only over a single batch of file-system events.
- self.snapshot.lock().removed_entry_ids.clear();
-
- self.update_ignore_statuses().await;
- self.update_git_repositories();
- self.build_change_set(prev_snapshot, event_paths, received_before_initialized);
- self.snapshot.lock().scan_completed();
- true
+ Some(event_paths)
}
async fn update_ignore_statuses(&self) {