diff --git a/crates/gpui/src/app/test_app_context.rs b/crates/gpui/src/app/test_app_context.rs index e956c4ca0d209276a07253388c497e67abcf7402..2fa86998837984d97139f4248493a29f70fa4f75 100644 --- a/crates/gpui/src/app/test_app_context.rs +++ b/crates/gpui/src/app/test_app_context.rs @@ -434,7 +434,9 @@ impl<T: Entity> ModelHandle<T> { Duration::from_secs(1) }; + let executor = cx.background().clone(); async move { + executor.start_waiting(); let notification = crate::util::timeout(duration, rx.next()) .await .expect("next notification timed out"); diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index a06e0d5fdbf0cdfad4636cc8928ee28ab1f448df..365766fb9dd0b080642a4b1e344d985dd312d22c 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -876,6 +876,14 @@ impl Background { } } } + + #[cfg(any(test, feature = "test-support"))] + pub fn start_waiting(&self) { + match self { + Self::Deterministic { executor, .. } => executor.start_waiting(), + _ => panic!("this method can only be called on a deterministic executor"), + } + } } impl Default for Background { diff --git a/crates/language/src/language.rs b/crates/language/src/language.rs index 87e4880b99186527acacd24c0a7ff609b0a015db..8b4041b852bc6a2545e35209b156e94ac88050bb 100644 --- a/crates/language/src/language.rs +++ b/crates/language/src/language.rs @@ -796,6 +796,12 @@ impl LanguageRegistry { http_client: Arc<dyn HttpClient>, cx: &mut AppContext, ) -> Option<PendingLanguageServer> { + let server_id = self.state.write().next_language_server_id(); + log::info!( + "starting language server name:{}, path:{root_path:?}, id:{server_id}", + adapter.name.0 + ); + #[cfg(any(test, feature = "test-support"))] if language.fake_adapter.is_some() { let task = cx.spawn(|cx| async move { @@ -825,7 +831,6 @@ impl LanguageRegistry { Ok(server) }); - let server_id = self.state.write().next_language_server_id(); return Some(PendingLanguageServer { server_id, task }); } @@ -834,7 +839,6 @@ impl LanguageRegistry { .clone() .ok_or_else(|| anyhow!("language server download directory has not been assigned")) .log_err()?; - let this = self.clone(); let language = language.clone(); let http_client = http_client.clone(); @@ -843,7 +847,6 @@ impl LanguageRegistry { let adapter = adapter.clone(); let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone(); let login_shell_env_loaded = self.login_shell_env_loaded.clone(); - let server_id = self.state.write().next_language_server_id(); let task = cx.spawn(|cx| async move { login_shell_env_loaded.await; diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index f4f538b061c892bb07cedc10304233936ee9c079..2ec1f63b8b22cb9207771bb81df51fb945f81ee4 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -849,10 +849,12 @@ impl FakeLanguageServer { T: request::Request, T::Result: 'static + Send, { + self.server.executor.start_waiting(); self.server.request::<T>(params).await } pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params { + self.server.executor.start_waiting(); self.try_receive_notification::<T>().await.unwrap() }
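The start_waiting() calls above exist so the deterministic test executor can tell a parked task from a busy one: run_until_parked may only return once every task has either finished or is known to be blocked on an external event. A minimal, hypothetical sketch of that pattern (not gpui's real API; FakeServer and the channel are stand-ins):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};

// Stand-in for FakeLanguageServer: before blocking on a message that only
// the test itself can produce, flag the task as waiting so a deterministic
// scheduler can park instead of assuming forward progress.
struct FakeServer {
    waiting: AtomicBool,
    incoming: Receiver<String>,
}

impl FakeServer {
    fn start_waiting(&self) {
        self.waiting.store(true, Ordering::SeqCst);
    }

    fn receive_notification(&self) -> String {
        self.start_waiting(); // mirrors the change to FakeLanguageServer above
        self.incoming.recv().expect("channel closed")
    }
}

fn main() {
    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    let server = FakeServer { waiting: AtomicBool::new(false), incoming: rx };
    tx.send("initialized".into()).unwrap();
    assert_eq!(server.receive_notification(), "initialized");
    assert!(server.waiting.load(Ordering::SeqCst));
}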
diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index c5442221eb295866a439e549d4c343d3ad81f85d..92210a75a8ea2067aafe3e36d68449c3af2e5142 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1459,7 +1459,7 @@ impl Project { }; cx.foreground().spawn(async move { - pump_loading_buffer_reciever(loading_watch) + wait_for_loading_buffer(loading_watch) .await .map_err(|error| anyhow!("{}", error)) }) @@ -4847,7 +4847,7 @@ impl Project { if worktree.read(cx).is_local() { cx.subscribe(worktree, |this, worktree, event, cx| match event { worktree::Event::UpdatedEntries(changes) => { - this.update_local_worktree_buffers(&worktree, &changes, cx); + this.update_local_worktree_buffers(&worktree, changes, cx); this.update_local_worktree_language_servers(&worktree, changes, cx); } worktree::Event::UpdatedGitRepositories(updated_repos) => { @@ -4881,13 +4881,13 @@ impl Project { fn update_local_worktree_buffers( &mut self, worktree_handle: &ModelHandle<Worktree>, - changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>, + changes: &[(Arc<Path>, ProjectEntryId, PathChange)], cx: &mut ModelContext<Self>, ) { let snapshot = worktree_handle.read(cx).snapshot(); let mut renamed_buffers = Vec::new(); - for (path, entry_id) in changes.keys() { + for (path, entry_id, _) in changes { let worktree_id = worktree_handle.read(cx).id(); let project_path = ProjectPath { worktree_id, @@ -4993,7 +4993,7 @@ impl Project { fn update_local_worktree_language_servers( &mut self, worktree_handle: &ModelHandle<Worktree>, - changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>, + changes: &[(Arc<Path>, ProjectEntryId, PathChange)], cx: &mut ModelContext<Self>, ) { if changes.is_empty() { @@ -5024,23 +5024,21 @@ impl Project { let params = lsp::DidChangeWatchedFilesParams { changes: changes .iter() - .filter_map(|((path, _), change)| { - if watched_paths.is_match(&path) { - Some(lsp::FileEvent { - uri: lsp::Url::from_file_path(abs_path.join(path)) - .unwrap(), - typ: match change { - PathChange::Added => lsp::FileChangeType::CREATED, - PathChange::Removed => lsp::FileChangeType::DELETED, - PathChange::Updated - | PathChange::AddedOrUpdated => { - lsp::FileChangeType::CHANGED - } - }, - }) - } else { - None + .filter_map(|(path, _, change)| { + if !watched_paths.is_match(&path) { + return None; } + let typ = match change { + PathChange::Loaded => return None, + PathChange::Added => lsp::FileChangeType::CREATED, + PathChange::Removed => lsp::FileChangeType::DELETED, + PathChange::Updated => lsp::FileChangeType::CHANGED, + PathChange::AddedOrUpdated => lsp::FileChangeType::CHANGED, + }; + Some(lsp::FileEvent { + uri: lsp::Url::from_file_path(abs_path.join(path)).unwrap(), + typ, + }) }) .collect(), };
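The change payload handed to these methods is now an ordered slice rather than a HashMap, so consumers keep path order and can binary-search for a single path. A small self-contained sketch (EntryId and the sample data are hypothetical stand-ins, and it assumes the slice is sorted by path, which the scanner's build_change_set maintains):

use std::path::Path;
use std::sync::Arc;

#[derive(Clone, Copy, Debug, PartialEq)]
enum PathChange { Added, Removed, Updated, AddedOrUpdated, Loaded }

type EntryId = usize; // stand-in for ProjectEntryId

// Binary-search the change set for one path; only valid because the
// slice is assumed to be ordered by path.
fn change_for_path<'a>(
    changes: &'a [(Arc<Path>, EntryId, PathChange)],
    path: &Path,
) -> Option<&'a (Arc<Path>, EntryId, PathChange)> {
    changes
        .binary_search_by(|probe| Path::cmp(&probe.0, path))
        .ok()
        .map(|ix| &changes[ix])
}

fn main() {
    let changes: Vec<(Arc<Path>, EntryId, PathChange)> = vec![
        (Arc::from(Path::new("a.rs")), 1, PathChange::Added),
        (Arc::from(Path::new("b.rs")), 2, PathChange::Updated),
    ];
    let (_, id, change) = change_for_path(&changes, Path::new("b.rs")).unwrap();
    assert_eq!((*id, *change), (2, PathChange::Updated));
}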
@@ -5059,98 +5057,102 @@ impl Project { fn update_local_worktree_buffers_git_repos( &mut self, worktree_handle: ModelHandle<Worktree>, - repos: &HashMap<Arc<Path>, LocalRepositoryEntry>, + changed_repos: &UpdatedGitRepositoriesSet, cx: &mut ModelContext<Self>, ) { debug_assert!(worktree_handle.read(cx).is_local()); - // Setup the pending buffers + // Identify the loading buffers whose containing repository has changed. let future_buffers = self .loading_buffers_by_path .iter() - .filter_map(|(path, receiver)| { - let path = &path.path; - let (work_directory, repo) = repos - .iter() - .find(|(work_directory, _)| path.starts_with(work_directory))?; - - let repo_relative_path = path.strip_prefix(work_directory).log_err()?; - + .filter_map(|(project_path, receiver)| { + if project_path.worktree_id != worktree_handle.read(cx).id() { + return None; + } + let path = &project_path.path; + changed_repos.iter().find(|(work_dir, change)| { + path.starts_with(work_dir) && change.git_dir_changed + })?; let receiver = receiver.clone(); - let repo_ptr = repo.repo_ptr.clone(); - let repo_relative_path = repo_relative_path.to_owned(); + let path = path.clone(); Some(async move { - pump_loading_buffer_reciever(receiver) + wait_for_loading_buffer(receiver) .await .ok() - .map(|buffer| (buffer, repo_relative_path, repo_ptr)) + .map(|buffer| (buffer, path)) }) }) - .collect::<FuturesUnordered<_>>() - .filter_map(|result| async move { - let (buffer_handle, repo_relative_path, repo_ptr) = result?; + .collect::<FuturesUnordered<_>>(); - let lock = repo_ptr.lock(); - lock.load_index_text(&repo_relative_path) - .map(|diff_base| (diff_base, buffer_handle)) - }); + // Identify the current buffers whose containing repository has changed. + let current_buffers = self + .opened_buffers + .values() + .filter_map(|buffer| { + let buffer = buffer.upgrade(cx)?; + let file = File::from_dyn(buffer.read(cx).file())?; + if file.worktree != worktree_handle { + return None; + } + let path = file.path(); + changed_repos.iter().find(|(work_dir, change)| { + path.starts_with(work_dir) && change.git_dir_changed + })?; + Some((buffer, path.clone())) + }) + .collect::<Vec<_>>(); - let update_diff_base_fn = update_diff_base(self); - cx.spawn(|_, mut cx| async move { - let diff_base_tasks = cx + if future_buffers.len() + current_buffers.len() == 0 { + return; + } + + let remote_id = self.remote_id(); + let client = self.client.clone(); + cx.spawn_weak(move |_, mut cx| async move { + // Wait for all of the buffers to load. + let future_buffers = future_buffers.collect::<Vec<_>>().await; + + // Reload the diff base for every buffer whose containing git repository has changed. + let snapshot = + worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot()); + let diff_bases_by_buffer = cx .background() - .spawn(future_buffers.collect::<Vec<_>>()) + .spawn(async move { + future_buffers + .into_iter() + .filter_map(|e| e) + .chain(current_buffers) + .filter_map(|(buffer, path)| { + let (work_directory, repo) = + snapshot.repository_and_work_directory_for_path(&path)?; + let repo = snapshot.get_local_repo(&repo)?; + let relative_path = path.strip_prefix(&work_directory).ok()?; + let base_text = repo.repo_ptr.lock().load_index_text(&relative_path); + Some((buffer, base_text)) + }) + .collect::<Vec<_>>() + }) .await; - for (diff_base, buffer) in diff_base_tasks.into_iter() { - update_diff_base_fn(Some(diff_base), buffer, &mut cx); + // Assign the new diff bases on all of the buffers. + for (buffer, diff_base) in diff_bases_by_buffer { + let buffer_id = buffer.update(&mut cx, |buffer, cx| { + buffer.set_diff_base(diff_base.clone(), cx); + buffer.remote_id() + }); + if let Some(project_id) = remote_id { + client + .send(proto::UpdateDiffBase { + project_id, + buffer_id, + diff_base, + }) + .log_err(); + } } }) .detach(); - - // And the current buffers - for (_, buffer) in &self.opened_buffers { - if let Some(buffer) = buffer.upgrade(cx) { - let file = match File::from_dyn(buffer.read(cx).file()) { - Some(file) => file, - None => continue, - }; - if file.worktree != worktree_handle { - continue; - } - - let path = file.path().clone(); - - let worktree = worktree_handle.read(cx); - - let (work_directory, repo) = match repos - .iter() - .find(|(work_directory, _)| path.starts_with(work_directory)) - { - Some(repo) => repo.clone(), - None => continue, - }; - - let relative_repo = match path.strip_prefix(work_directory).log_err() { - Some(relative_repo) => relative_repo.to_owned(), - None => continue, - }; - - drop(worktree); - - let update_diff_base_fn = update_diff_base(self); - let git_ptr = repo.repo_ptr.clone(); - let diff_base_task = cx - .background() - .spawn(async move { git_ptr.lock().load_index_text(&relative_repo) }); - - cx.spawn(|_, mut cx| async move { - let diff_base = diff_base_task.await; - update_diff_base_fn(diff_base, buffer, &mut cx); - }) - .detach(); - } - } }
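The reload above boils down to: find the repository whose work directory contains the buffer's path, make the path repo-relative, and ask the repository's index for the base text. A self-contained sketch of that lookup (IndexText and FakeRepo are hypothetical stand-ins; the real code prefers the deepest matching work directory via repository_and_work_directory_for_path, while this takes the first match for brevity):

use std::path::{Path, PathBuf};

// Stand-in for a repository that can serve file contents from its git index.
trait IndexText {
    fn load_index_text(&self, repo_relative: &Path) -> Option<String>;
}

struct FakeRepo;

impl IndexText for FakeRepo {
    fn load_index_text(&self, _repo_relative: &Path) -> Option<String> {
        Some("base text".to_string())
    }
}

// Mirrors the lookup above: find a repository whose work directory contains
// the file, then query it with a repo-relative path.
fn diff_base_for(
    repos: &[(PathBuf, Box<dyn IndexText>)],
    file: &Path,
) -> Option<String> {
    let (work_dir, repo) = repos.iter().find(|(dir, _)| file.starts_with(dir))?;
    let relative = file.strip_prefix(work_dir).ok()?;
    repo.load_index_text(relative)
}

fn main() {
    let repos: Vec<(PathBuf, Box<dyn IndexText>)> =
        vec![(PathBuf::from("project"), Box::new(FakeRepo))];
    assert_eq!(
        diff_base_for(&repos, Path::new("project/src/main.rs")),
        Some("base text".to_string())
    );
}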
pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) { @@ -7072,7 +7074,7 @@ impl Item for Buffer { } } -async fn pump_loading_buffer_reciever( +async fn wait_for_loading_buffer( mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>, ) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> { loop { @@ -7085,26 +7087,3 @@ receiver.next().await; } } - -fn update_diff_base( - project: &Project, -) -> impl Fn(Option<String>, ModelHandle<Buffer>, &mut AsyncAppContext) { - let remote_id = project.remote_id(); - let client = project.client().clone(); - move |diff_base, buffer, cx| { - let buffer_id = buffer.update(cx, |buffer, cx| { - buffer.set_diff_base(diff_base.clone(), cx); - buffer.remote_id() - }); - - if let Some(project_id) = remote_id { - client - .send(proto::UpdateDiffBase { - project_id, - buffer_id: buffer_id as u64, - diff_base, - }) - .log_err(); - } - } -} diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 285e3ee9b678c71708f13c66e6af11b41a80ac48..34b63fd5bd9fcaff67f9736ac79f6302045fd8f0 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -1193,7 +1193,7 @@ async fn test_toggling_enable_language_server(cx: &mut gpui::TestAppContext) { .await; } -#[gpui::test] +#[gpui::test(iterations = 3)] async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) { init_test(cx); @@ -1273,7 +1273,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) { // The diagnostics have moved down since they were created.
buffer.next_notification(cx).await; - buffer.next_notification(cx).await; + cx.foreground().run_until_parked(); buffer.read_with(cx, |buffer, _| { assert_eq!( buffer @@ -1352,6 +1352,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) { }); buffer.next_notification(cx).await; + cx.foreground().run_until_parked(); buffer.read_with(cx, |buffer, _| { assert_eq!( buffer @@ -1444,6 +1445,7 @@ async fn test_transforming_diagnostics(cx: &mut gpui::TestAppContext) { }); buffer.next_notification(cx).await; + cx.foreground().run_until_parked(); buffer.read_with(cx, |buffer, _| { assert_eq!( buffer @@ -2524,29 +2526,21 @@ async fn test_rescan_and_remote_updates( // Create a remote copy of this worktree. let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap()); - let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let remote = cx.update(|cx| { - Worktree::remote( - 1, - 1, - proto::WorktreeMetadata { - id: initial_snapshot.id().to_proto(), - root_name: initial_snapshot.root_name().into(), - abs_path: initial_snapshot - .abs_path() - .as_os_str() - .to_string_lossy() - .into(), - visible: true, - }, - rpc.clone(), - cx, - ) - }); - remote.update(cx, |remote, _| { - let update = initial_snapshot.build_initial_update(1); - remote.as_remote_mut().unwrap().update_from_remote(update); + + let metadata = tree.read_with(cx, |tree, _| tree.as_local().unwrap().metadata_proto()); + + let updates = Arc::new(Mutex::new(Vec::new())); + tree.update(cx, |tree, cx| { + let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, { + let updates = updates.clone(); + move |update| { + updates.lock().push(update); + async { true } + } + }); }); + + let remote = cx.update(|cx| Worktree::remote(1, 1, metadata, rpc.clone(), cx)); deterministic.run_until_parked(); cx.read(|cx| { @@ -2612,14 +2606,11 @@ async fn test_rescan_and_remote_updates( // Update the remote worktree. Check that it becomes consistent with the // local worktree. - remote.update(cx, |remote, cx| { - let update = tree.read(cx).as_local().unwrap().snapshot().build_update( - &initial_snapshot, - 1, - 1, - true, - ); - remote.as_remote_mut().unwrap().update_from_remote(update); + deterministic.run_until_parked(); + remote.update(cx, |remote, _| { + for update in updates.lock().drain(..) { + remote.as_remote_mut().unwrap().update_from_remote(update); + } }); deterministic.run_until_parked(); remote.read_with(cx, |remote, _| { diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 3a7044de0c705be6084401c2f90e9ef2b007e1d7..dc3c172775721c5142aedab6fc4f0f5d66a1c913 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -17,7 +17,7 @@ use futures::{ }, select_biased, task::Poll, - Stream, StreamExt, + FutureExt, Stream, StreamExt, }; use fuzzy::CharBag; use git::{DOT_GIT, GITIGNORE}; @@ -55,7 +55,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; -use util::{paths::HOME, ResultExt, TakeUntilExt, TryFutureExt}; +use util::{paths::HOME, ResultExt, TakeUntilExt}; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)] pub struct WorktreeId(usize); @@ -317,18 +317,20 @@ pub struct LocalSnapshot { git_repositories: TreeMap, } -pub struct LocalMutableSnapshot { +pub struct BackgroundScannerState { snapshot: LocalSnapshot, /// The ids of all of the entries that were removed from the snapshot /// as part of the current update. 
These entry ids may be re-used /// if the same inode is discovered at a new path, or if the given /// path is re-created after being deleted. removed_entry_ids: HashMap<u64, ProjectEntryId>, + changed_paths: Vec<Arc<Path>>, + prev_snapshot: Snapshot, } #[derive(Debug, Clone)] pub struct LocalRepositoryEntry { - pub(crate) scan_id: usize, + pub(crate) work_dir_scan_id: usize, pub(crate) git_dir_scan_id: usize, pub(crate) repo_ptr: Arc<Mutex<dyn GitRepository>>, /// Path to the actual .git folder. @@ -357,25 +359,11 @@ impl DerefMut for LocalSnapshot { } } -impl Deref for LocalMutableSnapshot { - type Target = LocalSnapshot; - - fn deref(&self) -> &Self::Target { - &self.snapshot - } -} - -impl DerefMut for LocalMutableSnapshot { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.snapshot - } -} - enum ScanState { Started, Updated { snapshot: LocalSnapshot, - changes: HashMap<(Arc<Path>, ProjectEntryId), PathChange>, + changes: UpdatedEntriesSet, barrier: Option<barrier::Sender>, scanning: bool, }, @@ -383,14 +371,15 @@ struct ShareState { project_id: u64, - snapshots_tx: watch::Sender<LocalSnapshot>, + snapshots_tx: + mpsc::UnboundedSender<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>, resume_updates: watch::Sender<()>, _maintain_remote_snapshot: Task<Option<()>>, } pub enum Event { - UpdatedEntries(HashMap<(Arc<Path>, ProjectEntryId), PathChange>), - UpdatedGitRepositories(HashMap<Arc<Path>, LocalRepositoryEntry>), + UpdatedEntries(UpdatedEntriesSet), + UpdatedGitRepositories(UpdatedGitRepositoriesSet), } impl Entity for Worktree { @@ -465,8 +454,7 @@ impl Worktree { scanning, } => { *this.is_scanning.0.borrow_mut() = scanning; - this.set_snapshot(snapshot, cx); - cx.emit(Event::UpdatedEntries(changes)); + this.set_snapshot(snapshot, changes, cx); drop(barrier); } } @@ -560,7 +548,7 @@ impl Worktree { this.update(&mut cx, |this, cx| { let this = this.as_remote_mut().unwrap(); this.snapshot = this.background_snapshot.lock().clone(); - cx.emit(Event::UpdatedEntries(Default::default())); + cx.emit(Event::UpdatedEntries(Arc::from([]))); cx.notify(); while let Some((scan_id, _)) = this.snapshot_subscriptions.front() { if this.observed_snapshot(*scan_id) { @@ -832,73 +820,137 @@ impl LocalWorktree { Ok(!old_summary.is_empty() || !new_summary.is_empty()) } - fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) { - let updated_repos = - self.changed_repos(&self.git_repositories, &new_snapshot.git_repositories); + fn set_snapshot( + &mut self, + new_snapshot: LocalSnapshot, + entry_changes: UpdatedEntriesSet, + cx: &mut ModelContext<Worktree>, + ) { + let repo_changes = self.changed_repos(&self.snapshot, &new_snapshot); self.snapshot = new_snapshot; if let Some(share) = self.share.as_mut() { - *share.snapshots_tx.borrow_mut() = self.snapshot.clone(); + share + .snapshots_tx + .unbounded_send(( + self.snapshot.clone(), + entry_changes.clone(), + repo_changes.clone(), + )) + .ok(); } - if !updated_repos.is_empty() { - cx.emit(Event::UpdatedGitRepositories(updated_repos)); + if !entry_changes.is_empty() { + cx.emit(Event::UpdatedEntries(entry_changes)); + } + if !repo_changes.is_empty() { + cx.emit(Event::UpdatedGitRepositories(repo_changes)); + } } fn changed_repos( &self, - old_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>, - new_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>, - ) -> HashMap<Arc<Path>, LocalRepositoryEntry> { - let mut diff = HashMap::default(); - let mut old_repos = old_repos.iter().peekable(); - let mut new_repos = new_repos.iter().peekable(); + old_snapshot: &LocalSnapshot, + new_snapshot: &LocalSnapshot, + ) -> UpdatedGitRepositoriesSet { + let mut changes = Vec::new(); + let mut old_repos = 
old_snapshot.git_repositories.iter().peekable(); + let mut new_repos = new_snapshot.git_repositories.iter().peekable(); loop { - match (old_repos.peek(), new_repos.peek()) { - (Some((old_entry_id, old_repo)), Some((new_entry_id, new_repo))) => { - match Ord::cmp(old_entry_id, new_entry_id) { + match (new_repos.peek().map(clone), old_repos.peek().map(clone)) { + (Some((new_entry_id, new_repo)), Some((old_entry_id, old_repo))) => { + match Ord::cmp(&new_entry_id, &old_entry_id) { Ordering::Less => { - if let Some(entry) = self.entry_for_id(**old_entry_id) { - diff.insert(entry.path.clone(), (*old_repo).clone()); + if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) { + changes.push(( + entry.path.clone(), + GitRepositoryChange { + old_repository: None, + git_dir_changed: true, + }, + )); } - old_repos.next(); + new_repos.next(); } Ordering::Equal => { - if old_repo.git_dir_scan_id != new_repo.git_dir_scan_id { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + let git_dir_changed = + new_repo.git_dir_scan_id != old_repo.git_dir_scan_id; + let work_dir_changed = + new_repo.work_dir_scan_id != old_repo.work_dir_scan_id; + if git_dir_changed || work_dir_changed { + if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) { + let old_repo = old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(); + changes.push(( + entry.path.clone(), + GitRepositoryChange { + old_repository: old_repo, + git_dir_changed, + }, + )); } } - - old_repos.next(); new_repos.next(); + old_repos.next(); } Ordering::Greater => { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + if let Some(entry) = old_snapshot.entry_for_id(old_entry_id) { + let old_repo = old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(); + changes.push(( + entry.path.clone(), + GitRepositoryChange { + old_repository: old_repo, + git_dir_changed: true, + }, + )); } - new_repos.next(); + old_repos.next(); } } } - (Some((old_entry_id, old_repo)), None) => { - if let Some(entry) = self.entry_for_id(**old_entry_id) { - diff.insert(entry.path.clone(), (*old_repo).clone()); + (Some((entry_id, _)), None) => { + if let Some(entry) = new_snapshot.entry_for_id(entry_id) { + changes.push(( + entry.path.clone(), + GitRepositoryChange { + old_repository: None, + git_dir_changed: true, + }, + )); } - old_repos.next(); + new_repos.next(); } - (None, Some((new_entry_id, new_repo))) => { - if let Some(entry) = self.entry_for_id(**new_entry_id) { - diff.insert(entry.path.clone(), (*new_repo).clone()); + (None, Some((entry_id, _))) => { + if let Some(entry) = old_snapshot.entry_for_id(entry_id) { + let old_repo = old_snapshot + .repository_entries + .get(&RepositoryWorkDirectory(entry.path.clone())) + .cloned(); + changes.push(( + entry.path.clone(), + GitRepositoryChange { + old_repository: old_repo, + git_dir_changed: true, + }, + )); } - new_repos.next(); + old_repos.next(); } (None, None) => break, } } - diff + + fn clone(value: &(&T, &U)) -> (T, U) { + (value.0.clone(), value.1.clone()) + } + + changes.into() } pub fn scan_complete(&self) -> impl Future { @@ -1239,89 +1291,97 @@ impl LocalWorktree { }) } - pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { + pub fn observe_updates( + &mut self, + project_id: u64, + cx: &mut ModelContext, + callback: F, + ) -> oneshot::Receiver<()> + where + F: 'static + Send 
+ Fn(proto::UpdateWorktree) -> Fut, + Fut: Send + Future, + { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + let (share_tx, share_rx) = oneshot::channel(); if let Some(share) = self.share.as_mut() { - let _ = share_tx.send(()); + share_tx.send(()).ok(); *share.resume_updates.borrow_mut() = (); - } else { - let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot()); - let (resume_updates_tx, mut resume_updates_rx) = watch::channel(); - let worktree_id = cx.model_id() as u64; + return share_rx; + } - for (path, summaries) in &self.diagnostic_summaries { - for (&server_id, summary) in summaries { - if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary { - project_id, - worktree_id, - summary: Some(summary.to_proto(server_id, &path)), - }) { - return Task::ready(Err(e)); - } + let (resume_updates_tx, mut resume_updates_rx) = watch::channel::<()>(); + let (snapshots_tx, mut snapshots_rx) = + mpsc::unbounded::<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>(); + snapshots_tx + .unbounded_send((self.snapshot(), Arc::from([]), Arc::from([]))) + .ok(); + + let worktree_id = cx.model_id() as u64; + let _maintain_remote_snapshot = cx.background().spawn(async move { + let mut is_first = true; + while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await { + let update; + if is_first { + update = snapshot.build_initial_update(project_id, worktree_id); + is_first = false; + } else { + update = + snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes); } - } - let _maintain_remote_snapshot = cx.background().spawn({ - let client = self.client.clone(); - async move { - let mut share_tx = Some(share_tx); - let mut prev_snapshot = LocalSnapshot { - ignores_by_parent_abs_path: Default::default(), - git_repositories: Default::default(), - snapshot: Snapshot { - id: WorktreeId(worktree_id as usize), - abs_path: Path::new("").into(), - root_name: Default::default(), - root_char_bag: Default::default(), - entries_by_path: Default::default(), - entries_by_id: Default::default(), - repository_entries: Default::default(), - scan_id: 0, - completed_scan_id: 0, - }, - }; - while let Some(snapshot) = snapshots_rx.recv().await { - #[cfg(any(test, feature = "test-support"))] - const MAX_CHUNK_SIZE: usize = 2; - #[cfg(not(any(test, feature = "test-support")))] - const MAX_CHUNK_SIZE: usize = 256; - - let update = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, true); - for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { - let _ = resume_updates_rx.try_recv(); - while let Err(error) = client.request(update.clone()).await { - log::error!("failed to send worktree update: {}", error); - log::info!("waiting to resume updates"); - if resume_updates_rx.next().await.is_none() { - return Ok(()); - } + for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { + let _ = resume_updates_rx.try_recv(); + loop { + let result = callback(update.clone()); + if result.await { + break; + } else { + log::info!("waiting to resume updates"); + if resume_updates_rx.next().await.is_none() { + return Some(()); } } + } + } + } + share_tx.send(()).ok(); + Some(()) + }); - if let Some(share_tx) = share_tx.take() { - let _ = share_tx.send(()); - } + self.share = Some(ShareState { + project_id, + snapshots_tx, + resume_updates: resume_updates_tx, + _maintain_remote_snapshot, + }); + share_rx + } - 
prev_snapshot = snapshot; - } + pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { + let client = self.client.clone(); - Ok::<_, anyhow::Error>(()) + for (path, summaries) in &self.diagnostic_summaries { + for (&server_id, summary) in summaries { + if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary { + project_id, + worktree_id: cx.model_id() as u64, + summary: Some(summary.to_proto(server_id, &path)), + }) { + return Task::ready(Err(e)); } - .log_err() - }); - - self.share = Some(ShareState { - project_id, - snapshots_tx, - resume_updates: resume_updates_tx, - _maintain_remote_snapshot, - }); + } } + let rx = self.observe_updates(project_id, cx, move |update| { + client.request(update).map(|result| result.is_ok()) + }); cx.foreground() - .spawn(async move { share_rx.await.map_err(|_| anyhow!("share ended")) }) + .spawn(async move { rx.await.map_err(|_| anyhow!("share ended")) }) } pub fn unshare(&mut self) { @@ -1530,10 +1590,12 @@ impl Snapshot { pub(crate) fn apply_remote_update(&mut self, mut update: proto::UpdateWorktree) -> Result<()> { let mut entries_by_path_edits = Vec::new(); let mut entries_by_id_edits = Vec::new(); + for entry_id in update.removed_entries { - if let Some(entry) = self.entry_for_id(ProjectEntryId::from_proto(entry_id)) { + let entry_id = ProjectEntryId::from_proto(entry_id); + entries_by_id_edits.push(Edit::Remove(entry_id)); + if let Some(entry) = self.entry_for_id(entry_id) { entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone()))); - entries_by_id_edits.push(Edit::Remove(entry.id)); } } @@ -1542,6 +1604,11 @@ impl Snapshot { if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) { entries_by_path_edits.push(Edit::Remove(PathKey(path.clone()))); } + if let Some(old_entry) = self.entries_by_path.get(&PathKey(entry.path.clone()), &()) { + if old_entry.id != entry.id { + entries_by_id_edits.push(Edit::Remove(old_entry.id)); + } + } entries_by_id_edits.push(Edit::Insert(PathEntry { id: entry.id, path: entry.path.clone(), @@ -1684,20 +1751,19 @@ impl Snapshot { /// Get the repository whose work directory contains the given path. 
pub fn repository_for_path(&self, path: &Path) -> Option { - let mut max_len = 0; - let mut current_candidate = None; - for (work_directory, repo) in (&self.repository_entries).iter() { - if path.starts_with(&work_directory.0) { - if work_directory.0.as_os_str().len() >= max_len { - current_candidate = Some(repo); - max_len = work_directory.0.as_os_str().len(); - } else { - break; - } - } - } + self.repository_and_work_directory_for_path(path) + .map(|e| e.1) + } - current_candidate.cloned() + pub fn repository_and_work_directory_for_path( + &self, + path: &Path, + ) -> Option<(RepositoryWorkDirectory, RepositoryEntry)> { + self.repository_entries + .iter() + .filter(|(workdir_path, _)| path.starts_with(workdir_path)) + .last() + .map(|(path, repo)| (path.clone(), repo.clone())) } /// Given an ordered iterator of entries, returns an iterator of those entries, @@ -1833,117 +1899,52 @@ impl LocalSnapshot { .find(|(_, repo)| repo.in_dot_git(path)) } - #[cfg(test)] - pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree { - let root_name = self.root_name.clone(); - proto::UpdateWorktree { - project_id, - worktree_id: self.id().to_proto(), - abs_path: self.abs_path().to_string_lossy().into(), - root_name, - updated_entries: self.entries_by_path.iter().map(Into::into).collect(), - removed_entries: Default::default(), - scan_id: self.scan_id as u64, - is_last_update: true, - updated_repositories: self.repository_entries.values().map(Into::into).collect(), - removed_repositories: Default::default(), - } - } - - pub(crate) fn build_update( + fn build_update( &self, - other: &Self, project_id: u64, worktree_id: u64, - include_ignored: bool, + entry_changes: UpdatedEntriesSet, + repo_changes: UpdatedGitRepositoriesSet, ) -> proto::UpdateWorktree { let mut updated_entries = Vec::new(); let mut removed_entries = Vec::new(); - let mut self_entries = self - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - let mut other_entries = other - .entries_by_id - .cursor::<()>() - .filter(|e| include_ignored || !e.is_ignored) - .peekable(); - loop { - match (self_entries.peek(), other_entries.peek()) { - (Some(self_entry), Some(other_entry)) => { - match Ord::cmp(&self_entry.id, &other_entry.id) { - Ordering::Less => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - Ordering::Equal => { - if self_entry.scan_id != other_entry.scan_id { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - } + let mut updated_repositories = Vec::new(); + let mut removed_repositories = Vec::new(); - self_entries.next(); - other_entries.next(); - } - Ordering::Greater => { - removed_entries.push(other_entry.id.to_proto()); - other_entries.next(); - } - } - } - (Some(self_entry), None) => { - let entry = self.entry_for_id(self_entry.id).unwrap().into(); - updated_entries.push(entry); - self_entries.next(); - } - (None, Some(other_entry)) => { - removed_entries.push(other_entry.id.to_proto()); - other_entries.next(); - } - (None, None) => break, + for (_, entry_id, path_change) in entry_changes.iter() { + if let PathChange::Removed = path_change { + removed_entries.push(entry_id.0 as u64); + } else if let Some(entry) = self.entry_for_id(*entry_id) { + updated_entries.push(proto::Entry::from(entry)); } } - let mut updated_repositories: Vec = Vec::new(); - let mut removed_repositories = Vec::new(); - let mut self_repos = 
self.snapshot.repository_entries.iter().peekable(); - let mut other_repos = other.snapshot.repository_entries.iter().peekable(); - loop { - match (self_repos.peek(), other_repos.peek()) { - (Some((self_work_dir, self_repo)), Some((other_work_dir, other_repo))) => { - match Ord::cmp(self_work_dir, other_work_dir) { - Ordering::Less => { - updated_repositories.push((*self_repo).into()); - self_repos.next(); - } - Ordering::Equal => { - if self_repo != other_repo { - updated_repositories.push(self_repo.build_update(other_repo)); - } - - self_repos.next(); - other_repos.next(); - } - Ordering::Greater => { - removed_repositories.push(other_repo.work_directory.to_proto()); - other_repos.next(); - } - } + for (work_dir_path, change) in repo_changes.iter() { + let new_repo = self + .repository_entries + .get(&RepositoryWorkDirectory(work_dir_path.clone())); + match (&change.old_repository, new_repo) { + (Some(old_repo), Some(new_repo)) => { + updated_repositories.push(new_repo.build_update(old_repo)); } - (Some((_, self_repo)), None) => { - updated_repositories.push((*self_repo).into()); - self_repos.next(); + (None, Some(new_repo)) => { + updated_repositories.push(proto::RepositoryEntry::from(new_repo)); } - (None, Some((_, other_repo))) => { - removed_repositories.push(other_repo.work_directory.to_proto()); - other_repos.next(); + (Some(old_repo), None) => { + removed_repositories.push(old_repo.work_directory.0.to_proto()); } - (None, None) => break, + _ => {} } } + removed_entries.sort_unstable(); + updated_entries.sort_unstable_by_key(|e| e.id); + removed_repositories.sort_unstable(); + updated_repositories.sort_unstable_by_key(|e| e.work_directory_id); + + // TODO - optimize, knowing that removed_entries are sorted. + removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err()); + proto::UpdateWorktree { project_id, worktree_id, @@ -1958,6 +1959,35 @@ impl LocalSnapshot { } } + fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree { + let mut updated_entries = self + .entries_by_path + .iter() + .map(proto::Entry::from) + .collect::>(); + updated_entries.sort_unstable_by_key(|e| e.id); + + let mut updated_repositories = self + .repository_entries + .values() + .map(proto::RepositoryEntry::from) + .collect::>(); + updated_repositories.sort_unstable_by_key(|e| e.work_directory_id); + + proto::UpdateWorktree { + project_id, + worktree_id, + abs_path: self.abs_path().to_string_lossy().into(), + root_name: self.root_name().to_string(), + updated_entries, + removed_entries: Vec::new(), + scan_id: self.scan_id as u64, + is_last_update: self.completed_scan_id == self.scan_id, + updated_repositories, + removed_repositories: Vec::new(), + } + } + fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry { if entry.is_file() && entry.path.file_name() == Some(&GITIGNORE) { let abs_path = self.abs_path.join(&entry.path); @@ -2041,7 +2071,7 @@ impl LocalSnapshot { self.git_repositories.insert( work_dir_id, LocalRepositoryEntry { - scan_id, + work_dir_scan_id: scan_id, git_dir_scan_id: scan_id, repo_ptr: repo, git_dir_path: parent_path.clone(), @@ -2090,11 +2120,11 @@ impl LocalSnapshot { } } -impl LocalMutableSnapshot { +impl BackgroundScannerState { fn reuse_entry_id(&mut self, entry: &mut Entry) { if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) { entry.id = removed_entry_id; - } else if let Some(existing_entry) = self.entry_for_path(&entry.path) { + } else if let Some(existing_entry) = 
self.snapshot.entry_for_path(&entry.path) { entry.id = existing_entry.id; } } @@ -2111,8 +2141,10 @@ impl LocalMutableSnapshot { ignore: Option<Arc<Gitignore>>, fs: &dyn Fs, ) { - let mut parent_entry = if let Some(parent_entry) = - self.entries_by_path.get(&PathKey(parent_path.clone()), &()) + let mut parent_entry = if let Some(parent_entry) = self + .snapshot + .entries_by_path + .get(&PathKey(parent_path.clone()), &()) { parent_entry.clone() } else { @@ -2132,13 +2164,14 @@ impl LocalMutableSnapshot { } if let Some(ignore) = ignore { - let abs_parent_path = self.abs_path.join(&parent_path).into(); - self.ignores_by_parent_abs_path + let abs_parent_path = self.snapshot.abs_path.join(&parent_path).into(); + self.snapshot + .ignores_by_parent_abs_path .insert(abs_parent_path, (ignore, false)); } if parent_path.file_name() == Some(&DOT_GIT) { - self.build_repo(parent_path, fs); + self.snapshot.build_repo(parent_path, fs); } let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)]; @@ -2150,25 +2183,27 @@ impl LocalMutableSnapshot { id: entry.id, path: entry.path.clone(), is_ignored: entry.is_ignored, - scan_id: self.scan_id, + scan_id: self.snapshot.scan_id, })); entries_by_path_edits.push(Edit::Insert(entry)); } - self.entries_by_path.edit(entries_by_path_edits, &()); - self.entries_by_id.edit(entries_by_id_edits, &()); + self.snapshot + .entries_by_path + .edit(entries_by_path_edits, &()); + self.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } fn remove_path(&mut self, path: &Path) { let mut new_entries; let removed_entries; { - let mut cursor = self.entries_by_path.cursor::<TraversalProgress>(); + let mut cursor = self.snapshot.entries_by_path.cursor::<TraversalProgress>(); new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &()); removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &()); new_entries.push_tree(cursor.suffix(&()), &()); } - self.entries_by_path = new_entries; + self.snapshot.entries_by_path = new_entries; let mut entries_by_id_edits = Vec::new(); for entry in removed_entries.cursor::<()>() { @@ -2179,11 +2214,12 @@ impl LocalMutableSnapshot { *removed_entry_id = cmp::max(*removed_entry_id, entry.id); entries_by_id_edits.push(Edit::Remove(entry.id)); } - self.entries_by_id.edit(entries_by_id_edits, &()); + self.snapshot.entries_by_id.edit(entries_by_id_edits, &()); if path.file_name() == Some(&GITIGNORE) { - let abs_parent_path = self.abs_path.join(path.parent().unwrap()); + let abs_parent_path = self.snapshot.abs_path.join(path.parent().unwrap()); if let Some((_, needs_update)) = self + .snapshot .ignores_by_parent_abs_path .get_mut(abs_parent_path.as_path()) { @@ -2473,12 +2509,31 @@ pub enum EntryKind { #[derive(Clone, Copy, Debug)] pub enum PathChange { + /// A filesystem entry was created. Added, + /// A filesystem entry was removed. Removed, + /// A filesystem entry was updated. Updated, + /// A filesystem entry was either updated or added. We don't know + /// whether or not it already existed, because the path had not + /// been loaded before the event. AddedOrUpdated, + /// A filesystem entry was found during the initial scan of the worktree. + Loaded, +} + +pub struct GitRepositoryChange { + /// The previous state of the repository, if it already existed. + pub old_repository: Option<RepositoryEntry>, + /// Whether the content of the .git directory changed. This will be false + /// if only the repository's work directory changed. + pub git_dir_changed: bool, }
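The new Loaded variant lets consumers distinguish initial-scan discoveries from real filesystem events; the LSP file-watching code earlier in this patch drops Loaded entries instead of reporting spurious creations. A sketch of that mapping (WatcherEvent is a hypothetical stand-in for lsp::FileChangeType):

#[derive(Clone, Copy)]
enum PathChange { Added, Removed, Updated, AddedOrUpdated, Loaded }

#[derive(Debug, PartialEq)]
enum WatcherEvent { Created, Deleted, Changed }

fn watcher_event(change: PathChange) -> Option<WatcherEvent> {
    match change {
        // Initial-scan discovery, not a real filesystem event: report nothing.
        PathChange::Loaded => None,
        PathChange::Added => Some(WatcherEvent::Created),
        PathChange::Removed => Some(WatcherEvent::Deleted),
        PathChange::Updated | PathChange::AddedOrUpdated => Some(WatcherEvent::Changed),
    }
}

fn main() {
    assert_eq!(watcher_event(PathChange::Loaded), None);
    assert_eq!(watcher_event(PathChange::Added), Some(WatcherEvent::Created));
}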
+pub type UpdatedEntriesSet = Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>; +pub type UpdatedGitRepositoriesSet = Arc<[(Arc<Path>, GitRepositoryChange)]>; + impl Entry { fn new( path: Arc<Path>, @@ -2635,19 +2690,20 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { } struct BackgroundScanner { - snapshot: Mutex<LocalMutableSnapshot>, + state: Mutex<BackgroundScannerState>, fs: Arc<dyn Fs>, status_updates_tx: UnboundedSender<ScanState>, executor: Arc<executor::Background>, refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>, - prev_state: Mutex<BackgroundScannerState>, next_entry_id: Arc<AtomicUsize>, - finished_initial_scan: bool, + phase: BackgroundScannerPhase, } -struct BackgroundScannerState { - snapshot: Snapshot, - event_paths: Vec<Arc<Path>>, +#[derive(PartialEq)] +enum BackgroundScannerPhase { + InitialScan, + EventsReceivedDuringInitialScan, + Events, } impl BackgroundScanner { @@ -2665,15 +2721,13 @@ impl BackgroundScanner { executor, refresh_requests_rx, next_entry_id, - prev_state: Mutex::new(BackgroundScannerState { - snapshot: snapshot.snapshot.clone(), - event_paths: Default::default(), - }), - snapshot: Mutex::new(LocalMutableSnapshot { + state: Mutex::new(BackgroundScannerState { + prev_snapshot: snapshot.snapshot.clone(), snapshot, removed_entry_ids: Default::default(), + changed_paths: Default::default(), }), - finished_initial_scan: false, + phase: BackgroundScannerPhase::InitialScan, } } @@ -2684,7 +2738,7 @@ impl BackgroundScanner { use futures::FutureExt as _; let (root_abs_path, root_inode) = { - let snapshot = self.snapshot.lock(); + let snapshot = &self.state.lock().snapshot; ( snapshot.abs_path.clone(), snapshot.root_entry().map(|e| e.inode), ) }; for ancestor in root_abs_path.ancestors().skip(1) { if let Ok(ignore) = build_gitignore(&ancestor.join(&*GITIGNORE), self.fs.as_ref()).await { - self.snapshot + self.state .lock() + .snapshot .ignores_by_parent_abs_path .insert(ancestor.into(), (ignore.into(), false)); } } { - let mut snapshot = self.snapshot.lock(); - snapshot.scan_id += 1; - ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true); + let mut state = self.state.lock(); + state.snapshot.scan_id += 1; + ignore_stack = state + .snapshot + .ignore_stack_for_abs_path(&root_abs_path, true); if ignore_stack.is_all() { - if let Some(mut root_entry) = snapshot.root_entry().cloned() { + if let Some(mut root_entry) = state.snapshot.root_entry().cloned() { root_entry.is_ignored = true; - snapshot.insert_entry(root_entry, self.fs.as_ref()); + state.insert_entry(root_entry, self.fs.as_ref()); } } }; @@ -2727,14 +2784,15 @@ impl BackgroundScanner { drop(scan_job_tx); self.scan_dirs(true, scan_job_rx).await; { - let mut snapshot = self.snapshot.lock(); - snapshot.completed_scan_id = snapshot.scan_id; + let mut state = self.state.lock(); + state.snapshot.completed_scan_id = state.snapshot.scan_id; } self.send_status_update(false, None); // Process any FS events that occurred while performing the initial scan. // For these events, the updates cannot be as precise, because we didn't // have the previous state loaded yet. + self.phase = BackgroundScannerPhase::EventsReceivedDuringInitialScan; if let Poll::Ready(Some(events)) = futures::poll!(events_rx.next()) { let mut paths = events.into_iter().map(|e| e.path).collect::<Vec<_>>(); while let Poll::Ready(Some(more_events)) = futures::poll!(events_rx.next()) { @@ -2743,9 +2801,8 @@ impl BackgroundScanner { self.process_events(paths).await; } - self.finished_initial_scan = true; - // Continue processing events until the worktree is dropped.
+ self.phase = BackgroundScannerPhase::Events; loop { select_biased! { // Process any path refresh requests from the worktree. Prioritize @@ -2770,15 +2827,7 @@ impl BackgroundScanner { } async fn process_refresh_request(&self, paths: Vec, barrier: barrier::Sender) -> bool { - if let Some(mut paths) = self.reload_entries_for_paths(paths, None).await { - paths.sort_unstable(); - util::extend_sorted( - &mut self.prev_state.lock().event_paths, - paths, - usize::MAX, - Ord::cmp, - ); - } + self.reload_entries_for_paths(paths, None).await; self.send_status_update(false, Some(barrier)) } @@ -2787,50 +2836,42 @@ impl BackgroundScanner { let paths = self .reload_entries_for_paths(paths, Some(scan_job_tx.clone())) .await; - if let Some(paths) = &paths { - util::extend_sorted( - &mut self.prev_state.lock().event_paths, - paths.iter().cloned(), - usize::MAX, - Ord::cmp, - ); - } drop(scan_job_tx); self.scan_dirs(false, scan_job_rx).await; self.update_ignore_statuses().await; - let mut snapshot = self.snapshot.lock(); + { + let mut snapshot = &mut self.state.lock().snapshot; - if let Some(paths) = paths { - for path in paths { - self.reload_repo_for_file_path(&path, &mut *snapshot, self.fs.as_ref()); + if let Some(paths) = paths { + for path in paths { + self.reload_repo_for_file_path(&path, &mut *snapshot, self.fs.as_ref()); + } } - } - let mut git_repositories = mem::take(&mut snapshot.git_repositories); - git_repositories.retain(|work_directory_id, _| { - snapshot - .entry_for_id(*work_directory_id) - .map_or(false, |entry| { - snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some() - }) - }); - snapshot.git_repositories = git_repositories; + let mut git_repositories = mem::take(&mut snapshot.git_repositories); + git_repositories.retain(|work_directory_id, _| { + snapshot + .entry_for_id(*work_directory_id) + .map_or(false, |entry| { + snapshot.entry_for_path(entry.path.join(*DOT_GIT)).is_some() + }) + }); + snapshot.git_repositories = git_repositories; - let mut git_repository_entries = mem::take(&mut snapshot.snapshot.repository_entries); - git_repository_entries.retain(|_, entry| { - snapshot - .git_repositories - .get(&entry.work_directory.0) - .is_some() - }); - snapshot.snapshot.repository_entries = git_repository_entries; - snapshot.completed_scan_id = snapshot.scan_id; - drop(snapshot); + let mut git_repository_entries = mem::take(&mut snapshot.snapshot.repository_entries); + git_repository_entries.retain(|_, entry| { + snapshot + .git_repositories + .get(&entry.work_directory.0) + .is_some() + }); + snapshot.snapshot.repository_entries = git_repository_entries; + snapshot.completed_scan_id = snapshot.scan_id; + } self.send_status_update(false, None); - self.prev_state.lock().event_paths.clear(); } async fn scan_dirs( @@ -2907,15 +2948,15 @@ impl BackgroundScanner { } fn send_status_update(&self, scanning: bool, barrier: Option) -> bool { - let mut prev_state = self.prev_state.lock(); - let new_snapshot = self.snapshot.lock().clone(); - let old_snapshot = mem::replace(&mut prev_state.snapshot, new_snapshot.snapshot.clone()); - - let changes = self.build_change_set( - &old_snapshot, - &new_snapshot.snapshot, - &prev_state.event_paths, - ); + let mut state = self.state.lock(); + if state.changed_paths.is_empty() && scanning { + return true; + } + + let new_snapshot = state.snapshot.clone(); + let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); + let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths); + 
state.changed_paths.clear(); self.status_updates_tx .unbounded_send(ScanState::Updated { @@ -2933,7 +2974,7 @@ impl BackgroundScanner { let mut ignore_stack = job.ignore_stack.clone(); let mut new_ignore = None; let (root_abs_path, root_char_bag, next_entry_id) = { - let snapshot = self.snapshot.lock(); + let snapshot = &self.state.lock().snapshot; ( snapshot.abs_path().clone(), snapshot.root_char_bag, @@ -3037,12 +3078,13 @@ impl BackgroundScanner { new_entries.push(child_entry); } - self.snapshot.lock().populate_dir( - job.path.clone(), - new_entries, - new_ignore, - self.fs.as_ref(), - ); + { + let mut state = self.state.lock(); + state.populate_dir(job.path.clone(), new_entries, new_ignore, self.fs.as_ref()); + if let Err(ix) = state.changed_paths.binary_search(&job.path) { + state.changed_paths.insert(ix, job.path.clone()); + } + } for new_job in new_jobs { if let Some(new_job) = new_job { @@ -3063,7 +3105,7 @@ impl BackgroundScanner { abs_paths.sort_unstable(); abs_paths.dedup_by(|a, b| a.starts_with(&b)); - let root_abs_path = self.snapshot.lock().abs_path.clone(); + let root_abs_path = self.state.lock().snapshot.abs_path.clone(); let root_canonical_path = self.fs.canonicalize(&root_abs_path).await.log_err()?; let metadata = futures::future::join_all( abs_paths @@ -3073,7 +3115,8 @@ impl BackgroundScanner { ) .await; - let mut snapshot = self.snapshot.lock(); + let mut state = self.state.lock(); + let snapshot = &mut state.snapshot; let is_idle = snapshot.completed_scan_id == snapshot.scan_id; snapshot.scan_id += 1; if is_idle && !doing_recursive_update { @@ -3087,7 +3130,7 @@ impl BackgroundScanner { for (abs_path, metadata) in abs_paths.iter().zip(metadata.iter()) { if let Ok(path) = abs_path.strip_prefix(&root_canonical_path) { if matches!(metadata, Ok(None)) || doing_recursive_update { - snapshot.remove_path(path); + state.remove_path(path); } event_paths.push(path.into()); } else { @@ -3104,19 +3147,20 @@ impl BackgroundScanner { match metadata { Ok(Some(metadata)) => { - let ignore_stack = - snapshot.ignore_stack_for_abs_path(&abs_path, metadata.is_dir); + let ignore_stack = state + .snapshot + .ignore_stack_for_abs_path(&abs_path, metadata.is_dir); let mut fs_entry = Entry::new( path.clone(), &metadata, self.next_entry_id.as_ref(), - snapshot.root_char_bag, + state.snapshot.root_char_bag, ); fs_entry.is_ignored = ignore_stack.is_all(); - snapshot.insert_entry(fs_entry, self.fs.as_ref()); + state.insert_entry(fs_entry, self.fs.as_ref()); if let Some(scan_queue_tx) = &scan_queue_tx { - let mut ancestor_inodes = snapshot.ancestor_inodes_for_path(&path); + let mut ancestor_inodes = state.snapshot.ancestor_inodes_for_path(&path); if metadata.is_dir && !ancestor_inodes.contains(&metadata.inode) { ancestor_inodes.insert(metadata.inode); smol::block_on(scan_queue_tx.send(ScanJob { @@ -3131,7 +3175,7 @@ impl BackgroundScanner { } } Ok(None) => { - self.remove_repo_path(&path, &mut snapshot); + self.remove_repo_path(&path, &mut state.snapshot); } Err(err) => { // TODO - create a special 'error' entry in the entries tree to mark this @@ -3140,6 +3184,13 @@ impl BackgroundScanner { } } + util::extend_sorted( + &mut state.changed_paths, + event_paths.iter().cloned(), + usize::MAX, + Ord::cmp, + ); + Some(event_paths) } @@ -3161,15 +3212,13 @@ impl BackgroundScanner { } let repo = snapshot.repository_for_path(&path)?; - let repo_path = repo.work_directory.relativize(&snapshot, &path)?; - let work_dir = repo.work_directory(snapshot)?; let work_dir_id = repo.work_directory; snapshot 
.git_repositories - .update(&work_dir_id, |entry| entry.scan_id = scan_id); + .update(&work_dir_id, |entry| entry.work_dir_scan_id = scan_id); snapshot.repository_entries.update(&work_dir, |entry| { entry @@ -3218,7 +3267,7 @@ impl BackgroundScanner { let statuses = repo.statuses().unwrap_or_default(); snapshot.git_repositories.update(&entry_id, |entry| { - entry.scan_id = scan_id; + entry.work_dir_scan_id = scan_id; entry.git_dir_scan_id = scan_id; }); @@ -3241,14 +3290,14 @@ impl BackgroundScanner { let work_dir = repo.work_directory(snapshot)?; let work_dir_id = repo.work_directory.clone(); - snapshot - .git_repositories - .update(&work_dir_id, |entry| entry.scan_id = scan_id); - - let local_repo = snapshot.get_local_repo(&repo)?.to_owned(); + let (local_repo, git_dir_scan_id) = + snapshot.git_repositories.update(&work_dir_id, |entry| { + entry.work_dir_scan_id = scan_id; + (entry.repo_ptr.clone(), entry.git_dir_scan_id) + })?; // Short circuit if we've already scanned everything - if local_repo.git_dir_scan_id == scan_id { + if git_dir_scan_id == scan_id { return None; } @@ -3259,7 +3308,7 @@ impl BackgroundScanner { continue; }; - let status = local_repo.repo_ptr.lock().status(&repo_path); + let status = local_repo.lock().status(&repo_path); if let Some(status) = status { repository.statuses.insert(repo_path.clone(), status); } else { @@ -3276,7 +3325,7 @@ impl BackgroundScanner { async fn update_ignore_statuses(&self) { use futures::FutureExt as _; - let mut snapshot = self.snapshot.lock().clone(); + let mut snapshot = self.state.lock().snapshot.clone(); let mut ignores_to_update = Vec::new(); let mut ignores_to_delete = Vec::new(); let abs_path = snapshot.abs_path.clone(); @@ -3298,8 +3347,9 @@ impl BackgroundScanner { for parent_abs_path in ignores_to_delete { snapshot.ignores_by_parent_abs_path.remove(&parent_abs_path); - self.snapshot + self.state .lock() + .snapshot .ignores_by_parent_abs_path .remove(&parent_abs_path); } @@ -3391,9 +3441,20 @@ impl BackgroundScanner { } } - let mut snapshot = self.snapshot.lock(); - snapshot.entries_by_path.edit(entries_by_path_edits, &()); - snapshot.entries_by_id.edit(entries_by_id_edits, &()); + let state = &mut self.state.lock(); + for edit in &entries_by_path_edits { + if let Edit::Insert(entry) = edit { + if let Err(ix) = state.changed_paths.binary_search(&entry.path) { + state.changed_paths.insert(ix, entry.path.clone()); + } + } + } + + state + .snapshot + .entries_by_path + .edit(entries_by_path_edits, &()); + state.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } fn build_change_set( @@ -3401,18 +3462,25 @@ impl BackgroundScanner { old_snapshot: &Snapshot, new_snapshot: &Snapshot, event_paths: &[Arc], - ) -> HashMap<(Arc, ProjectEntryId), PathChange> { - use PathChange::{Added, AddedOrUpdated, Removed, Updated}; + ) -> UpdatedEntriesSet { + use BackgroundScannerPhase::*; + use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; - let mut changes = HashMap::default(); + // Identify which paths have changed. Use the known set of changed + // parent paths to optimize the search. 
+ let mut changes = Vec::new(); let mut old_paths = old_snapshot.entries_by_path.cursor::(); let mut new_paths = new_snapshot.entries_by_path.cursor::(); - let received_before_initialized = !self.finished_initial_scan; - + old_paths.next(&()); + new_paths.next(&()); for path in event_paths { let path = PathKey(path.clone()); - old_paths.seek(&path, Bias::Left, &()); - new_paths.seek(&path, Bias::Left, &()); + if old_paths.item().map_or(false, |e| e.path < path.0) { + old_paths.seek_forward(&path, Bias::Left, &()); + } + if new_paths.item().map_or(false, |e| e.path < path.0) { + new_paths.seek_forward(&path, Bias::Left, &()); + } loop { match (old_paths.item(), new_paths.item()) { @@ -3427,36 +3495,56 @@ impl BackgroundScanner { match Ord::cmp(&old_entry.path, &new_entry.path) { Ordering::Less => { - changes.insert((old_entry.path.clone(), old_entry.id), Removed); + changes.push((old_entry.path.clone(), old_entry.id, Removed)); old_paths.next(&()); } Ordering::Equal => { - if received_before_initialized { + if self.phase == EventsReceivedDuringInitialScan { // If the worktree was not fully initialized when this event was generated, // we can't know whether this entry was added during the scan or whether // it was merely updated. - changes.insert( - (new_entry.path.clone(), new_entry.id), + changes.push(( + new_entry.path.clone(), + new_entry.id, AddedOrUpdated, - ); - } else if old_entry.mtime != new_entry.mtime { - changes.insert((new_entry.path.clone(), new_entry.id), Updated); + )); + } else if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + changes.push((new_entry.path.clone(), new_entry.id, Added)); + } else if old_entry != new_entry { + changes.push((new_entry.path.clone(), new_entry.id, Updated)); } old_paths.next(&()); new_paths.next(&()); } Ordering::Greater => { - changes.insert((new_entry.path.clone(), new_entry.id), Added); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if self.phase == InitialScan { + Loaded + } else { + Added + }, + )); new_paths.next(&()); } } } (Some(old_entry), None) => { - changes.insert((old_entry.path.clone(), old_entry.id), Removed); + changes.push((old_entry.path.clone(), old_entry.id, Removed)); old_paths.next(&()); } (None, Some(new_entry)) => { - changes.insert((new_entry.path.clone(), new_entry.id), Added); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if self.phase == InitialScan { + Loaded + } else { + Added + }, + )); new_paths.next(&()); } (None, None) => break, @@ -3464,7 +3552,7 @@ impl BackgroundScanner { } } - changes + changes.into() } async fn progress_timer(&self, running: bool) { @@ -3525,8 +3613,6 @@ impl WorktreeHandle for ModelHandle { &self, cx: &'a gpui::TestAppContext, ) -> futures::future::LocalBoxFuture<'a, ()> { - use smol::future::FutureExt; - let filename = "fs-event-sentinel"; let tree = self.clone(); let (fs, root_path) = self.read_with(cx, |tree, _| { @@ -4189,7 +4275,18 @@ mod tests { .await .unwrap(); - let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let snapshot1 = tree.update(cx, |tree, cx| { + let tree = tree.as_local_mut().unwrap(); + let snapshot = Arc::new(Mutex::new(tree.snapshot())); + let _ = tree.observe_updates(0, cx, { + let snapshot = snapshot.clone(); + move |update| { + snapshot.lock().apply_remote_update(update).unwrap(); + async { true } + } + }); + snapshot + }); let entry = tree .update(cx, |tree, cx| { @@ -4207,9 +4304,10 @@ mod tests { }); let snapshot2 = tree.update(cx, |tree, _| 
tree.as_local().unwrap().snapshot()); - let update = snapshot2.build_update(&snapshot1, 0, 0, true); - snapshot1.apply_remote_update(update).unwrap(); - assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),); + assert_eq!( + snapshot1.lock().entries(true).collect::>(), + snapshot2.entries(true).collect::>() + ); } #[gpui::test(iterations = 100)] @@ -4244,7 +4342,20 @@ mod tests { .await .unwrap(); - let mut snapshot = worktree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let mut snapshots = + vec![worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot())]; + let updates = Arc::new(Mutex::new(Vec::new())); + worktree.update(cx, |tree, cx| { + check_worktree_change_events(tree, cx); + + let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, { + let updates = updates.clone(); + move |update| { + updates.lock().push(update); + async { true } + } + }); + }); for _ in 0..operations { worktree @@ -4258,35 +4369,39 @@ mod tests { }); if rng.gen_bool(0.6) { - let new_snapshot = - worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let update = new_snapshot.build_update(&snapshot, 0, 0, true); - snapshot.apply_remote_update(update.clone()).unwrap(); - assert_eq!( - snapshot.to_vec(true), - new_snapshot.to_vec(true), - "incorrect snapshot after update {:?}", - update - ); + snapshots + .push(worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot())); } } worktree .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete()) .await; - worktree.read_with(cx, |tree, _| { - tree.as_local().unwrap().snapshot.check_invariants() + + cx.foreground().run_until_parked(); + + let final_snapshot = worktree.read_with(cx, |tree, _| { + let tree = tree.as_local().unwrap(); + tree.snapshot.check_invariants(); + tree.snapshot() }); - let new_snapshot = worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let update = new_snapshot.build_update(&snapshot, 0, 0, true); - snapshot.apply_remote_update(update.clone()).unwrap(); - assert_eq!( - snapshot.to_vec(true), - new_snapshot.to_vec(true), - "incorrect snapshot after update {:?}", - update - ); + for (i, snapshot) in snapshots.into_iter().enumerate().rev() { + let mut updated_snapshot = snapshot.clone(); + for update in updates.lock().iter() { + if update.scan_id >= updated_snapshot.scan_id() as u64 { + updated_snapshot + .apply_remote_update(update.clone()) + .unwrap(); + } + } + + assert_eq!( + updated_snapshot.entries(true).collect::>(), + final_snapshot.entries(true).collect::>(), + "wrong updates after snapshot {i}: {snapshot:#?} {updates:#?}", + ); + } } #[gpui::test(iterations = 100)] @@ -4318,57 +4433,23 @@ mod tests { .await .unwrap(); - worktree - .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete()) - .await; - - // After the initial scan is complete, the `UpdatedEntries` event can - // be used to follow along with all changes to the worktree's snapshot. 
@@ -4318,57 +4433,23 @@ mod tests {
         .await
         .unwrap();

-        worktree
-            .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete())
-            .await;
-
-        // After the initial scan is complete, the `UpdatedEntries` event can
-        // be used to follow along with all changes to the worktree's snapshot.
+        let updates = Arc::new(Mutex::new(Vec::new()));
         worktree.update(cx, |tree, cx| {
-            let mut paths = tree
-                .as_local()
-                .unwrap()
-                .paths()
-                .cloned()
-                .collect::<Vec<_>>();
-
-            cx.subscribe(&worktree, move |tree, _, event, _| {
-                if let Event::UpdatedEntries(changes) = event {
-                    for ((path, _), change_type) in changes.iter() {
-                        let path = path.clone();
-                        let ix = match paths.binary_search(&path) {
-                            Ok(ix) | Err(ix) => ix,
-                        };
-                        match change_type {
-                            PathChange::Added => {
-                                assert_ne!(paths.get(ix), Some(&path));
-                                paths.insert(ix, path);
-                            }
-
-                            PathChange::Removed => {
-                                assert_eq!(paths.get(ix), Some(&path));
-                                paths.remove(ix);
-                            }
+            check_worktree_change_events(tree, cx);

-                            PathChange::Updated => {
-                                assert_eq!(paths.get(ix), Some(&path));
-                            }
-
-                            PathChange::AddedOrUpdated => {
-                                if paths[ix] != path {
-                                    paths.insert(ix, path);
-                                }
-                            }
-                        }
-                    }
-
-                    let new_paths = tree.paths().cloned().collect::<Vec<_>>();
-                    assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes);
+            let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, {
+                let updates = updates.clone();
+                move |update| {
+                    updates.lock().push(update);
+                    async { true }
                 }
-            })
-            .detach();
+            });
         });

+        worktree
+            .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete())
+            .await;
+
         fs.as_fake().pause_events();
         let mut snapshots = Vec::new();
         let mut mutations_len = operations;
@@ -4425,40 +4506,66 @@ mod tests {
             .await;
         let new_snapshot =
             new_worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
-        assert_eq!(snapshot.to_vec(true), new_snapshot.to_vec(true));
+        assert_eq!(
+            snapshot.entries_without_ids(true),
+            new_snapshot.entries_without_ids(true)
+        );
     }

-        for (i, mut prev_snapshot) in snapshots.into_iter().enumerate() {
-            let include_ignored = rng.gen::<bool>();
-            if !include_ignored {
-                let mut entries_by_path_edits = Vec::new();
-                let mut entries_by_id_edits = Vec::new();
-                for entry in prev_snapshot
-                    .entries_by_id
-                    .cursor::<()>()
-                    .filter(|e| e.is_ignored)
-                {
-                    entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
-                    entries_by_id_edits.push(Edit::Remove(entry.id));
+        for (i, mut prev_snapshot) in snapshots.into_iter().enumerate().rev() {
+            for update in updates.lock().iter() {
+                if update.scan_id >= prev_snapshot.scan_id() as u64 {
+                    prev_snapshot.apply_remote_update(update.clone()).unwrap();
                 }
-
-                prev_snapshot
-                    .entries_by_path
-                    .edit(entries_by_path_edits, &());
-                prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
             }

-            let update = snapshot.build_update(&prev_snapshot, 0, 0, include_ignored);
-            prev_snapshot.apply_remote_update(update.clone()).unwrap();
             assert_eq!(
-                prev_snapshot.to_vec(include_ignored),
-                snapshot.to_vec(include_ignored),
-                "wrong update for snapshot {i}. update: {:?}",
-                update
+                prev_snapshot.entries(true).collect::<Vec<_>>(),
+                snapshot.entries(true).collect::<Vec<_>>(),
+                "wrong updates after snapshot {i}: {updates:#?}",
             );
         }
     }
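`observe_updates` follows a callback-returns-a-future convention: the subscription stays alive for as long as the returned future resolves to `true`, which is why the tests above end each callback with `async { true }`. A sketch of how such an API can be structured, with hypothetical `UpdateSource`/`Callback` types (this is not gpui's implementation; `main` also assumes the `futures` crate for `block_on`, and the real callbacks use `parking_lot::Mutex`, hence no `.unwrap()` in the diff):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

// Each observer is a callback returning a boxed future; resolving to
// `false` drops the subscription.
type Callback<U> = Box<dyn FnMut(U) -> Pin<Box<dyn Future<Output = bool>>>>;

struct UpdateSource<U> {
    observers: Vec<Callback<U>>,
}

impl<U: Clone> UpdateSource<U> {
    fn new() -> Self {
        Self { observers: Vec::new() }
    }

    fn observe<F, Fut>(&mut self, mut callback: F)
    where
        F: FnMut(U) -> Fut + 'static,
        Fut: Future<Output = bool> + 'static,
    {
        self.observers
            .push(Box::new(move |update| Box::pin(callback(update))));
    }

    async fn broadcast(&mut self, update: U) {
        // Retain only the observers whose futures resolve to `true`.
        let mut kept = Vec::new();
        for mut callback in self.observers.drain(..) {
            if callback(update.clone()).await {
                kept.push(callback);
            }
        }
        self.observers = kept;
    }
}

fn main() {
    let mut source = UpdateSource::new();
    let updates = Arc::new(Mutex::new(Vec::new()));
    // Usage mirroring the tests: stash every update, keep observing.
    source.observe({
        let updates = updates.clone();
        move |update: u64| {
            updates.lock().unwrap().push(update);
            async { true }
        }
    });
    futures::executor::block_on(source.broadcast(7));
    assert_eq!(*updates.lock().unwrap(), vec![7]);
}
```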
+    // The worktree's `UpdatedEntries` event can be used to follow along with
+    // all changes to the worktree's snapshot.
+    fn check_worktree_change_events(tree: &mut Worktree, cx: &mut ModelContext<Worktree>) {
+        let mut entries = tree.entries(true).cloned().collect::<Vec<_>>();
+        cx.subscribe(&cx.handle(), move |tree, _, event, _| {
+            if let Event::UpdatedEntries(changes) = event {
+                for (path, _, change_type) in changes.iter() {
+                    let entry = tree.entry_for_path(&path).cloned();
+                    let ix = match entries.binary_search_by_key(&path, |e| &e.path) {
+                        Ok(ix) | Err(ix) => ix,
+                    };
+                    match change_type {
+                        PathChange::Loaded => entries.insert(ix, entry.unwrap()),
+                        PathChange::Added => entries.insert(ix, entry.unwrap()),
+                        PathChange::Removed => drop(entries.remove(ix)),
+                        PathChange::Updated => {
+                            let entry = entry.unwrap();
+                            let existing_entry = entries.get_mut(ix).unwrap();
+                            assert_eq!(existing_entry.path, entry.path);
+                            *existing_entry = entry;
+                        }
+                        PathChange::AddedOrUpdated => {
+                            let entry = entry.unwrap();
+                            if entries.get(ix).map(|e| &e.path) == Some(&entry.path) {
+                                *entries.get_mut(ix).unwrap() = entry;
+                            } else {
+                                entries.insert(ix, entry);
+                            }
+                        }
+                    }
+                }
+
+                let new_entries = tree.entries(true).cloned().collect::<Vec<_>>();
+                assert_eq!(entries, new_entries, "incorrect changes: {:?}", changes);
+            }
+        })
+        .detach();
+    }
+
     fn randomly_mutate_worktree(
         worktree: &mut Worktree,
         rng: &mut impl Rng,
@@ -4750,7 +4857,7 @@ mod tests {
             }
         }

-        fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
+        fn entries_without_ids(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
             let mut paths = Vec::new();
             for entry in self.entries_by_path.cursor::<()>() {
                 if include_ignored || !entry.is_ignored {
@@ -4942,8 +5049,8 @@ mod tests {

         assert_eq!(
             repo_update_events.lock()[0]
-                .keys()
-                .cloned()
+                .iter()
+                .map(|e| e.0.clone())
                 .collect::<Vec<Arc<Path>>>(),
             vec![Path::new("dir1").into()]
         );
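`check_worktree_change_events` above leans on the `Ok(ix) | Err(ix)` binary-search idiom: both arms yield a usable index, whether the key is already present (update/remove) or absent (insertion point). A tiny standalone sketch of that idiom on a sorted `Vec<String>` (`apply_change` is a hypothetical helper, not part of this change):

```rust
// Maintain a sorted Vec under add/remove events; binary_search returns
// Ok(ix) when the path is present and Err(ix) with the insertion point
// when it is not, so one match handles both cases.
fn apply_change(paths: &mut Vec<String>, path: String, added: bool) {
    let ix = match paths.binary_search(&path) {
        Ok(ix) | Err(ix) => ix,
    };
    if added {
        debug_assert_ne!(paths.get(ix), Some(&path), "duplicate insert");
        paths.insert(ix, path);
    } else {
        debug_assert_eq!(paths.get(ix), Some(&path), "removing a missing path");
        paths.remove(ix);
    }
}

fn main() {
    let mut paths = vec!["a.rs".to_string(), "c.rs".to_string()];
    apply_change(&mut paths, "b.rs".into(), true);
    assert_eq!(paths, ["a.rs", "b.rs", "c.rs"]);
    apply_change(&mut paths, "a.rs".into(), false);
    assert_eq!(paths, ["b.rs", "c.rs"]);
}
```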