git: Hide pending op tracking in a helper wrapper

Jakub Konka created

Change summary

crates/project/src/git_store.rs | 185 +++++++++++++++++-----------------
1 file changed, 93 insertions(+), 92 deletions(-)

Detailed changes

crates/project/src/git_store.rs 🔗

@@ -17,7 +17,10 @@ pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, Conflic
 use fs::Fs;
 use futures::{
     FutureExt, StreamExt,
-    channel::{mpsc, oneshot},
+    channel::{
+        mpsc,
+        oneshot::{self, Canceled},
+    },
     future::{self, Shared},
     stream::FuturesOrdered,
 };
@@ -45,7 +48,7 @@ use language::{
     proto::{deserialize_version, serialize_version},
 };
 use parking_lot::Mutex;
-use pending_op::{PendingOp, PendingOps};
+use pending_op::{PendingOp, PendingOpId, PendingOps};
 use postage::stream::Stream as _;
 use rpc::{
     AnyProtoClient, TypedEnvelope,
@@ -3653,6 +3656,13 @@ impl Repository {
         let commit = commit.to_string();
         let id = self.id;
 
+        // let mut edits = Vec::with_capacity(paths.len());
+        // let mut ids = Vec::with_capacity(paths.len());
+        // for path in &paths {
+
+        // }
+        // self.snapshot.pending_ops_by_path.edit(edits, ());
+
         self.send_job(
             Some(format!("git checkout {}", commit).into()),
             move |git_repo, _| async move {
@@ -3828,28 +3838,16 @@ impl Repository {
             _ => None,
         };
 
-        let mut ids = Vec::with_capacity(entries.len());
-        let mut edits = Vec::with_capacity(entries.len());
-
-        for entry in &entries {
-            let op = self.snapshot.new_pending_op(pending_op::GitStatus::Staged);
-            let mut ops = self
-                .snapshot
-                .pending_ops_for_path(entry)
-                .unwrap_or_else(|| PendingOps::new(entry));
-            ops.ops.push(op);
-            edits.push(sum_tree::Edit::Insert(ops));
-            ids.push((op.id, entry.clone()));
-        }
-        self.snapshot.pending_ops_by_path.edit(edits, ());
-
-        cx.spawn(async move |this, cx| {
-            for save_task in save_tasks {
-                save_task.await?;
-            }
+        self.spawn_job_with_tracking(
+            entries.clone(),
+            pending_op::GitStatus::Staged,
+            cx,
+            async move |this, cx| {
+                for save_task in save_tasks {
+                    save_task.await?;
+                }
 
-            let res = this
-                .update(cx, |this, _| {
+                this.update(cx, |this, _| {
                     this.send_keyed_job(
                         job_key,
                         Some(status.into()),
@@ -3879,29 +3877,9 @@ impl Repository {
                         },
                     )
                 })?
-                .await;
-
-            let (job_status, res) = match res {
-                Ok(res) => (pending_op::JobStatus::Finished, res),
-                Err(err) => (err.into(), Ok(())),
-            };
-            res?;
-
-            this.update(cx, |this, _| {
-                let mut edits = Vec::with_capacity(ids.len());
-                for (id, entry) in ids {
-                    if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
-                        if let Some(op) = ops.op_by_id_mut(id) {
-                            op.job_status = job_status;
-                        }
-                        edits.push(sum_tree::Edit::Insert(ops));
-                    }
-                }
-                this.snapshot.pending_ops_by_path.edit(edits, ());
-            })?;
-
-            Ok(())
-        })
+                .await?
+            },
+        )
     }
 
     pub fn unstage_entries(
@@ -3925,30 +3903,16 @@ impl Repository {
             _ => None,
         };
 
-        let mut ids = Vec::with_capacity(entries.len());
-        let mut edits = Vec::with_capacity(entries.len());
-
-        for entry in &entries {
-            let op = self
-                .snapshot
-                .new_pending_op(pending_op::GitStatus::Unstaged);
-            let mut ops = self
-                .snapshot
-                .pending_ops_for_path(entry)
-                .unwrap_or_else(|| PendingOps::new(entry));
-            ops.ops.push(op);
-            edits.push(sum_tree::Edit::Insert(ops));
-            ids.push((op.id, entry.clone()));
-        }
-        self.snapshot.pending_ops_by_path.edit(edits, ());
-
-        cx.spawn(async move |this, cx| {
-            for save_task in save_tasks {
-                save_task.await?;
-            }
+        self.spawn_job_with_tracking(
+            entries.clone(),
+            pending_op::GitStatus::Unstaged,
+            cx,
+            async move |this, cx| {
+                for save_task in save_tasks {
+                    save_task.await?;
+                }
 
-            let res = this
-                .update(cx, |this, _| {
+                this.update(cx, |this, _| {
                     this.send_keyed_job(
                         job_key,
                         Some(status.into()),
@@ -3978,29 +3942,9 @@ impl Repository {
                         },
                     )
                 })?
-                .await;
-
-            let (job_status, res) = match res {
-                Ok(res) => (pending_op::JobStatus::Finished, res),
-                Err(err) => (err.into(), Ok(())),
-            };
-            res?;
-
-            this.update(cx, |this, _| {
-                let mut edits = Vec::with_capacity(ids.len());
-                for (id, entry) in ids {
-                    if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
-                        if let Some(op) = ops.op_by_id_mut(id) {
-                            op.job_status = job_status;
-                        }
-                        edits.push(sum_tree::Edit::Insert(ops));
-                    }
-                }
-                this.snapshot.pending_ops_by_path.edit(edits, ());
-            })?;
-
-            Ok(())
-        })
+                .await?
+            },
+        )
     }
 
     pub fn stage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
@@ -5274,6 +5218,63 @@ impl Repository {
     pub fn barrier(&mut self) -> oneshot::Receiver<()> {
         self.send_job(None, |_, _| async {})
     }
+
+    fn spawn_job_with_tracking<AsyncFn>(
+        &mut self,
+        paths: Vec<RepoPath>,
+        git_status: pending_op::GitStatus,
+        cx: &mut Context<Self>,
+        f: AsyncFn,
+    ) -> Task<anyhow::Result<()>>
+    where
+        AsyncFn: AsyncFnOnce(WeakEntity<Repository>, &mut AsyncApp) -> anyhow::Result<()> + 'static,
+    {
+        let ids = self.new_pending_ops_for_paths(paths, git_status);
+
+        cx.spawn(async move |this, cx| {
+            let job_status = match f(this.clone(), cx).await {
+                Ok(()) => pending_op::JobStatus::Finished,
+                Err(err) if err.is::<Canceled>() => pending_op::JobStatus::Canceled,
+                Err(err) => return Err(err),
+            };
+
+            this.update(cx, |this, _| {
+                let mut edits = Vec::with_capacity(ids.len());
+                for (id, entry) in ids {
+                    if let Some(mut ops) = this.snapshot.pending_ops_for_path(&entry) {
+                        if let Some(op) = ops.op_by_id_mut(id) {
+                            op.job_status = job_status;
+                        }
+                        edits.push(sum_tree::Edit::Insert(ops));
+                    }
+                }
+                this.snapshot.pending_ops_by_path.edit(edits, ());
+            })?;
+
+            Ok(())
+        })
+    }
+
+    fn new_pending_ops_for_paths(
+        &mut self,
+        paths: Vec<RepoPath>,
+        git_status: pending_op::GitStatus,
+    ) -> Vec<(PendingOpId, RepoPath)> {
+        let mut edits = Vec::with_capacity(paths.len());
+        let mut ids = Vec::with_capacity(paths.len());
+        for path in paths {
+            let op = self.snapshot.new_pending_op(git_status);
+            let mut ops = self
+                .snapshot
+                .pending_ops_for_path(&path)
+                .unwrap_or_else(|| PendingOps::new(&path));
+            ops.ops.push(op);
+            edits.push(sum_tree::Edit::Insert(ops));
+            ids.push((op.id, path));
+        }
+        self.snapshot.pending_ops_by_path.edit(edits, ());
+        ids
+    }
 }
 
 fn get_permalink_in_rust_registry_src(