diff --git a/crates/project/src/git_store.rs b/crates/project/src/git_store.rs index d6f2f2b1910ecbd7c66483130c7b27e552de1b6d..86bba441b018d71fe7048a51ed20ba22e6e0130f 100644 --- a/crates/project/src/git_store.rs +++ b/crates/project/src/git_store.rs @@ -45,7 +45,7 @@ use language::{ proto::{deserialize_version, serialize_version}, }; use parking_lot::Mutex; -use pending_op::{PendingOp, PendingOpId, PendingOpStatus}; +use pending_op::{PendingOp, PendingOps}; use postage::stream::Stream as _; use rpc::{ AnyProtoClient, TypedEnvelope, @@ -250,7 +250,7 @@ pub struct MergeDetails { pub struct RepositorySnapshot { pub id: RepositoryId, pub statuses_by_path: SumTree, - pub pending_ops: SumTree, + pub pending_ops_by_path: SumTree, pub work_directory_abs_path: Arc, pub path_style: PathStyle, pub branch: Option, @@ -2950,7 +2950,7 @@ impl RepositorySnapshot { Self { id, statuses_by_path: Default::default(), - pending_ops: Default::default(), + pending_ops_by_path: Default::default(), work_directory_abs_path, branch: None, head_commit: None, @@ -3078,16 +3078,10 @@ impl RepositorySnapshot { .cloned() } - pub fn pending_op_by_id(&self, id: PendingOpId) -> Option { - self.pending_ops.get(&id, ()).cloned() - } - - pub fn pending_ops_for_path(&self, path: &RepoPath) -> Vec { - self.pending_ops - .filter::<_, PathKey>((), |sum| sum.max_path == path.0) - .into_iter() - .map(Clone::clone) - .collect() + pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option { + self.pending_ops_by_path + .get(&PathKey(path.0.clone()), ()) + .cloned() } pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option { @@ -3826,67 +3820,86 @@ impl Repository { }; let mut ids = Vec::with_capacity(entries.len()); + let mut edits = Vec::with_capacity(entries.len()); for entry in &entries { - let id = self.snapshot.pending_ops.summary().item_summary.max_id + 1; - self.snapshot.pending_ops.push( - PendingOp { - repo_path: entry.clone(), - id, - status: PendingOpStatus::Staged, - finished: false, - }, - (), - ); - ids.push(id); + let id = self + .snapshot + .pending_ops_by_path + .summary() + .item_summary + .max_id + + 1; + let op = PendingOp { + id, + git_status: pending_op::GitStatus::Staged, + job_status: pending_op::JobStatus::Started, + }; + let mut ops = self + .snapshot + .pending_ops_for_path(entry) + .unwrap_or_else(|| PendingOps::new(entry)); + ops.ops.push(op); + edits.push(sum_tree::Edit::Insert(ops)); + ids.push((id, entry.clone())); } + self.snapshot.pending_ops_by_path.edit(edits, ()); cx.spawn(async move |this, cx| { for save_task in save_tasks { save_task.await?; } - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.stage_paths(entries, environment.clone()).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Stage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending stage request")?; - - Ok(()) + let res = this + .update(cx, |this, _| { + this.send_keyed_job( + job_key, + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.stage_paths(entries, environment.clone()).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Stage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending stage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; + }, + ) + })? + .await; + + let (job_status, res) = match res { + Ok(res) => (pending_op::JobStatus::Finished, res), + Err(err) => (err.into(), Ok(())), + }; + res?; this.update(cx, |this, _| { - for id in ids { - if let Some(mut op) = this.snapshot.pending_op_by_id(id) { - op.finished = true; - this.snapshot - .pending_ops - .edit(vec![sum_tree::Edit::Insert(op)], ()); + let mut edits = Vec::with_capacity(ids.len()); + for (id, entry) in ids { + if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) { + if let Some(op) = ops.op_by_id_mut(id) { + op.job_status = job_status; + } + edits.push(sum_tree::Edit::Insert(ops)); } } + this.snapshot.pending_ops_by_path.edit(edits, ()); })?; Ok(()) @@ -3915,67 +3928,86 @@ impl Repository { }; let mut ids = Vec::with_capacity(entries.len()); + let mut edits = Vec::with_capacity(entries.len()); for entry in &entries { - let id = self.snapshot.pending_ops.summary().item_summary.max_id + 1; - self.snapshot.pending_ops.push( - PendingOp { - repo_path: entry.clone(), - id, - status: PendingOpStatus::Unstaged, - finished: false, - }, - (), - ); - ids.push(id); + let id = self + .snapshot + .pending_ops_by_path + .summary() + .item_summary + .max_id + + 1; + let op = PendingOp { + id, + git_status: pending_op::GitStatus::Unstaged, + job_status: pending_op::JobStatus::Started, + }; + let mut ops = self + .snapshot + .pending_ops_for_path(entry) + .unwrap_or_else(|| PendingOps::new(entry)); + ops.ops.push(op); + edits.push(sum_tree::Edit::Insert(ops)); + ids.push((id, entry.clone())); } + self.snapshot.pending_ops_by_path.edit(edits, ()); cx.spawn(async move |this, cx| { for save_task in save_tasks { save_task.await?; } - this.update(cx, |this, _| { - this.send_keyed_job( - job_key, - Some(status.into()), - move |git_repo, _cx| async move { - match git_repo { - RepositoryState::Local { - backend, - environment, - .. - } => backend.unstage_paths(entries, environment).await, - RepositoryState::Remote { project_id, client } => { - client - .request(proto::Unstage { - project_id: project_id.0, - repository_id: id.to_proto(), - paths: entries - .into_iter() - .map(|repo_path| repo_path.to_proto()) - .collect(), - }) - .await - .context("sending unstage request")?; - - Ok(()) + let res = this + .update(cx, |this, _| { + this.send_keyed_job( + job_key, + Some(status.into()), + move |git_repo, _cx| async move { + match git_repo { + RepositoryState::Local { + backend, + environment, + .. + } => backend.unstage_paths(entries, environment).await, + RepositoryState::Remote { project_id, client } => { + client + .request(proto::Unstage { + project_id: project_id.0, + repository_id: id.to_proto(), + paths: entries + .into_iter() + .map(|repo_path| repo_path.to_proto()) + .collect(), + }) + .await + .context("sending unstage request")?; + + Ok(()) + } } - } - }, - ) - })? - .await??; + }, + ) + })? + .await; + + let (job_status, res) = match res { + Ok(res) => (pending_op::JobStatus::Finished, res), + Err(err) => (err.into(), Ok(())), + }; + res?; this.update(cx, |this, _| { - for id in ids { - if let Some(mut op) = this.snapshot.pending_op_by_id(id) { - op.finished = true; - this.snapshot - .pending_ops - .edit(vec![sum_tree::Edit::Insert(op)], ()); + let mut edits = Vec::with_capacity(ids.len()); + for (id, entry) in ids { + if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) { + if let Some(op) = ops.op_by_id_mut(id) { + op.job_status = job_status; + } + edits.push(sum_tree::Edit::Insert(ops)); } } + this.snapshot.pending_ops_by_path.edit(edits, ()); })?; Ok(()) @@ -5518,7 +5550,7 @@ async fn compute_snapshot( MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?; log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}"); - let pending_ops = prev_snapshot.pending_ops.clone(); + let pending_ops_by_path = prev_snapshot.pending_ops_by_path.clone(); if merge_heads_changed { events.push(RepositoryEvent::MergeHeadsChanged); @@ -5545,7 +5577,7 @@ async fn compute_snapshot( let snapshot = RepositorySnapshot { id, statuses_by_path, - pending_ops, + pending_ops_by_path, work_directory_abs_path, path_style: prev_snapshot.path_style, scan_id: prev_snapshot.scan_id + 1, diff --git a/crates/project/src/git_store/pending_op.rs b/crates/project/src/git_store/pending_op.rs index 736d88ab4a86f5a12069eb5418a1533e1af466f9..4a3d7ed237f4da9f1daf221cf6c6253b1ef6b917 100644 --- a/crates/project/src/git_store/pending_op.rs +++ b/crates/project/src/git_store/pending_op.rs @@ -1,87 +1,105 @@ +use futures::channel::oneshot::Canceled; use git::repository::RepoPath; use std::ops::Add; -use sum_tree::{ContextLessSummary, Dimension, Item, KeyedItem}; -use worktree::PathSummary; +use sum_tree::{ContextLessSummary, Item, KeyedItem}; +use worktree::{PathKey, PathSummary}; #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum PendingOpStatus { +pub enum GitStatus { Staged, Unstaged, Reverted, Unchanged, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum JobStatus { + Started, + Finished, + Canceled, +} + #[derive(Clone, Debug, PartialEq, Eq)] -pub struct PendingOp { +pub struct PendingOps { pub repo_path: RepoPath, + pub ops: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PendingOp { pub id: PendingOpId, - pub status: PendingOpStatus, - pub finished: bool, + pub git_status: GitStatus, + pub job_status: JobStatus, } #[derive(Clone, Debug)] -pub struct PendingOpSummary { +pub struct PendingOpsSummary { pub max_id: PendingOpId, - pub staged_count: usize, - pub unstaged_count: usize, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct PendingOpId(pub usize); +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct PendingOpId(pub u16); -impl Item for PendingOp { - type Summary = PathSummary; +impl Item for PendingOps { + type Summary = PathSummary; fn summary(&self, _cx: ()) -> Self::Summary { PathSummary { max_path: self.repo_path.0.clone(), - item_summary: PendingOpSummary { - max_id: self.id, - staged_count: (self.status == PendingOpStatus::Staged) as usize, - unstaged_count: (self.status == PendingOpStatus::Unstaged) as usize, + item_summary: PendingOpsSummary { + max_id: self.ops.last().map(|op| op.id).unwrap_or_default(), }, } } } -impl ContextLessSummary for PendingOpSummary { +impl ContextLessSummary for PendingOpsSummary { fn zero() -> Self { Self { - max_id: PendingOpId(0), - staged_count: 0, - unstaged_count: 0, + max_id: PendingOpId::default(), } } fn add_summary(&mut self, summary: &Self) { self.max_id = summary.max_id; - self.staged_count += summary.staged_count; - self.unstaged_count += summary.unstaged_count; } } -impl KeyedItem for PendingOp { - type Key = PendingOpId; +impl KeyedItem for PendingOps { + type Key = PathKey; fn key(&self) -> Self::Key { - self.id + PathKey(self.repo_path.0.clone()) } } -impl Dimension<'_, PathSummary> for PendingOpId { - fn zero(_cx: ()) -> Self { - Self(0) - } +impl Add for PendingOpId { + type Output = PendingOpId; - fn add_summary(&mut self, summary: &PathSummary, _cx: ()) { - *self = summary.item_summary.max_id; + fn add(self, rhs: u16) -> Self::Output { + Self(self.0 + rhs) } } -impl Add for PendingOpId { - type Output = PendingOpId; +impl PendingOps { + pub fn new(path: &RepoPath) -> Self { + Self { + repo_path: path.clone(), + ops: Vec::new(), + } + } - fn add(self, rhs: usize) -> Self::Output { - Self(self.0 + rhs) + pub fn op_by_id(&self, id: PendingOpId) -> Option<&PendingOp> { + self.ops.iter().find(|op| op.id == id) + } + + pub fn op_by_id_mut(&mut self, id: PendingOpId) -> Option<&mut PendingOp> { + self.ops.iter_mut().find(|op| op.id == id) + } +} + +impl From for JobStatus { + fn from(_err: Canceled) -> Self { + Self::Canceled } }