@@ -1459,7 +1459,7 @@ impl Project {
};
cx.foreground().spawn(async move {
- pump_loading_buffer_reciever(loading_watch)
+ wait_for_loading_buffer(loading_watch)
.await
.map_err(|error| anyhow!("{}", error))
})
@@ -4847,7 +4847,7 @@ impl Project {
if worktree.read(cx).is_local() {
cx.subscribe(worktree, |this, worktree, event, cx| match event {
worktree::Event::UpdatedEntries(changes) => {
- this.update_local_worktree_buffers(&worktree, &changes, cx);
+ this.update_local_worktree_buffers(&worktree, changes, cx);
this.update_local_worktree_language_servers(&worktree, changes, cx);
}
worktree::Event::UpdatedGitRepositories(updated_repos) => {
@@ -4881,13 +4881,13 @@ impl Project {
fn update_local_worktree_buffers(
&mut self,
worktree_handle: &ModelHandle<Worktree>,
- changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
+ changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
cx: &mut ModelContext<Self>,
) {
let snapshot = worktree_handle.read(cx).snapshot();
let mut renamed_buffers = Vec::new();
- for (path, entry_id) in changes.keys() {
+ for (path, entry_id, _) in changes {
let worktree_id = worktree_handle.read(cx).id();
let project_path = ProjectPath {
worktree_id,
@@ -4993,7 +4993,7 @@ impl Project {
fn update_local_worktree_language_servers(
&mut self,
worktree_handle: &ModelHandle<Worktree>,
- changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
+ changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
cx: &mut ModelContext<Self>,
) {
if changes.is_empty() {
@@ -5024,23 +5024,21 @@ impl Project {
let params = lsp::DidChangeWatchedFilesParams {
changes: changes
.iter()
- .filter_map(|((path, _), change)| {
- if watched_paths.is_match(&path) {
- Some(lsp::FileEvent {
- uri: lsp::Url::from_file_path(abs_path.join(path))
- .unwrap(),
- typ: match change {
- PathChange::Added => lsp::FileChangeType::CREATED,
- PathChange::Removed => lsp::FileChangeType::DELETED,
- PathChange::Updated
- | PathChange::AddedOrUpdated => {
- lsp::FileChangeType::CHANGED
- }
- },
- })
- } else {
- None
+ .filter_map(|(path, _, change)| {
+ if !watched_paths.is_match(&path) {
+ return None;
}
+ let typ = match change {
+ PathChange::Loaded => return None,
+ PathChange::Added => lsp::FileChangeType::CREATED,
+ PathChange::Removed => lsp::FileChangeType::DELETED,
+ PathChange::Updated => lsp::FileChangeType::CHANGED,
+ PathChange::AddedOrUpdated => lsp::FileChangeType::CHANGED,
+ };
+ Some(lsp::FileEvent {
+ uri: lsp::Url::from_file_path(abs_path.join(path)).unwrap(),
+ typ,
+ })
})
.collect(),
};
@@ -5059,98 +5057,102 @@ impl Project {
fn update_local_worktree_buffers_git_repos(
&mut self,
worktree_handle: ModelHandle<Worktree>,
- repos: &HashMap<Arc<Path>, LocalRepositoryEntry>,
+ changed_repos: &UpdatedGitRepositoriesSet,
cx: &mut ModelContext<Self>,
) {
debug_assert!(worktree_handle.read(cx).is_local());
- // Setup the pending buffers
+ // Identify the loading buffers whose containing repository that has changed.
let future_buffers = self
.loading_buffers_by_path
.iter()
- .filter_map(|(path, receiver)| {
- let path = &path.path;
- let (work_directory, repo) = repos
- .iter()
- .find(|(work_directory, _)| path.starts_with(work_directory))?;
-
- let repo_relative_path = path.strip_prefix(work_directory).log_err()?;
-
+ .filter_map(|(project_path, receiver)| {
+ if project_path.worktree_id != worktree_handle.read(cx).id() {
+ return None;
+ }
+ let path = &project_path.path;
+ changed_repos.iter().find(|(work_dir, change)| {
+ path.starts_with(work_dir) && change.git_dir_changed
+ })?;
let receiver = receiver.clone();
- let repo_ptr = repo.repo_ptr.clone();
- let repo_relative_path = repo_relative_path.to_owned();
+ let path = path.clone();
Some(async move {
- pump_loading_buffer_reciever(receiver)
+ wait_for_loading_buffer(receiver)
.await
.ok()
- .map(|buffer| (buffer, repo_relative_path, repo_ptr))
+ .map(|buffer| (buffer, path))
})
})
- .collect::<FuturesUnordered<_>>()
- .filter_map(|result| async move {
- let (buffer_handle, repo_relative_path, repo_ptr) = result?;
+ .collect::<FuturesUnordered<_>>();
- let lock = repo_ptr.lock();
- lock.load_index_text(&repo_relative_path)
- .map(|diff_base| (diff_base, buffer_handle))
- });
+ // Identify the current buffers whose containing repository has changed.
+ let current_buffers = self
+ .opened_buffers
+ .values()
+ .filter_map(|buffer| {
+ let buffer = buffer.upgrade(cx)?;
+ let file = File::from_dyn(buffer.read(cx).file())?;
+ if file.worktree != worktree_handle {
+ return None;
+ }
+ let path = file.path();
+ changed_repos.iter().find(|(work_dir, change)| {
+ path.starts_with(work_dir) && change.git_dir_changed
+ })?;
+ Some((buffer, path.clone()))
+ })
+ .collect::<Vec<_>>();
- let update_diff_base_fn = update_diff_base(self);
- cx.spawn(|_, mut cx| async move {
- let diff_base_tasks = cx
+ if future_buffers.len() + current_buffers.len() == 0 {
+ return;
+ }
+
+ let remote_id = self.remote_id();
+ let client = self.client.clone();
+ cx.spawn_weak(move |_, mut cx| async move {
+ // Wait for all of the buffers to load.
+ let future_buffers = future_buffers.collect::<Vec<_>>().await;
+
+ // Reload the diff base for every buffer whose containing git repository has changed.
+ let snapshot =
+ worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot());
+ let diff_bases_by_buffer = cx
.background()
- .spawn(future_buffers.collect::<Vec<_>>())
+ .spawn(async move {
+ future_buffers
+ .into_iter()
+ .filter_map(|e| e)
+ .chain(current_buffers)
+ .filter_map(|(buffer, path)| {
+ let (work_directory, repo) =
+ snapshot.repository_and_work_directory_for_path(&path)?;
+ let repo = snapshot.get_local_repo(&repo)?;
+ let relative_path = path.strip_prefix(&work_directory).ok()?;
+ let base_text = repo.repo_ptr.lock().load_index_text(&relative_path);
+ Some((buffer, base_text))
+ })
+ .collect::<Vec<_>>()
+ })
.await;
- for (diff_base, buffer) in diff_base_tasks.into_iter() {
- update_diff_base_fn(Some(diff_base), buffer, &mut cx);
+ // Assign the new diff bases on all of the buffers.
+ for (buffer, diff_base) in diff_bases_by_buffer {
+ let buffer_id = buffer.update(&mut cx, |buffer, cx| {
+ buffer.set_diff_base(diff_base.clone(), cx);
+ buffer.remote_id()
+ });
+ if let Some(project_id) = remote_id {
+ client
+ .send(proto::UpdateDiffBase {
+ project_id,
+ buffer_id,
+ diff_base,
+ })
+ .log_err();
+ }
}
})
.detach();
-
- // And the current buffers
- for (_, buffer) in &self.opened_buffers {
- if let Some(buffer) = buffer.upgrade(cx) {
- let file = match File::from_dyn(buffer.read(cx).file()) {
- Some(file) => file,
- None => continue,
- };
- if file.worktree != worktree_handle {
- continue;
- }
-
- let path = file.path().clone();
-
- let worktree = worktree_handle.read(cx);
-
- let (work_directory, repo) = match repos
- .iter()
- .find(|(work_directory, _)| path.starts_with(work_directory))
- {
- Some(repo) => repo.clone(),
- None => continue,
- };
-
- let relative_repo = match path.strip_prefix(work_directory).log_err() {
- Some(relative_repo) => relative_repo.to_owned(),
- None => continue,
- };
-
- drop(worktree);
-
- let update_diff_base_fn = update_diff_base(self);
- let git_ptr = repo.repo_ptr.clone();
- let diff_base_task = cx
- .background()
- .spawn(async move { git_ptr.lock().load_index_text(&relative_repo) });
-
- cx.spawn(|_, mut cx| async move {
- let diff_base = diff_base_task.await;
- update_diff_base_fn(diff_base, buffer, &mut cx);
- })
- .detach();
- }
- }
}
pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
@@ -7072,7 +7074,7 @@ impl Item for Buffer {
}
}
-async fn pump_loading_buffer_reciever(
+async fn wait_for_loading_buffer(
mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
loop {
@@ -7085,26 +7087,3 @@ async fn pump_loading_buffer_reciever(
receiver.next().await;
}
}
-
-fn update_diff_base(
- project: &Project,
-) -> impl Fn(Option<String>, ModelHandle<Buffer>, &mut AsyncAppContext) {
- let remote_id = project.remote_id();
- let client = project.client().clone();
- move |diff_base, buffer, cx| {
- let buffer_id = buffer.update(cx, |buffer, cx| {
- buffer.set_diff_base(diff_base.clone(), cx);
- buffer.remote_id()
- });
-
- if let Some(project_id) = remote_id {
- client
- .send(proto::UpdateDiffBase {
- project_id,
- buffer_id: buffer_id as u64,
- diff_base,
- })
- .log_err();
- }
- }
-}
@@ -17,7 +17,7 @@ use futures::{
},
select_biased,
task::Poll,
- Stream, StreamExt,
+ FutureExt, Stream, StreamExt,
};
use fuzzy::CharBag;
use git::{DOT_GIT, GITIGNORE};
@@ -55,7 +55,7 @@ use std::{
time::{Duration, SystemTime},
};
use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
-use util::{paths::HOME, ResultExt, TakeUntilExt, TryFutureExt};
+use util::{paths::HOME, ResultExt, TakeUntilExt};
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
pub struct WorktreeId(usize);
@@ -317,18 +317,20 @@ pub struct LocalSnapshot {
git_repositories: TreeMap<ProjectEntryId, LocalRepositoryEntry>,
}
-pub struct LocalMutableSnapshot {
+pub struct BackgroundScannerState {
snapshot: LocalSnapshot,
/// The ids of all of the entries that were removed from the snapshot
/// as part of the current update. These entry ids may be re-used
/// if the same inode is discovered at a new path, or if the given
/// path is re-created after being deleted.
removed_entry_ids: HashMap<u64, ProjectEntryId>,
+ changed_paths: Vec<Arc<Path>>,
+ prev_snapshot: Snapshot,
}
#[derive(Debug, Clone)]
pub struct LocalRepositoryEntry {
- pub(crate) scan_id: usize,
+ pub(crate) work_dir_scan_id: usize,
pub(crate) git_dir_scan_id: usize,
pub(crate) repo_ptr: Arc<Mutex<dyn GitRepository>>,
/// Path to the actual .git folder.
@@ -357,25 +359,11 @@ impl DerefMut for LocalSnapshot {
}
}
-impl Deref for LocalMutableSnapshot {
- type Target = LocalSnapshot;
-
- fn deref(&self) -> &Self::Target {
- &self.snapshot
- }
-}
-
-impl DerefMut for LocalMutableSnapshot {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.snapshot
- }
-}
-
enum ScanState {
Started,
Updated {
snapshot: LocalSnapshot,
- changes: HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
+ changes: UpdatedEntriesSet,
barrier: Option<barrier::Sender>,
scanning: bool,
},
@@ -383,14 +371,15 @@ enum ScanState {
struct ShareState {
project_id: u64,
- snapshots_tx: watch::Sender<LocalSnapshot>,
+ snapshots_tx:
+ mpsc::UnboundedSender<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>,
resume_updates: watch::Sender<()>,
_maintain_remote_snapshot: Task<Option<()>>,
}
pub enum Event {
- UpdatedEntries(HashMap<(Arc<Path>, ProjectEntryId), PathChange>),
- UpdatedGitRepositories(HashMap<Arc<Path>, LocalRepositoryEntry>),
+ UpdatedEntries(UpdatedEntriesSet),
+ UpdatedGitRepositories(UpdatedGitRepositoriesSet),
}
impl Entity for Worktree {
@@ -465,8 +454,7 @@ impl Worktree {
scanning,
} => {
*this.is_scanning.0.borrow_mut() = scanning;
- this.set_snapshot(snapshot, cx);
- cx.emit(Event::UpdatedEntries(changes));
+ this.set_snapshot(snapshot, changes, cx);
drop(barrier);
}
}
@@ -560,7 +548,7 @@ impl Worktree {
this.update(&mut cx, |this, cx| {
let this = this.as_remote_mut().unwrap();
this.snapshot = this.background_snapshot.lock().clone();
- cx.emit(Event::UpdatedEntries(Default::default()));
+ cx.emit(Event::UpdatedEntries(Arc::from([])));
cx.notify();
while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
if this.observed_snapshot(*scan_id) {
@@ -832,73 +820,137 @@ impl LocalWorktree {
Ok(!old_summary.is_empty() || !new_summary.is_empty())
}
- fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
- let updated_repos =
- self.changed_repos(&self.git_repositories, &new_snapshot.git_repositories);
+ fn set_snapshot(
+ &mut self,
+ new_snapshot: LocalSnapshot,
+ entry_changes: UpdatedEntriesSet,
+ cx: &mut ModelContext<Worktree>,
+ ) {
+ let repo_changes = self.changed_repos(&self.snapshot, &new_snapshot);
self.snapshot = new_snapshot;
if let Some(share) = self.share.as_mut() {
- *share.snapshots_tx.borrow_mut() = self.snapshot.clone();
+ share
+ .snapshots_tx
+ .unbounded_send((
+ self.snapshot.clone(),
+ entry_changes.clone(),
+ repo_changes.clone(),
+ ))
+ .ok();
}
- if !updated_repos.is_empty() {
- cx.emit(Event::UpdatedGitRepositories(updated_repos));
+ if !entry_changes.is_empty() {
+ cx.emit(Event::UpdatedEntries(entry_changes));
+ }
+ if !repo_changes.is_empty() {
+ cx.emit(Event::UpdatedGitRepositories(repo_changes));
}
}
fn changed_repos(
&self,
- old_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>,
- new_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>,
- ) -> HashMap<Arc<Path>, LocalRepositoryEntry> {
- let mut diff = HashMap::default();
- let mut old_repos = old_repos.iter().peekable();
- let mut new_repos = new_repos.iter().peekable();
+ old_snapshot: &LocalSnapshot,
+ new_snapshot: &LocalSnapshot,
+ ) -> UpdatedGitRepositoriesSet {
+ let mut changes = Vec::new();
+ let mut old_repos = old_snapshot.git_repositories.iter().peekable();
+ let mut new_repos = new_snapshot.git_repositories.iter().peekable();
loop {
- match (old_repos.peek(), new_repos.peek()) {
- (Some((old_entry_id, old_repo)), Some((new_entry_id, new_repo))) => {
- match Ord::cmp(old_entry_id, new_entry_id) {
+ match (new_repos.peek().map(clone), old_repos.peek().map(clone)) {
+ (Some((new_entry_id, new_repo)), Some((old_entry_id, old_repo))) => {
+ match Ord::cmp(&new_entry_id, &old_entry_id) {
Ordering::Less => {
- if let Some(entry) = self.entry_for_id(**old_entry_id) {
- diff.insert(entry.path.clone(), (*old_repo).clone());
+ if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) {
+ changes.push((
+ entry.path.clone(),
+ GitRepositoryChange {
+ old_repository: None,
+ git_dir_changed: true,
+ },
+ ));
}
- old_repos.next();
+ new_repos.next();
}
Ordering::Equal => {
- if old_repo.git_dir_scan_id != new_repo.git_dir_scan_id {
- if let Some(entry) = self.entry_for_id(**new_entry_id) {
- diff.insert(entry.path.clone(), (*new_repo).clone());
+ let git_dir_changed =
+ new_repo.git_dir_scan_id != old_repo.git_dir_scan_id;
+ let work_dir_changed =
+ new_repo.work_dir_scan_id != old_repo.work_dir_scan_id;
+ if git_dir_changed || work_dir_changed {
+ if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) {
+ let old_repo = old_snapshot
+ .repository_entries
+ .get(&RepositoryWorkDirectory(entry.path.clone()))
+ .cloned();
+ changes.push((
+ entry.path.clone(),
+ GitRepositoryChange {
+ old_repository: old_repo,
+ git_dir_changed,
+ },
+ ));
}
}
-
- old_repos.next();
new_repos.next();
+ old_repos.next();
}
Ordering::Greater => {
- if let Some(entry) = self.entry_for_id(**new_entry_id) {
- diff.insert(entry.path.clone(), (*new_repo).clone());
+ if let Some(entry) = old_snapshot.entry_for_id(old_entry_id) {
+ let old_repo = old_snapshot
+ .repository_entries
+ .get(&RepositoryWorkDirectory(entry.path.clone()))
+ .cloned();
+ changes.push((
+ entry.path.clone(),
+ GitRepositoryChange {
+ old_repository: old_repo,
+ git_dir_changed: true,
+ },
+ ));
}
- new_repos.next();
+ old_repos.next();
}
}
}
- (Some((old_entry_id, old_repo)), None) => {
- if let Some(entry) = self.entry_for_id(**old_entry_id) {
- diff.insert(entry.path.clone(), (*old_repo).clone());
+ (Some((entry_id, _)), None) => {
+ if let Some(entry) = new_snapshot.entry_for_id(entry_id) {
+ changes.push((
+ entry.path.clone(),
+ GitRepositoryChange {
+ old_repository: None,
+ git_dir_changed: true,
+ },
+ ));
}
- old_repos.next();
+ new_repos.next();
}
- (None, Some((new_entry_id, new_repo))) => {
- if let Some(entry) = self.entry_for_id(**new_entry_id) {
- diff.insert(entry.path.clone(), (*new_repo).clone());
+ (None, Some((entry_id, _))) => {
+ if let Some(entry) = old_snapshot.entry_for_id(entry_id) {
+ let old_repo = old_snapshot
+ .repository_entries
+ .get(&RepositoryWorkDirectory(entry.path.clone()))
+ .cloned();
+ changes.push((
+ entry.path.clone(),
+ GitRepositoryChange {
+ old_repository: old_repo,
+ git_dir_changed: true,
+ },
+ ));
}
- new_repos.next();
+ old_repos.next();
}
(None, None) => break,
}
}
- diff
+
+ fn clone<T: Clone, U: Clone>(value: &(&T, &U)) -> (T, U) {
+ (value.0.clone(), value.1.clone())
+ }
+
+ changes.into()
}
pub fn scan_complete(&self) -> impl Future<Output = ()> {
@@ -1239,89 +1291,97 @@ impl LocalWorktree {
})
}
- pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
+ pub fn observe_updates<F, Fut>(
+ &mut self,
+ project_id: u64,
+ cx: &mut ModelContext<Worktree>,
+ callback: F,
+ ) -> oneshot::Receiver<()>
+ where
+ F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
+ Fut: Send + Future<Output = bool>,
+ {
+ #[cfg(any(test, feature = "test-support"))]
+ const MAX_CHUNK_SIZE: usize = 2;
+ #[cfg(not(any(test, feature = "test-support")))]
+ const MAX_CHUNK_SIZE: usize = 256;
+
let (share_tx, share_rx) = oneshot::channel();
if let Some(share) = self.share.as_mut() {
- let _ = share_tx.send(());
+ share_tx.send(()).ok();
*share.resume_updates.borrow_mut() = ();
- } else {
- let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
- let (resume_updates_tx, mut resume_updates_rx) = watch::channel();
- let worktree_id = cx.model_id() as u64;
+ return share_rx;
+ }
- for (path, summaries) in &self.diagnostic_summaries {
- for (&server_id, summary) in summaries {
- if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary {
- project_id,
- worktree_id,
- summary: Some(summary.to_proto(server_id, &path)),
- }) {
- return Task::ready(Err(e));
- }
+ let (resume_updates_tx, mut resume_updates_rx) = watch::channel::<()>();
+ let (snapshots_tx, mut snapshots_rx) =
+ mpsc::unbounded::<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>();
+ snapshots_tx
+ .unbounded_send((self.snapshot(), Arc::from([]), Arc::from([])))
+ .ok();
+
+ let worktree_id = cx.model_id() as u64;
+ let _maintain_remote_snapshot = cx.background().spawn(async move {
+ let mut is_first = true;
+ while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
+ let update;
+ if is_first {
+ update = snapshot.build_initial_update(project_id, worktree_id);
+ is_first = false;
+ } else {
+ update =
+ snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes);
}
- }
- let _maintain_remote_snapshot = cx.background().spawn({
- let client = self.client.clone();
- async move {
- let mut share_tx = Some(share_tx);
- let mut prev_snapshot = LocalSnapshot {
- ignores_by_parent_abs_path: Default::default(),
- git_repositories: Default::default(),
- snapshot: Snapshot {
- id: WorktreeId(worktree_id as usize),
- abs_path: Path::new("").into(),
- root_name: Default::default(),
- root_char_bag: Default::default(),
- entries_by_path: Default::default(),
- entries_by_id: Default::default(),
- repository_entries: Default::default(),
- scan_id: 0,
- completed_scan_id: 0,
- },
- };
- while let Some(snapshot) = snapshots_rx.recv().await {
- #[cfg(any(test, feature = "test-support"))]
- const MAX_CHUNK_SIZE: usize = 2;
- #[cfg(not(any(test, feature = "test-support")))]
- const MAX_CHUNK_SIZE: usize = 256;
-
- let update =
- snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
- for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
- let _ = resume_updates_rx.try_recv();
- while let Err(error) = client.request(update.clone()).await {
- log::error!("failed to send worktree update: {}", error);
- log::info!("waiting to resume updates");
- if resume_updates_rx.next().await.is_none() {
- return Ok(());
- }
+ for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
+ let _ = resume_updates_rx.try_recv();
+ loop {
+ let result = callback(update.clone());
+ if result.await {
+ break;
+ } else {
+ log::info!("waiting to resume updates");
+ if resume_updates_rx.next().await.is_none() {
+ return Some(());
}
}
+ }
+ }
+ }
+ share_tx.send(()).ok();
+ Some(())
+ });
- if let Some(share_tx) = share_tx.take() {
- let _ = share_tx.send(());
- }
+ self.share = Some(ShareState {
+ project_id,
+ snapshots_tx,
+ resume_updates: resume_updates_tx,
+ _maintain_remote_snapshot,
+ });
+ share_rx
+ }
- prev_snapshot = snapshot;
- }
+ pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
+ let client = self.client.clone();
- Ok::<_, anyhow::Error>(())
+ for (path, summaries) in &self.diagnostic_summaries {
+ for (&server_id, summary) in summaries {
+ if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary {
+ project_id,
+ worktree_id: cx.model_id() as u64,
+ summary: Some(summary.to_proto(server_id, &path)),
+ }) {
+ return Task::ready(Err(e));
}
- .log_err()
- });
-
- self.share = Some(ShareState {
- project_id,
- snapshots_tx,
- resume_updates: resume_updates_tx,
- _maintain_remote_snapshot,
- });
+ }
}
+ let rx = self.observe_updates(project_id, cx, move |update| {
+ client.request(update).map(|result| result.is_ok())
+ });
cx.foreground()
- .spawn(async move { share_rx.await.map_err(|_| anyhow!("share ended")) })
+ .spawn(async move { rx.await.map_err(|_| anyhow!("share ended")) })
}
pub fn unshare(&mut self) {
@@ -1530,10 +1590,12 @@ impl Snapshot {
pub(crate) fn apply_remote_update(&mut self, mut update: proto::UpdateWorktree) -> Result<()> {
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
+
for entry_id in update.removed_entries {
- if let Some(entry) = self.entry_for_id(ProjectEntryId::from_proto(entry_id)) {
+ let entry_id = ProjectEntryId::from_proto(entry_id);
+ entries_by_id_edits.push(Edit::Remove(entry_id));
+ if let Some(entry) = self.entry_for_id(entry_id) {
entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
- entries_by_id_edits.push(Edit::Remove(entry.id));
}
}
@@ -1542,6 +1604,11 @@ impl Snapshot {
if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
}
+ if let Some(old_entry) = self.entries_by_path.get(&PathKey(entry.path.clone()), &()) {
+ if old_entry.id != entry.id {
+ entries_by_id_edits.push(Edit::Remove(old_entry.id));
+ }
+ }
entries_by_id_edits.push(Edit::Insert(PathEntry {
id: entry.id,
path: entry.path.clone(),
@@ -1684,20 +1751,19 @@ impl Snapshot {
/// Get the repository whose work directory contains the given path.
pub fn repository_for_path(&self, path: &Path) -> Option<RepositoryEntry> {
- let mut max_len = 0;
- let mut current_candidate = None;
- for (work_directory, repo) in (&self.repository_entries).iter() {
- if path.starts_with(&work_directory.0) {
- if work_directory.0.as_os_str().len() >= max_len {
- current_candidate = Some(repo);
- max_len = work_directory.0.as_os_str().len();
- } else {
- break;
- }
- }
- }
+ self.repository_and_work_directory_for_path(path)
+ .map(|e| e.1)
+ }
- current_candidate.cloned()
+ pub fn repository_and_work_directory_for_path(
+ &self,
+ path: &Path,
+ ) -> Option<(RepositoryWorkDirectory, RepositoryEntry)> {
+ self.repository_entries
+ .iter()
+ .filter(|(workdir_path, _)| path.starts_with(workdir_path))
+ .last()
+ .map(|(path, repo)| (path.clone(), repo.clone()))
}
/// Given an ordered iterator of entries, returns an iterator of those entries,
@@ -1833,117 +1899,52 @@ impl LocalSnapshot {
.find(|(_, repo)| repo.in_dot_git(path))
}
- #[cfg(test)]
- pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
- let root_name = self.root_name.clone();
- proto::UpdateWorktree {
- project_id,
- worktree_id: self.id().to_proto(),
- abs_path: self.abs_path().to_string_lossy().into(),
- root_name,
- updated_entries: self.entries_by_path.iter().map(Into::into).collect(),
- removed_entries: Default::default(),
- scan_id: self.scan_id as u64,
- is_last_update: true,
- updated_repositories: self.repository_entries.values().map(Into::into).collect(),
- removed_repositories: Default::default(),
- }
- }
-
- pub(crate) fn build_update(
+ fn build_update(
&self,
- other: &Self,
project_id: u64,
worktree_id: u64,
- include_ignored: bool,
+ entry_changes: UpdatedEntriesSet,
+ repo_changes: UpdatedGitRepositoriesSet,
) -> proto::UpdateWorktree {
let mut updated_entries = Vec::new();
let mut removed_entries = Vec::new();
- let mut self_entries = self
- .entries_by_id
- .cursor::<()>()
- .filter(|e| include_ignored || !e.is_ignored)
- .peekable();
- let mut other_entries = other
- .entries_by_id
- .cursor::<()>()
- .filter(|e| include_ignored || !e.is_ignored)
- .peekable();
- loop {
- match (self_entries.peek(), other_entries.peek()) {
- (Some(self_entry), Some(other_entry)) => {
- match Ord::cmp(&self_entry.id, &other_entry.id) {
- Ordering::Less => {
- let entry = self.entry_for_id(self_entry.id).unwrap().into();
- updated_entries.push(entry);
- self_entries.next();
- }
- Ordering::Equal => {
- if self_entry.scan_id != other_entry.scan_id {
- let entry = self.entry_for_id(self_entry.id).unwrap().into();
- updated_entries.push(entry);
- }
+ let mut updated_repositories = Vec::new();
+ let mut removed_repositories = Vec::new();
- self_entries.next();
- other_entries.next();
- }
- Ordering::Greater => {
- removed_entries.push(other_entry.id.to_proto());
- other_entries.next();
- }
- }
- }
- (Some(self_entry), None) => {
- let entry = self.entry_for_id(self_entry.id).unwrap().into();
- updated_entries.push(entry);
- self_entries.next();
- }
- (None, Some(other_entry)) => {
- removed_entries.push(other_entry.id.to_proto());
- other_entries.next();
- }
- (None, None) => break,
+ for (_, entry_id, path_change) in entry_changes.iter() {
+ if let PathChange::Removed = path_change {
+ removed_entries.push(entry_id.0 as u64);
+ } else if let Some(entry) = self.entry_for_id(*entry_id) {
+ updated_entries.push(proto::Entry::from(entry));
}
}
- let mut updated_repositories: Vec<proto::RepositoryEntry> = Vec::new();
- let mut removed_repositories = Vec::new();
- let mut self_repos = self.snapshot.repository_entries.iter().peekable();
- let mut other_repos = other.snapshot.repository_entries.iter().peekable();
- loop {
- match (self_repos.peek(), other_repos.peek()) {
- (Some((self_work_dir, self_repo)), Some((other_work_dir, other_repo))) => {
- match Ord::cmp(self_work_dir, other_work_dir) {
- Ordering::Less => {
- updated_repositories.push((*self_repo).into());
- self_repos.next();
- }
- Ordering::Equal => {
- if self_repo != other_repo {
- updated_repositories.push(self_repo.build_update(other_repo));
- }
-
- self_repos.next();
- other_repos.next();
- }
- Ordering::Greater => {
- removed_repositories.push(other_repo.work_directory.to_proto());
- other_repos.next();
- }
- }
+ for (work_dir_path, change) in repo_changes.iter() {
+ let new_repo = self
+ .repository_entries
+ .get(&RepositoryWorkDirectory(work_dir_path.clone()));
+ match (&change.old_repository, new_repo) {
+ (Some(old_repo), Some(new_repo)) => {
+ updated_repositories.push(new_repo.build_update(old_repo));
}
- (Some((_, self_repo)), None) => {
- updated_repositories.push((*self_repo).into());
- self_repos.next();
+ (None, Some(new_repo)) => {
+ updated_repositories.push(proto::RepositoryEntry::from(new_repo));
}
- (None, Some((_, other_repo))) => {
- removed_repositories.push(other_repo.work_directory.to_proto());
- other_repos.next();
+ (Some(old_repo), None) => {
+ removed_repositories.push(old_repo.work_directory.0.to_proto());
}
- (None, None) => break,
+ _ => {}
}
}
+ removed_entries.sort_unstable();
+ updated_entries.sort_unstable_by_key(|e| e.id);
+ removed_repositories.sort_unstable();
+ updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
+
+ // TODO - optimize, knowing that removed_entries are sorted.
+ removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err());
+
proto::UpdateWorktree {
project_id,
worktree_id,
@@ -1958,6 +1959,35 @@ impl LocalSnapshot {
}
}
+ fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
+ let mut updated_entries = self
+ .entries_by_path
+ .iter()
+ .map(proto::Entry::from)
+ .collect::<Vec<_>>();
+ updated_entries.sort_unstable_by_key(|e| e.id);
+
+ let mut updated_repositories = self
+ .repository_entries
+ .values()
+ .map(proto::RepositoryEntry::from)
+ .collect::<Vec<_>>();
+ updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
+
+ proto::UpdateWorktree {
+ project_id,
+ worktree_id,
+ abs_path: self.abs_path().to_string_lossy().into(),
+ root_name: self.root_name().to_string(),
+ updated_entries,
+ removed_entries: Vec::new(),
+ scan_id: self.scan_id as u64,
+ is_last_update: self.completed_scan_id == self.scan_id,
+ updated_repositories,
+ removed_repositories: Vec::new(),
+ }
+ }
+
fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
if entry.is_file() && entry.path.file_name() == Some(&GITIGNORE) {
let abs_path = self.abs_path.join(&entry.path);
@@ -2041,7 +2071,7 @@ impl LocalSnapshot {
self.git_repositories.insert(
work_dir_id,
LocalRepositoryEntry {
- scan_id,
+ work_dir_scan_id: scan_id,
git_dir_scan_id: scan_id,
repo_ptr: repo,
git_dir_path: parent_path.clone(),
@@ -2090,11 +2120,11 @@ impl LocalSnapshot {
}
}
-impl LocalMutableSnapshot {
+impl BackgroundScannerState {
fn reuse_entry_id(&mut self, entry: &mut Entry) {
if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
entry.id = removed_entry_id;
- } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
+ } else if let Some(existing_entry) = self.snapshot.entry_for_path(&entry.path) {
entry.id = existing_entry.id;
}
}
@@ -2111,8 +2141,10 @@ impl LocalMutableSnapshot {
ignore: Option<Arc<Gitignore>>,
fs: &dyn Fs,
) {
- let mut parent_entry = if let Some(parent_entry) =
- self.entries_by_path.get(&PathKey(parent_path.clone()), &())
+ let mut parent_entry = if let Some(parent_entry) = self
+ .snapshot
+ .entries_by_path
+ .get(&PathKey(parent_path.clone()), &())
{
parent_entry.clone()
} else {
@@ -2132,13 +2164,14 @@ impl LocalMutableSnapshot {
}
if let Some(ignore) = ignore {
- let abs_parent_path = self.abs_path.join(&parent_path).into();
- self.ignores_by_parent_abs_path
+ let abs_parent_path = self.snapshot.abs_path.join(&parent_path).into();
+ self.snapshot
+ .ignores_by_parent_abs_path
.insert(abs_parent_path, (ignore, false));
}
if parent_path.file_name() == Some(&DOT_GIT) {
- self.build_repo(parent_path, fs);
+ self.snapshot.build_repo(parent_path, fs);
}
let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
@@ -2150,25 +2183,27 @@ impl LocalMutableSnapshot {
id: entry.id,
path: entry.path.clone(),
is_ignored: entry.is_ignored,
- scan_id: self.scan_id,
+ scan_id: self.snapshot.scan_id,
}));
entries_by_path_edits.push(Edit::Insert(entry));
}
- self.entries_by_path.edit(entries_by_path_edits, &());
- self.entries_by_id.edit(entries_by_id_edits, &());
+ self.snapshot
+ .entries_by_path
+ .edit(entries_by_path_edits, &());
+ self.snapshot.entries_by_id.edit(entries_by_id_edits, &());
}
fn remove_path(&mut self, path: &Path) {
let mut new_entries;
let removed_entries;
{
- let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
+ let mut cursor = self.snapshot.entries_by_path.cursor::<TraversalProgress>();
new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
new_entries.push_tree(cursor.suffix(&()), &());
}
- self.entries_by_path = new_entries;
+ self.snapshot.entries_by_path = new_entries;
let mut entries_by_id_edits = Vec::new();
for entry in removed_entries.cursor::<()>() {
@@ -2179,11 +2214,12 @@ impl LocalMutableSnapshot {
*removed_entry_id = cmp::max(*removed_entry_id, entry.id);
entries_by_id_edits.push(Edit::Remove(entry.id));
}
- self.entries_by_id.edit(entries_by_id_edits, &());
+ self.snapshot.entries_by_id.edit(entries_by_id_edits, &());
if path.file_name() == Some(&GITIGNORE) {
- let abs_parent_path = self.abs_path.join(path.parent().unwrap());
+ let abs_parent_path = self.snapshot.abs_path.join(path.parent().unwrap());
if let Some((_, needs_update)) = self
+ .snapshot
.ignores_by_parent_abs_path
.get_mut(abs_parent_path.as_path())
{
@@ -2473,12 +2509,31 @@ pub enum EntryKind {
#[derive(Clone, Copy, Debug)]
pub enum PathChange {
+ /// A filesystem entry was was created.
Added,
+ /// A filesystem entry was removed.
Removed,
+ /// A filesystem entry was updated.
Updated,
+ /// A filesystem entry was either updated or added. We don't know
+ /// whether or not it already existed, because the path had not
+ /// been loaded before the event.
AddedOrUpdated,
+ /// A filesystem entry was found during the initial scan of the worktree.
+ Loaded,
+}
+
+pub struct GitRepositoryChange {
+ /// The previous state of the repository, if it already existed.
+ pub old_repository: Option<RepositoryEntry>,
+ /// Whether the content of the .git directory changed. This will be false
+ /// if only the repository's work directory changed.
+ pub git_dir_changed: bool,
}
+pub type UpdatedEntriesSet = Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>;
+pub type UpdatedGitRepositoriesSet = Arc<[(Arc<Path>, GitRepositoryChange)]>;
+
impl Entry {
fn new(
path: Arc<Path>,
@@ -2635,19 +2690,20 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
}
struct BackgroundScanner {
- snapshot: Mutex<LocalMutableSnapshot>,
+ state: Mutex<BackgroundScannerState>,
fs: Arc<dyn Fs>,
status_updates_tx: UnboundedSender<ScanState>,
executor: Arc<executor::Background>,
refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
- prev_state: Mutex<BackgroundScannerState>,
next_entry_id: Arc<AtomicUsize>,
- finished_initial_scan: bool,
+ phase: BackgroundScannerPhase,
}
-struct BackgroundScannerState {
- snapshot: Snapshot,
- event_paths: Vec<Arc<Path>>,
+#[derive(PartialEq)]
+enum BackgroundScannerPhase {
+ InitialScan,
+ EventsReceivedDuringInitialScan,
+ Events,
}
impl BackgroundScanner {
@@ -2665,15 +2721,13 @@ impl BackgroundScanner {
executor,
refresh_requests_rx,
next_entry_id,
- prev_state: Mutex::new(BackgroundScannerState {
- snapshot: snapshot.snapshot.clone(),
- event_paths: Default::default(),
- }),
- snapshot: Mutex::new(LocalMutableSnapshot {
+ state: Mutex::new(BackgroundScannerState {
+ prev_snapshot: snapshot.snapshot.clone(),
snapshot,
removed_entry_ids: Default::default(),
+ changed_paths: Default::default(),
}),
- finished_initial_scan: false,
+ phase: BackgroundScannerPhase::InitialScan,
}
}
@@ -2684,7 +2738,7 @@ impl BackgroundScanner {
use futures::FutureExt as _;
let (root_abs_path, root_inode) = {
- let snapshot = self.snapshot.lock();
+ let snapshot = &self.state.lock().snapshot;
(
snapshot.abs_path.clone(),
snapshot.root_entry().map(|e| e.inode),
@@ -2696,20 +2750,23 @@ impl BackgroundScanner {
for ancestor in root_abs_path.ancestors().skip(1) {
if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await
{
- self.snapshot
+ self.state
.lock()
+ .snapshot
.ignores_by_parent_abs_path
.insert(ancestor.into(), (ignore.into(), false));
}
}
{
- let mut snapshot = self.snapshot.lock();
- snapshot.scan_id += 1;
- ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true);
+ let mut state = self.state.lock();
+ state.snapshot.scan_id += 1;
+ ignore_stack = state
+ .snapshot
+ .ignore_stack_for_abs_path(&root_abs_path, true);
if ignore_stack.is_all() {
- if let Some(mut root_entry) = snapshot.root_entry().cloned() {
+ if let Some(mut root_entry) = state.snapshot.root_entry().cloned() {
root_entry.is_ignored = true;
- snapshot.insert_entry(root_entry, self.fs.as_ref());
+ state.insert_entry(root_entry, self.fs.as_ref());
}
}
};
@@ -2727,14 +2784,15 @@ impl BackgroundScanner {
drop(scan_job_tx);
self.scan_dirs(true, scan_job_rx).await;
{
- let mut snapshot = self.snapshot.lock();
- snapshot.completed_scan_id = snapshot.scan_id;
+ let mut state = self.state.lock();
+ state.snapshot.completed_scan_id = state.snapshot.scan_id;
}
self.send_status_update(false, None);
// Process any any FS events that occurred while performing the initial scan.
// For these events, update events cannot be as precise, because we didn't
// have the previous state loaded yet.
+ self.phase = BackgroundScannerPhase::EventsReceivedDuringInitialScan;
if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) {
let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>();
while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) {
@@ -2743,9 +2801,8 @@ impl BackgroundScanner {
self.process_events(paths).await;
}
- self.finished_initial_scan = true;
-
// Continue processing events until the worktree is dropped.
+ self.phase = BackgroundScannerPhase::Events;
loop {
select_biased! {
// Process any path refresh requests from the worktree. Prioritize
@@ -2770,15 +2827,7 @@ impl BackgroundScanner {
}
async fn process_refresh_request(&self, paths: Vec<PathBuf>, barrier: barrier::Sender) -> bool {
- if let Some(mut paths) = self.reload_entries_for_paths(paths, None).await {
- paths.sort_unstable();
- util::extend_sorted(
- &mut self.prev_state.lock().event_paths,
- paths,
- usize::MAX,
- Ord::cmp,
- );
- }
+ self.reload_entries_for_paths(paths, None).await;
self.send_status_update(false, Some(barrier))
}
@@ -2787,50 +2836,42 @@ impl BackgroundScanner {
let paths = self
.reload_entries_for_paths(paths, Some(scan_job_tx.clone()))
.await;
- if let Some(paths) = &paths {
- util::extend_sorted(
- &mut self.prev_state.lock().event_paths,
- paths.iter().cloned(),
- usize::MAX,
- Ord::cmp,
- );
- }
drop(scan_job_tx);
self.scan_dirs(false, scan_job_rx).await;
self.update_ignore_statuses().await;
- let mut snapshot = self.snapshot.lock();
+ {
+ let mut snapshot = &mut self.state.lock().snapshot;
- if let Some(paths) = paths {
- for path in paths {
- self.reload_repo_for_file_path(&path, &mut *snapshot, self.fs.as_ref());
+ if let Some(paths) = paths {
+ for path in paths {
+ self.reload_repo_for_file_path(&path, &mut *snapshot, self.fs.as_ref());
+ }
}
- }
- let mut git_repositories = mem::take(&mut snapshot.git_repositories);
- git_repositories.retain(|work_directory_id, _| {
- snapshot
- .entry_for_id(*work_directory_id)
- .map_or(false, |entry| {
- snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
- })
- });
- snapshot.git_repositories = git_repositories;
+ let mut git_repositories = mem::take(&mut snapshot.git_repositories);
+ git_repositories.retain(|work_directory_id, _| {
+ snapshot
+ .entry_for_id(*work_directory_id)
+ .map_or(false, |entry| {
+ snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some()
+ })
+ });
+ snapshot.git_repositories = git_repositories;
- let mut git_repository_entries = mem::take(&mut snapshot.snapshot.repository_entries);
- git_repository_entries.retain(|_, entry| {
- snapshot
- .git_repositories
- .get(&entry.work_directory.0)
- .is_some()
- });
- snapshot.snapshot.repository_entries = git_repository_entries;
- snapshot.completed_scan_id = snapshot.scan_id;
- drop(snapshot);
+ let mut git_repository_entries = mem::take(&mut snapshot.snapshot.repository_entries);
+ git_repository_entries.retain(|_, entry| {
+ snapshot
+ .git_repositories
+ .get(&entry.work_directory.0)
+ .is_some()
+ });
+ snapshot.snapshot.repository_entries = git_repository_entries;
+ snapshot.completed_scan_id = snapshot.scan_id;
+ }
self.send_status_update(false, None);
- self.prev_state.lock().event_paths.clear();
}
async fn scan_dirs(