Simplify project git code (#25662)

Mikayla Maki created

This was originally a part of another PR, but I wanted to get the
refactoring in and shift focus to working on bugs.

This causes all git commands via the `Repository` entity to be
serialized, and allows us to return values other than `Result<()>`

Release Notes:

- N/A

Change summary

crates/git/src/repository.rs   |  15 
crates/git_ui/src/git_panel.rs |  10 
crates/project/src/git.rs      | 773 ++++++++++++++++-------------------
3 files changed, 365 insertions(+), 433 deletions(-)

Detailed changes

crates/git/src/repository.rs 🔗

@@ -618,10 +618,9 @@ impl GitRepository for RealGitRepository {
                 "Failed to push:\n{}",
                 String::from_utf8_lossy(&output.stderr)
             ));
+        } else {
+            Ok(())
         }
-
-        // TODO: Get remote response out of this and show it to the user
-        Ok(())
     }
 
     fn pull(&self, branch_name: &str, remote_name: &str) -> Result<()> {
@@ -639,10 +638,9 @@ impl GitRepository for RealGitRepository {
                 "Failed to pull:\n{}",
                 String::from_utf8_lossy(&output.stderr)
             ));
+        } else {
+            return Ok(());
         }
-
-        // TODO: Get remote response out of this and show it to the user
-        Ok(())
     }
 
     fn fetch(&self) -> Result<()> {
@@ -658,10 +656,9 @@ impl GitRepository for RealGitRepository {
                 "Failed to fetch:\n{}",
                 String::from_utf8_lossy(&output.stderr)
             ));
+        } else {
+            return Ok(());
         }
-
-        // TODO: Get remote response out of this and show it to the user
-        Ok(())
     }
 
     fn get_remotes(&self, branch_name: Option<&str>) -> Result<Vec<Remote>> {

crates/git_ui/src/git_panel.rs 🔗

@@ -1386,14 +1386,14 @@ impl GitPanel {
             };
 
             let mut current_remotes: Vec<Remote> = repo
-                .update(&mut cx, |repo, cx| {
+                .update(&mut cx, |repo, _| {
                     let Some(current_branch) = repo.current_branch() else {
                         return Err(anyhow::anyhow!("No active branch"));
                     };
 
-                    Ok(repo.get_remotes(Some(current_branch.name.to_string()), cx))
+                    Ok(repo.get_remotes(Some(current_branch.name.to_string())))
                 })??
-                .await?;
+                .await??;
 
             if current_remotes.len() == 0 {
                 return Err(anyhow::anyhow!("No active remote"));
@@ -2357,7 +2357,9 @@ impl GitPanel {
         let Some(repo) = self.active_repository.clone() else {
             return Task::ready(Err(anyhow::anyhow!("no active repo")));
         };
-        repo.update(cx, |repo, cx| repo.show(sha, cx))
+
+        let show = repo.read(cx).show(sha);
+        cx.spawn(|_, _| async move { show.await? })
     }
 
     fn deploy_entry_context_menu(

crates/project/src/git.rs 🔗

@@ -11,26 +11,29 @@ use git::{
     status::{GitSummary, TrackedSummary},
 };
 use gpui::{
-    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription,
-    Task, WeakEntity,
+    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
+    WeakEntity,
 };
 use language::{Buffer, LanguageRegistry};
 use rpc::proto::{git_reset, ToProto};
 use rpc::{proto, AnyProtoClient, TypedEnvelope};
 use settings::WorktreeId;
+use std::future::Future;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use text::BufferId;
 use util::{maybe, ResultExt};
 use worktree::{ProjectEntryId, RepositoryEntry, StatusEntry};
 
+type GitJob = Box<dyn FnOnce(&mut AsyncApp) -> Task<()>>;
+
 pub struct GitStore {
     buffer_store: Entity<BufferStore>,
     pub(super) project_id: Option<ProjectId>,
     pub(super) client: Option<AnyProtoClient>,
     repositories: Vec<Entity<Repository>>,
     active_index: Option<usize>,
-    update_sender: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<()>>)>,
+    update_sender: mpsc::UnboundedSender<GitJob>,
     _subscription: Subscription,
 }
 
@@ -41,7 +44,7 @@ pub struct Repository {
     pub repository_entry: RepositoryEntry,
     pub git_repo: GitRepo,
     pub merge_message: Option<String>,
-    update_sender: mpsc::UnboundedSender<(Message, oneshot::Sender<Result<()>>)>,
+    job_sender: mpsc::UnboundedSender<GitJob>,
 }
 
 #[derive(Clone)]
@@ -55,40 +58,6 @@ pub enum GitRepo {
     },
 }
 
-pub enum Message {
-    Commit {
-        git_repo: GitRepo,
-        message: SharedString,
-        name_and_email: Option<(SharedString, SharedString)>,
-    },
-    Reset {
-        repo: GitRepo,
-        commit: SharedString,
-        reset_mode: ResetMode,
-    },
-    CheckoutFiles {
-        repo: GitRepo,
-        commit: SharedString,
-        paths: Vec<RepoPath>,
-    },
-    Stage(GitRepo, Vec<RepoPath>),
-    Unstage(GitRepo, Vec<RepoPath>),
-    SetIndexText(GitRepo, RepoPath, Option<String>),
-    Push {
-        repo: GitRepo,
-        branch_name: SharedString,
-        remote_name: SharedString,
-        options: Option<PushOptions>,
-    },
-    Pull {
-        repo: GitRepo,
-        branch_name: SharedString,
-        remote_name: SharedString,
-    },
-    Fetch(GitRepo),
-}
-
-#[derive(Debug)]
 pub enum GitEvent {
     ActiveRepositoryChanged,
     FileSystemUpdated,
@@ -220,7 +189,7 @@ impl GitStore {
                                 worktree_id,
                                 repository_entry: repo.clone(),
                                 git_repo,
-                                update_sender: self.update_sender.clone(),
+                                job_sender: self.update_sender.clone(),
                                 merge_message,
                                 commit_message_buffer: None,
                             })
@@ -252,272 +221,15 @@ impl GitStore {
         self.repositories.clone()
     }
 
-    fn spawn_git_worker(
-        cx: &mut Context<'_, GitStore>,
-    ) -> mpsc::UnboundedSender<(Message, oneshot::Sender<Result<()>>)> {
-        let (update_sender, mut update_receiver) =
-            mpsc::unbounded::<(Message, oneshot::Sender<Result<()>>)>();
-        cx.spawn(|_, cx| async move {
-            while let Some((msg, respond)) = update_receiver.next().await {
-                if !respond.is_canceled() {
-                    let result = cx.background_spawn(Self::process_git_msg(msg)).await;
-                    respond.send(result).ok();
-                }
+    fn spawn_git_worker(cx: &mut Context<'_, GitStore>) -> mpsc::UnboundedSender<GitJob> {
+        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
+        cx.spawn(|_, mut cx| async move {
+            while let Some(job) = job_rx.next().await {
+                job(&mut cx).await
             }
         })
         .detach();
-        update_sender
-    }
-
-    async fn process_git_msg(msg: Message) -> Result<()> {
-        match msg {
-            Message::Fetch(repo) => {
-                match repo {
-                    GitRepo::Local(git_repository) => git_repository.fetch()?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::Fetch {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                            })
-                            .await
-                            .context("sending fetch request")?;
-                    }
-                }
-                Ok(())
-            }
-
-            Message::Pull {
-                repo,
-                branch_name,
-                remote_name,
-            } => {
-                match repo {
-                    GitRepo::Local(git_repository) => {
-                        git_repository.pull(&branch_name, &remote_name)?
-                    }
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::Pull {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                branch_name: branch_name.to_string(),
-                                remote_name: remote_name.to_string(),
-                            })
-                            .await
-                            .context("sending pull request")?;
-                    }
-                }
-                Ok(())
-            }
-            Message::Push {
-                repo,
-                branch_name,
-                remote_name,
-                options,
-            } => {
-                match repo {
-                    GitRepo::Local(git_repository) => {
-                        git_repository.push(&branch_name, &remote_name, options)?
-                    }
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::Push {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                branch_name: branch_name.to_string(),
-                                remote_name: remote_name.to_string(),
-                                options: options.map(|options| match options {
-                                    PushOptions::Force => proto::push::PushOptions::Force,
-                                    PushOptions::SetUpstream => {
-                                        proto::push::PushOptions::SetUpstream
-                                    }
-                                }
-                                    as i32),
-                            })
-                            .await
-                            .context("sending push request")?;
-                    }
-                }
-                Ok(())
-            }
-            Message::Stage(repo, paths) => {
-                match repo {
-                    GitRepo::Local(repo) => repo.stage_paths(&paths)?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::Stage {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                paths: paths
-                                    .into_iter()
-                                    .map(|repo_path| repo_path.as_ref().to_proto())
-                                    .collect(),
-                            })
-                            .await
-                            .context("sending stage request")?;
-                    }
-                }
-                Ok(())
-            }
-            Message::Reset {
-                repo,
-                commit,
-                reset_mode,
-            } => {
-                match repo {
-                    GitRepo::Local(repo) => repo.reset(&commit, reset_mode)?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::GitReset {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                commit: commit.into(),
-                                mode: match reset_mode {
-                                    ResetMode::Soft => git_reset::ResetMode::Soft.into(),
-                                    ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
-                                },
-                            })
-                            .await?;
-                    }
-                }
-                Ok(())
-            }
-
-            Message::CheckoutFiles {
-                repo,
-                commit,
-                paths,
-            } => {
-                match repo {
-                    GitRepo::Local(repo) => repo.checkout_files(&commit, &paths)?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::GitCheckoutFiles {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                commit: commit.into(),
-                                paths: paths
-                                    .into_iter()
-                                    .map(|p| p.to_string_lossy().to_string())
-                                    .collect(),
-                            })
-                            .await?;
-                    }
-                }
-                Ok(())
-            }
-            Message::Unstage(repo, paths) => {
-                match repo {
-                    GitRepo::Local(repo) => repo.unstage_paths(&paths)?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        client
-                            .request(proto::Unstage {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                paths: paths
-                                    .into_iter()
-                                    .map(|repo_path| repo_path.as_ref().to_proto())
-                                    .collect(),
-                            })
-                            .await
-                            .context("sending unstage request")?;
-                    }
-                }
-                Ok(())
-            }
-            Message::Commit {
-                git_repo,
-                message,
-                name_and_email,
-            } => {
-                match git_repo {
-                    GitRepo::Local(repo) => repo.commit(
-                        message.as_ref(),
-                        name_and_email
-                            .as_ref()
-                            .map(|(name, email)| (name.as_ref(), email.as_ref())),
-                    )?,
-                    GitRepo::Remote {
-                        project_id,
-                        client,
-                        worktree_id,
-                        work_directory_id,
-                    } => {
-                        let (name, email) = name_and_email.unzip();
-                        client
-                            .request(proto::Commit {
-                                project_id: project_id.0,
-                                worktree_id: worktree_id.to_proto(),
-                                work_directory_id: work_directory_id.to_proto(),
-                                message: String::from(message),
-                                name: name.map(String::from),
-                                email: email.map(String::from),
-                            })
-                            .await
-                            .context("sending commit request")?;
-                    }
-                }
-                Ok(())
-            }
-            Message::SetIndexText(git_repo, path, text) => match git_repo {
-                GitRepo::Local(repo) => repo.set_index_text(&path, text),
-                GitRepo::Remote {
-                    project_id,
-                    client,
-                    worktree_id,
-                    work_directory_id,
-                } => client.send(proto::SetIndexText {
-                    project_id: project_id.0,
-                    worktree_id: worktree_id.to_proto(),
-                    work_directory_id: work_directory_id.to_proto(),
-                    path: path.as_ref().to_proto(),
-                    text,
-                }),
-            },
-        }
+        job_tx
     }
 
     async fn handle_fetch(
@@ -696,10 +408,10 @@ impl GitStore {
         let branch_name = envelope.payload.branch_name;
 
         let remotes = repository_handle
-            .update(&mut cx, |repository_handle, cx| {
-                repository_handle.get_remotes(branch_name, cx)
+            .update(&mut cx, |repository_handle, _| {
+                repository_handle.get_remotes(branch_name)
             })?
-            .await?;
+            .await??;
 
         Ok(proto::GetRemotesResponse {
             remotes: remotes
@@ -722,10 +434,10 @@ impl GitStore {
             Self::repository_for_request(&this, worktree_id, work_directory_id, &mut cx)?;
 
         let commit = repository_handle
-            .update(&mut cx, |repository_handle, cx| {
-                repository_handle.show(&envelope.payload.commit, cx)
+            .update(&mut cx, |repository_handle, _| {
+                repository_handle.show(&envelope.payload.commit)
             })?
-            .await?;
+            .await??;
         Ok(proto::GitCommitDetails {
             sha: commit.sha.into(),
             message: commit.message.into(),
@@ -854,6 +566,26 @@ impl Repository {
         self.repository_entry.branch()
     }
 
+    fn send_job<F, Fut, R>(&self, job: F) -> oneshot::Receiver<R>
+    where
+        F: FnOnce(GitRepo) -> Fut + 'static,
+        Fut: Future<Output = R> + Send + 'static,
+        R: Send + 'static,
+    {
+        let (result_tx, result_rx) = futures::channel::oneshot::channel();
+        let git_repo = self.git_repo.clone();
+        self.job_sender
+            .unbounded_send(Box::new(|cx: &mut AsyncApp| {
+                let job = job(git_repo);
+                cx.background_spawn(async move {
+                    let result = job.await;
+                    result_tx.send(result).ok();
+                })
+            }))
+            .ok();
+        result_rx
+    }
+
     pub fn display_name(&self, project: &Project, cx: &App) -> SharedString {
         maybe!({
             let project_path = self.repo_path_to_project_path(&"".into())?;
@@ -1004,60 +736,106 @@ impl Repository {
         commit: &str,
         paths: Vec<RepoPath>,
     ) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::CheckoutFiles {
-            repo: self.git_repo.clone(),
-            commit: commit.to_string().into(),
-            paths,
+        let commit = commit.to_string();
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(repo) => repo.checkout_files(&commit, &paths),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::GitCheckoutFiles {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            commit,
+                            paths: paths
+                                .into_iter()
+                                .map(|p| p.to_string_lossy().to_string())
+                                .collect(),
+                        })
+                        .await?;
+
+                    Ok(())
+                }
+            }
         })
     }
 
     pub fn reset(&self, commit: &str, reset_mode: ResetMode) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::Reset {
-            repo: self.git_repo.clone(),
-            commit: commit.to_string().into(),
-            reset_mode,
+        let commit = commit.to_string();
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(git_repo) => git_repo.reset(&commit, reset_mode),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::GitReset {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            commit,
+                            mode: match reset_mode {
+                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
+                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
+                            },
+                        })
+                        .await?;
+
+                    Ok(())
+                }
+            }
         })
     }
 
-    pub fn show(&self, commit: &str, cx: &Context<Self>) -> Task<Result<CommitDetails>> {
+    pub fn show(&self, commit: &str) -> oneshot::Receiver<Result<CommitDetails>> {
         let commit = commit.to_string();
-        match self.git_repo.clone() {
-            GitRepo::Local(git_repository) => {
-                let commit = commit.to_string();
-                cx.background_spawn(async move { git_repository.show(&commit) })
-            }
-            GitRepo::Remote {
-                project_id,
-                client,
-                worktree_id,
-                work_directory_id,
-            } => cx.background_spawn(async move {
-                let resp = client
-                    .request(proto::GitShow {
-                        project_id: project_id.0,
-                        worktree_id: worktree_id.to_proto(),
-                        work_directory_id: work_directory_id.to_proto(),
-                        commit,
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(git_repository) => git_repository.show(&commit),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    let resp = client
+                        .request(proto::GitShow {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            commit,
+                        })
+                        .await?;
+
+                    Ok(CommitDetails {
+                        sha: resp.sha.into(),
+                        message: resp.message.into(),
+                        commit_timestamp: resp.commit_timestamp,
+                        committer_email: resp.committer_email.into(),
+                        committer_name: resp.committer_name.into(),
                     })
-                    .await?;
-
-                Ok(CommitDetails {
-                    sha: resp.sha.into(),
-                    message: resp.message.into(),
-                    commit_timestamp: resp.commit_timestamp,
-                    committer_email: resp.committer_email.into(),
-                    committer_name: resp.committer_name.into(),
-                })
-            }),
-        }
+                }
+            }
+        })
     }
 
     fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
         Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
     }
 
-    pub fn stage_entries(&self, entries: Vec<RepoPath>, cx: &mut App) -> Task<anyhow::Result<()>> {
-        let (result_tx, result_rx) = futures::channel::oneshot::channel();
+    pub fn stage_entries(
+        &self,
+        entries: Vec<RepoPath>,
+        cx: &mut Context<Self>,
+    ) -> Task<anyhow::Result<()>> {
         if entries.is_empty() {
             return Task::ready(Ok(()));
         }
@@ -1083,16 +861,41 @@ impl Repository {
             })
         }
 
-        let update_sender = self.update_sender.clone();
-        let git_repo = self.git_repo.clone();
-        cx.spawn(|_| async move {
+        cx.spawn(|this, mut cx| async move {
             for save_future in save_futures {
                 save_future.await?;
             }
-            update_sender
-                .unbounded_send((Message::Stage(git_repo, entries), result_tx))
-                .ok();
-            result_rx.await.anyhow()??;
+
+            this.update(&mut cx, |this, _| {
+                this.send_job(|git_repo| async move {
+                    match git_repo {
+                        GitRepo::Local(repo) => repo.stage_paths(&entries),
+                        GitRepo::Remote {
+                            project_id,
+                            client,
+                            worktree_id,
+                            work_directory_id,
+                        } => {
+                            client
+                                .request(proto::Stage {
+                                    project_id: project_id.0,
+                                    worktree_id: worktree_id.to_proto(),
+                                    work_directory_id: work_directory_id.to_proto(),
+                                    paths: entries
+                                        .into_iter()
+                                        .map(|repo_path| repo_path.as_ref().to_proto())
+                                        .collect(),
+                                })
+                                .await
+                                .context("sending stage request")?;
+
+                            Ok(())
+                        }
+                    }
+                })
+            })?
+            .await??;
+
             Ok(())
         })
     }
@@ -1100,9 +903,8 @@ impl Repository {
     pub fn unstage_entries(
         &self,
         entries: Vec<RepoPath>,
-        cx: &mut App,
+        cx: &mut Context<Self>,
     ) -> Task<anyhow::Result<()>> {
-        let (result_tx, result_rx) = futures::channel::oneshot::channel();
         if entries.is_empty() {
             return Task::ready(Ok(()));
         }
@@ -1128,21 +930,46 @@ impl Repository {
             })
         }
 
-        let update_sender = self.update_sender.clone();
-        let git_repo = self.git_repo.clone();
-        cx.spawn(|_| async move {
+        cx.spawn(move |this, mut cx| async move {
             for save_future in save_futures {
                 save_future.await?;
             }
-            update_sender
-                .unbounded_send((Message::Unstage(git_repo, entries), result_tx))
-                .ok();
-            result_rx.await.anyhow()??;
+
+            this.update(&mut cx, |this, _| {
+                this.send_job(|git_repo| async move {
+                    match git_repo {
+                        GitRepo::Local(repo) => repo.unstage_paths(&entries),
+                        GitRepo::Remote {
+                            project_id,
+                            client,
+                            worktree_id,
+                            work_directory_id,
+                        } => {
+                            client
+                                .request(proto::Unstage {
+                                    project_id: project_id.0,
+                                    worktree_id: worktree_id.to_proto(),
+                                    work_directory_id: work_directory_id.to_proto(),
+                                    paths: entries
+                                        .into_iter()
+                                        .map(|repo_path| repo_path.as_ref().to_proto())
+                                        .collect(),
+                                })
+                                .await
+                                .context("sending unstage request")?;
+
+                            Ok(())
+                        }
+                    }
+                })
+            })?
+            .await??;
+
             Ok(())
         })
     }
 
-    pub fn stage_all(&self, cx: &mut App) -> Task<anyhow::Result<()>> {
+    pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
         let to_stage = self
             .repository_entry
             .status()
@@ -1152,7 +979,7 @@ impl Repository {
         self.stage_entries(to_stage, cx)
     }
 
-    pub fn unstage_all(&self, cx: &mut App) -> Task<anyhow::Result<()>> {
+    pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
         let to_unstage = self
             .repository_entry
             .status()
@@ -1185,15 +1012,62 @@ impl Repository {
         message: SharedString,
         name_and_email: Option<(SharedString, SharedString)>,
     ) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::Commit {
-            git_repo: self.git_repo.clone(),
-            message,
-            name_and_email,
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(repo) => repo.commit(
+                    message.as_ref(),
+                    name_and_email
+                        .as_ref()
+                        .map(|(name, email)| (name.as_ref(), email.as_ref())),
+                ),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    let (name, email) = name_and_email.unzip();
+                    client
+                        .request(proto::Commit {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            message: String::from(message),
+                            name: name.map(String::from),
+                            email: email.map(String::from),
+                        })
+                        .await
+                        .context("sending commit request")?;
+
+                    Ok(())
+                }
+            }
         })
     }
 
     pub fn fetch(&self) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::Fetch(self.git_repo.clone()))
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(git_repository) => git_repository.fetch(),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::Fetch {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                        })
+                        .await
+                        .context("sending fetch request")?;
+
+                    Ok(())
+                }
+            }
+        })
     }
 
     pub fn push(
@@ -1202,11 +1076,33 @@ impl Repository {
         remote: SharedString,
         options: Option<PushOptions>,
     ) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::Push {
-            repo: self.git_repo.clone(),
-            branch_name: branch,
-            remote_name: remote,
-            options,
+        self.send_job(move |git_repo| async move {
+            match git_repo {
+                GitRepo::Local(git_repository) => git_repository.push(&branch, &remote, options),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::Push {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            branch_name: branch.to_string(),
+                            remote_name: remote.to_string(),
+                            options: options.map(|options| match options {
+                                PushOptions::Force => proto::push::PushOptions::Force,
+                                PushOptions::SetUpstream => proto::push::PushOptions::SetUpstream,
+                            } as i32),
+                        })
+                        .await
+                        .context("sending push request")?;
+
+                    Ok(())
+                }
+            }
         })
     }
 
@@ -1215,10 +1111,30 @@ impl Repository {
         branch: SharedString,
         remote: SharedString,
     ) -> oneshot::Receiver<Result<()>> {
-        self.send_message(Message::Pull {
-            repo: self.git_repo.clone(),
-            branch_name: branch,
-            remote_name: remote,
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(git_repository) => git_repository.pull(&branch, &remote),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::Pull {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            branch_name: branch.to_string(),
+                            remote_name: remote.to_string(),
+                        })
+                        .await
+                        .context("sending pull request")?;
+
+                    // TODO: wire through remote
+                    Ok(())
+                }
+            }
         })
     }
 
@@ -1227,49 +1143,66 @@ impl Repository {
         path: &RepoPath,
         content: Option<String>,
     ) -> oneshot::Receiver<anyhow::Result<()>> {
-        self.send_message(Message::SetIndexText(
-            self.git_repo.clone(),
-            path.clone(),
-            content,
-        ))
-    }
-
-    pub fn get_remotes(&self, branch_name: Option<String>, cx: &App) -> Task<Result<Vec<Remote>>> {
-        match self.git_repo.clone() {
-            GitRepo::Local(git_repository) => {
-                cx.background_spawn(
-                    async move { git_repository.get_remotes(branch_name.as_deref()) },
-                )
+        let path = path.clone();
+        self.send_job(|git_repo| async move {
+            match git_repo {
+                GitRepo::Local(repo) => repo.set_index_text(&path, content),
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    client
+                        .request(proto::SetIndexText {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            path: path.as_ref().to_proto(),
+                            text: content,
+                        })
+                        .await?;
+                    Ok(())
+                }
             }
-            GitRepo::Remote {
-                project_id,
-                client,
-                worktree_id,
-                work_directory_id,
-            } => cx.background_spawn(async move {
-                let response = client
-                    .request(proto::GetRemotes {
-                        project_id: project_id.0,
-                        worktree_id: worktree_id.to_proto(),
-                        work_directory_id: work_directory_id.to_proto(),
-                        branch_name,
-                    })
-                    .await?;
-
-                Ok(response
-                    .remotes
-                    .into_iter()
-                    .map(|remotes| git::repository::Remote {
-                        name: remotes.name.into(),
-                    })
-                    .collect())
-            }),
-        }
+        })
     }
 
-    fn send_message(&self, message: Message) -> oneshot::Receiver<anyhow::Result<()>> {
-        let (result_tx, result_rx) = futures::channel::oneshot::channel();
-        self.update_sender.unbounded_send((message, result_tx)).ok();
-        result_rx
+    pub fn get_remotes(
+        &self,
+        branch_name: Option<String>,
+    ) -> oneshot::Receiver<Result<Vec<Remote>>> {
+        self.send_job(|repo| async move {
+            match repo {
+                GitRepo::Local(git_repository) => {
+                    git_repository.get_remotes(branch_name.as_deref())
+                }
+                GitRepo::Remote {
+                    project_id,
+                    client,
+                    worktree_id,
+                    work_directory_id,
+                } => {
+                    let response = client
+                        .request(proto::GetRemotes {
+                            project_id: project_id.0,
+                            worktree_id: worktree_id.to_proto(),
+                            work_directory_id: work_directory_id.to_proto(),
+                            branch_name,
+                        })
+                        .await?;
+
+                    let remotes = response
+                        .remotes
+                        .into_iter()
+                        .map(|remotes| git::repository::Remote {
+                            name: remotes.name.into(),
+                        })
+                        .collect();
+
+                    Ok(remotes)
+                }
+            }
+        })
     }
 }