diff --git a/crates/git/src/status.rs b/crates/git/src/status.rs index f3401a0e93990c61df80e0e88e28292c4f2b28e2..a36e24dd3bf0a8c67ee1c70566f4564ba8362616 100644 --- a/crates/git/src/status.rs +++ b/crates/git/src/status.rs @@ -64,23 +64,23 @@ pub enum StageStatus { } impl StageStatus { - pub fn is_fully_staged(&self) -> bool { + pub const fn is_fully_staged(&self) -> bool { matches!(self, StageStatus::Staged) } - pub fn is_fully_unstaged(&self) -> bool { + pub const fn is_fully_unstaged(&self) -> bool { matches!(self, StageStatus::Unstaged) } - pub fn has_staged(&self) -> bool { + pub const fn has_staged(&self) -> bool { matches!(self, StageStatus::Staged | StageStatus::PartiallyStaged) } - pub fn has_unstaged(&self) -> bool { + pub const fn has_unstaged(&self) -> bool { matches!(self, StageStatus::Unstaged | StageStatus::PartiallyStaged) } - pub fn as_bool(self) -> Option { + pub const fn as_bool(self) -> Option { match self { StageStatus::Staged => Some(true), StageStatus::Unstaged => Some(false), diff --git a/crates/git_ui/src/git_panel.rs b/crates/git_ui/src/git_panel.rs index 7cbcef3dce0433ef3b14e4f60bf0e191c72bd2cb..ae8ee5e0b373754e554b2b817c3be264fee0c7d4 100644 --- a/crates/git_ui/src/git_panel.rs +++ b/crates/git_ui/src/git_panel.rs @@ -47,7 +47,7 @@ use panel::{ }; use project::{ Fs, Project, ProjectPath, - git_store::{GitStoreEvent, Repository, RepositoryEvent, RepositoryId}, + git_store::{GitStoreEvent, Repository, RepositoryEvent, RepositoryId, pending_op}, }; use serde::{Deserialize, Serialize}; use settings::{Settings, SettingsStore, StatusStyle}; @@ -271,27 +271,6 @@ impl GitStatusEntry { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum TargetStatus { - Staged, - Unstaged, - Reverted, - Unchanged, -} - -struct PendingOperation { - finished: bool, - target_status: TargetStatus, - entries: Vec, - op_id: usize, -} - -impl PendingOperation { - fn contains_path(&self, path: &RepoPath) -> bool { - self.entries.iter().any(|p| &p.repo_path == path) - } -} - pub struct GitPanel { pub(crate) active_repository: Option>, pub(crate) commit_editor: Entity, @@ -307,7 +286,6 @@ pub struct GitPanel { new_count: usize, entry_count: usize, new_staged_count: usize, - pending: Vec, pending_commit: Option>, amend_pending: bool, original_commit_message: Option, @@ -427,7 +405,7 @@ impl GitPanel { move |this, _git_store, event, window, cx| match event { GitStoreEvent::ActiveRepositoryChanged(_) => { this.active_repository = this.project.read(cx).active_repository(cx); - this.schedule_update(true, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::RepositoryUpdated( _, @@ -436,7 +414,7 @@ impl GitPanel { | RepositoryEvent::MergeHeadsChanged, true, ) => { - this.schedule_update(true, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::RepositoryUpdated( _, @@ -445,7 +423,7 @@ impl GitPanel { ) | GitStoreEvent::RepositoryAdded | GitStoreEvent::RepositoryRemoved(_) => { - this.schedule_update(false, window, cx); + this.schedule_update(window, cx); } GitStoreEvent::IndexWriteError(error) => { this.workspace @@ -472,7 +450,6 @@ impl GitPanel { fs, new_count: 0, new_staged_count: 0, - pending: Vec::new(), pending_commit: None, amend_pending: false, original_commit_message: None, @@ -501,7 +478,7 @@ impl GitPanel { _settings_subscription, }; - this.schedule_update(false, window, cx); + this.schedule_update(window, cx); this }) } @@ -1013,15 +990,7 @@ impl GitPanel { return; }; - let op_id = self.pending.iter().map(|p| p.op_id).max().unwrap_or(0) + 1; - self.pending.push(PendingOperation { - op_id, - target_status: TargetStatus::Reverted, - entries: entries.clone(), - finished: false, - }); - self.update_visible_entries(window, cx); - let task = cx.spawn(async move |_, cx| { + let task = cx.spawn_in(window, async move |this, cx| { let tasks: Vec<_> = workspace.update(cx, |workspace, cx| { workspace.project().update(cx, |project, cx| { entries @@ -1038,8 +1007,8 @@ impl GitPanel { let buffers = futures::future::join_all(tasks).await; - active_repository - .update(cx, |repo, cx| { + this.update_in(cx, |this, window, cx| { + let task = active_repository.update(cx, |repo, cx| { repo.checkout_files( "HEAD", entries @@ -1048,10 +1017,14 @@ impl GitPanel { .collect(), cx, ) - })? - .await??; + }); + this.update_visible_entries(window, cx); + cx.notify(); + task + })? + .await?; - let tasks: Vec<_> = cx.update(|cx| { + let tasks: Vec<_> = cx.update(|_, cx| { buffers .iter() .filter_map(|buffer| { @@ -1071,21 +1044,10 @@ impl GitPanel { let result = task.await; this.update_in(cx, |this, window, cx| { - for pending in this.pending.iter_mut() { - if pending.op_id == op_id { - pending.finished = true; - if result.is_err() { - pending.target_status = TargetStatus::Unchanged; - this.update_visible_entries(window, cx); - } - break; - } + if let Err(err) = result { + this.update_visible_entries(window, cx); + this.show_error_toast("checkout", err, cx); } - result - .map_err(|e| { - this.show_error_toast("checkout", e, cx); - }) - .ok(); }) .ok(); }) @@ -1213,26 +1175,44 @@ impl GitPanel { }); } + fn change_all_files_stage(&mut self, stage: bool, cx: &mut Context) { + let Some(active_repository) = self.active_repository.clone() else { + return; + }; + cx.spawn({ + async move |this, cx| { + let result = this + .update(cx, |this, cx| { + let task = active_repository.update(cx, |repo, cx| { + if stage { + repo.stage_all(cx) + } else { + repo.unstage_all(cx) + } + }); + this.update_counts(active_repository.read(cx)); + cx.notify(); + task + })? + .await; + + this.update(cx, |this, cx| { + if let Err(err) = result { + this.show_error_toast(if stage { "add" } else { "reset" }, err, cx); + } + cx.notify() + }) + } + }) + .detach(); + } + pub fn stage_all(&mut self, _: &StageAll, _window: &mut Window, cx: &mut Context) { - let entries = self - .entries - .iter() - .filter_map(|entry| entry.status_entry()) - .filter(|status_entry| status_entry.staging.has_unstaged()) - .cloned() - .collect::>(); - self.change_file_stage(true, entries, cx); + self.change_all_files_stage(true, cx); } pub fn unstage_all(&mut self, _: &UnstageAll, _window: &mut Window, cx: &mut Context) { - let entries = self - .entries - .iter() - .filter_map(|entry| entry.status_entry()) - .filter(|status_entry| status_entry.staging.has_staged()) - .cloned() - .collect::>(); - self.change_file_stage(false, entries, cx); + self.change_all_files_stage(false, cx); } fn toggle_staged_for_entry( @@ -1247,9 +1227,12 @@ impl GitPanel { let (stage, repo_paths) = match entry { GitListEntry::Status(status_entry) => { let repo_paths = vec![status_entry.clone()]; - let stage = if let Some(status) = self.entry_staging(&status_entry) { - !status.is_fully_staged() - } else if status_entry.status.staging().is_fully_staged() { + let stage = if active_repository + .read(cx) + .pending_ops_for_path(&status_entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.status.staging().has_staged()) + { if let Some(op) = self.bulk_staging.clone() && op.anchor == status_entry.repo_path { @@ -1291,56 +1274,31 @@ impl GitPanel { let Some(active_repository) = self.active_repository.clone() else { return; }; - let op_id = self.pending.iter().map(|p| p.op_id).max().unwrap_or(0) + 1; - self.pending.push(PendingOperation { - op_id, - target_status: if stage { - TargetStatus::Staged - } else { - TargetStatus::Unstaged - }, - entries: entries.clone(), - finished: false, - }); - let repository = active_repository.read(cx); - self.update_counts(repository); - cx.notify(); - cx.spawn({ async move |this, cx| { - let result = cx - .update(|cx| { - if stage { - active_repository.update(cx, |repo, cx| { - let repo_paths = entries - .iter() - .map(|entry| entry.repo_path.clone()) - .collect(); + let result = this + .update(cx, |this, cx| { + let task = active_repository.update(cx, |repo, cx| { + let repo_paths = entries + .iter() + .map(|entry| entry.repo_path.clone()) + .collect(); + if stage { repo.stage_entries(repo_paths, cx) - }) - } else { - active_repository.update(cx, |repo, cx| { - let repo_paths = entries - .iter() - .map(|entry| entry.repo_path.clone()) - .collect(); + } else { repo.unstage_entries(repo_paths, cx) - }) - } + } + }); + this.update_counts(active_repository.read(cx)); + cx.notify(); + task })? .await; this.update(cx, |this, cx| { - for pending in this.pending.iter_mut() { - if pending.op_id == op_id { - pending.finished = true - } + if let Err(err) = result { + this.show_error_toast(if stage { "add" } else { "reset" }, err, cx); } - result - .map_err(|e| { - this.show_error_toast(if stage { "add" } else { "reset" }, e, cx); - }) - .ok(); cx.notify(); }) } @@ -2572,12 +2530,7 @@ impl GitPanel { message.push('\n'); } - fn schedule_update( - &mut self, - clear_pending: bool, - window: &mut Window, - cx: &mut Context, - ) { + fn schedule_update(&mut self, window: &mut Window, cx: &mut Context) { let handle = cx.entity().downgrade(); self.reopen_commit_buffer(window, cx); self.update_visible_entries_task = cx.spawn_in(window, async move |_, cx| { @@ -2585,9 +2538,6 @@ impl GitPanel { if let Some(git_panel) = handle.upgrade() { git_panel .update_in(cx, |git_panel, window, cx| { - if clear_pending { - git_panel.clear_pending(); - } git_panel.update_visible_entries(window, cx); }) .ok(); @@ -2636,10 +2586,6 @@ impl GitPanel { .detach_and_log_err(cx); } - fn clear_pending(&mut self) { - self.pending.retain(|v| !v.finished) - } - fn update_visible_entries(&mut self, window: &mut Window, cx: &mut Context) { let path_style = self.project.read(cx).path_style(cx); let bulk_staging = self.bulk_staging.take(); @@ -2682,11 +2628,12 @@ impl GitPanel { let is_new = entry.status.is_created(); let staging = entry.status.staging(); - if self.pending.iter().any(|pending| { - pending.target_status == TargetStatus::Reverted - && !pending.finished - && pending.contains_path(&entry.repo_path) - }) { + if let Some(pending) = repo.pending_ops_for_path(&entry.repo_path) + && pending + .ops + .iter() + .any(|op| op.git_status == pending_op::GitStatus::Reverted && op.finished()) + { continue; } @@ -2727,30 +2674,35 @@ impl GitPanel { } } - let mut pending_staged_count = 0; - let mut last_pending_staged = None; - let mut pending_status_for_single_staged = None; - for pending in self.pending.iter() { - if pending.target_status == TargetStatus::Staged { - pending_staged_count += pending.entries.len(); - last_pending_staged = pending.entries.first().cloned(); - } - if let Some(single_staged) = &single_staged_entry - && pending.contains_path(&single_staged.repo_path) + if conflict_entries.is_empty() { + if staged_count == 1 + && let Some(entry) = single_staged_entry.as_ref() { - pending_status_for_single_staged = Some(pending.target_status); - } - } - - if conflict_entries.is_empty() && staged_count == 1 && pending_staged_count == 0 { - match pending_status_for_single_staged { - Some(TargetStatus::Staged) | None => { - self.single_staged_entry = single_staged_entry; + if let Some(ops) = repo.pending_ops_for_path(&entry.repo_path) { + if ops.staged() { + self.single_staged_entry = single_staged_entry; + } } - _ => {} + } else if repo + .pending_ops_by_path + .summary() + .item_summary + .staging_count + == 1 + { + self.single_staged_entry = repo.pending_ops_by_path.iter().find_map(|ops| { + if ops.staging() { + repo.status_for_path(&ops.repo_path) + .map(|status| GitStatusEntry { + repo_path: ops.repo_path.clone(), + status: status.status, + staging: StageStatus::Staged, + }) + } else { + None + } + }); } - } else if conflict_entries.is_empty() && pending_staged_count == 1 { - self.single_staged_entry = last_pending_staged; } if conflict_entries.is_empty() && changed_entries.len() == 1 { @@ -2799,7 +2751,10 @@ impl GitPanel { && let Some(index) = bulk_staging_anchor_new_index && let Some(entry) = self.entries.get(index) && let Some(entry) = entry.status_entry() - && self.entry_staging(entry).unwrap_or(entry.staging) == StageStatus::Staged + && repo + .pending_ops_for_path(&entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(entry.staging.has_staged()) { self.bulk_staging = bulk_staging; } @@ -2845,51 +2800,29 @@ impl GitPanel { continue; }; self.entry_count += 1; + let is_staging_or_staged = repo + .pending_ops_for_path(&status_entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.staging.has_staged()); if repo.had_conflict_on_last_merge_head_change(&status_entry.repo_path) { self.conflicted_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.conflicted_staged_count += 1; } } else if status_entry.status.is_created() { self.new_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.new_staged_count += 1; } } else { self.tracked_count += 1; - if self - .entry_staging(status_entry) - .unwrap_or(status_entry.staging) - .has_staged() - { + if is_staging_or_staged { self.tracked_staged_count += 1; } } } } - fn entry_staging(&self, entry: &GitStatusEntry) -> Option { - for pending in self.pending.iter().rev() { - if pending.contains_path(&entry.repo_path) { - match pending.target_status { - TargetStatus::Staged => return Some(StageStatus::Staged), - TargetStatus::Unstaged => return Some(StageStatus::Unstaged), - TargetStatus::Reverted => continue, - TargetStatus::Unchanged => continue, - } - } - } - None - } - pub(crate) fn has_staged_changes(&self) -> bool { self.tracked_staged_count > 0 || self.new_staged_count > 0 @@ -3727,10 +3660,15 @@ impl GitPanel { let ix = self.entry_by_path(&repo_path, cx)?; let entry = self.entries.get(ix)?; - let status = entry.status_entry()?; - let entry_staging = self.entry_staging(status).unwrap_or(status.staging); + let is_staging_or_staged = if let Some(status_entry) = entry.status_entry() { + repo.pending_ops_for_path(&repo_path) + .map(|ops| ops.staging() || ops.staged()) + .unwrap_or(status_entry.staging.has_staged()) + } else { + false + }; - let checkbox = Checkbox::new("stage-file", entry_staging.as_bool().into()) + let checkbox = Checkbox::new("stage-file", is_staging_or_staged.into()) .disabled(!self.has_write_access(cx)) .fill() .elevation(ElevationIndex::Surface) @@ -4022,8 +3960,29 @@ impl GitPanel { let checkbox_id: ElementId = ElementId::Name(format!("entry_{}_{}_checkbox", display_name, ix).into()); - let entry_staging = self.entry_staging(entry).unwrap_or(entry.staging); - let mut is_staged: ToggleState = entry_staging.as_bool().into(); + let active_repo = self + .project + .read(cx) + .active_repository(cx) + .expect("active repository must be set"); + let repo = active_repo.read(cx); + // Checking for current staged/unstaged file status is a chained operation: + // 1. first, we check for any pending operation recorded in repository + // 2. if there are no pending ops either running or finished, we then ask the repository + // for the most up-to-date file status read from disk - we do this since `entry` arg to this function `render_entry` + // is likely to be staled, and may lead to weird artifacts in the form of subsecond auto-uncheck/check on + // the checkbox's state (or flickering) which is undesirable. + // 3. finally, if there is no info about this `entry` in the repo, we fall back to whatever status is encoded + // in `entry` arg. + let is_staging_or_staged = repo + .pending_ops_for_path(&entry.repo_path) + .map(|ops| ops.staging() || ops.staged()) + .or_else(|| { + repo.status_for_path(&entry.repo_path) + .map(|status| status.status.staging().has_staged()) + }) + .unwrap_or(entry.staging.has_staged()); + let mut is_staged: ToggleState = is_staging_or_staged.into(); if self.show_placeholders && !self.has_staged_changes() && !entry.status.is_created() { is_staged = ToggleState::Selected; } @@ -4142,9 +4101,11 @@ impl GitPanel { } }) .tooltip(move |_window, cx| { - let is_staged = entry_staging.is_fully_staged(); - - let action = if is_staged { "Unstage" } else { "Stage" }; + let action = if is_staging_or_staged { + "Unstage" + } else { + "Stage" + }; let tooltip_name = action.to_string(); Tooltip::for_action(tooltip_name, &ToggleStaged, cx) diff --git a/crates/project/src/git_store.rs b/crates/project/src/git_store.rs index f44a3cb570109651b9231f73b9e25c450b78dacc..3ea02d9e49a5edbd951a46a55eed8f48953c12b7 100644 --- a/crates/project/src/git_store.rs +++ b/crates/project/src/git_store.rs @@ -1,6 +1,7 @@ pub mod branch_diff; mod conflict_set; pub mod git_traversal; +pub mod pending_op; use crate::{ ProjectEnvironment, ProjectItem, ProjectPath, @@ -16,7 +17,10 @@ pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, Conflic use fs::Fs; use futures::{ FutureExt, StreamExt, - channel::{mpsc, oneshot}, + channel::{ + mpsc, + oneshot::{self, Canceled}, + }, future::{self, Shared}, stream::FuturesOrdered, }; @@ -44,6 +48,7 @@ use language::{ proto::{deserialize_version, serialize_version}, }; use parking_lot::Mutex; +use pending_op::{PendingOp, PendingOpId, PendingOps}; use postage::stream::Stream as _; use rpc::{ AnyProtoClient, TypedEnvelope, @@ -248,6 +253,7 @@ pub struct MergeDetails { pub struct RepositorySnapshot { pub id: RepositoryId, pub statuses_by_path: SumTree, + pub pending_ops_by_path: SumTree, pub work_directory_abs_path: Arc, pub path_style: PathStyle, pub branch: Option, @@ -311,6 +317,9 @@ pub enum RepositoryEvent { MergeHeadsChanged, BranchChanged, StashEntriesChanged, + PendingOpsChanged { + pending_ops: SumTree, + }, } #[derive(Clone, Debug)] @@ -338,7 +347,7 @@ pub struct GitJob { #[derive(PartialEq, Eq)] enum GitJobKey { - WriteIndex(RepoPath), + WriteIndex(Vec), ReloadBufferDiffBases, RefreshStatuses, ReloadGitState, @@ -2161,7 +2170,7 @@ impl GitStore { .update(&mut cx, |repository_handle, cx| { repository_handle.checkout_files(&envelope.payload.commit, paths, cx) })? - .await??; + .await?; Ok(proto::Ack {}) } @@ -2954,6 +2963,7 @@ impl RepositorySnapshot { Self { id, statuses_by_path: Default::default(), + pending_ops_by_path: Default::default(), work_directory_abs_path, branch: None, head_commit: None, @@ -3081,6 +3091,12 @@ impl RepositorySnapshot { .cloned() } + pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option { + self.pending_ops_by_path + .get(&PathKey(path.0.clone()), ()) + .cloned() + } + pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option { Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style) } @@ -3636,37 +3652,50 @@ impl Repository { &mut self, commit: &str, paths: Vec, - _cx: &mut App, - ) -> oneshot::Receiver> { + cx: &mut Context, + ) -> Task> { let commit = commit.to_string(); let id = self.id; - self.send_job( - Some(format!("git checkout {}", commit).into()), - move |git_repo, _| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => { - backend - .checkout_files(commit, paths, environment.clone()) - .await - } - RepositoryState::Remote { project_id, client } => { - client - .request(proto::GitCheckoutFiles { - project_id: project_id.0, - repository_id: id.to_proto(), - commit, - paths: paths.into_iter().map(|p| p.to_proto()).collect(), - }) - .await?; - - Ok(()) - } - } + self.spawn_job_with_tracking( + paths.clone(), + pending_op::GitStatus::Reverted, + cx, + async move |this, cx| { + this.update(cx, |this, _cx| { + this.send_job( + Some(format!("git checkout {}", commit).into()), + move |git_repo, _| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => { + backend + .checkout_files(commit, paths, environment.clone()) + .await + } + RepositoryState::Remote { project_id, client } => { + client + .request(proto::GitCheckoutFiles { + project_id: project_id.0, + repository_id: id.to_proto(), + commit, + paths: paths + .into_iter() + .map(|p| p.to_proto()) + .collect(), + }) + .await?; + + Ok(()) + } + } + }, + ) + })? + .await? }, ) } @@ -3796,7 +3825,7 @@ impl Repository { } pub fn stage_entries( - &self, + &mut self, entries: Vec, cx: &mut Context, ) -> Task> { @@ -3811,54 +3840,54 @@ impl Repository { .collect::>() .join(" "); let status = format!("git add {paths}"); - let job_key = match entries.len() { - 1 => Some(GitJobKey::WriteIndex(entries[0].clone())), - _ => None, - }; - - cx.spawn(async move |this, cx| { - for save_task in save_tasks { - save_task.await?; - } + let job_key = GitJobKey::WriteIndex(entries.clone()); - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.stage_paths(entries, environment.clone()).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Stage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending stage request")?; + self.spawn_job_with_tracking( + entries.clone(), + pending_op::GitStatus::Staged, + cx, + async move |this, cx| { + for save_task in save_tasks { + save_task.await?; + } - Ok(()) + this.update(cx, |this, _| { + this.send_keyed_job( + Some(job_key), + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.stage_paths(entries, environment.clone()).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Stage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending stage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; - - Ok(()) - }) + }, + ) + })? + .await? + }, + ) } pub fn unstage_entries( - &self, + &mut self, entries: Vec, cx: &mut Context, ) -> Task> { @@ -3873,66 +3902,88 @@ impl Repository { .collect::>() .join(" "); let status = format!("git reset {paths}"); - let job_key = match entries.len() { - 1 => Some(GitJobKey::WriteIndex(entries[0].clone())), - _ => None, - }; + let job_key = GitJobKey::WriteIndex(entries.clone()); - cx.spawn(async move |this, cx| { - for save_task in save_tasks { - save_task.await?; - } - - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.unstage_paths(entries, environment).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Unstage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending unstage request")?; + self.spawn_job_with_tracking( + entries.clone(), + pending_op::GitStatus::Unstaged, + cx, + async move |this, cx| { + for save_task in save_tasks { + save_task.await?; + } - Ok(()) + this.update(cx, |this, _| { + this.send_keyed_job( + Some(job_key), + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.unstage_paths(entries, environment).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Unstage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending unstage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; - - Ok(()) - }) + }, + ) + })? + .await? + }, + ) } - pub fn stage_all(&self, cx: &mut Context) -> Task> { + pub fn stage_all(&mut self, cx: &mut Context) -> Task> { let to_stage = self .cached_status() - .filter(|entry| !entry.status.staging().is_fully_staged()) - .map(|entry| entry.repo_path) + .filter_map(|entry| { + if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) { + if ops.staging() || ops.staged() { + None + } else { + Some(entry.repo_path) + } + } else if entry.status.staging().has_staged() { + None + } else { + Some(entry.repo_path) + } + }) .collect(); self.stage_entries(to_stage, cx) } - pub fn unstage_all(&self, cx: &mut Context) -> Task> { + pub fn unstage_all(&mut self, cx: &mut Context) -> Task> { let to_unstage = self .cached_status() - .filter(|entry| entry.status.staging().has_staged()) - .map(|entry| entry.repo_path) + .filter_map(|entry| { + if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) { + if !ops.staging() && !ops.staged() { + None + } else { + Some(entry.repo_path) + } + } else if entry.status.staging().has_unstaged() { + None + } else { + Some(entry.repo_path) + } + }) .collect(); self.unstage_entries(to_unstage, cx) } @@ -4368,7 +4419,7 @@ impl Repository { let this = cx.weak_entity(); let git_store = self.git_store.clone(); self.send_keyed_job( - Some(GitJobKey::WriteIndex(path.clone())), + Some(GitJobKey::WriteIndex(vec![path.clone()])), None, move |git_repo, mut cx| async move { log::debug!( @@ -5199,6 +5250,67 @@ impl Repository { pub fn barrier(&mut self) -> oneshot::Receiver<()> { self.send_job(None, |_, _| async {}) } + + fn spawn_job_with_tracking( + &mut self, + paths: Vec, + git_status: pending_op::GitStatus, + cx: &mut Context, + f: AsyncFn, + ) -> Task> + where + AsyncFn: AsyncFnOnce(WeakEntity, &mut AsyncApp) -> Result<()> + 'static, + { + let ids = self.new_pending_ops_for_paths(paths, git_status); + + cx.spawn(async move |this, cx| { + let (job_status, result) = match f(this.clone(), cx).await { + Ok(()) => (pending_op::JobStatus::Finished, Ok(())), + Err(err) if err.is::() => (pending_op::JobStatus::Skipped, Ok(())), + Err(err) => (pending_op::JobStatus::Error, Err(err)), + }; + + this.update(cx, |this, _| { + let mut edits = Vec::with_capacity(ids.len()); + for (id, entry) in ids { + if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) { + if let Some(op) = ops.op_by_id_mut(id) { + op.job_status = job_status; + } + edits.push(sum_tree::Edit::Insert(ops)); + } + } + this.snapshot.pending_ops_by_path.edit(edits, ()); + })?; + + result + }) + } + + fn new_pending_ops_for_paths( + &mut self, + paths: Vec, + git_status: pending_op::GitStatus, + ) -> Vec<(PendingOpId, RepoPath)> { + let mut edits = Vec::with_capacity(paths.len()); + let mut ids = Vec::with_capacity(paths.len()); + for path in paths { + let mut ops = self + .snapshot + .pending_ops_for_path(&path) + .unwrap_or_else(|| PendingOps::new(&path)); + let id = ops.max_id() + 1; + ops.ops.push(PendingOp { + id, + git_status, + job_status: pending_op::JobStatus::Running, + }); + edits.push(sum_tree::Edit::Insert(ops)); + ids.push((id, path)); + } + self.snapshot.pending_ops_by_path.edit(edits, ()); + ids + } } fn get_permalink_in_rust_registry_src( @@ -5464,6 +5576,28 @@ async fn compute_snapshot( MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?; log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}"); + let pending_ops_by_path = SumTree::from_iter( + prev_snapshot.pending_ops_by_path.iter().filter_map(|ops| { + let inner_ops: Vec = + ops.ops.iter().filter(|op| op.running()).cloned().collect(); + if inner_ops.is_empty() { + None + } else { + Some(PendingOps { + repo_path: ops.repo_path.clone(), + ops: inner_ops, + }) + } + }), + (), + ); + + if pending_ops_by_path != prev_snapshot.pending_ops_by_path { + events.push(RepositoryEvent::PendingOpsChanged { + pending_ops: prev_snapshot.pending_ops_by_path.clone(), + }) + } + if merge_heads_changed { events.push(RepositoryEvent::MergeHeadsChanged); } @@ -5489,6 +5623,7 @@ async fn compute_snapshot( let snapshot = RepositorySnapshot { id, statuses_by_path, + pending_ops_by_path, work_directory_abs_path, path_style: prev_snapshot.path_style, scan_id: prev_snapshot.scan_id + 1, diff --git a/crates/project/src/git_store/pending_op.rs b/crates/project/src/git_store/pending_op.rs new file mode 100644 index 0000000000000000000000000000000000000000..fd1b35035a8e334acdd244d8e663212f39bc383e --- /dev/null +++ b/crates/project/src/git_store/pending_op.rs @@ -0,0 +1,147 @@ +use git::repository::RepoPath; +use std::ops::Add; +use sum_tree::{ContextLessSummary, Item, KeyedItem}; +use worktree::{PathKey, PathSummary}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum GitStatus { + Staged, + Unstaged, + Reverted, + Unchanged, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum JobStatus { + Running, + Finished, + Skipped, + Error, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PendingOps { + pub repo_path: RepoPath, + pub ops: Vec, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct PendingOp { + pub id: PendingOpId, + pub git_status: GitStatus, + pub job_status: JobStatus, +} + +#[derive(Clone, Debug)] +pub struct PendingOpsSummary { + pub staged_count: usize, + pub staging_count: usize, +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct PendingOpId(pub u16); + +impl Item for PendingOps { + type Summary = PathSummary; + + fn summary(&self, _cx: ()) -> Self::Summary { + PathSummary { + max_path: self.repo_path.0.clone(), + item_summary: PendingOpsSummary { + staged_count: self.staged() as usize, + staging_count: self.staging() as usize, + }, + } + } +} + +impl ContextLessSummary for PendingOpsSummary { + fn zero() -> Self { + Self { + staged_count: 0, + staging_count: 0, + } + } + + fn add_summary(&mut self, summary: &Self) { + self.staged_count += summary.staged_count; + self.staging_count += summary.staging_count; + } +} + +impl KeyedItem for PendingOps { + type Key = PathKey; + + fn key(&self) -> Self::Key { + PathKey(self.repo_path.0.clone()) + } +} + +impl Add for PendingOpId { + type Output = PendingOpId; + + fn add(self, rhs: u16) -> Self::Output { + Self(self.0 + rhs) + } +} + +impl From for PendingOpId { + fn from(id: u16) -> Self { + Self(id) + } +} + +impl PendingOps { + pub fn new(path: &RepoPath) -> Self { + Self { + repo_path: path.clone(), + ops: Vec::new(), + } + } + + pub fn max_id(&self) -> PendingOpId { + self.ops.last().map(|op| op.id).unwrap_or_default() + } + + pub fn op_by_id(&self, id: PendingOpId) -> Option<&PendingOp> { + self.ops.iter().find(|op| op.id == id) + } + + pub fn op_by_id_mut(&mut self, id: PendingOpId) -> Option<&mut PendingOp> { + self.ops.iter_mut().find(|op| op.id == id) + } + + /// File is staged if the last job is finished and has status Staged. + pub fn staged(&self) -> bool { + if let Some(last) = self.ops.last() { + if last.git_status == GitStatus::Staged && last.job_status == JobStatus::Finished { + return true; + } + } + false + } + + /// File is staged if the last job is not finished and has status Staged. + pub fn staging(&self) -> bool { + if let Some(last) = self.ops.last() { + if last.git_status == GitStatus::Staged && last.job_status != JobStatus::Finished { + return true; + } + } + false + } +} + +impl PendingOp { + pub fn running(&self) -> bool { + self.job_status == JobStatus::Running + } + + pub fn finished(&self) -> bool { + matches!(self.job_status, JobStatus::Finished | JobStatus::Skipped) + } + + pub fn error(&self) -> bool { + self.job_status == JobStatus::Error + } +} diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 3a824cb16eeaa0b2e82d14c89cc906e52e74cd7a..c7e53c652c97224cce4771afb3e90667d5efaa8a 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2,7 +2,7 @@ use crate::{ Event, - git_store::{GitStoreEvent, RepositoryEvent, StatusEntry}, + git_store::{GitStoreEvent, RepositoryEvent, StatusEntry, pending_op}, task_inventory::TaskContexts, task_store::TaskSettingsLocation, *, @@ -20,7 +20,7 @@ use git::{ status::{StatusCode, TrackedStatus}, }; use git2::RepositoryInitOptions; -use gpui::{App, BackgroundExecutor, SemanticVersion, UpdateGlobal}; +use gpui::{App, BackgroundExecutor, FutureExt, SemanticVersion, UpdateGlobal}; use itertools::Itertools; use language::{ Diagnostic, DiagnosticEntry, DiagnosticEntryRef, DiagnosticSet, DiagnosticSourceKind, @@ -50,6 +50,7 @@ use std::{ sync::{Arc, OnceLock}, task::Poll, }; +use sum_tree::SumTree; use task::{ResolvedTask, ShellKind, TaskContext}; use unindent::Unindent as _; use util::{ @@ -8369,6 +8370,443 @@ async fn test_git_status_postprocessing(cx: &mut gpui::TestAppContext) { }); } +#[track_caller] +/// We merge lhs into rhs. +fn merge_pending_ops_snapshots( + source: Vec, + mut target: Vec, +) -> Vec { + for s_ops in source { + if let Some(idx) = target.iter().zip(0..).find_map(|(ops, idx)| { + if ops.repo_path == s_ops.repo_path { + Some(idx) + } else { + None + } + }) { + let t_ops = &mut target[idx]; + for s_op in s_ops.ops { + if let Some(op_idx) = t_ops + .ops + .iter() + .zip(0..) + .find_map(|(op, idx)| if op.id == s_op.id { Some(idx) } else { None }) + { + let t_op = &mut t_ops.ops[op_idx]; + match (s_op.job_status, t_op.job_status) { + (pending_op::JobStatus::Running, _) => {} + (s_st, pending_op::JobStatus::Running) => t_op.job_status = s_st, + (s_st, t_st) if s_st == t_st => {} + _ => unreachable!(), + } + } else { + t_ops.ops.push(s_op); + } + } + t_ops.ops.sort_by(|l, r| l.id.cmp(&r.id)); + } else { + target.push(s_ops); + } + } + target +} + +#[gpui::test] +async fn test_repository_pending_ops_staging( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[("a.txt", FileStatus::Untracked)], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + // Ensure we have no pending ops for any of the untracked files + repo.read_with(cx, |repo, _cx| { + assert!(repo.pending_ops_by_path.is_empty()); + }); + + let mut id = 1u16; + + let mut assert_stage = async |path: RepoPath, stage| { + let git_status = if stage { + pending_op::GitStatus::Staged + } else { + pending_op::GitStatus::Unstaged + }; + repo.update(cx, |repo, cx| { + let task = if stage { + repo.stage_entries(vec![path.clone()], cx) + } else { + repo.unstage_entries(vec![path.clone()], cx) + }; + let ops = repo.pending_ops_for_path(&path).unwrap(); + assert_eq!( + ops.ops.last(), + Some(&pending_op::PendingOp { + id: id.into(), + git_status, + job_status: pending_op::JobStatus::Running + }) + ); + task + }) + .await + .unwrap(); + + repo.read_with(cx, |repo, _cx| { + let ops = repo.pending_ops_for_path(&path).unwrap(); + assert_eq!( + ops.ops.last(), + Some(&pending_op::PendingOp { + id: id.into(), + git_status, + job_status: pending_op::JobStatus::Finished + }) + ); + }); + + id += 1; + }; + + assert_stage(repo_path("a.txt"), true).await; + assert_stage(repo_path("a.txt"), false).await; + assert_stage(repo_path("a.txt"), true).await; + assert_stage(repo_path("a.txt"), false).await; + assert_stage(repo_path("a.txt"), true).await; + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 3u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 4u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 5u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + } + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [StatusEntry { + repo_path: repo_path("a.txt"), + status: TrackedStatus { + index_status: StatusCode::Added, + worktree_status: StatusCode::Unmodified + } + .into(), + }] + ); + }); +} + +#[gpui::test] +async fn test_repository_pending_ops_long_running_staging( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[("a.txt", FileStatus::Untracked)], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .detach(); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .unwrap() + .with_timeout(Duration::from_secs(1), &cx.executor()) + .await + .unwrap(); + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Skipped + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + } + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [StatusEntry { + repo_path: repo_path("a.txt"), + status: TrackedStatus { + index_status: StatusCode::Added, + worktree_status: StatusCode::Unmodified + } + .into(), + }] + ); + }); +} + +#[gpui::test] +async fn test_repository_pending_ops_stage_all( + executor: gpui::BackgroundExecutor, + cx: &mut gpui::TestAppContext, +) { + init_test(cx); + + let fs = FakeFs::new(executor); + fs.insert_tree( + path!("/root"), + json!({ + "my-repo": { + ".git": {}, + "a.txt": "a", + "b.txt": "b" + } + + }), + ) + .await; + + fs.set_status_for_repo( + path!("/root/my-repo/.git").as_ref(), + &[ + ("a.txt", FileStatus::Untracked), + ("b.txt", FileStatus::Untracked), + ], + ); + + let project = Project::test(fs.clone(), [path!("/root/my-repo").as_ref()], cx).await; + let pending_ops_all = Arc::new(Mutex::new(SumTree::default())); + project.update(cx, |project, cx| { + let pending_ops_all = pending_ops_all.clone(); + cx.subscribe(project.git_store(), move |_, _, e, _| { + if let GitStoreEvent::RepositoryUpdated( + _, + RepositoryEvent::PendingOpsChanged { pending_ops }, + _, + ) = e + { + let merged = merge_pending_ops_snapshots( + pending_ops.items(()), + pending_ops_all.lock().items(()), + ); + *pending_ops_all.lock() = SumTree::from_iter(merged.into_iter(), ()); + } + }) + .detach(); + }); + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repo = project.read_with(cx, |project, cx| { + project.repositories(cx).values().next().unwrap().clone() + }); + + repo.update(cx, |repo, cx| { + repo.stage_entries(vec![repo_path("a.txt")], cx) + }) + .await + .unwrap(); + repo.update(cx, |repo, cx| repo.stage_all(cx)) + .await + .unwrap(); + repo.update(cx, |repo, cx| repo.unstage_all(cx)) + .await + .unwrap(); + + cx.run_until_parked(); + + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("a.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + ], + ); + assert_eq!( + pending_ops_all + .lock() + .get(&worktree::PathKey(repo_path("b.txt").0), ()) + .unwrap() + .ops, + vec![ + pending_op::PendingOp { + id: 1u16.into(), + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Finished + }, + pending_op::PendingOp { + id: 2u16.into(), + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Finished + }, + ], + ); + + repo.update(cx, |repo, _cx| { + let git_statuses = repo.cached_status().collect::>(); + + assert_eq!( + git_statuses, + [ + StatusEntry { + repo_path: repo_path("a.txt"), + status: FileStatus::Untracked, + }, + StatusEntry { + repo_path: repo_path("b.txt"), + status: FileStatus::Untracked, + }, + ] + ); + }); +} + #[gpui::test] async fn test_repository_subfolder_git_status( executor: gpui::BackgroundExecutor,