git: Index into sumtree by path, and store ops linearly per item

Jakub Konka created

git: Map canceled (coalesced) git job into JobStatus::Canceled

Change summary

crates/project/src/git_store.rs            | 250 +++++++++++++----------
crates/project/src/git_store/pending_op.rs |  90 +++++---
2 files changed, 195 insertions(+), 145 deletions(-)

Detailed changes

crates/project/src/git_store.rs 🔗

@@ -45,7 +45,7 @@ use language::{
     proto::{deserialize_version, serialize_version},
 };
 use parking_lot::Mutex;
-use pending_op::{PendingOp, PendingOpId, PendingOpStatus};
+use pending_op::{PendingOp, PendingOps};
 use postage::stream::Stream as _;
 use rpc::{
     AnyProtoClient, TypedEnvelope,
@@ -250,7 +250,7 @@ pub struct MergeDetails {
 pub struct RepositorySnapshot {
     pub id: RepositoryId,
     pub statuses_by_path: SumTree<StatusEntry>,
-    pub pending_ops: SumTree<PendingOp>,
+    pub pending_ops_by_path: SumTree<PendingOps>,
     pub work_directory_abs_path: Arc<Path>,
     pub path_style: PathStyle,
     pub branch: Option<Branch>,
@@ -2950,7 +2950,7 @@ impl RepositorySnapshot {
         Self {
             id,
             statuses_by_path: Default::default(),
-            pending_ops: Default::default(),
+            pending_ops_by_path: Default::default(),
             work_directory_abs_path,
             branch: None,
             head_commit: None,
@@ -3078,16 +3078,10 @@ impl RepositorySnapshot {
             .cloned()
     }
 
-    pub fn pending_op_by_id(&self, id: PendingOpId) -> Option<PendingOp> {
-        self.pending_ops.get(&id, ()).cloned()
-    }
-
-    pub fn pending_ops_for_path(&self, path: &RepoPath) -> Vec<PendingOp> {
-        self.pending_ops
-            .filter::<_, PathKey>((), |sum| sum.max_path == path.0)
-            .into_iter()
-            .map(Clone::clone)
-            .collect()
+    pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
+        self.pending_ops_by_path
+            .get(&PathKey(path.0.clone()), ())
+            .cloned()
     }
 
     pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
@@ -3826,67 +3820,86 @@ impl Repository {
         };
 
         let mut ids = Vec::with_capacity(entries.len());
+        let mut edits = Vec::with_capacity(entries.len());
 
         for entry in &entries {
-            let id = self.snapshot.pending_ops.summary().item_summary.max_id + 1;
-            self.snapshot.pending_ops.push(
-                PendingOp {
-                    repo_path: entry.clone(),
-                    id,
-                    status: PendingOpStatus::Staged,
-                    finished: false,
-                },
-                (),
-            );
-            ids.push(id);
+            let id = self
+                .snapshot
+                .pending_ops_by_path
+                .summary()
+                .item_summary
+                .max_id
+                + 1;
+            let op = PendingOp {
+                id,
+                git_status: pending_op::GitStatus::Staged,
+                job_status: pending_op::JobStatus::Started,
+            };
+            let mut ops = self
+                .snapshot
+                .pending_ops_for_path(entry)
+                .unwrap_or_else(|| PendingOps::new(entry));
+            ops.ops.push(op);
+            edits.push(sum_tree::Edit::Insert(ops));
+            ids.push((id, entry.clone()));
         }
+        self.snapshot.pending_ops_by_path.edit(edits, ());
 
         cx.spawn(async move |this, cx| {
             for save_task in save_tasks {
                 save_task.await?;
             }
 
-            this.update(cx, |this, _| {
-                this.send_keyed_job(
-                    job_key,
-                    Some(status.into()),
-                    move |git_repo, _cx| async move {
-                        match git_repo {
-                            RepositoryState::Local {
-                                backend,
-                                environment,
-                                ..
-                            } => backend.stage_paths(entries, environment.clone()).await,
-                            RepositoryState::Remote { project_id, client } => {
-                                client
-                                    .request(proto::Stage {
-                                        project_id: project_id.0,
-                                        repository_id: id.to_proto(),
-                                        paths: entries
-                                            .into_iter()
-                                            .map(|repo_path| repo_path.to_proto())
-                                            .collect(),
-                                    })
-                                    .await
-                                    .context("sending stage request")?;
-
-                                Ok(())
+            let res = this
+                .update(cx, |this, _| {
+                    this.send_keyed_job(
+                        job_key,
+                        Some(status.into()),
+                        move |git_repo, _cx| async move {
+                            match git_repo {
+                                RepositoryState::Local {
+                                    backend,
+                                    environment,
+                                    ..
+                                } => backend.stage_paths(entries, environment.clone()).await,
+                                RepositoryState::Remote { project_id, client } => {
+                                    client
+                                        .request(proto::Stage {
+                                            project_id: project_id.0,
+                                            repository_id: id.to_proto(),
+                                            paths: entries
+                                                .into_iter()
+                                                .map(|repo_path| repo_path.to_proto())
+                                                .collect(),
+                                        })
+                                        .await
+                                        .context("sending stage request")?;
+
+                                    Ok(())
+                                }
                             }
-                        }
-                    },
-                )
-            })?
-            .await??;
+                        },
+                    )
+                })?
+                .await;
+
+            let (job_status, res) = match res {
+                Ok(res) => (pending_op::JobStatus::Finished, res),
+                Err(err) => (err.into(), Ok(())),
+            };
+            res?;
 
             this.update(cx, |this, _| {
-                for id in ids {
-                    if let Some(mut op) = this.snapshot.pending_op_by_id(id) {
-                        op.finished = true;
-                        this.snapshot
-                            .pending_ops
-                            .edit(vec![sum_tree::Edit::Insert(op)], ());
+                let mut edits = Vec::with_capacity(ids.len());
+                for (id, entry) in ids {
+                    if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
+                        if let Some(op) = ops.op_by_id_mut(id) {
+                            op.job_status = job_status;
+                        }
+                        edits.push(sum_tree::Edit::Insert(ops));
                     }
                 }
+                this.snapshot.pending_ops_by_path.edit(edits, ());
             })?;
 
             Ok(())
@@ -3915,67 +3928,86 @@ impl Repository {
         };
 
         let mut ids = Vec::with_capacity(entries.len());
+        let mut edits = Vec::with_capacity(entries.len());
 
         for entry in &entries {
-            let id = self.snapshot.pending_ops.summary().item_summary.max_id + 1;
-            self.snapshot.pending_ops.push(
-                PendingOp {
-                    repo_path: entry.clone(),
-                    id,
-                    status: PendingOpStatus::Unstaged,
-                    finished: false,
-                },
-                (),
-            );
-            ids.push(id);
+            let id = self
+                .snapshot
+                .pending_ops_by_path
+                .summary()
+                .item_summary
+                .max_id
+                + 1;
+            let op = PendingOp {
+                id,
+                git_status: pending_op::GitStatus::Unstaged,
+                job_status: pending_op::JobStatus::Started,
+            };
+            let mut ops = self
+                .snapshot
+                .pending_ops_for_path(entry)
+                .unwrap_or_else(|| PendingOps::new(entry));
+            ops.ops.push(op);
+            edits.push(sum_tree::Edit::Insert(ops));
+            ids.push((id, entry.clone()));
         }
+        self.snapshot.pending_ops_by_path.edit(edits, ());
 
         cx.spawn(async move |this, cx| {
             for save_task in save_tasks {
                 save_task.await?;
             }
 
-            this.update(cx, |this, _| {
-                this.send_keyed_job(
-                    job_key,
-                    Some(status.into()),
-                    move |git_repo, _cx| async move {
-                        match git_repo {
-                            RepositoryState::Local {
-                                backend,
-                                environment,
-                                ..
-                            } => backend.unstage_paths(entries, environment).await,
-                            RepositoryState::Remote { project_id, client } => {
-                                client
-                                    .request(proto::Unstage {
-                                        project_id: project_id.0,
-                                        repository_id: id.to_proto(),
-                                        paths: entries
-                                            .into_iter()
-                                            .map(|repo_path| repo_path.to_proto())
-                                            .collect(),
-                                    })
-                                    .await
-                                    .context("sending unstage request")?;
-
-                                Ok(())
+            let res = this
+                .update(cx, |this, _| {
+                    this.send_keyed_job(
+                        job_key,
+                        Some(status.into()),
+                        move |git_repo, _cx| async move {
+                            match git_repo {
+                                RepositoryState::Local {
+                                    backend,
+                                    environment,
+                                    ..
+                                } => backend.unstage_paths(entries, environment).await,
+                                RepositoryState::Remote { project_id, client } => {
+                                    client
+                                        .request(proto::Unstage {
+                                            project_id: project_id.0,
+                                            repository_id: id.to_proto(),
+                                            paths: entries
+                                                .into_iter()
+                                                .map(|repo_path| repo_path.to_proto())
+                                                .collect(),
+                                        })
+                                        .await
+                                        .context("sending unstage request")?;
+
+                                    Ok(())
+                                }
                             }
-                        }
-                    },
-                )
-            })?
-            .await??;
+                        },
+                    )
+                })?
+                .await;
+
+            let (job_status, res) = match res {
+                Ok(res) => (pending_op::JobStatus::Finished, res),
+                Err(err) => (err.into(), Ok(())),
+            };
+            res?;
 
             this.update(cx, |this, _| {
-                for id in ids {
-                    if let Some(mut op) = this.snapshot.pending_op_by_id(id) {
-                        op.finished = true;
-                        this.snapshot
-                            .pending_ops
-                            .edit(vec![sum_tree::Edit::Insert(op)], ());
+                let mut edits = Vec::with_capacity(ids.len());
+                for (id, entry) in ids {
+                    if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
+                        if let Some(op) = ops.op_by_id_mut(id) {
+                            op.job_status = job_status;
+                        }
+                        edits.push(sum_tree::Edit::Insert(ops));
                     }
                 }
+                this.snapshot.pending_ops_by_path.edit(edits, ());
             })?;
 
             Ok(())
@@ -5518,7 +5550,7 @@ async fn compute_snapshot(
         MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
     log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
 
-    let pending_ops = prev_snapshot.pending_ops.clone();
+    let pending_ops_by_path = prev_snapshot.pending_ops_by_path.clone();
 
     if merge_heads_changed {
         events.push(RepositoryEvent::MergeHeadsChanged);
@@ -5545,7 +5577,7 @@ async fn compute_snapshot(
     let snapshot = RepositorySnapshot {
         id,
         statuses_by_path,
-        pending_ops,
+        pending_ops_by_path,
         work_directory_abs_path,
         path_style: prev_snapshot.path_style,
         scan_id: prev_snapshot.scan_id + 1,

crates/project/src/git_store/pending_op.rs 🔗

@@ -1,87 +1,105 @@
+use futures::channel::oneshot::Canceled;
 use git::repository::RepoPath;
 use std::ops::Add;
-use sum_tree::{ContextLessSummary, Dimension, Item, KeyedItem};
-use worktree::PathSummary;
+use sum_tree::{ContextLessSummary, Item, KeyedItem};
+use worktree::{PathKey, PathSummary};
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
-pub enum PendingOpStatus {
+pub enum GitStatus {
     Staged,
     Unstaged,
     Reverted,
     Unchanged,
 }
 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum JobStatus {
+    Started,
+    Finished,
+    Canceled,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
-pub struct PendingOp {
+pub struct PendingOps {
     pub repo_path: RepoPath,
+    pub ops: Vec<PendingOp>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct PendingOp {
     pub id: PendingOpId,
-    pub status: PendingOpStatus,
-    pub finished: bool,
+    pub git_status: GitStatus,
+    pub job_status: JobStatus,
 }
 
 #[derive(Clone, Debug)]
-pub struct PendingOpSummary {
+pub struct PendingOpsSummary {
     pub max_id: PendingOpId,
-    pub staged_count: usize,
-    pub unstaged_count: usize,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
-pub struct PendingOpId(pub usize);
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct PendingOpId(pub u16);
 
-impl Item for PendingOp {
-    type Summary = PathSummary<PendingOpSummary>;
+impl Item for PendingOps {
+    type Summary = PathSummary<PendingOpsSummary>;
 
     fn summary(&self, _cx: ()) -> Self::Summary {
         PathSummary {
             max_path: self.repo_path.0.clone(),
-            item_summary: PendingOpSummary {
-                max_id: self.id,
-                staged_count: (self.status == PendingOpStatus::Staged) as usize,
-                unstaged_count: (self.status == PendingOpStatus::Unstaged) as usize,
+            item_summary: PendingOpsSummary {
+                max_id: self.ops.last().map(|op| op.id).unwrap_or_default(),
             },
         }
     }
 }
 
-impl ContextLessSummary for PendingOpSummary {
+impl ContextLessSummary for PendingOpsSummary {
     fn zero() -> Self {
         Self {
-            max_id: PendingOpId(0),
-            staged_count: 0,
-            unstaged_count: 0,
+            max_id: PendingOpId::default(),
         }
     }
 
     fn add_summary(&mut self, summary: &Self) {
         self.max_id = summary.max_id;
-        self.staged_count += summary.staged_count;
-        self.unstaged_count += summary.unstaged_count;
     }
 }
 
-impl KeyedItem for PendingOp {
-    type Key = PendingOpId;
+impl KeyedItem for PendingOps {
+    type Key = PathKey;
 
     fn key(&self) -> Self::Key {
-        self.id
+        PathKey(self.repo_path.0.clone())
     }
 }
 
-impl Dimension<'_, PathSummary<PendingOpSummary>> for PendingOpId {
-    fn zero(_cx: ()) -> Self {
-        Self(0)
-    }
+impl Add<u16> for PendingOpId {
+    type Output = PendingOpId;
 
-    fn add_summary(&mut self, summary: &PathSummary<PendingOpSummary>, _cx: ()) {
-        *self = summary.item_summary.max_id;
+    fn add(self, rhs: u16) -> Self::Output {
+        Self(self.0 + rhs)
     }
 }
 
-impl Add<usize> for PendingOpId {
-    type Output = PendingOpId;
+impl PendingOps {
+    pub fn new(path: &RepoPath) -> Self {
+        Self {
+            repo_path: path.clone(),
+            ops: Vec::new(),
+        }
+    }
 
-    fn add(self, rhs: usize) -> Self::Output {
-        Self(self.0 + rhs)
+    pub fn op_by_id(&self, id: PendingOpId) -> Option<&PendingOp> {
+        self.ops.iter().find(|op| op.id == id)
+    }
+
+    pub fn op_by_id_mut(&mut self, id: PendingOpId) -> Option<&mut PendingOp> {
+        self.ops.iter_mut().find(|op| op.id == id)
+    }
+}
+
+impl From<Canceled> for JobStatus {
+    fn from(_err: Canceled) -> Self {
+        Self::Canceled
     }
 }