@@ -48,7 +48,7 @@ use language::{
proto::{deserialize_version, serialize_version},
};
use parking_lot::Mutex;
-use pending_op::{PendingOp, PendingOpId, PendingOps};
+use pending_op::{PendingOp, PendingOpId, PendingOps, PendingOpsSummary};
use postage::stream::Stream as _;
use rpc::{
AnyProtoClient, TypedEnvelope,
@@ -255,7 +255,6 @@ pub struct MergeDetails {
pub struct RepositorySnapshot {
pub id: RepositoryId,
pub statuses_by_path: SumTree<StatusEntry>,
- pub pending_ops_by_path: SumTree<PendingOps>,
pub work_directory_abs_path: Arc<Path>,
pub path_style: PathStyle,
pub branch: Option<Branch>,
@@ -285,6 +284,7 @@ pub struct Repository {
paths_needing_status_update: BTreeSet<RepoPath>,
job_sender: mpsc::UnboundedSender<GitJob>,
active_jobs: HashMap<JobId, JobInfo>,
+ pending_ops: SumTree<PendingOps>,
job_id: JobId,
askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
latest_askpass_id: u64,
@@ -316,9 +316,7 @@ pub enum RepositoryEvent {
MergeHeadsChanged,
BranchChanged,
StashEntriesChanged,
- PendingOpsChanged {
- pending_ops: SumTree<pending_op::PendingOps>,
- },
+ PendingOpsChanged { pending_ops: SumTree<PendingOps> },
}
#[derive(Clone, Debug)]
@@ -3062,7 +3060,6 @@ impl RepositorySnapshot {
Self {
id,
statuses_by_path: Default::default(),
- pending_ops_by_path: Default::default(),
work_directory_abs_path,
branch: None,
head_commit: None,
@@ -3190,12 +3187,6 @@ impl RepositorySnapshot {
.cloned()
}
- pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
- self.pending_ops_by_path
- .get(&PathKey(path.as_ref().clone()), ())
- .cloned()
- }
-
pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style)
}
@@ -3331,6 +3322,20 @@ impl Repository {
self.snapshot.clone()
}
+ pub fn pending_ops(&self) -> impl Iterator<Item = PendingOps> + '_ {
+ self.pending_ops.iter().cloned()
+ }
+
+ pub fn pending_ops_summary(&self) -> PathSummary<PendingOpsSummary> {
+ self.pending_ops.summary().clone()
+ }
+
+ pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
+ self.pending_ops
+ .get(&PathKey(path.as_ref().clone()), ())
+ .cloned()
+ }
+
fn local(
id: RepositoryId,
work_directory_abs_path: Arc<Path>,
@@ -3348,6 +3353,7 @@ impl Repository {
this: cx.weak_entity(),
git_store,
snapshot,
+ pending_ops: Default::default(),
commit_message_buffer: None,
askpass_delegates: Default::default(),
paths_needing_status_update: Default::default(),
@@ -3381,6 +3387,7 @@ impl Repository {
snapshot,
commit_message_buffer: None,
git_store,
+ pending_ops: Default::default(),
paths_needing_status_update: Default::default(),
job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
askpass_delegates: Default::default(),
@@ -5027,6 +5034,32 @@ impl Repository {
})
}
+ fn clear_pending_ops(&mut self, cx: &mut Context<Self>) {
+ let updated = SumTree::from_iter(
+ self.pending_ops.iter().filter_map(|ops| {
+ let inner_ops: Vec<PendingOp> =
+ ops.ops.iter().filter(|op| op.running()).cloned().collect();
+ if inner_ops.is_empty() {
+ None
+ } else {
+ Some(PendingOps {
+ repo_path: ops.repo_path.clone(),
+ ops: inner_ops,
+ })
+ }
+ }),
+ (),
+ );
+
+ if updated != self.pending_ops {
+ cx.emit(RepositoryEvent::PendingOpsChanged {
+ pending_ops: self.pending_ops.clone(),
+ })
+ }
+
+ self.pending_ops = updated;
+ }
+
fn schedule_scan(
&mut self,
updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
@@ -5058,6 +5091,7 @@ impl Repository {
.await?;
this.update(&mut cx, |this, cx| {
this.snapshot = snapshot.clone();
+ this.clear_pending_ops(cx);
for event in events {
cx.emit(event);
}
@@ -5390,14 +5424,18 @@ impl Repository {
this.update(cx, |this, _| {
let mut edits = Vec::with_capacity(ids.len());
for (id, entry) in ids {
- if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
+ if let Some(mut ops) = this
+ .pending_ops
+ .get(&PathKey(entry.as_ref().clone()), ())
+ .cloned()
+ {
if let Some(op) = ops.op_by_id_mut(id) {
op.job_status = job_status;
}
edits.push(sum_tree::Edit::Insert(ops));
}
}
- this.snapshot.pending_ops_by_path.edit(edits, ());
+ this.pending_ops.edit(edits, ());
})?;
result
@@ -5413,8 +5451,9 @@ impl Repository {
let mut ids = Vec::with_capacity(paths.len());
for path in paths {
let mut ops = self
- .snapshot
- .pending_ops_for_path(&path)
+ .pending_ops
+ .get(&PathKey(path.as_ref().clone()), ())
+ .cloned()
.unwrap_or_else(|| PendingOps::new(&path));
let id = ops.max_id() + 1;
ops.ops.push(PendingOp {
@@ -5425,7 +5464,7 @@ impl Repository {
edits.push(sum_tree::Edit::Insert(ops));
ids.push((id, path));
}
- self.snapshot.pending_ops_by_path.edit(edits, ());
+ self.pending_ops.edit(edits, ());
ids
}
}
@@ -5695,28 +5734,6 @@ async fn compute_snapshot(
MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
- let pending_ops_by_path = SumTree::from_iter(
- prev_snapshot.pending_ops_by_path.iter().filter_map(|ops| {
- let inner_ops: Vec<PendingOp> =
- ops.ops.iter().filter(|op| op.running()).cloned().collect();
- if inner_ops.is_empty() {
- None
- } else {
- Some(PendingOps {
- repo_path: ops.repo_path.clone(),
- ops: inner_ops,
- })
- }
- }),
- (),
- );
-
- if pending_ops_by_path != prev_snapshot.pending_ops_by_path {
- events.push(RepositoryEvent::PendingOpsChanged {
- pending_ops: prev_snapshot.pending_ops_by_path.clone(),
- })
- }
-
if merge_heads_changed {
events.push(RepositoryEvent::MergeHeadsChanged);
}
@@ -5742,7 +5759,6 @@ async fn compute_snapshot(
let snapshot = RepositorySnapshot {
id,
statuses_by_path,
- pending_ops_by_path,
work_directory_abs_path,
path_style: prev_snapshot.path_style,
scan_id: prev_snapshot.scan_id + 1,