From 00eafe63d9f1124a4f1fa4cd5fffeb2ec0a36dbe Mon Sep 17 00:00:00 2001 From: Jakub Konka Date: Fri, 7 Nov 2025 07:34:06 +0100 Subject: [PATCH] git: Make long-running git staging snappy in git panel (#42149) Previously, staging a large file in the git panel would block the UI items until that operation finished. This is due to the fact that staging is a git op that is locked globally by git (per repo) meaning only one op that is modifying the git index can run at any one time. In order to make the UI snappy while letting any pending git staging jobs to finish in the background, we track their progress via `PendingOps` indexed by git entry path. We have already had a concept of pending operations however they existed at the UI layer in the `GitPanel` abstraction. This PR moves and augments `PendingOps` into the model `Repository` in `git_store` which seems like a more natural place for tracking running git jobs/operations. Thanks to this, pending ops are now stored in a `SumTree` indexed by git entry path part of the `Repository` snapshot, which makes for efficient access from the UI. Release Notes: - Improved UI responsiveness when staging/unstaging large files in the git panel --- crates/git/src/status.rs | 10 +- crates/git_ui/src/git_panel.rs | 347 +++++++--------- crates/project/src/git_store.rs | 379 ++++++++++++------ crates/project/src/git_store/pending_op.rs | 147 +++++++ crates/project/src/project_tests.rs | 442 ++++++++++++++++++++- 5 files changed, 1003 insertions(+), 322 deletions(-) create mode 100644 crates/project/src/git_store/pending_op.rs diff --git a/crates/git/src/status.rs b/crates/git/src/status.rs index f3401a0e93990c61df80e0e88e28292c4f2b28e2..a36e24dd3bf0a8c67ee1c70566f4564ba8362616 100644 --- a/crates/git/src/status.rs +++ b/crates/git/src/status.rs @@ -64,23 +64,23 @@ pub enum StageStatus { } impl StageStatus { - pub fn is_fully_staged(&self) -> bool { + pub const fn is_fully_staged(&self) -> bool { matches!(self, StageStatus::Staged) } - pub fn is_fully_unstaged(&self) -> bool { + pub const fn is_fully_unstaged(&self) -> bool { matches!(self, StageStatus::Unstaged) } - pub fn has_staged(&self) -> bool { + pub const fn has_staged(&self) -> bool { matches!(self, StageStatus::Staged | StageStatus::PartiallyStaged) } - pub fn has_unstaged(&self) -> bool { + pub const fn has_unstaged(&self) -> bool { matches!(self, StageStatus::Unstaged | StageStatus::PartiallyStaged) } - pub fn as_bool(self) -> Option { + pub const fn as_bool(self) -> Option { match self { StageStatus::Staged => Some(true), StageStatus::Unstaged => Some(false), diff --git a/crates/git_ui/src/git_panel.rs b/crates/git_ui/src/git_panel.rs index 7cbcef3dce0433ef3b14e4f60bf0e191c72bd2cb..ae8ee5e0b373754e554b2b817c3be264fee0c7d4 100644 --- a/crates/git_ui/src/git_panel.rs +++ b/crates/git_ui/src/git_panel.rs @@ -47,7 +47,7 @@ use panel::{ }; use project::{ Fs, Project, ProjectPath, - git_store::{GitStoreEvent, Repository, RepositoryEvent, RepositoryId}, + git_store::{GitStoreEvent, Repository, RepositoryEvent, RepositoryId, pending_op}, }; use serde::{Deserialize, Serialize}; use settings::{Settings, SettingsStore, StatusStyle}; @@ -271,27 +271,6 @@ impl GitStatusEntry { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum TargetStatus { - Staged, - Unstaged, - Reverted, - Unchanged, -} - -struct PendingOperation { - finished: bool, - target_status: TargetStatus, - entries: Vec, - op_id: usize, -} - -impl PendingOperation { - fn contains_path(&self, path: &RepoPath) -> bool { - self.entries.iter().any(|p| &p.repo_path == path) - } -} - pub struct GitPanel { pub(crate) active_repository: Option>, pub(crate) commit_editor: Entity, @@ -307,7 +286,6 @@ pub struct GitPanel { new_count: usize, entry_count: usize, new_staged_count: usize, - pending: Vec, pending_commit: Option>, amend_pending: bool, original_commit_message: Option, @@ -427,7 +405,7 @@ impl GitPanel { move |this, _git_store, event, window, cx| match event { GitStoreEvent::ActiveRepositoryChanged(_) => { this.active_repository = this.project.read(cx).active_repository(cx); - this.schedule_update(true, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::RepositoryUpdated( _, @@ -436,7 +414,7 @@ impl GitPanel { | RepositoryEvent::MergeHeadsChanged, true, ) => { - this.schedule_update(true, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::RepositoryUpdated( _, @@ -445,7 +423,7 @@ impl GitPanel { ) | GitStoreEvent::RepositoryAdded | GitStoreEvent::RepositoryRemoved(_) => { - this.schedule_update(false, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::IndexWriteError(error) => { this.workspace @@ -472,7 +450,6 @@ impl GitPanel { fs, new_count: 0, new_staged_count: 0, - pending: Vec::new(), pending_commit: None, amend_pending: false, original_commit_message: None, @@ -501,7 +478,7 @@ impl GitPanel { _settings_subscription, }; - this.schedule_update(false, window, cx); + this.schedule_update(window, cx); this }) } @@ -1013,15 +990,7 @@ impl GitPanel { return; }; - let op_id = self.pending.iter().map(|p| p.op_id).max().unwrap_or(0) + 1; - self.pending.push(PendingOperation { - op_id, - target_status: TargetStatus::Reverted, - entries: entries.clone(), - finished: false, - }); - self.update_visible_entries(window, cx); - let task = cx.spawn(async move |_, cx| { + let task = cx.spawn_in(window, async move |this, cx| { let tasks: Vec<_> = workspace.update(cx, |workspace, cx| { workspace.project().update(cx, |project, cx| { entries @@ -1038,8 +1007,8 @@ impl GitPanel { let buffers = futures::future::join_all(tasks).await; - active_repository - .update(cx, |repo, cx| { + this.update_in(cx, |this, window, cx| { + let task = active_repository.update(cx, |repo, cx| { repo.checkout_files( "HEAD", entries @@ -1048,10 +1017,14 @@ impl GitPanel { .collect(), cx, ) - })? - .await??; + }); + this.update_visible_entries(window, cx); + cx.notify(); + task + })? + .await?; - let tasks: Vec<_> = cx.update(|cx| { + let tasks: Vec<_> = cx.update(|_, cx| { buffers .iter() .filter_map(|buffer| { @@ -1071,21 +1044,10 @@ impl GitPanel { let result = task.await; this.update_in(cx, |this, window, cx| { - for pending in this.pending.iter_mut() { - if pending.op_id == op_id { - pending.finished = true; - if result.is_err() { - pending.target_status = TargetStatus::Unchanged; - this.update_visible_entries(window, cx); - } - break; - } + if let Err(err) = result { + this.update_visible_entries(window, cx); + this.show_error_toast("checkout", err, cx); } - result - .map_err(|e| { - this.show_error_toast("checkout", e, cx); - }) - .ok(); }) .ok(); }) @@ -1213,26 +1175,44 @@ impl GitPanel { }); } + fn change_all_files_stage(&mut self, stage: bool, cx: &mut Context) { + let Some(active_repository) = self.active_repository.clone() else { + return; + }; + cx.spawn({ + async move |this, cx| { + let result = this + .update(cx, |this, cx| { + let task = active_repository.update(cx, |repo, cx| { + if stage { + repo.stage_all(cx) + } else { + repo.unstage_all(cx) + } + }); + this.update_counts(active_repository.read(cx)); + cx.notify(); + task + })? + .await; + + this.update(cx, |this, cx| { + if let Err(err) = result { + this.show_error_toast(if stage { "add" } else { "reset" }, err, cx); + } + cx.notify() + }) + } + }) + .detach(); + } + pub fn stage_all(&mut self, _: &StageAll, _window: &mut Window, cx: &mut Context) { - let entries = self - .entries - .iter() - .filter_map(|entry| entry.status_entry()) - .filter(|status_entry| status_entry.staging.has_unstaged()) - .cloned() - .collect::>(); - self.change_file_stage(true, entries, cx); + self.change_all_files_stage(true, cx); } pub fn unstage_all(&mut self, _: &UnstageAll, _window: &mut Window, cx: &mut Context) { - let entries = self - .entries - .iter() - .filter_map(|entry| entry.status_entry()) - .filter(|status_entry| status_entry.staging.has_staged()) - .cloned() - .collect::>(); - self.change_file_stage(false, entries, cx); + self.change_all_files_stage(false, cx); } fn toggle_staged_for_entry( @@ -1247,9 +1227,12 @@ impl GitPanel { let (stage, repo_paths) = match entry { GitListEntry::Status(status_entry) => { let repo_paths = vec![status_entry.clone()]; - let stage = if let Some(status) = self.entry_staging(&status_entry) { - !status.is_fully_staged() - } else if status_entry.status.staging().is_fully_staged() { + let stage = if active_repository + .read(cx) + .pending_ops_for_path(&status_entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.status.staging().has_staged()) + { if let Some(op) = self.bulk_staging.clone() && op.anchor == status_entry.repo_path { @@ -1291,56 +1274,31 @@ impl GitPanel { let Some(active_repository) = self.active_repository.clone() else { return; }; - let op_id = self.pending.iter().map(|p| p.op_id).max().unwrap_or(0) + 1; - self.pending.push(PendingOperation { - op_id, - target_status: if stage { - TargetStatus::Staged - } else { - TargetStatus::Unstaged - }, - entries: entries.clone(), - finished: false, - }); - let repository = active_repository.read(cx); - self.update_counts(repository); - cx.notify(); - cx.spawn({ async move |this, cx| { - let result = cx - .update(|cx| { - if stage { - active_repository.update(cx, |repo, cx| { - let repo_paths = entries - .iter() - .map(|entry| entry.repo_path.clone()) - .collect(); + let result = this + .update(cx, |this, cx| { + let task = active_repository.update(cx, |repo, cx| { + let repo_paths = entries + .iter() + .map(|entry| entry.repo_path.clone()) + .collect(); + if stage { repo.stage_entries(repo_paths, cx) - }) - } else { - active_repository.update(cx, |repo, cx| { - let repo_paths = entries - .iter() - .map(|entry| entry.repo_path.clone()) - .collect(); + } else { repo.unstage_entries(repo_paths, cx) - }) - } + } + }); + this.update_counts(active_repository.read(cx)); + cx.notify(); + task })? .await; this.update(cx, |this, cx| { - for pending in this.pending.iter_mut() { - if pending.op_id == op_id { - pending.finished = true - } + if let Err(err) = result { + this.show_error_toast(if stage { "add" } else { "reset" }, err, cx); } - result - .map_err(|e| { - this.show_error_toast(if stage { "add" } else { "reset" }, e, cx); - }) - .ok(); cx.notify(); }) } @@ -2572,12 +2530,7 @@ impl GitPanel { message.push('\n'); } - fn schedule_update( - &mut self, - clear_pending: bool, - window: &mut Window, - cx: &mut Context, - ) { + fn schedule_update(&mut self, window: &mut Window, cx: &mut Context) { let handle = cx.entity().downgrade(); self.reopen_commit_buffer(window, cx); self.update_visible_entries_task = cx.spawn_in(window, async move |_, cx| { @@ -2585,9 +2538,6 @@ impl GitPanel { if let Some(git_panel) = handle.upgrade() { git_panel .update_in(cx, |git_panel, window, cx| { - if clear_pending { - git_panel.clear_pending(); - } git_panel.update_visible_entries(window, cx); }) .ok(); @@ -2636,10 +2586,6 @@ impl GitPanel { .detach_and_log_err(cx); } - fn clear_pending(&mut self) { - self.pending.retain(|v| !v.finished) - } - fn update_visible_entries(&mut self, window: &mut Window, cx: &mut Context) { let path_style = self.project.read(cx).path_style(cx); let bulk_staging = self.bulk_staging.take(); @@ -2682,11 +2628,12 @@ impl GitPanel { let is_new = entry.status.is_created(); let staging = entry.status.staging(); - if self.pending.iter().any(|pending| { - pending.target_status == TargetStatus::Reverted - && !pending.finished - && pending.contains_path(&entry.repo_path) - }) { + if let Some(pending) = repo.pending_ops_for_path(&entry.repo_path) + && pending + .ops + .iter() + .any(|op| op.git_status == pending_op::GitStatus::Reverted && op.finished()) + { continue; } @@ -2727,30 +2674,35 @@ impl GitPanel { } } - let mut pending_staged_count = 0; - let mut last_pending_staged = None; - let mut pending_status_for_single_staged = None; - for pending in self.pending.iter() { - if pending.target_status == TargetStatus::Staged { - pending_staged_count += pending.entries.len(); - last_pending_staged = pending.entries.first().cloned(); - } - if let Some(single_staged) = &single_staged_entry - && pending.contains_path(&single_staged.repo_path) + if conflict_entries.is_empty() { + if staged_count == 1 + && let Some(entry) = single_staged_entry.as_ref() { - pending_status_for_single_staged = Some(pending.target_status); - } - } - - if conflict_entries.is_empty() && staged_count == 1 && pending_staged_count == 0 { - match pending_status_for_single_staged { - Some(TargetStatus::Staged) | None => { - self.single_staged_entry = single_staged_entry; + if let Some(ops) = repo.pending_ops_for_path(&entry.repo_path) { + if ops.staged() { + self.single_staged_entry = single_staged_entry; + } } - _ => {} + } else if repo + .pending_ops_by_path + .summary() + .item_summary + .staging_count + == 1 + { + self.single_staged_entry = repo.pending_ops_by_path.iter().find_map(|ops| { + if ops.staging() { + repo.status_for_path(&ops.repo_path) + .map(|status| GitStatusEntry { + repo_path: ops.repo_path.clone(), + status: status.status, + staging: StageStatus::Staged, + }) + } else { + None + } + }); } - } else if conflict_entries.is_empty() && pending_staged_count == 1 { - self.single_staged_entry = last_pending_staged; } if conflict_entries.is_empty() && changed_entries.len() == 1 { @@ -2799,7 +2751,10 @@ impl GitPanel { && let Some(index) = bulk_staging_anchor_new_index && let Some(entry) = self.entries.get(index) && let Some(entry) = entry.status_entry() - && self.entry_staging(entry).unwrap_or(entry.staging) == StageStatus::Staged + && repo + .pending_ops_for_path(&entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(entry.staging.has_staged()) { self.bulk_staging = bulk_staging; } @@ -2845,51 +2800,29 @@ impl GitPanel { continue; }; self.entry_count += 1; + let is_staging_or_staged = repo + .pending_ops_for_path(&status_entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.staging.has_staged()); if repo.had_conflict_on_last_merge_head_change(&status_entry.repo_path) { self.conflicted_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.conflicted_staged_count += 1; } } else if status_entry.status.is_created() { self.new_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.new_staged_count += 1; } } else { self.tracked_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.tracked_staged_count += 1; } } } } - fn entry_staging(&self, entry: &GitStatusEntry) -> Option { - for pending in self.pending.iter().rev() { - if pending.contains_path(&entry.repo_path) { - match pending.target_status { - TargetStatus::Staged => return Some(StageStatus::Staged), - TargetStatus::Unstaged => return Some(StageStatus::Unstaged), - TargetStatus::Reverted => continue, - TargetStatus::Unchanged => continue, - } - } - } - None - } - pub(crate) fn has_staged_changes(&self) -> bool { self.tracked_staged_count > 0 || self.new_staged_count > 0 @@ -3727,10 +3660,15 @@ impl GitPanel { let ix = self.entry_by_path(&repo_path, cx)?; let entry = self.entries.get(ix)?; - let status = entry.status_entry()?; - let entry_staging = self.entry_staging(status).unwrap_or(status.staging); + let is_staging_or_staged = if let Some(status_entry) = entry.status_entry() { + repo.pending_ops_for_path(&repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.staging.has_staged()) + } else { + false + }; - let checkbox = Checkbox::new("stage-file", entry_staging.as_bool().into()) + let checkbox = Checkbox::new("stage-file", is_staging_or_staged.into()) .disabled(!self.has_write_access(cx)) .fill() .elevation(ElevationIndex::Surface) @@ -4022,8 +3960,29 @@ impl GitPanel { let checkbox_id: ElementId = ElementId::Name(format!("entry_{}_{}_checkbox", display_name, ix).into()); - let entry_staging = self.entry_staging(entry).unwrap_or(entry.staging); - let mut is_staged: ToggleState = entry_staging.as_bool().into(); + let active_repo = self + .project + .read(cx) + .active_repository(cx) + .expect("active repository must be set"); + let repo = active_repo.read(cx); + // Checking for current staged/unstaged file status is a chained operation: + // 1. first, we check for any pending operation recorded in repository + // 2. if there are no pending ops either running or finished, we then ask the repository + // for the most up-to-date file status read from disk - we do this since `entry` arg to this function `render_entry` + // is likely to be staled, and may lead to weird artifacts in the form of subsecond auto-uncheck/check on + // the checkbox's state (or flickering) which is undesirable. + // 3. finally, if there is no info about this `entry` in the repo, we fall back to whatever status is encoded + // in `entry` arg. + let is_staging_or_staged = repo + .pending_ops_for_path(&entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .or_else(|| { + repo.status_for_path(&entry.repo_path) + .map(|status| status.status.staging().has_staged()) + }) + .unwrap_or(entry.staging.has_staged()); + let mut is_staged: ToggleState = is_staging_or_staged.into(); if self.show_placeholders && !self.has_staged_changes() && !entry.status.is_created() { is_staged = ToggleState::Selected; } @@ -4142,9 +4101,11 @@ impl GitPanel { } }) .tooltip(move |_window, cx| { - let is_staged = entry_staging.is_fully_staged(); - - let action = if is_staged { "Unstage" } else { "Stage" }; + let action = if is_staging_or_staged { + "Unstage" + } else { + "Stage" + }; let tooltip_name = action.to_string(); Tooltip::for_action(tooltip_name, &ToggleStaged, cx) diff --git a/crates/project/src/git_store.rs b/crates/project/src/git_store.rs index f44a3cb570109651b9231f73b9e25c450b78dacc..3ea02d9e49a5edbd951a46a55eed8f48953c12b7 100644 --- a/crates/project/src/git_store.rs +++ b/crates/project/src/git_store.rs @@ -1,6 +1,7 @@ pub mod branch_diff; mod conflict_set; pub mod git_traversal; +pub mod pending_op; use crate::{ ProjectEnvironment, ProjectItem, ProjectPath, @@ -16,7 +17,10 @@ pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, Conflic use fs::Fs; use futures::{ FutureExt, StreamExt, - channel::{mpsc, oneshot}, + channel::{ + mpsc, + oneshot::{self, Canceled}, + }, future::{self, Shared}, stream::FuturesOrdered, }; @@ -44,6 +48,7 @@ use language::{ proto::{deserialize_version, serialize_version}, }; use parking_lot::Mutex; +use pending_op::{PendingOp, PendingOpId, PendingOps}; use postage::stream::Stream as _; use rpc::{ AnyProtoClient, TypedEnvelope, @@ -248,6 +253,7 @@ pub struct MergeDetails { pub struct RepositorySnapshot { pub id: RepositoryId, pub statuses_by_path: SumTree, + pub pending_ops_by_path: SumTree, pub work_directory_abs_path: Arc, pub path_style: PathStyle, pub branch: Option, @@ -311,6 +317,9 @@ pub enum RepositoryEvent { MergeHeadsChanged, BranchChanged, StashEntriesChanged, + PendingOpsChanged { + pending_ops: SumTree, + }, } #[derive(Clone, Debug)] @@ -338,7 +347,7 @@ pub struct GitJob { #[derive(PartialEq, Eq)] enum GitJobKey { - WriteIndex(RepoPath), + WriteIndex(Vec), ReloadBufferDiffBases, RefreshStatuses, ReloadGitState, @@ -2161,7 +2170,7 @@ impl GitStore { .update(&mut cx, |repository_handle, cx| { repository_handle.checkout_files(&envelope.payload.commit, paths, cx) })? - .await??; + .await?; Ok(proto::Ack {}) } @@ -2954,6 +2963,7 @@ impl RepositorySnapshot { Self { id, statuses_by_path: Default::default(), + pending_ops_by_path: Default::default(), work_directory_abs_path, branch: None, head_commit: None, @@ -3081,6 +3091,12 @@ impl RepositorySnapshot { .cloned() } + pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option { + self.pending_ops_by_path + .get(&PathKey(path.0.clone()), ()) + .cloned() + } + pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option { Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style) } @@ -3636,37 +3652,50 @@ impl Repository { &mut self, commit: &str, paths: Vec, - _cx: &mut App, - ) -> oneshot::Receiver> { + cx: &mut Context, + ) -> Task> { let commit = commit.to_string(); let id = self.id; - self.send_job( - Some(format!("git checkout {}", commit).into()), - move |git_repo, _| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => { - backend - .checkout_files(commit, paths, environment.clone()) - .await - } - RepositoryState::Remote { project_id, client } => { - client - .request(proto::GitCheckoutFiles { - project_id: project_id.0, - repository_id: id.to_proto(), - commit, - paths: paths.into_iter().map(|p| p.to_proto()).collect(), - }) - .await?; - - Ok(()) - } - } + self.spawn_job_with_tracking( + paths.clone(), + pending_op::GitStatus::Reverted, + cx, + async move |this, cx| { + this.update(cx, |this, _cx| { + this.send_job( + Some(format!("git checkout {}", commit).into()), + move |git_repo, _| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => { + backend + .checkout_files(commit, paths, environment.clone()) + .await + } + RepositoryState::Remote { project_id, client } => { + client + .request(proto::GitCheckoutFiles { + project_id: project_id.0, + repository_id: id.to_proto(), + commit, + paths: paths + .into_iter() + .map(|p| p.to_proto()) + .collect(), + }) + .await?; + + Ok(()) + } + } + }, + ) + })? + .await? }, ) } @@ -3796,7 +3825,7 @@ impl Repository { } pub fn stage_entries( - &self, + &mut self, entries: Vec, cx: &mut Context, ) -> Task> { @@ -3811,54 +3840,54 @@ impl Repository { .collect::>() .join(" "); let status = format!("git add {paths}"); - let job_key = match entries.len() { - 1 => Some(GitJobKey::WriteIndex(entries[0].clone())), - _ => None, - }; - - cx.spawn(async move |this, cx| { - for save_task in save_tasks { - save_task.await?; - } + let job_key = GitJobKey::WriteIndex(entries.clone()); - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.stage_paths(entries, environment.clone()).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Stage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending stage request")?; + self.spawn_job_with_tracking( + entries.clone(), + pending_op::GitStatus::Staged, + cx, + async move |this, cx| { + for save_task in save_tasks { + save_task.await?; + } - Ok(()) + this.update(cx, |this, _| { + this.send_keyed_job( + Some(job_key), + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.stage_paths(entries, environment.clone()).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Stage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending stage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; - - Ok(()) - }) + }, + ) + })? + .await? + }, + ) } pub fn unstage_entries( - &self, + &mut self, entries: Vec, cx: &mut Context, ) -> Task> { @@ -3873,66 +3902,88 @@ impl Repository { .collect::>() .join(" "); let status = format!("git reset {paths}"); - let job_key = match entries.len() { - 1 => Some(GitJobKey::WriteIndex(entries[0].clone())), - _ => None, - }; + let job_key = GitJobKey::WriteIndex(entries.clone()); - cx.spawn(async move |this, cx| { - for save_task in save_tasks { - save_task.await?; - } - - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.unstage_paths(entries, environment).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Unstage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending unstage request")?; + self.spawn_job_with_tracking( + entries.clone(), + pending_op::GitStatus::Unstaged, + cx, + async move |this, cx| { + for save_task in save_tasks { + save_task.await?; + } - Ok(()) + this.update(cx, |this, _| { + this.send_keyed_job( + Some(job_key), + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.unstage_paths(entries, environment).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Unstage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending unstage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; - - Ok(()) - }) + }, + ) + })? + .await? + }, + ) } - pub fn stage_all(&self, cx: &mut Context) -> Task> { + pub fn stage_all(&mut self, cx: &mut Context) -> Task> { let to_stage = self .cached_status() - .filter(|entry| !entry.status.staging().is_fully_staged()) - .map(|entry| entry.repo_path) + .filter_map(|entry| { + if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) { + if ops.staging() || ops.staged() { + None + } else { + Some(entry.repo_path) + } + } else if entry.status.staging().has_staged() { + None + } else { + Some(entry.repo_path) + } + }) .collect(); self.stage_entries(to_stage, cx) } - pub fn unstage_all(&self, cx: &mut Context) -> Task> { + pub fn unstage_all(&mut self, cx: &mut Context) -> Task> { let to_unstage = self .cached_status() - .filter(|entry| entry.status.staging().has_staged()) - .map(|entry| entry.repo_path) + .filter_map(|entry| { + if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) { + if !ops.staging() && !ops.staged() { + None + } else { + Some(entry.repo_path) + } + } else if entry.status.staging().has_unstaged() { + None + } else { + Some(entry.repo_path) + } + }) .collect(); self.unstage_entries(to_unstage, cx) } @@ -4368,7 +4419,7 @@ impl Repository { let this = cx.weak_entity(); let git_store = self.git_store.clone(); self.send_keyed_job( - Some(GitJobKey::WriteIndex(path.clone())), + Some(GitJobKey::WriteIndex(vec![path.clone()])), None, move |git_repo, mut cx| async move { log::debug!( @@ -5199,6 +5250,67 @@ impl Repository { pub fn barrier(&mut self) -> oneshot::Receiver<()> { self.send_job(None, |_, _| async {}) } + + fn spawn_job_with_tracking( + &mut self, + paths: Vec, + git_status: pending_op::GitStatus, + cx: &mut Context, + f: AsyncFn, + ) -> Task> + where + AsyncFn: AsyncFnOnce(WeakEntity, &mut AsyncApp) -> Result<()> + 'static, + { + let ids = self.new_pending_ops_for_paths(paths, git_status); + + cx.spawn(async move |this, cx| { + let (job_status, result) = match f(this.clone(), cx).await { + Ok(()) => (pending_op::JobStatus::Finished, Ok(())), + Err(err) if err.is::() => (pending_op::JobStatus::Skipped, Ok(())), + Err(err) => (pending_op::JobStatus::Error, Err(err)), + }; + + this.update(cx, |this, _| { + let mut edits = Vec::with_capacity(ids.len()); + for (id, entry) in ids { + if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) { + if let Some(op) = ops.op_by_id_mut(id) { + op.job_status = job_status; + } + edits.push(sum_tree::Edit::Insert(ops)); + } + } + this.snapshot.pending_ops_by_path.edit(edits, ()); + })?; + + result + }) + } + + fn new_pending_ops_for_paths( + &mut self, + paths: Vec, + git_status: pending_op::GitStatus, + ) -> Vec<(PendingOpId, RepoPath)> { + let mut edits = Vec::with_capacity(paths.len()); + let mut ids = Vec::with_capacity(paths.len()); + for path in paths { + let mut ops = self + .snapshot + .pending_ops_for_path(&path) + .unwrap_or_else(|| PendingOps::new(&path)); + let id = ops.max_id() + 1; + ops.ops.push(PendingOp { + id, + git_status, + job_status: pending_op::JobStatus::Running, + }); + edits.push(sum_tree::Edit::Insert(ops)); + ids.push((id, path)); + } + self.snapshot.pending_ops_by_path.edit(edits, ()); + ids + } } fn get_permalink_in_rust_registry_src( @@ -5464,6 +5576,28 @@ async fn compute_snapshot( MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?; log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}"); + let pending_ops_by_path = SumTree::from_iter( + prev_snapshot.pending_ops_by_path.iter().filter_map(|ops| { + let inner_ops: Vec = + ops.ops.iter().filter(|op| op.running()).cloned().collect(); + if inner_ops.is_empty() { + None + } else { + Some(PendingOps { + repo_path: ops.repo_path.clone(), + ops: inner_ops, + }) + } + }), + (), + ); + + if pending_ops_by_path != prev_snapshot.pending_ops_by_path { + events.push(RepositoryEvent::PendingOpsChanged { + pending_ops: prev_snapshot.pending_ops_by_path.clone(), + }) + } + if merge_heads_changed { events.push(RepositoryEvent::MergeHeadsChanged); } @@ -5489,6 +5623,7 @@ async fn compute_snapshot( let snapshot = RepositorySnapshot { id, statuses_by_path, + pending_ops_by_path, work_directory_abs_path, path_style: prev_snapshot.path_style, scan_id: prev_snapshot.scan_id + 1, diff --git a/crates/project/src/git_store/pending_op.rs b/crates/project/src/git_store/pending_op.rs new file mode 100644 index 0000000000000000000000000000000000000000..fd1b35035a8e334acdd244d8e663212f39bc383e --- /dev/null +++ b/crates/project/src/git_store/pending_op.rs @@ -0,0 +1,147 @@ +use git::repository::RepoPath; +use std::ops::Add; +use sum_tree::{ContextLessSummary, Item, KeyedItem}; +use worktree::{PathKey, PathSummary}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum GitStatus { + Staged, + Unstaged, + Reverted, + Unchanged, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum JobStatus { + Running, + Finished, + Skipped, + Error, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PendingOps { + pub repo_path: RepoPath, + pub ops: Vec, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct PendingOp { + pub id: PendingOpId, + pub git_status: GitStatus, + pub job_status: JobStatus, +} + +#[derive(Clone, Debug)] +pub struct PendingOpsSummary { + pub staged_count: usize, + pub staging_count: usize, +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct PendingOpId(pub u16); + +impl Item for PendingOps { + type Summary = PathSummary; + + fn summary(&self, _cx: ()) -> Self::Summary { + PathSummary { + max_path: self.repo_path.0.clone(), + item_summary: PendingOpsSummary { + staged_count: self.staged() as usize, + staging_count: self.staging() as usize, + }, + } + } +} + +impl ContextLessSummary for PendingOpsSummary { + fn zero() -> Self { + Self { + staged_count: 0, + staging_count: 0, + } + } + + fn add_summary(&mut self, summary: &Self) { + self.staged_count += summary.staged_count; + self.staging_count += summary.staging_count; + } +} + +impl KeyedItem for PendingOps { + type Key = PathKey; + + fn key(&self) -> Self::Key { + PathKey(self.repo_path.0.clone()) + } +} + +impl Add for PendingOpId { + type Output = PendingOpId; + + fn add(self, rhs: u16) -> Self::Output { + Self(self.0 + rhs) + } +} + +impl From for PendingOpId { + fn from(id: u16) -> Self { + Self(id) + } +} + +impl PendingOps { + pub fn new(path: &RepoPath) -> Self { + Self { + repo_path: path.clone(), + ops: Vec::new(), + } + } + + pub fn max_id(&self) -> PendingOpId { + self.ops.last().map(|op| op.id).unwrap_or_default() + } + + pub fn op_by_id(&self, id: PendingOpId) -> Option<&PendingOp> { + self.ops.iter().find(|op| op.id == id) + } + + pub fn op_by_id_mut(&mut self, id: PendingOpId) -> Option<&mut PendingOp> { + self.ops.iter_mut().find(|op| op.id == id) + } + + /// File is staged if the last job is finished and has status Staged. + pub fn staged(&self) -> bool { + if let Some(last) = self.ops.last() { + if last.git_status == GitStatus::Staged && last.job_status == JobStatus::Finished { + return true; + } + } + false + } + + /// File is staged if the last job is not finished and has status Staged. + pub fn staging(&self) -> bool { + if let Some(last) = self.ops.last() { + if last.git_status == GitStatus::Staged && last.job_status != JobStatus::Finished { + return true; + } + } + false + } +} + +impl PendingOp { + pub fn running(&self) -> bool { + self.job_status == JobStatus::Running + } + + pub fn finished(&self) -> bool { + matches!(self.job_status, JobStatus::Finished | JobStatus::Skipped) + } + + pub fn error(&self) -> bool { + self.job_status == JobStatus::Error + } +} diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 3a824cb16eeaa0b2e82d14c89cc906e52e74cd7a..c7e53c652c97224cce4771afb3e90667d5efaa8a 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2,7 +2,7 @@ use crate::{ Event, - git_store::{GitStoreEvent, RepositoryEvent, StatusEntry}, + git_store::{GitStoreEvent, RepositoryEvent, StatusEntry, pending_op}, task_inventory::TaskContexts, task_store::TaskSettingsLocation, *, @@ -20,7 +20,7 @@ use git::{ status::{StatusCode, TrackedStatus}, }; use git2::RepositoryInitOptions; -use gpui::{App, BackgroundExecutor, SemanticVersion, UpdateGlobal}; +use gpui::{App, BackgroundExecutor, FutureExt, SemanticVersion, UpdateGlobal}; use itertools::Itertools; use language::{ Diagnostic, DiagnosticEntry, DiagnosticEntryRef, DiagnosticSet, DiagnosticSourceKind, @@ -50,6 +50,7 @@ use std::{ sync::{Arc, OnceLock}, task::Poll, }; +use sum_tree::SumTree; use task::{ResolvedTask, ShellKind, TaskContext}; use unindent::Unindent as _; use util::{ @@ -8369,6 +8370,443 @@ async fn test_git_status_postprocessing(cx: &mut gpui::TestAppContext) { }); } +#[track_caller] +/// We merge lhs into rhs. +fn merge_pending_ops_snapshots( + source: Vec, + mut target: Vec, +) -> Vec { + for s_ops in source { + if let Some(idx) = target.iter().zip(0..).find_map(|(ops, idx)| { + if ops.repo_path == s_ops.repo_path { + Some(idx) + } else { + None + } + }) { + let t_ops = &mut target[idx]; + for s_op in s_ops.ops { + if let Some(op_idx) = t_ops + .ops + .iter() + .zip(0..) + .find_map(|(op, idx)| if op.id == s_op.id { Some(idx) } else { None }) + { + let t_op = &mut t_ops.ops[op_idx]; + match (s_op.job_status, t_op.job_status) { + (pending_op::JobStatus::Running, _) => {} + (s_st, pending_op::JobStatus::Running) => t_op.job_status = s_st, + (s_st, t_st) if s_st == t_st => {} + _ => unreachable!(), + } + } else { + t_ops.ops.push(s_op); + } + } + t_ops.ops.sort_by(|l, r| l.id.cmp(&r.id)); + } else { + target.push(s_ops); + } + } + target +} + +#[gpui::test] +async fn test_repository_pending_ops_staging( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[("a.txt", FileStatus::Untracked)], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + // Ensure we have no pending ops for any of the untracked files + repo.read_with(cx, |repo, _cx| { + assert!(repo.pending_ops_by_path.is_empty()); + }); + + let mut id = 1u16; + + let mut assert_stage = async |path: RepoPath, stage| { + let git_status = if stage { + pending_op::GitStatus::Staged + } else { + pending_op::GitStatus::Unstaged + }; + repo.update(cx, |repo, cx| { + let task = if stage { + repo.stage_entries(vec![path.clone()], cx) + } else { + repo.unstage_entries(vec![path.clone()], cx) + }; + let ops = repo.pending_ops_for_path(&path).unwrap(); + assert_eq!( + ops.ops.last(), + Some(&pending_op::PendingOp { + id: id.into(), + git_status, + job_status: pending_op::JobStatus::Running + }) + ); + task + }) + .await + .unwrap(); + + repo.read_with(cx, |repo, _cx| { + let ops = repo.pending_ops_for_path(&path).unwrap(); + assert_eq!( + ops.ops.last(), + Some(&pending_op::PendingOp { + id: id.into(), + git_status, + job_status: pending_op::JobStatus::Finished + }) + ); + }); + + id += 1; + }; + + assert_stage(repo_path("a.txt"), true).await; + assert_stage(repo_path("a.txt"), false).await; + assert_stage(repo_path("a.txt"), true).await; + assert_stage(repo_path("a.txt"), false).await; + assert_stage(repo_path("a.txt"), true).await; + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 3u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 4u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 5u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + } + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [StatusEntry { + repo_path: repo_path("a.txt"), + status: TrackedStatus { + index_status: StatusCode::Added, + worktree_status: StatusCode::Unmodified + } + .into(), + }] + ); + }); +} + +#[gpui::test] +async fn test_repository_pending_ops_long_running_staging( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[("a.txt", FileStatus::Untracked)], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .detach(); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .unwrap() + .with_timeout(Duration::from_secs(1), &cx.executor()) + .await + .unwrap(); + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Skipped + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + } + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [StatusEntry { + repo_path: repo_path("a.txt"), + status: TrackedStatus { + index_status: StatusCode::Added, + worktree_status: StatusCode::Unmodified + } + .into(), + }] + ); + }); +} + +#[gpui::test] +async fn test_repository_pending_ops_stage_all( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + "b.txt": "b" + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[ + ("a.txt", FileStatus::Untracked), + ("b.txt", FileStatus::Untracked), + ], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .await + .unwrap(); + repo.update(cx, |repo, cx| repo.stage_all(cx)) + .await + .unwrap(); + repo.update(cx, |repo, cx| repo.unstage_all(cx)) + .await + .unwrap(); + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + ], + ); + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("b.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [ + StatusEntry { + repo_path: repo_path("a.txt"), + status: FileStatus::Untracked, + }, + StatusEntry { + repo_path: repo_path("b.txt"), + status: FileStatus::Untracked, + }, + ] + ); + }); +} + #[gpui::test] async fn test_repository_subfolder_git_status( executor: gpui::BackgroundExecutor,